00001 /*----------------------------------------------------------------------------- 00002 Name: SubscribeDemo.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Little demo to show how a subscribe is done 00006 -----------------------------------------------------------------------------*/ 00007 #include <client/XmlBlasterAccess.h> 00008 #include <util/Global.h> 00009 #include <util/lexical_cast.h> 00010 #include <iostream> 00011 #include <sstream> 00012 #include <stdexcept> 00013 00014 namespace std { 00015 class UpdateException : public exception { 00016 public: 00017 explicit UpdateException(const string& what_arg ); 00018 }; 00019 } 00020 00021 using namespace std; 00022 using namespace org::xmlBlaster::client; 00023 using namespace org::xmlBlaster::util; 00024 using namespace org::xmlBlaster::util::qos; 00025 using namespace org::xmlBlaster::util::dispatch; 00026 using namespace org::xmlBlaster::client::qos; 00027 using namespace org::xmlBlaster::client::key; 00028 00029 class SubscribeDemo; 00030 class ProgressListener : public org::xmlBlaster::client::protocol::I_ProgressListener 00031 { 00032 SubscribeDemo *demoP; 00033 public: 00034 ProgressListener(SubscribeDemo *p) { this->demoP = p; } 00035 void progress(const std::string& name, unsigned long currBytesRead, unsigned long numBytes); 00036 }; 00037 00038 00048 class SubscribeDemo : public I_Callback, public I_ConnectionProblems 00049 { 00050 private: 00051 string ME; 00052 Global& global_; 00053 I_Log& log_; 00054 XmlBlasterAccess connection_; 00055 ProgressListener progressListener; 00056 int updateCounter; 00057 char ptr[1]; 00058 string subscriptionId; 00059 bool dispatcherActive; 00060 bool disconnect; 00061 bool interactive; 00062 bool interactiveUpdate; 00063 bool firstTime; 00064 long updateSleep; 00065 bool reportUpdateProgress; 00066 string updateExceptionErrorCode; 00067 string updateExceptionMessage; 00068 string updateExceptionRuntime; 00069 string oid; 00070 string domain; 00071 string xpath; 00072 bool multiSubscribe; 00073 bool persistentSubscribe; 00074 bool notifyOnErase; 00075 bool local; 00076 bool initialUpdate; 00077 bool updateOneway; 00078 bool wantContent; 00079 int historyNumUpdates; 00080 bool historyNewestFirst; 00081 string filterType; 00082 string filterVersion; 00083 string filterQuery; 00084 bool unSubscribe; 00085 int maxContentLength; 00086 00087 public: 00088 bool doContinue_; 00089 00090 SubscribeDemo(Global& glob) 00091 : ME("SubscribeDemo"), 00092 global_(glob), 00093 log_(glob.getLog("demo")), 00094 connection_(global_), 00095 progressListener(this), 00096 updateCounter(0) 00097 { 00098 initEnvironment(); 00099 doContinue_ = true; 00100 firstTime = true; 00101 execute(); 00102 } 00103 00104 I_Log& getLog() { return log_; } 00105 00106 void execute() 00107 { 00108 connect(); 00109 00110 subscribe(); 00111 00112 log_.info(ME, "Please use PublishDemo to publish a message '"+oid+"', i'm waiting for updates ..."); 00113 00114 if (interactive) { 00115 org::xmlBlaster::util::thread::Thread::sleepSecs(1); 00116 bool stop = false; 00117 while (!stop) { 00118 string dd = dispatcherActive ? "'d' to deactivate dispatcher" : "'a' to activate dispatcher"; 00119 std::cout << "(Enter " << dd << " 'q' to exit) >> "; 00120 std::cin.read(ptr,1); 00121 if (*ptr == 'q') stop = true; 00122 if (*ptr == 'a') connection_.setCallbackDispatcherActive(true), dispatcherActive=true; 00123 if (*ptr == 'd') connection_.setCallbackDispatcherActive(false), dispatcherActive=false; 00124 } 00125 } 00126 else { 00127 log_.plain(ME, "I will exit when the publisher destroys the topic ..."); 00128 while (doContinue_) { 00129 org::xmlBlaster::util::thread::Thread::sleepSecs(2); 00130 } 00131 } 00132 00133 unSubscribe_(); 00134 00135 disconnect_(); 00136 } 00137 00138 void initEnvironment() 00139 { 00140 dispatcherActive = global_.getProperty().get("dispatcherActive", true); 00141 disconnect = global_.getProperty().get("disconnect", true); 00142 interactive = global_.getProperty().get("interactive", true); 00143 interactiveUpdate = global_.getProperty().get("interactiveUpdate", false); 00144 updateSleep = global_.getProperty().get("updateSleep", 0L); 00145 reportUpdateProgress = global_.getProperty().get("reportUpdateProgress", false); 00146 updateExceptionErrorCode = global_.getProperty().get("updateException.errorCode", string("")); 00147 updateExceptionMessage = global_.getProperty().get("updateException.message", string("")); 00148 updateExceptionRuntime = global_.getProperty().get("updateException.runtime", string("")); 00149 oid = global_.getProperty().get("oid", ""); 00150 domain = global_.getProperty().get("domain", ""); 00151 xpath = global_.getProperty().get("xpath", ""); 00152 multiSubscribe = global_.getProperty().get("multiSubscribe", true); 00153 persistentSubscribe = global_.getProperty().get("persistentSubscribe", false); 00154 notifyOnErase = global_.getProperty().get("notifyOnErase", true); 00155 local = global_.getProperty().get("local", true); 00156 initialUpdate = global_.getProperty().get("initialUpdate", true); 00157 updateOneway = global_.getProperty().get("updateOneway", false); 00158 wantContent = global_.getProperty().get("wantContent", true); 00159 historyNumUpdates = global_.getProperty().get("historyNumUpdates", 1); 00160 historyNewestFirst = global_.getProperty().get("historyNewestFirst", true); 00161 filterType = global_.getProperty().get("filter.type", "GnuRegexFilter");// XPathFilter | ContentLenFilter | Sql92Filter 00162 filterVersion = global_.getProperty().get("filter.version", "1.0"); 00163 filterQuery = global_.getProperty().get("filter.query", ""); 00164 unSubscribe = global_.getProperty().get("unSubscribe", true); 00165 maxContentLength = global_.getProperty().get("maxContentLength", 250); 00166 00167 if (oid == "" && xpath == "") { 00168 log_.warn(ME, "No -oid or -xpath given, we subscribe to oid='Hello'."); 00169 oid = "Hello"; 00170 } 00171 00172 if (updateSleep > 0L && interactiveUpdate == true) { 00173 log_.warn(ME, "You can't set 'updateSleep' and 'interactiveUpdate' simultaneous, we reset interactiveUpdate to false"); 00174 interactiveUpdate = false; 00175 } 00176 00177 if (updateExceptionErrorCode != "" && updateExceptionRuntime != "") { 00178 log_.warn(ME, "You can't throw a runtime and an XmlBlasterException simultaneous, please check your settings " 00179 " -updateException.errorCode and -updateException.runtime"); 00180 updateExceptionRuntime = ""; 00181 } 00182 00183 log_.info(ME, "Used settings are:"); 00184 log_.info(ME, " -dispatcherActive " + lexical_cast<string>(dispatcherActive)); 00185 log_.info(ME, " -interactive " + lexical_cast<string>(interactive)); 00186 log_.info(ME, " -interactiveUpdate " + lexical_cast<string>(interactiveUpdate)); 00187 log_.info(ME, " -updateSleep " + lexical_cast<string>(updateSleep)); 00188 log_.info(ME, " -reportUpdateProgress " + lexical_cast<string>(reportUpdateProgress)); 00189 log_.info(ME, " -updateException.errorCode " + updateExceptionErrorCode); 00190 log_.info(ME, " -updateException.message " + updateExceptionMessage); 00191 log_.info(ME, " -updateException.runtime " + updateExceptionRuntime); 00192 log_.info(ME, " -oid " + oid); 00193 log_.info(ME, " -domain " + domain); 00194 log_.info(ME, " -xpath " + xpath); 00195 log_.info(ME, " -multiSubscribe " + lexical_cast<string>(multiSubscribe)); 00196 log_.info(ME, " -persistentSubscribe " + lexical_cast<string>(persistentSubscribe)); 00197 log_.info(ME, " -notifyOnErase " + lexical_cast<string>(notifyOnErase)); 00198 log_.info(ME, " -local " + lexical_cast<string>(local)); 00199 log_.info(ME, " -initialUpdate " + lexical_cast<string>(initialUpdate)); 00200 log_.info(ME, " -updateOneway " + lexical_cast<string>(updateOneway)); 00201 log_.info(ME, " -historyNumUpdates " + lexical_cast<string>(historyNumUpdates)); 00202 log_.info(ME, " -historyNewestFirst " + lexical_cast<string>(historyNewestFirst)); 00203 log_.info(ME, " -wantContent " + lexical_cast<string>(wantContent)); 00204 log_.info(ME, " -maxContentLength " + lexical_cast<string>(maxContentLength)); 00205 log_.info(ME, " -unSubscribe " + lexical_cast<string>(unSubscribe)); 00206 log_.info(ME, " -disconnect " + lexical_cast<string>(disconnect)); 00207 log_.info(ME, " -filter.type " + filterType); 00208 log_.info(ME, " -filter.version " + filterVersion); 00209 log_.info(ME, " -filter.query " + filterQuery); 00210 log_.info(ME, "For more info please read:"); 00211 log_.info(ME, " http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.subscribe.html"); 00212 } 00213 00214 bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* connectionsHandler) 00215 { 00216 log_.info(ME, "reachedAlive()"); 00217 if (!firstTime && !connectionsHandler->getConnectReturnQos()->isReconnected() && !persistentSubscribe) { 00218 subscribe(); // We lost the old subscription, initialize subscription again 00219 } 00220 return true; 00221 } 00222 00223 void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/) 00224 { 00225 log_.info(ME, "reachedDead()"); 00226 } 00227 00228 void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/) 00229 { 00230 log_.info(ME, "reachedPolling()"); 00231 } 00232 00233 void connect() 00234 { 00235 connection_.initFailsafe(this); 00236 ConnectQos connQos(global_); 00237 connQos.getCbAddress()->setDispatcherActive(dispatcherActive); 00238 if (log_.trace()) log_.trace(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos.toXml()); 00239 ConnectReturnQos retQos = connection_.connect(connQos, this); 00240 if (log_.trace()) log_.trace(ME, "successfully connected to xmlBlaster. Return qos: " + retQos.toXml()); 00241 if (reportUpdateProgress) { 00242 connection_.registerProgressListener(&progressListener); 00243 } 00244 } 00245 00246 void subscribe() 00247 { 00248 SubscribeKey *sk = 0; 00249 string qStr = ""; 00250 try { 00251 sk = new SubscribeKey(global_); 00252 if (oid.length() > 0) { 00253 //sk = new SubscribeKey(global_, oid); 00254 qStr = oid; 00255 sk->setOid(oid); 00256 } 00257 else if (xpath.length() > 0) { 00258 //sk = new SubscribeKey(global_, xpath, Constants::XPATH); 00259 qStr = xpath; 00260 sk->setQueryString(xpath); 00261 } 00262 if (domain.length() > 0) { // cluster routing information 00263 if (sk == 0) sk = new SubscribeKey(global_, "", Constants::D_O_M_A_I_N); 00264 sk->setDomain(domain); 00265 qStr = domain; 00266 } 00267 SubscribeQos sq(global_); 00268 sq.setWantInitialUpdate(initialUpdate); 00269 sq.setWantUpdateOneway(updateOneway); 00270 sq.setMultiSubscribe(multiSubscribe); 00271 sq.setPersistent(persistentSubscribe); 00272 sq.setWantNotify(notifyOnErase); 00273 sq.setWantLocal(local); 00274 sq.setWantContent(wantContent); 00275 00276 HistoryQos historyQos(global_); 00277 historyQos.setNumEntries(historyNumUpdates); 00278 historyQos.setNewestFirst(historyNewestFirst); 00279 sq.setHistoryQos(historyQos); 00280 00281 if (filterQuery.length() > 0) { 00282 AccessFilterQos filter(global_, filterType, filterVersion, filterQuery); 00283 sq.addAccessFilter(filter); 00284 } 00285 00286 log_.info(ME, "SubscribeKey=" + sk->toXml()); 00287 log_.info(ME, "SubscribeQos=" + sq.toXml()); 00288 00289 if (firstTime && interactive) { 00290 log_.info(ME, "Hit a key to subscribe '" + qStr + "'"); 00291 std::cin.read(ptr,1); 00292 } 00293 firstTime = false; 00294 00295 SubscribeReturnQos srq = connection_.subscribe(*sk, sq); 00296 subscriptionId = srq.getSubscriptionId(); 00297 00298 log_.info(ME, "Subscribed on topic '" + ((oid.length() > 0) ? oid : xpath) + 00299 "', got subscription id='" + srq.getSubscriptionId() + "'\n" + srq.toXml()); 00300 if (log_.dump()) log_.dump("", "Subscribed: " + sk->toXml() + sq.toXml() + srq.toXml()); 00301 delete sk; 00302 } 00303 catch (...) { // a finally would have been more appropriate 00304 delete sk; 00305 throw; 00306 } 00307 } 00308 00309 void unSubscribe_() 00310 { 00311 if (unSubscribe) { 00312 if (interactive) { 00313 log_.info(ME, "Hit a key to unSubscribe"); 00314 std::cin.read(ptr,1); 00315 } 00316 00317 UnSubscribeKey uk(global_, subscriptionId); 00318 if (domain.length() > 0) // cluster routing information TODO!!! 00319 uk.setDomain(domain); 00320 UnSubscribeQos uq(global_); 00321 log_.info(ME, "UnSubscribeKey=" + uk.toXml()); 00322 log_.info(ME, "UnSubscribeQos=" + uq.toXml()); 00323 vector<UnSubscribeReturnQos> urqArr = connection_.unSubscribe(uk, uq); 00324 log_.info(ME, "UnSubscribe on " + lexical_cast<string>(urqArr.size()) + " subscriptions done"); 00325 } 00326 } 00327 00328 void disconnect_() 00329 { 00330 if (disconnect) { 00331 DisconnectQos dq(global_); 00332 connection_.disconnect(dq); 00333 log_.info(ME, "Disconnected"); 00334 } 00335 } 00336 00340 string update(const string& sessionId, UpdateKey& updateKey, const unsigned char *content, long contentSize, UpdateQos& updateQos) 00341 { 00342 stringstream sout; 00343 00344 if (updateQos.isErased() && oid.length() > 0) { // Erased topic with EXACT subscription? 00345 sout << endl << "============= Topic '" + updateKey.getOid() + "' is ERASED =======================" << endl; 00346 log_.plain(ME, sout.str()); 00347 subscribe(); // topic is erased -> re-subsribe 00348 return Constants::RET_OK; // "<qos><state id='OK'/></qos>"; 00349 } 00350 00351 //const Global& global_ = updateKey.getGlobal(); 00352 ++updateCounter; 00353 00354 //NOTE: subscribe("anotherDummy"); -> subscribe in update does not work 00355 // with single threaded 'mico' or SOCKET protocol 00356 00357 log_.info(ME, "Receiving update #" + lexical_cast<string>(updateCounter) + " of a message, secret sessionId=" + sessionId + " ..."); 00358 00359 sout << endl << "============= START #" << updateCounter << " '" << updateKey.getOid() << "' ======================="; 00360 string contentStr((char*)content, (char*)(content)+contentSize); 00361 sout << endl << "<xmlBlaster>"; 00362 sout << updateKey.toXml(" "); 00363 sout << endl << " <content size='" << contentSize << "'>"; 00364 if (contentSize < maxContentLength) 00365 sout << endl << contentStr; 00366 else 00367 sout << endl << contentStr.substr(0, maxContentLength-5) << " ..."; 00368 sout << endl << " </content>"; 00369 sout << updateQos.toXml(" "); 00370 sout << endl << "</xmlBlaster>"; 00371 sout << endl << "============= END #" << updateCounter << " '" << updateKey.getOid() << "' ========================="; 00372 sout << endl; 00373 log_.plain(ME, sout.str()); 00374 00375 // Dump the ClientProperties decoded (the above dump may contain Base64 encoding): 00376 const QosData::ClientPropertyMap& propMap = updateQos.getClientProperties(); 00377 QosData::ClientPropertyMap::const_iterator mi; 00378 for (mi=propMap.begin(); mi!=propMap.end(); ++mi) { 00379 const ClientProperty& clientProperty = mi->second; 00380 if (clientProperty.isBase64()) { 00381 log_.info(ME, "ClientProperty decoded: "+mi->first+"=" + clientProperty.getStringValue()); 00382 } 00383 } 00384 // Examples for direct access: 00385 if (updateQos.hasClientProperty(string("StringKey"))) { 00386 log_.info(ME, "ClientProperty BLA=" +updateQos.getClientProperty("BLA", string("MISSING VALUE?"))); 00387 } 00388 if (updateQos.hasClientProperty(string("ALONG"))) { 00389 long aLongValue = updateQos.getClientProperty("ALONG", -1L); 00390 log_.info(ME, "ClientProperty ALONG=" + lexical_cast<string>(aLongValue)); 00391 } 00392 00393 if (updateSleep > 0L) { 00394 log_.info(ME, "Sleeping for " + lexical_cast<string>(updateSleep) + " millis in callback ..."); 00395 org::xmlBlaster::util::thread::Thread::sleep(updateSleep); 00396 log_.info(ME, "Waking up."); 00397 } 00398 else if (interactiveUpdate) { 00399 log_.info(ME, "Hit a key to return from update() (we are blocking the server callback) ..."); 00400 std::cin.read(ptr,1); 00401 log_.info(ME, "Returning update() - control goes back to server"); 00402 } 00403 00404 if (updateExceptionErrorCode != "") { 00405 log_.info(ME, "Throwing XmlBlasterException with errorCode='" + updateExceptionErrorCode + "' back to server ..."); 00406 throw XmlBlasterException(updateExceptionErrorCode, ME, updateExceptionMessage); 00407 } 00408 00409 if (updateExceptionRuntime != "") { 00410 log_.info(ME, "Throwing RuntimeException '" + updateExceptionRuntime + "'"); 00411 //throw UpdateException(updateExceptionRuntime); 00412 throw logic_error(updateExceptionRuntime); 00413 } 00414 00415 return Constants::RET_OK; 00416 } 00417 }; 00418 00419 void ProgressListener::progress(const std::string& name, unsigned long currBytesRead, unsigned long numBytes) { 00420 demoP->getLog().info("SubscribeDemo", name + "Progress of incoming message is currBytesRead=" + 00421 lexical_cast<string>(currBytesRead) + " nbytes=" + lexical_cast<string>(numBytes)); 00422 } 00423 00431 int main(int args, char ** argv) 00432 { 00433 org::xmlBlaster::util::Object_Lifetime_Manager::init(); 00434 //TimestampFactory& tsFactory = TimestampFactory::getInstance(); 00435 //Timestamp startStamp = tsFactory.getTimestamp(); 00436 //std::cout << " start time: " << tsFactory.toXml(startStamp, "", true) << std::endl; 00437 00438 try { 00439 Global& glob = Global::getInstance(); 00440 glob.initialize(args, argv); 00441 00442 if (glob.wantsHelp()) { 00443 glob.getLog().plain("", Global::usage()); 00444 glob.getLog().plain("", "\nExample:\n"); 00445 glob.getLog().plain("", " SubscribeDemo -trace true -interactiveUpdate true\n"); 00446 glob.getLog().plain("", " SubscribeDemo -xpath '//key' -filter.query '^H.*'\n"); 00447 org::xmlBlaster::util::Object_Lifetime_Manager::fini(); 00448 return 1; 00449 } 00450 00451 SubscribeDemo demo(glob); 00452 00453 //Timestamp stopStamp = tsFactory.getTimestamp(); 00454 //std::cout << " end time: " << tsFactory.toXml(stopStamp, "", true) << std::endl; 00455 //Timestamp diff = stopStamp - startStamp; 00456 //std::cout << " time used for demo: " << tsFactory.toXml(diff, "", true) << std::endl; 00457 } 00458 catch (XmlBlasterException& ex) { 00459 std::cout << ex.toXml() << std::endl; 00460 } 00461 catch (bad_exception& ex) { 00462 cout << "bad_exception: " << ex.what() << endl; 00463 } 00464 catch (exception& ex) { 00465 cout << " exception: " << ex.what() << endl; 00466 } 00467 catch (string& ex) { 00468 cout << "string: " << ex << endl; 00469 } 00470 catch (char* ex) { 00471 cout << "char* : " << ex << endl; 00472 } 00473 00474 catch (...) 00475 { 00476 cout << "unknown exception occured" << endl; 00477 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread"); 00478 cout << e.toXml() << endl; 00479 } 00480 00481 org::xmlBlaster::util::Object_Lifetime_Manager::fini(); 00482 return 0; 00483 }