1 /*-----------------------------------------------------------------------------
2 Name: SubscribeDemo.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Little demo to show how a subscribe is done
6 -----------------------------------------------------------------------------*/
7 #include <client/XmlBlasterAccess.h>
8 #include <util/Global.h>
9 #include <util/lexical_cast.h>
10 #include <iostream>
11 #include <sstream>
12 #include <stdexcept>
/**
 * Exception type intended to be thrown out of SubscribeDemo::update() to
 * simulate a runtime failure inside the client callback.
 *
 * It lives in the global namespace on purpose: adding declarations to
 * namespace std is undefined behaviour. Because the file pulls in
 * 'using namespace std;' the unqualified name UpdateException resolves the
 * same way for every user inside this translation unit.
 *
 * It derives from std::runtime_error, so it is still catchable as
 * std::exception and what() returns the text passed to the constructor.
 */
class UpdateException : public std::runtime_error {
public:
   /** @param what_arg Text returned by what() */
   explicit UpdateException(const std::string& what_arg)
      : std::runtime_error(what_arg)
   {
   }
};
21 using namespace std;
22 using namespace org::xmlBlaster::client;
23 using namespace org::xmlBlaster::util;
24 using namespace org::xmlBlaster::util::qos;
25 using namespace org::xmlBlaster::util::dispatch;
26 using namespace org::xmlBlaster::client::qos;
27 using namespace org::xmlBlaster::client::key;
29 class SubscribeDemo;
30 class ProgressListener : public org::xmlBlaster::client::protocol::I_ProgressListener
31 {
32 SubscribeDemo *demoP;
33 public:
34 ProgressListener(SubscribeDemo *p) { this->demoP = p; }
35 void progress(const std::string& name, unsigned long currBytesRead, unsigned long numBytes);
36 };
39 /**
40 * This subscriber subscribes on the topic 'Hello' and dumps
41 * the received messages.
42 * Please start the publisher demo
43 * <code>
44 * PublishDemo -numPublish 10
45 * </code>
46 * to receives some messages
47 */
48 class SubscribeDemo : public I_Callback, public I_ConnectionProblems
49 {
50 private:
51 string ME;
52 Global& global_;
53 I_Log& log_;
54 XmlBlasterAccess connection_;
55 ProgressListener progressListener;
56 int updateCounter;
57 char ptr[1];
58 string subscriptionId;
59 bool dispatcherActive;
60 bool disconnect;
61 bool interactive;
62 bool interactiveUpdate;
63 bool firstTime;
64 long updateSleep;
65 bool reportUpdateProgress;
66 string updateExceptionErrorCode;
67 string updateExceptionMessage;
68 string updateExceptionRuntime;
69 string oid;
70 string domain;
71 string xpath;
72 bool multiSubscribe;
73 bool persistentSubscribe;
74 bool notifyOnErase;
75 bool local;
76 bool initialUpdate;
77 bool updateOneway;
78 bool wantContent;
79 int historyNumUpdates;
80 bool historyNewestFirst;
81 string filterType;
82 string filterVersion;
83 string filterQuery;
84 bool unSubscribe;
85 int maxContentLength;
87 public:
88 bool doContinue_;
90 SubscribeDemo(Global& glob)
91 : ME("SubscribeDemo"),
92 global_(glob),
93 log_(glob.getLog("demo")),
94 connection_(global_),
95 progressListener(this),
96 updateCounter(0)
97 {
98 initEnvironment();
99 doContinue_ = true;
100 firstTime = true;
101 execute();
102 }
104 I_Log& getLog() { return log_; }
106 void execute()
107 {
108 connect();
110 subscribe();
112 log_.info(ME, "Please use PublishDemo to publish a message '"+oid+"', i'm waiting for updates ...");
114 if (interactive) {
115 org::xmlBlaster::util::thread::Thread::sleepSecs(1);
116 bool stop = false;
117 while (!stop) {
118 string dd = dispatcherActive ? "'d' to deactivate dispatcher" : "'a' to activate dispatcher";
119 std::cout << "(Enter " << dd << " 'q' to exit) >> ";
120 std::cin.read(ptr,1);
121 if (*ptr == 'q') stop = true;
122 if (*ptr == 'a') connection_.setCallbackDispatcherActive(true), dispatcherActive=true;
123 if (*ptr == 'd') connection_.setCallbackDispatcherActive(false), dispatcherActive=false;
124 }
125 }
126 else {
127 log_.plain(ME, "I will exit when the publisher destroys the topic ...");
128 while (doContinue_) {
129 org::xmlBlaster::util::thread::Thread::sleepSecs(2);
130 }
131 }
133 unSubscribe_();
135 disconnect_();
136 }
138 void initEnvironment()
139 {
140 dispatcherActive = global_.getProperty().get("dispatcherActive", true);
141 disconnect = global_.getProperty().get("disconnect", true);
142 interactive = global_.getProperty().get("interactive", true);
143 interactiveUpdate = global_.getProperty().get("interactiveUpdate", false);
144 updateSleep = global_.getProperty().get("updateSleep", 0L);
145 reportUpdateProgress = global_.getProperty().get("reportUpdateProgress", false);
146 updateExceptionErrorCode = global_.getProperty().get("updateException.errorCode", string(""));
147 updateExceptionMessage = global_.getProperty().get("updateException.message", string(""));
148 updateExceptionRuntime = global_.getProperty().get("updateException.runtime", string(""));
149 oid = global_.getProperty().get("oid", "");
150 domain = global_.getProperty().get("domain", "");
151 xpath = global_.getProperty().get("xpath", "");
152 multiSubscribe = global_.getProperty().get("multiSubscribe", true);
153 persistentSubscribe = global_.getProperty().get("persistentSubscribe", false);
154 notifyOnErase = global_.getProperty().get("notifyOnErase", true);
155 local = global_.getProperty().get("local", true);
156 initialUpdate = global_.getProperty().get("initialUpdate", true);
157 updateOneway = global_.getProperty().get("updateOneway", false);
158 wantContent = global_.getProperty().get("wantContent", true);
159 historyNumUpdates = global_.getProperty().get("historyNumUpdates", 1);
160 historyNewestFirst = global_.getProperty().get("historyNewestFirst", true);
161 filterType = global_.getProperty().get("filter.type", "GnuRegexFilter");// XPathFilter | ContentLenFilter | Sql92Filter
162 filterVersion = global_.getProperty().get("filter.version", "1.0");
163 filterQuery = global_.getProperty().get("filter.query", "");
164 unSubscribe = global_.getProperty().get("unSubscribe", true);
165 maxContentLength = global_.getProperty().get("maxContentLength", 250);
167 if (oid == "" && xpath == "") {
168 log_.warn(ME, "No -oid or -xpath given, we subscribe to oid='Hello'.");
169 oid = "Hello";
170 }
172 if (updateSleep > 0L && interactiveUpdate == true) {
173 log_.warn(ME, "You can't set 'updateSleep' and 'interactiveUpdate' simultaneous, we reset interactiveUpdate to false");
174 interactiveUpdate = false;
175 }
177 if (updateExceptionErrorCode != "" && updateExceptionRuntime != "") {
178 log_.warn(ME, "You can't throw a runtime and an XmlBlasterException simultaneous, please check your settings "
179 " -updateException.errorCode and -updateException.runtime");
180 updateExceptionRuntime = "";
181 }
183 log_.info(ME, "Used settings are:");
184 log_.info(ME, " -dispatcherActive " + lexical_cast<string>(dispatcherActive));
185 log_.info(ME, " -interactive " + lexical_cast<string>(interactive));
186 log_.info(ME, " -interactiveUpdate " + lexical_cast<string>(interactiveUpdate));
187 log_.info(ME, " -updateSleep " + lexical_cast<string>(updateSleep));
188 log_.info(ME, " -reportUpdateProgress " + lexical_cast<string>(reportUpdateProgress));
189 log_.info(ME, " -updateException.errorCode " + updateExceptionErrorCode);
190 log_.info(ME, " -updateException.message " + updateExceptionMessage);
191 log_.info(ME, " -updateException.runtime " + updateExceptionRuntime);
192 log_.info(ME, " -oid " + oid);
193 log_.info(ME, " -domain " + domain);
194 log_.info(ME, " -xpath " + xpath);
195 log_.info(ME, " -multiSubscribe " + lexical_cast<string>(multiSubscribe));
196 log_.info(ME, " -persistentSubscribe " + lexical_cast<string>(persistentSubscribe));
197 log_.info(ME, " -notifyOnErase " + lexical_cast<string>(notifyOnErase));
198 log_.info(ME, " -local " + lexical_cast<string>(local));
199 log_.info(ME, " -initialUpdate " + lexical_cast<string>(initialUpdate));
200 log_.info(ME, " -updateOneway " + lexical_cast<string>(updateOneway));
201 log_.info(ME, " -historyNumUpdates " + lexical_cast<string>(historyNumUpdates));
202 log_.info(ME, " -historyNewestFirst " + lexical_cast<string>(historyNewestFirst));
203 log_.info(ME, " -wantContent " + lexical_cast<string>(wantContent));
204 log_.info(ME, " -maxContentLength " + lexical_cast<string>(maxContentLength));
205 log_.info(ME, " -unSubscribe " + lexical_cast<string>(unSubscribe));
206 log_.info(ME, " -disconnect " + lexical_cast<string>(disconnect));
207 log_.info(ME, " -filter.type " + filterType);
208 log_.info(ME, " -filter.version " + filterVersion);
209 log_.info(ME, " -filter.query " + filterQuery);
210 log_.info(ME, "For more info please read:");
211 log_.info(ME, " http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.subscribe.html");
212 }
214 bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* connectionsHandler)
215 {
216 log_.info(ME, "reachedAlive()");
217 if (!firstTime && !connectionsHandler->getConnectReturnQos()->isReconnected() && !persistentSubscribe) {
218 subscribe(); // We lost the old subscription, initialize subscription again
219 }
220 return true;
221 }
223 void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
224 {
225 log_.info(ME, "reachedDead()");
226 }
228 void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
229 {
230 log_.info(ME, "reachedPolling()");
231 }
233 void connect()
234 {
235 connection_.initFailsafe(this);
236 ConnectQos connQos(global_);
237 connQos.getCbAddress()->setDispatcherActive(dispatcherActive);
238 if (log_.trace()) log_.trace(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos.toXml());
239 ConnectReturnQos retQos = connection_.connect(connQos, this);
240 if (log_.trace()) log_.trace(ME, "successfully connected to xmlBlaster. Return qos: " + retQos.toXml());
241 if (reportUpdateProgress) {
242 connection_.registerProgressListener(&progressListener);
243 }
244 }
246 void subscribe()
247 {
248 SubscribeKey *sk = 0;
249 string qStr = "";
250 try {
251 sk = new SubscribeKey(global_);
252 if (oid.length() > 0) {
253 //sk = new SubscribeKey(global_, oid);
254 qStr = oid;
255 sk->setOid(oid);
256 }
257 else if (xpath.length() > 0) {
258 //sk = new SubscribeKey(global_, xpath, Constants::XPATH);
259 qStr = xpath;
260 sk->setQueryString(xpath);
261 }
262 if (domain.length() > 0) { // cluster routing information
263 if (sk == 0) sk = new SubscribeKey(global_, "", Constants::D_O_M_A_I_N);
264 sk->setDomain(domain);
265 qStr = domain;
266 }
267 SubscribeQos sq(global_);
268 sq.setWantInitialUpdate(initialUpdate);
269 sq.setWantUpdateOneway(updateOneway);
270 sq.setMultiSubscribe(multiSubscribe);
271 sq.setPersistent(persistentSubscribe);
272 sq.setWantNotify(notifyOnErase);
273 sq.setWantLocal(local);
274 sq.setWantContent(wantContent);
276 HistoryQos historyQos(global_);
277 historyQos.setNumEntries(historyNumUpdates);
278 historyQos.setNewestFirst(historyNewestFirst);
279 sq.setHistoryQos(historyQos);
281 if (filterQuery.length() > 0) {
282 AccessFilterQos filter(global_, filterType, filterVersion, filterQuery);
283 sq.addAccessFilter(filter);
284 }
286 log_.info(ME, "SubscribeKey=" + sk->toXml());
287 log_.info(ME, "SubscribeQos=" + sq.toXml());
289 if (firstTime && interactive) {
290 log_.info(ME, "Hit a key to subscribe '" + qStr + "'");
291 std::cin.read(ptr,1);
292 }
293 firstTime = false;
295 SubscribeReturnQos srq = connection_.subscribe(*sk, sq);
296 subscriptionId = srq.getSubscriptionId();
298 log_.info(ME, "Subscribed on topic '" + ((oid.length() > 0) ? oid : xpath) +
299 "', got subscription id='" + srq.getSubscriptionId() + "'\n" + srq.toXml());
300 if (log_.dump()) log_.dump("", "Subscribed: " + sk->toXml() + sq.toXml() + srq.toXml());
301 delete sk;
302 }
303 catch (...) { // a finally would have been more appropriate
304 delete sk;
305 throw;
306 }
307 }
309 void unSubscribe_()
310 {
311 if (unSubscribe) {
312 if (interactive) {
313 log_.info(ME, "Hit a key to unSubscribe");
314 std::cin.read(ptr,1);
315 }
317 UnSubscribeKey uk(global_, subscriptionId);
318 if (domain.length() > 0) // cluster routing information TODO!!!
319 uk.setDomain(domain);
320 UnSubscribeQos uq(global_);
321 log_.info(ME, "UnSubscribeKey=" + uk.toXml());
322 log_.info(ME, "UnSubscribeQos=" + uq.toXml());
323 vector<UnSubscribeReturnQos> urqArr = connection_.unSubscribe(uk, uq);
324 log_.info(ME, "UnSubscribe on " + lexical_cast<string>(urqArr.size()) + " subscriptions done");
325 }
326 }
328 void disconnect_()
329 {
330 if (disconnect) {
331 DisconnectQos dq(global_);
332 connection_.disconnect(dq);
333 log_.info(ME, "Disconnected");
334 }
335 }
337 /**
338 * Here we receive the asynchronous callback messages from the xmlBlaster server.
339 */
340 string update(const string& sessionId, UpdateKey& updateKey, const unsigned char *content, long contentSize, UpdateQos& updateQos)
341 {
342 stringstream sout;
344 if (updateQos.isErased() && oid.length() > 0) { // Erased topic with EXACT subscription?
345 sout << endl << "============= Topic '" + updateKey.getOid() + "' is ERASED =======================" << endl;
346 log_.plain(ME, sout.str());
347 subscribe(); // topic is erased -> re-subsribe
348 return Constants::RET_OK; // "<qos><state id='OK'/></qos>";
349 }
351 //const Global& global_ = updateKey.getGlobal();
352 ++updateCounter;
354 //NOTE: subscribe("anotherDummy"); -> subscribe in update does not work
355 // with single threaded 'mico' or SOCKET protocol
357 log_.info(ME, "Receiving update #" + lexical_cast<string>(updateCounter) + " of a message, secret sessionId=" + sessionId + " ...");
359 sout << endl << "============= START #" << updateCounter << " '" << updateKey.getOid() << "' =======================";
360 string contentStr((char*)content, (char*)(content)+contentSize);
361 sout << endl << "<xmlBlaster>";
362 sout << updateKey.toXml(" ");
363 sout << endl << " <content size='" << contentSize << "'>";
364 if (contentSize < maxContentLength)
365 sout << endl << contentStr;
366 else
367 sout << endl << contentStr.substr(0, maxContentLength-5) << " ...";
368 sout << endl << " </content>";
369 sout << updateQos.toXml(" ");
370 sout << endl << "</xmlBlaster>";
371 sout << endl << "============= END #" << updateCounter << " '" << updateKey.getOid() << "' =========================";
372 sout << endl;
373 log_.plain(ME, sout.str());
375 // Dump the ClientProperties decoded (the above dump may contain Base64 encoding):
376 const QosData::ClientPropertyMap& propMap = updateQos.getClientProperties();
377 QosData::ClientPropertyMap::const_iterator mi;
378 for (mi=propMap.begin(); mi!=propMap.end(); ++mi) {
379 const ClientProperty& clientProperty = mi->second;
380 if (clientProperty.isBase64()) {
381 log_.info(ME, "ClientProperty decoded: "+mi->first+"=" + clientProperty.getStringValue());
382 }
383 }
384 // Examples for direct access:
385 if (updateQos.hasClientProperty(string("StringKey"))) {
386 log_.info(ME, "ClientProperty BLA=" +updateQos.getClientProperty("BLA", string("MISSING VALUE?")));
387 }
388 if (updateQos.hasClientProperty(string("ALONG"))) {
389 long aLongValue = updateQos.getClientProperty("ALONG", -1L);
390 log_.info(ME, "ClientProperty ALONG=" + lexical_cast<string>(aLongValue));
391 }
393 if (updateSleep > 0L) {
394 log_.info(ME, "Sleeping for " + lexical_cast<string>(updateSleep) + " millis in callback ...");
395 org::xmlBlaster::util::thread::Thread::sleep(updateSleep);
396 log_.info(ME, "Waking up.");
397 }
398 else if (interactiveUpdate) {
399 log_.info(ME, "Hit a key to return from update() (we are blocking the server callback) ...");
400 std::cin.read(ptr,1);
401 log_.info(ME, "Returning update() - control goes back to server");
402 }
404 if (updateExceptionErrorCode != "") {
405 log_.info(ME, "Throwing XmlBlasterException with errorCode='" + updateExceptionErrorCode + "' back to server ...");
406 throw XmlBlasterException(updateExceptionErrorCode, ME, updateExceptionMessage);
407 }
409 if (updateExceptionRuntime != "") {
410 log_.info(ME, "Throwing RuntimeException '" + updateExceptionRuntime + "'");
411 //throw UpdateException(updateExceptionRuntime);
412 throw logic_error(updateExceptionRuntime);
413 }
415 return Constants::RET_OK;
416 }
417 };
419 void ProgressListener::progress(const std::string& name, unsigned long currBytesRead, unsigned long numBytes) {
420 demoP->getLog().info("SubscribeDemo", name + "Progress of incoming message is currBytesRead=" +
421 lexical_cast<string>(currBytesRead) + " nbytes=" + lexical_cast<string>(numBytes));
422 }
424 /**
425 * Try
426 * <pre>
427 * SubscribeDemo -help
428 * </pre>
429 * for usage help
430 */
431 int main(int args, char ** argv)
432 {
433 org::xmlBlaster::util::Object_Lifetime_Manager::init();
434 //TimestampFactory& tsFactory = TimestampFactory::getInstance();
435 //Timestamp startStamp = tsFactory.getTimestamp();
436 //std::cout << " start time: " << tsFactory.toXml(startStamp, "", true) << std::endl;
438 try {
439 Global& glob = Global::getInstance();
440 glob.initialize(args, argv);
442 if (glob.wantsHelp()) {
443 glob.getLog().plain("", Global::usage());
444 glob.getLog().plain("", "\nExample:\n");
445 glob.getLog().plain("", " SubscribeDemo -trace true -interactiveUpdate true\n");
446 glob.getLog().plain("", " SubscribeDemo -xpath '//key' -filter.query '^H.*'\n");
447 org::xmlBlaster::util::Object_Lifetime_Manager::fini();
448 return 1;
449 }
451 SubscribeDemo demo(glob);
453 //Timestamp stopStamp = tsFactory.getTimestamp();
454 //std::cout << " end time: " << tsFactory.toXml(stopStamp, "", true) << std::endl;
455 //Timestamp diff = stopStamp - startStamp;
456 //std::cout << " time used for demo: " << tsFactory.toXml(diff, "", true) << std::endl;
457 }
458 catch (XmlBlasterException& ex) {
459 std::cout << ex.toXml() << std::endl;
460 }
461 catch (bad_exception& ex) {
462 cout << "bad_exception: " << ex.what() << endl;
463 }
464 catch (exception& ex) {
465 cout << " exception: " << ex.what() << endl;
466 }
467 catch (string& ex) {
468 cout << "string: " << ex << endl;
469 }
470 catch (char* ex) {
471 cout << "char* : " << ex << endl;
472 }
474 catch (...)
475 {
476 cout << "unknown exception occured" << endl;
477 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
478 cout << e.toXml() << endl;
479 }
481 org::xmlBlaster::util::Object_Lifetime_Manager::fini();
482 return 0;
483 }
// syntax highlighted by Code2HTML, v. 0.9.1