00001 /*---------------------------------------------------------------------------- 00002 Name: XmlBlasterAccessUnparsed.c 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Wraps raw socket connection to xmlBlaster 00006 Implements sync connection and async callback 00007 Needs pthread to compile (multi threading). 00008 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 00009 Compile: 00010 LINUX: gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread 00011 g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread 00012 icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread 00013 WIN: cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib 00014 (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32) 00015 Solaris: cc -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl 00016 CC -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl 00017 00018 Linux with libxmlBlasterC.so: 00019 gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT -lpthread 00020 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html 00021 -----------------------------------------------------------------------------*/ 00022 #include <stdio.h> 00023 #include <stdlib.h> 00024 #include <string.h> 00025 #if defined(WINCE) 00026 # if defined(XB_USE_PTHREADS) 00027 # include <pthreads/pthread.h> 00028 # else 00029 /*#include <pthreads/need_errno.h> */ 00030 static int errno=0; /* single threaded workaround*/ 00031 # endif 00032 #else 00033 # include <errno.h> 00034 # include <sys/types.h> 00035 #endif 00036 #include <socket/xmlBlasterSocket.h> 00037 #include <socket/xmlBlasterZlib.h> 00038 #include <XmlBlasterAccessUnparsed.h> 00039 00043 typedef struct Dll_Export UpdateContainer { 00044 XmlBlasterAccessUnparsed *xa; 00045 MsgUnitArr *msgUnitArrP; 00046 void *userData; 00047 XmlBlasterException exception; /* Holding a clone from the original as the callback thread may use it for another message */ 00048 SocketDataHolder socketDataHolder; /* Holding a clone from the original */ 00049 } UpdateContainer; 00050 00051 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception); 00052 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception); 00053 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception); 00054 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception); 00055 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception); 00056 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception); 00057 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception); 00058 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception); 00059 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception); 00060 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception); 00061 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception); 00062 static bool isConnected(XmlBlasterAccessUnparsed *xa); 00063 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder); 00064 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception); 00065 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception); 00066 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception); 00067 static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder); 00068 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception); 00069 static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes); 00070 static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes); 00071 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2); 00072 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2); 00073 00074 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) { 00075 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed)); 00076 if (xa == 0) return xa; 00077 xa->argc = argc; 00078 xa->argv = argv; 00079 xa->props = createProperties(xa->argc, xa->argv); 00080 if (xa->props == 0) { 00081 freeXmlBlasterAccessUnparsed(xa); 00082 return (XmlBlasterAccessUnparsed *)0; 00083 } 00084 xa->isInitialized = false; 00085 xa->isShutdown = false; 00086 xa->connectionP = 0; 00087 xa->callbackP = 0; 00088 xa->userObject = 0; /* A client can use this pointer to point to any client specific information */ 00089 xa->userFp = 0; 00090 xa->connect = xmlBlasterConnect; 00091 xa->initialize = initialize; 00092 xa->disconnect = xmlBlasterDisconnect; 00093 xa->publish = xmlBlasterPublish; 00094 xa->publishArr = xmlBlasterPublishArr; 00095 xa->publishOneway = xmlBlasterPublishOneway; 00096 xa->subscribe = xmlBlasterSubscribe; 00097 xa->unSubscribe = xmlBlasterUnSubscribe; 00098 xa->erase = xmlBlasterErase; 00099 xa->get = xmlBlasterGet; 00100 xa->ping = xmlBlasterPing; 00101 xa->isConnected = isConnected; 00102 xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN")); 00103 xa->log = xmlBlasterDefaultLogging; 00104 xa->logUserP = 0; 00105 xa->clientsUpdateFp = 0; 00106 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true); 00107 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded); 00108 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */ 00109 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */ 00110 /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */ 00111 xa->lowLevelAutoAck = false; 00112 00113 /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */ 00114 if (xa->callbackMultiThreaded == true) { 00115 if (xa->logLevel>=XMLBLASTER_LOG_DUMP) 00116 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true"); 00117 /*xa->callbackMultiThreaded = false;*/ 00118 } 00119 /* stdint.h: # define INT32_MAX (2147483647) */ 00120 xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */ 00121 xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout); 00122 /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */ 00123 memset(&xa->callbackThreadId, 0, sizeof(pthread_t)); 00124 xa->threadCounter = 0; 00125 00126 if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, 00127 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld", 00128 getLogLevelStr(xa->logLevel), xa->responseTimeout); 00129 00130 /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */ 00131 pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */ 00132 pthread_mutex_init(&xa->readnMutex, NULL); 00133 return xa; 00134 } 00135 00136 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa) 00137 { 00138 int rc; 00139 00140 if (xa == 0) { 00141 char *stack = getStackTrace(10); 00142 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s", 00143 __FILE__, __LINE__, stack); 00144 free(stack); 00145 return; 00146 } 00147 00148 if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */ 00149 xa->isShutdown = true; /* Inhibit access to xa */ 00150 00151 if (xa->callbackP != 0) { 00152 xa->callbackP->shutdown(xa->callbackP); 00153 } 00154 if (xa->connectionP != 0) { 00155 xa->connectionP->shutdown(xa->connectionP); 00156 } 00157 00158 if (xa->callbackP != 0) { 00159 /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */ 00160 const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true); 00161 int retVal; 00162 if (!xa->callbackP->isShutdown) { 00163 00164 { /* Wait for any pending update() dispatcher threads to die */ 00165 int i; 00166 int num = 200; 00167 int interval = 10; 00168 for (i=0; i<num; i++) { 00169 if (xa->callbackP->isShutdown) 00170 break; 00171 /*pthread_yield(0);*/ 00172 sleepMillis(interval); 00173 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00174 "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for callback thread to join. %d/%d", interval, i, num); 00175 } 00176 if (i == num) { 00177 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Proper shutdown of callback thread failed, it seems to block on the socket"); 00178 } 00179 } 00180 00181 if (!USE_DETACH_MODE) { 00182 /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */ 00183 /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */ 00184 /* 00185 retVal = pthread_cancel(xa->callbackThreadId); 00186 if (retVal != 0) { 00187 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal); 00188 } 00189 */ 00190 } 00191 } 00192 00193 if (USE_DETACH_MODE) { 00194 retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */ 00195 if (retVal != 0) { 00196 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal); 00197 } 00198 else { 00199 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00200 "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId)); 00201 } 00202 } 00203 else { /* JOIN mode */ 00204 retVal = pthread_join(xa->callbackThreadId, 0); 00205 if (retVal != 0) { 00206 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal); 00207 } 00208 else { 00209 if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00210 "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId)); 00211 } 00212 } 00213 00214 memset(&xa->callbackThreadId, 0, sizeof(pthread_t)); 00215 } 00216 00217 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP); 00218 00219 { /* Wait for any pending update() dispatcher threads to die */ 00220 int i; 00221 int num = 1000; 00222 int interval = 10; 00223 for (i=0; i<num; i++) { 00224 if ((int)xa->threadCounter < 1) 00225 break; 00226 sleepMillis(interval); 00227 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00228 "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num); 00229 } 00230 if (i >= num) { 00231 if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00232 "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num); 00233 } 00234 } 00235 00236 if (xa->connectionP != 0) { 00237 freeXmlBlasterConnectionUnparsed(&xa->connectionP); 00238 } 00239 00240 if (xa->callbackP != 0) { 00241 freeCallbackServerUnparsed(&xa->callbackP); 00242 } 00243 00244 freeProperties(xa->props); 00245 00246 rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */ 00247 if (rc != 0) /* EBUSY */ 00248 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc); 00249 00250 rc = pthread_mutex_destroy(&xa->readnMutex); 00251 if (rc != 0) /* EBUSY */ 00252 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc); 00253 00254 free(xa); 00255 } 00256 00257 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception) 00258 { 00259 int threadRet = 0; 00260 const char *compressType = 0; 00261 00262 if (checkArgs(xa, "initialize", false, exception) == false) return false; 00263 00264 if (xa->isInitialized) { 00265 return true; 00266 } 00267 00268 if (clientUpdateFp == 0) { 00269 xa->clientsUpdateFp = 0; 00270 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "", 00271 "Your callback UpdateFp pointer is NULL, we use our default callback handler"); 00272 } 00273 else { 00274 xa->clientsUpdateFp = clientUpdateFp; 00275 } 00276 00277 if (xa->connectionP) { 00278 freeXmlBlasterConnectionUnparsed(&xa->connectionP); 00279 } 00280 xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv); 00281 if (xa->connectionP == 0) { 00282 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00283 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00284 "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__); 00285 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00286 return false; 00287 } 00288 xa->connectionP->log = xa->log; 00289 xa->connectionP->logUserP = xa->logUserP; 00290 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed"); 00291 00292 00293 /* Switch on compression? */ 00294 compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", ""); 00295 compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType); 00296 00297 if (!strcmp(compressType, "zlib:stream")) { 00298 xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed; 00299 xa->connectionP->writeToSocket.userP = xa; 00300 xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed; 00301 xa->connectionP->readFromSocket.userP = xa; 00302 } 00303 else { 00304 if (strcmp(compressType, "")) { 00305 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType); 00306 } 00307 xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain; 00308 xa->connectionP->writeToSocket.userP = xa; 00309 xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain; 00310 xa->connectionP->readFromSocket.userP = xa; 00311 } 00312 00313 if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */ 00314 return false; 00315 00316 /* the fourth arg 'xa' is returned as 'void *userData' in update() method */ 00317 if (xa->callbackP != 0) { 00318 freeCallbackServerUnparsed(&xa->callbackP); 00319 } 00320 xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa); 00321 if (xa->callbackP == 0) { 00322 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00323 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00324 "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__); 00325 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00326 freeXmlBlasterConnectionUnparsed(&xa->connectionP); 00327 return false; 00328 } 00329 xa->callbackP->log = xa->log; 00330 xa->callbackP->logUserP = xa->logUserP; 00331 00332 if (!strcmp(compressType, "zlib:stream")) { 00333 xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed; 00334 xa->callbackP->writeToSocket.userP = xa; 00335 xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed; 00336 xa->callbackP->readFromSocket.userP = xa; 00337 } 00338 else { 00339 xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain; 00340 xa->callbackP->writeToSocket.userP = xa; 00341 xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain; 00342 xa->callbackP->readFromSocket.userP = xa; 00343 } 00344 00345 xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp); 00346 00347 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00348 "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...", 00349 (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB); 00350 00351 /* Register our callback funtion which is called just before sending a message */ 00352 xa->connectionP->preSendEvent = preSendEvent; 00353 xa->connectionP->preSendEvent_userP = xa; 00354 00355 /* Register our callback funtion which is called just after sending a message */ 00356 xa->connectionP->postSendEvent = postSendEvent; 00357 xa->connectionP->postSendEvent_userP = xa; 00358 00359 /* thread blocks on socket listener */ 00360 threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP); 00361 if (threadRet != 0) { 00362 strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00363 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00364 "[%.100s:%d] Creating thread failed with error number %d", 00365 __FILE__, __LINE__, threadRet); 00366 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00367 freeCallbackServerUnparsed(&xa->callbackP); 00368 freeXmlBlasterConnectionUnparsed(&xa->connectionP); 00369 return false; 00370 } 00371 00372 xa->isInitialized = true; 00373 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00374 "initialize() successful"); 00375 return xa->isInitialized; 00376 } 00377 00378 static bool isConnected(XmlBlasterAccessUnparsed *xa) 00379 { 00380 if (xa == 0 || xa->isShutdown || xa->connectionP == 0) { 00381 return false; 00382 } 00383 return xa->connectionP->isConnected(xa->connectionP); 00384 } 00385 00396 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) 00397 { 00398 bool retVal; 00399 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa; 00400 00401 /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */ 00402 if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName)) 00403 return msgRequestInfoP; 00404 00405 /* ======== Initialize threading ====== */ 00406 msgRequestInfoP->responseMutexIsLocked = false; /* Only to remember if the client thread holds the lock */ 00407 00408 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00409 "preSendEvent(%s) occurred", msgRequestInfoP->methodName); 00410 retVal = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent); 00411 if (retVal == false) { 00412 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00413 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00414 "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__); 00415 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00416 return (MsgRequestInfo *)0; 00417 } 00418 00419 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00420 "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock", 00421 msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen); 00422 pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */ 00423 if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) { 00424 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00425 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00426 "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retVal); 00427 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00428 return (MsgRequestInfo *)0; 00429 } 00430 msgRequestInfoP->responseMutexIsLocked = true; /* Only if the client thread holds the lock */ 00431 00432 return msgRequestInfoP; 00433 } 00434 00443 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder) { 00444 int retVal; 00445 SocketDataHolder *s = (SocketDataHolder *)socketDataHolder; 00446 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa; 00447 00448 if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) { 00449 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to lock responseMutex in responseEvent() failed %d", retVal); 00450 /* return; */ 00451 } 00452 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is LOCKED"); 00453 00454 blobcpyAlloc(&msgRequestInfoP->responseBlob, s->blob.data, s->blob.dataLen); 00455 msgRequestInfoP->responseType = s->type; 00456 00457 if ((retVal = pthread_cond_signal(&msgRequestInfoP->responseCond)) != 0) { 00458 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d", retVal); 00459 /* return; */ 00460 } 00461 00462 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00463 "responseEvent(requestId '%s', msgType=%c, dataLen=%d) occurred, wake up signal sent", 00464 s->requestId, msgRequestInfoP->responseType, msgRequestInfoP->responseBlob.dataLen); 00465 00466 if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) { 00467 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to unlock responseMutex in responseEvent() failed %d", retVal); 00468 /* return; */ 00469 } 00470 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is UNLOCKED"); 00471 } 00472 00480 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) 00481 { 00482 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa; 00483 struct timespec abstime; 00484 bool useTimeout = false; 00485 int retVal; 00486 00487 if (msgRequestInfoP->rollback) { 00488 mutexUnlock(msgRequestInfoP, exception); 00489 return (MsgRequestInfo *)0; 00490 } 00491 00492 if (xa->responseTimeout > 0 && getAbsoluteTime(xa->responseTimeout, &abstime) == true) { 00493 useTimeout = true; 00494 } 00495 00496 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) responseMutex is LOCKED, entering wait ...", msgRequestInfoP->requestIdStr); 00497 00498 if ((retVal = pthread_cond_init(&msgRequestInfoP->responseCond, NULL)) != 0) { 00499 xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr); 00500 strncpy0(exception->errorCode, "resource.exhaust", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */ 00501 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] pthread_cond_init() for '%s()' with requestId=%s returned %d.", 00502 __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal); 00503 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00504 return (MsgRequestInfo *)0; 00505 } 00506 00507 /* Wait for response, the callback server delivers it */ 00508 while (msgRequestInfoP->responseType == 0) { /* Protect for spurious wake ups (e.g. by SIGUSR1) */ 00509 if (useTimeout == true) { 00510 int error = pthread_cond_timedwait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex, &abstime); 00511 if (error == ETIMEDOUT) { 00512 xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr); 00513 strncpy0(exception->errorCode, "communication.responseTimeout", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */ 00514 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Waiting on response for '%s()' with requestId=%s timed out after blocking %ld millis", 00515 __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, xa->responseTimeout); 00516 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00517 return (MsgRequestInfo *)0; 00518 } 00519 } 00520 else { 00521 pthread_cond_wait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex); /* Wakes up from responseEvent() */ 00522 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00523 "Wake up tread, response of length %d arrived", msgRequestInfoP->responseBlob.dataLen); 00524 } 00525 } 00526 00527 if ((retVal = pthread_cond_destroy(&msgRequestInfoP->responseCond)) != 0) { 00528 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned %d, we ignore it.", 00529 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal); 00530 } 00531 00532 msgRequestInfoP->blob.dataLen = msgRequestInfoP->responseBlob.dataLen; 00533 msgRequestInfoP->blob.data = msgRequestInfoP->responseBlob.data; 00534 msgRequestInfoP->responseBlob.dataLen = 0; 00535 msgRequestInfoP->responseBlob.data = 0; /* msgRequestInfoP->blob.data is now responsible to free() the data */ 00536 00537 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) 00538 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00539 "Thread #%ld woke up in postSendEvent() for msgType=%c and dataLen=%d", 00540 msgRequestInfoP->requestIdStr, msgRequestInfoP->responseType, msgRequestInfoP->blob.dataLen); 00541 00542 00543 if (msgRequestInfoP->responseType == (char)MSG_TYPE_EXCEPTION) { 00544 convertToXmlBlasterException(&msgRequestInfoP->blob, exception, false); 00545 freeBlobHolderContent(&msgRequestInfoP->blob); 00546 msgRequestInfoP->responseType = 0; 00547 return (MsgRequestInfo *)0; 00548 } 00549 00550 msgRequestInfoP->responseType = 0; 00551 00552 /* if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) i woke up, entering unlock ...", msgRequestInfoP->requestIdStr); */ 00553 if (mutexUnlock(msgRequestInfoP, exception) == false) 00554 return (MsgRequestInfo *)0; 00555 00556 return msgRequestInfoP; 00557 } 00558 00565 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) { 00566 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa; 00567 int retVal; 00568 if (msgRequestInfoP->responseMutexIsLocked == false) 00569 return true; 00570 msgRequestInfoP->responseMutexIsLocked = false; 00571 if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) { 00572 char embeddedText[XMLBLASTEREXCEPTION_MESSAGE_LEN]; 00573 if (exception == 0) { 00574 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) { 00575 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.", 00576 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal); 00577 } 00578 return false; 00579 } 00580 if (*exception->errorCode != 0) { 00581 SNPRINTF(embeddedText, XMLBLASTEREXCEPTION_MESSAGE_LEN, "{%s:%s}", exception->errorCode, exception->message); 00582 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Ignoring embedded exception %s: %s", exception->errorCode, exception->message); 00583 } 00584 else 00585 *embeddedText = 0; 00586 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00587 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] ERROR trying to unlock responseMutex, return=%d. Embedded %s", __FILE__, __LINE__, retVal, embeddedText); 00588 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00589 00590 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) { 00591 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.", 00592 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal); 00593 } 00594 return false; 00595 } 00596 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent() responseMutex is UNLOCKED"); 00597 00598 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) { 00599 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.", 00600 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal); 00601 } 00602 return true; 00603 } 00604 00605 Dll_Export const char *xmlBlasterAccessUnparsedUsage(char *usage) 00606 { 00607 /* take care not to exceed XMLBLASTER_MAX_USAGE_LEN */ 00608 SNPRINTF(usage, XMLBLASTER_MAX_USAGE_LEN, "%.950s%.950s%s", xmlBlasterConnectionUnparsedUsage(), callbackServerRawUsage(), 00609 "\n -plugin/socket/multiThreaded [true]" 00610 "\n If true the update() call to your client code is a separate thread." 00611 "\n -plugin/socket/responseTimeout [60000 (one minute)]" 00612 "\n The time in millis to wait on a response, 0 is forever." 00613 "\n -logLevel ERROR | WARN | INFO | TRACE | DUMP [WARN]" 00614 ); 00615 00616 return usage; 00617 } 00618 00619 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, 00620 UpdateFp clientUpdateFp, XmlBlasterException *exception) 00621 { 00622 char *response = 0; 00623 char *qos_; 00624 00625 if (checkArgs(xa, "connect", false, exception) == false) return 0; 00626 00627 /* Is allowed, we use our default handler in this case 00628 if (clientUpdateFp == 0) { 00629 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00630 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'updateFp' to connect()", __FILE__, __LINE__); 00631 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00632 return false; 00633 } 00634 */ 00635 00636 if (qos == 0) { 00637 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00638 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'qos' to connect()", __FILE__, __LINE__); 00639 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00640 return false; 00641 } 00642 00643 if (initialize(xa, clientUpdateFp, exception) == false) { 00644 return false; 00645 } 00646 00647 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Invoking connect()"); 00648 00649 if (strstr(qos, "<callback") != 0) { 00650 /* User has given us a callback address */ 00651 qos_ = strcpyAlloc(qos); 00652 } 00653 else { 00654 /* We add the callback sequence with our tunnel callback host and port 00655 HACK: This is error prone depending on the given qos */ 00656 const char *pos; 00657 enum { SIZE=1024 }; 00658 char callbackQos[SIZE]; 00659 snprintf0(callbackQos, SIZE, 00660 "<queue relating='callback'>" /* maxEntries='100' maxEntriesCache='100'>" */ 00661 " <callback type='SOCKET' sessionId='%s'>" 00662 " socket://%.120s:%d" 00663 " </callback>" 00664 "</queue>", 00665 "NoCallbackSessionId", xa->callbackP->hostCB, xa->callbackP->portCB); 00666 qos_ = (char *)calloc(strlen(qos) + SIZE, sizeof(char *)); 00667 pos = strstr(qos, "</qos>"); 00668 if (pos == 0) { 00669 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00670 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid 'qos' markup to connect()", __FILE__, __LINE__); 00671 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00672 return false; 00673 } 00674 strncpy0(qos_, qos, pos-qos+1); 00675 strncat0(qos_, callbackQos, SIZE); 00676 strncat0(qos_, "</qos>", 8); 00677 } 00678 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connecting with qos=%s", qos_); 00679 00680 /* Register our function responseEvent() to be notified when the response arrives, 00681 this is done by preSendEvent() callback called during connect() */ 00682 00683 response = xa->connectionP->connect(xa->connectionP, qos_, exception); 00684 00685 free(qos_); 00686 /* freeBlobHolderContent(&xa->responseBlob); */ 00687 00688 /* The response was handled by a callback to postSendEvent */ 00689 00690 if (response == 0) return response; 00691 00692 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00693 "Got response for connect(secretSessionId=%s)", xa->connectionP->secretSessionId); 00694 return response; 00695 } 00696 00697 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception) 00698 { 00699 bool p; 00700 if (checkArgs(xa, "disconnect", true, exception) == false ) return 0; 00701 p = xa->connectionP->disconnect(xa->connectionP, qos, exception); 00702 return p; 00703 } 00704 00711 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception) 00712 { 00713 char *p; 00714 if (checkArgs(xa, "publish", true, exception) == false ) return 0; 00715 p = xa->connectionP->publish(xa->connectionP, msgUnit, exception); 00716 return p; 00717 } 00718 00725 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception) 00726 { 00727 QosArr *p; 00728 if (checkArgs(xa, "publishArr", true, exception) == false ) return 0; 00729 p = xa->connectionP->publishArr(xa->connectionP, msgUnitArr, exception); 00730 return p; 00731 } 00732 00739 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception) 00740 { 00741 if (checkArgs(xa, "publishOneway", true, exception) == false ) return; 00742 xa->connectionP->publishOneway(xa->connectionP, msgUnitArr, exception); 00743 } 00744 00750 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception) 00751 { 00752 char *p; 00753 if (checkArgs(xa, "subscribe", true, exception) == false ) return 0; 00754 p = xa->connectionP->subscribe(xa->connectionP, key, qos, exception); 00755 return p; 00756 } 00757 00765 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception) 00766 { 00767 QosArr *p; 00768 if (checkArgs(xa, "unSubscribe", true, exception) == false ) return 0; 00769 p = xa->connectionP->unSubscribe(xa->connectionP, key, qos, exception); 00770 return p; 00771 } 00772 00781 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception) 00782 { 00783 QosArr *p; 00784 if (checkArgs(xa, "erase", true, exception) == false ) return 0; 00785 p = xa->connectionP->erase(xa->connectionP, key, qos, exception); 00786 return p; 00787 } 00788 00798 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception) 00799 { 00800 char *p; 00801 if (checkArgs(xa, "ping", true, exception) == false ) return 0; 00802 p = xa->connectionP->ping(xa->connectionP, qos, exception); 00803 return p; 00804 } 00805 00812 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception) 00813 { 00814 MsgUnitArr *msgUnitArr; 00815 if (checkArgs(xa, "get", true, exception) == false ) return 0; 00816 msgUnitArr = xa->connectionP->get(xa->connectionP, key, qos, exception); 00817 return msgUnitArr; 00818 } 00819 00820 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, 00821 bool checkIsConnected, XmlBlasterException *exception) 00822 { 00823 if (xa == 0) { 00824 char *stack = getStackTrace(10); 00825 if (exception == 0) { 00826 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s", 00827 __FILE__, __LINE__, methodName, stack); 00828 } 00829 else { 00830 strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00831 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00832 "[%.100s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %.16s() %s", 00833 __FILE__, __LINE__, methodName, stack); 00834 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message); 00835 } 00836 free(stack); 00837 return false; 00838 } 00839 00840 if (exception == 0) { 00841 char *stack = getStackTrace(10); 00842 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s", 00843 __FILE__, __LINE__, methodName, stack); 00844 free(stack); 00845 return false; 00846 } 00847 00848 if (xa->isShutdown || (checkIsConnected && !xa->isConnected(xa))) { 00849 char *stack = getStackTrace(10); 00850 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00851 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00852 "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s", 00853 __FILE__, __LINE__, methodName, stack); 00854 free(stack); 00855 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00856 return false; 00857 } 00858 00859 initializeXmlBlasterException(exception); 00860 00861 return true; 00862 } 00863 00870 static bool runUpdate(UpdateContainer *container) 00871 { 00872 XmlBlasterAccessUnparsed *xa = container->xa; 00873 MsgUnitArr *msgUnitArrP = container->msgUnitArrP; 00874 void *userData = container->userData; 00875 CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData; 00876 XmlBlasterException *exception = &container->exception; 00877 SocketDataHolder *socketDataHolder = &container->socketDataHolder; 00878 bool retVal; 00879 00880 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Entering runUpdate()"); 00881 00882 retVal = xa->clientsUpdateFp(msgUnitArrP, xa, exception); 00883 00884 if (xa->lowLevelAutoAck) { /* returned already */ 00885 } 00886 else { 00887 cb->sendResponseOrException(retVal, cb, socketDataHolder, msgUnitArrP, exception); 00888 } 00889 00890 free(container); 00891 00892 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00893 "runUpdate: Update thread 0x%x is exiting", get_pthread_id(pthread_self())); 00894 xa->threadCounter--; 00895 return (retVal==true) ? 0 : 1; 00896 } 00897 00903 static void interceptUpdate(MsgUnitArr *msgUnitArrP, void *userData, 00904 XmlBlasterException *exception, void /*SocketDataHolder*/ *socketDataHolder) 00905 { 00906 CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData; 00907 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)cb->updateCbUserData; 00908 00909 if (xa->clientsUpdateFp == 0) { /* Client has not registered an update() */ 00910 size_t i; 00911 bool testException = false; 00912 bool success = true; 00913 00914 for (i=0; i<msgUnitArrP->len; i++) { 00915 const char *key = msgUnitArrP->msgUnitArr[i].key; 00916 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00917 "CALLBACK update() default handler: Asynchronous message update arrived:%s id=%s, we ignore it in this default handler\n", 00918 key, ((SocketDataHolder*)socketDataHolder)->requestId); 00919 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>"); 00920 /* Return QoS: Everything is OK */ 00921 } 00922 if (testException) { 00923 strncpy0(exception->errorCode, "user.clientCode", 00924 XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00925 strncpy0(exception->message, "I don't want these messages", 00926 XMLBLASTEREXCEPTION_MESSAGE_LEN); 00927 success = false; 00928 } 00929 cb->sendResponseOrException(success, cb, socketDataHolder, msgUnitArrP, exception); 00930 return; 00931 } 00932 00933 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "interceptUpdate(): Received message"); 00934 00935 if (xa->callbackMultiThreaded == false) { 00936 bool ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception); 00937 cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception); 00938 return; 00939 } 00940 00941 { 00942 pthread_t tid; 00943 int threadRet = 0; 00944 UpdateContainer *container = (UpdateContainer*)malloc(sizeof(UpdateContainer)); 00945 pthread_attr_t attr; 00946 00947 pthread_attr_init(&attr); 00948 /* Cleanup all resources after ending the thread, instead of calling pthread_join() */ 00949 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 00950 00951 container->xa = xa; 00952 container->msgUnitArrP = msgUnitArrP; 00953 container->userData = userData; 00954 memcpy(&container->exception, exception, sizeof(XmlBlasterException)); 00955 memcpy(&container->socketDataHolder, socketDataHolder, sizeof(SocketDataHolder)); /* The blob pointer is freed already by CallbackServerUnparsed */ 00956 00957 if (xa->lowLevelAutoAck) { 00958 size_t i; 00959 for (i=0; i<msgUnitArrP->len; i++) { 00960 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>"); 00961 } 00962 } 00963 00964 /* 00965 Guaranteed sequence: 00966 The server uses max one thread to deliver update() for each client 00967 If the update contains an array of messages those are handled as a 00968 complete bulk in the correct sequence here. 00969 */ 00970 00971 /* this thread will deliver the update message to the client code, 00972 Note: we need a thread pool cache for better performance */ 00973 xa->threadCounter++; 00974 threadRet = pthread_create(&tid, &attr, 00975 (void * (*)(void *))runUpdate, (void *)container); 00976 if (threadRet != 0) { 00977 bool ret = false; 00978 free(container); 00979 strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00980 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00981 "[%.100s:%d] Creating thread failed with error number %d, we deliver the message in the same thread", 00982 __FILE__, __LINE__, threadRet); 00983 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message); 00984 ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception); 00985 cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception); 00986 xa->threadCounter--; 00987 pthread_attr_destroy(&attr); 00988 return; 00989 } 00990 00991 /* Is done already with above PTHREAD_CREATE_DETACHED 00992 threadRet = pthread_detach(tid); 00993 if (threadRet != 0) { 00994 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching thread failed with error number %d", __LINE__, threadRet); 00995 } 00996 */ 00997 00998 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00999 "interceptUpdate: Received message and delegated it to a separate thread 0x%x to deliver", get_pthread_id(tid)); 01000 01001 pthread_attr_destroy(&attr); 01002 } 01003 01004 if (xa->lowLevelAutoAck) { 01005 *exception->errorCode = 0; 01006 cb->sendResponseOrException(true, cb, socketDataHolder, msgUnitArrP, exception); 01007 } 01008 } 01009 01013 static ssize_t writenPlain(void * userP, const int fd, const char *ptr, const size_t nbytes) { 01014 int rc; 01015 ssize_t ret; 01016 01017 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP; 01018 01019 /* Start mutex */ 01020 rc = pthread_mutex_lock(&xa->writenMutex); 01021 if (rc != 0) /* EINVAL */ 01022 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc); 01023 01024 /* Send data */ 01025 ret = writen(fd, ptr, nbytes); 01026 01027 /* End mutex */ 01028 rc = pthread_mutex_unlock(&xa->writenMutex); 01029 if (rc != 0) /* EPERM */ 01030 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc); 01031 01032 return ret; 01033 01034 } 01035 01039 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) { 01040 int rc; 01041 ssize_t ret; 01042 01043 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP; 01044 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenCompressed(%u)", nbytes); 01045 01046 /* Start mutex */ 01047 rc = pthread_mutex_lock(&xa->writenMutex); 01048 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc); 01049 01050 /* Send data */ 01051 ret = xmlBlaster_writenCompressed(xa->connectionP->zlibWriteBuf, fd, ptr, nbytes); 01052 01053 /* End mutex */ 01054 rc = pthread_mutex_unlock(&xa->writenMutex); 01055 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc); 01056 01057 return ret; 01058 } 01059 01063 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) { 01064 int rc; 01065 ssize_t ret; 01066 01067 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP; 01068 01069 rc = pthread_mutex_lock(&xa->readnMutex); 01070 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc); 01071 01072 ret = readn(fd, ptr, nbytes, fpNumRead, userP2); 01073 01074 rc = pthread_mutex_unlock(&xa->readnMutex); 01075 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc); 01076 01077 return ret; 01078 } 01079 01083 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) { 01084 int rc; 01085 ssize_t ret; 01086 01087 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP; 01088 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnCompressed(%u)", nbytes); 01089 01090 rc = pthread_mutex_lock(&xa->readnMutex); 01091 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc); 01092 01093 ret = xmlBlaster_readnCompressed(xa->connectionP->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2); 01094 01095 rc = pthread_mutex_unlock(&xa->readnMutex); 01096 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc); 01097 01098 return ret; 01099 } 01100 01101 #ifdef XmlBlasterAccessUnparsedMain /* compile a standalone test program */ 01102 01108 static bool myUpdate(MsgUnitArr *msgUnitArrP, void *userData, XmlBlasterException *xmlBlasterException) 01109 { 01110 size_t i; 01111 bool testException = false; 01112 if (userData != 0) ; /* to avoid compiler warning (we don't need it here) */ 01113 for (i=0; i<msgUnitArrP->len; i++) { 01114 char *xml = messageUnitToXml(&msgUnitArrP->msgUnitArr[i]); 01115 printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n", xml); 01116 free(xml); 01117 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>"); 01118 /* Return QoS: Everything is OK */ 01119 } 01120 if (testException) { 01121 strncpy0(xmlBlasterException->errorCode, "user.clientCode", 01122 XMLBLASTEREXCEPTION_ERRORCODE_LEN); 01123 strncpy0(xmlBlasterException->message, "I don't want these messages", 01124 XMLBLASTEREXCEPTION_MESSAGE_LEN); 01125 return false; 01126 } 01127 return true; 01128 } 01129 01133 int main(int argc, char** argv) 01134 { 01135 int ii; 01136 int numTests = 1; 01137 bool testCallInitialize = false; 01138 01139 for (ii=0; ii < argc-1; ii++) 01140 if (strcmp(argv[ii], "-numTests") == 0) { 01141 if (strToInt(&numTests, argv[++ii]) == false) 01142 printf("[XmlBlasterAccessUnparsed] WARN '-numTests %s' is invalid\n", argv[ii]); 01143 } 01144 01145 for (ii=0; ii<numTests; ii++) { 01146 int iarg; 01147 char *response = (char *)0; 01148 /* 01149 * callbackSessionId: 01150 * Is created by the client and used to validate callback messages in update. 01151 * This is sent on connect in ConnectQos. 01152 * (Is different from the xmlBlaster secret session ID) 01153 */ 01154 const char *callbackSessionId = "topSecret"; 01155 XmlBlasterException xmlBlasterException; 01156 XmlBlasterAccessUnparsed *xa = 0; 01157 01158 /* 01159 const char *tmp = getStackTrace(20); 01160 printf("[client] stackTrace=%s\n", tmp); 01161 free(tmp); 01162 */ 01163 01164 # ifdef PTHREAD_THREADS_MAX 01165 printf("[client] Try option '-help' if you need usage informations, max %d" 01166 " threads per process are supported on this OS\n", PTHREAD_THREADS_MAX); 01167 # else 01168 printf("[client] Try option '-help' if you need usage informations\n"); 01169 # endif 01170 01171 for (iarg=0; iarg < argc; iarg++) { 01172 if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) { 01173 char usage[XMLBLASTER_MAX_USAGE_LEN]; 01174 const char *pp = 01175 "\n -logLevel ERROR | WARN | INFO | TRACE | DUMP [WARN]" 01176 "\n -numTests How often to run the same tests [1]" 01177 "\n\nExample:" 01178 "\n XmlBlasterAccessUnparsedMain -logLevel TRACE" 01179 " -dispatch/connection/plugin/socket/hostname server.mars.universe"; 01180 printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n", 01181 getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp); 01182 exit(1); 01183 } 01184 } 01185 01186 xa = getXmlBlasterAccessUnparsed(argc, argv); 01187 01188 if (testCallInitialize) { 01189 if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) { 01190 printf("[client] Connection to xmlBlaster failed," 01191 " please start the server or check your configuration\n"); 01192 freeXmlBlasterAccessUnparsed(xa); 01193 exit(1); 01194 } 01195 } 01196 01197 { /* connect */ 01198 char connectQos[2048]; 01199 char callbackQos[1024]; 01200 01201 if (testCallInitialize) { 01202 SNPRINTF(callbackQos, 1024, 01203 "<queue relating='callback' maxEntries='100' maxEntriesCache='100'>" 01204 " <callback type='SOCKET' sessionId='%s'>" 01205 " socket://%.120s:%d" 01206 " </callback>" 01207 "</queue>", 01208 callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB); 01209 } 01210 else 01211 *callbackQos = '\0'; 01212 01213 SNPRINTF(connectQos, 2048, 01214 "<qos>" 01215 " <securityService type='htpasswd' version='1.0'>" 01216 " <![CDATA[" 01217 " <user>fritz</user>" 01218 " <passwd>secret</passwd>" 01219 " ]]>" 01220 " </securityService>" 01221 "%.1024s" 01222 "</qos>", callbackQos); 01223 01224 response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException); 01225 if (*xmlBlasterException.errorCode != 0) { 01226 printf("[client] Caught exception during connect errorCode=%s, message=%s\n", 01227 xmlBlasterException.errorCode, xmlBlasterException.message); 01228 freeXmlBlasterAccessUnparsed(xa); 01229 exit(1); 01230 } 01231 free(response); 01232 printf("[client] Connected to xmlBlaster, do some tests ...\n"); 01233 } 01234 01235 response = xa->ping(xa, 0, &xmlBlasterException); 01236 if (response == (char *)0) { 01237 printf("[client] ERROR: Pinging a connected server failed: errorCode=%s, message=%s\n", 01238 xmlBlasterException.errorCode, xmlBlasterException.message); 01239 } 01240 else { 01241 printf("[client] Pinging a connected server, response=%s\n", response); 01242 free(response); 01243 } 01244 01245 { /* subscribe ... */ 01246 const char *key = "<key oid='HelloWorld'/>"; 01247 const char *qos = "<qos/>"; 01248 printf("[client] Subscribe message 'HelloWorld' ...\n"); 01249 response = xa->subscribe(xa, key, qos, &xmlBlasterException); 01250 if (*xmlBlasterException.errorCode != 0) { 01251 printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n", 01252 xmlBlasterException.errorCode, xmlBlasterException.message); 01253 xa->disconnect(xa, 0, &xmlBlasterException); 01254 freeXmlBlasterAccessUnparsed(xa); 01255 exit(1); 01256 } 01257 printf("[client] Subscribe success, returned status is '%s'\n", response); 01258 free(response); 01259 } 01260 01261 { /* publish ... */ 01262 MsgUnit msgUnit; 01263 printf("[client] Publishing message 'HelloWorld' ...\n"); 01264 msgUnit.key = strcpyAlloc("<key oid='HelloWorld'/>"); 01265 msgUnit.content = strcpyAlloc("Some message payload"); 01266 msgUnit.contentLen = strlen(msgUnit.content); 01267 msgUnit.qos =strcpyAlloc("<qos><persistent/></qos>"); 01268 response = xa->publish(xa, &msgUnit, &xmlBlasterException); 01269 freeMsgUnitData(&msgUnit); 01270 if (*xmlBlasterException.errorCode != 0) { 01271 printf("[client] Caught exception in publish errorCode=%s, message=%s\n", 01272 xmlBlasterException.errorCode, xmlBlasterException.message); 01273 xa->disconnect(xa, 0, &xmlBlasterException); 01274 freeXmlBlasterAccessUnparsed(xa); 01275 exit(1); 01276 } 01277 printf("[client] Publish success, returned status is '%s'\n", response); 01278 free(response); 01279 } 01280 01281 { /* unSubscribe ... */ 01282 const char *key = "<key oid='HelloWorld'/>"; 01283 const char *qos = "<qos/>"; 01284 printf("[client] UnSubscribe message 'HelloWorld' ...\n"); 01285 response = xa->unSubscribe(xa, key, qos, &xmlBlasterException); 01286 if (response) { 01287 printf("[client] Unsubscribe success, returned status is '%s'\n", response); 01288 free(response); 01289 } 01290 else { 01291 printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n", 01292 xmlBlasterException.errorCode, xmlBlasterException.message); 01293 xa->disconnect(xa, 0, &xmlBlasterException); 01294 freeXmlBlasterAccessUnparsed(xa); 01295 exit(1); 01296 } 01297 } 01298 01299 { /* get synchnronous ... */ 01300 size_t i; 01301 const char *key = "<key queryType='XPATH'>//key</key>"; 01302 const char *qos = "<qos/>"; 01303 MsgUnitArr *msgUnitArr; 01304 printf("[client] Get synchronous messages with XPath '//key' ...\n"); 01305 msgUnitArr = xa->get(xa, key, qos, &xmlBlasterException); 01306 if (*xmlBlasterException.errorCode != 0) { 01307 printf("[client] Caught exception in get errorCode=%s, message=%s\n", 01308 xmlBlasterException.errorCode, xmlBlasterException.message); 01309 xa->disconnect(xa, 0, &xmlBlasterException); 01310 freeXmlBlasterAccessUnparsed(xa); 01311 exit(1); 01312 } 01313 if (msgUnitArr != (MsgUnitArr *)0) { 01314 for (i=0; i<msgUnitArr->len; i++) { 01315 char *contentStr = strFromBlobAlloc(msgUnitArr->msgUnitArr[i].content, 01316 msgUnitArr->msgUnitArr[i].contentLen); 01317 const char *dots = (msgUnitArr->msgUnitArr[i].contentLen > 96) ? 01318 " ..." : ""; 01319 printf("\n[client] Received message#%u/%u:\n" 01320 "-------------------------------------" 01321 "%s\n <content>%.100s%s</content>%s\n" 01322 "-------------------------------------\n", 01323 i+1, msgUnitArr->len, 01324 msgUnitArr->msgUnitArr[i].key, 01325 contentStr, dots, 01326 msgUnitArr->msgUnitArr[i].qos); 01327 free(contentStr); 01328 } 01329 freeMsgUnitArr(msgUnitArr); 01330 } 01331 else { 01332 printf("[client] Caught exception in get errorCode=%s, message=%s\n", 01333 xmlBlasterException.errorCode, xmlBlasterException.message); 01334 xa->disconnect(xa, 0, &xmlBlasterException); 01335 freeXmlBlasterAccessUnparsed(xa); 01336 exit(1); 01337 } 01338 } 01339 01340 01341 { /* erase ... */ 01342 const char *key = "<key oid='HelloWorld'/>"; 01343 const char *qos = "<qos/>"; 01344 printf("[client] Erasing message 'HelloWorld' ...\n"); 01345 response = xa->erase(xa, key, qos, &xmlBlasterException); 01346 if (*xmlBlasterException.errorCode != 0) { 01347 printf("[client] Caught exception in erase errorCode=%s, message=%s\n", 01348 xmlBlasterException.errorCode, xmlBlasterException.message); 01349 xa->disconnect(xa, 0, &xmlBlasterException); 01350 freeXmlBlasterAccessUnparsed(xa); 01351 exit(1); 01352 } 01353 printf("[client] Erase success, returned status is '%s'\n", response); 01354 free(response); 01355 } 01356 01357 if (xa->disconnect(xa, 0, &xmlBlasterException) == false) { 01358 printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n", 01359 xmlBlasterException.errorCode, xmlBlasterException.message); 01360 freeXmlBlasterAccessUnparsed(xa); 01361 exit(1); 01362 } 01363 01364 freeXmlBlasterAccessUnparsed(xa); 01365 if (numTests > 1) { 01366 printf("[client] Successfully finished test #%d from %d\n\n", ii, numTests); 01367 } 01368 } 01369 printf("[client] Good bye.\n"); 01370 return 0; /*exit(0);*/ 01371 } 01372 #endif /* #ifdef XmlBlasterAccessUnparsedMain */ 01373