1 /*------------------------------------------------------------------------------
2 Name: ConnectionsHandler.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Handles the I_XmlBlasterConnections
6 ------------------------------------------------------------------------------*/
7
8 #include <util/dispatch/ConnectionsHandler.h>
9 #include <util/Global.h>
10 #include <util/Timeout.hpp>
11 #include <util/Timestamp.h>
12 #include <util/Constants.h>
13 #include <util/lexical_cast.h>
14 #include <util/queue/QueueFactory.h>
15 #include <util/queue/PublishQueueEntry.h>
16 #include <util/queue/ConnectQueueEntry.h>
17 #include <util/queue/SubscribeQueueEntry.h>
18
19 namespace org { namespace xmlBlaster { namespace util { namespace dispatch {
20
21 using namespace std;
22 using namespace org::xmlBlaster::client::protocol;
23 using namespace org::xmlBlaster::client;
24 using namespace org::xmlBlaster::util;
25 using namespace org::xmlBlaster::util::qos;
26 using namespace org::xmlBlaster::util::thread;
27 using namespace org::xmlBlaster::util::qos::storage;
28 using namespace org::xmlBlaster::util::queue;
29 using namespace org::xmlBlaster::client::qos;
30 using namespace org::xmlBlaster::client::key;
31
32 ConnectionsHandler::ConnectionsHandler(org::xmlBlaster::util::Global& global,
33 const string& instanceName)
34 : ME(string("ConnectionsHandler-") + instanceName),
35 connectQos_((ConnectQos*)0),
36 connectReturnQos_((ConnectReturnQos*)0),
37 status_(START),
38 global_(global),
39 log_(global.getLog("org.xmlBlaster.util.dispatch")),
40 connectMutex_(),
41 publishMutex_(),
42 postSendListener_(0),
43 instanceName_(instanceName)
44 {
45 ClientQueueProperty prop(global_, "");
46 connectionProblemsListener_ = NULL;
47 connection_ = NULL;
48 queue_ = NULL;
49 retries_ = -1;
50 currentRetry_ = 0;
51 pingPollTimerKey_ = 0;
52 doStopPing_ = false;
53 if (log_.call()) log_.call(ME, "constructor");
54 }
55
56 ConnectionsHandler::~ConnectionsHandler()
57 {
58 if (log_.call()) log_.call(ME, "destructor");
59 if (pingPollTimerKey_ != 0) {
60 global_.getPingTimer().removeTimeoutListener(pingPollTimerKey_);
61 pingPollTimerKey_ = 0;
62 }
63 doStopPing_ = true;
64 /*
65 while (pingIsStarted_) {
66 Thread::sleep(200);
67 }
68 */
69 Lock lock(connectMutex_);
70 string type = (connectQos_.isNull()) ? org::xmlBlaster::util::Global::getDefaultProtocol() : connectQos_->getAddress()->getType(); // "SOCKET"
71 string version = "1.0"; // currently hardcoded
72 if (connection_) {
73 global_.getDispatchManager().releasePlugin(instanceName_, type, version);
74 connection_ = NULL;
75 }
76 if ( queue_ ) {
77 delete queue_;
78 queue_ = NULL;
79 }
80 if (log_.trace()) log_.trace(ME, "destructor: going to delete the connectQos");
81 status_ = END;
82 if (log_.trace()) log_.trace(ME, "destructor ended");
83 }
84
85
86 ConnectReturnQosRef ConnectionsHandler::connect(const ConnectQosRef& qos)
87 {
88 if (log_.call()) log_.call(ME, string("::connect status is '") + lexical_cast<std::string>(status_) + "'");
89 if (qos.isNull()) {
90 throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::connect", "your connectQos is null");
91 }
92 if (log_.dump()) log_.dump(ME, string("::connect, the qos is: ") + qos->toXml());
93 Lock lock(connectMutex_);
94 if (isConnected()) {
95 log_.warn(ME, "connect: you are already connected");
96 return connectReturnQos_;
97 }
98
99 connectQos_ = qos;
100
101 global_.setSessionName(connectQos_->getSessionQos().getSessionName());
102 global_.setImmutableId(connectQos_->getSessionQos().getRelativeName());
103 global_.setId(connectQos_->getSessionQos().getAbsoluteName()); // temporary
104 //log_.info(ME, "BEFORE id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName());
105
106 retries_ = connectQos_->getAddress()->getRetries();
107 long pingInterval = connectQos_->getAddress()->getPingInterval();
108 if (log_.trace()) {
109 log_.trace(ME, string("connect: number of retries during communication failure: ") + lexical_cast<std::string>(retries_));
110 log_.trace(ME, string("connect: Ping Interval: ") + lexical_cast<std::string>(pingInterval));
111 }
112
113 string type = connectQos_->getAddress()->getType();
114 string version = "1.0"; // currently hardcoded
115 if (!connection_) {
116 connection_ = &(global_.getDispatchManager().getPlugin(instanceName_, type, version));
117 }
118
119 try {
120 connectReturnQos_ = connection_->connect(*connectQos_);
121 global_.setSessionName(connectReturnQos_->getSessionQos().getSessionName());
122 // For "joe/1" it remains immutable; For "joe" there is added the server side generated sessionId "joe/-33":
123 global_.setImmutableId(connectReturnQos_->getSessionQos().getRelativeName());
124 global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName());
125 //log_.info(ME, "AFTER id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName());
126 }
127 catch (XmlBlasterException &ex) {
128 if ((ex.isCommunication() || ex.getErrorCodeStr().find("user.configuration") == 0)) {
129 log_.warn(ME, "Got exception when connecting, polling now: " + ex.toString());
130 if (pingPollTimerKey_ == 0)
131 startPinger(false);
132 return queueConnect();
133 }
134 else {
135 if (log_.trace()) log_.trace(ME, string("the exception in connect is ") + ex.toXml());
136 throw ex;
137 }
138 }
139
140 log_.info(ME, string("successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'");
141 connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId());
142
143 enum States oldState = status_;
144 status_ = ALIVE;
145 if (connectionProblemsListener_) connectionProblemsListener_->reachedAlive(oldState, this);
146 // start the ping if in failsafe, i.e. if delay > 0
147 startPinger(false);
148 if (log_.dump()) log_.dump(ME, string("::connect, the return qos is: ") + connectReturnQos_->toXml());
149
150 flushQueue();
151
152 return connectReturnQos_;
153 }
154
155 bool ConnectionsHandler::disconnect(const DisconnectQos& qos)
156 {
157 Lock lock(connectMutex_);
158 if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::DISCONNECT);
159 if (log_.dump()) log_.dump(ME, string("::disconnect, the qos is: ") + qos.toXml());
160
161 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::DISCONNECT);
162 if (status_ == DEAD) {
163 log_.warn(ME, "already disconnected");
164 return false;
165 }
166 if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::DISCONNECT);
167
168 if (qos.getClearClientQueue() && queue_ != 0) queue_->clear();
169
170 bool ret = connection_->disconnect(qos);
171 enum States oldState = status_;
172 status_ = DEAD;
173 if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this);
174 return ret;
175 }
176
177 string ConnectionsHandler::getProtocol()
178 {
179 return connection_->getProtocol();
180 }
181
182 /*
183 string ConnectionsHandler::loginRaw()
184 {
185 return connection_->loginRaw();
186 }
187 */
188
189 bool ConnectionsHandler::shutdown()
190 {
191 if (connection_) {
192 return connection_->shutdown();
193 }
194 return false;
195 }
196
197 string ConnectionsHandler::getLoginName()
198 {
199 return connection_->getLoginName();
200 }
201
202 bool ConnectionsHandler::isLoggedIn()
203 {
204 return connection_->isLoggedIn();
205 }
206
207 string ConnectionsHandler::ping(const string& qos)
208 {
209 // Lock lock(connectionMutex_);
210 return connection_->ping(qos);
211 }
212
213 SubscribeReturnQos ConnectionsHandler::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
214 {
215 if (log_.call()) log_.call(ME, MethodName::SUBSCRIBE);
216 if (log_.dump()) log_.dump(ME, string("::subscribe, the key is: ") + key.toXml());
217 if (log_.dump()) log_.dump(ME, string("::subscribe, the qos is: ") + qos.toXml());
218
219 // Lock lock(connectionMutex_);
220
221 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, MethodName::SUBSCRIBE);
222 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, MethodName::SUBSCRIBE);
223 if (putToQueue()) return queueSubscribe(key, qos);
224 try {
225 SubscribeReturnQos ret = connection_->subscribe(key, qos);
226 return ret;
227 }
228 catch (XmlBlasterException& ex) {
229 toPollingOrDead(&ex);
230 if (putToQueue() && isRecoverable(&ex)) {
231 log_.info(ME, string("::subscribe ") + key.getOid() + " is queued, exception=" + ex.getMessage());
232 return queueSubscribe(key, qos);
233 }
234 else {
235 log_.warn(ME, string("::subscribe failed throwing now exception: ") + key.toXml() + qos.toXml() + " exception=" + ex.getMessage());
236 throw ex;
237 }
238 }
239 }
240
241
242 vector<MessageUnit> ConnectionsHandler::get(const GetKey& key, const GetQos& qos)
243 {
244 if (log_.call()) log_.call(ME, "get");
245 if (log_.dump()) log_.dump(ME, string("::get, the key is: ") + key.toXml());
246 if (log_.dump()) log_.dump(ME, string("::get, the qos is: ") + qos.toXml());
247 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "get");
248 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "get");
249 if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, "get");
250 try {
251 return connection_->get(key, qos);
252 }
253 catch (XmlBlasterException& ex) {
254 toPollingOrDead(&ex);
255 throw ex;
256 }
257 }
258
259
260 vector<UnSubscribeReturnQos>
261 ConnectionsHandler::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
262 {
263 if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
264 if (log_.dump()) log_.dump(ME, string("::unSubscribe, the key is: ") + key.toXml());
265 if (log_.dump()) log_.dump(ME, string("::unSubscribe, the qos is: ") + qos.toXml());
266 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
267 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
268 if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
269 try {
270 vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos);
271 return ret;
272 }
273 catch (XmlBlasterException& ex) {
274 toPollingOrDead(&ex);
275 throw ex;
276 }
277 }
278
279 bool ConnectionsHandler::putToQueue() {
280 if (status_ == POLLING) return true;
281 if (queue_ && queue_->getNumOfEntries() > 0) {
282 return true; // guarantee sequence
283 }
284 return false;
285 }
286
287 PublishReturnQos ConnectionsHandler::publish(const MessageUnit& msgUnit)
288 {
289 if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::PUBLISH);
290 if (log_.dump()) log_.dump(ME, string("::publish, the msgUnit is: ") + msgUnit.toXml());
291 Lock lock(publishMutex_);
292 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::PUBLISH);
293 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::PUBLISH);
294 if (putToQueue()) return queuePublish(msgUnit);
295 try {
296 // fill in the sender absolute name
297 if (!connectReturnQos_.isNull()) {
298 msgUnit.getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
299 }
300 return connection_->publish(msgUnit);
301 }
302 catch (XmlBlasterException& ex) {
303 toPollingOrDead(&ex);
304 if (putToQueue() && isRecoverable(&ex)) {
305 log_.info(ME, string("::publish ") + msgUnit.getKey().getOid() + " is queued, exception=" + ex.getMessage());
306 return queuePublish(msgUnit);
307 }
308 else {
309 log_.warn(ME, string("::publish failed throwing now exception, the msgUnit is: ") + msgUnit.toXml() + " exception=" + ex.getMessage());
310 throw ex;
311 }
312 }
313 }
314
315
316 void ConnectionsHandler::publishOneway(const vector<MessageUnit> &msgUnitArr)
317 {
318 if (log_.call()) log_.call(ME, "publishOneway");
319 Lock lock(publishMutex_);
320
321 // fill in the sender absolute name
322 if (!connectReturnQos_.isNull()) {
323 for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) {
324 msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
325 }
326 }
327
328 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishOneway");
329 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishOneway");
330 if (putToQueue()) {
331 for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]);
332 }
333
334 try {
335 connection_->publishOneway(msgUnitArr);
336 }
337 catch (XmlBlasterException& ex) {
338 toPollingOrDead(&ex);
339 if (putToQueue() && isRecoverable(&ex)) {
340 for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]);
341 }
342 else
343 throw ex;
344 }
345 }
346
347
348 vector<PublishReturnQos> ConnectionsHandler::publishArr(const vector<MessageUnit> &msgUnitArr)
349 {
350 if (log_.call()) log_.call(ME, "publishArr");
351 Lock lock(publishMutex_);
352
353 // fill in the sender absolute name
354 if (!connectReturnQos_.isNull()) {
355 for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) {
356 msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
357 }
358 }
359
360 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishArr");
361 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishArr");
362 if (putToQueue()) {
363 vector<PublishReturnQos> retQos;
364 for (size_t i=0; i < msgUnitArr.size(); i++) {
365 retQos.insert(retQos.end(), queuePublish(msgUnitArr[i]));
366 }
367 return retQos;
368 }
369 try {
370 return connection_->publishArr(msgUnitArr);
371 }
372 catch (XmlBlasterException& ex) {
373 toPollingOrDead(&ex);
374 if (putToQueue() && isRecoverable(&ex)) {
375 vector<PublishReturnQos> retQos;
376 for (size_t i=0; i < msgUnitArr.size(); i++) {
377 retQos.insert(retQos.end(), queuePublish(msgUnitArr[i]));
378 }
379 return retQos;
380 }
381 else throw ex;
382 }
383 }
384
385
386 vector<EraseReturnQos> ConnectionsHandler::erase(const EraseKey& key, const EraseQos& qos)
387 {
388 if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::ERASE);
389 if (log_.dump()) log_.dump(ME, string("::erase, the key is: ") + key.toXml());
390 if (log_.dump()) log_.dump(ME, string("::erase, the qos is: ") + qos.toXml());
391
392 if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::ERASE);
393 if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::ERASE);
394 if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::ERASE);
395
396 try {
397 return connection_->erase(key, qos);
398 }
399 catch (XmlBlasterException& ex) {
400 toPollingOrDead(&ex);
401 throw ex;
402 }
403 }
404
405 void ConnectionsHandler::initFailsafe(I_ConnectionProblems* connectionProblems)
406 {
407 // Lock lock(connectionMutex_);
408 if (log_.trace()) log_.trace(ME, "Register initFailsafe " + lexical_cast<string>(connectionProblems!=0));
409 connectionProblemsListener_ = connectionProblems;
410 }
411
412 // If recoverable we queue a msgUnit, else we throw an exception
413 bool ConnectionsHandler::isRecoverable(const org::xmlBlaster::util::XmlBlasterException* reason)
414 {
415 // TODO: Authorization could also be recoverable (by a server admin)
416 // Such decision must be left to the user (we need a callback to the user here)
417 // As a default all communication problems are assumed to be recoverable
418 if (reason == 0)
419 return true;
420 bool ret = reason->isCommunication();
421 if (log_.call()) log_.call(ME, "isRecoverable " + lexical_cast<string>(ret));
422 return ret;
423 }
424
425 void ConnectionsHandler::toDead(const org::xmlBlaster::util::XmlBlasterException* reason)
426 {
427 if (log_.call()) log_.call(ME, "toDead");
428 enum States oldState = status_;
429 log_.info(ME, "going into DEAD status" + ((reason != 0) ? (": " + reason->getMessage()) : ""));
430 status_ = DEAD;
431 connection_->shutdown(); // close socket/corba connection
432 if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this);
433 }
434
435 void ConnectionsHandler::toPolling(const org::xmlBlaster::util::XmlBlasterException* reason)
436 {
437 log_.info(ME, "going into POLLING status:" + ((reason != 0) ? (": " + reason->getMessage()) : ""));
438 enum States oldState = status_;
439 status_ = POLLING;
440 currentRetry_ = 0;
441 /*
442 try {
443 DisconnectQos discQos(global_);
444 connection_->disconnect(discQos);
445 }
446 catch (...) {
447 log_.warn(ME, "exception when trying to disconnect");
448 }
449 */
450 connection_->shutdown(); // close socket/corba connection
451 if (connectionProblemsListener_) connectionProblemsListener_->reachedPolling(oldState, this);
452 startPinger(true);
453 }
454
455 void ConnectionsHandler::toPollingOrDead(const org::xmlBlaster::util::XmlBlasterException* reason)
456 {
457 if (reason == 0)
458 return;
459 if (!reason->isCommunication())
460 return;
461
462 if (log_.call()) log_.call(ME, "toPollingOrDead");
463
464 if (!isFailsafe()) {
465 log_.info(ME, "going into DEAD status since not in failsafe mode. "
466 "For failsafe mode set 'delay' to a positive long value, for example on the cmd line: -delay 10000" +
467 ((reason != 0) ? (": " + reason->getMessage()) : ""));
468 toDead(reason);
469 return;
470 }
471
472 toPolling(reason);
473 }
474
475
476 void ConnectionsHandler::timeout(void * /*userData*/)
477 {
478
479 Lock lock(connectMutex_);
480 pingPollTimerKey_ = 0;
481 if (doStopPing_) return; // then it must stop
482 if ( log_.call() ) log_.call(ME, string("ping timeout occured with status '") + getStatusString() + "'" );
483 if (status_ == ALIVE) { // then I am pinging
484 if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'ALIVE'");
485 try {
486 if (connection_) {
487 connection_->ping("<qos/>");
488 if ( log_.trace() ) log_.trace(ME, "lowlevel ping returned: status is 'ALIVE'");
489 startPinger(false);
490 }
491 }
492 catch (XmlBlasterException& ex) {
493 if ( log_.trace() ) log_.trace(ME, "lowlevel ping failed: " + ex.toString());
494 toPollingOrDead(&ex);
495 }
496 return;
497 }
498
499 if (status_ == POLLING) {
500 if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'POLLING'");
501 try {
502 if (connection_ && !connectQos_.isNull()) {
503 if ( log_.trace() ) log_.trace(ME, "ping timeout: going to retry a connection");
504
505 string lastSessionId = connectQos_->getSessionQos().getSecretSessionId();
506 connectReturnQos_ = connection_->connect(*connectQos_);
507 if (log_.trace()) log_.trace(ME, string("Successfully reconnected, ConnectRetQos: ") + connectReturnQos_->toXml());
508 string sessionId = connectReturnQos_->getSessionQos().getSecretSessionId();
509 log_.info(ME, string("Successfully reconnected as '") + connectReturnQos_->getSessionQos().getAbsoluteName() +
510 "' after " + lexical_cast<string>(currentRetry_) + " attempts");
511 connectQos_->getSessionQos().setSecretSessionId(sessionId);
512
513 if ( log_.trace() ) {
514 log_.trace(ME, string("ping timeout: re-connection, the new connect returnQos: ") + connectReturnQos_->toXml());
515 }
516
517 bool doFlush = true;
518 enum States oldState = status_;
519 status_ = ALIVE;
520 if ( connectionProblemsListener_ ) doFlush = connectionProblemsListener_->reachedAlive(oldState, this);
521
522 Lock lockPub(publishMutex_); // lock here to avoid publishing while flushing queue (to ensure sequence)
523 if (sessionId != lastSessionId) {
524 log_.trace(ME, string("When reconnecting the sessionId changed from '") + lastSessionId + "' to '" + sessionId + "'");
525 }
526
527 if (doFlush) {
528 try {
529 flushQueueUnlocked(queue_, true);
530 }
531 catch (const XmlBlasterException &ex) {
532 log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue:" + ex.getMessage());
533 }
534 catch (...) {
535 log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue");
536 }
537 }
538 startPinger(false);
539 }
540 }
541 catch (XmlBlasterException ex) {
542 if (log_.trace()) log_.trace(ME, "timeout got exception: " + ex.getMessage());
543 currentRetry_++;
544 if ( currentRetry_ < retries_ || retries_ < 0) { // continue to poll
545 startPinger(false);
546 }
547 else {
548 enum States oldState = status_;
549 status_ = DEAD;
550 if ( connectionProblemsListener_ ) {
551 connectionProblemsListener_->reachedDead(oldState, this);
552 // stopping
553 }
554 }
555 }
556 return;
557 }
558
559 // if it comes here it will stop
560
561 }
562
563 SubscribeReturnQos ConnectionsHandler::queueSubscribe(const SubscribeKey& key, const SubscribeQos& qos)
564 {
565 if (!queue_) {
566 if (connectQos_.isNull()) {
567 throw XmlBlasterException(INTERNAL_SUBSCRIBE, ME + "::queueSubscribe", "need to create a queue but the connectQos is NULL (probably never connected)");
568 }
569 if (log_.trace()) log_.trace(ME+":queueSubscribe", "creating a client queue ...");
570 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
571 if (log_.trace()) log_.trace(ME+":queueSubscribe", "created a client queue");
572 }
573 SubscribeReturnQos retQos(global_);
574 SubscribeQos& q = const_cast<SubscribeQos&>(qos);
575 SessionNameRef sessionName = global_.getSessionName();
576 std::string subscriptionId = q.generateSubscriptionId(sessionName, key);
577 retQos.getData().setSubscriptionId(subscriptionId);
578 retQos.getData().setState(Constants::STATE_OK);
579 retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED"
580 qos.setSubscriptionId(subscriptionId);
581 SubscribeQueueEntry entry(global_, key, qos, qos.getData().getPriority());
582 queue_->put(entry);
583 //if (log_.trace())
584 log_.warn(ME, string("queueSubscribe: entry '") + key.getOid() +
585 "' has been queued with client side generated subscriptionId=" + subscriptionId);
586 return retQos;
587 }
588
589 PublishReturnQos ConnectionsHandler::queuePublish(const MessageUnit& msgUnit)
590 {
591 if (log_.call()) log_.call(ME, "queuePublish");
592 if (!queue_) {
593 if (connectQos_.isNull()) {
594 throw XmlBlasterException(INTERNAL_PUBLISH, ME + "::queuePublish", "need to create a queue but the connectQos is NULL (probably never connected)");
595 }
596 if (log_.trace()) log_.trace(ME+":queuePublish", "creating a client queue ...");
597 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
598 if (log_.trace()) log_.trace(ME+":queuePublish", "created a client queue");
599 }
600 if (log_.trace())
601 log_.trace(ME, string("queuePublish: entry '") + msgUnit.getKey().getOid() + "' has been queued");
602 PublishReturnQos retQos(global_);
603 retQos.setKeyOid(msgUnit.getKey().getOid());
604 retQos.setState(Constants::STATE_OK);
605 retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED"
606 PublishQueueEntry entry(global_, msgUnit, msgUnit.getQos().getPriority());
607 queue_->put(entry);
608 return retQos;
609 }
610
611 ConnectReturnQosRef& ConnectionsHandler::queueConnect()
612 {
613 if (log_.call()) log_.call(ME, string("::queueConnect with sessionQos: '") + connectQos_->getSessionQos().getAbsoluteName() + "'");
614 long tmp = connectQos_->getSessionQos().getPubSessionId();
615 if ( tmp <= 0) {
616 if (log_.trace()) log_.trace(ME, string("::queueConnect, the public session id is '") + lexical_cast<std::string>(tmp));
617 throw XmlBlasterException(USER_CONNECT, ME + "::queueConnect", "queueing connection request not possible because you did not specify a positive public sessionId");
618 }
619
620 if (!queue_) {
621 if (log_.trace()) log_.info(ME, "::queueConnect: created a client queue");
622 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
623 }
624 if (log_.trace())
625 log_.trace(ME, string("queueConnect: entry '") + connectQos_->getSessionQos().getAbsoluteName() + "' has been queued");
626
627 connectReturnQos_ = new ConnectReturnQos(*connectQos_);
628
629 /* Michele thinks we should not queue the ConnectQos
630 ConnectQueueEntry entry(global_, *connectQos_);
631 queue_->put(entry);
632 */
633 enum States oldState = status_;
634 status_ = POLLING;
635 if ( connectionProblemsListener_ ) {
636 connectionProblemsListener_->reachedPolling(oldState, this);
637 // stopping
638 }
639 startPinger(true);
640 return connectReturnQos_;
641 }
642
643 I_PostSendListener* ConnectionsHandler::registerPostSendListener(I_PostSendListener *listener) {
644 I_PostSendListener* old = postSendListener_;
645 postSendListener_ = listener;
646 return old;
647 }
648
649 /**
650 * Flushes all entries in the queue, i.e. the entries of the queue are sent to xmlBlaster.
651 * If the queue is empty or NULL, then 0 is returned. If the state is in POLLING or DEAD, or the
652 * connection is not established yet (i.e. connection_ = NULL), then -1 is
653 * returned.. This method blocks until all entries in the queue have been sent.
654 */
655 long ConnectionsHandler::flushQueue()
656 {
657 if (log_.call()) log_.call(ME, "flushQueue");
658 // Lock lock(connectionMutex_);
659
660 if (!queue_) {
661 if (connectQos_.isNull()) {
662 log_.error(ME+".flusgQueue", "need to create a queue but the connectQos is NULL (probably never connected)");
663 }
664 if (log_.trace()) log_.trace(ME+".flushQueue", "creating the client queue ...");
665 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
666 if (queue_->getNumOfEntries() < 1) {
667 if (log_.trace()) log_.trace(ME+".flushQueue", "Created queue [" + queue_->getType() + "][" + queue_->getVersion() +
668 "], it is empty, nothing to do.");
669 return 0;
670 }
671 log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " +
672 lexical_cast<string>(queue_->getNumOfEntries()) + " entries.");
673 }
674
675 return flushQueueUnlocked(queue_, true);
676 }
677
678 // Notify client code
679 bool ConnectionsHandler::sendingFailedNotification(const std::vector<org::xmlBlaster::util::queue::EntryType> &entries, const org::xmlBlaster::util::XmlBlasterException &exception)
680 {
681 I_PostSendListener *p = postSendListener_;
682 bool isHandled = false;
683 if (p) {
684 try {
685 isHandled = p->sendingFailed(entries, exception); // Notify client code
686 }
687 catch (...) {
688 log_.error(ME, "flushQueueUnlocked(async sending mode): Ignoring exception thrown from user code sendingFailed(...)");
689 }
690 }
691 return isHandled;
692 }
693
694
695 /**
696 * Called synchronously by connect()
697 * or async by timeout of reconnect poller
698 */
699 long ConnectionsHandler::flushQueueUnlocked(I_Queue *queueToFlush, bool doRemove)
700 {
701 if ( log_.call() ) log_.call(ME, "flushQueueUnlocked");
702 if (!queueToFlush || queueToFlush->empty()) return 0;
703 if (status_ != ALIVE || connection_ == NULL) return -1;
704
705 long ret = 0;
706 if (!queueToFlush->empty()) {
707 log_.info(ME, "Queue [" + queue_->getType() + "][" + queue_->getVersion() + "] contains " +
708 lexical_cast<string>(queue_->getNumOfEntries()) + " entries, we send them to the server");
709 }
710 while (!queueToFlush->empty()) {
711 long maxNumOfEntries= (doRemove) ? 1 : -1; // doRemove==false makes no sense, TODO: remove this arg
712 if (log_.trace()) log_.trace(ME, "flushQueueUnlocked: flushing one priority sweep maxNumOfEntries=" + lexical_cast<string>(maxNumOfEntries));
713 const vector<EntryType> entries = queueToFlush->peekWithSamePriority(maxNumOfEntries);
714 vector<EntryType>::const_iterator iter = entries.begin();
715 bool isHandled = false;
716 while (iter != entries.end()) {
717 const EntryType entry = (*iter);
718 iter++;
719 const MsgQueueEntry &entry2 = *entry;
720
721 try {
722 if (log_.trace()) log_.trace(ME, "sending the content to xmlBlaster: " + entry->toXml());
723 {
724 MsgQueueEntry &entry3 = const_cast<MsgQueueEntry&>(entry2);
725 entry3.setSender(connectReturnQos_->getSessionQos().getSessionName());
726 }
727 entry2.send(*this); // entry2 contains the PublishReturnQos after calling send
728 isHandled = false;
729 if (log_.trace()) log_.trace(ME, "content to xmlBlaster successfully sent");
730 }
731 catch (XmlBlasterException &ex) {
732 if (ex.isCommunication()) {
733 toPollingOrDead(&ex);
734 throw ex; // Terminate timer thread //return;
735 }
736 log_.warn(ME, "flushQueueUnlocked(async sending mode): can't send queued message " + entry->toXml() + " to server: " + ex.getMessage());
737 isHandled = sendingFailedNotification(entries, ex); // Notify client code
738 if (isHandled) {
739 if (log_.trace()) log_.trace(ME, "message is handled by user code, we remove it now from queue: " + entry->toXml());
740 }
741 else {
742 toDead(&ex);
743 // Instead of toDead we should set dispatcher active false so that a C++ client can later reactivate it?
744 throw ex; // Up to timer thread or back to user
745 }
746 }
747 catch (...) {
748 XmlBlasterException ex = XmlBlasterException(INTERNAL_UNKNOWN, ME, "flushQueueUnlocked(async sending mode): can't send queued message, reason is unknown");
749 log_.warn(ME, "flushQueueUnlocked(async sending mode): can't send queued message " + entry->toXml() + " to server because of unknown exception");
750 isHandled = sendingFailedNotification(entries, ex); // Notify client code
751 if (isHandled) {
752 if (log_.trace()) log_.trace(ME, "message is handled by user code, we remove it now from queue: " + entry->toXml());
753 }
754 else {
755 toDead(&ex);
756 // We should set dispatcher active false so that a C++ client can later reactivate it?
757 throw ex;
758 }
759 }
760 }
761 if (doRemove) {
762 //log_.trace(ME, "remove send message from client queue");
763 ret += queueToFlush->randomRemove(entries.begin(), entries.end());
764 }
765 if (!isHandled) {
766 I_PostSendListener *p = postSendListener_;
767 if (p) {
768 p->postSend(entries);
769 }
770 }
771 }
772 return ret;
773 }
774
775 I_Queue* ConnectionsHandler::getQueue()
776 {
777 if (!queue_) {
778 if (log_.trace()) log_.trace(ME+".getQueue", "creating the client queue ...");
779 queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
780 log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " +
781 lexical_cast<string>(queue_->getNumOfEntries()) + " entries.");
782 }
783 return queue_;
784 }
785
786 bool ConnectionsHandler::isFailsafe() const
787 {
788 if (connectQos_.isNull()) return false;
789 return connectQos_->getAddress()->getDelay() > 0;
790 }
791
792 // pinger or poller
793 bool ConnectionsHandler::startPinger(bool withInitialPing)
794 {
795 if (log_.call()) log_.call(ME, "startPinger");
796 if (doStopPing_) return false;
797
798 if (log_.trace()) log_.trace(ME, "startPinger (no request to stop the pinger is active for the moment)");
799 if (pingPollTimerKey_ != 0 && !withInitialPing) {
800 if (log_.trace()) log_.trace(ME, "startPinger: the pinger is already running. I will return without starting a new thread");
801 return false;
802 }
803
804 long delay = 10000;
805 long pingInterval = 0;
806 if (connectQos_.isNull()) {
807 ConnectQos tmp(global_);
808 delay = tmp.getAddress()->getDelay();
809 pingInterval = tmp.getAddress()->getPingInterval();
810 }
811 else {
812 delay = connectQos_->getAddress()->getDelay();
813 pingInterval = connectQos_->getAddress()->getPingInterval();
814 }
815 if (log_.trace()) {
816 log_.trace(ME, string("startPinger(status=") +
817 getStatusString() +
818 "): parameters are: delay '" + lexical_cast<std::string>(delay) +
819 "' and pingInterval '" + lexical_cast<std::string>(pingInterval) +
820 " withInitialPing=" + lexical_cast<string>(withInitialPing));
821 }
822 if (delay > 0 && pingInterval > 0) {
823 long delta = delay;
824 if (status_ == ALIVE) delta = pingInterval;
825 if (withInitialPing) delta = 400;
826 pingPollTimerKey_ = global_.getPingTimer().addOrRefreshTimeoutListener(this, delta, NULL, pingPollTimerKey_);
827 }
828 return true;
829 }
830
831 string ConnectionsHandler::getStatusString() const
832 {
833 if (status_ == ALIVE) return "ALIVE";
834 else if (status_ == POLLING) return "POLLING";
835 else if (status_ == DEAD) return "DEAD";
836 else if (status_ == START) return "START";
837 return "END";;
838 }
839
840
841 bool ConnectionsHandler::isConnected() const
842 {
843 return status_ == ALIVE || status_ == POLLING;
844 }
845
846 bool ConnectionsHandler::isAlive() const
847 {
848 return status_ == ALIVE;
849 }
850
851 bool ConnectionsHandler::isPolling() const
852 {
853 return status_ == POLLING;
854 }
855
856 bool ConnectionsHandler::isDead() const
857 {
858 return status_ == DEAD;
859 }
860
861 ConnectReturnQosRef ConnectionsHandler::connectRaw(const ConnectQosRef& connectQos)
862 {
863 if (log_.call()) log_.call(ME, "::connectRaw");
864 connectReturnQos_ = connection_->connect(connectQos);
865 connectQos_ = connectQos;
866 log_.info(ME, string("Successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'");
867 connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId());
868 return connectReturnQos_;
869 }
870
871
872 I_XmlBlasterConnection& ConnectionsHandler::getConnection() const
873 {
874 if (!connection_) {
875 throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::getConnection", "the connection is still NULL: it is not assigned yet. You probably called this method before a connection was made");
876 }
877 return *connection_;
878 }
879
880
881 ConnectReturnQosRef ConnectionsHandler::getConnectReturnQos()
882 {
883 return connectReturnQos_;
884 }
885
886 ConnectQosRef ConnectionsHandler::getConnectQos()
887 {
888 return connectReturnQos_; // contains everything and is typedef on ConnectQos
889 }
890
891 /*
892 void ConnectionsHandler::setConnectReturnQos(const connectReturnQos& retQos)
893 {
894 if (connectReturnQos_) {
895 delete connectReturnQos_;
896 connectReturnQos_ = NULL;
897 }
898 connectReturnQos_ = new ConnectReturnQos(retQos);
899 }
900 */
901
902 }}}} // namespaces
syntax highlighted by Code2HTML, v. 0.9.1