00001 /*------------------------------------------------------------------------------ 00002 Name: XmlBlasterAccess.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 ------------------------------------------------------------------------------*/ 00006 00007 #include <client/XmlBlasterAccess.h> 00008 #include <util/Global.h> 00009 #include <util/lexical_cast.h> 00010 #include <util/Timestamp.h> 00011 #include <util/dispatch/DispatchManager.h> 00012 #include <util/parser/ParserFactory.h> 00013 00014 namespace org { namespace xmlBlaster { namespace client { 00015 00016 using namespace std; 00017 using namespace org::xmlBlaster::util; 00018 using namespace org::xmlBlaster::util::qos; 00019 using namespace org::xmlBlaster::util::dispatch; 00020 using namespace org::xmlBlaster::util::dispatch; 00021 using namespace org::xmlBlaster::util::qos::storage; 00022 using namespace org::xmlBlaster::util::qos::address; 00023 using namespace org::xmlBlaster::authentication; 00024 using namespace org::xmlBlaster::client::protocol; 00025 using namespace org::xmlBlaster::client::key; 00026 using namespace org::xmlBlaster::client::qos; 00027 00028 XmlBlasterAccess::XmlBlasterAccess(Global& global) 00029 : ME(string("XmlBlasterAccess-UNCONNECTED")), 00030 global_(global), 00031 globalRef_(NULL), 00032 log_(global.getLog("org.xmlBlaster.client")), 00033 serverNodeId_("xmlBlaster"), 00034 connectQos_(new ConnectQos(global)), 00035 connectReturnQos_((ConnectReturnQos*)0), 00036 subscriptionCallbackMap_(), 00037 updateMutex_(), 00038 invocationMutex_(global.getProperty().get("xmlBlaster/invocationMutex/recursive", true)), 00039 postSendListener_(0) 00040 { 00041 log_.call(ME, "::constructor"); 00042 cbServer_ = NULL; 00043 updateClient_ = NULL; 00044 connection_ = NULL; 00045 dispatchManager_ = NULL; 00046 connectionProblems_ = NULL; 00047 instanceName_ = lexical_cast<std::string>(TimestampFactory::getInstance().getTimestamp()); 00048 00049 // Hack for Windows: Initialize it from main thread, using the callback thread fails undeterminable (with xerces) 00050 org::xmlBlaster::util::parser::ParserFactory::getFactory().initialize(global); 00051 } 00052 00053 XmlBlasterAccess::XmlBlasterAccess(GlobalRef globalRef) 00054 : ME(string("XmlBlasterAccess-UNCONNECTED")), 00055 global_(*globalRef), 00056 globalRef_(globalRef), 00057 log_(global_.getLog("org.xmlBlaster.client")), 00058 serverNodeId_("xmlBlaster"), 00059 connectQos_(new ConnectQos(global_)), 00060 connectReturnQos_((ConnectReturnQos*)0), 00061 subscriptionCallbackMap_(), 00062 updateMutex_(), 00063 invocationMutex_(globalRef->getProperty().get("xmlBlaster/invocationMutex/recursive", true)), 00064 postSendListener_(0) 00065 { 00066 log_.call(ME, "::constructor"); 00067 cbServer_ = NULL; 00068 updateClient_ = NULL; 00069 connection_ = NULL; 00070 dispatchManager_ = NULL; 00071 connectionProblems_ = NULL; 00072 instanceName_ = lexical_cast<std::string>(TimestampFactory::getInstance().getTimestamp()); 00073 00074 // Hack for Windows: Initialize it from main thread, using the callback thread fails undeterminable (with xerces) 00075 org::xmlBlaster::util::parser::ParserFactory::getFactory().initialize(global_); 00076 } 00077 00078 XmlBlasterAccess::~XmlBlasterAccess() 00079 { 00080 if (log_.call()) log_.call(ME, "destructor"); 00081 cleanup(true); 00082 dispatchManager_ = NULL; 00083 updateClient_ = NULL; 00084 connectionProblems_ = NULL; 00085 if (log_.trace()) log_.trace(ME, "destructor ended"); 00086 } 00087 00088 void XmlBlasterAccess::cleanup(bool doLock) 00089 { 00090 if (log_.call()) log_.call(ME, "cleanup"); 00091 if (doLock) { 00092 // synchronization 00093 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00094 org::xmlBlaster::util::thread::Lock lock1(updateMutex_); 00095 subscriptionCallbackMap_.clear(); 00096 } 00097 else { 00098 org::xmlBlaster::util::thread::Lock lock1(updateMutex_); 00099 subscriptionCallbackMap_.clear(); 00100 } 00101 00102 if (cbServer_) { 00103 CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available 00104 const AddressBaseRef& addr = prop.getCurrentCallbackAddress(); // c++ may not return null 00105 global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() ); 00106 cbServer_ = NULL; 00107 } 00108 if (connection_) { 00109 if (log_.trace()) log_.trace(ME, "destructor: going to delete the connection"); 00110 connection_->shutdown(); 00111 delete connection_; 00112 connection_ = NULL; 00113 } 00114 } 00115 00116 00117 ConnectReturnQos XmlBlasterAccess::connect(const ConnectQos& qos, I_Callback *clientCb) 00118 { 00119 ME = string("XmlBlasterAccess-") + qos.getSessionQos().getAbsoluteName(); 00120 if (log_.call()) log_.call(ME, "::connect"); 00121 if (log_.dump()) log_.dump(ME, string("::connect: qos: ") + qos.toXml()); 00122 00123 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00124 00125 cleanup(false); 00126 00127 global_.setId(qos.getSessionQos().getAbsoluteName()); // global_.setId(loginName + currentTimeMillis()); 00128 connectQos_ = new ConnectQos(qos); 00129 connectQos_->setInstanceId(global_.getInstanceId()); 00130 00131 SecurityQos securityQos = connectQos_->getSecurityQos(); 00132 00133 ME = string("XmlBlasterAccess-") + getId(); 00134 string typeVersion = global_.getProperty().getStringProperty("queue/defaultPlugin", "CACHE,1.0"); 00135 typeVersion = global_.getProperty().getStringProperty("queue/connection/defaultPlugin", "typeVersion"); 00136 updateClient_ = clientCb; 00137 00138 if (updateClient_) createDefaultCbServer(); 00139 00140 if (log_.trace()) log_.trace(ME, string("::connect. CbServer done")); 00141 // currently the simple version will do it ... 00142 if (!dispatchManager_) dispatchManager_ = &(global_.getDispatchManager()); 00143 00144 if (!connection_) { 00145 connection_ = dispatchManager_->getConnectionsHandler(instanceName_); 00146 connection_->registerPostSendListener(this); 00147 } 00148 00149 if (connectionProblems_) { 00150 if (log_.trace()) log_.trace(ME, "::connect. Registering initFailsafe"); 00151 connection_->initFailsafe(connectionProblems_); 00152 connectionProblems_ = NULL; 00153 } 00154 if (log_.trace()) log_.trace(ME, string("::connect. connectQos: ") + connectQos_->toXml()); 00155 // do connect() now: 00156 connectReturnQos_ = connection_->connect(connectQos_); 00157 00158 ME = string("XmlBlasterAccess-") + connectReturnQos_->getSessionQos().getAbsoluteName(); 00159 00160 setServerNodeId(connectReturnQos_->getSessionQos().getClusterNodeId()); 00161 00162 // Is done in ConnectionsHandler.cpp 00163 //global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName()); 00164 00165 return *connectReturnQos_; 00166 } 00167 00168 org::xmlBlaster::util::Global& XmlBlasterAccess::getGlobal() 00169 { 00170 return this->global_; 00171 } 00172 00173 org::xmlBlaster::util::queue::I_Queue* XmlBlasterAccess::getQueue() 00174 { 00175 if (connection_) { 00176 return connection_->getQueue(); 00177 } 00178 return 0; 00179 } 00180 00181 00182 org::xmlBlaster::client::I_Callback* XmlBlasterAccess::getCallback() 00183 { 00184 return this->updateClient_; 00185 } 00186 00187 void XmlBlasterAccess::setCallbackDispatcherActive(bool isActive) 00188 { 00189 string command = getSessionName() + "/?dispatcherActive=" + lexical_cast<string>(isActive); 00190 sendAdministrativeCommand(command); 00191 connectQos_->getCbAddress()->setDispatcherActive(isActive); 00192 } 00193 00194 string XmlBlasterAccess::sendAdministrativeCommand(const string &command) 00195 { 00196 bool isGet = command.find("get ") == 0 || command.find("GET ") == 0; 00197 bool isSet = command.find("set ") == 0 || command.find("SET ") == 0; 00198 const string cmd = ((isGet || isSet)) ? command.substr(4) : command; 00199 00200 if (isSet || (!isGet && cmd.find("=") != string::npos)) { 00201 string oid = string("__cmd:") + cmd; 00202 PublishKey key(global_, oid); // oid="__cmd:/client/joe/1/?dispatcherActive=false" 00203 PublishQos qos(global_); 00204 MessageUnit msgUnit(key, "", qos); 00205 try { 00206 PublishReturnQos ret = publish(msgUnit); 00207 if (log_.trace()) log_.trace(ME, "Send '" + cmd + " '"); 00208 return ret.getState(); 00209 } 00210 catch (XmlBlasterException &e) { 00211 if (log_.trace()) log_.trace(ME, "Sending of '" + cmd + " ' failed: " + e.getMessage()); 00212 throw e; 00213 } 00214 } 00215 else { 00216 string oid = string("__cmd:") + cmd; 00217 GetKey getKey(global_); 00218 getKey.setOid(oid); 00219 GetQos getQos(global_); 00220 try { 00221 vector<MessageUnit> msgVec = get(getKey, getQos); 00222 if (log_.trace()) log_.trace(ME, "Send '" + cmd + " ', got array of size " + lexical_cast<string>(msgVec.size())); 00223 if (msgVec.size() == 0) 00224 return ""; 00225 return msgVec[0].getContentStr(); 00226 } 00227 catch (XmlBlasterException &e) { 00228 if (log_.trace()) log_.trace(ME, "Sending of '" + cmd + " ' failed: " + e.getMessage()); 00229 throw e; 00230 } 00231 } 00232 } 00233 00234 00235 void XmlBlasterAccess::createDefaultCbServer() 00236 { 00237 log_.call(ME, "::createDefaultCbServer"); 00238 00239 CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available 00240 const AddressBaseRef &addr = prop.getCurrentCallbackAddress(); 00241 00242 if(!cbServer_) 00243 cbServer_ = initCbServer(getLoginName(), addr->getType(), addr->getVersion()); 00244 00245 addr->setAddress(cbServer_->getCbAddress()); 00246 addr->setType(cbServer_->getCbProtocol()); 00247 // !!!!! prop.setCallbackAddress(addr); 00248 connectQos_->setSessionCbQueueProperty(prop); 00249 if (log_.trace()) log_.trace(ME, string("::createDefaultCbServer: connectQos: ") + connectQos_->toXml()); 00250 log_.info(ME, "Callback settings: " + prop.getSettings()); 00251 } 00252 00253 I_CallbackServer* 00254 XmlBlasterAccess::initCbServer(const string& loginName, const string& type, const string& version) 00255 { 00256 if (log_.call()) log_.call(ME, string("::initCbServer: loginName='") + loginName + "' type='" + type + "' version='" + version +"'"); 00257 if (log_.trace()) log_.trace(ME, string("Using 'client.cbProtocol=") + type + string("' to be used by ") + getServerNodeId() + string(", trying to create the callback server ...")); 00258 I_CallbackServer* server = &(global_.getCbServerPluginManager().getPlugin(instanceName_, type, version)); 00259 if (log_.trace()) log_.trace(ME, "After callback plugin creation"); 00260 server->initialize(loginName, *this); 00261 if (log_.trace()) log_.trace(ME, "After callback plugin initialize"); 00262 return server; 00263 } 00264 00265 org::xmlBlaster::util::dispatch::I_PostSendListener* XmlBlasterAccess::registerPostSendListener(org::xmlBlaster::util::dispatch::I_PostSendListener *listener) { 00266 I_PostSendListener* old = this->postSendListener_; 00267 this->postSendListener_ = listener; 00268 //if (connection_) 00269 // return connection_->registerPostSendListener(this); 00270 return old; 00271 } 00272 00273 // I_PostSendListener 00274 void XmlBlasterAccess::postSend(const org::xmlBlaster::util::queue::MsgQueueEntry &msgQueueEntry) 00275 { 00276 I_PostSendListener* l = this->postSendListener_; 00277 if (l) 00278 l->postSend(msgQueueEntry); 00279 } 00280 00281 org::xmlBlaster::client::protocol::I_ProgressListener* XmlBlasterAccess::registerProgressListener(org::xmlBlaster::client::protocol::I_ProgressListener *listener) 00282 { 00283 return (this->cbServer_) ? this->cbServer_->registerProgressListener(listener) : 0; 00284 } 00285 00286 org::xmlBlaster::util::qos::ConnectQosRef XmlBlasterAccess::getConnectQos() { 00287 return connectQos_; 00288 } 00289 00290 //org::xmlBlaster::util::qos::ConnectReturnQosRef XmlBlasterAccess::getConnectReturnQos() { 00291 //} 00292 00293 void 00294 XmlBlasterAccess::initSecuritySettings(const string& /*secMechanism*/, const string& /*secVersion*/) 00295 { 00296 log_.error(ME, "initSecuritySettings not implemented yet"); 00297 } 00298 00299 void XmlBlasterAccess::leaveServer(const StringMap &/*map*/) 00300 { 00301 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00302 if (!isConnected()) { 00303 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::leaveServer", "You are not connected to the xmlBlaster"); 00304 } 00305 00306 if (cbServer_) { 00307 if (log_.trace()) log_.trace(ME, "destructor: going to delete the callback connection"); 00308 cbServer_->shutdownCb(); 00309 } 00310 00311 if (connection_) { 00312 if (log_.trace()) log_.trace(ME, "destructor: going to delete the connection"); 00313 connection_->shutdown(); 00314 } 00315 00316 if (cbServer_) { 00317 CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available 00318 AddressBaseRef addr = prop.getCurrentCallbackAddress(); // c++ may not return null 00319 global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() ); 00320 cbServer_ = NULL; 00321 } 00322 00323 if (connection_) { 00324 delete connection_; 00325 connection_ = NULL; 00326 } 00327 log_.info(ME, "leaveServer() done"); 00328 } 00329 00330 00331 bool 00332 XmlBlasterAccess::disconnect(const DisconnectQos& qos, bool flush, bool shutdown, bool shutdownCb) 00333 { 00334 // locking until finished 00335 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00336 bool ret1 = true; 00337 bool ret3 = true; 00338 if (log_.call()) { 00339 log_.call(ME, string("disconnect called with flush='") + Global::getBoolAsString(flush) + 00340 "' shutdown='" + Global::getBoolAsString(shutdown) + 00341 "' shutdownCb='" + Global::getBoolAsString(shutdownCb) + "'"); 00342 } 00343 00344 if (log_.trace()) log_.trace(ME, "disconnecting the client connection"); 00345 if (log_.dump()) log_.dump(ME, string("disconnect: the qos is:\n") + qos.toXml()); 00346 if (connection_ != NULL) { 00347 ret1 = connection_->disconnect(qos); 00348 if (shutdown) connection_->shutdown(); 00349 } 00350 else { 00351 ret1 = false; 00352 } 00353 if (shutdownCb) { 00354 if (cbServer_) { 00355 ret3 = cbServer_->shutdownCb(); 00356 00357 CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available 00358 const AddressBaseRef &addr = prop.getCurrentCallbackAddress(); 00359 global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() ); 00360 cbServer_ = NULL; 00361 } 00362 else ret3 = false; 00363 } 00364 return ret1 && ret3; 00365 } 00366 00367 string XmlBlasterAccess::getId() 00368 { 00369 return getSessionName(); 00370 } 00371 00372 SessionNameRef XmlBlasterAccess::getSessionNameRef() 00373 { 00374 if (!connectReturnQos_.isNull()) return connectReturnQos_->getSessionQos().getSessionName(); 00375 return connectQos_->getSessionQos().getSessionName(); 00376 } 00377 00378 string XmlBlasterAccess::getSessionName() 00379 { 00380 string ret; 00381 if (!connectReturnQos_.isNull()) ret = connectReturnQos_->getSessionQos().getAbsoluteName(); 00382 if (ret == "") ret = connectQos_->getSessionQos().getAbsoluteName(); 00383 return ret; 00384 } 00385 00386 string XmlBlasterAccess::getLoginName() 00387 { 00388 try { 00389 string nm = connectQos_->getSecurityQos().getUserId(); 00390 if (nm != "") return nm; 00391 } 00392 catch (XmlBlasterException e) { 00393 log_.warn(ME, e.toString()); 00394 } 00395 return string("client?"); 00396 } 00397 00398 void XmlBlasterAccess::setServerNodeId(const string& nodeId) 00399 { 00400 serverNodeId_ = nodeId; 00401 } 00402 00403 string XmlBlasterAccess::getServerNodeId() const 00404 { 00405 return serverNodeId_; 00406 } 00407 00408 /* 00409 MsgQueueEntry 00410 XmlBlasterAccess::queueMessage(const MsgQueueEntry& entry) 00411 { 00412 return entry; 00413 } 00414 00415 vector<MsgQueueEntry*> 00416 XmlBlasterAccess::queueMessage(const vector<MsgQueueEntry*>& entries) 00417 { 00418 return entries; 00419 } 00420 */ 00421 00422 SubscribeReturnQos XmlBlasterAccess::subscribe(const SubscribeKey& key, const SubscribeQos& qos, I_Callback *callback) 00423 { 00424 // locking until finished 00425 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00426 00427 if (!isConnected()) { 00428 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::subscribe", "you are not connected to the xmlBlaster"); 00429 } 00430 if (log_.call()) log_.call(ME, "subscribe"); 00431 if (log_.dump()) { 00432 log_.dump(ME, string("subscribe. The key:\n") + key.toXml()); 00433 log_.dump(ME, string("subscribe. The Qos:\n") + qos.toXml()); 00434 } 00435 00436 SessionNameRef sessionName = getSessionNameRef(); 00437 if (sessionName->getPubSessionId() > 0 && 00438 qos.getMultiSubscribe()==false && 00439 !qos.hasSubscriptionId()) { 00440 // For failsave clients we generate on client side the subscriptionId 00441 // In case of offline/clientSideQueued operation we guarantee like this a not changing 00442 // subscriptionId and the client code can reliably use the subscriptionId for further dispatching 00443 // of update() messages. 00444 SubscribeQos& q = const_cast<SubscribeQos&>(qos); 00445 q.generateSubscriptionId(sessionName, key); 00446 if (log_.trace()) log_.trace(ME, "subscribe: generated client side subscriptionId=" + q.getData().getSubscriptionId()); 00447 } 00448 00449 if (callback != 0) { // using a subscribe specific callback? 00450 if (log_.trace()) log_.trace(ME, "subscribe: inserting individual callback in callback map"); 00451 org::xmlBlaster::util::thread::Lock lockUpdate(updateMutex_); 00452 SubscribeReturnQos retQos = connection_->subscribe(key, qos); 00453 std::string subId = retQos.getSubscriptionId(); 00454 subscriptionCallbackMap_.insert(std::map<std::string, I_Callback*>::value_type(subId, callback)); 00455 return retQos; 00456 } 00457 else { 00458 if (log_.trace()) log_.trace(ME, "subscribe: no specific callback"); 00459 return connection_->subscribe(key, qos); 00460 } 00461 } 00462 00463 vector<MessageUnit> XmlBlasterAccess::get(const GetKey& key, const GetQos& qos) 00464 { 00465 // locking until finished 00466 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00467 if (!isConnected()) { 00468 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::get", "you are not connected to the xmlBlaster"); 00469 } 00470 if (log_.call()) log_.call(ME, "get"); 00471 if (log_.dump()) { 00472 log_.dump(ME, string("get. The key:\n") + key.toXml()); 00473 log_.dump(ME, string("get. The Qos:\n") + qos.toXml()); 00474 } 00475 return connection_->get(key, qos); 00476 } 00477 00478 vector<MessageUnit> XmlBlasterAccess::receive(string oid, int maxEntries, long timeout, bool consumable) { 00479 if (!isConnected()) { 00480 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::receive", "you are not connected to the xmlBlaster"); 00481 } 00482 if (log_.call()) log_.call(ME, "receive"); 00483 00484 //topic/hello to access a history queue, 00485 //client/joe to access a subject queue or 00486 //client/joe/session/1 00487 if (oid.find("topic") != string::npos) 00488 oid = "__cmd:"+oid+"/?historyQueueEntries"; // "__cmd:topic/hello/?historyQueueEntries" 00489 else if (oid.find("session") != string::npos) 00490 oid = "__cmd:"+oid+"/?callbackQueueEntries"; // "__cmd:client/joe/session/1/?callbackQueueEntries"; 00491 else if (oid.find("subject") != string::npos) 00492 oid = "__cmd:"+oid+"/?subjectQueueEntries"; // "__cmd:client/joe/?subjectQueueEntries" 00493 else 00494 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::receive", "Can't parse '" + oid + "'"); 00495 00496 GetKey getKey(global_, oid); 00497 QueryQosData data(global_); 00498 data.setQueryQos(maxEntries, timeout, consumable); 00499 GetQos getQos(global_, data); 00500 vector<MessageUnit> msgs = get(getKey, getQos); 00501 if (log_.trace()) log_.trace(ME, string("receive - got '") + lexical_cast<std::string>(msgs.size()) + "'"); 00502 return msgs; 00503 } 00504 00505 00506 vector<UnSubscribeReturnQos> 00507 XmlBlasterAccess::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos) 00508 { 00509 // locking until finished 00510 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00511 if (!isConnected()) { 00512 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::unsSubscribe", "you are not connected to the xmlBlaster"); 00513 } 00514 if (log_.call()) log_.call(ME, "unSubscribe"); 00515 if (log_.dump()) { 00516 log_.dump(ME, string("unSubscribe. The key:\n") + key.toXml()); 00517 log_.dump(ME, string("unSubscribe. The Qos:\n") + qos.toXml()); 00518 } 00519 // synchronization 00520 org::xmlBlaster::util::thread::Lock lock1(updateMutex_); 00521 vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos); 00522 vector<UnSubscribeReturnQos>::iterator iter = ret.begin(); 00523 while (iter != ret.end()) { 00524 if (log_.trace()) log_.trace(ME, std::string("unSubscribe: removing callback for '") + (*iter).getSubscriptionId() + "'"); 00525 subscriptionCallbackMap_.erase((*iter).getSubscriptionId()); 00526 iter++; 00527 } 00528 return ret; 00529 } 00530 00531 PublishReturnQos XmlBlasterAccess::publish(const MessageUnit& msgUnit) 00532 { 00533 // locking until finished 00534 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00535 if (!isConnected()) { 00536 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publish", "you are not connected to the xmlBlaster"); 00537 } 00538 if (log_.call()) log_.call(ME, "publish"); 00539 if (log_.dump()) { 00540 log_.dump(ME, string("publish. The msgUnit:\n") + msgUnit.toXml()); 00541 } 00542 return connection_->publish(msgUnit); 00543 } 00544 00545 void XmlBlasterAccess::publishOneway(const vector<MessageUnit>& msgUnitArr) 00546 { 00547 // locking until finished 00548 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00549 if (!isConnected()) { 00550 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publishOneway", "you are not connected to the xmlBlaster"); 00551 } 00552 if (log_.call()) log_.call(ME, "publishOneway"); 00553 if (log_.dump()) { 00554 for (vector<MessageUnit>::size_type i=0; i < msgUnitArr.size(); i++) { 00555 log_.dump(ME, string("publishOneway. The msgUnit[") + lexical_cast<std::string>(i) + "]:\n" + msgUnitArr[i].toXml()); 00556 } 00557 } 00558 connection_->publishOneway(msgUnitArr); 00559 } 00560 00561 vector<PublishReturnQos> XmlBlasterAccess::publishArr(const vector<MessageUnit> &msgUnitArr) 00562 { 00563 // locking until finished 00564 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00565 if (!isConnected()) { 00566 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publishArr", "you are not connected to the xmlBlaster"); 00567 } 00568 if (log_.call()) log_.call(ME, "publishArr"); 00569 if (log_.dump()) { 00570 for (vector<MessageUnit>::size_type i=0; i < msgUnitArr.size(); i++) { 00571 log_.dump(ME, string("publishArr. The msgUnit[") + lexical_cast<std::string>(i) + "]:\n" + msgUnitArr[i].toXml()); 00572 } 00573 } 00574 return connection_->publishArr(msgUnitArr); 00575 } 00576 00577 vector<EraseReturnQos> XmlBlasterAccess::erase(const EraseKey& key, const EraseQos& qos) 00578 { 00579 // locking until finished 00580 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00581 if (!isConnected()) { 00582 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::erase", "you are not connected to the xmlBlaster"); 00583 } 00584 if (log_.call()) log_.call(ME, "erase"); 00585 if (log_.dump()) { 00586 log_.dump(ME, string("erase. The key:\n") + key.toXml()); 00587 log_.dump(ME, string("erase. The Qos:\n") + qos.toXml()); 00588 } 00589 return connection_->erase(key, qos); 00590 } 00591 00592 string 00593 XmlBlasterAccess::update(const string &sessionId, UpdateKey &updateKey, const unsigned char *content, long contentSize, UpdateQos &updateQos) 00594 { 00595 if (log_.call()) log_.call(ME, "::update"); 00596 if (log_.trace()) log_.trace(ME, string("update. The sessionId is '") + sessionId + "'"); 00597 if (log_.dump()) { 00598 log_.dump(ME, string("update. The key:\n") + updateKey.toXml()); 00599 log_.dump(ME, string("update. The Qos:\n") + updateQos.toXml()); 00600 } 00601 00602 if (!subscriptionCallbackMap_.empty()) { 00603 // This is synchronized but you must ensure the callback is still in scope when the update method is 00604 // invoked. This could be more robust with a reference counted I_Callback. 00605 I_Callback* subscriptionCallback = 0; 00606 { 00607 org::xmlBlaster::util::thread::Lock lock(updateMutex_); 00608 CallbackMapType::iterator iter = subscriptionCallbackMap_.end(); 00609 iter = subscriptionCallbackMap_.find(updateQos.getSubscriptionId()); 00610 if (iter != subscriptionCallbackMap_.end()) subscriptionCallback = (*iter).second; 00611 } 00612 00613 if (subscriptionCallback != 0) { 00614 if (log_.trace()) log_.trace(ME, std::string("update: invoking specific subscription callback")); 00615 return subscriptionCallback->update(sessionId, updateKey, content, contentSize, updateQos); 00616 } 00617 } 00618 00619 if (updateClient_) 00620 return updateClient_->update(sessionId, updateKey, content, contentSize, updateQos); 00621 else { 00622 // See similar behavior in XmlBlasterAccess.java 00623 log_.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + updateKey.toXml() + "" + updateQos.toXml()); 00624 } 00625 00626 return Constants::RET_OK; // "<qos><state id='OK'/></qos>"; 00627 } 00628 00629 std::string XmlBlasterAccess::usage() 00630 { 00631 string text = string("\n"); 00632 text += string("Choose a connection protocol:\n"); 00633 text += string(" -protocol Specify a protocol to talk with xmlBlaster, choose 'SOCKET' or 'IOR' depending on your compilation.\n"); 00634 text += string(" Current setting is '") + Global::getInstance().getProperty().getStringProperty("protocol", Global::getDefaultProtocol()); 00635 text += string("\n\n"); 00636 text += string("Security features:\n"); 00637 text += string(" -Security.Client.DefaultPlugin \"gui,1.0\"\n"); 00638 text += string(" Force the given authentication schema, here the GUI is enforced\n"); 00639 text += string(" Clients can overwrite this with ConnectQos.java\n"); 00640 00641 return text; // std::cout << text << std::endl; 00642 } 00643 00644 void XmlBlasterAccess::initFailsafe(I_ConnectionProblems* connectionProblems) 00645 { 00646 if (connection_) connection_->initFailsafe(connectionProblems); 00647 else connectionProblems_ = connectionProblems; 00648 } 00649 00650 string XmlBlasterAccess::ping() 00651 { 00652 // locking until finished 00653 org::xmlBlaster::util::thread::Lock lock(invocationMutex_); 00654 return connection_->ping("<qos/>"); 00655 } 00656 00657 long XmlBlasterAccess::flushQueue() 00658 { 00659 if (!connection_) { 00660 throw XmlBlasterException(INTERNAL_NULLPOINTER, ME + "::flushQueue", "no connection exists when trying to flush the queue: try to connect to xmlBlaster first"); 00661 } 00662 return connection_->flushQueue(); 00663 } 00664 00665 00666 bool XmlBlasterAccess::isConnected() const 00667 { 00668 if (!connection_) return false; 00669 return connection_->isConnected(); 00670 } 00671 00672 bool XmlBlasterAccess::isAlive() const 00673 { 00674 if (!connection_) return false; 00675 return connection_->isAlive(); 00676 } 00677 00678 bool XmlBlasterAccess::isPolling() const 00679 { 00680 if (!connection_) return false; 00681 return connection_->isPolling(); 00682 } 00683 00684 bool XmlBlasterAccess::isDead() const 00685 { 00686 if (!connection_) return false; 00687 return connection_->isDead(); 00688 } 00689 00690 00691 std::string XmlBlasterAccess::getStatusString() const 00692 { 00693 if (!connection_) return "DEAD"; 00694 return connection_->getStatusString(); 00695 } 00696 00697 00698 }}} // namespaces 00699 00700 00701 #ifdef _XMLBLASTER_CLASSTEST 00702 00703 #include <util/Timestamp.h> 00704 #include <util/thread/ThreadImpl.h> 00705 00706 using namespace std; 00707 using namespace org::xmlBlaster::client; 00708 using namespace org::xmlBlaster::util::thread; 00709 00710 int main(int args, char* argv[]) 00711 { 00712 // Init the XML platform 00713 try 00714 { 00715 Global& glob = Global::getInstance(); 00716 glob.initialize(args, argv); 00717 Log& log = glob.getLog("org.xmlBlaster.client"); 00718 00719 XmlBlasterAccess xmlBlasterAccess(glob); 00720 ConnectQos connectQos(glob); 00721 00722 log.info("main", string("the connect qos is: ") + connectQos.toXml()); 00723 00724 ConnectReturnQosRef retQos = xmlBlasterAccess.connect(connectQos, NULL); 00725 log.info("", "Successfully connect to xmlBlaster"); 00726 00727 if (log.trace()) log.trace("main", "Subscribing using XPath syntax ..."); 00728 SubscribeKey subKey(glob,"//test","XPATH"); 00729 log.info("main", string("subscribe key: ") + subKey.toXml()); 00730 SubscribeQos subQos(glob); 00731 log.info("main", string("subscribe qos: ") + subQos.toXml()); 00732 try { 00733 SubscribeReturnQos subReturnQos = xmlBlasterAccess.subscribe(subKey, subQos); 00734 log.info("main", string("Success: Subscribe return qos=") + 00735 subReturnQos.toXml() + " done"); 00736 } 00737 catch (XmlBlasterException &ex) { 00738 log.error("main", ex.toXml()); 00739 } 00740 00741 PublishKey pubKey(glob); 00742 pubKey.setOid("HelloWorld"); 00743 pubKey.setClientTags("<test></test>"); 00744 PublishQos pubQos(glob); 00745 MessageUnit msgUnit(pubKey, string("Hi"), pubQos); 00746 00747 PublishReturnQos pubRetQos = xmlBlasterAccess.publish(msgUnit); 00748 log.info("main", string("successfully published, publish return qos: ") + pubRetQos.toXml()); 00749 00750 log.info("", "Successfully published a message to xmlBlaster"); 00751 log.info("", "Sleeping"); 00752 Timestamp delay = 10000000000ll; // 10 seconds 00753 Thread::sleep(delay); 00754 } 00755 catch (XmlBlasterException &ex) { 00756 std::cout << ex.toXml() << std::endl; 00757 } 00758 return 0; 00759 } 00760 00761 #endif