00001 /*------------------------------------------------------------------------------ 00002 Name: ConnectionsHandler.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Handles the I_XmlBlasterConnections 00006 ------------------------------------------------------------------------------*/ 00007 00008 #include <util/dispatch/ConnectionsHandler.h> 00009 #include <util/Global.h> 00010 #include <util/Timeout.h> 00011 #include <util/Timestamp.h> 00012 #include <util/Constants.h> 00013 #include <util/lexical_cast.h> 00014 #include <util/queue/QueueFactory.h> 00015 #include <util/queue/PublishQueueEntry.h> 00016 #include <util/queue/ConnectQueueEntry.h> 00017 #include <util/queue/SubscribeQueueEntry.h> 00018 00019 namespace org { namespace xmlBlaster { namespace util { namespace dispatch { 00020 00021 using namespace std; 00022 using namespace org::xmlBlaster::client::protocol; 00023 using namespace org::xmlBlaster::client; 00024 using namespace org::xmlBlaster::util; 00025 using namespace org::xmlBlaster::util::qos; 00026 using namespace org::xmlBlaster::util::thread; 00027 using namespace org::xmlBlaster::util::qos::storage; 00028 using namespace org::xmlBlaster::util::queue; 00029 using namespace org::xmlBlaster::client::qos; 00030 using namespace org::xmlBlaster::client::key; 00031 00032 ConnectionsHandler::ConnectionsHandler(org::xmlBlaster::util::Global& global, 00033 const string& instanceName) 00034 : ME(string("ConnectionsHandler-") + instanceName), 00035 connectQos_((ConnectQos*)0), 00036 connectReturnQos_((ConnectReturnQos*)0), 00037 status_(START), 00038 global_(global), 00039 log_(global.getLog("org.xmlBlaster.util.dispatch")), 00040 connectMutex_(), 00041 publishMutex_(), 00042 postSendListener_(0), 00043 instanceName_(instanceName) 00044 { 00045 ClientQueueProperty prop(global_, ""); 00046 connectionProblemsListener_ = NULL; 00047 connection_ = NULL; 00048 queue_ = NULL; 00049 retries_ = -1; 00050 currentRetry_ = 0; 00051 pingPollTimerKey_ = 0; 00052 doStopPing_ = false; 00053 if (log_.call()) log_.call(ME, "constructor"); 00054 } 00055 00056 ConnectionsHandler::~ConnectionsHandler() 00057 { 00058 if (log_.call()) log_.call(ME, "destructor"); 00059 if (pingPollTimerKey_ != 0) { 00060 global_.getPingTimer().removeTimeoutListener(pingPollTimerKey_); 00061 pingPollTimerKey_ = 0; 00062 } 00063 doStopPing_ = true; 00064 /* 00065 while (pingIsStarted_) { 00066 Thread::sleep(200); 00067 } 00068 */ 00069 Lock lock(connectMutex_); 00070 string type = (connectQos_.isNull()) ? org::xmlBlaster::util::Global::getDefaultProtocol() : connectQos_->getAddress()->getType(); // "SOCKET" 00071 string version = "1.0"; // currently hardcoded 00072 if (connection_) { 00073 global_.getDispatchManager().releasePlugin(instanceName_, type, version); 00074 connection_ = NULL; 00075 } 00076 if ( queue_ ) { 00077 delete queue_; 00078 queue_ = NULL; 00079 } 00080 if (log_.trace()) log_.trace(ME, "destructor: going to delete the connectQos"); 00081 status_ = END; 00082 if (log_.trace()) log_.trace(ME, "destructor ended"); 00083 } 00084 00085 00086 ConnectReturnQosRef ConnectionsHandler::connect(const ConnectQosRef& qos) 00087 { 00088 if (log_.call()) log_.call(ME, string("::connect status is '") + lexical_cast<std::string>(status_) + "'"); 00089 if (qos.isNull()) { 00090 throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::connect", "your connectQos is null"); 00091 } 00092 if (log_.dump()) log_.dump(ME, string("::connect, the qos is: ") + qos->toXml()); 00093 Lock lock(connectMutex_); 00094 if (isConnected()) { 00095 log_.warn(ME, "connect: you are already connected"); 00096 return connectReturnQos_; 00097 } 00098 00099 connectQos_ = qos; 00100 00101 global_.setSessionName(connectQos_->getSessionQos().getSessionName()); 00102 global_.setImmutableId(connectQos_->getSessionQos().getRelativeName()); 00103 global_.setId(connectQos_->getSessionQos().getAbsoluteName()); // temporary 00104 //log_.info(ME, "BEFORE id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName()); 00105 00106 retries_ = connectQos_->getAddress()->getRetries(); 00107 long pingInterval = connectQos_->getAddress()->getPingInterval(); 00108 if (log_.trace()) { 00109 log_.trace(ME, string("connect: number of retries during communication failure: ") + lexical_cast<std::string>(retries_)); 00110 log_.trace(ME, string("connect: Ping Interval: ") + lexical_cast<std::string>(pingInterval)); 00111 } 00112 00113 string type = connectQos_->getAddress()->getType(); 00114 string version = "1.0"; // currently hardcoded 00115 if (!connection_) { 00116 connection_ = &(global_.getDispatchManager().getPlugin(instanceName_, type, version)); 00117 } 00118 00119 try { 00120 connectReturnQos_ = connection_->connect(*connectQos_); 00121 global_.setSessionName(connectReturnQos_->getSessionQos().getSessionName()); 00122 // For "joe/1" it remains immutable; For "joe" there is added the server side generated sessionId "joe/-33": 00123 global_.setImmutableId(connectReturnQos_->getSessionQos().getRelativeName()); 00124 global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName()); 00125 //log_.info(ME, "AFTER id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName()); 00126 } 00127 catch (XmlBlasterException &ex) { 00128 if ((ex.isCommunication() || ex.getErrorCodeStr().find("user.configuration") == 0)) { 00129 log_.warn(ME, "Got exception when connecting, polling now: " + ex.toString()); 00130 if (pingPollTimerKey_ == 0) 00131 startPinger(false); 00132 return queueConnect(); 00133 } 00134 else { 00135 if (log_.trace()) log_.trace(ME, string("the exception in connect is ") + ex.toXml()); 00136 throw ex; 00137 } 00138 } 00139 00140 log_.info(ME, string("successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'"); 00141 connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId()); 00142 00143 enum States oldState = status_; 00144 status_ = ALIVE; 00145 if (connectionProblemsListener_) connectionProblemsListener_->reachedAlive(oldState, this); 00146 // start the ping if in failsafe, i.e. if delay > 0 00147 startPinger(false); 00148 if (log_.dump()) log_.dump(ME, string("::connect, the return qos is: ") + connectReturnQos_->toXml()); 00149 00150 flushQueue(); 00151 00152 return connectReturnQos_; 00153 } 00154 00155 bool ConnectionsHandler::disconnect(const DisconnectQos& qos) 00156 { 00157 Lock lock(connectMutex_); 00158 if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::DISCONNECT); 00159 if (log_.dump()) log_.dump(ME, string("::disconnect, the qos is: ") + qos.toXml()); 00160 00161 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::DISCONNECT); 00162 if (status_ == DEAD) { 00163 log_.warn(ME, "already disconnected"); 00164 return false; 00165 } 00166 if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::DISCONNECT); 00167 00168 if (qos.getClearClientQueue() && queue_ != 0) queue_->clear(); 00169 00170 bool ret = connection_->disconnect(qos); 00171 enum States oldState = status_; 00172 status_ = DEAD; 00173 if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this); 00174 return ret; 00175 } 00176 00177 string ConnectionsHandler::getProtocol() 00178 { 00179 return connection_->getProtocol(); 00180 } 00181 00182 /* 00183 string ConnectionsHandler::loginRaw() 00184 { 00185 return connection_->loginRaw(); 00186 } 00187 */ 00188 00189 bool ConnectionsHandler::shutdown() 00190 { 00191 if (connection_) { 00192 return connection_->shutdown(); 00193 } 00194 return false; 00195 } 00196 00197 string ConnectionsHandler::getLoginName() 00198 { 00199 return connection_->getLoginName(); 00200 } 00201 00202 bool ConnectionsHandler::isLoggedIn() 00203 { 00204 return connection_->isLoggedIn(); 00205 } 00206 00207 string ConnectionsHandler::ping(const string& qos) 00208 { 00209 // Lock lock(connectionMutex_); 00210 return connection_->ping(qos); 00211 } 00212 00213 SubscribeReturnQos ConnectionsHandler::subscribe(const SubscribeKey& key, const SubscribeQos& qos) 00214 { 00215 if (log_.call()) log_.call(ME, MethodName::SUBSCRIBE); 00216 if (log_.dump()) log_.dump(ME, string("::subscribe, the key is: ") + key.toXml()); 00217 if (log_.dump()) log_.dump(ME, string("::subscribe, the qos is: ") + qos.toXml()); 00218 00219 // Lock lock(connectionMutex_); 00220 00221 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, MethodName::SUBSCRIBE); 00222 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, MethodName::SUBSCRIBE); 00223 if (putToQueue()) return queueSubscribe(key, qos); 00224 try { 00225 SubscribeReturnQos ret = connection_->subscribe(key, qos); 00226 return ret; 00227 } 00228 catch (XmlBlasterException& ex) { 00229 toPollingOrDead(&ex); 00230 if (putToQueue() && isRecoverable(&ex)) { 00231 log_.info(ME, string("::subscribe ") + key.getOid() + " is queued, exception=" + ex.getMessage()); 00232 return queueSubscribe(key, qos); 00233 } 00234 else { 00235 log_.warn(ME, string("::subscribe failed throwing now exception: ") + key.toXml() + qos.toXml() + " exception=" + ex.getMessage()); 00236 throw ex; 00237 } 00238 } 00239 } 00240 00241 00242 vector<MessageUnit> ConnectionsHandler::get(const GetKey& key, const GetQos& qos) 00243 { 00244 if (log_.call()) log_.call(ME, "get"); 00245 if (log_.dump()) log_.dump(ME, string("::get, the key is: ") + key.toXml()); 00246 if (log_.dump()) log_.dump(ME, string("::get, the qos is: ") + qos.toXml()); 00247 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "get"); 00248 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "get"); 00249 if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, "get"); 00250 try { 00251 return connection_->get(key, qos); 00252 } 00253 catch (XmlBlasterException& ex) { 00254 toPollingOrDead(&ex); 00255 throw ex; 00256 } 00257 } 00258 00259 00260 vector<UnSubscribeReturnQos> 00261 ConnectionsHandler::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos) 00262 { 00263 if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE); 00264 if (log_.dump()) log_.dump(ME, string("::unSubscribe, the key is: ") + key.toXml()); 00265 if (log_.dump()) log_.dump(ME, string("::unSubscribe, the qos is: ") + qos.toXml()); 00266 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE); 00267 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE); 00268 if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE); 00269 try { 00270 vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos); 00271 return ret; 00272 } 00273 catch (XmlBlasterException& ex) { 00274 toPollingOrDead(&ex); 00275 throw ex; 00276 } 00277 } 00278 00279 bool ConnectionsHandler::putToQueue() { 00280 if (status_ == POLLING) return true; 00281 if (queue_ && queue_->getNumOfEntries() > 0) { 00282 return true; // guarantee sequence 00283 } 00284 return false; 00285 } 00286 00287 PublishReturnQos ConnectionsHandler::publish(const MessageUnit& msgUnit) 00288 { 00289 if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::PUBLISH); 00290 if (log_.dump()) log_.dump(ME, string("::publish, the msgUnit is: ") + msgUnit.toXml()); 00291 Lock lock(publishMutex_); 00292 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::PUBLISH); 00293 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::PUBLISH); 00294 if (putToQueue()) return queuePublish(msgUnit); 00295 try { 00296 // fill in the sender absolute name 00297 if (!connectReturnQos_.isNull()) { 00298 msgUnit.getQos().setSender(connectReturnQos_->getSessionQos().getSessionName()); 00299 } 00300 return connection_->publish(msgUnit); 00301 } 00302 catch (XmlBlasterException& ex) { 00303 toPollingOrDead(&ex); 00304 if (putToQueue() && isRecoverable(&ex)) { 00305 log_.info(ME, string("::publish ") + msgUnit.getKey().getOid() + " is queued, exception=" + ex.getMessage()); 00306 return queuePublish(msgUnit); 00307 } 00308 else { 00309 log_.warn(ME, string("::publish failed throwing now exception, the msgUnit is: ") + msgUnit.toXml() + " exception=" + ex.getMessage()); 00310 throw ex; 00311 } 00312 } 00313 } 00314 00315 00316 void ConnectionsHandler::publishOneway(const vector<MessageUnit> &msgUnitArr) 00317 { 00318 if (log_.call()) log_.call(ME, "publishOneway"); 00319 Lock lock(publishMutex_); 00320 00321 // fill in the sender absolute name 00322 if (!connectReturnQos_.isNull()) { 00323 for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) { 00324 msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName()); 00325 } 00326 } 00327 00328 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishOneway"); 00329 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishOneway"); 00330 if (putToQueue()) { 00331 for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]); 00332 } 00333 00334 try { 00335 connection_->publishOneway(msgUnitArr); 00336 } 00337 catch (XmlBlasterException& ex) { 00338 toPollingOrDead(&ex); 00339 if (putToQueue() && isRecoverable(&ex)) { 00340 for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]); 00341 } 00342 else 00343 throw ex; 00344 } 00345 } 00346 00347 00348 vector<PublishReturnQos> ConnectionsHandler::publishArr(const vector<MessageUnit> &msgUnitArr) 00349 { 00350 if (log_.call()) log_.call(ME, "publishArr"); 00351 Lock lock(publishMutex_); 00352 00353 // fill in the sender absolute name 00354 if (!connectReturnQos_.isNull()) { 00355 for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) { 00356 msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName()); 00357 } 00358 } 00359 00360 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishArr"); 00361 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishArr"); 00362 if (putToQueue()) { 00363 vector<PublishReturnQos> retQos; 00364 for (size_t i=0; i < msgUnitArr.size(); i++) { 00365 retQos.insert(retQos.end(), queuePublish(msgUnitArr[i])); 00366 } 00367 return retQos; 00368 } 00369 try { 00370 return connection_->publishArr(msgUnitArr); 00371 } 00372 catch (XmlBlasterException& ex) { 00373 toPollingOrDead(&ex); 00374 if (putToQueue() && isRecoverable(&ex)) { 00375 vector<PublishReturnQos> retQos; 00376 for (size_t i=0; i < msgUnitArr.size(); i++) { 00377 retQos.insert(retQos.end(), queuePublish(msgUnitArr[i])); 00378 } 00379 return retQos; 00380 } 00381 else throw ex; 00382 } 00383 } 00384 00385 00386 vector<EraseReturnQos> ConnectionsHandler::erase(const EraseKey& key, const EraseQos& qos) 00387 { 00388 if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::ERASE); 00389 if (log_.dump()) log_.dump(ME, string("::erase, the key is: ") + key.toXml()); 00390 if (log_.dump()) log_.dump(ME, string("::erase, the qos is: ") + qos.toXml()); 00391 00392 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::ERASE); 00393 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::ERASE); 00394 if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::ERASE); 00395 00396 try { 00397 return connection_->erase(key, qos); 00398 } 00399 catch (XmlBlasterException& ex) { 00400 toPollingOrDead(&ex); 00401 throw ex; 00402 } 00403 } 00404 00405 void ConnectionsHandler::initFailsafe(I_ConnectionProblems* connectionProblems) 00406 { 00407 // Lock lock(connectionMutex_); 00408 if (log_.trace()) log_.trace(ME, "Register initFailsafe " + lexical_cast<string>(connectionProblems!=0)); 00409 connectionProblemsListener_ = connectionProblems; 00410 } 00411 00412 // If recoverable we queue a msgUnit, else we throw an exception 00413 bool ConnectionsHandler::isRecoverable(const org::xmlBlaster::util::XmlBlasterException* reason) 00414 { 00415 // TODO: Authorization could also be recoverable (by a server admin) 00416 // Such decision must be left to the user (we need a callback to the user here) 00417 // As a default all communication problems are assumed to be recoverable 00418 if (reason == 0) 00419 return true; 00420 bool ret = reason->isCommunication(); 00421 if (log_.call()) log_.call(ME, "isRecoverable " + lexical_cast<string>(ret)); 00422 return ret; 00423 } 00424 00425 void ConnectionsHandler::toPollingOrDead(const org::xmlBlaster::util::XmlBlasterException* reason) 00426 { 00427 if (reason == 0) 00428 return; 00429 if (!reason->isCommunication()) 00430 return; 00431 00432 if (log_.call()) log_.call(ME, "toPollingOrDead"); 00433 00434 enum States oldState = status_; 00435 if (!isFailsafe()) { 00436 log_.info(ME, "going into DEAD status since not in failsafe mode. " 00437 "For failsafe mode set 'delay' to a positive long value, for example on the cmd line: -delay 10000" + 00438 ((reason != 0) ? (": " + reason->getMessage()) : "")); 00439 status_ = DEAD; 00440 connection_->shutdown(); 00441 if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this); 00442 return; 00443 } 00444 00445 log_.info(ME, "going into POLLING status:" + ((reason != 0) ? (": " + reason->getMessage()) : "")); 00446 status_ = POLLING; 00447 currentRetry_ = 0; 00448 /* 00449 try { 00450 DisconnectQos discQos(global_); 00451 connection_->disconnect(discQos); 00452 } 00453 catch (...) { 00454 log_.warn(ME, "exception when trying to disconnect"); 00455 } 00456 */ 00457 connection_->shutdown(); 00458 if (connectionProblemsListener_) connectionProblemsListener_->reachedPolling(oldState, this); 00459 startPinger(true); 00460 } 00461 00462 00463 void ConnectionsHandler::timeout(void * /*userData*/) 00464 { 00465 00466 Lock lock(connectMutex_); 00467 pingPollTimerKey_ = 0; 00468 if (doStopPing_) return; // then it must stop 00469 if ( log_.call() ) log_.call(ME, string("ping timeout occured with status '") + getStatusString() + "'" ); 00470 if (status_ == ALIVE) { // then I am pinging 00471 if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'ALIVE'"); 00472 try { 00473 if (connection_) { 00474 connection_->ping("<qos/>"); 00475 if ( log_.trace() ) log_.trace(ME, "lowlevel ping returned: status is 'ALIVE'"); 00476 startPinger(false); 00477 } 00478 } 00479 catch (XmlBlasterException& ex) { 00480 if ( log_.trace() ) log_.trace(ME, "lowlevel ping failed: " + ex.toString()); 00481 toPollingOrDead(&ex); 00482 } 00483 return; 00484 } 00485 00486 if (status_ == POLLING) { 00487 if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'POLLING'"); 00488 try { 00489 if (connection_ && !connectQos_.isNull()) { 00490 if ( log_.trace() ) log_.trace(ME, "ping timeout: going to retry a connection"); 00491 00492 string lastSessionId = connectQos_->getSessionQos().getSecretSessionId(); 00493 connectReturnQos_ = connection_->connect(*connectQos_); 00494 if (log_.trace()) log_.trace(ME, string("Successfully reconnected, ConnectRetQos: ") + connectReturnQos_->toXml()); 00495 string sessionId = connectReturnQos_->getSessionQos().getSecretSessionId(); 00496 log_.info(ME, string("Successfully reconnected as '") + connectReturnQos_->getSessionQos().getAbsoluteName() + 00497 "' after " + lexical_cast<string>(currentRetry_) + " attempts"); 00498 connectQos_->getSessionQos().setSecretSessionId(sessionId); 00499 00500 if ( log_.trace() ) { 00501 log_.trace(ME, string("ping timeout: re-connection, the new connect returnQos: ") + connectReturnQos_->toXml()); 00502 } 00503 00504 bool doFlush = true; 00505 enum States oldState = status_; 00506 status_ = ALIVE; 00507 if ( connectionProblemsListener_ ) doFlush = connectionProblemsListener_->reachedAlive(oldState, this); 00508 00509 Lock lockPub(publishMutex_); // lock here to avoid publishing while flushing queue (to ensure sequence) 00510 if (sessionId != lastSessionId) { 00511 log_.trace(ME, string("When reconnecting the sessionId changed from '") + lastSessionId + "' to '" + sessionId + "'"); 00512 } 00513 00514 if (doFlush) { 00515 try { 00516 flushQueueUnlocked(queue_, true); 00517 } 00518 catch (const XmlBlasterException &ex) { 00519 log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue:" + ex.getMessage()); 00520 } 00521 catch (...) { 00522 log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue"); 00523 } 00524 } 00525 startPinger(false); 00526 } 00527 } 00528 catch (XmlBlasterException ex) { 00529 if (log_.trace()) log_.trace(ME, "timeout got exception: " + ex.getMessage()); 00530 currentRetry_++; 00531 if ( currentRetry_ < retries_ || retries_ < 0) { // continue to poll 00532 startPinger(false); 00533 } 00534 else { 00535 enum States oldState = status_; 00536 status_ = DEAD; 00537 if ( connectionProblemsListener_ ) { 00538 connectionProblemsListener_->reachedDead(oldState, this); 00539 // stopping 00540 } 00541 } 00542 } 00543 return; 00544 } 00545 00546 // if it comes here it will stop 00547 00548 } 00549 00550 SubscribeReturnQos ConnectionsHandler::queueSubscribe(const SubscribeKey& key, const SubscribeQos& qos) 00551 { 00552 if (!queue_) { 00553 if (connectQos_.isNull()) { 00554 throw XmlBlasterException(INTERNAL_SUBSCRIBE, ME + "::queueSubscribe", "need to create a queue but the connectQos is NULL (probably never connected)"); 00555 } 00556 if (log_.trace()) log_.trace(ME+":queueSubscribe", "creating a client queue ..."); 00557 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); 00558 if (log_.trace()) log_.trace(ME+":queueSubscribe", "created a client queue"); 00559 } 00560 SubscribeReturnQos retQos(global_); 00561 SubscribeQos& q = const_cast<SubscribeQos&>(qos); 00562 SessionNameRef sessionName = global_.getSessionName(); 00563 std::string subscriptionId = q.generateSubscriptionId(sessionName, key); 00564 retQos.getData().setSubscriptionId(subscriptionId); 00565 retQos.getData().setState(Constants::STATE_OK); 00566 retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED" 00567 qos.setSubscriptionId(subscriptionId); 00568 SubscribeQueueEntry entry(global_, key, qos, qos.getData().getPriority()); 00569 queue_->put(entry); 00570 //if (log_.trace()) 00571 log_.warn(ME, string("queueSubscribe: entry '") + key.getOid() + 00572 "' has been queued with client side generated subscriptionId=" + subscriptionId); 00573 return retQos; 00574 } 00575 00576 PublishReturnQos ConnectionsHandler::queuePublish(const MessageUnit& msgUnit) 00577 { 00578 if (log_.call()) log_.call(ME, "queuePublish"); 00579 if (!queue_) { 00580 if (connectQos_.isNull()) { 00581 throw XmlBlasterException(INTERNAL_PUBLISH, ME + "::queuePublish", "need to create a queue but the connectQos is NULL (probably never connected)"); 00582 } 00583 if (log_.trace()) log_.trace(ME+":queuePublish", "creating a client queue ..."); 00584 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); 00585 if (log_.trace()) log_.trace(ME+":queuePublish", "created a client queue"); 00586 } 00587 if (log_.trace()) 00588 log_.trace(ME, string("queuePublish: entry '") + msgUnit.getKey().getOid() + "' has been queued"); 00589 PublishReturnQos retQos(global_); 00590 retQos.setKeyOid(msgUnit.getKey().getOid()); 00591 retQos.setState(Constants::STATE_OK); 00592 retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED" 00593 PublishQueueEntry entry(global_, msgUnit, msgUnit.getQos().getPriority()); 00594 queue_->put(entry); 00595 return retQos; 00596 } 00597 00598 ConnectReturnQosRef& ConnectionsHandler::queueConnect() 00599 { 00600 if (log_.call()) log_.call(ME, string("::queueConnect with sessionQos: '") + connectQos_->getSessionQos().getAbsoluteName() + "'"); 00601 long tmp = connectQos_->getSessionQos().getPubSessionId(); 00602 if ( tmp <= 0) { 00603 if (log_.trace()) log_.trace(ME, string("::queueConnect, the public session id is '") + lexical_cast<std::string>(tmp)); 00604 throw XmlBlasterException(USER_CONNECT, ME + "::queueConnect", "queueing connection request not possible because you did not specify a positive public sessionId"); 00605 } 00606 00607 if (!queue_) { 00608 if (log_.trace()) log_.info(ME, "::queueConnect: created a client queue"); 00609 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); 00610 } 00611 if (log_.trace()) 00612 log_.trace(ME, string("queueConnect: entry '") + connectQos_->getSessionQos().getAbsoluteName() + "' has been queued"); 00613 00614 connectReturnQos_ = new ConnectReturnQos(*connectQos_); 00615 00616 /* Michele thinks we should not queue the ConnectQos 00617 ConnectQueueEntry entry(global_, *connectQos_); 00618 queue_->put(entry); 00619 */ 00620 enum States oldState = status_; 00621 status_ = POLLING; 00622 if ( connectionProblemsListener_ ) { 00623 connectionProblemsListener_->reachedPolling(oldState, this); 00624 // stopping 00625 } 00626 startPinger(true); 00627 return connectReturnQos_; 00628 } 00629 00630 I_PostSendListener* ConnectionsHandler::registerPostSendListener(I_PostSendListener *listener) { 00631 I_PostSendListener* old = postSendListener_; 00632 postSendListener_ = listener; 00633 return old; 00634 } 00635 00642 long ConnectionsHandler::flushQueue() 00643 { 00644 if (log_.call()) log_.call(ME, "flushQueue"); 00645 // Lock lock(connectionMutex_); 00646 00647 if (!queue_) { 00648 if (connectQos_.isNull()) { 00649 log_.error(ME+".flusgQueue", "need to create a queue but the connectQos is NULL (probably never connected)"); 00650 } 00651 if (log_.trace()) log_.trace(ME+".flushQueue", "creating the client queue ..."); 00652 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); 00653 if (queue_->getNumOfEntries() < 1) { 00654 if (log_.trace()) log_.trace(ME+".flushQueue", "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + 00655 "], it is empty, nothing to do."); 00656 return 0; 00657 } 00658 log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " + 00659 lexical_cast<string>(queue_->getNumOfEntries()) + " entries."); 00660 } 00661 00662 return flushQueueUnlocked(queue_, true); 00663 } 00664 00665 00666 long ConnectionsHandler::flushQueueUnlocked(I_Queue *queueToFlush, bool doRemove) 00667 { 00668 if ( log_.call() ) log_.call(ME, "flushQueueUnlocked"); 00669 if (!queueToFlush || queueToFlush->empty()) return 0; 00670 if (status_ != ALIVE || connection_ == NULL) return -1; 00671 00672 long ret = 0; 00673 if (!queueToFlush->empty()) { 00674 log_.info(ME, "Queue [" + queue_->getType() + "][" + queue_->getVersion() + "] contains " + 00675 lexical_cast<string>(queue_->getNumOfEntries()) + " entries, we send them to the server"); 00676 } 00677 while (!queueToFlush->empty()) { 00678 long maxNumOfEntries= (doRemove) ? 1 : -1; // doRemove==false makes no sense, TODO: remove this arg 00679 if (log_.trace()) log_.trace(ME, "flushQueueUnlocked: flushing one priority sweep maxNumOfEntries=" + lexical_cast<string>(maxNumOfEntries)); 00680 const vector<EntryType> entries = queueToFlush->peekWithSamePriority(maxNumOfEntries); 00681 vector<EntryType>::const_iterator iter = entries.begin(); 00682 while (iter != entries.end()) { 00683 try { 00684 if (log_.trace()) log_.trace(ME, "sending the content to xmlBlaster: " + (*iter)->toXml()); 00685 const EntryType entry = (*iter); 00686 const MsgQueueEntry &entry2 = *entry; 00687 { 00688 MsgQueueEntry &entry3 = const_cast<MsgQueueEntry&>(entry2); 00689 entry3.setSender(connectReturnQos_->getSessionQos().getSessionName()); 00690 } 00691 entry2.send(*this); // entry2 contains the PublishReturnQos after calling send 00692 if (log_.trace()) log_.trace(ME, "content to xmlBlaster successfully sent"); 00693 00694 I_PostSendListener *p = postSendListener_; 00695 if (p) { 00696 p->postSend(entry2); 00697 } 00698 } 00699 catch (XmlBlasterException &ex) { 00700 if (ex.isCommunication()) toPollingOrDead(&ex); 00701 log_.warn(ME, "flushQueueUnlocked: can't send queued message to server: " + ex.getMessage()); 00702 //if (doRemove) queueToFlush->randomRemove(entries.begin(), iter); 00703 throw ex; 00704 } 00705 iter++; 00706 } 00707 if (doRemove) { 00708 //log_.trace(ME, "remove send message from client queue"); 00709 ret += queueToFlush->randomRemove(entries.begin(), entries.end()); 00710 } 00711 } 00712 return ret; 00713 } 00714 00715 I_Queue* ConnectionsHandler::getQueue() 00716 { 00717 if (!queue_) { 00718 if (log_.trace()) log_.trace(ME+".getQueue", "creating the client queue ..."); 00719 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); 00720 log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " + 00721 lexical_cast<string>(queue_->getNumOfEntries()) + " entries."); 00722 } 00723 return queue_; 00724 } 00725 00726 bool ConnectionsHandler::isFailsafe() const 00727 { 00728 if (connectQos_.isNull()) return false; 00729 return connectQos_->getAddress()->getDelay() > 0; 00730 } 00731 00732 // pinger or poller 00733 bool ConnectionsHandler::startPinger(bool withInitialPing) 00734 { 00735 if (log_.call()) log_.call(ME, "startPinger"); 00736 if (doStopPing_) return false; 00737 00738 if (log_.trace()) log_.trace(ME, "startPinger (no request to stop the pinger is active for the moment)"); 00739 if (pingPollTimerKey_ != 0 && !withInitialPing) { 00740 if (log_.trace()) log_.trace(ME, "startPinger: the pinger is already running. I will return without starting a new thread"); 00741 return false; 00742 } 00743 00744 long delay = 10000; 00745 long pingInterval = 0; 00746 if (connectQos_.isNull()) { 00747 ConnectQos tmp(global_); 00748 delay = tmp.getAddress()->getDelay(); 00749 pingInterval = tmp.getAddress()->getPingInterval(); 00750 } 00751 else { 00752 delay = connectQos_->getAddress()->getDelay(); 00753 pingInterval = connectQos_->getAddress()->getPingInterval(); 00754 } 00755 if (log_.trace()) { 00756 log_.trace(ME, string("startPinger(status=") + 00757 getStatusString() + 00758 "): parameters are: delay '" + lexical_cast<std::string>(delay) + 00759 "' and pingInterval '" + lexical_cast<std::string>(pingInterval) + 00760 " withInitialPing=" + lexical_cast<string>(withInitialPing)); 00761 } 00762 if (delay > 0 && pingInterval > 0) { 00763 long delta = delay; 00764 if (status_ == ALIVE) delta = pingInterval; 00765 if (withInitialPing) delta = 400; 00766 pingPollTimerKey_ = global_.getPingTimer().addOrRefreshTimeoutListener(this, delta, NULL, pingPollTimerKey_); 00767 } 00768 return true; 00769 } 00770 00771 string ConnectionsHandler::getStatusString() const 00772 { 00773 if (status_ == ALIVE) return "ALIVE"; 00774 else if (status_ == POLLING) return "POLLING"; 00775 else if (status_ == DEAD) return "DEAD"; 00776 else if (status_ == START) return "START"; 00777 return "END";; 00778 } 00779 00780 00781 bool ConnectionsHandler::isConnected() const 00782 { 00783 return status_ == ALIVE || status_ == POLLING; 00784 } 00785 00786 bool ConnectionsHandler::isAlive() const 00787 { 00788 return status_ == ALIVE; 00789 } 00790 00791 bool ConnectionsHandler::isPolling() const 00792 { 00793 return status_ == POLLING; 00794 } 00795 00796 bool ConnectionsHandler::isDead() const 00797 { 00798 return status_ == DEAD; 00799 } 00800 00801 ConnectReturnQosRef ConnectionsHandler::connectRaw(const ConnectQosRef& connectQos) 00802 { 00803 if (log_.call()) log_.call(ME, "::connectRaw"); 00804 connectReturnQos_ = connection_->connect(connectQos); 00805 connectQos_ = connectQos; 00806 log_.info(ME, string("Successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'"); 00807 connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId()); 00808 return connectReturnQos_; 00809 } 00810 00811 00812 I_XmlBlasterConnection& ConnectionsHandler::getConnection() const 00813 { 00814 if (!connection_) { 00815 throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::getConnection", "the connection is still NULL: it is not assigned yet. You probably called this method before a connection was made"); 00816 } 00817 return *connection_; 00818 } 00819 00820 00821 ConnectReturnQosRef ConnectionsHandler::getConnectReturnQos() 00822 { 00823 return connectReturnQos_; 00824 } 00825 00826 ConnectQosRef ConnectionsHandler::getConnectQos() 00827 { 00828 return connectReturnQos_; // contains everything and is typedef on ConnectQos 00829 } 00830 00831 /* 00832 void ConnectionsHandler::setConnectReturnQos(const connectReturnQos& retQos) 00833 { 00834 if (connectReturnQos_) { 00835 delete connectReturnQos_; 00836 connectReturnQos_ = NULL; 00837 } 00838 connectReturnQos_ = new ConnectReturnQos(retQos); 00839 } 00840 */ 00841 00842 }}}} // namespaces 00843 00844