00001 /*------------------------------------------------------------------------------ 00002 Name: SocketDriver.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: The client driver for the socket protocol 00006 ------------------------------------------------------------------------------*/ 00007 #include <client/protocol/socket/SocketDriver.h> 00008 #include <util/ErrorCode.h> 00009 #include <util/XmlBlasterException.h> 00010 #include <util/Global.h> 00011 #include <util/lexical_cast.h> 00012 #include <XmlBlasterAccessUnparsed.h> // The C SOCKET client library 00013 #include <util/qos/ConnectQosFactory.h> 00014 #include <util/Properties.h> 00015 #include <string> 00016 #include <stdarg.h> // va_start 00017 #include <stdio.h> // vsnprintf for g++ 2.9x only 00018 00019 static void myLogger(void *logUserP, 00020 XMLBLASTER_LOG_LEVEL currLevel, 00021 XMLBLASTER_LOG_LEVEL level, 00022 const char *location, const char *fmt, ...); 00023 00024 //static ::XmlBlasterNumReadFunc callbackProgressListener; // what's wrong with this? 00025 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes); 00026 00043 static void myLogger(void *logUserP, 00044 XMLBLASTER_LOG_LEVEL currLevel, 00045 XMLBLASTER_LOG_LEVEL level, 00046 const char *location, const char *fmt, ...) 00047 { 00048 /* Guess we need no more than 200 bytes. */ 00049 int n, size = 200; 00050 char *p = 0; 00051 va_list ap; 00052 org::xmlBlaster::client::protocol::socket::SocketDriver *sd = 00053 (org::xmlBlaster::client::protocol::socket::SocketDriver *)logUserP; 00054 org::xmlBlaster::util::I_Log& log = sd->getLog(); 00055 00056 if (level > currLevel) { /* XMLBLASTER_LOG_ERROR, XMLBLASTER_LOG_WARN, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_TRACE */ 00057 return; 00058 } 00059 if ((p = (char *)malloc (size)) == NULL) 00060 return; 00061 00062 for (;;) { 00063 /* Try to print in the allocated space. */ 00064 va_start(ap, fmt); 00065 n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */ 00066 va_end(ap); 00067 /* If that worked, print the string to console. */ 00068 if (n > -1 && n < size) { 00069 if (level == XMLBLASTER_LOG_INFO) 00070 log.info(location, p); 00071 else if (level == XMLBLASTER_LOG_WARN) 00072 log.warn(location, p); 00073 else if (level == XMLBLASTER_LOG_ERROR) 00074 log.error(location, p); 00075 else 00076 log.trace(location, p); 00077 free(p); 00078 return; 00079 } 00080 /* Else try again with more space. */ 00081 if (n > -1) /* glibc 2.1 */ 00082 size = n+1; /* precisely what is needed */ 00083 else /* glibc 2.0 */ 00084 size *= 2; /* twice the old size */ 00085 if ((p = (char *)realloc (p, size)) == NULL) { 00086 return; 00087 } 00088 } 00089 } 00090 00096 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) { 00097 org::xmlBlaster::client::protocol::socket::SocketDriver *sd = 00098 (org::xmlBlaster::client::protocol::socket::SocketDriver *)userP; 00099 //org::xmlBlaster::util::I_Log& log = sd->getLog(); 00100 //if (log.trace()) log.trace("SocketDriver", "Update data progress currBytesRead=" + 00101 // org::xmlBlaster::util::lexical_cast<std::string>(currBytesRead) + 00102 // " nbytes=" + org::xmlBlaster::util::lexical_cast<std::string>(nbytes)); 00103 if (sd != 0 && sd->progressListener_ != 0) { 00104 sd->progressListener_->progress("", currBytesRead, nbytes); 00105 } 00106 } 00107 00108 namespace org { 00109 namespace xmlBlaster { 00110 namespace client { 00111 namespace protocol { 00112 namespace socket { 00113 00114 using namespace std; 00115 using namespace org::xmlBlaster::util; 00116 using namespace org::xmlBlaster::util::qos; 00117 using namespace org::xmlBlaster::util::key; 00118 using namespace org::xmlBlaster::util::thread; 00119 using namespace org::xmlBlaster::client::protocol; 00120 using namespace org::xmlBlaster::client::key; 00121 using namespace org::xmlBlaster::client::qos; 00122 00123 static XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData, 00124 ::ExceptionStruct *exception); 00125 00126 void SocketDriver::freeResources(bool deleteConnection) 00127 { 00128 if (log_.call()) log_.call(ME, "freeResources("+lexical_cast<std::string>(deleteConnection)+") connection_=" + ((connection_==0)?"0":lexical_cast<std::string>(connection_))); 00129 if (deleteConnection && connection_ != 0) { 00130 freeXmlBlasterAccessUnparsed(connection_); 00131 connection_ = 0; 00132 } 00133 if (deleteConnection && argsStructP_ != 0) { 00134 global_.freeArgs(*argsStructP_); 00135 delete argsStructP_; 00136 argsStructP_ = 0; 00137 } 00138 } 00139 00140 /* 00141 Note on exception handling: 00142 If we throw an exception, our master ConnectionsHandler.cpp will 00143 catch it and to a shutdown() on us. This will cleanup the resources. 00144 */ 00145 #define catch_MACRO(methodName, deleteConnection) \ 00146 catch(const XmlBlasterException *ex) { \ 00147 freeResources(deleteConnection); \ 00148 throw ex; \ 00149 } \ 00150 catch(XmlBlasterException &ex) { \ 00151 freeResources(deleteConnection); \ 00152 ex.setLocation(ME + string(methodName)); \ 00153 throw ex; \ 00154 } \ 00155 catch(const ::ExceptionStruct *ex) { \ 00156 freeResources(deleteConnection); \ 00157 org::xmlBlaster::util::XmlBlasterException xx = convertFromSocketException(*ex); \ 00158 delete ex; \ 00159 throw xx; \ 00160 } \ 00161 catch(const ::ExceptionStruct &ex) { \ 00162 freeResources(deleteConnection); \ 00163 throw convertFromSocketException(ex); \ 00164 } \ 00165 catch(const exception &ex) { \ 00166 freeResources(deleteConnection); \ 00167 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \ 00168 loginName_, ME + string(methodName), "en", \ 00169 global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \ 00170 string("type='exception', msg='") \ 00171 + ex.what() + "'"); \ 00172 } \ 00173 catch(const string &ex) { \ 00174 freeResources(deleteConnection); \ 00175 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \ 00176 loginName_, ME + string(methodName), "en", \ 00177 global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \ 00178 string("type='string', msg='") + ex + "'"); \ 00179 } \ 00180 catch(const char *ex) { \ 00181 freeResources(deleteConnection); \ 00182 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \ 00183 loginName_, ME + string(methodName), "en", \ 00184 global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \ 00185 string("type='char*', msg='") + ex + "'"); \ 00186 } \ 00187 catch(int ex) { \ 00188 freeResources(deleteConnection); \ 00189 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \ 00190 loginName_, ME + string(methodName), "en", \ 00191 global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \ 00192 string("type='int', msg='") + lexical_cast<std::string>(ex) + "'"); \ 00193 } \ 00194 catch (...) { \ 00195 freeResources(deleteConnection); \ 00196 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \ 00197 loginName_, ME + string(methodName), "en", \ 00198 global_.getVersion() + " " + global_.getBuildTimestamp());\ 00199 } 00200 00201 SocketDriver::SocketDriver(const SocketDriver& socketDriver) 00202 : mutex_(socketDriver.mutex_), 00203 ME("SocketDriver"), 00204 argsStructP_(0), 00205 global_(socketDriver.global_), 00206 log_(socketDriver.log_), 00207 statusQosFactory_(socketDriver.global_), 00208 msgKeyFactory_(socketDriver.global_), 00209 msgQosFactory_(socketDriver.global_), 00210 callbackClient_(0), 00211 progressListener_(0) 00212 { 00213 // no instantiation of these since this should never be invoked (just to make it private) 00214 connection_ = NULL; 00215 argsStructP_ = new ArgsStruct_T; 00216 //memset(argsStructP_, '\0', sizeof(ArgsStruct_T)); 00217 global_.fillArgs(*argsStructP_); 00218 if (log_.call()) log_.call(ME, string("Copy constructor")); 00219 } 00220 00221 SocketDriver& SocketDriver::operator =(const SocketDriver& /*socketDriver*/) 00222 { 00223 if (log_.call()) log_.call(ME, "operator=()"); 00224 return *this; 00225 } 00226 00227 00228 SocketDriver::SocketDriver(Global& global, const string instanceName) 00229 : mutex_(), 00230 instanceName_(instanceName), 00231 connection_(NULL), 00232 ME(string("SocketDriver-") + instanceName), 00233 argsStructP_(0), 00234 global_(global), 00235 log_(global.getLog("org.xmlBlaster.client.protocol.socket")), 00236 statusQosFactory_(global), 00237 msgKeyFactory_(global), 00238 msgQosFactory_(global), 00239 callbackClient_(0), 00240 progressListener_(0) 00241 { 00242 if (log_.call()) log_.call("SocketDriver", string("getInstance for ") + instanceName); 00243 00244 argsStructP_ = new ArgsStruct_T; 00245 if (!global_.getProperty().propertyExists("logLevel")) { 00246 if (log_.trace() || log_.call()) 00247 global_.getProperty().setProperty("logLevel", "TRACE"); 00248 else if (log_.dump()) 00249 global_.getProperty().setProperty("logLevel", "DUMP"); 00250 } 00251 00252 global_.fillArgs(*argsStructP_); 00253 try { 00254 connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv); 00255 if (connection_) { 00256 connection_->userObject = this; // Transports us to the myUpdate() method 00257 connection_->log = myLogger; // Register our own logging function 00258 connection_->logUserP = this; // Pass ourself to myLogger() 00259 if (log_.dump()) { 00260 log_.dump(ME, "C properties:"); 00261 ::dumpProperties(connection_->props); 00262 } 00263 } 00264 else { 00265 log_.error(ME, "Allocation of C SOCKET library failed"); 00266 } 00267 } catch_MACRO("::Constructor", true) 00268 } 00269 00274 void SocketDriver::reconnectOnIpLevel(void) 00275 { 00276 log_.trace(ME, "Trying to reconnect to server"); 00277 00278 freeResources(true); // Cleanup if old connection exists 00279 00280 // Give a chance to new configuration settings 00281 if (argsStructP_ != 0) { 00282 global_.freeArgs(*argsStructP_); 00283 delete argsStructP_; 00284 argsStructP_ = 0; 00285 } 00286 argsStructP_ = new ArgsStruct_T; 00287 global_.fillArgs(*argsStructP_); 00288 00289 ::ExceptionStruct socketException; 00290 00291 try { 00292 connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv); 00293 connection_->userObject = this; // Transports us to the myUpdate() method 00294 connection_->log = myLogger; // Register our own logging function 00295 connection_->logUserP = this; // Pass SocketDriver to myLogger() 00296 } catch_MACRO("::Constructor", true) 00297 00298 try { 00299 if (log_.trace()) log_.trace(ME, "Before createCallbackServer"); 00300 if (connection_->initialize(connection_, myUpdate, &socketException) == false) { 00301 if (log_.trace()) log_.trace(ME, string("Reconnection to xmlBlaster failed, please start the server or check your network: ") + socketException.message); 00302 throw socketException; 00303 } 00304 registerProgressListener(this->progressListener_); // Re-register 00305 if (log_.trace()) log_.trace(ME, "After createCallbackServer"); 00306 } catch_MACRO("::initialize", true) 00307 } 00308 00309 SocketDriver::~SocketDriver() 00310 { 00311 if (log_.call()) log_.call(ME, "~SocketDriver()"); 00312 try { 00313 freeResources(true); 00314 } 00315 catch (...) { 00316 log_.error(ME, "Unexpected catch in ~SocketDriver()"); 00317 } 00318 } 00319 00320 XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData, 00321 ::ExceptionStruct *exception) 00322 { 00323 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData; 00324 SocketDriver* socketDriver = static_cast<SocketDriver*>(xa->userObject); 00325 Global& global = socketDriver->getGlobal(); 00326 I_Log& log = socketDriver->getLog(); 00327 const string &ME = socketDriver->me(); 00328 00329 try { 00330 for (size_t i=0; i<msgUnitArr->len; i++) { 00331 //char *xml = messageUnitToXml(&msgUnitArr->msgUnitArr[i]); 00332 //printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n",xml); 00333 //xmlBlasterFree(xml); 00334 if (log.trace()) log.trace(ME, "Received callback message"); 00335 ::MsgUnit& msgUnit = msgUnitArr->msgUnitArr[i]; 00336 I_Callback* cb = socketDriver->getCallbackClient(); 00337 if (cb != 0) { 00338 UpdateKey updateKey(global, socketDriver->getMsgKeyFactory().readObject(string(msgUnit.key))); 00339 UpdateQos updateQos(global, socketDriver->getMsgQosFactory().readObject(string(msgUnit.qos))); 00340 std::string retQos = cb->update(msgUnitArr->secretSessionId, 00341 updateKey, (const unsigned char*)msgUnit.content, 00342 msgUnit.contentLen, updateQos); 00343 msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(retQos.c_str()); 00344 } 00345 else { /* Return QoS: Everything is OK */ 00346 log.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + msgUnit.key); 00347 msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(Constants::RET_OK); // "<qos><state id='OK'/></qos>"); 00348 } 00349 } 00350 //throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "TEST THROWING EXCEPTION"); 00351 } 00352 catch (XmlBlasterException &e) { 00353 string tmp = "Exception caught in C++ update(), " + 00354 lexical_cast<std::string>(msgUnitArr->len) + 00355 " messages are handled as not delivered: " + 00356 e.getMessage(); 00357 log.error(ME, tmp); 00358 for (size_t i=0; i<msgUnitArr->len; i++) { 00359 char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100); 00360 log.error(ME, xml); 00361 xmlBlasterFree(xml); 00362 } 00363 strncpy0(exception->errorCode, e.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00364 strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN); 00365 return (XMLBLASTER_C_bool)0; 00366 } 00367 catch(...) { 00368 string tmp = "Unidentified exception caught in C++ update(), " + lexical_cast<std::string>(msgUnitArr->len) + " messages are handled as not delivered"; 00369 log.error(ME, tmp); 00370 for (size_t i=0; i<msgUnitArr->len; i++) { 00371 char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100); 00372 log.error(ME, xml); 00373 xmlBlasterFree(xml); 00374 } 00375 strncpy0(exception->errorCode, "user.update.error", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00376 strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN); 00377 return (XMLBLASTER_C_bool)0; 00378 } 00379 return (XMLBLASTER_C_bool)1; 00380 } 00381 00382 I_Callback* SocketDriver::getCallbackClient() 00383 { 00384 return callbackClient_; 00385 } 00386 00388 void SocketDriver::initialize(const string& name, I_Callback &client) 00389 { 00390 ::ExceptionStruct socketException; 00391 ME = string("SocketDriver-") + instanceName_ + "-" + name; 00392 if (log_.call()) log_.call(ME, "initialize() callback server"); 00393 callbackClient_ = &client; 00394 Lock lock(mutex_); 00395 if (connection_ == 0) { 00396 if (log_.trace()) log_.trace(ME, "ERROR: connection_ is null"); 00397 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, name, ME + ".initialize", "en", 00398 global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL"); 00399 } 00400 try { 00401 if (log_.trace()) log_.trace(ME, "Before createCallbackServer"); 00402 if (connection_->initialize(connection_, myUpdate, &socketException) == false) { 00403 log_.warn(ME, "Connection to xmlBlaster failed," 00404 " please start the server or check your configuration\n"); 00405 freeResources(true); 00406 } 00407 if (log_.trace()) log_.trace(ME, "After createCallbackServer"); 00408 } catch_MACRO("::initialize", true) 00409 } 00410 00411 string SocketDriver::getCbProtocol() 00412 { 00413 return Constants::SOCKET; // "SOCKET"; 00414 } 00415 00416 string SocketDriver::getCbAddress() 00417 { 00418 Lock lock(mutex_); 00419 if (connection_ == 0 || connection_->callbackP == 0) { 00420 return string("socket://:"); 00421 } 00422 try { 00423 return string("socket://") + string(connection_->callbackP->hostCB) + ":" + 00424 lexical_cast<std::string>(connection_->callbackP->portCB); 00425 } catch_MACRO("::getCbAddress", false) 00426 } 00427 00428 bool SocketDriver::shutdownCb() 00429 { 00430 Lock lock(mutex_); 00431 if (connection_ == 0 || connection_->callbackP == 0) return false; 00432 connection_->callbackP->shutdown(connection_->callbackP); 00433 return true; 00434 } 00435 00436 ConnectReturnQosRef SocketDriver::connect(const ConnectQosRef& qos) //throw (XmlBlasterException) // Visual C++ emits a warning with this throw clause 00437 { 00438 if (log_.call()) log_.call(ME, string("connect() ") + string((connection_==0)?"connection_==0":"connection_!=0") + 00439 ", secretSessionId_="+secretSessionId_); 00440 //+" isConnected=" + ((connection_==0)?XMLBLASTER_FALSE:lexical_cast<string>(connection_->isConnected(connection_)))); 00441 ::ExceptionStruct socketException; 00442 Lock lock(mutex_); 00443 try { 00444 loginName_ = qos->getUserId(); 00445 if (connection_ == 0) { 00446 reconnectOnIpLevel(); // Connects on IP level only, throws an exception on failure 00447 if (secretSessionId_ != "") { 00448 qos->getSessionQos().setSecretSessionId(secretSessionId_); 00449 } 00450 if (connection_ != 0 && connection_->callbackP != 0) { 00451 ConnectQos *qq = const_cast<ConnectQos*>(&(*qos)); 00452 if (qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->getType() == Constants::SOCKET) { 00453 // Force callback address, it could have changed on reconnect (checked to cb not be a delegate) 00454 string addr = string("socket://") + string(connection_->callbackP->hostCB) + ":" + 00455 lexical_cast<std::string>(connection_->callbackP->portCB); 00456 qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->setAddress(addr); 00457 log_.trace(ME, "Setting callback address to " + addr); 00458 } 00459 } 00460 } 00461 00462 char *retQos = connection_->connect(connection_, qos->toXml().c_str(), 00463 myUpdate, &socketException); 00464 if (*socketException.errorCode != 0) { 00465 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00466 } 00467 ConnectQosFactory factory(global_); 00468 ConnectReturnQosRef connectReturnQos = factory.readObject(retQos); 00469 xmlBlasterFree(retQos); 00470 secretSessionId_ = connectReturnQos->getSecretSessionId(); 00471 return connectReturnQos; 00472 } catch_MACRO("::connect", false) 00473 } 00474 00475 bool SocketDriver::disconnect(const DisconnectQos& qos) 00476 { 00477 if (log_.call()) log_.call(ME, "disconnect()"); 00478 if (connection_ == 0) return false; 00479 ::ExceptionStruct socketException; 00480 Lock lock(mutex_); 00481 try { 00482 bool ret = connection_->disconnect(connection_, qos.toXml().c_str(), &socketException); 00483 if (*socketException.errorCode != 0) { 00484 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00485 } 00486 return ret; 00487 } catch_MACRO("::disconnect", false) 00488 return true; 00489 } 00490 00491 string SocketDriver::getProtocol() 00492 { 00493 return Constants::SOCKET; // "SOCKET"; 00494 } 00495 00497 bool SocketDriver::shutdown() 00498 { 00499 if (log_.call()) log_.call(ME, "shutdown()"); 00500 Lock lock(mutex_); 00501 if (connection_ == 0) return false; 00502 freeResources(true); 00503 return true; 00504 } 00505 00506 string SocketDriver::getLoginName() 00507 { 00508 return loginName_; 00509 } 00510 00511 bool SocketDriver::isLoggedIn() 00512 { 00513 Lock lock(mutex_); 00514 return connection_ != 0 && connection_->isConnected(connection_); 00515 } 00516 00517 string SocketDriver::ping(const string& qos) 00518 { 00519 ::ExceptionStruct socketException; 00520 Lock lock(mutex_); 00521 if (connection_ == 0) { 00522 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, "", ME + ".ping", "en", 00523 global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL"); 00524 } 00525 try { 00526 char *retQosP = connection_->ping(connection_, qos.c_str(), &socketException); 00527 if (retQosP == 0 || *socketException.errorCode != 0) { 00528 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00529 } 00530 string retQos(retQosP); 00531 xmlBlasterFree(retQosP); 00532 return retQos; 00533 } catch_MACRO("::ping", false) 00534 } 00535 00536 SubscribeReturnQos SocketDriver::subscribe(const SubscribeKey& key, const SubscribeQos& qos) 00537 { 00538 ::ExceptionStruct socketException; 00539 Lock lock(mutex_); 00540 if (connection_ == 0) { 00541 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server"); 00542 } 00543 try { 00544 char *response = connection_->subscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException); 00545 if (*socketException.errorCode != 0) { 00546 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00547 } 00548 SubscribeReturnQos subscribeReturnQos(global_, statusQosFactory_.readObject(response)); 00549 xmlBlasterFree(response); 00550 return subscribeReturnQos; 00551 } catch_MACRO("::subscribe", false) 00552 } 00553 00554 vector<MessageUnit> SocketDriver::get(const GetKey& getKey, const GetQos& getQos) 00555 { 00556 ::ExceptionStruct socketException; 00557 Lock lock(mutex_); 00558 if (connection_ == 0) { 00559 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server"); 00560 } 00561 try { 00562 MsgUnitArr *msgUnitArr; // The returned C struct array 00563 string key = getKey.toXml(); 00564 string qos = getQos.toXml(); 00565 msgUnitArr = connection_->get(connection_, key.c_str(), qos.c_str(), &socketException); 00566 if (*socketException.errorCode != 0) { 00567 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00568 } 00569 if (msgUnitArr != (MsgUnitArr *)0) { 00570 vector<MessageUnit> ret; 00571 for (size_t i=0; i<msgUnitArr->len; i++) { 00572 MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnitArr->msgUnitArr[i].key)); 00573 MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnitArr->msgUnitArr[i].qos)); 00574 MessageUnit messageUnit(msgKeyData, 00575 msgUnitArr->msgUnitArr[i].contentLen, 00576 (const unsigned char*)msgUnitArr->msgUnitArr[i].content, 00577 msgQosData); 00578 ret.insert(ret.end(), messageUnit); 00579 } 00580 freeMsgUnitArr(msgUnitArr); 00581 return ret; 00582 } 00583 } catch_MACRO("::get", false) 00584 return vector<MessageUnit>(); 00585 } 00586 00587 vector<UnSubscribeReturnQos> 00588 SocketDriver::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos) 00589 { 00590 ::ExceptionStruct socketException; 00591 Lock lock(mutex_); 00592 if (connection_ == 0) { 00593 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server"); 00594 } 00595 try { 00596 QosArr* retC = connection_->unSubscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException); 00597 if (*socketException.errorCode != 0) { 00598 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00599 } 00600 vector<UnSubscribeReturnQos> ret; 00601 for (size_t ii=0; ii<retC->len; ii++) { 00602 ret.insert(ret.end(), UnSubscribeReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii]))); 00603 } 00604 freeQosArr(retC); 00605 return ret; 00606 } catch_MACRO("::unSubscribe", false) 00607 return vector<UnSubscribeReturnQos>(); 00608 } 00609 00610 PublishReturnQos SocketDriver::publish(const MessageUnit& msgUnit) 00611 { 00612 ::ExceptionStruct socketException; 00613 Lock lock(mutex_); 00614 if (connection_ == 0) { 00615 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server"); 00616 } 00617 try { 00618 if (log_.call()) log_.call(ME, "publish"); 00619 ::MsgUnit msgUnitC; 00620 const string key = msgUnit.getKey().toXml(); 00621 msgUnitC.key = key.c_str(); 00622 msgUnitC.content = reinterpret_cast<const char *>(msgUnit.getContent()); 00623 msgUnitC.contentLen = msgUnit.getContentLen(); 00624 const string qos = msgUnit.getQos().toXml(); 00625 msgUnitC.qos = qos.c_str(); 00626 00627 char* response = connection_->publish(connection_, &msgUnitC, &socketException); 00628 00629 if (*socketException.errorCode != 0) { 00630 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00631 } 00632 00633 //freeMsgUnitData(&msgUnitC); -> not needed as it contains pointers only 00634 if (log_.trace()) log_.trace(ME, "successfully published"); 00635 PublishReturnQos publishReturnQos(global_, statusQosFactory_.readObject(response)); 00636 xmlBlasterFree(response); 00637 return publishReturnQos; 00638 } catch_MACRO("::publish", false) 00639 } 00640 00641 void SocketDriver::publishOneway(const vector<MessageUnit> &msgUnitArr) 00642 { 00643 ::ExceptionStruct socketException; 00644 Lock lock(mutex_); 00645 if (connection_ == 0) { 00646 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server"); 00647 } 00648 try { 00649 00650 // Copy C++ MessageUnit to C MsgUnit 00651 ::MsgUnitArr msgUnitArrC; 00652 vector<MessageUnit>::const_iterator iter; 00653 memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr)); 00654 msgUnitArrC.len = msgUnitArr.size(); 00655 msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit)); 00656 size_t ii=0; 00657 vector<string> keyArr; // We need to hold key/qos on the stack because toXml() returns a temporary string 00658 vector<string> qosArr; 00659 for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) { 00660 //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len)); 00661 const MessageUnit& msgUnitCpp = *iter; 00662 ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii]; 00663 keyArr.push_back(msgUnitCpp.getKey().toXml()); 00664 msgUnitC.key = keyArr[ii].c_str(); 00665 qosArr.push_back(msgUnitCpp.getQos().toXml()); 00666 msgUnitC.qos = qosArr[ii].c_str(); 00667 msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen(); 00668 msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent()); 00669 ii++; 00670 } 00671 00672 connection_->publishOneway(connection_, &msgUnitArrC, &socketException); 00673 00674 ::free(msgUnitArrC.msgUnitArr); 00675 00676 if (*socketException.errorCode != 0) { 00677 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00678 } 00679 } catch_MACRO("::publishOneway", false) 00680 } 00681 00682 vector<PublishReturnQos> SocketDriver::publishArr(const vector<MessageUnit> &msgUnitArr) 00683 { 00684 ::ExceptionStruct socketException; 00685 Lock lock(mutex_); 00686 if (connection_ == 0) { 00687 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server"); 00688 } 00689 try { 00690 00691 // Copy C++ MessageUnit to C MsgUnit 00692 ::MsgUnitArr msgUnitArrC; 00693 vector<MessageUnit>::const_iterator iter; 00694 memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr)); 00695 msgUnitArrC.len = msgUnitArr.size(); 00696 msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit)); 00697 size_t ii=0; 00698 vector<string> keyArr; // We need to hold key/qos on the stack because toXml() returns a temporary string 00699 vector<string> qosArr; 00700 for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) { 00701 //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len)); 00702 const MessageUnit& msgUnitCpp = *iter; 00703 ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii]; 00704 keyArr.push_back(msgUnitCpp.getKey().toXml()); 00705 msgUnitC.key = keyArr[ii].c_str(); 00706 qosArr.push_back(msgUnitCpp.getQos().toXml()); 00707 msgUnitC.qos = qosArr[ii].c_str(); 00708 msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen(); 00709 msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent()); 00710 ii++; 00711 } 00712 00713 QosArr* retC = connection_->publishArr(connection_, &msgUnitArrC, &socketException); 00714 00715 ::free(msgUnitArrC.msgUnitArr); 00716 00717 if (*socketException.errorCode != 0) { 00718 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00719 } 00720 vector<PublishReturnQos> ret; 00721 for (size_t jj=0; jj<retC->len; jj++) { 00722 ret.insert(ret.end(), PublishReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[jj])) ); 00723 } 00724 freeQosArr(retC); 00725 return ret; 00726 } catch_MACRO("::publishArr", false) 00727 return vector<PublishReturnQos>(); 00728 } 00729 00730 vector<EraseReturnQos> SocketDriver::erase(const EraseKey& key, const EraseQos& qos) 00731 { 00732 ::ExceptionStruct socketException; 00733 Lock lock(mutex_); 00734 if (connection_ == 0) { 00735 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server"); 00736 } 00737 try { 00738 QosArr* retC = connection_->erase(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException); 00739 if (*socketException.errorCode != 0) { 00740 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO 00741 } 00742 vector<EraseReturnQos> ret; 00743 for (size_t ii=0; ii<retC->len; ii++) { 00744 ret.insert(ret.end(), EraseReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])) ); 00745 } 00746 freeQosArr(retC); 00747 return ret; 00748 } catch_MACRO("::erase", false) 00749 return vector<EraseReturnQos>(); 00750 } 00751 00752 I_ProgressListener* SocketDriver::registerProgressListener(I_ProgressListener *listener) { 00753 I_ProgressListener *old = this->progressListener_; 00754 this->progressListener_ = listener; 00755 if (connection_ && connection_->callbackP != 0) { 00756 connection_->callbackP->readFromSocket.numReadUserP = this; 00757 if (this->progressListener_ && connection_->callbackP != 0) { 00758 connection_->callbackP->readFromSocket.numReadFuncP = callbackProgressListener; 00759 } 00760 else { 00761 connection_->callbackP->readFromSocket.numReadFuncP = 0; // Dangerous: not thread safe, TODO: Add a mutex 00762 } 00763 } 00764 return old; 00765 } 00766 00767 string SocketDriver::usage() 00768 { 00769 char usage[XMLBLASTER_MAX_USAGE_LEN]; 00770 ::xmlBlasterAccessUnparsedUsage(usage); 00771 return "\nThe SOCKET plugin configuration:" + 00772 string(usage); 00773 } 00774 00775 // Exception conversion .... 00776 org::xmlBlaster::util::XmlBlasterException SocketDriver::convertFromSocketException(const ::ExceptionStruct& ex) const 00777 { 00778 return org::xmlBlaster::util::XmlBlasterException( 00779 (*ex.errorCode=='\0')?string("internal.unknown"):string(ex.errorCode), 00780 string(""), 00781 ME, 00782 "en", 00783 string(ex.message), 00784 global_.getVersion() + " " + global_.getBuildTimestamp()); 00785 // TODO: isServerSide!!! 00786 } 00787 00788 00789 ::ExceptionStruct SocketDriver::convertToSocketException(org::xmlBlaster::util::XmlBlasterException& ex) 00790 { 00791 ::ExceptionStruct exSocket; 00792 ::initializeXmlBlasterException(&exSocket); 00793 strncpy0(exSocket.errorCode, ex.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00794 strncpy0(exSocket.message, ex.getMessage().c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN); 00795 //exSocket.remote = ?? 00796 return exSocket; 00797 } 00798 00799 }}}}} // namespaces 00800