1 /*------------------------------------------------------------------------------
2 Name: SocketDriver.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: The client driver for the socket protocol
6 ------------------------------------------------------------------------------*/
7 #include <client/protocol/socket/SocketDriver.h>
8 #include <util/ErrorCode.h>
9 #include <util/XmlBlasterException.h>
10 #include <util/Global.h>
11 #include <util/lexical_cast.h>
12 #include <XmlBlasterAccessUnparsed.h> // The C SOCKET client library
13 #include <util/qos/ConnectQosFactory.h>
14 #include <util/Properties.h>
15 #include <string>
16 #include <cstdarg> // va_start
17 #include <cstdio> // vsnprintf for g++ 2.9x only
18 #include <cstring> // memset()
19
20 static void myLogger(void *logUserP,
21 XMLBLASTER_LOG_LEVEL currLevel,
22 XMLBLASTER_LOG_LEVEL level,
23 const char *location, const char *fmt, ...);
24
25 //static ::XmlBlasterNumReadFunc callbackProgressListener; // what's wrong with this?
26 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes);
27
28 /**
29 * Customized logging output is handled by this method.
30 * <p>
31 * We register this function with
32 * </p>
33 * <pre>
34 * xa->log = myLogger;
35 * </pre>
36 * @param currLevel The actual log level of the client
37 * @param level The level of this log entry
38 * @param location A string describing the code place
39 * @param fmt The formatting string
40 * @param ... Other variables to log, corresponds to 'fmt'
41 * @see xmlBlaster/src/c/msgUtil.c: xmlBlasterDefaultLogging() is the default
42 * implementation
43 */
44 static void myLogger(void *logUserP,
45 XMLBLASTER_LOG_LEVEL currLevel,
46 XMLBLASTER_LOG_LEVEL level,
47 const char *location, const char *fmt, ...)
48 {
49 /* Guess we need no more than 200 bytes. */
50 int n, size = 200;
51 char *p = 0;
52 va_list ap;
53 org::xmlBlaster::client::protocol::socket::SocketDriver *sd =
54 (org::xmlBlaster::client::protocol::socket::SocketDriver *)logUserP;
55 org::xmlBlaster::util::I_Log& log = sd->getLog();
56
57 if (level > currLevel) { /* XMLBLASTER_LOG_ERROR, XMLBLASTER_LOG_WARN, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_TRACE */
58 return;
59 }
60 if ((p = (char *)malloc (size)) == NULL)
61 return;
62
63 for (;;) {
64 /* Try to print in the allocated space. */
65 va_start(ap, fmt);
66 n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */
67 va_end(ap);
68 /* If that worked, print the string to console. */
69 if (n > -1 && n < size) {
70 if (level == XMLBLASTER_LOG_INFO)
71 log.info(location, p);
72 else if (level == XMLBLASTER_LOG_WARN)
73 log.warn(location, p);
74 else if (level == XMLBLASTER_LOG_ERROR)
75 log.error(location, p);
76 else
77 log.trace(location, p);
78 free(p);
79 return;
80 }
81 /* Else try again with more space. */
82 if (n > -1) /* glibc 2.1 */
83 size = n+1; /* precisely what is needed */
84 else /* glibc 2.0 */
85 size *= 2; /* twice the old size */
86 if ((p = (char *)realloc (p, size)) == NULL) {
87 return;
88 }
89 }
90 }
91
92 /**
93 * Access the read socket progress.
94 * You need to register this function pointer if you want to see the progress of huge messages
95 * on slow connections.
96 */
97 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) {
98 org::xmlBlaster::client::protocol::socket::SocketDriver *sd =
99 (org::xmlBlaster::client::protocol::socket::SocketDriver *)userP;
100 //org::xmlBlaster::util::I_Log& log = sd->getLog();
101 //if (log.trace()) log.trace("SocketDriver", "Update data progress currBytesRead=" +
102 // org::xmlBlaster::util::lexical_cast<std::string>(currBytesRead) +
103 // " nbytes=" + org::xmlBlaster::util::lexical_cast<std::string>(nbytes));
104 if (sd != 0 && sd->progressListener_ != 0) {
105 sd->progressListener_->progress("", currBytesRead, nbytes);
106 }
107 }
108
109 namespace org {
110 namespace xmlBlaster {
111 namespace client {
112 namespace protocol {
113 namespace socket {
114
115 using namespace std;
116 using namespace org::xmlBlaster::util;
117 using namespace org::xmlBlaster::util::qos;
118 using namespace org::xmlBlaster::util::key;
119 using namespace org::xmlBlaster::util::thread;
120 using namespace org::xmlBlaster::client::protocol;
121 using namespace org::xmlBlaster::client::key;
122 using namespace org::xmlBlaster::client::qos;
123
124 static XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
125 ::ExceptionStruct *exception);
126
127 void SocketDriver::freeResources(bool deleteConnection)
128 {
129 if (log_.call()) log_.call(ME, "freeResources("+lexical_cast<std::string>(deleteConnection)+") connection_=" + ((connection_==0)?"0":lexical_cast<std::string>(connection_)));
130 if (deleteConnection && connection_ != 0) {
131 freeXmlBlasterAccessUnparsed(connection_);
132 connection_ = 0;
133 }
134 if (deleteConnection && argsStructP_ != 0) {
135 global_.freeArgs(*argsStructP_);
136 delete argsStructP_;
137 argsStructP_ = 0;
138 }
139 }
140
/*
 Note on exception handling:
 If we throw an exception, our master ConnectionsHandler.cpp will
 catch it and do a shutdown() on us. This will cleanup the resources.

 catch_MACRO(methodName, deleteConnection) expands to the handler chain
 following a try block inside a SocketDriver method:
 - every handler first calls freeResources(deleteConnection)
 - XmlBlasterException (pointer or reference) is rethrown with 'throw;' so
   the original exception object is passed on without copying or slicing,
   the reference variant gets its location set to ME + methodName before
 - ::ExceptionStruct is converted with convertFromSocketException(),
   a caught pointer is deleted
 - everything else is wrapped into an INTERNAL_UNKNOWN XmlBlasterException
 */
#define catch_MACRO(methodName, deleteConnection) \
   catch(const XmlBlasterException *) { \
      freeResources(deleteConnection); \
      throw; \
   } \
   catch(XmlBlasterException &ex) { \
      freeResources(deleteConnection); \
      ex.setLocation(ME + string(methodName)); \
      throw; \
   } \
   catch(const ::ExceptionStruct *ex) { \
      freeResources(deleteConnection); \
      org::xmlBlaster::util::XmlBlasterException xx = convertFromSocketException(*ex); \
      delete ex; \
      throw xx; \
   } \
   catch(const ::ExceptionStruct &ex) { \
      freeResources(deleteConnection); \
      throw convertFromSocketException(ex); \
   } \
   catch(const exception &ex) { \
      freeResources(deleteConnection); \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
                       string("type='exception', msg='") \
                       + ex.what() + "'"); \
   } \
   catch(const string &ex) { \
      freeResources(deleteConnection); \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
                       string("type='string', msg='") + ex + "'"); \
   } \
   catch(const char *ex) { \
      freeResources(deleteConnection); \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
                       string("type='char*', msg='") + ex + "'"); \
   } \
   catch(int ex) { \
      freeResources(deleteConnection); \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
                       string("type='int', msg='") + lexical_cast<std::string>(ex) + "'"); \
   } \
   catch (...) { \
      freeResources(deleteConnection); \
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
                       loginName_, ME + string(methodName), "en", \
                       global_.getVersion() + " " + global_.getBuildTimestamp());\
   }
201
202 SocketDriver::SocketDriver(const SocketDriver& socketDriver)
203 : mutex_(socketDriver.mutex_),
204 ME("SocketDriver"),
205 argsStructP_(0),
206 global_(socketDriver.global_),
207 log_(socketDriver.log_),
208 statusQosFactory_(socketDriver.global_),
209 msgKeyFactory_(socketDriver.global_),
210 msgQosFactory_(socketDriver.global_),
211 callbackClient_(0),
212 progressListener_(0)
213 {
214 // no instantiation of these since this should never be invoked (just to make it private)
215 connection_ = NULL;
216 argsStructP_ = new ArgsStruct_T;
217 //memset(argsStructP_, '\0', sizeof(ArgsStruct_T));
218 global_.fillArgs(*argsStructP_);
219 if (log_.call()) log_.call(ME, string("Copy constructor"));
220 }
221
222 SocketDriver& SocketDriver::operator =(const SocketDriver& /*socketDriver*/)
223 {
224 if (log_.call()) log_.call(ME, "operator=()");
225 return *this;
226 }
227
228
229 SocketDriver::SocketDriver(Global& global, const string instanceName)
230 : mutex_(),
231 instanceName_(instanceName),
232 connection_(NULL),
233 ME(string("SocketDriver-") + instanceName),
234 argsStructP_(0),
235 global_(global),
236 log_(global.getLog("org.xmlBlaster.client.protocol.socket")),
237 statusQosFactory_(global),
238 msgKeyFactory_(global),
239 msgQosFactory_(global),
240 callbackClient_(0),
241 progressListener_(0)
242 {
243 if (log_.call()) log_.call("SocketDriver", string("getInstance for ") + instanceName);
244
245 argsStructP_ = new ArgsStruct_T;
246 if (!global_.getProperty().propertyExists("logLevel")) {
247 if (log_.trace() || log_.call())
248 global_.getProperty().setProperty("logLevel", "TRACE");
249 else if (log_.dump())
250 global_.getProperty().setProperty("logLevel", "DUMP");
251 }
252
253 global_.fillArgs(*argsStructP_);
254 try {
255 connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
256 if (connection_) {
257 connection_->userObject = this; // Transports us to the myUpdate() method
258 connection_->log = myLogger; // Register our own logging function
259 connection_->logUserP = this; // Pass ourself to myLogger()
260 if (log_.dump()) {
261 log_.dump(ME, "C properties:");
262 ::dumpProperties(connection_->props);
263 }
264 }
265 else {
266 log_.error(ME, "Allocation of C SOCKET library failed");
267 }
268 } catch_MACRO("::Constructor", true)
269 }
270
271 /**
272 * Called on polling, must be synchronized from outside,
273 * throws an exception on failure
274 */
275 void SocketDriver::reconnectOnIpLevel(void)
276 {
277 log_.trace(ME, "Trying to reconnect to server");
278
279 freeResources(true); // Cleanup if old connection exists
280
281 // Give a chance to new configuration settings
282 if (argsStructP_ != 0) {
283 global_.freeArgs(*argsStructP_);
284 delete argsStructP_;
285 argsStructP_ = 0;
286 }
287 argsStructP_ = new ArgsStruct_T;
288 global_.fillArgs(*argsStructP_);
289
290 ::ExceptionStruct socketException;
291
292 try {
293 connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
294 connection_->userObject = this; // Transports us to the myUpdate() method
295 connection_->log = myLogger; // Register our own logging function
296 connection_->logUserP = this; // Pass SocketDriver to myLogger()
297 } catch_MACRO("::Constructor", true)
298
299 try {
300 if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
301 if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
302 if (log_.trace()) log_.trace(ME, string("Reconnection to xmlBlaster failed, please start the server or check your network: ") + socketException.message);
303 throw socketException;
304 }
305 registerProgressListener(this->progressListener_); // Re-register
306 if (log_.trace()) log_.trace(ME, "After createCallbackServer");
307 } catch_MACRO("::initialize", true)
308 }
309
310 SocketDriver::~SocketDriver()
311 {
312 if (log_.call()) log_.call(ME, "~SocketDriver()");
313 try {
314 freeResources(true);
315 }
316 catch (...) {
317 log_.error(ME, "Unexpected catch in ~SocketDriver()");
318 }
319 }
320
321 XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
322 ::ExceptionStruct *exception)
323 {
324 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
325 SocketDriver* socketDriver = static_cast<SocketDriver*>(xa->userObject);
326 Global& global = socketDriver->getGlobal();
327 I_Log& log = socketDriver->getLog();
328 const string &ME = socketDriver->me();
329
330 try {
331 for (size_t i=0; i<msgUnitArr->len; i++) {
332 //char *xml = messageUnitToXml(&msgUnitArr->msgUnitArr[i]);
333 //printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n",xml);
334 //xmlBlasterFree(xml);
335 if (log.trace()) log.trace(ME, "Received callback message");
336 ::MsgUnit& msgUnit = msgUnitArr->msgUnitArr[i];
337 I_Callback* cb = socketDriver->getCallbackClient();
338 if (cb != 0) {
339 UpdateKey updateKey(global, socketDriver->getMsgKeyFactory().readObject(string(msgUnit.key)));
340 UpdateQos updateQos(global, socketDriver->getMsgQosFactory().readObject(string(msgUnit.qos)));
341 std::string retQos = cb->update(msgUnitArr->secretSessionId,
342 updateKey, (const unsigned char*)msgUnit.content,
343 msgUnit.contentLen, updateQos);
344 msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(retQos.c_str());
345 }
346 else { /* Return QoS: Everything is OK */
347 log.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + msgUnit.key);
348 msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(Constants::RET_OK); // "<qos><state id='OK'/></qos>");
349 }
350 }
351 //throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "TEST THROWING EXCEPTION");
352 }
353 catch (XmlBlasterException &e) {
354 string tmp = "Exception caught in C++ update(), " +
355 lexical_cast<std::string>(msgUnitArr->len) +
356 " messages are handled as not delivered: " +
357 e.getMessage();
358 log.error(ME, tmp);
359 for (size_t i=0; i<msgUnitArr->len; i++) {
360 char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
361 log.error(ME, xml);
362 xmlBlasterFree(xml);
363 }
364 strncpy0(exception->errorCode, e.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
365 strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
366 return (XMLBLASTER_C_bool)0;
367 }
368 catch(...) {
369 string tmp = "Unidentified exception caught in C++ update(), " + lexical_cast<std::string>(msgUnitArr->len) + " messages are handled as not delivered";
370 log.error(ME, tmp);
371 for (size_t i=0; i<msgUnitArr->len; i++) {
372 char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
373 log.error(ME, xml);
374 xmlBlasterFree(xml);
375 }
376 strncpy0(exception->errorCode, "user.update.error", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
377 strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
378 return (XMLBLASTER_C_bool)0;
379 }
380 return (XMLBLASTER_C_bool)1;
381 }
382
383 I_Callback* SocketDriver::getCallbackClient()
384 {
385 return callbackClient_;
386 }
387
388 /** Enforced by I_CallbackServer */
389 void SocketDriver::initialize(const string& name, I_Callback &client)
390 {
391 ::ExceptionStruct socketException;
392 ME = string("SocketDriver-") + instanceName_ + "-" + name;
393 if (log_.call()) log_.call(ME, "initialize() callback server");
394 callbackClient_ = &client;
395 Lock lock(mutex_);
396 if (connection_ == 0) {
397 if (log_.trace()) log_.trace(ME, "ERROR: connection_ is null");
398 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, name, ME + ".initialize", "en",
399 global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
400 }
401 try {
402 if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
403 if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
404 log_.warn(ME, "Connection to xmlBlaster failed,"
405 " please start the server or check your configuration\n");
406 freeResources(true);
407 }
408 if (log_.trace()) log_.trace(ME, "After createCallbackServer");
409 } catch_MACRO("::initialize", true)
410 }
411
412 string SocketDriver::getCbProtocol()
413 {
414 return Constants::SOCKET; // "SOCKET";
415 }
416
417 string SocketDriver::getCbAddress()
418 {
419 Lock lock(mutex_);
420 if (connection_ == 0 || connection_->callbackP == 0) {
421 return string("socket://:");
422 }
423 try {
424 return string("socket://") + string(connection_->callbackP->hostCB) + ":" +
425 lexical_cast<std::string>(connection_->callbackP->portCB);
426 } catch_MACRO("::getCbAddress", false)
427 }
428
429 bool SocketDriver::shutdownCb()
430 {
431 Lock lock(mutex_);
432 if (connection_ == 0 || connection_->callbackP == 0) return false;
433 connection_->callbackP->shutdown(connection_->callbackP);
434 return true;
435 }
436
437 ConnectReturnQosRef SocketDriver::connect(const ConnectQosRef& qos) //throw (XmlBlasterException) // Visual C++ emits a warning with this throw clause
438 {
439 if (log_.call()) log_.call(ME, string("connect() ") + string((connection_==0)?"connection_==0":"connection_!=0") +
440 ", secretSessionId_="+secretSessionId_);
441 //+" isConnected=" + ((connection_==0)?XMLBLASTER_FALSE:lexical_cast<string>(connection_->isConnected(connection_))));
442 ::ExceptionStruct socketException;
443 Lock lock(mutex_);
444 try {
445 loginName_ = qos->getUserId();
446 if ( connection_ != 0 && !connection_->isConnected(connection_)) {
447 freeResources(true);
448 }
449 if (connection_ == 0) {
450 reconnectOnIpLevel(); // Connects on IP level only, throws an exception on failure
451 if (secretSessionId_ != "") {
452 qos->getSessionQos().setSecretSessionId(secretSessionId_);
453 }
454 if (connection_ != 0 && connection_->callbackP != 0) {
455 ConnectQos *qq = const_cast<ConnectQos*>(&(*qos));
456 if (qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->getType() == Constants::SOCKET) {
457 // Force callback address, it could have changed on reconnect (checked to cb not be a delegate)
458 string addr = string("socket://") + string(connection_->callbackP->hostCB) + ":" +
459 lexical_cast<std::string>(connection_->callbackP->portCB);
460 qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->setAddress(addr);
461 log_.trace(ME, "Setting callback address to " + addr);
462 }
463 }
464 }
465
466 char *retQos = connection_->connect(connection_, qos->toXml().c_str(),
467 myUpdate, &socketException);
468 if (*socketException.errorCode != 0) {
469 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
470 }
471 ConnectQosFactory factory(global_);
472 ConnectReturnQosRef connectReturnQos = factory.readObject(retQos);
473 xmlBlasterFree(retQos);
474 secretSessionId_ = connectReturnQos->getSecretSessionId();
475 return connectReturnQos;
476 } catch_MACRO("::connect", false)
477 }
478
479 bool SocketDriver::disconnect(const DisconnectQos& qos)
480 {
481 if (log_.call()) log_.call(ME, "disconnect()");
482 if (connection_ == 0) return false;
483 ::ExceptionStruct socketException;
484 Lock lock(mutex_);
485 try {
486 bool ret = connection_->disconnect(connection_, qos.toXml().c_str(), &socketException);
487 if (*socketException.errorCode != 0) {
488 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
489 }
490 return ret;
491 } catch_MACRO("::disconnect", false)
492 return true;
493 }
494
495 string SocketDriver::getProtocol()
496 {
497 return Constants::SOCKET; // "SOCKET";
498 }
499
500 /** Called when going to POLLING mode */
501 bool SocketDriver::shutdown()
502 {
503 if (log_.call()) log_.call(ME, "shutdown()");
504 Lock lock(mutex_);
505 if (connection_ == 0) return false;
506 freeResources(true);
507 return true;
508 }
509
510 string SocketDriver::getLoginName()
511 {
512 return loginName_;
513 }
514
515 bool SocketDriver::isLoggedIn()
516 {
517 Lock lock(mutex_);
518 return connection_ != 0 && connection_->isConnected(connection_);
519 }
520
521 string SocketDriver::ping(const string& qos)
522 {
523 ::ExceptionStruct socketException;
524 Lock lock(mutex_);
525 if (connection_ == 0) {
526 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, "", ME + ".ping", "en",
527 global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
528 }
529 try {
530 char *retQosP = connection_->ping(connection_, qos.c_str(), &socketException);
531 if (retQosP == 0 || *socketException.errorCode != 0) {
532 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
533 }
534 string retQos(retQosP);
535 xmlBlasterFree(retQosP);
536 return retQos;
537 } catch_MACRO("::ping", false)
538 }
539
540 SubscribeReturnQos SocketDriver::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
541 {
542 ::ExceptionStruct socketException;
543 Lock lock(mutex_);
544 if (connection_ == 0) {
545 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
546 }
547 try {
548 char *response = connection_->subscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
549 if (*socketException.errorCode != 0) {
550 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
551 }
552 SubscribeReturnQos subscribeReturnQos(global_, statusQosFactory_.readObject(response));
553 xmlBlasterFree(response);
554 return subscribeReturnQos;
555 } catch_MACRO("::subscribe", false)
556 }
557
558 vector<MessageUnit> SocketDriver::get(const GetKey& getKey, const GetQos& getQos)
559 {
560 ::ExceptionStruct socketException;
561 Lock lock(mutex_);
562 if (connection_ == 0) {
563 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
564 }
565 try {
566 MsgUnitArr *msgUnitArr; // The returned C struct array
567 string key = getKey.toXml();
568 string qos = getQos.toXml();
569 msgUnitArr = connection_->get(connection_, key.c_str(), qos.c_str(), &socketException);
570 if (*socketException.errorCode != 0) {
571 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
572 }
573 if (msgUnitArr != (MsgUnitArr *)0) {
574 vector<MessageUnit> ret;
575 for (size_t i=0; i<msgUnitArr->len; i++) {
576 MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnitArr->msgUnitArr[i].key));
577 MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnitArr->msgUnitArr[i].qos));
578 MessageUnit messageUnit(msgKeyData,
579 msgUnitArr->msgUnitArr[i].contentLen,
580 (const unsigned char*)msgUnitArr->msgUnitArr[i].content,
581 msgQosData);
582 ret.insert(ret.end(), messageUnit);
583 }
584 freeMsgUnitArr(msgUnitArr);
585 return ret;
586 }
587 } catch_MACRO("::get", false)
588 return vector<MessageUnit>();
589 }
590
591 vector<UnSubscribeReturnQos>
592 SocketDriver::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
593 {
594 ::ExceptionStruct socketException;
595 Lock lock(mutex_);
596 if (connection_ == 0) {
597 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
598 }
599 try {
600 QosArr* retC = connection_->unSubscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
601 if (*socketException.errorCode != 0) {
602 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
603 }
604 vector<UnSubscribeReturnQos> ret;
605 for (size_t ii=0; ii<retC->len; ii++) {
606 ret.insert(ret.end(), UnSubscribeReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])));
607 }
608 freeQosArr(retC);
609 return ret;
610 } catch_MACRO("::unSubscribe", false)
611 return vector<UnSubscribeReturnQos>();
612 }
613
614 PublishReturnQos SocketDriver::publish(const MessageUnit& msgUnit)
615 {
616 ::ExceptionStruct socketException;
617 Lock lock(mutex_);
618 if (connection_ == 0) {
619 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
620 }
621 try {
622 if (log_.call()) log_.call(ME, "publish");
623 ::MsgUnit msgUnitC;
624 const string key = msgUnit.getKey().toXml();
625 msgUnitC.key = key.c_str();
626 msgUnitC.content = reinterpret_cast<const char *>(msgUnit.getContent());
627 msgUnitC.contentLen = msgUnit.getContentLen();
628 const string qos = msgUnit.getQos().toXml();
629 msgUnitC.qos = qos.c_str();
630
631 char* response = connection_->publish(connection_, &msgUnitC, &socketException);
632
633 if (*socketException.errorCode != 0) {
634 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
635 }
636
637 //freeMsgUnitData(&msgUnitC); -> not needed as it contains pointers only
638 if (log_.trace()) log_.trace(ME, "successfully published");
639 PublishReturnQos publishReturnQos(global_, statusQosFactory_.readObject(response));
640 xmlBlasterFree(response);
641 return publishReturnQos;
642 } catch_MACRO("::publish", false)
643 }
644
645 void SocketDriver::publishOneway(const vector<MessageUnit> &msgUnitArr)
646 {
647 ::ExceptionStruct socketException;
648 Lock lock(mutex_);
649 if (connection_ == 0) {
650 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
651 }
652 try {
653
654 // Copy C++ MessageUnit to C MsgUnit
655 ::MsgUnitArr msgUnitArrC;
656 vector<MessageUnit>::const_iterator iter;
657 memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
658 msgUnitArrC.len = msgUnitArr.size();
659 msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit));
660 size_t ii=0;
661 vector<string> keyArr; // We need to hold key/qos on the stack because toXml() returns a temporary string
662 vector<string> qosArr;
663 for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) {
664 //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len));
665 const MessageUnit& msgUnitCpp = *iter;
666 ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
667 keyArr.push_back(msgUnitCpp.getKey().toXml());
668 msgUnitC.key = keyArr[ii].c_str();
669 qosArr.push_back(msgUnitCpp.getQos().toXml());
670 msgUnitC.qos = qosArr[ii].c_str();
671 msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
672 msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
673 ii++;
674 }
675
676 connection_->publishOneway(connection_, &msgUnitArrC, &socketException);
677
678 ::free(msgUnitArrC.msgUnitArr);
679
680 if (*socketException.errorCode != 0) {
681 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
682 }
683 } catch_MACRO("::publishOneway", false)
684 }
685
686 vector<PublishReturnQos> SocketDriver::publishArr(const vector<MessageUnit> &msgUnitArr)
687 {
688 ::ExceptionStruct socketException;
689 Lock lock(mutex_);
690 if (connection_ == 0) {
691 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
692 }
693 try {
694
695 // Copy C++ MessageUnit to C MsgUnit
696 ::MsgUnitArr msgUnitArrC;
697 vector<MessageUnit>::const_iterator iter;
698 memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
699 msgUnitArrC.len = msgUnitArr.size();
700 msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit));
701 size_t ii=0;
702 vector<string> keyArr; // We need to hold key/qos on the stack because toXml() returns a temporary string
703 vector<string> qosArr;
704 for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) {
705 //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len));
706 const MessageUnit& msgUnitCpp = *iter;
707 ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
708 keyArr.push_back(msgUnitCpp.getKey().toXml());
709 msgUnitC.key = keyArr[ii].c_str();
710 qosArr.push_back(msgUnitCpp.getQos().toXml());
711 msgUnitC.qos = qosArr[ii].c_str();
712 msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
713 msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
714 ii++;
715 }
716
717 QosArr* retC = connection_->publishArr(connection_, &msgUnitArrC, &socketException);
718
719 ::free(msgUnitArrC.msgUnitArr);
720
721 if (*socketException.errorCode != 0) {
722 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
723 }
724 vector<PublishReturnQos> ret;
725 for (size_t jj=0; jj<retC->len; jj++) {
726 ret.insert(ret.end(), PublishReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[jj])) );
727 }
728 freeQosArr(retC);
729 return ret;
730 } catch_MACRO("::publishArr", false)
731 return vector<PublishReturnQos>();
732 }
733
734 vector<EraseReturnQos> SocketDriver::erase(const EraseKey& key, const EraseQos& qos)
735 {
736 ::ExceptionStruct socketException;
737 Lock lock(mutex_);
738 if (connection_ == 0) {
739 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
740 }
741 try {
742 QosArr* retC = connection_->erase(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
743 if (*socketException.errorCode != 0) {
744 throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
745 }
746 vector<EraseReturnQos> ret;
747 for (size_t ii=0; ii<retC->len; ii++) {
748 ret.insert(ret.end(), EraseReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])) );
749 }
750 freeQosArr(retC);
751 return ret;
752 } catch_MACRO("::erase", false)
753 return vector<EraseReturnQos>();
754 }
755
756 I_ProgressListener* SocketDriver::registerProgressListener(I_ProgressListener *listener) {
757 I_ProgressListener *old = this->progressListener_;
758 this->progressListener_ = listener;
759 if (connection_ && connection_->callbackP != 0) {
760 connection_->callbackP->readFromSocket.numReadUserP = this;
761 if (this->progressListener_ && connection_->callbackP != 0) {
762 connection_->callbackP->readFromSocket.numReadFuncP = callbackProgressListener;
763 }
764 else {
765 connection_->callbackP->readFromSocket.numReadFuncP = 0; // Dangerous: not thread safe, TODO: Add a mutex
766 }
767 }
768 return old;
769 }
770
771 string SocketDriver::usage()
772 {
773 char usage[XMLBLASTER_MAX_USAGE_LEN];
774 ::xmlBlasterAccessUnparsedUsage(usage);
775 return "\nThe SOCKET plugin configuration:" +
776 string(usage);
777 }
778
779 // Exception conversion ....
780 org::xmlBlaster::util::XmlBlasterException SocketDriver::convertFromSocketException(const ::ExceptionStruct& ex) const
781 {
782 return org::xmlBlaster::util::XmlBlasterException(
783 (*ex.errorCode=='\0')?string("internal.unknown"):string(ex.errorCode),
784 string(""),
785 ME,
786 "en",
787 string(ex.message),
788 global_.getVersion() + " " + global_.getBuildTimestamp());
789 // TODO: isServerSide!!!
790 }
791
792
793 ::ExceptionStruct SocketDriver::convertToSocketException(org::xmlBlaster::util::XmlBlasterException& ex)
794 {
795 ::ExceptionStruct exSocket;
796 ::initializeXmlBlasterException(&exSocket);
797 strncpy0(exSocket.errorCode, ex.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
798 strncpy0(exSocket.message, ex.getMessage().c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
799 //exSocket.remote = ??
800 return exSocket;
801 }
802
803 }}}}} // namespaces
804
// syntax highlighted by Code2HTML, v. 0.9.1