1 /*-----------------------------------------------------------------------------
2 Name: PublishDemo.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Little demo to show how a publish is done
6 -----------------------------------------------------------------------------*/
7
8 #include <client/XmlBlasterAccess.h>
9 #include <util/Global.h>
10 #include <util/lexical_cast.h>
11 #include <util/qos/ClientProperty.h>
12 #include <util/queue/PublishQueueEntry.h>
13 #include <authentication/SecurityQos.h>
14 #include <iostream>
15 #include <fstream>
16 #include <map>
17
18 using namespace std;
19 using namespace org::xmlBlaster::client;
20 using namespace org::xmlBlaster::util;
21 using namespace org::xmlBlaster::util::qos;
22 using namespace org::xmlBlaster::util::qos::storage;
23 using namespace org::xmlBlaster::util::queue;
24 using namespace org::xmlBlaster::client::qos;
25 using namespace org::xmlBlaster::client::key;
26
27 static unsigned long filesize(ifstream &ins)
28 {
29 unsigned long s,e,c;
30 c = ins.tellg(); // save current file position
31 ins.seekg(0, ios::end); // position at end
32 e = ins.tellg();
33 ins.seekg(0, ios::beg); // position at beginning
34 s = ins.tellg();
35 ins.seekg(c); // restore file position
36 return e-s;
37 }
38
39 static int fileRead(string &fn, string &content)
40 {
41 unsigned char *buf;
42 ifstream ins(fn.c_str(), ios_base::binary);
43 if (!ins.is_open()) return -1;
44 int fs = filesize(ins);
45 buf = new unsigned char [fs+1];
46 buf[fs] = 0; // so we can assign to string
47 ins.read((char *)buf,fs);
48 ins.close();
49 content = (char *)buf;
50 delete [] buf;
51 return fs;
52 }
53
54
55 class PublishDemo : public org::xmlBlaster::util::dispatch::I_PostSendListener
56 {
57 private:
58 string ME;
59 Global& global_;
60 I_Log& log_;
61 char ptr[2];
62 XmlBlasterAccess connection_;
63 bool interactive;
64 bool oneway;
65 long sleep;
66 int numPublish;
67 string oid;
68 string domain;
69 string clientTags;
70 string contentStr;
71 string contentFile;
72 PriorityEnum priority;
73 bool persistent;
74 long lifeTime;
75 bool forceUpdate;
76 bool forceDestroy;
77 bool readonly;
78 long destroyDelay;
79 bool createDomEntry;
80 long historyMaxMsg;
81 bool forceQueuing;
82 bool subscribable;
83 string destination;
84 bool doErase;
85 bool disconnect;
86 bool eraseTailback;
87 int contentSize;
88 bool eraseForceDestroy;
89 QosData::ClientPropertyMap clientPropertyMap;
90
91 public:
92 PublishDemo(Global& glob)
93 : ME("PublishDemo"),
94 global_(glob),
95 log_(glob.getLog("demo")),
96 connection_(global_)
97 {
98 initEnvironment();
99 run();
100 }
101
102 void run()
103 {
104 connect();
105 publish();
106 erase();
107 connection_.disconnect(DisconnectQos(global_));
108 }
109
110 void initEnvironment();
111
112 void connect();
113
114 void publish();
115
116 void erase()
117 {
118 if (doErase) {
119 if (interactive) {
120 string outStr = "Hit 'e' to erase topic '" + oid + "' ('q' to exit without erase) >> ";
121 string ret = org::xmlBlaster::util::waitOnKeyboardHit(outStr);
122 if (ret == "q") return;
123 }
124 log_.info(ME, "Erasing topic '" + oid + "'");
125 EraseKey key(global_);
126 key.setOid(oid);
127 EraseQos eq(global_);
128 eq.setForceDestroy(eraseForceDestroy);
129 connection_.erase(key, eq);
130 }
131 }
132
133 /**
134 * Is called after each successful send tail back message from client side queue
135 * (typically after a connection loss with ongoing publishes)
136 * @see I_PostSendListener and connection_.registerPostSendListener(this);
137 */
138 void postSend(const std::vector<org::xmlBlaster::util::queue::EntryType> &entries)
139 {
140 vector<EntryType>::const_iterator iter = entries.begin();
141 while (iter != entries.end()) {
142 const EntryType entryRef = (*iter);
143 iter++;
144 const MsgQueueEntry &entry = *entryRef;
145 if (entry.isPublish()) {
146 const PublishQueueEntry& pubEntry = *(dynamic_cast<const PublishQueueEntry*>(&entry));
147 const PublishReturnQos* qos = pubEntry.getPublishReturnQos();
148 log_.info(ME, "Tailback message is send from client queue, state=" + qos->getState() + ": " + pubEntry.getMsgUnit().getContentStr());
149 }
150 else {
151 log_.info(ME, "Tailback message is send from client queue");
152 }
153 }
154 }
155
156 /**
157 * Is called asynchronously if a tailback message from our queue couldn't be send.
158 * (typically after a connection loss with ongoing publishes)
159 * @see I_PostSendListener and connection_.registerPostSendListener(this);
160 */
161 bool sendingFailed(const std::vector<org::xmlBlaster::util::queue::EntryType> &entries, const XmlBlasterException &exception)
162 {
163 vector<EntryType>::const_iterator iter = entries.begin();
164 while (iter != entries.end()) {
165 const EntryType entryRef = (*iter);
166 iter++;
167 const MsgQueueEntry &entry = *entryRef;
168 log_.warn(ME, "Tailback '"+ entry.getMethodName() + "' message sending from client queue failed: " + exception.getMessage());
169 }
170 return false; // Let the framework handle it
171 }
172
173
174 };
175
176 void PublishDemo::initEnvironment()
177 {
178 interactive = global_.getProperty().get("interactive", true);
179 oneway = global_.getProperty().get("oneway", false);
180 sleep = global_.getProperty().get("sleep", 1000L);
181 numPublish = global_.getProperty().get("numPublish", 1);
182 oid = global_.getProperty().get("oid", string("Hello"));
183 domain = global_.getProperty().get("domain", string(""));
184 clientTags = global_.getProperty().get("clientTags", ""); // "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
185 contentStr = global_.getProperty().get("content", "Hi-%counter");
186 contentFile = global_.getProperty().get("contentFile", "");
187 priority = int2Priority(global_.getProperty().get("priority", NORM_PRIORITY));
188 persistent = global_.getProperty().get("persistent", true);
189 lifeTime = global_.getProperty().get("lifeTime", -1L);
190 forceUpdate = global_.getProperty().get("forceUpdate", true);
191 forceDestroy = global_.getProperty().get("forceDestroy", false);
192 readonly = global_.getProperty().get("readonly", false);
193 destroyDelay = global_.getProperty().get("destroyDelay", 60000L);
194 createDomEntry = global_.getProperty().get("createDomEntry", true);
195 historyMaxMsg = global_.getProperty().get("queue/history/maxEntries", -1L);
196 forceQueuing = global_.getProperty().get("forceQueuing", true);
197 subscribable = global_.getProperty().get("subscribable", true);
198 destination = global_.getProperty().get("destination", "");
199 doErase = global_.getProperty().get("doErase", true);
200 disconnect = global_.getProperty().get("disconnect", true);
201 eraseTailback = global_.getProperty().get("eraseTailback", false);
202 contentSize = global_.getProperty().get("contentSize", -1); // 2000000);
203 eraseForceDestroy = global_.getProperty().get("erase.forceDestroy", false);
204
205 //TODO: Needs to be ported similar to Java
206 //map<std::string,std::string> clientPropertyMap = global_.getProperty().get("clientProperty", map<std::string,std::string>());
207 string clientPropertyKey = global_.getProperty().get("clientProperty.key", string(""));
208 string clientPropertyValue = global_.getProperty().get("clientProperty.value", string(""));
209 string clientPropertyEncoding = global_.getProperty().get("clientProperty.encoding", ""); // Force to Constants::ENCODING_BASE64="base64"
210 string clientPropertyCharset = global_.getProperty().get("clientProperty.charset", ""); // Force to e.g. "windows-1252"
211 string clientPropertyType = global_.getProperty().get("clientProperty.type", ""); // Date type, see Constants::TYPE_DOUBLE, Constants::TYPE_STRING etc
212 if (clientPropertyKey != "") {
213 ClientProperty cp(clientPropertyKey, clientPropertyValue, clientPropertyType, clientPropertyEncoding);
214 if (clientPropertyCharset != "") cp.setCharset(clientPropertyCharset);
215 //
216 // Returns "en_US.UTF-8" on Linux and "English_United States.1252" on WinXP
217 //char *p = setlocale(LC_CTYPE, "");
218 //log_.info(ME, "setlocale CTYPE returns: " + string(p));
219 // But java (server on Linux or Windows) can't handle "English_United States.1252" or "1252": java.io.UnsupportedEncodingException: 1252
220 // but it can handle conversion from "windows-1252" to "UTF-8"
221 // Further, java does: UnsupportedEncodingException: en_US.UTF-8
222 // but likes "UTF-8"
223 //What else instead of setlocal() could we use for automatic charset detection of this C++ client (which is compatible to Java used names)?
224 clientPropertyMap.insert(QosData::ClientPropertyMap::value_type(clientPropertyKey, cp));
225 }
226
227 if (historyMaxMsg < 1 && !global_.getProperty().propertyExists("destroyDelay"))
228 destroyDelay = 24L*60L*60L*1000L; // Increase destroyDelay to one day if no history queue is used
229
230 log_.info(ME, "You can use for example '-session.name publisher/1 -passwd secret' to pass your credentials");
231 log_.info(ME, "Used settings are:");
232 log_.info(ME, " -interactive " + lexical_cast<string>(interactive));
233 log_.info(ME, " -sleep " + lexical_cast<string>(sleep)); // org.jutils.time.TimeHelper.millisToNice(sleep));
234 log_.info(ME, " -oneway " + lexical_cast<string>(oneway));
235 log_.info(ME, " -doErase " + lexical_cast<string>(doErase));
236 log_.info(ME, " -disconnect " + lexical_cast<string>(disconnect));
237 log_.info(ME, " -eraseTailback " + lexical_cast<string>(eraseTailback));
238 log_.info(ME, " Pub/Sub settings");
239 log_.info(ME, " -numPublish " + lexical_cast<string>(numPublish));
240 log_.info(ME, " -oid " + lexical_cast<string>(oid));
241 log_.info(ME, " -domain " + lexical_cast<string>(domain));
242 log_.info(ME, " -clientTags " + clientTags);
243 if (contentSize >= 0) {
244 log_.info(ME, " -content [generated]");
245 log_.info(ME, " -contentSize " + lexical_cast<string>(contentSize));
246 }
247 else if (contentFile.size() > 0) {
248 log_.info(ME, " -contentFile " + contentFile);
249 }
250 else {
251 log_.info(ME, " -content " + contentStr);
252 log_.info(ME, " -contentSize " + lexical_cast<string>(contentStr.length()));
253 }
254 log_.info(ME, " -priority " + lexical_cast<string>(priority));
255 log_.info(ME, " -persistent " + lexical_cast<string>(persistent));
256 log_.info(ME, " -lifeTime " + lexical_cast<string>(lifeTime)); // org.jutils.time.TimeHelper.millisToNice(lifeTime));
257 log_.info(ME, " -forceUpdate " + lexical_cast<string>(forceUpdate));
258 log_.info(ME, " -forceDestroy " + lexical_cast<string>(forceDestroy));
259 if (clientPropertyMap.size() > 0) {
260 QosData::ClientPropertyMap::const_iterator mi;
261 for (mi=clientPropertyMap.begin(); mi!=clientPropertyMap.end(); ++mi) {
262 log_.info(ME, " -clientProperty["+mi->first+"] " + mi->second.getStringValue());
263 }
264 }
265 else {
266 log_.info(ME, " -clientProperty[] ");
267 }
268 log_.info(ME, " Topic settings");
269 log_.info(ME, " -readonly " + lexical_cast<string>(readonly));
270 log_.info(ME, " -destroyDelay " + lexical_cast<string>(destroyDelay)); // org.jutils.time.TimeHelper.millisToNice(destroyDelay));
271 log_.info(ME, " -createDomEntry " + lexical_cast<string>(createDomEntry));
272 log_.info(ME, " -queue/history/maxEntries " + lexical_cast<string>(historyMaxMsg));
273 log_.info(ME, " PtP settings");
274 log_.info(ME, " -subscribable " + lexical_cast<string>(subscribable));
275 log_.info(ME, " -forceQueuing " + lexical_cast<string>(forceQueuing));
276 log_.info(ME, " -destination " + destination);
277 log_.info(ME, " Erase settings");
278 log_.info(ME, " -erase.forceDestroy " + lexical_cast<string>(eraseForceDestroy));
279 log_.info(ME, "For more info please read:");
280 log_.info(ME, " http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html");
281 }
282
283 void PublishDemo::connect()
284 {
285 ConnectQos connQos(global_);
286 //org::xmlBlaster::authentication::SecurityQos sec(global_, "jack", "secret", "htpasswd,1.0");
287 //connQos.setSecurityQos(sec);
288
289 connection_.registerPostSendListener(this);
290
291 log_.trace(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos.toXml());
292 ConnectReturnQos retQos = connection_.connect(connQos, NULL); // no callback
293 log_.trace(ME, "successfully connected to " + connection_.getServerNodeId() + ". Return qos: " + retQos.toXml());
294 }
295
296 void PublishDemo::publish()
297 {
298 for(int i=0; i<numPublish; i++) {
299
300 if (interactive) {
301 std::cout << "Hit a key to publish '" + oid + "' #" + lexical_cast<string>(i+1) + "/" + lexical_cast<string>(numPublish) + " ('b' to break) >> ";
302 std::cin.read(ptr,1);
303 if (*ptr == 'b') break;
304 }
305 else {
306 if (sleep > 0) {
307 try {
308 org::xmlBlaster::util::thread::Thread::sleep(sleep);
309 }
310 catch(XmlBlasterException e) {
311 log_.error(ME, e.toXml());
312 }
313 }
314 log_.info(ME, "Publish '" + oid + "' #" + lexical_cast<string>(i+1) + "/" + lexical_cast<string>(numPublish));
315 }
316
317 PublishKey key(global_, oid, "text/xml", "1.0");
318 key.setClientTags(clientTags);
319 if (domain != "") key.setDomain(domain);
320 if (i==0) log_.info(ME, "PublishKey: " + key.toXml());
321
322 PublishQos pq(global_);
323 pq.setPriority(priority);
324 pq.setPersistent(persistent);
325 pq.setLifeTime(lifeTime);
326 pq.setForceUpdate(forceUpdate);
327 pq.setForceDestroy(forceDestroy);
328 pq.setSubscribable(subscribable);
329 if (clientPropertyMap.size() > 0) {
330 pq.setClientProperties(clientPropertyMap);
331 //This is the correct way for a typed property:
332 pq.addClientProperty("ALONG", long(12L));
333 }
334
335 if (i == 0) {
336 TopicProperty topicProperty(global_);
337 topicProperty.setDestroyDelay(destroyDelay);
338 topicProperty.setCreateDomEntry(createDomEntry);
339 topicProperty.setReadonly(readonly);
340 if (historyMaxMsg >= 0L) {
341 HistoryQueueProperty prop(global_, "");
342 prop.setMaxEntries(historyMaxMsg);
343 topicProperty.setHistoryQueueProperty(prop);
344 }
345 pq.setTopicProperty(topicProperty);
346 log_.info(ME, "Added TopicProperty on first publish: " + topicProperty.toXml());
347 }
348
349 if (destination != "") {
350 SessionName sessionName(global_, destination);
351 Destination dest(global_, sessionName);
352 dest.forceQueuing(forceQueuing);
353 pq.addDestination(dest);
354 }
355
356 log_.info(ME, "mapSize=" + lexical_cast<string>(clientPropertyMap.size()) + " PublishQos: " + pq.toXml());
357
358 string contentTmp = contentStr;
359 if (contentSize >= 0) {
360 contentTmp = "";
361 for (int j=0; j<contentSize; j++)
362 contentTmp += "X";
363 }
364 else if (contentFile.size() > 0) {
365 fileRead(contentFile, contentTmp);
366 }
367 else {
368 contentTmp = StringTrim::replaceAll(contentTmp, "%counter", lexical_cast<string>(i+1));
369 }
370
371 MessageUnit msgUnit(key, contentTmp, pq);
372 if (oneway) {
373 log_.trace(ME, string("publishOneway() message unit: ") + msgUnit.toXml());
374 vector<MessageUnit> msgUnitArr;
375 msgUnitArr.push_back(msgUnit);
376 connection_.publishOneway(msgUnitArr);
377 log_.trace(ME, "publishOneway() done");
378 }
379 else {
380 log_.trace(ME, string("publish() message unit: ") + msgUnit.toXml());
381 PublishReturnQos tmp = connection_.publish(msgUnit);
382 log_.trace(ME, string("publish return qos: ") + tmp.toXml());
383 }
384 }
385 }
386
387
388 static void usage(I_Log& log)
389 {
390 log.plain("PublishDemo usage:", Global::usage());
391 string str = "\nPlus many more additional command line arguments:";
392 str += "\n -numPublish (int): the number of publishes which have to be done";
393 str += "\n -sleep (ms): the delay to wait between each publish. If negative (default) it does not wait";
394 str += "\n ...";
395 str += "\nExample:\n";
396 str += " PublishDemo -trace true -numPublish 1000\n";
397 str += " PublishDemo -destination joe -oid Hello -content 'Hi joe'\n";
398 log.plain("PublishDemo", str);
399 exit(0);
400 }
401
402
403 /**
404 * Try
405 * <pre>
406 * PublishDemo -help
407 * </pre>
408 * for usage help
409 * <p />Example:
410 * PublishDemo -oid __sys__remoteProperties -clientProperty.key "MultiByte" -clientProperty.value "With '�' multibyte" -clientProperty.charset windows-1252 -clientProperty.encoding base64
411 */
412 int main(int args, char ** argv)
413 {
414 try {
415 org::xmlBlaster::util::Object_Lifetime_Manager::init();
416 Global& glob = Global::getInstance();
417 glob.initialize(args, argv);
418 I_Log& log = glob.getLog("demo");
419
420 if (glob.wantsHelp()) {
421 usage(log);
422 }
423
424 PublishDemo demo(glob);
425 }
426 catch (XmlBlasterException& ex) {
427 std::cout << ex.toXml() << std::endl;
428 }
429 catch (bad_exception& ex) {
430 cout << "bad_exception: " << ex.what() << endl;
431 }
432 catch (exception& ex) {
433 cout << " exception: " << ex.what() << endl;
434 }
435 catch (string& ex) {
436 cout << "string: " << ex << endl;
437 }
438 catch (char* ex) {
439 cout << "char* : " << ex << endl;
440 }
441 catch (...) {
442 cout << "unknown exception occured" << endl;
443 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
444 cout << e.toXml() << endl;
445 }
446
447 try {
448 org::xmlBlaster::util::Object_Lifetime_Manager::fini();
449 }
450 catch (...) {
451 cout << "unknown exception occured in fini()" << endl;
452 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
453 cout << e.toXml() << endl;
454 }
455
456 return 0;
457 }
syntax highlighted by Code2HTML, v. 0.9.1