client/protocol/socket/SocketDriver.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      SocketDriver.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   The client driver for the socket protocol
00006 ------------------------------------------------------------------------------*/
00007 #include <client/protocol/socket/SocketDriver.h>
00008 #include <util/ErrorCode.h>
00009 #include <util/XmlBlasterException.h>
00010 #include <util/Global.h>
00011 #include <util/lexical_cast.h>
00012 #include <XmlBlasterAccessUnparsed.h> // The C SOCKET client library
00013 #include <util/qos/ConnectQosFactory.h>
00014 #include <util/Properties.h>
00015 #include <string>
00016 #include <stdarg.h> // va_start
00017 #include <stdio.h> // vsnprintf for g++ 2.9x only
00018 
00019 static void myLogger(void *logUserP, 
00020                      XMLBLASTER_LOG_LEVEL currLevel,
00021                      XMLBLASTER_LOG_LEVEL level,
00022                      const char *location, const char *fmt, ...);
00023 
00024 //static ::XmlBlasterNumReadFunc callbackProgressListener;  // what's wrong with this?
00025 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes);
00026 
00043 static void myLogger(void *logUserP, 
00044                      XMLBLASTER_LOG_LEVEL currLevel,
00045                      XMLBLASTER_LOG_LEVEL level,
00046                      const char *location, const char *fmt, ...)
00047 {
00048    /* Guess we need no more than 200 bytes. */
00049    int n, size = 200;
00050    char *p = 0;
00051    va_list ap;
00052    org::xmlBlaster::client::protocol::socket::SocketDriver *sd =
00053          (org::xmlBlaster::client::protocol::socket::SocketDriver *)logUserP;
00054    org::xmlBlaster::util::I_Log& log = sd->getLog();
00055 
00056    if (level > currLevel) { /* XMLBLASTER_LOG_ERROR, XMLBLASTER_LOG_WARN, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_TRACE */
00057       return;
00058    }
00059    if ((p = (char *)malloc (size)) == NULL)
00060       return;
00061 
00062    for (;;) {
00063       /* Try to print in the allocated space. */
00064       va_start(ap, fmt);
00065       n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */
00066       va_end(ap);
00067       /* If that worked, print the string to console. */
00068       if (n > -1 && n < size) {
00069          if (level == XMLBLASTER_LOG_INFO)
00070             log.info(location, p);
00071          else if (level == XMLBLASTER_LOG_WARN)
00072             log.warn(location, p);
00073          else if (level == XMLBLASTER_LOG_ERROR)
00074             log.error(location, p);
00075          else
00076             log.trace(location, p);
00077          free(p);
00078          return;
00079       }
00080       /* Else try again with more space. */
00081       if (n > -1)    /* glibc 2.1 */
00082          size = n+1; /* precisely what is needed */
00083       else           /* glibc 2.0 */
00084          size *= 2;  /* twice the old size */
00085       if ((p = (char *)realloc (p, size)) == NULL) {
00086          return;
00087       }
00088    }
00089 }
00090 
00096 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) {
00097    org::xmlBlaster::client::protocol::socket::SocketDriver *sd =
00098          (org::xmlBlaster::client::protocol::socket::SocketDriver *)userP;
00099    //org::xmlBlaster::util::I_Log& log = sd->getLog();
00100    //if (log.trace()) log.trace("SocketDriver", "Update data progress currBytesRead=" +
00101    //                          org::xmlBlaster::util::lexical_cast<std::string>(currBytesRead) +
00102    //                          " nbytes=" + org::xmlBlaster::util::lexical_cast<std::string>(nbytes));
00103    if (sd != 0 && sd->progressListener_ != 0) {
00104       sd->progressListener_->progress("", currBytesRead, nbytes);
00105    }
00106 }
00107 
00108 namespace org {
00109  namespace xmlBlaster {
00110   namespace client {
00111    namespace protocol {
00112     namespace socket {
00113 
00114 using namespace std;
00115 using namespace org::xmlBlaster::util;
00116 using namespace org::xmlBlaster::util::qos;
00117 using namespace org::xmlBlaster::util::key;
00118 using namespace org::xmlBlaster::util::thread;
00119 using namespace org::xmlBlaster::client::protocol;
00120 using namespace org::xmlBlaster::client::key;
00121 using namespace org::xmlBlaster::client::qos;
00122 
00123 static XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
00124                      ::ExceptionStruct *exception);
00125 
00126 void SocketDriver::freeResources(bool deleteConnection)
00127 {
00128    if (log_.call()) log_.call(ME, "freeResources("+lexical_cast<std::string>(deleteConnection)+") connection_=" + ((connection_==0)?"0":lexical_cast<std::string>(connection_)));
00129    if (deleteConnection && connection_ != 0) {
00130       freeXmlBlasterAccessUnparsed(connection_);
00131       connection_ = 0;
00132    }
00133    if (deleteConnection && argsStructP_ != 0) {
00134       global_.freeArgs(*argsStructP_);
00135       delete argsStructP_;
00136       argsStructP_ = 0;
00137    }
00138 }
00139 
/*
 Note on exception handling:
 If we throw an exception, our master ConnectionsHandler.cpp will
 catch it and do a shutdown() on us. This will clean up the resources.

 catch_MACRO(methodName, deleteConnection) expands to the chain of catch
 handlers which closes a try block of a SocketDriver method.
 Every handler first calls freeResources(deleteConnection) and then leaves
 by throwing:
  - XmlBlasterException pointer: the pointer is thrown again unchanged
  - XmlBlasterException reference: location is set to ME + methodName and
    the very same exception object is rethrown (no copy, no slicing)
  - ::ExceptionStruct (pointer or reference): converted with
    convertFromSocketException(), a caught pointer is deleted
  - std::exception, string, char*, int and everything else: wrapped into
    an INTERNAL_UNKNOWN XmlBlasterException
 The expansion references ME, loginName_ and global_, so it can only be
 used inside SocketDriver member functions.
 */
#define catch_MACRO(methodName, deleteConnection)                     \
   catch(const XmlBlasterException *ex) {                             \
      freeResources(deleteConnection);                                \
      throw ex;                                                       \
   }                                                                  \
   catch(XmlBlasterException &ex) {                                   \
      freeResources(deleteConnection);                                \
      ex.setLocation(ME + string(methodName));                        \
      throw;                                                          \
   }                                                                  \
   catch(const ::ExceptionStruct *ex) {                               \
      freeResources(deleteConnection);                                \
      org::xmlBlaster::util::XmlBlasterException xx = convertFromSocketException(*ex); \
      delete ex;                                                      \
      throw xx;                                                       \
   }                                                                  \
   catch(const ::ExceptionStruct &ex) {                               \
      freeResources(deleteConnection);                                \
      throw convertFromSocketException(ex);                           \
   }                                                                  \
   catch(const exception &ex) {                                       \
      freeResources(deleteConnection);                                \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
                       string("type='exception', msg='")              \
                        + ex.what() + "'");                           \
   }                                                                  \
   catch(const string &ex) {                                          \
      freeResources(deleteConnection);                                \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
                       string("type='string', msg='") + ex + "'");    \
   }                                                                  \
   catch(const char *ex) {                                            \
      freeResources(deleteConnection);                                \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
                       string("type='char*', msg='") + ex + "'");     \
   }                                                                  \
   catch(int ex) {                                                    \
      freeResources(deleteConnection);                                \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
       string("type='int', msg='") + lexical_cast<std::string>(ex) + "'"); \
   }                                                                  \
   catch (...) {                                                      \
      freeResources(deleteConnection);                                \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp());\
   }
00200 
00201 SocketDriver::SocketDriver(const SocketDriver& socketDriver)
00202    : mutex_(socketDriver.mutex_),
00203      ME("SocketDriver"), 
00204      argsStructP_(0),
00205      global_(socketDriver.global_), 
00206      log_(socketDriver.log_),
00207      statusQosFactory_(socketDriver.global_), 
00208      msgKeyFactory_(socketDriver.global_), 
00209      msgQosFactory_(socketDriver.global_),
00210      callbackClient_(0),
00211      progressListener_(0)
00212 {
00213    // no instantiation of these since this should never be invoked (just to make it private)
00214    connection_      = NULL;
00215    argsStructP_ = new ArgsStruct_T;
00216    //memset(argsStructP_, '\0', sizeof(ArgsStruct_T));
00217    global_.fillArgs(*argsStructP_);
00218    if (log_.call()) log_.call(ME, string("Copy constructor"));
00219 }
00220 
00221 SocketDriver& SocketDriver::operator =(const SocketDriver& /*socketDriver*/)
00222 {
00223    if (log_.call()) log_.call(ME, "operator=()");
00224    return *this;
00225 }
00226 
00227 
00228 SocketDriver::SocketDriver(Global& global, const string instanceName)
00229    : mutex_(),
00230      instanceName_(instanceName),
00231      connection_(NULL),
00232      ME(string("SocketDriver-") + instanceName), 
00233      argsStructP_(0),
00234      global_(global), 
00235      log_(global.getLog("org.xmlBlaster.client.protocol.socket")),
00236      statusQosFactory_(global),
00237      msgKeyFactory_(global),
00238      msgQosFactory_(global),
00239      callbackClient_(0),
00240      progressListener_(0)
00241 {
00242    if (log_.call()) log_.call("SocketDriver", string("getInstance for ") + instanceName);
00243 
00244    argsStructP_ = new ArgsStruct_T;
00245    if (!global_.getProperty().propertyExists("logLevel")) {
00246       if (log_.trace() || log_.call())
00247          global_.getProperty().setProperty("logLevel", "TRACE");
00248       else if (log_.dump())
00249          global_.getProperty().setProperty("logLevel", "DUMP");
00250    }
00251 
00252    global_.fillArgs(*argsStructP_);
00253    try {
00254       connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
00255       if (connection_) {
00256          connection_->userObject = this; // Transports us to the myUpdate() method
00257          connection_->log = myLogger;    // Register our own logging function
00258          connection_->logUserP = this;   // Pass ourself to myLogger()
00259          if (log_.dump()) {
00260             log_.dump(ME, "C properties:");
00261             ::dumpProperties(connection_->props);
00262          }
00263       }
00264       else {
00265          log_.error(ME, "Allocation of C SOCKET library failed");
00266       }
00267    } catch_MACRO("::Constructor", true)
00268 }
00269 
00274 void SocketDriver::reconnectOnIpLevel(void)
00275 {
00276    log_.trace(ME, "Trying to reconnect to server");
00277 
00278    freeResources(true); // Cleanup if old connection exists
00279 
00280    // Give a chance to new configuration settings
00281    if (argsStructP_ != 0) {
00282       global_.freeArgs(*argsStructP_);
00283       delete argsStructP_;
00284       argsStructP_ = 0;
00285    }
00286    argsStructP_ = new ArgsStruct_T;
00287    global_.fillArgs(*argsStructP_);
00288 
00289    ::ExceptionStruct socketException;
00290 
00291    try {
00292       connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
00293       connection_->userObject = this; // Transports us to the myUpdate() method
00294       connection_->log = myLogger;    // Register our own logging function
00295       connection_->logUserP = this;   // Pass SocketDriver to myLogger()
00296    } catch_MACRO("::Constructor", true)
00297    
00298    try {
00299       if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
00300       if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
00301          if (log_.trace()) log_.trace(ME, string("Reconnection to xmlBlaster failed, please start the server or check your network: ") + socketException.message);
00302          throw socketException;
00303       }
00304       registerProgressListener(this->progressListener_); // Re-register
00305       if (log_.trace()) log_.trace(ME, "After createCallbackServer");
00306    } catch_MACRO("::initialize", true)
00307 }
00308 
00309 SocketDriver::~SocketDriver()
00310 {
00311    if (log_.call()) log_.call(ME, "~SocketDriver()");
00312    try {
00313       freeResources(true);
00314    }
00315    catch (...) {
00316       log_.error(ME, "Unexpected catch in ~SocketDriver()");
00317    }
00318 }
00319 
00320 XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
00321                      ::ExceptionStruct *exception)
00322 {
00323    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
00324    SocketDriver* socketDriver = static_cast<SocketDriver*>(xa->userObject);
00325    Global& global = socketDriver->getGlobal();
00326    I_Log& log = socketDriver->getLog();
00327    const string &ME = socketDriver->me();
00328 
00329    try {
00330       for (size_t i=0; i<msgUnitArr->len; i++) {
00331          //char *xml = messageUnitToXml(&msgUnitArr->msgUnitArr[i]);
00332          //printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n",xml);
00333                         //xmlBlasterFree(xml);
00334          if (log.trace()) log.trace(ME, "Received callback message");
00335          ::MsgUnit& msgUnit = msgUnitArr->msgUnitArr[i];
00336          I_Callback* cb = socketDriver->getCallbackClient();
00337          if (cb != 0) {
00338             UpdateKey updateKey(global, socketDriver->getMsgKeyFactory().readObject(string(msgUnit.key)));
00339             UpdateQos updateQos(global, socketDriver->getMsgQosFactory().readObject(string(msgUnit.qos)));
00340             std::string retQos = cb->update(msgUnitArr->secretSessionId,
00341                           updateKey, (const unsigned char*)msgUnit.content,
00342                           msgUnit.contentLen, updateQos);
00343             msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(retQos.c_str());
00344          }
00345          else { /* Return QoS: Everything is OK */
00346             log.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + msgUnit.key);
00347             msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(Constants::RET_OK); // "<qos><state id='OK'/></qos>");
00348          }
00349       }
00350       //throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "TEST THROWING EXCEPTION");
00351    } 
00352    catch (XmlBlasterException &e) {
00353       string tmp = "Exception caught in C++ update(), " +
00354                    lexical_cast<std::string>(msgUnitArr->len) +
00355                    " messages are handled as not delivered: " +
00356                    e.getMessage();
00357       log.error(ME, tmp);
00358       for (size_t i=0; i<msgUnitArr->len; i++) {
00359          char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
00360          log.error(ME, xml);
00361          xmlBlasterFree(xml);
00362       }
00363       strncpy0(exception->errorCode, e.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00364       strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
00365       return (XMLBLASTER_C_bool)0;
00366    }
00367    catch(...) {
00368       string tmp = "Unidentified exception caught in C++ update(), " + lexical_cast<std::string>(msgUnitArr->len) + " messages are handled as not delivered";
00369       log.error(ME, tmp);
00370       for (size_t i=0; i<msgUnitArr->len; i++) {
00371          char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
00372          log.error(ME, xml);
00373          xmlBlasterFree(xml);
00374       }
00375       strncpy0(exception->errorCode, "user.update.error", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00376       strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
00377       return (XMLBLASTER_C_bool)0;
00378    }
00379    return (XMLBLASTER_C_bool)1;
00380 }
00381 
00382 I_Callback* SocketDriver::getCallbackClient()
00383 {
00384    return callbackClient_;
00385 }
00386 
00388 void SocketDriver::initialize(const string& name, I_Callback &client)
00389 {
00390    ::ExceptionStruct socketException;
00391    ME = string("SocketDriver-") + instanceName_ + "-" + name;
00392    if (log_.call()) log_.call(ME, "initialize() callback server");
00393    callbackClient_ = &client;
00394    Lock lock(mutex_);
00395    if (connection_ == 0) {
00396       if (log_.trace()) log_.trace(ME, "ERROR: connection_ is null");
00397       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, name, ME + ".initialize", "en",
00398                        global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
00399    }
00400    try {
00401       if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
00402       if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
00403          log_.warn(ME, "Connection to xmlBlaster failed,"
00404                 " please start the server or check your configuration\n");
00405          freeResources(true);
00406       }
00407       if (log_.trace()) log_.trace(ME, "After createCallbackServer");
00408    } catch_MACRO("::initialize", true)
00409 }
00410 
00411 string SocketDriver::getCbProtocol()
00412 {
00413     return Constants::SOCKET; // "SOCKET";
00414 }                             
00415 
00416 string SocketDriver::getCbAddress()
00417 {
00418    Lock lock(mutex_);
00419    if (connection_ == 0 || connection_->callbackP == 0) {
00420       return string("socket://:");
00421    }
00422    try {
00423       return string("socket://") + string(connection_->callbackP->hostCB) + ":" +
00424              lexical_cast<std::string>(connection_->callbackP->portCB);
00425    } catch_MACRO("::getCbAddress", false)
00426 }
00427 
00428 bool SocketDriver::shutdownCb()
00429 {
00430    Lock lock(mutex_);
00431    if (connection_ == 0 || connection_->callbackP == 0) return false;
00432    connection_->callbackP->shutdown(connection_->callbackP);
00433    return true;
00434 }
00435 
00436 ConnectReturnQosRef SocketDriver::connect(const ConnectQosRef& qos) //throw (XmlBlasterException) // Visual C++ emits a warning with this throw clause
00437 {
00438    if (log_.call()) log_.call(ME, string("connect() ") + string((connection_==0)?"connection_==0":"connection_!=0") +
00439                               ", secretSessionId_="+secretSessionId_);
00440                               //+" isConnected=" + ((connection_==0)?XMLBLASTER_FALSE:lexical_cast<string>(connection_->isConnected(connection_))));
00441    ::ExceptionStruct socketException;
00442    Lock lock(mutex_);
00443    try {
00444       loginName_ = qos->getUserId();
00445       if (connection_ == 0) {
00446          reconnectOnIpLevel(); // Connects on IP level only, throws an exception on failure
00447          if (secretSessionId_ != "") {
00448             qos->getSessionQos().setSecretSessionId(secretSessionId_);
00449          }
00450          if (connection_ != 0 && connection_->callbackP != 0) {
00451             ConnectQos *qq = const_cast<ConnectQos*>(&(*qos));
00452             if (qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->getType() == Constants::SOCKET) {
00453                // Force callback address, it could have changed on reconnect (checked to cb not be a delegate)
00454                string addr = string("socket://") + string(connection_->callbackP->hostCB) + ":" +
00455                       lexical_cast<std::string>(connection_->callbackP->portCB);
00456                qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->setAddress(addr);
00457                log_.trace(ME, "Setting callback address to " + addr);
00458             }
00459          }
00460       }
00461 
00462       char *retQos = connection_->connect(connection_, qos->toXml().c_str(),
00463                                           myUpdate, &socketException);
00464       if (*socketException.errorCode != 0) {
00465          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00466       }
00467       ConnectQosFactory factory(global_);
00468       ConnectReturnQosRef connectReturnQos = factory.readObject(retQos);
00469       xmlBlasterFree(retQos);
00470       secretSessionId_ = connectReturnQos->getSecretSessionId();
00471       return connectReturnQos;
00472    } catch_MACRO("::connect", false)
00473 }
00474 
00475 bool SocketDriver::disconnect(const DisconnectQos& qos)
00476 {
00477    if (log_.call()) log_.call(ME, "disconnect()");
00478    if (connection_ == 0) return false;
00479    ::ExceptionStruct socketException;
00480    Lock lock(mutex_);
00481    try {
00482       bool ret = connection_->disconnect(connection_, qos.toXml().c_str(), &socketException);
00483       if (*socketException.errorCode != 0) {
00484          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00485       }
00486       return ret;
00487    } catch_MACRO("::disconnect", false)
00488    return true;
00489 }
00490 
00491 string SocketDriver::getProtocol()
00492 {
00493    return Constants::SOCKET; // "SOCKET";
00494 }
00495 
00497 bool SocketDriver::shutdown()
00498 {
00499    if (log_.call()) log_.call(ME, "shutdown()");
00500    Lock lock(mutex_);
00501    if (connection_ == 0) return false;
00502    freeResources(true);
00503    return true;
00504 }
00505 
00506 string SocketDriver::getLoginName()
00507 {
00508    return loginName_;
00509 }
00510 
00511 bool SocketDriver::isLoggedIn()
00512 {
00513    Lock lock(mutex_);
00514    return connection_ != 0 && connection_->isConnected(connection_);
00515 }
00516 
00517 string SocketDriver::ping(const string& qos)
00518 {
00519    ::ExceptionStruct socketException;
00520    Lock lock(mutex_);
00521    if (connection_ == 0) {
00522       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, "", ME + ".ping", "en",
00523                        global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
00524    }
00525    try {
00526       char *retQosP = connection_->ping(connection_, qos.c_str(), &socketException);
00527       if (retQosP == 0 || *socketException.errorCode != 0) {
00528          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00529       }
00530       string retQos(retQosP);
00531       xmlBlasterFree(retQosP);
00532       return retQos;
00533    } catch_MACRO("::ping", false)
00534 }
00535 
00536 SubscribeReturnQos SocketDriver::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
00537 {
00538    ::ExceptionStruct socketException;
00539    Lock lock(mutex_);
00540    if (connection_ == 0) {
00541       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
00542    }
00543    try {
00544       char *response = connection_->subscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
00545       if (*socketException.errorCode != 0) {
00546          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00547       }
00548       SubscribeReturnQos subscribeReturnQos(global_, statusQosFactory_.readObject(response));
00549       xmlBlasterFree(response);
00550       return subscribeReturnQos;
00551    } catch_MACRO("::subscribe", false)
00552 }
00553 
00554 vector<MessageUnit> SocketDriver::get(const GetKey& getKey, const GetQos& getQos)
00555 {
00556    ::ExceptionStruct socketException;
00557    Lock lock(mutex_);
00558    if (connection_ == 0) {
00559       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
00560    }
00561    try {
00562       MsgUnitArr *msgUnitArr;  // The returned C struct array
00563       string key = getKey.toXml();
00564       string qos = getQos.toXml();
00565       msgUnitArr = connection_->get(connection_, key.c_str(), qos.c_str(), &socketException);
00566       if (*socketException.errorCode != 0) {
00567          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00568       }
00569       if (msgUnitArr != (MsgUnitArr *)0) {
00570          vector<MessageUnit> ret;
00571          for (size_t i=0; i<msgUnitArr->len; i++) {
00572             MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnitArr->msgUnitArr[i].key));
00573             MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnitArr->msgUnitArr[i].qos));
00574             MessageUnit messageUnit(msgKeyData,
00575                          msgUnitArr->msgUnitArr[i].contentLen,
00576                          (const unsigned char*)msgUnitArr->msgUnitArr[i].content,
00577                          msgQosData);
00578             ret.insert(ret.end(),  messageUnit);
00579          }
00580          freeMsgUnitArr(msgUnitArr);
00581          return ret;
00582       }
00583    } catch_MACRO("::get", false)
00584    return vector<MessageUnit>();
00585 }
00586 
00587 vector<UnSubscribeReturnQos>
00588 SocketDriver::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
00589 {
00590    ::ExceptionStruct socketException;
00591    Lock lock(mutex_);
00592    if (connection_ == 0) {
00593       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
00594    }
00595    try {
00596       QosArr* retC = connection_->unSubscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
00597       if (*socketException.errorCode != 0) {
00598          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00599       }
00600       vector<UnSubscribeReturnQos> ret;
00601       for (size_t ii=0; ii<retC->len; ii++) {
00602          ret.insert(ret.end(),  UnSubscribeReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])));
00603       }
00604       freeQosArr(retC);
00605       return ret;
00606    } catch_MACRO("::unSubscribe", false)
00607    return vector<UnSubscribeReturnQos>();
00608 }
00609 
00610 PublishReturnQos SocketDriver::publish(const MessageUnit& msgUnit)
00611 {
00612    ::ExceptionStruct socketException;
00613    Lock lock(mutex_);
00614    if (connection_ == 0) {
00615       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
00616    }
00617    try {
00618       if (log_.call()) log_.call(ME, "publish");
00619       ::MsgUnit msgUnitC;
00620       const string key = msgUnit.getKey().toXml();
00621       msgUnitC.key = key.c_str();
00622       msgUnitC.content = reinterpret_cast<const char *>(msgUnit.getContent());
00623       msgUnitC.contentLen = msgUnit.getContentLen();
00624       const string qos = msgUnit.getQos().toXml();
00625       msgUnitC.qos = qos.c_str();
00626 
00627       char* response = connection_->publish(connection_, &msgUnitC, &socketException);
00628 
00629       if (*socketException.errorCode != 0) {
00630          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00631       }
00632 
00633       //freeMsgUnitData(&msgUnitC); -> not needed as it contains pointers only
00634       if (log_.trace()) log_.trace(ME, "successfully published");
00635       PublishReturnQos publishReturnQos(global_, statusQosFactory_.readObject(response));
00636       xmlBlasterFree(response);
00637       return publishReturnQos;
00638    } catch_MACRO("::publish", false)
00639 }
00640 
00641 void SocketDriver::publishOneway(const vector<MessageUnit> &msgUnitArr)
00642 {
00643    ::ExceptionStruct socketException;
00644    Lock lock(mutex_);
00645    if (connection_ == 0) {
00646       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
00647    }
00648    try {
00649 
00650       // Copy C++ MessageUnit to C MsgUnit
00651       ::MsgUnitArr msgUnitArrC;
00652       vector<MessageUnit>::const_iterator iter;
00653       memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
00654       msgUnitArrC.len = msgUnitArr.size();
00655       msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit));
00656       size_t ii=0;
00657       vector<string> keyArr;  // We need to hold key/qos on the stack because toXml() returns a temporary string
00658       vector<string> qosArr;
00659       for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) {
00660          //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len));
00661          const MessageUnit& msgUnitCpp = *iter;
00662          ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
00663          keyArr.push_back(msgUnitCpp.getKey().toXml());
00664          msgUnitC.key = keyArr[ii].c_str();
00665          qosArr.push_back(msgUnitCpp.getQos().toXml());
00666          msgUnitC.qos = qosArr[ii].c_str();
00667          msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
00668          msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
00669          ii++;
00670       }
00671 
00672       connection_->publishOneway(connection_, &msgUnitArrC, &socketException);
00673 
00674       ::free(msgUnitArrC.msgUnitArr);
00675 
00676       if (*socketException.errorCode != 0) {
00677          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00678       }
00679    } catch_MACRO("::publishOneway", false)
00680 }
00681 
00682 vector<PublishReturnQos> SocketDriver::publishArr(const vector<MessageUnit> &msgUnitArr)
00683 {
00684    ::ExceptionStruct socketException;
00685    Lock lock(mutex_);
00686    if (connection_ == 0) {
00687       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
00688    }
00689    try {
00690 
00691       // Copy C++ MessageUnit to C MsgUnit
00692       ::MsgUnitArr msgUnitArrC;
00693       vector<MessageUnit>::const_iterator iter;
00694       memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
00695       msgUnitArrC.len = msgUnitArr.size();
00696       msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit));
00697       size_t ii=0;
00698       vector<string> keyArr;  // We need to hold key/qos on the stack because toXml() returns a temporary string
00699       vector<string> qosArr;
00700       for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) {
00701          //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len));
00702          const MessageUnit& msgUnitCpp = *iter;
00703          ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
00704          keyArr.push_back(msgUnitCpp.getKey().toXml());
00705          msgUnitC.key = keyArr[ii].c_str();
00706          qosArr.push_back(msgUnitCpp.getQos().toXml());
00707          msgUnitC.qos = qosArr[ii].c_str();
00708          msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
00709          msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
00710          ii++;
00711       }
00712 
00713       QosArr* retC = connection_->publishArr(connection_, &msgUnitArrC, &socketException);
00714 
00715       ::free(msgUnitArrC.msgUnitArr);
00716 
00717       if (*socketException.errorCode != 0) {
00718          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00719       }
00720       vector<PublishReturnQos> ret;
00721       for (size_t jj=0; jj<retC->len; jj++) {
00722          ret.insert(ret.end(),  PublishReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[jj])) );
00723       }
00724       freeQosArr(retC);
00725       return ret;
00726    } catch_MACRO("::publishArr", false)
00727    return vector<PublishReturnQos>();
00728 }
00729 
00730 vector<EraseReturnQos> SocketDriver::erase(const EraseKey& key, const EraseQos& qos)
00731 {
00732    ::ExceptionStruct socketException;
00733    Lock lock(mutex_);
00734    if (connection_ == 0) {
00735       throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
00736    }
00737    try {
00738       QosArr* retC = connection_->erase(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
00739       if (*socketException.errorCode != 0) {
00740          throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
00741       }
00742       vector<EraseReturnQos> ret;
00743       for (size_t ii=0; ii<retC->len; ii++) {
00744          ret.insert(ret.end(),  EraseReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])) );
00745       }
00746       freeQosArr(retC);
00747       return ret;
00748    } catch_MACRO("::erase", false)
00749    return vector<EraseReturnQos>();
00750 }
00751 
00752 I_ProgressListener* SocketDriver::registerProgressListener(I_ProgressListener *listener) {
00753    I_ProgressListener *old = this->progressListener_;
00754    this->progressListener_ = listener;
00755    if (connection_ && connection_->callbackP != 0) {
00756       connection_->callbackP->readFromSocket.numReadUserP = this;
00757       if (this->progressListener_ && connection_->callbackP != 0) {
00758          connection_->callbackP->readFromSocket.numReadFuncP = callbackProgressListener;
00759       }
00760       else {
00761          connection_->callbackP->readFromSocket.numReadFuncP = 0; // Dangerous: not thread safe, TODO: Add a mutex
00762       }
00763    }
00764    return old;
00765 }
00766 
00767 string SocketDriver::usage()
00768 {
00769    char usage[XMLBLASTER_MAX_USAGE_LEN];
00770    ::xmlBlasterAccessUnparsedUsage(usage);
00771    return  "\nThe SOCKET plugin configuration:" +
00772            string(usage);
00773 }
00774 
00775 // Exception conversion ....
00776 org::xmlBlaster::util::XmlBlasterException SocketDriver::convertFromSocketException(const ::ExceptionStruct& ex) const
00777 {
00778    return org::xmlBlaster::util::XmlBlasterException(
00779             (*ex.errorCode=='\0')?string("internal.unknown"):string(ex.errorCode),
00780             string(""),
00781             ME,
00782             "en",
00783             string(ex.message),
00784             global_.getVersion() + " " + global_.getBuildTimestamp());
00785             // TODO: isServerSide!!!
00786 }
00787 
00788 
00789 ::ExceptionStruct SocketDriver::convertToSocketException(org::xmlBlaster::util::XmlBlasterException& ex)
00790 {
00791    ::ExceptionStruct exSocket;
00792    ::initializeXmlBlasterException(&exSocket);
00793    strncpy0(exSocket.errorCode, ex.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00794    strncpy0(exSocket.message, ex.getMessage().c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
00795    //exSocket.remote = ??
00796    return exSocket;
00797 }
00798 
00799 }}}}} // namespaces
00800