demo/c++/SubscribeDemo.cpp

Go to the documentation of this file.
00001 /*-----------------------------------------------------------------------------
00002 Name:      SubscribeDemo.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Little demo to show how a subscribe is done
00006 -----------------------------------------------------------------------------*/
00007 #include <client/XmlBlasterAccess.h>
00008 #include <util/Global.h>
00009 #include <util/lexical_cast.h>
00010 #include <iostream>
00011 #include <sstream>
00012 #include <stdexcept>
00013 
00014 namespace std {
00015   class UpdateException : public exception {
00016   public:
00017     explicit UpdateException(const string&  what_arg );
00018   };
00019 }
00020 
00021 using namespace std;
00022 using namespace org::xmlBlaster::client;
00023 using namespace org::xmlBlaster::util;
00024 using namespace org::xmlBlaster::util::qos;
00025 using namespace org::xmlBlaster::util::dispatch;
00026 using namespace org::xmlBlaster::client::qos;
00027 using namespace org::xmlBlaster::client::key;
00028 
00029 class SubscribeDemo;
00030 class ProgressListener : public org::xmlBlaster::client::protocol::I_ProgressListener
00031 {
00032    SubscribeDemo *demoP;
00033    public:
00034    ProgressListener(SubscribeDemo *p) { this->demoP = p; }
00035    void progress(const std::string& name, unsigned long currBytesRead, unsigned long numBytes);
00036 };
00037 
00038 
00048 class SubscribeDemo : public I_Callback, public I_ConnectionProblems
00049 {
00050 private:
00051    string           ME;
00052    Global&          global_;
00053    I_Log&           log_;
00054    XmlBlasterAccess connection_;
00055    ProgressListener progressListener;
00056    int updateCounter;
00057    char ptr[1];
00058    string subscriptionId;
00059    bool dispatcherActive;
00060    bool disconnect;
00061    bool interactive;
00062    bool interactiveUpdate;
00063    bool firstTime;
00064    long updateSleep;
00065    bool reportUpdateProgress;
00066    string updateExceptionErrorCode;
00067    string updateExceptionMessage;
00068    string updateExceptionRuntime;
00069    string oid;
00070    string domain;
00071    string xpath;
00072    bool multiSubscribe;
00073    bool persistentSubscribe;
00074    bool notifyOnErase;
00075    bool local;
00076    bool initialUpdate;
00077    bool updateOneway;
00078    bool wantContent;
00079    int historyNumUpdates;
00080    bool historyNewestFirst;
00081    string filterType;
00082    string filterVersion;
00083    string filterQuery;
00084    bool unSubscribe;
00085    int maxContentLength;
00086 
00087 public:
00088    bool doContinue_;
00089 
00090    SubscribeDemo(Global& glob) 
00091       : ME("SubscribeDemo"), 
00092         global_(glob), 
00093         log_(glob.getLog("demo")),
00094         connection_(global_),
00095         progressListener(this),
00096         updateCounter(0)
00097    {
00098       initEnvironment();
00099       doContinue_ = true;
00100       firstTime = true;
00101       execute();
00102    }
00103 
00104    I_Log& getLog() { return log_; }
00105 
00106    void execute()
00107    {
00108       connect();
00109 
00110       subscribe();
00111       
00112       log_.info(ME, "Please use PublishDemo to publish a message '"+oid+"', i'm waiting for updates ...");
00113 
00114       if (interactive) {
00115          org::xmlBlaster::util::thread::Thread::sleepSecs(1);
00116          bool stop = false;
00117          while (!stop) {
00118             string dd = dispatcherActive ? "'d' to deactivate dispatcher" : "'a' to activate dispatcher";
00119             std::cout << "(Enter " << dd << " 'q' to exit) >> ";
00120             std::cin.read(ptr,1);
00121             if (*ptr == 'q') stop = true;
00122             if (*ptr == 'a') connection_.setCallbackDispatcherActive(true), dispatcherActive=true;
00123             if (*ptr == 'd') connection_.setCallbackDispatcherActive(false), dispatcherActive=false;
00124          }
00125       }
00126       else {
00127          log_.plain(ME, "I will exit when the publisher destroys the topic ...");
00128          while (doContinue_) {
00129             org::xmlBlaster::util::thread::Thread::sleepSecs(2);
00130          }
00131       }
00132       
00133       unSubscribe_();
00134       
00135       disconnect_();
00136    }
00137 
00138    void initEnvironment()
00139    {
00140       dispatcherActive = global_.getProperty().get("dispatcherActive", true);
00141       disconnect = global_.getProperty().get("disconnect", true);
00142       interactive = global_.getProperty().get("interactive", true);
00143       interactiveUpdate = global_.getProperty().get("interactiveUpdate", false);
00144       updateSleep = global_.getProperty().get("updateSleep", 0L);
00145       reportUpdateProgress = global_.getProperty().get("reportUpdateProgress", false);
00146       updateExceptionErrorCode = global_.getProperty().get("updateException.errorCode", string(""));
00147       updateExceptionMessage = global_.getProperty().get("updateException.message", string(""));
00148       updateExceptionRuntime = global_.getProperty().get("updateException.runtime", string(""));
00149       oid = global_.getProperty().get("oid", "");
00150       domain = global_.getProperty().get("domain", "");
00151       xpath = global_.getProperty().get("xpath", "");
00152       multiSubscribe = global_.getProperty().get("multiSubscribe", true);
00153       persistentSubscribe = global_.getProperty().get("persistentSubscribe", false);
00154       notifyOnErase = global_.getProperty().get("notifyOnErase", true);
00155       local = global_.getProperty().get("local", true);
00156       initialUpdate = global_.getProperty().get("initialUpdate", true);
00157       updateOneway = global_.getProperty().get("updateOneway", false);
00158       wantContent = global_.getProperty().get("wantContent", true);
00159       historyNumUpdates = global_.getProperty().get("historyNumUpdates", 1);
00160       historyNewestFirst = global_.getProperty().get("historyNewestFirst", true);
00161       filterType = global_.getProperty().get("filter.type", "GnuRegexFilter");// XPathFilter | ContentLenFilter | Sql92Filter
00162       filterVersion = global_.getProperty().get("filter.version", "1.0");
00163       filterQuery = global_.getProperty().get("filter.query", "");
00164       unSubscribe = global_.getProperty().get("unSubscribe", true);
00165       maxContentLength = global_.getProperty().get("maxContentLength", 250);
00166 
00167       if (oid == "" && xpath == "") {
00168          log_.warn(ME, "No -oid or -xpath given, we subscribe to oid='Hello'.");
00169          oid = "Hello";
00170       }
00171 
00172       if (updateSleep > 0L && interactiveUpdate == true) {
00173          log_.warn(ME, "You can't set 'updateSleep' and  'interactiveUpdate' simultaneous, we reset interactiveUpdate to false");
00174          interactiveUpdate = false;
00175       }
00176 
00177       if (updateExceptionErrorCode != "" && updateExceptionRuntime != "") {
00178          log_.warn(ME, "You can't throw a runtime and an XmlBlasterException simultaneous, please check your settings "
00179                         " -updateException.errorCode and -updateException.runtime");
00180          updateExceptionRuntime = "";
00181       }
00182 
00183       log_.info(ME, "Used settings are:");
00184       log_.info(ME, "   -dispatcherActive    " + lexical_cast<string>(dispatcherActive));
00185       log_.info(ME, "   -interactive         " + lexical_cast<string>(interactive));
00186       log_.info(ME, "   -interactiveUpdate   " + lexical_cast<string>(interactiveUpdate));
00187       log_.info(ME, "   -updateSleep         " + lexical_cast<string>(updateSleep));
00188       log_.info(ME, "   -reportUpdateProgress      " + lexical_cast<string>(reportUpdateProgress));
00189       log_.info(ME, "   -updateException.errorCode " + updateExceptionErrorCode);
00190       log_.info(ME, "   -updateException.message   " + updateExceptionMessage);
00191       log_.info(ME, "   -updateException.runtime   " + updateExceptionRuntime);
00192       log_.info(ME, "   -oid                 " + oid);
00193       log_.info(ME, "   -domain              " + domain);
00194       log_.info(ME, "   -xpath               " + xpath);
00195       log_.info(ME, "   -multiSubscribe      " + lexical_cast<string>(multiSubscribe));
00196       log_.info(ME, "   -persistentSubscribe " + lexical_cast<string>(persistentSubscribe));
00197       log_.info(ME, "   -notifyOnErase       " + lexical_cast<string>(notifyOnErase));
00198       log_.info(ME, "   -local               " + lexical_cast<string>(local));
00199       log_.info(ME, "   -initialUpdate       " + lexical_cast<string>(initialUpdate));
00200       log_.info(ME, "   -updateOneway        " + lexical_cast<string>(updateOneway));
00201       log_.info(ME, "   -historyNumUpdates   " + lexical_cast<string>(historyNumUpdates));
00202       log_.info(ME, "   -historyNewestFirst  " + lexical_cast<string>(historyNewestFirst));
00203       log_.info(ME, "   -wantContent         " + lexical_cast<string>(wantContent));
00204       log_.info(ME, "   -maxContentLength    " + lexical_cast<string>(maxContentLength));
00205       log_.info(ME, "   -unSubscribe         " + lexical_cast<string>(unSubscribe));
00206       log_.info(ME, "   -disconnect          " + lexical_cast<string>(disconnect));
00207       log_.info(ME, "   -filter.type         " + filterType);
00208       log_.info(ME, "   -filter.version      " + filterVersion);
00209       log_.info(ME, "   -filter.query        " + filterQuery);
00210       log_.info(ME, "For more info please read:");
00211       log_.info(ME, "   http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.subscribe.html");
00212    }
00213 
00214    bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* connectionsHandler)
00215    {
00216       log_.info(ME, "reachedAlive()");
00217       if (!firstTime && !connectionsHandler->getConnectReturnQos()->isReconnected() && !persistentSubscribe) {
00218          subscribe(); // We lost the old subscription, initialize subscription again
00219       }
00220       return true;
00221    }
00222 
00223    void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
00224    {
00225       log_.info(ME, "reachedDead()");
00226    }
00227 
00228    void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
00229    {
00230       log_.info(ME, "reachedPolling()");
00231    }
00232 
00233    void connect()
00234    {
00235       connection_.initFailsafe(this);
00236       ConnectQos connQos(global_);
00237       connQos.getCbAddress()->setDispatcherActive(dispatcherActive);
00238       if (log_.trace()) log_.trace(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos.toXml());
00239       ConnectReturnQos retQos = connection_.connect(connQos, this);
00240       if (log_.trace()) log_.trace(ME, "successfully connected to xmlBlaster. Return qos: " + retQos.toXml());
00241       if (reportUpdateProgress) {
00242          connection_.registerProgressListener(&progressListener);
00243       }
00244    }
00245 
00246    void subscribe()
00247    {
00248       SubscribeKey *sk = 0;
00249       string qStr = "";
00250       try {
00251          sk = new SubscribeKey(global_);
00252          if (oid.length() > 0) {
00253             //sk = new SubscribeKey(global_, oid);
00254             qStr = oid;
00255             sk->setOid(oid);
00256          }
00257          else if (xpath.length() > 0) {
00258             //sk = new SubscribeKey(global_, xpath, Constants::XPATH);
00259             qStr = xpath;
00260             sk->setQueryString(xpath);
00261          }
00262          if (domain.length() > 0) {  // cluster routing information
00263             if (sk == 0) sk = new SubscribeKey(global_, "", Constants::D_O_M_A_I_N);
00264             sk->setDomain(domain);
00265             qStr = domain;
00266          }
00267          SubscribeQos sq(global_);
00268          sq.setWantInitialUpdate(initialUpdate);
00269          sq.setWantUpdateOneway(updateOneway);
00270          sq.setMultiSubscribe(multiSubscribe);
00271          sq.setPersistent(persistentSubscribe);
00272          sq.setWantNotify(notifyOnErase);
00273          sq.setWantLocal(local);
00274          sq.setWantContent(wantContent);
00275          
00276          HistoryQos historyQos(global_);
00277          historyQos.setNumEntries(historyNumUpdates);
00278          historyQos.setNewestFirst(historyNewestFirst);
00279          sq.setHistoryQos(historyQos);
00280 
00281          if (filterQuery.length() > 0) {
00282             AccessFilterQos filter(global_, filterType, filterVersion, filterQuery);
00283             sq.addAccessFilter(filter);
00284          }
00285 
00286          log_.info(ME, "SubscribeKey=" + sk->toXml());
00287          log_.info(ME, "SubscribeQos=" + sq.toXml());
00288 
00289          if (firstTime && interactive) {
00290             log_.info(ME, "Hit a key to subscribe '" + qStr + "'");
00291             std::cin.read(ptr,1);
00292          }
00293          firstTime = false;
00294 
00295          SubscribeReturnQos srq = connection_.subscribe(*sk, sq);
00296          subscriptionId = srq.getSubscriptionId();
00297 
00298          log_.info(ME, "Subscribed on topic '" + ((oid.length() > 0) ? oid : xpath) +
00299                       "', got subscription id='" + srq.getSubscriptionId() + "'\n" + srq.toXml());
00300          if (log_.dump()) log_.dump("", "Subscribed: " + sk->toXml() + sq.toXml() + srq.toXml());
00301          delete sk;
00302       }
00303       catch (...) { // a finally would have been more appropriate
00304          delete sk;
00305          throw;
00306       }
00307    }
00308 
00309    void unSubscribe_()
00310    {
00311       if (unSubscribe) {
00312          if (interactive) {
00313             log_.info(ME, "Hit a key to unSubscribe");
00314             std::cin.read(ptr,1);
00315          }
00316 
00317          UnSubscribeKey uk(global_, subscriptionId);
00318          if (domain.length() > 0)  // cluster routing information TODO!!!
00319             uk.setDomain(domain);
00320          UnSubscribeQos uq(global_);
00321          log_.info(ME, "UnSubscribeKey=" + uk.toXml());
00322          log_.info(ME, "UnSubscribeQos=" + uq.toXml());
00323          vector<UnSubscribeReturnQos> urqArr = connection_.unSubscribe(uk, uq);
00324          log_.info(ME, "UnSubscribe on " + lexical_cast<string>(urqArr.size()) + " subscriptions done");
00325       }
00326    }
00327 
00328    void disconnect_()
00329    {
00330       if (disconnect) {
00331          DisconnectQos dq(global_);
00332          connection_.disconnect(dq);
00333          log_.info(ME, "Disconnected");
00334       }
00335    }
00336 
00340    string update(const string& sessionId, UpdateKey& updateKey, const unsigned char *content, long contentSize, UpdateQos& updateQos)
00341    {
00342       stringstream sout;
00343 
00344       if (updateQos.isErased() && oid.length() > 0) { // Erased topic with EXACT subscription?
00345          sout << endl << "============= Topic '" + updateKey.getOid() + "' is ERASED =======================" << endl;
00346          log_.plain(ME, sout.str());
00347          subscribe();              // topic is erased -> re-subsribe
00348          return Constants::RET_OK; // "<qos><state id='OK'/></qos>";
00349       }
00350 
00351       //const Global& global_ = updateKey.getGlobal();
00352       ++updateCounter;
00353 
00354       //NOTE: subscribe("anotherDummy");  -> subscribe in update does not work
00355       //      with single threaded 'mico' or SOCKET protocol
00356 
00357       log_.info(ME, "Receiving update #" + lexical_cast<string>(updateCounter) + " of a message, secret sessionId=" + sessionId + " ...");
00358 
00359       sout << endl << "============= START #" << updateCounter << " '" << updateKey.getOid() << "' =======================";
00360       string contentStr((char*)content, (char*)(content)+contentSize);
00361       sout << endl << "<xmlBlaster>";
00362       sout << updateKey.toXml("  ");
00363       sout << endl << " <content size='" << contentSize << "'>";
00364       if (contentSize < maxContentLength)
00365          sout << endl << contentStr;
00366       else
00367          sout << endl << contentStr.substr(0, maxContentLength-5) << " ...";
00368       sout << endl << " </content>";
00369       sout << updateQos.toXml("  ");
00370       sout << endl << "</xmlBlaster>";
00371       sout << endl << "============= END #" << updateCounter << " '" << updateKey.getOid() << "' =========================";
00372       sout << endl;
00373       log_.plain(ME, sout.str());
00374 
00375       // Dump the ClientProperties decoded (the above dump may contain Base64 encoding):
00376       const QosData::ClientPropertyMap& propMap = updateQos.getClientProperties();
00377       QosData::ClientPropertyMap::const_iterator mi;
00378       for (mi=propMap.begin(); mi!=propMap.end(); ++mi) {
00379          const ClientProperty& clientProperty = mi->second;
00380          if (clientProperty.isBase64()) {
00381             log_.info(ME, "ClientProperty decoded: "+mi->first+"=" + clientProperty.getStringValue());
00382          }
00383       }
00384       // Examples for direct access:
00385       if (updateQos.hasClientProperty(string("StringKey"))) {
00386          log_.info(ME, "ClientProperty BLA=" +updateQos.getClientProperty("BLA", string("MISSING VALUE?")));
00387       }
00388       if (updateQos.hasClientProperty(string("ALONG"))) {
00389          long aLongValue = updateQos.getClientProperty("ALONG", -1L);
00390          log_.info(ME, "ClientProperty ALONG=" + lexical_cast<string>(aLongValue));
00391       }
00392       
00393       if (updateSleep > 0L) {
00394          log_.info(ME, "Sleeping for " + lexical_cast<string>(updateSleep) + " millis in callback ...");
00395          org::xmlBlaster::util::thread::Thread::sleep(updateSleep);
00396          log_.info(ME, "Waking up.");
00397       }
00398       else if (interactiveUpdate) {
00399          log_.info(ME, "Hit a key to return from update() (we are blocking the server callback) ...");
00400          std::cin.read(ptr,1);
00401          log_.info(ME, "Returning update() - control goes back to server");
00402       }
00403 
00404       if (updateExceptionErrorCode != "") {
00405          log_.info(ME, "Throwing XmlBlasterException with errorCode='" + updateExceptionErrorCode + "' back to server ...");
00406          throw XmlBlasterException(updateExceptionErrorCode, ME, updateExceptionMessage); 
00407       }
00408 
00409       if (updateExceptionRuntime != "") {
00410          log_.info(ME, "Throwing RuntimeException '" + updateExceptionRuntime + "'");
00411          //throw UpdateException(updateExceptionRuntime);
00412          throw logic_error(updateExceptionRuntime);
00413       }
00414 
00415       return Constants::RET_OK;
00416    }
00417 };
00418 
00419 void ProgressListener::progress(const std::string& name, unsigned long currBytesRead, unsigned long numBytes) {
00420    demoP->getLog().info("SubscribeDemo", name + "Progress of incoming message is currBytesRead=" +
00421             lexical_cast<string>(currBytesRead) + " nbytes=" + lexical_cast<string>(numBytes));
00422 }
00423 
00431 int main(int args, char ** argv)
00432 {
00433    org::xmlBlaster::util::Object_Lifetime_Manager::init();
00434    //TimestampFactory& tsFactory = TimestampFactory::getInstance();
00435    //Timestamp startStamp = tsFactory.getTimestamp();
00436    //std::cout << " start time: " << tsFactory.toXml(startStamp, "", true) << std::endl;
00437 
00438    try {
00439       Global& glob = Global::getInstance();
00440       glob.initialize(args, argv);
00441 
00442       if (glob.wantsHelp()) {
00443          glob.getLog().plain("", Global::usage());
00444          glob.getLog().plain("", "\nExample:\n");
00445          glob.getLog().plain("", "   SubscribeDemo -trace true -interactiveUpdate true\n");
00446          glob.getLog().plain("", "   SubscribeDemo -xpath '//key' -filter.query '^H.*'\n");
00447          org::xmlBlaster::util::Object_Lifetime_Manager::fini();
00448          return 1;
00449       }
00450 
00451       SubscribeDemo demo(glob);
00452 
00453       //Timestamp stopStamp = tsFactory.getTimestamp();
00454       //std::cout << " end time: " << tsFactory.toXml(stopStamp, "", true) << std::endl;
00455       //Timestamp diff = stopStamp - startStamp;
00456       //std::cout << " time used for demo: " << tsFactory.toXml(diff, "", true) << std::endl;
00457    }
00458    catch (XmlBlasterException& ex) {
00459       std::cout << ex.toXml() << std::endl;
00460    }
00461    catch (bad_exception& ex) {
00462       cout << "bad_exception: " << ex.what() << endl;
00463    }
00464    catch (exception& ex) {
00465       cout << " exception: " << ex.what() << endl;
00466    }
00467    catch (string& ex) {
00468       cout << "string: " << ex << endl;
00469    }
00470    catch (char* ex) {
00471       cout << "char* :  " << ex << endl;
00472    }
00473 
00474    catch (...)
00475    {
00476       cout << "unknown exception occured" << endl;
00477       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
00478       cout << e.toXml() << endl;
00479    }
00480 
00481    org::xmlBlaster::util::Object_Lifetime_Manager::fini();
00482    return 0;
00483 }