1 /*------------------------------------------------------------------------------
  2 Name:      XmlBlasterAccess.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 
  7 #include <client/XmlBlasterAccess.h>
  8 #include <util/Global.h>
  9 #include <util/lexical_cast.h>
 10 #include <util/Timestamp.h>
 11 #include <util/dispatch/DispatchManager.h>
 12 #include <util/parser/ParserFactory.h>
 13 #include <util/queue/MsgQueueEntry.h>
 14 #include <util/queue/I_Queue.h>
 15 
 16 
 17 namespace org { namespace xmlBlaster { namespace client {
 18 
 19 using namespace std;
 20 using namespace org::xmlBlaster::util;
 21 using namespace org::xmlBlaster::util::qos;
 22 using namespace org::xmlBlaster::util::dispatch;
 23 using namespace org::xmlBlaster::util::queue;
 24 using namespace org::xmlBlaster::util::qos::storage;
 25 using namespace org::xmlBlaster::util::qos::address;
 26 using namespace org::xmlBlaster::authentication;
 27 using namespace org::xmlBlaster::client::protocol;
 28 using namespace org::xmlBlaster::client::key;
 29 using namespace org::xmlBlaster::client::qos;
 30 
 31 XmlBlasterAccess::XmlBlasterAccess(Global& global)
 32    : ME(string("XmlBlasterAccess-UNCONNECTED")),
 33      global_(global), 
 34      globalRef_(NULL), 
 35      log_(global.getLog("org.xmlBlaster.client")),
 36      serverNodeId_("xmlBlaster"), 
 37      connectQos_(new ConnectQos(global)), 
 38      connectReturnQos_((ConnectReturnQos*)0),
 39      subscriptionCallbackMap_(),
 40      updateMutex_(),
 41      invocationMutex_(global.getProperty().get("xmlBlaster/invocationMutex/recursive", true)),
 42      postSendListener_(0)
 43 {
 44    log_.call(ME, "::constructor");
 45    cbServer_           = NULL;
 46    updateClient_       = NULL;
 47    connection_         = NULL;
 48    dispatchManager_    = NULL;
 49    connectionProblems_ = NULL;
 50    instanceName_       = lexical_cast<std::string>(TimestampFactory::getInstance().getTimestamp());
 51 
 52    // Hack for Windows: Initialize it from main thread, using the callback thread fails undeterminable (with xerces)
 53    org::xmlBlaster::util::parser::ParserFactory::getFactory().initialize(global);
 54 }
 55 
 56 XmlBlasterAccess::XmlBlasterAccess(GlobalRef globalRef)
 57    : ME(string("XmlBlasterAccess-UNCONNECTED")),
 58      global_(*globalRef), 
 59      globalRef_(globalRef), 
 60      log_(global_.getLog("org.xmlBlaster.client")),
 61      serverNodeId_("xmlBlaster"), 
 62      connectQos_(new ConnectQos(global_)), 
 63      connectReturnQos_((ConnectReturnQos*)0),
 64      subscriptionCallbackMap_(),
 65      updateMutex_(),
 66      invocationMutex_(globalRef->getProperty().get("xmlBlaster/invocationMutex/recursive", true)),
 67      postSendListener_(0)
 68 {
 69    log_.call(ME, "::constructor");
 70    cbServer_           = NULL;
 71    updateClient_       = NULL;
 72    connection_         = NULL;
 73    dispatchManager_    = NULL;
 74    connectionProblems_ = NULL;
 75    instanceName_       = lexical_cast<std::string>(TimestampFactory::getInstance().getTimestamp());
 76 
 77    // Hack for Windows: Initialize it from main thread, using the callback thread fails undeterminable (with xerces)
 78    org::xmlBlaster::util::parser::ParserFactory::getFactory().initialize(global_);
 79 }
 80 
 81 XmlBlasterAccess::~XmlBlasterAccess()
 82 {
 83    if (log_.call()) log_.call(ME, "destructor");
 84    cleanup(true);
 85    dispatchManager_    = NULL;
 86    updateClient_       = NULL;
 87    connectionProblems_ = NULL;
 88    if (log_.trace()) log_.trace(ME, "destructor ended");
 89 }
 90 
 91 void XmlBlasterAccess::cleanup(bool doLock)
 92 {
 93    if (log_.call()) log_.call(ME, "cleanup");
 94    if (doLock) {
 95       // synchronization
 96       org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
 97       org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
 98       subscriptionCallbackMap_.clear();
 99    }
100    else {
101       org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
102       subscriptionCallbackMap_.clear();
103    }
104 
105    if (cbServer_) {
106       CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
107       const AddressBaseRef& addr = prop.getCurrentCallbackAddress(); // c++ may not return null
108       global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
109       cbServer_ = NULL;
110    }
111    if (connection_) {
112       if (log_.trace()) log_.trace(ME, "destructor: going to delete the connection");
113       connection_->shutdown();
114       delete connection_;
115       connection_ = NULL;
116    }
117 }
118 
119 
120 ConnectReturnQos XmlBlasterAccess::connect(const ConnectQos& qos, I_Callback *clientCb)
121 {
122    ME = string("XmlBlasterAccess-") + qos.getSessionQos().getAbsoluteName();
123    if (log_.call()) log_.call(ME, "::connect");
124    if (log_.dump()) log_.dump(ME, string("::connect: qos: ") + qos.toXml());
125 
126    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
127 
128    cleanup(false);
129 
130    global_.setId(qos.getSessionQos().getAbsoluteName()); // global_.setId(loginName + currentTimeMillis());
131    connectQos_ = new ConnectQos(qos);
132    connectQos_->setInstanceId(global_.getInstanceId());
133 
134    SecurityQos securityQos = connectQos_->getSecurityQos();
135 
136    ME = string("XmlBlasterAccess-") + getId();
137    string typeVersion = global_.getProperty().getStringProperty("queue/defaultPlugin", "CACHE,1.0");
138    typeVersion = global_.getProperty().getStringProperty("queue/connection/defaultPlugin", "typeVersion");
139    updateClient_ = clientCb;
140 
141    if (updateClient_) createDefaultCbServer();
142 
143    if (log_.trace()) log_.trace(ME, string("::connect. CbServer done"));
144    // currently the simple version will do it ...
145    if (!dispatchManager_) dispatchManager_ = &(global_.getDispatchManager());
146 
147    if (!connection_) {
148       connection_ = dispatchManager_->getConnectionsHandler(instanceName_);
149       connection_->registerPostSendListener(this);
150    }
151 
152    if (connectionProblems_) {
153       if (log_.trace()) log_.trace(ME, "::connect. Registering initFailsafe");
154       connection_->initFailsafe(connectionProblems_);
155       connectionProblems_ = NULL;
156    }
157    if (log_.trace()) log_.trace(ME, string("::connect. connectQos: ") + connectQos_->toXml());
158    // do connect() now:
159    connectReturnQos_ = connection_->connect(connectQos_);
160 
161    ME = string("XmlBlasterAccess-") + connectReturnQos_->getSessionQos().getAbsoluteName();
162 
163    setServerNodeId(connectReturnQos_->getSessionQos().getClusterNodeId());
164    
165    // Is done in ConnectionsHandler.cpp
166    //global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName());
167 
168    return *connectReturnQos_;
169 }
170 
171 org::xmlBlaster::util::Global& XmlBlasterAccess::getGlobal()
172 {
173    return this->global_;
174 }
175 
176 org::xmlBlaster::util::queue::I_Queue* XmlBlasterAccess::getQueue()
177 {
178    if (connection_) {
179       return connection_->getQueue();
180    }
181    return 0;
182 }
183 
184 
185 org::xmlBlaster::client::I_Callback* XmlBlasterAccess::getCallback()
186 {
187    return this->updateClient_;
188 }
189 
190 void XmlBlasterAccess::setCallbackDispatcherActive(bool isActive)
191 {
192    string command = getSessionName() + "/?dispatcherActive=" + lexical_cast<string>(isActive);
193    sendAdministrativeCommand(command);
194    connectQos_->getCbAddress()->setDispatcherActive(isActive);
195 }
196 
197 string XmlBlasterAccess::sendAdministrativeCommand(const string &command, PublishQos *publishQosP)
198 {
199    bool isGet = command.find("get ") == 0 || command.find("GET ") == 0;
200    bool isSet = command.find("set ") == 0 || command.find("SET ") == 0;
201    const string cmd = ((isGet || isSet)) ? command.substr(4) : command;
202    
203    if (publishQosP ||(isSet || (!isGet && cmd.find("=") != string::npos)) ) {
204       string oid = string("__cmd:") + cmd;
205       PublishKey  key(global_, oid); // oid="__cmd:/client/joe/1/?dispatcherActive=false"
206       PublishQos qos(global_);
207       MessageUnit msgUnit(key, "", ( publishQosP ) ? *publishQosP : qos );
208       try {
209          PublishReturnQos ret = publish(msgUnit);
210          if (log_.trace()) log_.trace(ME, "Send '" + cmd + " '");
211          return ret.getState();
212       }
213       catch (XmlBlasterException &e) {
214          if (log_.trace()) log_.trace(ME, "Sending of '" + cmd + " ' failed: " + e.getMessage());
215          throw e;
216       }
217    }
218    else {
219       string oid = string("__cmd:") + cmd;
220       GetKey getKey(global_);
221       getKey.setOid(oid);
222       GetQos getQos(global_);
223       try {
224          vector<MessageUnit> msgVec = get(getKey, getQos);
225          if (log_.trace()) log_.trace(ME, "Send '" + cmd + " ', got array of size " + lexical_cast<string>(msgVec.size()));
226          if (msgVec.size() == 0)
227             return "";
228          return msgVec[0].getContentStr();
229       }
230       catch (XmlBlasterException &e) {
231          if (log_.trace()) log_.trace(ME, "Sending of '" + cmd + " ' failed: " + e.getMessage());
232          throw e;
233       }
234    }
235 }
236 
237 
238 void XmlBlasterAccess::createDefaultCbServer()
239 {
240    log_.call(ME, "::createDefaultCbServer");
241 
242    CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
243    const AddressBaseRef &addr = prop.getCurrentCallbackAddress();
244 
245    if(!cbServer_)
246      cbServer_ = initCbServer(getLoginName(), addr->getType(), addr->getVersion());
247 
248    addr->setAddress(cbServer_->getCbAddress());
249    addr->setType(cbServer_->getCbProtocol());
250    // !!!!! prop.setCallbackAddress(addr);
251    connectQos_->setSessionCbQueueProperty(prop);
252    if (log_.trace()) log_.trace(ME, string("::createDefaultCbServer: connectQos: ") + connectQos_->toXml());
253    log_.info(ME, "Callback settings: " + prop.getSettings());
254 }
255 
256 I_CallbackServer*
257 XmlBlasterAccess::initCbServer(const string& loginName, const string& type, const string& version)
258 {
259    if (log_.call()) log_.call(ME, string("::initCbServer: loginName='") + loginName + "' type='" + type + "' version='" + version +"'");
260    if (log_.trace()) log_.trace(ME, string("Using 'client.cbProtocol=") + type + string("' to be used by ") + getServerNodeId() + string(", trying to create the callback server ..."));
261    I_CallbackServer* server = &(global_.getCbServerPluginManager().getPlugin(instanceName_, type, version));
262    if (log_.trace()) log_.trace(ME, "After callback plugin creation");
263    server->initialize(loginName, *this);
264    if (log_.trace()) log_.trace(ME, "After callback plugin initialize");
265    return server;
266 }
267 
268 org::xmlBlaster::util::dispatch::I_PostSendListener* XmlBlasterAccess::registerPostSendListener(org::xmlBlaster::util::dispatch::I_PostSendListener *listener) {
269    I_PostSendListener* old = this->postSendListener_;
270    this->postSendListener_ = listener;
271    //if (connection_)
272    //   return connection_->registerPostSendListener(this);
273    return old;
274 }
275 
276 // I_PostSendListener
277 void XmlBlasterAccess::postSend(const std::vector<EntryType> &entries)
278 {
279    I_PostSendListener* l = this->postSendListener_;
280    if (l)
281       l->postSend(entries);
282 }
283 
284 // I_PostSendListener
285 bool XmlBlasterAccess::sendingFailed(const std::vector<EntryType> &entries, const XmlBlasterException &exception)
286 {
287    I_PostSendListener* l = this->postSendListener_;
288    if (l)
289       return l->sendingFailed(entries, exception);
290    return false;
291 }
292 
293 org::xmlBlaster::client::protocol::I_ProgressListener* XmlBlasterAccess::registerProgressListener(org::xmlBlaster::client::protocol::I_ProgressListener *listener)
294 {
295    return (this->cbServer_) ? this->cbServer_->registerProgressListener(listener) : 0;
296 }
297 
298 org::xmlBlaster::util::qos::ConnectQosRef XmlBlasterAccess::getConnectQos() {
299    return connectQos_;
300 }
301 
302 //org::xmlBlaster::util::qos::ConnectReturnQosRef XmlBlasterAccess::getConnectReturnQos() {
303 //}
304 
305 void
306 XmlBlasterAccess::initSecuritySettings(const string& /*secMechanism*/, const string& /*secVersion*/)
307 {
308    log_.error(ME, "initSecuritySettings not implemented yet");
309 }
310 
311 void XmlBlasterAccess::leaveServer(const StringMap &/*map*/)
312 {
313    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
314    if (!isConnected()) {
315       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::leaveServer", "You are not connected to the xmlBlaster");
316    }
317    
318    if (cbServer_) {
319       if (log_.trace()) log_.trace(ME, "destructor: going to delete the callback connection");
320       cbServer_->shutdownCb();
321    }
322    
323    if (connection_) {
324       if (log_.trace()) log_.trace(ME, "destructor: going to delete the connection");
325       connection_->shutdown();
326    }
327    
328    if (cbServer_) {
329       CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
330       AddressBaseRef addr = prop.getCurrentCallbackAddress(); // c++ may not return null
331       global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
332       cbServer_ = NULL;
333    }
334    
335    if (connection_) {
336       delete connection_;
337       connection_ = NULL;
338    }
339    log_.info(ME, "leaveServer() done");
340 }
341 
342 
343 bool
344 XmlBlasterAccess::disconnect(const DisconnectQos& qos, bool flush, bool shutdown, bool shutdownCb)
345 {
346    // locking until finished 
347    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
348    bool ret1 = true;
349    bool ret3 = true;
350    if (log_.call()) {
351       log_.call(ME, string("disconnect called with flush='") + Global::getBoolAsString(flush) + 
352                               "' shutdown='" + Global::getBoolAsString(shutdown) + 
353                     "' shutdownCb='" + Global::getBoolAsString(shutdownCb) + "'");
354    }
355 
356    if (log_.trace()) log_.trace(ME, "disconnecting the client connection");
357    if (log_.dump()) log_.dump(ME, string("disconnect: the qos is:\n") + qos.toXml());
358    if (connection_ != NULL) {
359       ret1  = connection_->disconnect(qos);
360       if (shutdown) connection_->shutdown();
361    }
362    else {
363       ret1 = false;
364    }
365    if (shutdownCb) {
366       if (cbServer_) {
367          ret3 = cbServer_->shutdownCb();
368 
369          CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
370          const AddressBaseRef &addr = prop.getCurrentCallbackAddress();
371          global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
372          cbServer_ = NULL;
373       }
374       else ret3 = false;
375    }
376    return ret1 && ret3;
377 }
378 
379 string XmlBlasterAccess::getId()
380 {
381    return getSessionName();
382 }
383 
384 SessionNameRef XmlBlasterAccess::getSessionNameRef()
385 {
386    if (!connectReturnQos_.isNull()) return connectReturnQos_->getSessionQos().getSessionName();
387    return connectQos_->getSessionQos().getSessionName();
388 }
389 
390 string XmlBlasterAccess::getSessionName()
391 {
392    string ret;
393    if (!connectReturnQos_.isNull()) ret = connectReturnQos_->getSessionQos().getAbsoluteName();
394    if (ret == "") ret = connectQos_->getSessionQos().getAbsoluteName();
395    return ret;
396 }
397 
398 string XmlBlasterAccess::getLoginName()
399 {
400    try {
401       string nm = connectQos_->getSecurityQos().getUserId();
402       if (nm != "") return nm;
403    }
404    catch (XmlBlasterException e) {
405       log_.warn(ME, e.toString());
406    }
407    return string("client?");
408 }
409 
410 void XmlBlasterAccess::setServerNodeId(const string& nodeId)
411 {
412    serverNodeId_ = nodeId;
413 }
414 
415 string XmlBlasterAccess::getServerNodeId() const
416 {
417    return serverNodeId_;
418 }
419 
420 /*
421 MsgQueueEntry
422 XmlBlasterAccess::queueMessage(const MsgQueueEntry& entry)
423 {
424  return entry;
425 }
426 
427 vector<MsgQueueEntry*>
428 XmlBlasterAccess::queueMessage(const vector<MsgQueueEntry*>& entries)
429 {
430    return entries;
431 }
432 */
433 
434 SubscribeReturnQos XmlBlasterAccess::subscribe(const SubscribeKey& key, const SubscribeQos& qos, I_Callback *callback)
435 {
436    // locking until finished 
437    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
438    
439    if (!isConnected()) {
440       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::subscribe", "you are not connected to the xmlBlaster");
441    }
442    if (log_.call()) log_.call(ME, "subscribe");
443    if (log_.dump()) {
444       log_.dump(ME, string("subscribe. The key:\n") + key.toXml());
445       log_.dump(ME, string("subscribe. The Qos:\n") + qos.toXml());
446    }
447 
448    SessionNameRef sessionName = getSessionNameRef();
449    if (sessionName->getPubSessionId() > 0 &&
450       qos.getMultiSubscribe()==false &&
451       !qos.hasSubscriptionId()) {
452       // For failsave clients we generate on client side the subscriptionId
453       // In case of offline/clientSideQueued operation we guarantee like this a not changing
454       // subscriptionId and the client code can reliably use the subscriptionId for further dispatching
455       // of update() messages.
456       SubscribeQos& q = const_cast<SubscribeQos&>(qos);
457       q.generateSubscriptionId(sessionName, key);
458       if (log_.trace()) log_.trace(ME, "subscribe: generated client side subscriptionId=" + q.getData().getSubscriptionId());
459    }
460 
461    if (callback != 0) { // using a subscribe specific callback?
462       if (log_.trace()) log_.trace(ME, "subscribe: inserting individual callback in callback map");
463       org::xmlBlaster::util::thread::Lock lockUpdate(updateMutex_);
464       SubscribeReturnQos retQos = connection_->subscribe(key, qos);
465       std::string subId = retQos.getSubscriptionId();
466       subscriptionCallbackMap_.insert(std::map<std::string, I_Callback*>::value_type(subId, callback));
467       return retQos;
468    }
469    else {
470       if (log_.trace()) log_.trace(ME, "subscribe: no specific callback");
471       return connection_->subscribe(key, qos);
472    }
473 }
474 
475 vector<MessageUnit> XmlBlasterAccess::get(const GetKey& key, const GetQos& qos)
476 {
477    // locking until finished 
478    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
479    if (!isConnected()) {
480       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::get", "you are not connected to the xmlBlaster");
481    }
482    if (log_.call()) log_.call(ME, "get");
483    if (log_.dump()) {
484       log_.dump(ME, string("get. The key:\n") + key.toXml());
485       log_.dump(ME, string("get. The Qos:\n") + qos.toXml());
486    }
487    return connection_->get(key, qos);
488 }
489 
490 vector<MessageUnit> XmlBlasterAccess::receive(string oid, int maxEntries, long timeout, bool consumable) {
491    if (!isConnected()) {
492       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::receive", "you are not connected to the xmlBlaster");
493    }
494    if (log_.call()) log_.call(ME, "receive");
495    
496     //topic/hello          to access a history queue,
497     //client/joe           to access a subject queue or
498     //client/joe/session/1 
499    if (oid.find("topic") != string::npos)
500       oid = "__cmd:"+oid+"/?historyQueueEntries"; // "__cmd:topic/hello/?historyQueueEntries"
501    else if (oid.find("session") != string::npos)
502       oid = "__cmd:"+oid+"/?callbackQueueEntries"; // "__cmd:client/joe/session/1/?callbackQueueEntries";
503    else if (oid.find("subject") != string::npos)
504       oid = "__cmd:"+oid+"/?subjectQueueEntries"; // "__cmd:client/joe/?subjectQueueEntries"
505    else
506       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::receive", "Can't parse '" + oid + "'");
507 
508    GetKey getKey(global_, oid);
509    QueryQosData data(global_);
510    data.setQueryQos(maxEntries, timeout, consumable);
511    GetQos getQos(global_, data);
512    vector<MessageUnit> msgs = get(getKey, getQos);
513    if (log_.trace()) log_.trace(ME, string("receive - got '") + lexical_cast<std::string>(msgs.size()) + "'");
514    return msgs;
515 }
516 
517 vector<MessageUnit> XmlBlasterAccess::request(MessageUnit &msgUnit, long timeout, int maxEntries) {
518    if (log_.call()) log_.call(ME, "request");
519 
520    // Create a temporary reply topic ...
521    long destroyDelay = timeout+86400000; // on client crash, cleanup after one day; //long destroyDelay = -1;
522    string tempTopicOid = createTemporaryTopic(destroyDelay, maxEntries);
523 
524    try {
525       // Send the request ...
526       // "__jms:JMSReplyTo"
527       org::xmlBlaster::util::qos::QosData &qos =  const_cast<org::xmlBlaster::util::qos::QosData&>(msgUnit.getQos());
528       qos.addClientProperty(string(Constants::JMS_REPLY_TO), tempTopicOid);
529       publish(msgUnit);
530       // Access the reply ...
531       vector<MessageUnit> msgs = receive("topic/"+tempTopicOid, maxEntries, timeout, true);
532       {  // Clean up temporary topic ...
533          EraseKey ek(global_, tempTopicOid);
534          EraseQos eq(global_);
535          eq.setForceDestroy(true);
536          erase(ek, eq);
537       }
538       return msgs;
539    }
540    catch (exception &ex) {
541       {  // Clean up temporary topic ...
542          EraseKey ek(global_, tempTopicOid);
543          EraseQos eq(global_);
544          eq.setForceDestroy(true);
545          erase(ek, eq);
546       }
547       throw ex;
548    }
549 }
550 
551 std::string XmlBlasterAccess::createTemporaryTopic(long destroyDelay, int historyMaxMsg) {
552    PublishKey pk(global_);
553    PublishQos pq(global_);
554    TopicProperty topicProperty(global_);
555    topicProperty.setDestroyDelay(destroyDelay);
556    topicProperty.setCreateDomEntry(false);
557    topicProperty.setReadonly(false);
558    pq.setAdministrative(true);
559    if (historyMaxMsg >= 0L) {
560       HistoryQueueProperty prop(global_, "");
561       prop.setMaxEntries(historyMaxMsg);
562       topicProperty.setHistoryQueueProperty(prop);
563    }
564    pq.setTopicProperty(topicProperty);
565    MessageUnit msgUnit(pk, "", pq);
566    PublishReturnQos prq = publish(msgUnit);
567    if (log_.call()) log_.call(ME, string("Created temporary topic ") + prq.getKeyOid());
568    return prq.getKeyOid();
569 }
570 
571 
572 vector<UnSubscribeReturnQos>
573 XmlBlasterAccess::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
574 {
575    // locking until finished 
576    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
577    if (!isConnected()) {
578       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::unsSubscribe", "you are not connected to the xmlBlaster");
579    }
580    if (log_.call()) log_.call(ME, "unSubscribe");
581    if (log_.dump()) {
582       log_.dump(ME, string("unSubscribe. The key:\n") + key.toXml());
583       log_.dump(ME, string("unSubscribe. The Qos:\n") + qos.toXml());
584    }
585    // synchronization
586    org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
587    vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos);
588    vector<UnSubscribeReturnQos>::iterator iter = ret.begin();
589    while (iter != ret.end()) {
590       if (log_.trace()) log_.trace(ME, std::string("unSubscribe: removing callback for '") + (*iter).getSubscriptionId() + "'");
591       subscriptionCallbackMap_.erase((*iter).getSubscriptionId());
592       iter++;
593    }
594    return ret;
595 }
596 
597 PublishReturnQos XmlBlasterAccess::publish(const MessageUnit& msgUnit)
598 {
599    // locking until finished 
600    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
601    if (!isConnected()) {
602       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publish", "you are not connected to the xmlBlaster");
603    }
604    if (log_.call()) log_.call(ME, "publish");
605    if (log_.dump()) {
606       log_.dump(ME, string("publish. The msgUnit:\n") + msgUnit.toXml());
607    }
608    return connection_->publish(msgUnit);
609 }
610 
611 void XmlBlasterAccess::publishOneway(const vector<MessageUnit>& msgUnitArr)
612 {
613    // locking until finished 
614    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
615    if (!isConnected()) {
616       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publishOneway", "you are not connected to the xmlBlaster");
617    }
618    if (log_.call()) log_.call(ME, "publishOneway");
619    if (log_.dump()) {
620       for (vector<MessageUnit>::size_type i=0; i < msgUnitArr.size(); i++) {
621          log_.dump(ME, string("publishOneway. The msgUnit[") + lexical_cast<std::string>(i) + "]:\n" + msgUnitArr[i].toXml());
622       }
623    }
624    connection_->publishOneway(msgUnitArr);
625 }
626 
627 vector<PublishReturnQos> XmlBlasterAccess::publishArr(const vector<MessageUnit> &msgUnitArr)
628 {
629    // locking until finished 
630    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
631    if (!isConnected()) {
632       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publishArr", "you are not connected to the xmlBlaster");
633    }
634    if (log_.call()) log_.call(ME, "publishArr");
635    if (log_.dump()) {
636       for (vector<MessageUnit>::size_type i=0; i < msgUnitArr.size(); i++) {
637          log_.dump(ME, string("publishArr. The msgUnit[") + lexical_cast<std::string>(i) + "]:\n" + msgUnitArr[i].toXml());
638       }
639    }
640    return connection_->publishArr(msgUnitArr);
641 }
642 
643 vector<EraseReturnQos> XmlBlasterAccess::erase(const EraseKey& key, const EraseQos& qos)
644 {
645    // locking until finished 
646    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
647    if (!isConnected()) {
648       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::erase", "you are not connected to the xmlBlaster");
649    }
650    if (log_.call()) log_.call(ME, "erase");
651    if (log_.dump()) {
652       log_.dump(ME, string("erase. The key:\n") + key.toXml());
653       log_.dump(ME, string("erase. The Qos:\n") + qos.toXml());
654    }
655    return connection_->erase(key, qos);
656 }
657 
658 string
659 XmlBlasterAccess::update(const string &sessionId, UpdateKey &updateKey, const unsigned char *content, long contentSize, UpdateQos &updateQos)
660 {
661    if (log_.call()) log_.call(ME, "::update");
662    if (log_.trace()) log_.trace(ME, string("update. The sessionId is '") + sessionId + "'");
663    if (log_.dump()) {
664       log_.dump(ME, string("update. The key:\n") + updateKey.toXml());
665       log_.dump(ME, string("update. The Qos:\n") + updateQos.toXml());
666    }
667 
668    if (!subscriptionCallbackMap_.empty()) {
669       // This is synchronized but you must ensure the callback is still in scope when the update method is 
670       // invoked. This could be more robust with a reference counted I_Callback.
671       I_Callback* subscriptionCallback = 0;
672       {
673          org::xmlBlaster::util::thread::Lock lock(updateMutex_);
674          CallbackMapType::iterator iter = subscriptionCallbackMap_.end();
675          iter = subscriptionCallbackMap_.find(updateQos.getSubscriptionId());
676          if (iter != subscriptionCallbackMap_.end()) subscriptionCallback = (*iter).second;
677       }
678 
679       if (subscriptionCallback != 0) {
680          if (log_.trace()) log_.trace(ME, std::string("update: invoking specific subscription callback"));
681          return subscriptionCallback->update(sessionId, updateKey, content, contentSize, updateQos);
682       }
683    }
684 
685    if (updateClient_)
686       return updateClient_->update(sessionId, updateKey, content, contentSize, updateQos);
687    else {
688       // See similar behavior in XmlBlasterAccess.java
689       log_.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + updateKey.toXml() + "" + updateQos.toXml());
690    }
691 
692    return Constants::RET_OK; // "<qos><state id='OK'/></qos>";
693 }
694 
695 std::string XmlBlasterAccess::usage()
696 {
697    string text = string("\n");
698    text += string("Choose a connection protocol:\n");
699    text += string("   -protocol           Specify a protocol to talk with xmlBlaster, choose 'SOCKET' or 'IOR' depending on your compilation.\n");
700    text += string("                       Current setting is '") + Global::getInstance().getProperty().getStringProperty("protocol", Global::getDefaultProtocol());
701    text += string("\n\n");
702    text += string("Security features:\n");
703    text += string("   -Security.Client.DefaultPlugin \"gui,1.0\"\n");
704    text += string("                       Force the given authentication schema, here the GUI is enforced\n");
705    text += string("                       Clients can overwrite this with ConnectQos.java\n");
706 
707    return text; // std::cout << text << std::endl;
708 }
709 
710 void XmlBlasterAccess::initFailsafe(I_ConnectionProblems* connectionProblems)
711 {
712    if (connection_) connection_->initFailsafe(connectionProblems);
713    else connectionProblems_ = connectionProblems;   
714 }
715 
716 string XmlBlasterAccess::ping()
717 {
718    // locking until finished 
719    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
720    return connection_->ping("<qos/>");
721 }
722 
723 long XmlBlasterAccess::flushQueue()
724 {
725    if (!connection_) {
726       throw XmlBlasterException(INTERNAL_NULLPOINTER, ME + "::flushQueue", "no connection exists when trying to flush the queue: try to connect to xmlBlaster first");
727    }
728    return connection_->flushQueue();
729 }
730 
731 
732 bool XmlBlasterAccess::isConnected() const
733 {
734    if (!connection_) return false;
735    return connection_->isConnected();
736 }
737 
738 bool XmlBlasterAccess::isAlive() const
739 {
740    if (!connection_) return false;
741    return connection_->isAlive();
742 }
743 
744 bool XmlBlasterAccess::isPolling() const
745 {
746    if (!connection_) return false;
747    return connection_->isPolling();
748 }
749 
750 bool XmlBlasterAccess::isDead() const
751 {
752    if (!connection_) return false;
753    return connection_->isDead();
754 }
755  
756 
757 std::string XmlBlasterAccess::getStatusString() const
758 {
759    if (!connection_) return "DEAD";
760    return connection_->getStatusString();
761 }
762 
763 
764 }}} // namespaces
765 
766 
767 #ifdef _XMLBLASTER_CLASSTEST
768 
769 #include <util/Timestamp.h>
770 #include <util/thread/ThreadImpl.h>
771 
772 using namespace std;
773 using namespace org::xmlBlaster::client;
774 using namespace org::xmlBlaster::util::thread;
775 
776 int main(int args, char* argv[])
777 {
778     // Init the XML platform
779     try
780     {
781        Global& glob = Global::getInstance();
782        glob.initialize(args, argv);
783        Log& log = glob.getLog("org.xmlBlaster.client");
784 
785        XmlBlasterAccess xmlBlasterAccess(glob);
786        ConnectQos connectQos(glob);
787 
788        log.info("main", string("the connect qos is: ") + connectQos.toXml());
789 
790        ConnectReturnQosRef retQos = xmlBlasterAccess.connect(connectQos, NULL);
791        log.info("", "Successfully connect to xmlBlaster");
792 
793        if (log.trace()) log.trace("main", "Subscribing using XPath syntax ...");
794        SubscribeKey subKey(glob,"//test","XPATH");
795        log.info("main", string("subscribe key: ") + subKey.toXml());
796        SubscribeQos subQos(glob);
797        log.info("main", string("subscribe qos: ")  + subQos.toXml());
798        try {
799           SubscribeReturnQos subReturnQos = xmlBlasterAccess.subscribe(subKey, subQos);
800           log.info("main", string("Success: Subscribe return qos=") +
801                    subReturnQos.toXml() + " done");
802        }
803        catch (XmlBlasterException &ex) {
804           log.error("main", ex.toXml());
805        }
806 
807        PublishKey pubKey(glob);
808        pubKey.setOid("HelloWorld");
809        pubKey.setClientTags("<test></test>");
810        PublishQos pubQos(glob);
811        MessageUnit msgUnit(pubKey, string("Hi"), pubQos);
812 
813        PublishReturnQos pubRetQos = xmlBlasterAccess.publish(msgUnit);
814        log.info("main", string("successfully published, publish return qos: ") + pubRetQos.toXml());
815 
816        log.info("", "Successfully published a message to xmlBlaster");
817        log.info("", "Sleeping");
818        Timestamp delay = 10000000000ll; // 10 seconds
819        Thread::sleep(delay);
820    }
821    catch (XmlBlasterException &ex) {
822       std::cout << ex.toXml() << std::endl;
823    }
824    return 0;
825 }
826 
827 #endif


syntax highlighted by Code2HTML, v. 0.9.1