00001 /*------------------------------------------------------------------------------ 00002 Name: xmlBlaster/demo/c++/HelloWorld2.cpp 00003 Project: xmlBlaster.org 00004 Comment: C++ client example 00005 Author: Michele Laghi 00006 ------------------------------------------------------------------------------*/ 00007 #include <client/XmlBlasterAccess.h> 00008 #include <util/Global.h> 00009 #include <util/queue/I_Queue.h> 00010 00011 using namespace std; 00012 using namespace org::xmlBlaster::util; 00013 using namespace org::xmlBlaster::util::qos; 00014 using namespace org::xmlBlaster::util::dispatch; 00015 using namespace org::xmlBlaster::client; 00016 using namespace org::xmlBlaster::client::qos; 00017 using namespace org::xmlBlaster::client::key; 00018 using namespace org::xmlBlaster::util::queue; 00019 00035 class HelloWorld2 : public I_Callback, // for the asynchroneous updates 00036 public I_ConnectionProblems // notification of connection problems when failsafe 00037 { 00038 private: 00039 string ME; // the string identifying this class when logging 00040 Global& global_; 00041 I_Log& log_; // the reference to the log object for this instance 00042 00043 public: 00044 HelloWorld2(Global& glob) 00045 : ME("HelloWorld2"), 00046 global_(glob), 00047 log_(glob.getLog("HelloWorld2")) // all logs written in this class are written to the 00048 { // log channel called 'HelloWorld2'. To see the traces of this 00049 // channel invoke -trace[HelloWorld2] true on the command line, 00050 // then it will only switch on the traces for the demo channel 00051 log_.info(ME, "Trying to connect to xmlBlaster with C++ client lib " + Global::getReleaseId() + 00052 " from " + Global::getBuildTimestamp()); 00053 } 00054 00055 virtual ~HelloWorld2() // the constructor does nothing for the moment 00056 { 00057 } 00058 00059 00060 bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* connectionsHandler) 00061 { 00062 I_Queue* queue = connectionsHandler->getQueue(); 00063 if (queue && queue->getNumOfEntries() > 0) { 00064 log_.info(ME, "Clearing " + lexical_cast<std::string>(queue->getNumOfEntries()) + " entries from queue"); 00065 queue->clear(); 00066 } 00067 log_.info(ME, "reconnected"); 00068 return true; 00069 } 00070 00071 void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/) 00072 { 00073 log_.info(ME, "lost connection"); 00074 } 00075 00076 void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* connectionsHandler) 00077 { 00078 I_Queue* queue = connectionsHandler->getQueue(); 00079 if (queue) { 00080 log_.info(ME, "Found " + lexical_cast<std::string>(queue->getNumOfEntries()) + " entries in client side queue"); 00081 } 00082 log_.info(ME, "going to poll modus"); 00083 } 00084 00085 void execute() 00086 { 00087 long sleepMillis = global_.getProperty().get("sleep", 1000L); 00088 try { 00089 XmlBlasterAccess con(global_); 00090 con.initFailsafe(this); 00091 00092 // Creates a connect qos with the user 'joe' and the password 'secret' 00093 ConnectQos qos(global_, "joe", "secret"); 00094 /* 00095 string user = "joe"; 00096 string pwd = "secret"; 00097 00098 // Creates a connect qos with the user and password 00099 // <session name="client/joe/session/1" ...> 00100 ConnectQos qos(global_, user+"/session/1", pwd); 00101 00102 // Configure htpasswd security plugin 00103 // <securityService type="htpasswd" version="1.0"> 00104 // <![CDATA[ 00105 // <user>joe</user> 00106 // <passwd>secret</passwd> 00107 // ]]> 00108 // </securityService> 00109 org::xmlBlaster::authentication::SecurityQos sec(global_, user, pwd, "htpasswd,1.0"); 00110 qos.setSecurityQos(sec); 00111 */ 00112 log_.info(ME, string("connecting to xmlBlaster. Connect qos: ") + qos.toXml()); 00113 00114 00115 // connects to xmlBlaster and gives a pointer to this class to tell 00116 // which update method to invoke when callbacks come from the server. 00117 ConnectReturnQos retQos = con.connect(qos, this); // Login and register for updates 00118 log_.info(ME, "successfully connected to xmlBlaster. Return qos: " + retQos.toXml()); 00119 00120 00121 // subscribe key. By invoking setOid you implicitly choose the 'EXACT' mode. 00122 // If you want to subscribe with XPATH use setQueryString instead. 00123 SubscribeKey subKey(global_); 00124 subKey.setOid("HelloWorld2"); 00125 SubscribeQos subQos(global_); 00126 subQos.setMultiSubscribe(false); 00127 log_.info(ME, string("subscribing to xmlBlaster with key: ") + subKey.toXml() + 00128 " and qos: " + subQos.toXml()); 00129 00130 I_Queue* queue = con.getQueue(); 00131 if (queue && queue->getNumOfEntries() > 0) { 00132 log_.info(ME, "Found " + lexical_cast<std::string>(queue->getNumOfEntries()) + " entries in client side connection queue"); 00133 //queue->clear(); 00134 } 00135 00136 SubscribeReturnQos subRetQos = con.subscribe(subKey, subQos); 00137 log_.info(ME, string("successfully subscribed to xmlBlaster. Return qos: ") + 00138 subRetQos.toXml()); 00139 00140 // publish a message with the oid 'HelloWorld2' 00141 PublishQos publishQos(global_); 00142 PublishKey publishKey(global_); 00143 publishKey.setOid("HelloWorld2"); 00144 MessageUnit msgUnit(publishKey, string("Hi"), publishQos); 00145 log_.info(ME, string("publishing to xmlBlaster with message: ") + msgUnit.toXml()); 00146 PublishReturnQos pubRetQos = con.publish(msgUnit); 00147 log_.info(ME, "successfully published to xmlBlaster. Return qos: " + pubRetQos.toXml()); 00148 try { 00149 log_.info(ME, "Sleeping now for " + lexical_cast<string>(sleepMillis) + " msec ..."); 00150 org::xmlBlaster::util::thread::Thread::sleep(sleepMillis); 00151 } 00152 catch(const XmlBlasterException &e) { 00153 log_.error(ME, e.toXml()); 00154 } 00155 00156 // peek a history queue (could be any queue, see reveive() method 00157 string oid("topic/HelloWorld2"); 00158 int maxEntries = 4; 00159 long timeout = 0; 00160 bool consumable = false; 00161 vector<MessageUnit> msgVec = con.receive(oid, maxEntries, timeout, consumable); 00162 log_.info(ME, "Success, got " + lexical_cast<string>(msgVec.size()) + " messages"); 00163 for (size_t i=0; i<msgVec.size(); i++) { 00164 string str = msgVec[i].getContentStr(); 00165 log_.info(ME, "Peeked history messages '" + str + "'"); 00166 } 00167 00168 00169 log_.info(ME, "Hit a key to finish ..."); 00170 char ptr[1]; 00171 std::cin.read(ptr,1); 00172 00173 // now an update should have come. Its time to erase the message, 00174 // otherwise you would get directly an update the next time you connect 00175 // to the same xmlBlaster server. 00176 // Specify which messages you want to erase. Note that you will get an 00177 // update with the status of the UpdateQos set to 'ERASED'. 00178 EraseKey eraseKey(global_); 00179 eraseKey.setOid("HelloWorld2"); 00180 EraseQos eraseQos(global_); 00181 log_.info(ME, string("erasing the published message. Key: ") + eraseKey.toXml() + 00182 " qos: " + eraseQos.toXml()); 00183 vector<EraseReturnQos> eraseRetQos = con.erase(eraseKey, eraseQos); 00184 for (size_t i=0; i < eraseRetQos.size(); i++ ) { 00185 log_.info(ME, string("successfully erased the message. return qos: ") + 00186 eraseRetQos[i].toXml()); 00187 } 00188 org::xmlBlaster::util::thread::Thread::sleep(500); // wait for erase notification 00189 00190 log_.info(ME, "Disconnect, bye."); 00191 DisconnectQos disconnectQos(global_); 00192 con.disconnect(disconnectQos); 00193 } 00194 catch (const XmlBlasterException &e) { 00195 log_.error(ME, e.toXml()); 00196 } 00197 } 00198 00202 string update(const string& sessionId, UpdateKey& updateKey, 00203 const unsigned char* content, 00204 long contentSize, UpdateQos& updateQos) 00205 { 00206 string contentStr(reinterpret_cast<char *>(const_cast<unsigned char *>(content)), contentSize); 00207 log_.info(ME, "Received update message with secret sessionId '" + sessionId + "':" + 00208 updateKey.toXml() + 00209 "\n content=" + contentStr + 00210 updateQos.toXml()); 00211 // if (true) throw XmlBlasterException(USER_UPDATE_ERROR, "HelloWorld2", "TEST"); 00212 return ""; 00213 } 00214 00215 }; 00216 00217 #include <iostream> 00218 00226 int main(int args, char ** argv) 00227 { 00228 try { 00229 org::xmlBlaster::util::Object_Lifetime_Manager::init(); 00230 Global& glob = Global::getInstance(); 00231 glob.initialize(args, argv); 00232 00233 string intro = "XmlBlaster C++ client " + glob.getReleaseId() + 00234 ", try option '-help' if you need usage informations."; 00235 glob.getLog().info("HelloWorld2", intro); 00236 00237 if (glob.wantsHelp()) { 00238 cout << Global::usage() << endl; 00239 cout << endl << "HelloWorld2"; 00240 cout << endl << " -sleep Sleep after publishing [1000 millisec]" << endl; 00241 cout << endl << "Example:" << endl; 00242 cout << endl << "HelloWorld2 -trace true -sleep 2000"; 00243 cout << endl << "HelloWorld2 -dispatch/connection/delay 10000 -sleep 2000000" << endl << endl; 00244 org::xmlBlaster::util::Object_Lifetime_Manager::fini(); 00245 return 1; 00246 } 00247 00248 HelloWorld2 hello(glob); 00249 hello.execute(); 00250 } 00251 catch (XmlBlasterException &e) { 00252 std::cerr << "Caught exception: " << e.getMessage() << std::endl; 00253 } 00254 catch (...) { 00255 std::cerr << "Caught exception, exit" << std::endl; 00256 } 00257 org::xmlBlaster::util::Object_Lifetime_Manager::fini(); 00258 return 0; 00259 }