1 /*-----------------------------------------------------------------------------
  2 Name:      PublishDemo.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Little demo to show how a publish is done
  6 -----------------------------------------------------------------------------*/
  7 
  8 #include <client/XmlBlasterAccess.h>

  9 #include <util/Global.h>

 10 #include <util/lexical_cast.h>

 11 #include <util/qos/ClientProperty.h>

 12 #include <util/queue/PublishQueueEntry.h>

 13 #include <authentication/SecurityQos.h>

 14 #include <iostream>

 15 #include <fstream>

 16 #include <map>

 17 
 18 using namespace std;
 19 using namespace org::xmlBlaster::client;
 20 using namespace org::xmlBlaster::util;
 21 using namespace org::xmlBlaster::util::qos;
 22 using namespace org::xmlBlaster::util::qos::storage;
 23 using namespace org::xmlBlaster::util::queue;
 24 using namespace org::xmlBlaster::client::qos;
 25 using namespace org::xmlBlaster::client::key;
 26 
 27 static unsigned long filesize(ifstream &ins)
 28 {
 29    unsigned long s,e,c;
 30    c = ins.tellg();        // save current file position

 31    ins.seekg(0, ios::end); // position at end

 32    e = ins.tellg();
 33    ins.seekg(0, ios::beg); // position at beginning

 34    s = ins.tellg();
 35    ins.seekg(c);           // restore file position

 36    return e-s;
 37 }
 38 
 39 static int fileRead(string &fn, string &content)
 40 {
 41    unsigned char *buf;
 42    ifstream ins(fn.c_str(), ios_base::binary);
 43    if (!ins.is_open()) return -1;
 44    int   fs   = filesize(ins);
 45    buf  = new unsigned char [fs+1];
 46    buf[fs]    = 0; // so we can assign to string

 47    ins.read((char *)buf,fs);
 48    ins.close();
 49    content = (char *)buf;
 50    delete [] buf;
 51    return fs;
 52 }
 53 
 54 
 55 class PublishDemo : public org::xmlBlaster::util::dispatch::I_PostSendListener
 56 {
 57 private:
 58    string ME;
 59    Global& global_;
 60    I_Log& log_;
 61    char ptr[2];
 62    XmlBlasterAccess connection_;
 63    bool interactive;
 64    bool oneway;
 65    long sleep;
 66    int numPublish;
 67    string oid;
 68    string domain;
 69    string clientTags;
 70    string contentStr;
 71    string contentFile;
 72    PriorityEnum priority;
 73    bool persistent;
 74    long lifeTime;
 75    bool forceUpdate;
 76    bool forceDestroy;
 77    bool readonly;
 78    long destroyDelay;
 79    bool createDomEntry;
 80    long historyMaxMsg;
 81    bool forceQueuing;
 82    bool subscribable;
 83    string destination;
 84    bool doErase;
 85    bool disconnect;
 86    bool eraseTailback;
 87    int contentSize;
 88    bool eraseForceDestroy;
 89    QosData::ClientPropertyMap clientPropertyMap;
 90 
 91 public:
 92    PublishDemo(Global& glob) 
 93       : ME("PublishDemo"), 
 94         global_(glob), 
 95         log_(glob.getLog("demo")),
 96         connection_(global_)
 97    {
 98       initEnvironment();
 99       run();
100    }
101 
102    void run() 
103    {
104       connect();
105       publish();
106       erase();
107       connection_.disconnect(DisconnectQos(global_));
108    }
109 
110    void initEnvironment();
111 
112    void connect();
113 
114    void publish();
115 
116    void erase()
117    {
118       if (doErase) {
119          if (interactive) {
120             string outStr = "Hit 'e' to erase topic '" + oid + "' ('q' to exit without erase) >> "; 
121             string ret = org::xmlBlaster::util::waitOnKeyboardHit(outStr);
122             if (ret == "q") return;
123          }
124          log_.info(ME, "Erasing topic '" + oid + "'");
125          EraseKey key(global_);
126          key.setOid(oid);
127          EraseQos eq(global_);
128          eq.setForceDestroy(eraseForceDestroy);
129          connection_.erase(key, eq);
130       }
131    }
132    
133    /**
134     * Is called after each successful send tail back message from client side queue
135     * (typically after a connection loss with ongoing publishes)
136     * @see I_PostSendListener and connection_.registerPostSendListener(this);
137     */
138    void postSend(const std::vector<org::xmlBlaster::util::queue::EntryType> &entries)
139    {
140       vector<EntryType>::const_iterator iter = entries.begin();
141       while (iter != entries.end()) {
142          const EntryType entryRef = (*iter);
143          iter++;
144          const MsgQueueEntry &entry = *entryRef;
145          if (entry.isPublish()) {
146             const PublishQueueEntry& pubEntry = *(dynamic_cast<const PublishQueueEntry*>(&entry));
147             const PublishReturnQos* qos = pubEntry.getPublishReturnQos();
148             log_.info(ME, "Tailback message is send from client queue, state=" + qos->getState() + ": " + pubEntry.getMsgUnit().getContentStr());
149          }
150          else {
151             log_.info(ME, "Tailback message is send from client queue");
152          }
153       }
154    }
155 
156    /**
157     * Is called asynchronously if a tailback message from our queue couldn't be send. 
158     * (typically after a connection loss with ongoing publishes)
159     * @see I_PostSendListener and connection_.registerPostSendListener(this);
160     */
161    bool sendingFailed(const std::vector<org::xmlBlaster::util::queue::EntryType> &entries, const XmlBlasterException &exception)
162    {
163       vector<EntryType>::const_iterator iter = entries.begin();
164       while (iter != entries.end()) {
165          const EntryType entryRef = (*iter);
166          iter++;
167          const MsgQueueEntry &entry = *entryRef;
168          log_.warn(ME, "Tailback '"+ entry.getMethodName() + "' message sending from client queue failed: " + exception.getMessage());
169       }
170       return false; // Let the framework handle it

171    }
172 
173 
174 };
175 
176 void PublishDemo::initEnvironment()
177 {
178    interactive = global_.getProperty().get("interactive", true);
179    oneway = global_.getProperty().get("oneway", false);
180    sleep = global_.getProperty().get("sleep", 1000L);
181    numPublish = global_.getProperty().get("numPublish", 1);
182    oid = global_.getProperty().get("oid", string("Hello"));
183    domain = global_.getProperty().get("domain", string(""));
184    clientTags = global_.getProperty().get("clientTags", ""); // "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");

185    contentStr = global_.getProperty().get("content", "Hi-%counter");
186    contentFile = global_.getProperty().get("contentFile", "");
187    priority = int2Priority(global_.getProperty().get("priority", NORM_PRIORITY));
188    persistent = global_.getProperty().get("persistent", true);
189    lifeTime = global_.getProperty().get("lifeTime", -1L);
190    forceUpdate = global_.getProperty().get("forceUpdate", true);
191    forceDestroy = global_.getProperty().get("forceDestroy", false);
192    readonly = global_.getProperty().get("readonly", false);
193    destroyDelay = global_.getProperty().get("destroyDelay", 60000L);
194    createDomEntry = global_.getProperty().get("createDomEntry", true);
195    historyMaxMsg = global_.getProperty().get("queue/history/maxEntries", -1L);
196    forceQueuing = global_.getProperty().get("forceQueuing", true);
197    subscribable = global_.getProperty().get("subscribable", true);
198    destination = global_.getProperty().get("destination", "");
199    doErase = global_.getProperty().get("doErase", true);
200    disconnect = global_.getProperty().get("disconnect", true);
201    eraseTailback = global_.getProperty().get("eraseTailback", false);
202    contentSize = global_.getProperty().get("contentSize", -1); // 2000000);

203    eraseForceDestroy = global_.getProperty().get("erase.forceDestroy", false);
204 
205    //TODO: Needs to be ported similar to Java

206    //map<std::string,std::string> clientPropertyMap = global_.getProperty().get("clientProperty", map<std::string,std::string>());

207    string clientPropertyKey = global_.getProperty().get("clientProperty.key", string(""));
208    string clientPropertyValue = global_.getProperty().get("clientProperty.value", string(""));
209    string clientPropertyEncoding = global_.getProperty().get("clientProperty.encoding", ""); // Force to Constants::ENCODING_BASE64="base64"

210    string clientPropertyCharset = global_.getProperty().get("clientProperty.charset", ""); // Force to e.g. "windows-1252"

211    string clientPropertyType = global_.getProperty().get("clientProperty.type", ""); // Date type, see Constants::TYPE_DOUBLE, Constants::TYPE_STRING etc

212    if (clientPropertyKey != "") {
213       ClientProperty cp(clientPropertyKey, clientPropertyValue, clientPropertyType, clientPropertyEncoding);
214       if (clientPropertyCharset != "") cp.setCharset(clientPropertyCharset);
215       //

216       // Returns "en_US.UTF-8" on Linux and "English_United States.1252" on WinXP

217       //char *p = setlocale(LC_CTYPE, "");

218       //log_.info(ME, "setlocale CTYPE returns: " + string(p));

219       // But java (server on Linux or Windows) can't handle "English_United States.1252" or "1252": java.io.UnsupportedEncodingException: 1252

220       // but it can handle conversion from "windows-1252" to "UTF-8"

221       // Further, java does: UnsupportedEncodingException: en_US.UTF-8

222       // but likes "UTF-8"

223       //What else instead of setlocal() could we use for automatic charset detection of this C++ client (which is compatible to Java used names)?

224       clientPropertyMap.insert(QosData::ClientPropertyMap::value_type(clientPropertyKey, cp));
225    }
226 
227    if (historyMaxMsg < 1 && !global_.getProperty().propertyExists("destroyDelay"))
228       destroyDelay = 24L*60L*60L*1000L; // Increase destroyDelay to one day if no history queue is used

229 
230    log_.info(ME, "You can use for example '-session.name publisher/1 -passwd secret' to pass your credentials");
231    log_.info(ME, "Used settings are:");
232    log_.info(ME, "   -interactive    " + lexical_cast<string>(interactive));
233    log_.info(ME, "   -sleep          " + lexical_cast<string>(sleep)); // org.jutils.time.TimeHelper.millisToNice(sleep));

234    log_.info(ME, "   -oneway         " + lexical_cast<string>(oneway));
235    log_.info(ME, "   -doErase        " + lexical_cast<string>(doErase));
236    log_.info(ME, "   -disconnect     " + lexical_cast<string>(disconnect));
237    log_.info(ME, "   -eraseTailback  " + lexical_cast<string>(eraseTailback));
238    log_.info(ME, " Pub/Sub settings");
239    log_.info(ME, "   -numPublish     " + lexical_cast<string>(numPublish));
240    log_.info(ME, "   -oid            " + lexical_cast<string>(oid));
241    log_.info(ME, "   -domain         " + lexical_cast<string>(domain));
242    log_.info(ME, "   -clientTags     " + clientTags);
243    if (contentSize >= 0) {
244       log_.info(ME, "   -content        [generated]");
245       log_.info(ME, "   -contentSize    " + lexical_cast<string>(contentSize));
246    }
247    else if (contentFile.size() > 0) {
248       log_.info(ME, "   -contentFile    " + contentFile);
249    }
250    else {
251       log_.info(ME, "   -content        " + contentStr);
252       log_.info(ME, "   -contentSize    " + lexical_cast<string>(contentStr.length()));
253    }
254    log_.info(ME, "   -priority       " + lexical_cast<string>(priority));
255    log_.info(ME, "   -persistent     " + lexical_cast<string>(persistent));
256    log_.info(ME, "   -lifeTime       " + lexical_cast<string>(lifeTime)); // org.jutils.time.TimeHelper.millisToNice(lifeTime));

257    log_.info(ME, "   -forceUpdate    " + lexical_cast<string>(forceUpdate));
258    log_.info(ME, "   -forceDestroy   " + lexical_cast<string>(forceDestroy));
259    if (clientPropertyMap.size() > 0) {
260       QosData::ClientPropertyMap::const_iterator mi;
261       for (mi=clientPropertyMap.begin(); mi!=clientPropertyMap.end(); ++mi) {
262          log_.info(ME, "   -clientProperty["+mi->first+"]   " + mi->second.getStringValue());
263       }
264    }
265    else {
266       log_.info(ME, "   -clientProperty[]   ");
267    }
268    log_.info(ME, " Topic settings");
269    log_.info(ME, "   -readonly       " + lexical_cast<string>(readonly));
270    log_.info(ME, "   -destroyDelay   " + lexical_cast<string>(destroyDelay)); // org.jutils.time.TimeHelper.millisToNice(destroyDelay));

271    log_.info(ME, "   -createDomEntry " + lexical_cast<string>(createDomEntry));
272    log_.info(ME, "   -queue/history/maxEntries " + lexical_cast<string>(historyMaxMsg));
273    log_.info(ME, " PtP settings");
274    log_.info(ME, "   -subscribable  " + lexical_cast<string>(subscribable));
275    log_.info(ME, "   -forceQueuing   " + lexical_cast<string>(forceQueuing));
276    log_.info(ME, "   -destination    " + destination);
277    log_.info(ME, " Erase settings");
278    log_.info(ME, "   -erase.forceDestroy " + lexical_cast<string>(eraseForceDestroy));
279    log_.info(ME, "For more info please read:");
280    log_.info(ME, "   http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html");
281 }
282 
283 void PublishDemo::connect()
284 {
285    ConnectQos connQos(global_);
286    //org::xmlBlaster::authentication::SecurityQos sec(global_, "jack", "secret", "htpasswd,1.0");

287    //connQos.setSecurityQos(sec);

288    
289    connection_.registerPostSendListener(this);
290 
291    log_.trace(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos.toXml());
292    ConnectReturnQos retQos = connection_.connect(connQos, NULL); // no callback

293    log_.trace(ME, "successfully connected to " + connection_.getServerNodeId() + ". Return qos: " + retQos.toXml());
294 }
295 
296 void PublishDemo::publish()
297 {
298    for(int i=0; i<numPublish; i++) {
299    
300       if (interactive) {
301          std::cout << "Hit a key to publish '" + oid + "' #" + lexical_cast<string>(i+1) + "/" + lexical_cast<string>(numPublish) + " ('b' to break) >> ";
302          std::cin.read(ptr,1);
303          if (*ptr == 'b') break;
304       }
305       else {
306          if (sleep > 0) {
307             try {
308                org::xmlBlaster::util::thread::Thread::sleep(sleep);
309             }
310             catch(XmlBlasterException e) {
311                log_.error(ME, e.toXml());
312             }
313          }
314          log_.info(ME, "Publish '" + oid + "' #" + lexical_cast<string>(i+1) + "/" + lexical_cast<string>(numPublish));
315       }
316 
317       PublishKey key(global_, oid, "text/xml", "1.0");
318       key.setClientTags(clientTags);
319       if (domain != "")  key.setDomain(domain);
320       if (i==0) log_.info(ME, "PublishKey: " + key.toXml());
321 
322       PublishQos pq(global_);
323       pq.setPriority(priority);
324       pq.setPersistent(persistent);
325       pq.setLifeTime(lifeTime);
326       pq.setForceUpdate(forceUpdate);
327       pq.setForceDestroy(forceDestroy);
328       pq.setSubscribable(subscribable);
329       if (clientPropertyMap.size() > 0) {
330          pq.setClientProperties(clientPropertyMap);
331          //This is the correct way for a typed property:

332          pq.addClientProperty("ALONG", long(12L));
333       }
334       
335       if (i == 0) {
336          TopicProperty topicProperty(global_);
337          topicProperty.setDestroyDelay(destroyDelay);
338          topicProperty.setCreateDomEntry(createDomEntry);
339          topicProperty.setReadonly(readonly);
340          if (historyMaxMsg >= 0L) {
341             HistoryQueueProperty prop(global_, "");
342             prop.setMaxEntries(historyMaxMsg);
343             topicProperty.setHistoryQueueProperty(prop);
344          }
345          pq.setTopicProperty(topicProperty);
346          log_.info(ME, "Added TopicProperty on first publish: " + topicProperty.toXml());
347       }
348       
349       if (destination != "") {
350          SessionName sessionName(global_, destination);
351          Destination dest(global_, sessionName);
352          dest.forceQueuing(forceQueuing);
353          pq.addDestination(dest);
354       }
355 
356       log_.info(ME, "mapSize=" + lexical_cast<string>(clientPropertyMap.size()) + " PublishQos: " + pq.toXml());
357 
358       string contentTmp = contentStr;
359       if (contentSize >= 0) {
360          contentTmp = "";
361          for (int j=0; j<contentSize; j++)
362             contentTmp += "X";
363       }
364       else if (contentFile.size() > 0) {
365          fileRead(contentFile, contentTmp);
366       }
367       else {
368          contentTmp = StringTrim::replaceAll(contentTmp, "%counter", lexical_cast<string>(i+1));
369       }
370 
371       MessageUnit msgUnit(key, contentTmp, pq);
372       if (oneway) {
373          log_.trace(ME, string("publishOneway() message unit: ") + msgUnit.toXml());
374          vector<MessageUnit> msgUnitArr;
375          msgUnitArr.push_back(msgUnit);
376          connection_.publishOneway(msgUnitArr);
377          log_.trace(ME, "publishOneway() done");
378       }
379       else {
380          log_.trace(ME, string("publish() message unit: ") + msgUnit.toXml());
381          PublishReturnQos tmp = connection_.publish(msgUnit);
382          log_.trace(ME, string("publish return qos: ") + tmp.toXml());
383       }
384    }
385 }
386 
387 
388 static void usage(I_Log& log) 
389 {
390    log.plain("PublishDemo usage:", Global::usage());
391    string str = "\nPlus many more additional command line arguments:";
392    str += "\n -numPublish (int): the number of publishes which have to be done";
393    str += "\n -sleep (ms): the delay to wait between each publish. If negative (default) it does not wait";
394    str += "\n ...";
395    str += "\nExample:\n";
396    str += "   PublishDemo -trace true -numPublish 1000\n";
397    str += "   PublishDemo -destination joe -oid Hello -content 'Hi joe'\n";
398    log.plain("PublishDemo", str);
399    exit(0);
400 }
401 
402 
403 /**
404  * Try
405  * <pre>
406  *   PublishDemo -help
407  * </pre>
408  * for usage help
409  * <p />Example:
410  * PublishDemo -oid __sys__remoteProperties -clientProperty.key "MultiByte" -clientProperty.value "With '�' multibyte" -clientProperty.charset windows-1252 -clientProperty.encoding base64
411  */
412 int main(int args, char ** argv)
413 {
414    try {
415       org::xmlBlaster::util::Object_Lifetime_Manager::init();
416       Global& glob = Global::getInstance();
417       glob.initialize(args, argv);
418       I_Log& log  = glob.getLog("demo");
419 
420       if (glob.wantsHelp()) {
421          usage(log);
422       }
423 
424       PublishDemo demo(glob);
425    }
426    catch (XmlBlasterException& ex) {
427       std::cout << ex.toXml() << std::endl;
428    }
429    catch (bad_exception& ex) {
430       cout << "bad_exception: " << ex.what() << endl;
431    }
432    catch (exception& ex) {
433       cout << " exception: " << ex.what() << endl;
434    }
435    catch (string& ex) {
436       cout << "string: " << ex << endl;
437    }
438    catch (char* ex) {
439       cout << "char* :  " << ex << endl;
440    }
441    catch (...) {
442       cout << "unknown exception occured" << endl;
443       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
444       cout << e.toXml() << endl;
445    }
446 
447    try {
448       org::xmlBlaster::util::Object_Lifetime_Manager::fini();
449    }
450    catch (...) {
451       cout << "unknown exception occured in fini()" << endl;
452       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
453       cout << e.toXml() << endl;
454    }
455 
456    return 0;
457 }


syntax highlighted by Code2HTML, v. 0.9.1