1 /*------------------------------------------------------------------------------
  2 Name:      SocketDriver.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   The client driver for the socket protocol
  6 ------------------------------------------------------------------------------*/
  7 #include <client/protocol/socket/SocketDriver.h>

  8 #include <util/ErrorCode.h>

  9 #include <util/XmlBlasterException.h>

 10 #include <util/Global.h>

 11 #include <util/lexical_cast.h>

 12 #include <XmlBlasterAccessUnparsed.h> // The C SOCKET client library

 13 #include <util/qos/ConnectQosFactory.h>

 14 #include <util/Properties.h>

 15 #include <string>

 16 #include <cstdarg> // va_start

 17 #include <cstdio> // vsnprintf for g++ 2.9x only

 18 #include <cstring> // memset()

 19 
 20 static void myLogger(void *logUserP, 
 21                      XMLBLASTER_LOG_LEVEL currLevel,
 22                      XMLBLASTER_LOG_LEVEL level,
 23                      const char *location, const char *fmt, ...);
 24 
 25 //static ::XmlBlasterNumReadFunc callbackProgressListener;  // what's wrong with this?

 26 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes);
 27 
 28 /**
 29  * Customized logging output is handled by this method. 
 30  * <p>
 31  * We register this function with 
 32  * </p>
 33  * <pre>
 34  * xa->log = myLogger;
 35  * </pre>
 36  * @param currLevel The actual log level of the client
 37  * @param level The level of this log entry
 38  * @param location A string describing the code place
 39  * @param fmt The formatting string
 40  * @param ... Other variables to log, corresponds to 'fmt'
 41  * @see xmlBlaster/src/c/msgUtil.c: xmlBlasterDefaultLogging() is the default
 42  *      implementation
 43  */
 44 static void myLogger(void *logUserP, 
 45                      XMLBLASTER_LOG_LEVEL currLevel,
 46                      XMLBLASTER_LOG_LEVEL level,
 47                      const char *location, const char *fmt, ...)
 48 {
 49    /* Guess we need no more than 200 bytes. */
 50    int n, size = 200;
 51    char *p = 0;
 52    va_list ap;
 53    org::xmlBlaster::client::protocol::socket::SocketDriver *sd =
 54          (org::xmlBlaster::client::protocol::socket::SocketDriver *)logUserP;
 55    org::xmlBlaster::util::I_Log& log = sd->getLog();
 56 
 57    if (level > currLevel) { /* XMLBLASTER_LOG_ERROR, XMLBLASTER_LOG_WARN, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_TRACE */
 58       return;
 59    }
 60    if ((p = (char *)malloc (size)) == NULL)
 61       return;
 62 
 63    for (;;) {
 64       /* Try to print in the allocated space. */
 65       va_start(ap, fmt);
 66       n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */
 67       va_end(ap);
 68       /* If that worked, print the string to console. */
 69       if (n > -1 && n < size) {
 70          if (level == XMLBLASTER_LOG_INFO)
 71             log.info(location, p);
 72          else if (level == XMLBLASTER_LOG_WARN)
 73             log.warn(location, p);
 74          else if (level == XMLBLASTER_LOG_ERROR)
 75             log.error(location, p);
 76          else
 77             log.trace(location, p);
 78          free(p);
 79          return;
 80       }
 81       /* Else try again with more space. */
 82       if (n > -1)    /* glibc 2.1 */
 83          size = n+1; /* precisely what is needed */
 84       else           /* glibc 2.0 */
 85          size *= 2;  /* twice the old size */
 86       if ((p = (char *)realloc (p, size)) == NULL) {
 87          return;
 88       }
 89    }
 90 }
 91 
 92 /**
 93  * Access the read socket progress. 
 94  * You need to register this function pointer if you want to see the progress of huge messages
 95  * on slow connections.
 96  */
 97 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) {
 98    org::xmlBlaster::client::protocol::socket::SocketDriver *sd =
 99          (org::xmlBlaster::client::protocol::socket::SocketDriver *)userP;
100    //org::xmlBlaster::util::I_Log& log = sd->getLog();

101    //if (log.trace()) log.trace("SocketDriver", "Update data progress currBytesRead=" +

102    //                          org::xmlBlaster::util::lexical_cast<std::string>(currBytesRead) +

103    //                          " nbytes=" + org::xmlBlaster::util::lexical_cast<std::string>(nbytes));

104    if (sd != 0 && sd->progressListener_ != 0) {
105       sd->progressListener_->progress("", currBytesRead, nbytes);
106    }
107 }
108 
109 namespace org {
110  namespace xmlBlaster {
111   namespace client {
112    namespace protocol {
113     namespace socket {
114 
115 using namespace std;
116 using namespace org::xmlBlaster::util;
117 using namespace org::xmlBlaster::util::qos;
118 using namespace org::xmlBlaster::util::key;
119 using namespace org::xmlBlaster::util::thread;
120 using namespace org::xmlBlaster::client::protocol;
121 using namespace org::xmlBlaster::client::key;
122 using namespace org::xmlBlaster::client::qos;
123 
124 static XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
125                      ::ExceptionStruct *exception);
126 
127 void SocketDriver::freeResources(bool deleteConnection)
128 {
129    if (log_.call()) log_.call(ME, "freeResources("+lexical_cast<std::string>(deleteConnection)+") connection_=" + ((connection_==0)?"0":lexical_cast<std::string>(connection_)));
130    if (deleteConnection && connection_ != 0) {
131       freeXmlBlasterAccessUnparsed(connection_);
132       connection_ = 0;
133    }
134    if (deleteConnection && argsStructP_ != 0) {
135       global_.freeArgs(*argsStructP_);
136       delete argsStructP_;
137       argsStructP_ = 0;
138    }
139 }
140 
141 /*
142  Note on exception handling:
143  If we throw an exception, our master ConnectionsHandler.cpp will
144  catch it and to a shutdown() on us. This will cleanup the resources.
145  */
146 #define catch_MACRO(methodName, deleteConnection)                     \

147    catch(const XmlBlasterException *ex) {                             \
148       freeResources(deleteConnection);                                \
149       throw ex;                                                       \
150    }                                                                  \
151    catch(XmlBlasterException &ex) {                                   \
152       freeResources(deleteConnection);                                \
153       ex.setLocation(ME + string(methodName));                        \
154       throw ex;                                                       \
155    }                                                                  \
156    catch(const ::ExceptionStruct *ex) {                               \
157       freeResources(deleteConnection);                                \
158       org::xmlBlaster::util::XmlBlasterException xx = convertFromSocketException(*ex); \
159       delete ex;                                                      \
160       throw xx;                                                       \
161    }                                                                  \
162    catch(const ::ExceptionStruct &ex) {                               \
163       freeResources(deleteConnection);                                \
164       throw convertFromSocketException(ex);                           \
165    }                                                                  \
166    catch(const exception &ex) {                                       \
167       freeResources(deleteConnection);                                \
168       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
169                        loginName_, ME + string(methodName), "en", \
170                        global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
171                        string("type='exception', msg='")              \
172                         + ex.what() + "'");                           \
173    }                                                                  \
174    catch(const string &ex) {                                          \
175       freeResources(deleteConnection);                                \
176       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
177                        loginName_, ME + string(methodName), "en", \
178                        global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
179                        string("type='string', msg='") + ex + "'");    \
180    }                                                                  \
181    catch(const char *ex) {                                            \
182       freeResources(deleteConnection);                                \
183       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
184                        loginName_, ME + string(methodName), "en", \
185                        global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
186                        string("type='char*', msg='") + ex + "'");     \
187    }                                                                  \
188    catch(int ex) {                                                    \
189       freeResources(deleteConnection);                                \
190       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
191                        loginName_, ME + string(methodName), "en", \
192                        global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
193        string("type='int', msg='") + lexical_cast<std::string>(ex) + "'"); \
194    }                                                                  \
195    catch (...) {                                                      \
196       freeResources(deleteConnection);                                \
197       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
198                        loginName_, ME + string(methodName), "en", \
199                        global_.getVersion() + " " + global_.getBuildTimestamp());\
200    }
201 
202 SocketDriver::SocketDriver(const SocketDriver& socketDriver)
203    : mutex_(socketDriver.mutex_),
204      ME("SocketDriver"), 
205      argsStructP_(0),
206      global_(socketDriver.global_), 
207      log_(socketDriver.log_),
208      statusQosFactory_(socketDriver.global_), 
209      msgKeyFactory_(socketDriver.global_), 
210      msgQosFactory_(socketDriver.global_),
211      callbackClient_(0),
212      progressListener_(0)
213 {
214    // no instantiation of these since this should never be invoked (just to make it private)

215    connection_      = NULL;
216    argsStructP_ = new ArgsStruct_T;
217    //memset(argsStructP_, '\0', sizeof(ArgsStruct_T));

218    global_.fillArgs(*argsStructP_);
219    if (log_.call()) log_.call(ME, string("Copy constructor"));
220 }
221 
222 SocketDriver& SocketDriver::operator =(const SocketDriver& /*socketDriver*/)
223 {
224    if (log_.call()) log_.call(ME, "operator=()");
225    return *this;
226 }
227 
228 
229 SocketDriver::SocketDriver(Global& global, const string instanceName)
230    : mutex_(),
231      instanceName_(instanceName),
232      connection_(NULL),
233      ME(string("SocketDriver-") + instanceName), 
234      argsStructP_(0),
235      global_(global), 
236      log_(global.getLog("org.xmlBlaster.client.protocol.socket")),
237      statusQosFactory_(global),
238      msgKeyFactory_(global),
239      msgQosFactory_(global),
240      callbackClient_(0),
241      progressListener_(0)
242 {
243    if (log_.call()) log_.call("SocketDriver", string("getInstance for ") + instanceName);
244 
245    argsStructP_ = new ArgsStruct_T;
246    if (!global_.getProperty().propertyExists("logLevel")) {
247       if (log_.trace() || log_.call())
248          global_.getProperty().setProperty("logLevel", "TRACE");
249       else if (log_.dump())
250          global_.getProperty().setProperty("logLevel", "DUMP");
251    }
252 
253    global_.fillArgs(*argsStructP_);
254    try {
255       connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
256       if (connection_) {
257          connection_->userObject = this; // Transports us to the myUpdate() method

258          connection_->log = myLogger;    // Register our own logging function

259          connection_->logUserP = this;   // Pass ourself to myLogger()

260          if (log_.dump()) {
261             log_.dump(ME, "C properties:");
262             ::dumpProperties(connection_->props);
263          }
264       }
265       else {
266          log_.error(ME, "Allocation of C SOCKET library failed");
267       }
268    } catch_MACRO("::Constructor", true)
269 }
270 
271 /**
272  * Called on polling, must be synchronized from outside,
273  * throws an exception on failure
274  */
275 void SocketDriver::reconnectOnIpLevel(void)
276 {
277    log_.trace(ME, "Trying to reconnect to server");
278 
279    freeResources(true); // Cleanup if old connection exists

280 
281    // Give a chance to new configuration settings

282    if (argsStructP_ != 0) {
283       global_.freeArgs(*argsStructP_);
284       delete argsStructP_;
285       argsStructP_ = 0;
286    }
287    argsStructP_ = new ArgsStruct_T;
288    global_.fillArgs(*argsStructP_);
289 
290    ::ExceptionStruct socketException;
291 
292    try {
293       connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
294       connection_->userObject = this; // Transports us to the myUpdate() method

295       connection_->log = myLogger;    // Register our own logging function

296       connection_->logUserP = this;   // Pass SocketDriver to myLogger()

297    } catch_MACRO("::Constructor", true)
298    
299    try {
300       if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
301       if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
302          if (log_.trace()) log_.trace(ME, string("Reconnection to xmlBlaster failed, please start the server or check your network: ") + socketException.message);
303          throw socketException;
304       }
305       registerProgressListener(this->progressListener_); // Re-register

306       if (log_.trace()) log_.trace(ME, "After createCallbackServer");
307    } catch_MACRO("::initialize", true)
308 }
309 
310 SocketDriver::~SocketDriver()
311 {
312    if (log_.call()) log_.call(ME, "~SocketDriver()");
313    try {
314       freeResources(true);
315    }
316    catch (...) {
317       log_.error(ME, "Unexpected catch in ~SocketDriver()");
318    }
319 }
320 
321 XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
322                      ::ExceptionStruct *exception)
323 {
324    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
325    SocketDriver* socketDriver = static_cast<SocketDriver*>(xa->userObject);
326    Global& global = socketDriver->getGlobal();
327    I_Log& log = socketDriver->getLog();
328    const string &ME = socketDriver->me();
329 
330    try {
331       for (size_t i=0; i<msgUnitArr->len; i++) {
332          //char *xml = messageUnitToXml(&msgUnitArr->msgUnitArr[i]);

333          //printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n",xml);

334                         //xmlBlasterFree(xml);

335          if (log.trace()) log.trace(ME, "Received callback message");
336          ::MsgUnit& msgUnit = msgUnitArr->msgUnitArr[i];
337          I_Callback* cb = socketDriver->getCallbackClient();
338          if (cb != 0) {
339             UpdateKey updateKey(global, socketDriver->getMsgKeyFactory().readObject(string(msgUnit.key)));
340             UpdateQos updateQos(global, socketDriver->getMsgQosFactory().readObject(string(msgUnit.qos)));
341             std::string retQos = cb->update(msgUnitArr->secretSessionId,
342                           updateKey, (const unsigned char*)msgUnit.content,
343                           msgUnit.contentLen, updateQos);
344             msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(retQos.c_str());
345          }
346          else { /* Return QoS: Everything is OK */
347             log.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + msgUnit.key);
348             msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(Constants::RET_OK); // "<qos><state id='OK'/></qos>");

349          }
350       }
351       //throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "TEST THROWING EXCEPTION");

352    } 
353    catch (XmlBlasterException &e) {
354       string tmp = "Exception caught in C++ update(), " +
355                    lexical_cast<std::string>(msgUnitArr->len) +
356                    " messages are handled as not delivered: " +
357                    e.getMessage();
358       log.error(ME, tmp);
359       for (size_t i=0; i<msgUnitArr->len; i++) {
360          char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
361          log.error(ME, xml);
362          xmlBlasterFree(xml);
363       }
364       strncpy0(exception->errorCode, e.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
365       strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
366       return (XMLBLASTER_C_bool)0;
367    }
368    catch(...) {
369       string tmp = "Unidentified exception caught in C++ update(), " + lexical_cast<std::string>(msgUnitArr->len) + " messages are handled as not delivered";
370       log.error(ME, tmp);
371       for (size_t i=0; i<msgUnitArr->len; i++) {
372          char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
373          log.error(ME, xml);
374          xmlBlasterFree(xml);
375       }
376       strncpy0(exception->errorCode, "user.update.error", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
377       strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
378       return (XMLBLASTER_C_bool)0;
379    }
380    return (XMLBLASTER_C_bool)1;
381 }
382 
383 I_Callback* SocketDriver::getCallbackClient()
384 {
385    return callbackClient_;
386 }
387 
388 /** Enforced by I_CallbackServer */
389 void SocketDriver::initialize(const string& name, I_Callback &client)
390 {
391    ::ExceptionStruct socketException;
392    ME = string("SocketDriver-") + instanceName_ + "-" + name;
393    if (log_.call()) log_.call(ME, "initialize() callback server");
394    callbackClient_ = &client;
395    Lock lock(mutex_);
396    if (connection_ == 0) {
397       if (log_.trace()) log_.trace(ME, "ERROR: connection_ is null");
398       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, name, ME + ".initialize", "en",
399                        global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
400    }
401    try {
402       if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
403       if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
404          log_.warn(ME, "Connection to xmlBlaster failed,"
405                 " please start the server or check your configuration\n");
406          freeResources(true);
407       }
408       if (log_.trace()) log_.trace(ME, "After createCallbackServer");
409    } catch_MACRO("::initialize", true)
410 }
411 
412 string SocketDriver::getCbProtocol()
413 {
414     return Constants::SOCKET; // "SOCKET";

415 }                             
416 
417 string SocketDriver::getCbAddress()
418 {
419    Lock lock(mutex_);
420    if (connection_ == 0 || connection_->callbackP == 0) {
421       return string("socket://:");
422    }
423    try {
424       return string("socket://") + string(connection_->callbackP->hostCB) + ":" +
425              lexical_cast<std::string>(connection_->callbackP->portCB);
426    } catch_MACRO("::getCbAddress", false)
427 }
428 
429 bool SocketDriver::shutdownCb()
430 {
431    Lock lock(mutex_);
432    if (connection_ == 0 || connection_->callbackP == 0) return false;
433    connection_->callbackP->shutdown(connection_->callbackP);
434    return true;
435 }
436 
437 ConnectReturnQosRef SocketDriver::connect(const ConnectQosRef& qos) //throw (XmlBlasterException) // Visual C++ emits a warning with this throw clause

438 {
439    if (log_.call()) log_.call(ME, string("connect() ") + string((connection_==0)?"connection_==0":"connection_!=0") +
440                               ", secretSessionId_="+secretSessionId_);
441                               //+" isConnected=" + ((connection_==0)?XMLBLASTER_FALSE:lexical_cast<string>(connection_->isConnected(connection_))));

442    ::ExceptionStruct socketException;
443    Lock lock(mutex_);
444    try {
445       loginName_ = qos->getUserId();
446      if ( connection_ != 0 && !connection_->isConnected(connection_)) {
447          freeResources(true);
448      }
449       if (connection_ == 0) {
450          reconnectOnIpLevel(); // Connects on IP level only, throws an exception on failure

451          if (secretSessionId_ != "") {
452             qos->getSessionQos().setSecretSessionId(secretSessionId_);
453          }
454          if (connection_ != 0 && connection_->callbackP != 0) {
455             ConnectQos *qq = const_cast<ConnectQos*>(&(*qos));
456             if (qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->getType() == Constants::SOCKET) {
457                // Force callback address, it could have changed on reconnect (checked to cb not be a delegate)

458                string addr = string("socket://") + string(connection_->callbackP->hostCB) + ":" +
459                       lexical_cast<std::string>(connection_->callbackP->portCB);
460                qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->setAddress(addr);
461                log_.trace(ME, "Setting callback address to " + addr);
462             }
463          }
464       }
465 
466       char *retQos = connection_->connect(connection_, qos->toXml().c_str(),
467                                           myUpdate, &socketException);
468       if (*socketException.errorCode != 0) {
469          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

470       }
471       ConnectQosFactory factory(global_);
472       ConnectReturnQosRef connectReturnQos = factory.readObject(retQos);
473       xmlBlasterFree(retQos);
474       secretSessionId_ = connectReturnQos->getSecretSessionId();
475       return connectReturnQos;
476    } catch_MACRO("::connect", false)
477 }
478 
479 bool SocketDriver::disconnect(const DisconnectQos& qos)
480 {
481    if (log_.call()) log_.call(ME, "disconnect()");
482    if (connection_ == 0) return false;
483    ::ExceptionStruct socketException;
484    Lock lock(mutex_);
485    try {
486       bool ret = connection_->disconnect(connection_, qos.toXml().c_str(), &socketException);
487       if (*socketException.errorCode != 0) {
488          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

489       }
490       return ret;
491    } catch_MACRO("::disconnect", false)
492    return true;
493 }
494 
495 string SocketDriver::getProtocol()
496 {
497    return Constants::SOCKET; // "SOCKET";

498 }
499 
500 /** Called when going to POLLING mode */
501 bool SocketDriver::shutdown()
502 {
503    if (log_.call()) log_.call(ME, "shutdown()");
504    Lock lock(mutex_);
505    if (connection_ == 0) return false;
506    freeResources(true);
507    return true;
508 }
509 
510 string SocketDriver::getLoginName()
511 {
512    return loginName_;
513 }
514 
515 bool SocketDriver::isLoggedIn()
516 {
517    Lock lock(mutex_);
518    return connection_ != 0 && connection_->isConnected(connection_);
519 }
520 
521 string SocketDriver::ping(const string& qos)
522 {
523    ::ExceptionStruct socketException;
524    Lock lock(mutex_);
525    if (connection_ == 0) {
526       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, "", ME + ".ping", "en",
527                        global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
528    }
529    try {
530       char *retQosP = connection_->ping(connection_, qos.c_str(), &socketException);
531       if (retQosP == 0 || *socketException.errorCode != 0) {
532          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

533       }
534       string retQos(retQosP);
535       xmlBlasterFree(retQosP);
536       return retQos;
537    } catch_MACRO("::ping", false)
538 }
539 
540 SubscribeReturnQos SocketDriver::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
541 {
542    ::ExceptionStruct socketException;
543    Lock lock(mutex_);
544    if (connection_ == 0) {
545       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
546    }
547    try {
548       char *response = connection_->subscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
549       if (*socketException.errorCode != 0) {
550          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

551       }
552       SubscribeReturnQos subscribeReturnQos(global_, statusQosFactory_.readObject(response));
553       xmlBlasterFree(response);
554       return subscribeReturnQos;
555    } catch_MACRO("::subscribe", false)
556 }
557 
558 vector<MessageUnit> SocketDriver::get(const GetKey& getKey, const GetQos& getQos)
559 {
560    ::ExceptionStruct socketException;
561    Lock lock(mutex_);
562    if (connection_ == 0) {
563       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
564    }
565    try {
566       MsgUnitArr *msgUnitArr;  // The returned C struct array

567       string key = getKey.toXml();
568       string qos = getQos.toXml();
569       msgUnitArr = connection_->get(connection_, key.c_str(), qos.c_str(), &socketException);
570       if (*socketException.errorCode != 0) {
571          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

572       }
573       if (msgUnitArr != (MsgUnitArr *)0) {
574          vector<MessageUnit> ret;
575          for (size_t i=0; i<msgUnitArr->len; i++) {
576             MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnitArr->msgUnitArr[i].key));
577             MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnitArr->msgUnitArr[i].qos));
578             MessageUnit messageUnit(msgKeyData,
579                          msgUnitArr->msgUnitArr[i].contentLen,
580                          (const unsigned char*)msgUnitArr->msgUnitArr[i].content,
581                          msgQosData);
582             ret.insert(ret.end(),  messageUnit);
583          }
584          freeMsgUnitArr(msgUnitArr);
585          return ret;
586       }
587    } catch_MACRO("::get", false)
588    return vector<MessageUnit>();
589 }
590 
591 vector<UnSubscribeReturnQos>
592 SocketDriver::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
593 {
594    ::ExceptionStruct socketException;
595    Lock lock(mutex_);
596    if (connection_ == 0) {
597       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
598    }
599    try {
600       QosArr* retC = connection_->unSubscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
601       if (*socketException.errorCode != 0) {
602          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

603       }
604       vector<UnSubscribeReturnQos> ret;
605       for (size_t ii=0; ii<retC->len; ii++) {
606          ret.insert(ret.end(),  UnSubscribeReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])));
607       }
608       freeQosArr(retC);
609       return ret;
610    } catch_MACRO("::unSubscribe", false)
611    return vector<UnSubscribeReturnQos>();
612 }
613 
614 PublishReturnQos SocketDriver::publish(const MessageUnit& msgUnit)
615 {
616    ::ExceptionStruct socketException;
617    Lock lock(mutex_);
618    if (connection_ == 0) {
619       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
620    }
621    try {
622       if (log_.call()) log_.call(ME, "publish");
623       ::MsgUnit msgUnitC;
624       const string key = msgUnit.getKey().toXml();
625       msgUnitC.key = key.c_str();
626       msgUnitC.content = reinterpret_cast<const char *>(msgUnit.getContent());
627       msgUnitC.contentLen = msgUnit.getContentLen();
628       const string qos = msgUnit.getQos().toXml();
629       msgUnitC.qos = qos.c_str();
630 
631       char* response = connection_->publish(connection_, &msgUnitC, &socketException);
632 
633       if (*socketException.errorCode != 0) {
634          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

635       }
636 
637       //freeMsgUnitData(&msgUnitC); -> not needed as it contains pointers only

638       if (log_.trace()) log_.trace(ME, "successfully published");
639       PublishReturnQos publishReturnQos(global_, statusQosFactory_.readObject(response));
640       xmlBlasterFree(response);
641       return publishReturnQos;
642    } catch_MACRO("::publish", false)
643 }
644 
645 void SocketDriver::publishOneway(const vector<MessageUnit> &msgUnitArr)
646 {
647    ::ExceptionStruct socketException;
648    Lock lock(mutex_);
649    if (connection_ == 0) {
650       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
651    }
652    try {
653 
654       // Copy C++ MessageUnit to C MsgUnit

655       ::MsgUnitArr msgUnitArrC;
656       vector<MessageUnit>::const_iterator iter;
657       memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
658       msgUnitArrC.len = msgUnitArr.size();
659       msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit));
660       size_t ii=0;
661       vector<string> keyArr;  // We need to hold key/qos on the stack because toXml() returns a temporary string

662       vector<string> qosArr;
663       for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) {
664          //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len));

665          const MessageUnit& msgUnitCpp = *iter;
666          ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
667          keyArr.push_back(msgUnitCpp.getKey().toXml());
668          msgUnitC.key = keyArr[ii].c_str();
669          qosArr.push_back(msgUnitCpp.getQos().toXml());
670          msgUnitC.qos = qosArr[ii].c_str();
671          msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
672          msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
673          ii++;
674       }
675 
676       connection_->publishOneway(connection_, &msgUnitArrC, &socketException);
677 
678       ::free(msgUnitArrC.msgUnitArr);
679 
680       if (*socketException.errorCode != 0) {
681          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

682       }
683    } catch_MACRO("::publishOneway", false)
684 }
685 
686 vector<PublishReturnQos> SocketDriver::publishArr(const vector<MessageUnit> &msgUnitArr)
687 {
688    ::ExceptionStruct socketException;
689    Lock lock(mutex_);
690    if (connection_ == 0) {
691       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
692    }
693    try {
694 
695       // Copy C++ MessageUnit to C MsgUnit

696       ::MsgUnitArr msgUnitArrC;
697       vector<MessageUnit>::const_iterator iter;
698       memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
699       msgUnitArrC.len = msgUnitArr.size();
700       msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit));
701       size_t ii=0;
702       vector<string> keyArr;  // We need to hold key/qos on the stack because toXml() returns a temporary string

703       vector<string> qosArr;
704       for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) {
705          //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len));

706          const MessageUnit& msgUnitCpp = *iter;
707          ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
708          keyArr.push_back(msgUnitCpp.getKey().toXml());
709          msgUnitC.key = keyArr[ii].c_str();
710          qosArr.push_back(msgUnitCpp.getQos().toXml());
711          msgUnitC.qos = qosArr[ii].c_str();
712          msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
713          msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
714          ii++;
715       }
716 
717       QosArr* retC = connection_->publishArr(connection_, &msgUnitArrC, &socketException);
718 
719       ::free(msgUnitArrC.msgUnitArr);
720 
721       if (*socketException.errorCode != 0) {
722          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

723       }
724       vector<PublishReturnQos> ret;
725       for (size_t jj=0; jj<retC->len; jj++) {
726          ret.insert(ret.end(),  PublishReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[jj])) );
727       }
728       freeQosArr(retC);
729       return ret;
730    } catch_MACRO("::publishArr", false)
731    return vector<PublishReturnQos>();
732 }
733 
734 vector<EraseReturnQos> SocketDriver::erase(const EraseKey& key, const EraseQos& qos)
735 {
736    ::ExceptionStruct socketException;
737    Lock lock(mutex_);
738    if (connection_ == 0) {
739       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
740    }
741    try {
742       QosArr* retC = connection_->erase(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
743       if (*socketException.errorCode != 0) {
744          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO

745       }
746       vector<EraseReturnQos> ret;
747       for (size_t ii=0; ii<retC->len; ii++) {
748          ret.insert(ret.end(),  EraseReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])) );
749       }
750       freeQosArr(retC);
751       return ret;
752    } catch_MACRO("::erase", false)
753    return vector<EraseReturnQos>();
754 }
755 
756 I_ProgressListener* SocketDriver::registerProgressListener(I_ProgressListener *listener) {
757    I_ProgressListener *old = this->progressListener_;
758    this->progressListener_ = listener;
759    if (connection_ && connection_->callbackP != 0) {
760       connection_->callbackP->readFromSocket.numReadUserP = this;
761       if (this->progressListener_ && connection_->callbackP != 0) {
762          connection_->callbackP->readFromSocket.numReadFuncP = callbackProgressListener;
763       }
764       else {
765          connection_->callbackP->readFromSocket.numReadFuncP = 0; // Dangerous: not thread safe, TODO: Add a mutex

766       }
767    }
768    return old;
769 }
770 
771 string SocketDriver::usage()
772 {
773    char usage[XMLBLASTER_MAX_USAGE_LEN];
774    ::xmlBlasterAccessUnparsedUsage(usage);
775    return  "\nThe SOCKET plugin configuration:" +
776            string(usage);
777 }
778 
779 // Exception conversion ....

780 org::xmlBlaster::util::XmlBlasterException SocketDriver::convertFromSocketException(const ::ExceptionStruct& ex) const
781 {
782    return org::xmlBlaster::util::XmlBlasterException(
783             (*ex.errorCode=='\0')?string("internal.unknown"):string(ex.errorCode),
784             string(""),
785             ME,
786             "en",
787             string(ex.message),
788             global_.getVersion() + " " + global_.getBuildTimestamp());
789             // TODO: isServerSide!!!

790 }
791 
792 
793 ::ExceptionStruct SocketDriver::convertToSocketException(org::xmlBlaster::util::XmlBlasterException& ex)
794 {
795    ::ExceptionStruct exSocket;
796    ::initializeXmlBlasterException(&exSocket);
797    strncpy0(exSocket.errorCode, ex.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
798    strncpy0(exSocket.message, ex.getMessage().c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
799    //exSocket.remote = ??

800    return exSocket;
801 }
802 
803 }}}}} // namespaces

804 


syntax highlighted by Code2HTML, v. 0.9.1