1 /*------------------------------------------------------------------------------
  2 Name:      ConnectionsHandler.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Handles the I_XmlBlasterConnections 
  6 ------------------------------------------------------------------------------*/
  7 
  8 #include <util/dispatch/ConnectionsHandler.h>
  9 #include <util/Global.h>
 10 #include <util/Timeout.hpp>
 11 #include <util/Timestamp.h>
 12 #include <util/Constants.h>
 13 #include <util/lexical_cast.h>
 14 #include <util/queue/QueueFactory.h>
 15 #include <util/queue/PublishQueueEntry.h>
 16 #include <util/queue/ConnectQueueEntry.h>
 17 #include <util/queue/SubscribeQueueEntry.h>
 18 
 19 namespace org { namespace xmlBlaster { namespace util { namespace dispatch {
 20 
 21 using namespace std;
 22 using namespace org::xmlBlaster::client::protocol;
 23 using namespace org::xmlBlaster::client;
 24 using namespace org::xmlBlaster::util;
 25 using namespace org::xmlBlaster::util::qos;
 26 using namespace org::xmlBlaster::util::thread;
 27 using namespace org::xmlBlaster::util::qos::storage;
 28 using namespace org::xmlBlaster::util::queue;
 29 using namespace org::xmlBlaster::client::qos;
 30 using namespace org::xmlBlaster::client::key;
 31 
 32 ConnectionsHandler::ConnectionsHandler(org::xmlBlaster::util::Global& global,
 33                                        const string& instanceName)
 34    : ME(string("ConnectionsHandler-") + instanceName), 
 35      connectQos_((ConnectQos*)0),
 36      connectReturnQos_((ConnectReturnQos*)0),
 37      status_(START), 
 38      global_(global), 
 39      log_(global.getLog("org.xmlBlaster.util.dispatch")),
 40      connectMutex_(),
 41      publishMutex_(),
 42      postSendListener_(0),
 43      instanceName_(instanceName)
 44 {
 45    ClientQueueProperty prop(global_, "");
 46    connectionProblemsListener_ = NULL;
 47    connection_         = NULL;
 48    queue_              = NULL;
 49    retries_            = -1;
 50    currentRetry_       = 0;
 51    pingPollTimerKey_   = 0;
 52    doStopPing_         = false;
 53    if (log_.call()) log_.call(ME, "constructor");
 54 }
 55 
 56 ConnectionsHandler::~ConnectionsHandler()
 57 {
 58    if (log_.call()) log_.call(ME, "destructor");
 59    if (pingPollTimerKey_ != 0) {
 60       global_.getPingTimer().removeTimeoutListener(pingPollTimerKey_);
 61       pingPollTimerKey_ = 0;
 62    }
 63    doStopPing_ = true;
 64    /*
 65    while (pingIsStarted_) {
 66       Thread::sleep(200);
 67    }
 68    */
 69    Lock lock(connectMutex_);
 70    string type = (connectQos_.isNull()) ? org::xmlBlaster::util::Global::getDefaultProtocol() : connectQos_->getAddress()->getType(); // "SOCKET"
 71    string version = "1.0"; // currently hardcoded
 72    if (connection_) {
 73       global_.getDispatchManager().releasePlugin(instanceName_, type, version);
 74       connection_ = NULL;
 75    }
 76    if ( queue_ ) {
 77       delete queue_;
 78       queue_ = NULL;
 79    }
 80    if (log_.trace()) log_.trace(ME, "destructor: going to delete the connectQos");
 81    status_ = END;
 82    if (log_.trace()) log_.trace(ME, "destructor ended");
 83 } 
 84 
 85 
 86 ConnectReturnQosRef ConnectionsHandler::connect(const ConnectQosRef& qos)
 87 {
 88    if (log_.call()) log_.call(ME, string("::connect status is '") + lexical_cast<std::string>(status_) + "'");
 89    if (qos.isNull()) {
 90       throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::connect", "your connectQos is null");
 91    }
 92    if (log_.dump()) log_.dump(ME, string("::connect, the qos is: ") + qos->toXml());
 93    Lock lock(connectMutex_);
 94    if (isConnected()) {
 95       log_.warn(ME, "connect: you are already connected");
 96       return connectReturnQos_;
 97    }
 98 
 99    connectQos_ = qos;
100 
101    global_.setSessionName(connectQos_->getSessionQos().getSessionName());
102    global_.setImmutableId(connectQos_->getSessionQos().getRelativeName());
103    global_.setId(connectQos_->getSessionQos().getAbsoluteName()); // temporary
104    //log_.info(ME, "BEFORE id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName());
105 
106    retries_ = connectQos_->getAddress()->getRetries();
107    long pingInterval = connectQos_->getAddress()->getPingInterval();
108    if (log_.trace()) {
109       log_.trace(ME, string("connect: number of retries during communication failure: ") + lexical_cast<std::string>(retries_));
110       log_.trace(ME, string("connect: Ping Interval: ") + lexical_cast<std::string>(pingInterval));
111    }
112 
113    string type = connectQos_->getAddress()->getType();
114    string version = "1.0"; // currently hardcoded
115    if (!connection_) {
116       connection_ = &(global_.getDispatchManager().getPlugin(instanceName_, type, version));
117    }
118 
119    try {
120       connectReturnQos_ = connection_->connect(*connectQos_);
121       global_.setSessionName(connectReturnQos_->getSessionQos().getSessionName());
122       // For "joe/1" it remains immutable; For "joe" there is added the server side generated sessionId "joe/-33":
123       global_.setImmutableId(connectReturnQos_->getSessionQos().getRelativeName());
124       global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName());
125                 //log_.info(ME, "AFTER id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName());
126    }
127    catch (XmlBlasterException &ex) {
128       if ((ex.isCommunication() || ex.getErrorCodeStr().find("user.configuration") == 0)) {
129          log_.warn(ME, "Got exception when connecting, polling now: " + ex.toString());
130          if (pingPollTimerKey_ == 0)
131             startPinger(false);
132          return queueConnect();
133       }
134       else {
135          if (log_.trace()) log_.trace(ME, string("the exception in connect is ") + ex.toXml());
136          throw ex;
137       }
138    }                                                                                                                                                                                                                                                                                    
139    
140    log_.info(ME, string("successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'");
141    connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId());
142 
143    enum States oldState = status_;
144    status_ = ALIVE;
145    if (connectionProblemsListener_) connectionProblemsListener_->reachedAlive(oldState, this);
146    // start the ping if in failsafe, i.e. if delay > 0
147    startPinger(false);
148    if (log_.dump()) log_.dump(ME, string("::connect, the return qos is: ") + connectReturnQos_->toXml());
149 
150    flushQueue();
151 
152    return connectReturnQos_;
153 }
154 
155 bool ConnectionsHandler::disconnect(const DisconnectQos& qos)
156 {
157    Lock lock(connectMutex_);
158    if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::DISCONNECT);
159    if (log_.dump()) log_.dump(ME, string("::disconnect, the qos is: ") + qos.toXml());
160 
161    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::DISCONNECT);
162    if (status_ == DEAD) {
163       log_.warn(ME, "already disconnected");
164       return false;
165    }
166    if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::DISCONNECT);
167 
168    if (qos.getClearClientQueue() && queue_ != 0) queue_->clear();
169 
170    bool ret = connection_->disconnect(qos);
171    enum States oldState = status_;
172    status_ = DEAD;
173    if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this);
174    return ret;
175 }
176 
177 string ConnectionsHandler::getProtocol()
178 {
179    return connection_->getProtocol();
180 }
181 
182 /*
183 string ConnectionsHandler::loginRaw()
184 {
185    return connection_->loginRaw();
186 }
187 */
188 
189 bool ConnectionsHandler::shutdown()
190 {
191    if (connection_) {
192       return connection_->shutdown();
193    }
194    return false;
195 }
196 
197 string ConnectionsHandler::getLoginName() 
198 {
199    return connection_->getLoginName();
200 }
201 
202 bool ConnectionsHandler::isLoggedIn()
203 {
204    return connection_->isLoggedIn();
205 }
206 
207 string ConnectionsHandler::ping(const string& qos)
208 {
209 //   Lock lock(connectionMutex_);
210    return connection_->ping(qos);
211 }
212 
213 SubscribeReturnQos ConnectionsHandler::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
214 {
215    if (log_.call()) log_.call(ME, MethodName::SUBSCRIBE);
216    if (log_.dump()) log_.dump(ME, string("::subscribe, the key is: ") + key.toXml());
217    if (log_.dump()) log_.dump(ME, string("::subscribe, the qos is: ") + qos.toXml());
218 
219 //   Lock lock(connectionMutex_);
220 
221    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, MethodName::SUBSCRIBE);
222    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, MethodName::SUBSCRIBE);
223    if (putToQueue()) return queueSubscribe(key, qos);
224    try {
225       SubscribeReturnQos ret = connection_->subscribe(key, qos);
226       return ret;
227    }   
228    catch (XmlBlasterException& ex) {
229       toPollingOrDead(&ex);
230       if (putToQueue() && isRecoverable(&ex)) {
231          log_.info(ME, string("::subscribe ") + key.getOid() + " is queued, exception=" + ex.getMessage());
232          return queueSubscribe(key, qos);
233       }
234       else {
235          log_.warn(ME, string("::subscribe failed throwing now exception: ") + key.toXml() + qos.toXml() + " exception=" + ex.getMessage());
236          throw ex;
237       }
238    }
239 }
240 
241 
242 vector<MessageUnit> ConnectionsHandler::get(const GetKey& key, const GetQos& qos)
243 {
244    if (log_.call()) log_.call(ME, "get");
245    if (log_.dump()) log_.dump(ME, string("::get, the key is: ") + key.toXml());
246    if (log_.dump()) log_.dump(ME, string("::get, the qos is: ") + qos.toXml());
247    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "get");
248    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "get");
249    if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, "get");
250    try {
251       return connection_->get(key, qos);
252    }   
253    catch (XmlBlasterException& ex) {
254       toPollingOrDead(&ex);
255       throw ex;
256    }
257 }
258 
259 
260 vector<UnSubscribeReturnQos> 
261    ConnectionsHandler::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
262 {
263    if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
264    if (log_.dump()) log_.dump(ME, string("::unSubscribe, the key is: ") + key.toXml());
265    if (log_.dump()) log_.dump(ME, string("::unSubscribe, the qos is: ") + qos.toXml());
266    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
267    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
268    if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
269    try {
270       vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos);
271       return ret;
272    }   
273    catch (XmlBlasterException& ex) {
274       toPollingOrDead(&ex);
275       throw ex;
276    }
277 }
278 
279 bool ConnectionsHandler::putToQueue() {
280    if (status_ == POLLING) return true;
281    if (queue_ && queue_->getNumOfEntries() > 0) {
282       return true; // guarantee sequence
283    }
284    return false;
285 }
286 
287 PublishReturnQos ConnectionsHandler::publish(const MessageUnit& msgUnit)
288 {
289    if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::PUBLISH);
290    if (log_.dump()) log_.dump(ME, string("::publish, the msgUnit is: ") + msgUnit.toXml());
291    Lock lock(publishMutex_);
292    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::PUBLISH);
293    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::PUBLISH);
294    if (putToQueue()) return queuePublish(msgUnit);
295    try {
296       // fill in the sender absolute name
297       if (!connectReturnQos_.isNull()) {
298          msgUnit.getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
299       }
300       return connection_->publish(msgUnit);
301    }   
302    catch (XmlBlasterException& ex) {
303       toPollingOrDead(&ex);
304       if (putToQueue() && isRecoverable(&ex)) {
305          log_.info(ME, string("::publish ") + msgUnit.getKey().getOid() + " is queued, exception=" + ex.getMessage());
306          return queuePublish(msgUnit);
307       }
308       else {
309          log_.warn(ME, string("::publish failed throwing now exception, the msgUnit is: ") + msgUnit.toXml() + " exception=" + ex.getMessage());
310          throw ex;
311       }
312    }
313 }
314 
315 
316 void ConnectionsHandler::publishOneway(const vector<MessageUnit> &msgUnitArr)
317 {
318    if (log_.call()) log_.call(ME, "publishOneway");
319    Lock lock(publishMutex_);
320 
321    // fill in the sender absolute name
322    if (!connectReturnQos_.isNull()) {
323       for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) {
324          msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
325       }
326    }
327 
328    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishOneway");
329    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishOneway");
330    if (putToQueue()) {
331       for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]);
332    }
333 
334    try {
335       connection_->publishOneway(msgUnitArr);
336    }   
337    catch (XmlBlasterException& ex) {
338       toPollingOrDead(&ex);
339       if (putToQueue() && isRecoverable(&ex)) {
340          for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]);
341       }
342       else
343          throw ex;
344    }
345 }
346 
347 
348 vector<PublishReturnQos> ConnectionsHandler::publishArr(const vector<MessageUnit> &msgUnitArr)
349 {
350    if (log_.call()) log_.call(ME, "publishArr");
351    Lock lock(publishMutex_);
352 
353    // fill in the sender absolute name
354    if (!connectReturnQos_.isNull()) {
355       for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) {
356          msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
357       }
358    }
359 
360    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishArr");
361    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishArr");
362    if (putToQueue()) {
363       vector<PublishReturnQos> retQos;
364       for (size_t i=0; i < msgUnitArr.size(); i++) {
365          retQos.insert(retQos.end(), queuePublish(msgUnitArr[i]));
366       }
367       return retQos;
368    }
369    try {
370       return connection_->publishArr(msgUnitArr);
371    }   
372    catch (XmlBlasterException& ex) {
373       toPollingOrDead(&ex);
374       if (putToQueue() && isRecoverable(&ex)) {
375          vector<PublishReturnQos> retQos;
376          for (size_t i=0; i < msgUnitArr.size(); i++) {
377             retQos.insert(retQos.end(), queuePublish(msgUnitArr[i]));
378          }
379          return retQos;
380       }
381       else throw ex;
382    }
383 }
384 
385 
386 vector<EraseReturnQos> ConnectionsHandler::erase(const EraseKey& key, const EraseQos& qos)
387 {
388    if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::ERASE);
389    if (log_.dump()) log_.dump(ME, string("::erase, the key is: ") + key.toXml());
390    if (log_.dump()) log_.dump(ME, string("::erase, the qos is: ") + qos.toXml());
391 
392    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::ERASE);
393    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::ERASE);
394    if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::ERASE);
395 
396    try {
397       return connection_->erase(key, qos);
398    }   
399    catch (XmlBlasterException& ex) {
400       toPollingOrDead(&ex);
401       throw ex;
402    }
403 }
404 
405 void ConnectionsHandler::initFailsafe(I_ConnectionProblems* connectionProblems)
406 {
407 //   Lock lock(connectionMutex_);
408    if (log_.trace()) log_.trace(ME, "Register initFailsafe " + lexical_cast<string>(connectionProblems!=0));
409    connectionProblemsListener_ = connectionProblems;
410 }
411 
412 // If recoverable we queue a msgUnit, else we throw an exception
413 bool ConnectionsHandler::isRecoverable(const org::xmlBlaster::util::XmlBlasterException* reason)
414 {
415         // TODO: Authorization could also be recoverable (by a server admin)
416         //       Such decision must be left to the user (we need a callback to the user here)
417         // As a default all communication problems are assumed to be recoverable
418         if (reason == 0)
419                 return true;
420         bool ret = reason->isCommunication();
421     if (log_.call()) log_.call(ME, "isRecoverable " + lexical_cast<string>(ret));
422         return ret;
423 }
424 
425 void ConnectionsHandler::toDead(const org::xmlBlaster::util::XmlBlasterException* reason)
426 {
427    if (log_.call()) log_.call(ME, "toDead");
428    enum States oldState = status_;
429    log_.info(ME, "going into DEAD status" + ((reason != 0) ? (": " + reason->getMessage()) : ""));
430    status_ = DEAD;
431    connection_->shutdown(); // close socket/corba connection
432    if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this);
433 }
434 
435 void ConnectionsHandler::toPolling(const org::xmlBlaster::util::XmlBlasterException* reason)
436 {
437    log_.info(ME, "going into POLLING status:" + ((reason != 0) ? (": " + reason->getMessage()) : ""));
438    enum States oldState = status_;
439    status_ = POLLING;
440    currentRetry_ = 0;
441    /*
442    try {
443       DisconnectQos discQos(global_);
444       connection_->disconnect(discQos);
445    }
446    catch (...) {
447       log_.warn(ME, "exception when trying to disconnect");
448    }
449    */
450    connection_->shutdown(); // close socket/corba connection
451    if (connectionProblemsListener_) connectionProblemsListener_->reachedPolling(oldState, this);
452    startPinger(true);
453 }
454 
455 void ConnectionsHandler::toPollingOrDead(const org::xmlBlaster::util::XmlBlasterException* reason)
456 {
457    if (reason == 0)
458       return;
459    if (!reason->isCommunication())
460       return;
461 
462    if (log_.call()) log_.call(ME, "toPollingOrDead");
463 
464    if (!isFailsafe()) {
465       log_.info(ME, "going into DEAD status since not in failsafe mode. "
466                     "For failsafe mode set 'delay' to a positive long value, for example on the cmd line: -delay 10000" +
467                     ((reason != 0) ? (": " + reason->getMessage()) : ""));
468       toDead(reason);
469       return;
470    }
471 
472    toPolling(reason);
473 }
474 
475 
476 void ConnectionsHandler::timeout(void * /*userData*/)
477 {
478                                                     
479   Lock lock(connectMutex_);
480    pingPollTimerKey_ = 0;
481    if (doStopPing_) return; // then it must stop
482    if ( log_.call() ) log_.call(ME, string("ping timeout occured with status '") + getStatusString() + "'" );
483    if (status_ == ALIVE) { // then I am pinging
484       if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'ALIVE'");
485       try {
486          if (connection_) {
487             connection_->ping("<qos/>");
488             if ( log_.trace() ) log_.trace(ME, "lowlevel ping returned: status is 'ALIVE'");
489             startPinger(false);
490          }
491       }
492       catch (XmlBlasterException& ex) {
493          if ( log_.trace() ) log_.trace(ME, "lowlevel ping failed: " + ex.toString());
494          toPollingOrDead(&ex);
495       }
496       return;
497    }
498  
499    if (status_ == POLLING) {
500       if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'POLLING'");
501       try {
502          if (connection_ && !connectQos_.isNull()) {
503             if ( log_.trace() ) log_.trace(ME, "ping timeout: going to retry a connection");
504  
505             string lastSessionId = connectQos_->getSessionQos().getSecretSessionId();
506             connectReturnQos_ = connection_->connect(*connectQos_);
507             if (log_.trace()) log_.trace(ME, string("Successfully reconnected, ConnectRetQos: ") + connectReturnQos_->toXml());
508             string sessionId = connectReturnQos_->getSessionQos().getSecretSessionId();
509             log_.info(ME, string("Successfully reconnected as '") + connectReturnQos_->getSessionQos().getAbsoluteName() +
510                           "' after " + lexical_cast<string>(currentRetry_) + " attempts");
511             connectQos_->getSessionQos().setSecretSessionId(sessionId);
512  
513             if ( log_.trace() ) {
514                log_.trace(ME, string("ping timeout: re-connection, the new connect returnQos: ") + connectReturnQos_->toXml());
515             }
516  
517             bool doFlush = true;
518             enum States oldState = status_;
519             status_ = ALIVE;
520             if ( connectionProblemsListener_ ) doFlush = connectionProblemsListener_->reachedAlive(oldState, this);
521  
522             Lock lockPub(publishMutex_); // lock here to avoid publishing while flushing queue (to ensure sequence)
523             if (sessionId != lastSessionId) {
524                log_.trace(ME, string("When reconnecting the sessionId changed from '") + lastSessionId + "' to '" + sessionId + "'");
525             }
526  
527             if (doFlush) {
528                try {
529                   flushQueueUnlocked(queue_, true);
530                }
531                catch (const XmlBlasterException &ex) {
532                   log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue:" + ex.getMessage());
533                }
534                catch (...) {
535                   log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue");
536                }
537             }
538             startPinger(false);
539          }
540       }
541       catch (XmlBlasterException ex) {
542          if (log_.trace()) log_.trace(ME, "timeout got exception: " + ex.getMessage());
543          currentRetry_++;
544          if ( currentRetry_ < retries_ || retries_ < 0) { // continue to poll
545             startPinger(false);
546          }
547          else {
548             enum States oldState = status_;
549             status_ = DEAD;
550             if ( connectionProblemsListener_ ) {
551                connectionProblemsListener_->reachedDead(oldState, this);
552                // stopping
553             }
554          }
555       }
556       return;
557    }
558  
559    // if it comes here it will stop
560  
561 }
562 
563 SubscribeReturnQos ConnectionsHandler::queueSubscribe(const SubscribeKey& key, const SubscribeQos& qos)
564 {
565    if (!queue_) {
566       if (connectQos_.isNull()) {
567          throw XmlBlasterException(INTERNAL_SUBSCRIBE, ME + "::queueSubscribe", "need to create a queue but the connectQos is NULL (probably never connected)");
568       }
569       if (log_.trace()) log_.trace(ME+":queueSubscribe", "creating a client queue ...");
570       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
571       if (log_.trace()) log_.trace(ME+":queueSubscribe", "created a client queue");
572    }
573    SubscribeReturnQos retQos(global_);
574    SubscribeQos& q = const_cast<SubscribeQos&>(qos);
575    SessionNameRef sessionName = global_.getSessionName();
576    std::string subscriptionId = q.generateSubscriptionId(sessionName, key);
577    retQos.getData().setSubscriptionId(subscriptionId);
578    retQos.getData().setState(Constants::STATE_OK);
579    retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED"
580    qos.setSubscriptionId(subscriptionId);
581    SubscribeQueueEntry entry(global_, key, qos, qos.getData().getPriority());
582    queue_->put(entry);
583    //if (log_.trace()) 
584       log_.warn(ME, string("queueSubscribe: entry '") + key.getOid() +
585                      "' has been queued with client side generated subscriptionId=" + subscriptionId);
586    return retQos;
587 }
588 
589 PublishReturnQos ConnectionsHandler::queuePublish(const MessageUnit& msgUnit)
590 {
591    if (log_.call()) log_.call(ME, "queuePublish");
592    if (!queue_) {
593       if (connectQos_.isNull()) {
594          throw XmlBlasterException(INTERNAL_PUBLISH, ME + "::queuePublish", "need to create a queue but the connectQos is NULL (probably never connected)");
595       }
596       if (log_.trace()) log_.trace(ME+":queuePublish", "creating a client queue ...");
597       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
598       if (log_.trace()) log_.trace(ME+":queuePublish", "created a client queue");
599    }
600    if (log_.trace()) 
601       log_.trace(ME, string("queuePublish: entry '") + msgUnit.getKey().getOid() + "' has been queued");
602    PublishReturnQos retQos(global_);
603    retQos.setKeyOid(msgUnit.getKey().getOid());
604    retQos.setState(Constants::STATE_OK);
605    retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED"
606    PublishQueueEntry entry(global_, msgUnit, msgUnit.getQos().getPriority());
607    queue_->put(entry);
608    return retQos;
609 }
610 
611 ConnectReturnQosRef& ConnectionsHandler::queueConnect()
612 {
613    if (log_.call()) log_.call(ME, string("::queueConnect with sessionQos: '") + connectQos_->getSessionQos().getAbsoluteName() + "'");
614    long tmp = connectQos_->getSessionQos().getPubSessionId(); 
615    if ( tmp <= 0) {
616       if (log_.trace()) log_.trace(ME, string("::queueConnect, the public session id is '") + lexical_cast<std::string>(tmp));
617       throw XmlBlasterException(USER_CONNECT, ME + "::queueConnect", "queueing connection request not possible because you did not specify a positive public sessionId");
618    }
619 
620    if (!queue_) {
621       if (log_.trace()) log_.info(ME, "::queueConnect: created a client queue");
622       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
623    }
624    if (log_.trace()) 
625       log_.trace(ME, string("queueConnect: entry '") + connectQos_->getSessionQos().getAbsoluteName() + "' has been queued");
626 
627    connectReturnQos_ = new ConnectReturnQos(*connectQos_);
628 
629    /* Michele thinks we should not queue the ConnectQos
630    ConnectQueueEntry entry(global_, *connectQos_);
631    queue_->put(entry);
632    */
633    enum States oldState = status_;
634    status_ = POLLING;
635    if ( connectionProblemsListener_ ) {
636       connectionProblemsListener_->reachedPolling(oldState, this);
637       // stopping
638    }
639    startPinger(true);
640    return connectReturnQos_;
641 }
642 
643 I_PostSendListener* ConnectionsHandler::registerPostSendListener(I_PostSendListener *listener) {
644    I_PostSendListener* old = postSendListener_; 
645    postSendListener_ = listener;
646    return old;
647 }
648 
649 /**
650  * Flushes all entries in the queue, i.e. the entries of the queue are sent to xmlBlaster.
651  * If the queue is empty or NULL, then 0 is returned. If the state is in POLLING or DEAD, or the 
652  * connection is not established yet (i.e. connection_ = NULL),  then -1 is
653  * returned.. This method blocks until all entries in the queue have been sent.
654  */
655 long ConnectionsHandler::flushQueue()
656 {
657    if (log_.call()) log_.call(ME, "flushQueue");
658    //   Lock lock(connectionMutex_);
659 
660    if (!queue_) {
661       if (connectQos_.isNull()) {
662          log_.error(ME+".flusgQueue", "need to create a queue but the connectQos is NULL (probably never connected)");
663       }
664       if (log_.trace()) log_.trace(ME+".flushQueue", "creating the client queue ...");
665       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
666       if (queue_->getNumOfEntries() < 1) {
667          if (log_.trace()) log_.trace(ME+".flushQueue", "Created queue [" + queue_->getType() + "][" + queue_->getVersion() +
668                                                         "], it is empty, nothing to do.");
669          return 0;
670       }
671       log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " +
672                     lexical_cast<string>(queue_->getNumOfEntries()) + " entries.");
673    }
674 
675    return flushQueueUnlocked(queue_, true);
676 }  
677 
678 // Notify client code
679 bool ConnectionsHandler::sendingFailedNotification(const std::vector<org::xmlBlaster::util::queue::EntryType> &entries, const org::xmlBlaster::util::XmlBlasterException &exception)
680 {
681     I_PostSendListener *p = postSendListener_;
682     bool isHandled = false; 
683     if (p) {
684        try {
685           isHandled = p->sendingFailed(entries, exception); // Notify client code
686        }
687        catch (...) {
688           log_.error(ME, "flushQueueUnlocked(async sending mode): Ignoring exception thrown from user code sendingFailed(...)");
689        }
690     }
691     return isHandled;
692 }
693 
694 
695 /**
696  * Called synchronously by connect() 
697  * or async by timeout of reconnect poller
698  */
699 long ConnectionsHandler::flushQueueUnlocked(I_Queue *queueToFlush, bool doRemove)
700 {
701    if ( log_.call() ) log_.call(ME, "flushQueueUnlocked");
702            if (!queueToFlush || queueToFlush->empty()) return 0;
703    if (status_ != ALIVE || connection_ == NULL) return -1;
704 
705    long ret = 0;
706    if (!queueToFlush->empty()) {
707       log_.info(ME, "Queue [" + queue_->getType() + "][" + queue_->getVersion() + "] contains " +
708                   lexical_cast<string>(queue_->getNumOfEntries()) + " entries, we send them to the server");
709    }
710    while (!queueToFlush->empty()) { 
711       long maxNumOfEntries= (doRemove) ? 1 : -1; // doRemove==false makes no sense, TODO: remove this arg
712       if (log_.trace()) log_.trace(ME, "flushQueueUnlocked: flushing one priority sweep maxNumOfEntries=" + lexical_cast<string>(maxNumOfEntries));
713       const vector<EntryType> entries = queueToFlush->peekWithSamePriority(maxNumOfEntries);
714       vector<EntryType>::const_iterator iter = entries.begin();
715       bool isHandled = false;
716       while (iter != entries.end()) {
717          const EntryType entry = (*iter);
718          iter++;
719          const MsgQueueEntry &entry2 = *entry;
720 
721          try {
722             if (log_.trace()) log_.trace(ME, "sending the content to xmlBlaster: " + entry->toXml());
723             {
724                MsgQueueEntry &entry3 = const_cast<MsgQueueEntry&>(entry2);
725                entry3.setSender(connectReturnQos_->getSessionQos().getSessionName());
726             }
727             entry2.send(*this); // entry2 contains the PublishReturnQos after calling send
728             isHandled = false;
729             if (log_.trace()) log_.trace(ME, "content to xmlBlaster successfully sent");
730          }
731          catch (XmlBlasterException &ex) {
732             if (ex.isCommunication()) {
733                toPollingOrDead(&ex);
734                throw ex; // Terminate timer thread //return;
735             }
736             log_.warn(ME, "flushQueueUnlocked(async sending mode): can't send queued message " + entry->toXml() + " to server: " + ex.getMessage());
737             isHandled = sendingFailedNotification(entries, ex); // Notify client code 
738             if (isHandled) {
739                if (log_.trace()) log_.trace(ME, "message is handled by user code, we remove it now from queue: " + entry->toXml());
740             }
741             else {
742                toDead(&ex);
743                // Instead of toDead we should set dispatcher active false so that a C++ client can later reactivate it?
744                throw ex; // Up to timer thread or back to user
745             }
746          }
747          catch (...) {
748             XmlBlasterException ex = XmlBlasterException(INTERNAL_UNKNOWN, ME, "flushQueueUnlocked(async sending mode): can't send queued message, reason is unknown");
749             log_.warn(ME, "flushQueueUnlocked(async sending mode): can't send queued message " + entry->toXml() + " to server because of unknown exception");
750             isHandled = sendingFailedNotification(entries, ex); // Notify client code 
751             if (isHandled) {
752                if (log_.trace()) log_.trace(ME, "message is handled by user code, we remove it now from queue: " + entry->toXml());
753             }
754             else {
755                toDead(&ex);
756                // We should set dispatcher active false so that a C++ client can later reactivate it?
757                throw ex;
758             }
759          }
760       }
761       if (doRemove) {
762           //log_.trace(ME, "remove send message from client queue");
763           ret += queueToFlush->randomRemove(entries.begin(), entries.end());
764       }
765       if (!isHandled) {
766               I_PostSendListener *p = postSendListener_;
767               if (p) {
768                   p->postSend(entries);
769               }
770       }
771    }
772    return ret;
773 }
774 
775 I_Queue* ConnectionsHandler::getQueue()
776 {
777    if (!queue_) {
778       if (log_.trace()) log_.trace(ME+".getQueue", "creating the client queue ...");
779       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
780       log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " +
781                     lexical_cast<string>(queue_->getNumOfEntries()) + " entries.");
782    }
783    return queue_;
784 }
785 
786 bool ConnectionsHandler::isFailsafe() const
787 {
788    if (connectQos_.isNull()) return false;
789    return connectQos_->getAddress()->getDelay() > 0;
790 }
791 
792 // pinger or poller
793 bool ConnectionsHandler::startPinger(bool withInitialPing)
794 {
795    if (log_.call()) log_.call(ME, "startPinger");
796    if (doStopPing_) return false;
797 
798    if (log_.trace()) log_.trace(ME, "startPinger (no request to stop the pinger is active for the moment)");
799    if (pingPollTimerKey_ != 0 && !withInitialPing) {
800       if (log_.trace()) log_.trace(ME, "startPinger: the pinger is already running. I will return without starting a new thread");
801       return false;  
802    }
803 
804    long delay        = 10000;
805    long pingInterval = 0;
806    if (connectQos_.isNull()) {
807       ConnectQos tmp(global_);
808       delay        = tmp.getAddress()->getDelay();
809       pingInterval = tmp.getAddress()->getPingInterval();
810    }
811    else {
812       delay        = connectQos_->getAddress()->getDelay();
813       pingInterval = connectQos_->getAddress()->getPingInterval();
814    }
815    if (log_.trace()) {
816       log_.trace(ME, string("startPinger(status=") + 
817                getStatusString() +
818                "): parameters are: delay '" + lexical_cast<std::string>(delay) +
819                "' and pingInterval '" + lexical_cast<std::string>(pingInterval) +
820                " withInitialPing=" + lexical_cast<string>(withInitialPing));
821    }
822    if (delay > 0 && pingInterval > 0) {
823       long delta = delay;
824       if (status_ == ALIVE) delta = pingInterval;
825       if (withInitialPing) delta = 400;
826       pingPollTimerKey_ = global_.getPingTimer().addOrRefreshTimeoutListener(this, delta, NULL, pingPollTimerKey_);
827    }
828    return true;
829 }
830 
831 string ConnectionsHandler::getStatusString() const
832 {
833    if (status_ == ALIVE) return "ALIVE";
834    else if (status_ == POLLING) return "POLLING";
835    else if (status_ == DEAD) return "DEAD";
836    else if (status_ == START) return "START";
837    return "END";;
838 }
839 
840 
841 bool ConnectionsHandler::isConnected() const
842 {
843    return status_ == ALIVE || status_ == POLLING;
844 }
845 
846 bool ConnectionsHandler::isAlive() const
847 {
848    return status_ == ALIVE;
849 }
850 
851 bool ConnectionsHandler::isPolling() const
852 {
853    return status_ == POLLING;
854 }
855 
856 bool ConnectionsHandler::isDead() const
857 {
858    return status_ == DEAD;
859 }
860 
861 ConnectReturnQosRef ConnectionsHandler::connectRaw(const ConnectQosRef& connectQos)
862 {
863    if (log_.call()) log_.call(ME, "::connectRaw");
864    connectReturnQos_ = connection_->connect(connectQos);
865    connectQos_ = connectQos;
866    log_.info(ME, string("Successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'");
867    connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId());
868    return connectReturnQos_;
869 }
870 
871 
872 I_XmlBlasterConnection& ConnectionsHandler::getConnection() const
873 {
874    if (!connection_) {
875       throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::getConnection", "the connection is still NULL: it is not assigned yet. You probably called this method before a connection was made");
876    }
877    return *connection_;
878 }
879 
880 
881 ConnectReturnQosRef ConnectionsHandler::getConnectReturnQos()
882 {
883    return connectReturnQos_;
884 }
885 
886 ConnectQosRef ConnectionsHandler::getConnectQos()
887 {
888    return connectReturnQos_; // contains everything and is typedef on ConnectQos
889 }
890 
891 /*
892 void ConnectionsHandler::setConnectReturnQos(const connectReturnQos& retQos)
893 {
894    if (connectReturnQos_)  {
895       delete connectReturnQos_;
896       connectReturnQos_ = NULL;
897    }
898    connectReturnQos_ = new ConnectReturnQos(retQos);
899 }
900 */
901 
902 }}}} // namespaces


syntax highlighted by Code2HTML, v. 0.9.1