00001 /*----------------------------------------------------------------------------- 00002 Name: PublishDemo.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Little demo to show how a publish is done 00006 -----------------------------------------------------------------------------*/ 00007 00008 #include <client/XmlBlasterAccess.h> 00009 #include <util/Global.h> 00010 #include <util/lexical_cast.h> 00011 #include <util/qos/ClientProperty.h> 00012 #include <util/queue/PublishQueueEntry.h> 00013 #include <authentication/SecurityQos.h> 00014 #include <iostream> 00015 #include <fstream> 00016 #include <map> 00017 00018 using namespace std; 00019 using namespace org::xmlBlaster::client; 00020 using namespace org::xmlBlaster::util; 00021 using namespace org::xmlBlaster::util::qos; 00022 using namespace org::xmlBlaster::util::qos::storage; 00023 using namespace org::xmlBlaster::util::queue; 00024 using namespace org::xmlBlaster::client::qos; 00025 using namespace org::xmlBlaster::client::key; 00026 00027 static unsigned long filesize(ifstream &ins) 00028 { 00029 unsigned long s,e,c; 00030 c = ins.tellg(); // save current file position 00031 ins.seekg(0, ios::end); // position at end 00032 e = ins.tellg(); 00033 ins.seekg(0, ios::beg); // position at beginning 00034 s = ins.tellg(); 00035 ins.seekg(c); // restore file position 00036 return e-s; 00037 } 00038 00039 static int fileRead(string &fn, string &content) 00040 { 00041 unsigned char *buf; 00042 ifstream ins(fn.c_str(), ios_base::binary); 00043 if (!ins.is_open()) return -1; 00044 int fs = filesize(ins); 00045 buf = new unsigned char [fs+1]; 00046 buf[fs] = 0; // so we can assign to string 00047 ins.read((char *)buf,fs); 00048 ins.close(); 00049 content = (char *)buf; 00050 delete [] buf; 00051 return fs; 00052 } 00053 00054 00055 class PublishDemo : public org::xmlBlaster::util::dispatch::I_PostSendListener 00056 { 00057 private: 00058 string ME; 00059 Global& global_; 00060 I_Log& log_; 00061 char ptr[2]; 00062 XmlBlasterAccess connection_; 00063 bool interactive; 00064 bool oneway; 00065 long sleep; 00066 int numPublish; 00067 string oid; 00068 string domain; 00069 string clientTags; 00070 string contentStr; 00071 string contentFile; 00072 PriorityEnum priority; 00073 bool persistent; 00074 long lifeTime; 00075 bool forceUpdate; 00076 bool forceDestroy; 00077 bool readonly; 00078 long destroyDelay; 00079 bool createDomEntry; 00080 long historyMaxMsg; 00081 bool forceQueuing; 00082 bool subscribable; 00083 string destination; 00084 bool doErase; 00085 bool disconnect; 00086 bool eraseTailback; 00087 int contentSize; 00088 bool eraseForceDestroy; 00089 QosData::ClientPropertyMap clientPropertyMap; 00090 00091 public: 00092 PublishDemo(Global& glob) 00093 : ME("PublishDemo"), 00094 global_(glob), 00095 log_(glob.getLog("demo")), 00096 connection_(global_) 00097 { 00098 initEnvironment(); 00099 run(); 00100 } 00101 00102 void run() 00103 { 00104 connect(); 00105 publish(); 00106 erase(); 00107 connection_.disconnect(DisconnectQos(global_)); 00108 } 00109 00110 void initEnvironment(); 00111 00112 void connect(); 00113 00114 void publish(); 00115 00116 void erase() 00117 { 00118 if (doErase) { 00119 if (interactive) { 00120 string outStr = "Hit 'e' to erase topic '" + oid + "' ('q' to exit without erase) >> "; 00121 string ret = org::xmlBlaster::util::waitOnKeyboardHit(outStr); 00122 if (ret == "q") return; 00123 } 00124 log_.info(ME, "Erasing topic '" + oid + "'"); 00125 EraseKey key(global_); 00126 key.setOid(oid); 00127 EraseQos eq(global_); 00128 eq.setForceDestroy(eraseForceDestroy); 00129 connection_.erase(key, eq); 00130 } 00131 } 00132 00138 void postSend(const org::xmlBlaster::util::queue::MsgQueueEntry &msgQueueEntry) 00139 { 00140 if (msgQueueEntry.isPublish()) { 00141 const PublishQueueEntry* entry = dynamic_cast<const PublishQueueEntry*>(&msgQueueEntry); 00142 const PublishReturnQos* qos = entry->getPublishReturnQos(); 00143 log_.info(ME, "Tailback message is send from client queue, state=" + qos->getState() + ": " + msgQueueEntry.getMsgUnit().getContentStr()); 00144 } 00145 else { 00146 log_.info(ME, "Tailback message is send from client queue"); 00147 } 00148 } 00149 00150 }; 00151 00152 void PublishDemo::initEnvironment() 00153 { 00154 interactive = global_.getProperty().get("interactive", true); 00155 oneway = global_.getProperty().get("oneway", false); 00156 sleep = global_.getProperty().get("sleep", 1000L); 00157 numPublish = global_.getProperty().get("numPublish", 1); 00158 oid = global_.getProperty().get("oid", string("Hello")); 00159 domain = global_.getProperty().get("domain", string("")); 00160 clientTags = global_.getProperty().get("clientTags", ""); // "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>"); 00161 contentStr = global_.getProperty().get("content", "Hi-%counter"); 00162 contentFile = global_.getProperty().get("contentFile", ""); 00163 priority = int2Priority(global_.getProperty().get("priority", NORM_PRIORITY)); 00164 persistent = global_.getProperty().get("persistent", true); 00165 lifeTime = global_.getProperty().get("lifeTime", -1L); 00166 forceUpdate = global_.getProperty().get("forceUpdate", true); 00167 forceDestroy = global_.getProperty().get("forceDestroy", false); 00168 readonly = global_.getProperty().get("readonly", false); 00169 destroyDelay = global_.getProperty().get("destroyDelay", 60000L); 00170 createDomEntry = global_.getProperty().get("createDomEntry", true); 00171 historyMaxMsg = global_.getProperty().get("queue/history/maxEntries", -1L); 00172 forceQueuing = global_.getProperty().get("forceQueuing", true); 00173 subscribable = global_.getProperty().get("subscribable", true); 00174 destination = global_.getProperty().get("destination", ""); 00175 doErase = global_.getProperty().get("doErase", true); 00176 disconnect = global_.getProperty().get("disconnect", true); 00177 eraseTailback = global_.getProperty().get("eraseTailback", false); 00178 contentSize = global_.getProperty().get("contentSize", -1); // 2000000); 00179 eraseForceDestroy = global_.getProperty().get("erase.forceDestroy", false); 00180 00181 //TODO: Needs to be ported similar to Java 00182 //map<std::string,std::string> clientPropertyMap = global_.getProperty().get("clientProperty", map<std::string,std::string>()); 00183 string clientPropertyKey = global_.getProperty().get("clientProperty.key", string("")); 00184 string clientPropertyValue = global_.getProperty().get("clientProperty.value", string("")); 00185 string clientPropertyEncoding = global_.getProperty().get("clientProperty.encoding", ""); // Force to Constants::ENCODING_BASE64="base64" 00186 string clientPropertyCharset = global_.getProperty().get("clientProperty.charset", ""); // Force to e.g. "windows-1252" 00187 string clientPropertyType = global_.getProperty().get("clientProperty.type", ""); // Date type, see Constants::TYPE_DOUBLE, Constants::TYPE_STRING etc 00188 if (clientPropertyKey != "") { 00189 ClientProperty cp(clientPropertyKey, clientPropertyValue, clientPropertyType, clientPropertyEncoding); 00190 if (clientPropertyCharset != "") cp.setCharset(clientPropertyCharset); 00191 // 00192 // Returns "en_US.UTF-8" on Linux and "English_United States.1252" on WinXP 00193 //char *p = setlocale(LC_CTYPE, ""); 00194 //log_.info(ME, "setlocale CTYPE returns: " + string(p)); 00195 // But java (server on Linux or Windows) can't handle "English_United States.1252" or "1252": java.io.UnsupportedEncodingException: 1252 00196 // but it can handle conversion from "windows-1252" to "UTF-8" 00197 // Further, java does: UnsupportedEncodingException: en_US.UTF-8 00198 // but likes "UTF-8" 00199 //What else instead of setlocal() could we use for automatic charset detection of this C++ client (which is compatible to Java used names)? 00200 clientPropertyMap.insert(QosData::ClientPropertyMap::value_type(clientPropertyKey, cp)); 00201 } 00202 00203 if (historyMaxMsg < 1 && !global_.getProperty().propertyExists("destroyDelay")) 00204 destroyDelay = 24L*60L*60L*1000L; // Increase destroyDelay to one day if no history queue is used 00205 00206 log_.info(ME, "You can use for example '-session.name publisher/1 -passwd secret' to pass your credentials"); 00207 log_.info(ME, "Used settings are:"); 00208 log_.info(ME, " -interactive " + lexical_cast<string>(interactive)); 00209 log_.info(ME, " -sleep " + lexical_cast<string>(sleep)); // org.jutils.time.TimeHelper.millisToNice(sleep)); 00210 log_.info(ME, " -oneway " + lexical_cast<string>(oneway)); 00211 log_.info(ME, " -doErase " + lexical_cast<string>(doErase)); 00212 log_.info(ME, " -disconnect " + lexical_cast<string>(disconnect)); 00213 log_.info(ME, " -eraseTailback " + lexical_cast<string>(eraseTailback)); 00214 log_.info(ME, " Pub/Sub settings"); 00215 log_.info(ME, " -numPublish " + lexical_cast<string>(numPublish)); 00216 log_.info(ME, " -oid " + lexical_cast<string>(oid)); 00217 log_.info(ME, " -domain " + lexical_cast<string>(domain)); 00218 log_.info(ME, " -clientTags " + clientTags); 00219 if (contentSize >= 0) { 00220 log_.info(ME, " -content [generated]"); 00221 log_.info(ME, " -contentSize " + lexical_cast<string>(contentSize)); 00222 } 00223 else if (contentFile.size() > 0) { 00224 log_.info(ME, " -contentFile " + contentFile); 00225 } 00226 else { 00227 log_.info(ME, " -content " + contentStr); 00228 log_.info(ME, " -contentSize " + lexical_cast<string>(contentStr.length())); 00229 } 00230 log_.info(ME, " -priority " + lexical_cast<string>(priority)); 00231 log_.info(ME, " -persistent " + lexical_cast<string>(persistent)); 00232 log_.info(ME, " -lifeTime " + lexical_cast<string>(lifeTime)); // org.jutils.time.TimeHelper.millisToNice(lifeTime)); 00233 log_.info(ME, " -forceUpdate " + lexical_cast<string>(forceUpdate)); 00234 log_.info(ME, " -forceDestroy " + lexical_cast<string>(forceDestroy)); 00235 if (clientPropertyMap.size() > 0) { 00236 QosData::ClientPropertyMap::const_iterator mi; 00237 for (mi=clientPropertyMap.begin(); mi!=clientPropertyMap.end(); ++mi) { 00238 log_.info(ME, " -clientProperty["+mi->first+"] " + mi->second.getStringValue()); 00239 } 00240 } 00241 else { 00242 log_.info(ME, " -clientProperty[] "); 00243 } 00244 log_.info(ME, " Topic settings"); 00245 log_.info(ME, " -readonly " + lexical_cast<string>(readonly)); 00246 log_.info(ME, " -destroyDelay " + lexical_cast<string>(destroyDelay)); // org.jutils.time.TimeHelper.millisToNice(destroyDelay)); 00247 log_.info(ME, " -createDomEntry " + lexical_cast<string>(createDomEntry)); 00248 log_.info(ME, " -queue/history/maxEntries " + lexical_cast<string>(historyMaxMsg)); 00249 log_.info(ME, " PtP settings"); 00250 log_.info(ME, " -subscribable " + lexical_cast<string>(subscribable)); 00251 log_.info(ME, " -forceQueuing " + lexical_cast<string>(forceQueuing)); 00252 log_.info(ME, " -destination " + destination); 00253 log_.info(ME, " Erase settings"); 00254 log_.info(ME, " -erase.forceDestroy " + lexical_cast<string>(eraseForceDestroy)); 00255 log_.info(ME, "For more info please read:"); 00256 log_.info(ME, " http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html"); 00257 } 00258 00259 void PublishDemo::connect() 00260 { 00261 ConnectQos connQos(global_); 00262 //org::xmlBlaster::authentication::SecurityQos sec(global_, "jack", "secret", "htpasswd,1.0"); 00263 //connQos.setSecurityQos(sec); 00264 00265 connection_.registerPostSendListener(this); 00266 00267 log_.trace(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos.toXml()); 00268 ConnectReturnQos retQos = connection_.connect(connQos, NULL); // no callback 00269 log_.trace(ME, "successfully connected to " + connection_.getServerNodeId() + ". Return qos: " + retQos.toXml()); 00270 } 00271 00272 void PublishDemo::publish() 00273 { 00274 for(int i=0; i<numPublish; i++) { 00275 00276 if (interactive) { 00277 std::cout << "Hit a key to publish '" + oid + "' #" + lexical_cast<string>(i+1) + "/" + lexical_cast<string>(numPublish) + " ('b' to break) >> "; 00278 std::cin.read(ptr,1); 00279 if (*ptr == 'b') break; 00280 } 00281 else { 00282 if (sleep > 0) { 00283 try { 00284 org::xmlBlaster::util::thread::Thread::sleep(sleep); 00285 } 00286 catch(XmlBlasterException e) { 00287 log_.error(ME, e.toXml()); 00288 } 00289 } 00290 log_.info(ME, "Publish '" + oid + "' #" + lexical_cast<string>(i+1) + "/" + lexical_cast<string>(numPublish)); 00291 } 00292 00293 PublishKey key(global_, oid, "text/xml", "1.0"); 00294 key.setClientTags(clientTags); 00295 if (domain != "") key.setDomain(domain); 00296 if (i==0) log_.info(ME, "PublishKey: " + key.toXml()); 00297 00298 PublishQos pq(global_); 00299 pq.setPriority(priority); 00300 pq.setPersistent(persistent); 00301 pq.setLifeTime(lifeTime); 00302 pq.setForceUpdate(forceUpdate); 00303 pq.setForceDestroy(forceDestroy); 00304 pq.setSubscribable(subscribable); 00305 if (clientPropertyMap.size() > 0) { 00306 pq.setClientProperties(clientPropertyMap); 00307 //This is the correct way for a typed property: 00308 pq.addClientProperty("ALONG", long(12L)); 00309 } 00310 00311 if (i == 0) { 00312 TopicProperty topicProperty(global_); 00313 topicProperty.setDestroyDelay(destroyDelay); 00314 topicProperty.setCreateDomEntry(createDomEntry); 00315 topicProperty.setReadonly(readonly); 00316 if (historyMaxMsg >= 0L) { 00317 HistoryQueueProperty prop(global_, ""); 00318 prop.setMaxEntries(historyMaxMsg); 00319 topicProperty.setHistoryQueueProperty(prop); 00320 } 00321 pq.setTopicProperty(topicProperty); 00322 log_.info(ME, "Added TopicProperty on first publish: " + topicProperty.toXml()); 00323 } 00324 00325 if (destination != "") { 00326 SessionName sessionName(global_, destination); 00327 Destination dest(global_, sessionName); 00328 dest.forceQueuing(forceQueuing); 00329 pq.addDestination(dest); 00330 } 00331 00332 log_.info(ME, "mapSize=" + lexical_cast<string>(clientPropertyMap.size()) + " PublishQos: " + pq.toXml()); 00333 00334 string contentTmp = contentStr; 00335 if (contentSize >= 0) { 00336 contentTmp = ""; 00337 for (int j=0; j<contentSize; j++) 00338 contentTmp += "X"; 00339 } 00340 else if (contentFile.size() > 0) { 00341 fileRead(contentFile, contentTmp); 00342 } 00343 else { 00344 contentTmp = StringTrim::replaceAll(contentTmp, "%counter", lexical_cast<string>(i+1)); 00345 } 00346 00347 MessageUnit msgUnit(key, contentTmp, pq); 00348 if (oneway) { 00349 log_.trace(ME, string("publishOneway() message unit: ") + msgUnit.toXml()); 00350 vector<MessageUnit> msgUnitArr; 00351 msgUnitArr.push_back(msgUnit); 00352 connection_.publishOneway(msgUnitArr); 00353 log_.trace(ME, "publishOneway() done"); 00354 } 00355 else { 00356 log_.trace(ME, string("publish() message unit: ") + msgUnit.toXml()); 00357 PublishReturnQos tmp = connection_.publish(msgUnit); 00358 log_.trace(ME, string("publish return qos: ") + tmp.toXml()); 00359 } 00360 } 00361 } 00362 00363 00364 static void usage(I_Log& log) 00365 { 00366 log.plain("PublishDemo usage:", Global::usage()); 00367 string str = "\nPlus many more additional command line arguments:"; 00368 str += "\n -numPublish (int): the number of publishes which have to be done"; 00369 str += "\n -sleep (ms): the delay to wait between each publish. If negative (default) it does not wait"; 00370 str += "\n ..."; 00371 str += "\nExample:\n"; 00372 str += " PublishDemo -trace true -numPublish 1000\n"; 00373 str += " PublishDemo -destination joe -oid Hello -content 'Hi joe'\n"; 00374 log.plain("PublishDemo", str); 00375 exit(0); 00376 } 00377 00378 00388 int main(int args, char ** argv) 00389 { 00390 try { 00391 org::xmlBlaster::util::Object_Lifetime_Manager::init(); 00392 Global& glob = Global::getInstance(); 00393 glob.initialize(args, argv); 00394 I_Log& log = glob.getLog("demo"); 00395 00396 if (glob.wantsHelp()) { 00397 usage(log); 00398 } 00399 00400 PublishDemo demo(glob); 00401 } 00402 catch (XmlBlasterException& ex) { 00403 std::cout << ex.toXml() << std::endl; 00404 } 00405 catch (bad_exception& ex) { 00406 cout << "bad_exception: " << ex.what() << endl; 00407 } 00408 catch (exception& ex) { 00409 cout << " exception: " << ex.what() << endl; 00410 } 00411 catch (string& ex) { 00412 cout << "string: " << ex << endl; 00413 } 00414 catch (char* ex) { 00415 cout << "char* : " << ex << endl; 00416 } 00417 catch (...) { 00418 cout << "unknown exception occured" << endl; 00419 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread"); 00420 cout << e.toXml() << endl; 00421 } 00422 00423 try { 00424 org::xmlBlaster::util::Object_Lifetime_Manager::fini(); 00425 } 00426 catch (...) { 00427 cout << "unknown exception occured in fini()" << endl; 00428 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread"); 00429 cout << e.toXml() << endl; 00430 } 00431 00432 return 0; 00433 }