1 /*----------------------------------------------------------------------------
   2 Name:      XmlBlasterAccessUnparsed.c
   3 Project:   xmlBlaster.org
   4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
   5 Comment:   Wraps raw socket connection to xmlBlaster
   6            Implements sync connection and async callback
   7            Needs pthread to compile (multi threading).
   8 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
   9 Compile:
  10   LINUX:   gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
  11            g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
  12            icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
  13   WIN:     cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe  XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib
  14            (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32)
  15   Solaris: cc  -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
  16            CC  -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
  17 
  18   Linux with libxmlBlasterC.so:
  19            gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c  -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT  -lpthread
  20 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
  21 -----------------------------------------------------------------------------*/
  22 #include <stdio.h>
  23 #include <stdlib.h>
  24 #include <string.h>
  25 #if defined(WINCE)
  26 #  if defined(XB_USE_PTHREADS)
  27 #     include <pthreads/pthread.h>
  28 #  else
  29       /*#include <pthreads/need_errno.h> */
  30       static int errno=0; /* single threaded workaround*/
  31 #  endif
  32 #else
  33 #  include <errno.h>
  34 #  include <sys/types.h>
  35 #endif
  36 #include <socket/xmlBlasterSocket.h>
  37 #include <socket/xmlBlasterZlib.h>
  38 #include <XmlBlasterAccessUnparsed.h>
  39 #include <util/Timestampc.h>
  40 
/**
 * Little helper which bundles the arguments handed to a newly created thread.
 * The exception and socketDataHolder members are clones (copies by value) of
 * the originals, because the callback thread may reuse the originals for
 * another message.
 */
typedef struct Dll_Export UpdateContainer {
   XmlBlasterAccessUnparsed *xa;      /* presumably the access handle the update belongs to -- usage is outside this view */
   MsgUnitArr *msgUnitArrP;           /* presumably the received messages to deliver -- ownership not visible here, TODO confirm */
   void *userData;                    /* opaque pointer passed through to the consumer */
   XmlBlasterException exception;     /* Holding a clone from the original as the callback thread may use it for another message */
   SocketDataHolder socketDataHolder; /* Holding a clone from the original */
} UpdateContainer;
  51 
/*
 * Forward declarations of the file-local functions.
 */

/* Installed as function pointers of the handle in getXmlBlasterAccessUnparsed() */
static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception);
static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception);
static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception);
static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
static bool isConnected(XmlBlasterAccessUnparsed *xa);

/* Request/response hooks: preSendEvent/postSendEvent are registered on the connection in initialize(),
   responseEvent is registered as response listener in preSendEvent() */
static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder);
static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);

/* Internal helpers; interceptUpdate is handed to getCallbackServerUnparsed() in initialize() */
static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder);
static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception);

/* Socket read/write implementations, selected in initialize() depending on the configured compression type */
static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes);
static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes);
static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
  74 
  75 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) {
  76    XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed));
  77    if (xa == 0) return xa;
  78    xa->argc = argc;
  79    xa->argv = argv;
  80    xa->props = createProperties(xa->argc, xa->argv);
  81    if (xa->props == 0) {
  82       freeXmlBlasterAccessUnparsed(xa);
  83       return (XmlBlasterAccessUnparsed *)0;
  84    }
  85    xa->isInitialized = false;
  86    xa->isShutdown = false;
  87    xa->connectionP = 0;
  88    xa->callbackP = 0;
  89    xa->userObject = 0; /* A client can use this pointer to point to any client specific information */
  90    xa->userFp = 0;
  91    xa->connect = xmlBlasterConnect;
  92    xa->initialize = initialize;
  93    xa->disconnect = xmlBlasterDisconnect;
  94    xa->publish = xmlBlasterPublish;
  95    xa->publishArr = xmlBlasterPublishArr;
  96    xa->publishOneway = xmlBlasterPublishOneway;
  97    xa->subscribe = xmlBlasterSubscribe;
  98    xa->unSubscribe = xmlBlasterUnSubscribe;
  99    xa->erase = xmlBlasterErase;
 100    xa->get = xmlBlasterGet;
 101    xa->ping = xmlBlasterPing;
 102    xa->isConnected = isConnected;
 103    xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN"));
 104    xa->log = xmlBlasterDefaultLogging;
 105    xa->logUserP = 0;
 106    xa->clientsUpdateFp = 0;
 107    xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true);
 108    xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded);
 109    /*   xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */
 110    /*   xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */
 111    /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */
 112    xa->lowLevelAutoAck = false;
 113 
 114    /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */
 115    if (xa->callbackMultiThreaded == true) {
 116       if (xa->logLevel>=XMLBLASTER_LOG_DUMP)
 117          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true");
 118       /*xa->callbackMultiThreaded = false;*/
 119    }
 120    /* stdint.h: # define INT32_MAX              (2147483647) */
 121    xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */
 122    xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout);
 123    /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */
 124    memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
 125    xa->threadCounter = 0;
 126 
 127    if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__,
 128                                 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld",
 129                                 getLogLevelStr(xa->logLevel), xa->responseTimeout);
 130 
 131    /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */
 132    pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */
 133    pthread_mutex_init(&xa->readnMutex, NULL);
 134    return xa;
 135 }
 136 
 137 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa)
 138 {
 139    int rc;
 140 
 141    if (xa == 0) {
 142       char *stack = getStackTrace(10);
 143       printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s",
 144                 __FILE__, __LINE__, stack);
 145       free(stack);
 146       return;
 147    }
 148 
 149    if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */
 150    xa->isShutdown = true;      /* Inhibit access to xa */
 151 
 152    if (xa->callbackP != 0) {
 153       xa->callbackP->shutdown(xa->callbackP);
 154    }
 155    if (xa->connectionP != 0) {
 156       xa->connectionP->shutdown(xa->connectionP);
 157    }
 158 
 159    if (xa->callbackP != 0) {
 160       /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */
 161       const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true);
 162       int retVal;
 163       if (xa->callbackP->threadIsAlive && !USE_DETACH_MODE) {
 164          /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */
 165          /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */
 166          /*
 167          retVal = pthread_cancel(xa->callbackThreadId);
 168          if (retVal != 0) {
 169             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal);
 170          }
 171          */
 172       }
 173 
 174       if (USE_DETACH_MODE) {
 175          /* Check if above xa->callbackP->shutdown(xa->callbackP) thread has finished: */
 176          /*bool hasTerminated = */xa->callbackP->waitOnCallbackThreadTermination(xa->callbackP, 2000);
 177 
 178          retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */
 179          if (retVal != 0) {
 180             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal);
 181          }
 182          else {
 183             if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 184                                           "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
 185          }
 186       }
 187       else { /* JOIN mode */
 188          retVal = pthread_join(xa->callbackThreadId, 0);
 189          if (retVal != 0) {
 190             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal);
 191          }
 192          else {
 193             if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 194                                           "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
 195          }
 196       }
 197 
 198       memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
 199    }
 200 
 201    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP);
 202 
 203    {  /* Wait for any pending update() dispatcher threads to die */
 204       int i;
 205       int num = 1000;
 206       int interval = 10;
 207       for (i=0; i<num; i++) {
 208          if ((int)xa->threadCounter < 1)
 209             break;
 210          sleepMillis(interval);
 211          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 212              "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num);
 213       }
 214       if (i >= num) {
 215          if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
 216              "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num);
 217       }
 218    }
 219 
 220    if (xa->connectionP != 0) {
 221       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
 222    }
 223 
 224    if (xa->callbackP != 0) {
 225       freeCallbackServerUnparsed(&xa->callbackP);
 226    }
 227 
 228    freeProperties(xa->props);
 229 
 230    rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */
 231    if (rc != 0) /* EBUSY=16 "Device or resource busy": char *strerror_r(int errnum, char *buf, size_t buflen); */
 232       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc);
 233 
 234    rc = pthread_mutex_destroy(&xa->readnMutex);
 235    if (rc != 0) /* EBUSY */
 236       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc);
 237 
 238    free(xa);
 239 }
 240 
 241 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception)
 242 {
 243    int threadRet = 0;
 244    const char *compressType = 0;
 245 
 246    if (checkArgs(xa, "initialize", false, exception) == false) return false;
 247 
 248    if (xa->isInitialized) {
 249       return true;
 250    }
 251 
 252    if (clientUpdateFp == 0) {
 253       xa->clientsUpdateFp = 0;
 254       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "",
 255         "Your callback UpdateFp pointer is NULL, we use our default callback handler");
 256    }
 257    else {
 258       xa->clientsUpdateFp = clientUpdateFp;
 259    }
 260 
 261    if (xa->connectionP) {
 262       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
 263    }
 264    xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv);
 265    if (xa->connectionP == 0) {
 266       strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 267       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 268                "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__);
 269       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 270       return false;
 271    }
 272    xa->connectionP->log = xa->log;
 273    xa->connectionP->logUserP = xa->logUserP;
 274    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed");
 275 
 276 
 277    /* Switch on compression? */
 278    compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", "");
 279    compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType);
 280 
 281    if (!strcmp(compressType, "zlib:stream")) {
 282       xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed;
 283       xa->connectionP->writeToSocket.userP = xa;
 284       xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed;
 285       xa->connectionP->readFromSocket.userP = xa;
 286    }
 287    else {
 288       if (strcmp(compressType, "")) {
 289          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType);
 290       }
 291       xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain;
 292       xa->connectionP->writeToSocket.userP = xa;
 293       xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain;
 294       xa->connectionP->readFromSocket.userP = xa;
 295    }
 296 
 297    if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */
 298       return false;
 299 
 300    /* the fourth arg 'xa' is returned as 'void *userData' in update() method */
 301    if (xa->callbackP != 0) {
 302       freeCallbackServerUnparsed(&xa->callbackP);
 303    }
 304    xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa);
 305    if (xa->callbackP == 0) {
 306       strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 307       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 308                "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__);
 309       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 310       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
 311       return false;
 312    }
 313    xa->callbackP->log = xa->log;
 314    xa->callbackP->logUserP = xa->logUserP;
 315 
 316    if (!strcmp(compressType, "zlib:stream")) {
 317       xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed;
 318       xa->callbackP->writeToSocket.userP = xa;
 319       xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed;
 320       xa->callbackP->readFromSocket.userP = xa;
 321    }
 322    else {
 323       xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain;
 324       xa->callbackP->writeToSocket.userP = xa;
 325       xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain;
 326       xa->callbackP->readFromSocket.userP = xa;
 327    }
 328 
 329    xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp);
 330 
 331    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 332           "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...",
 333           (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB);
 334 
 335    /* Register our callback funtion which is called just before sending a message */
 336    xa->connectionP->preSendEvent = preSendEvent;
 337    xa->connectionP->preSendEvent_userP = xa;
 338 
 339    /* Register our callback funtion which is called just after sending a message */
 340    xa->connectionP->postSendEvent = postSendEvent;
 341    xa->connectionP->postSendEvent_userP = xa;
 342 
 343    /* thread blocks on socket listener or on socket read (if useThisSocket) */
 344    threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP);
 345    if (threadRet != 0) {
 346       strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 347       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 348                "[%.100s:%d] Creating thread failed with error number %d",
 349                __FILE__, __LINE__, threadRet);
 350       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 351       freeCallbackServerUnparsed(&xa->callbackP);
 352       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
 353       return false;
 354    }
 355    /* bool hasStarted = */xa->callbackP->waitOnCallbackThreadAlive(xa->callbackP, 5000);
 356 
 357    xa->isInitialized = true;
 358    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 359                                 "initialize() successful");
 360    return xa->isInitialized;
 361 }
 362 
 363 static bool isConnected(XmlBlasterAccessUnparsed *xa)
 364 {
 365    if (xa == 0 || xa->isShutdown || xa->connectionP == 0) {
 366       return false;
 367    }
 368    return xa->connectionP->isConnected(xa->connectionP);
 369 }
 370 
 371 /**
 372  * Callback from #XmlBlasterConnectionUnparsed just before a message is sent,
 373  * the msgRequestInfo contains the requestId used.
 374  * This is the clients calling thread.
 375  * @param msgRequestInfoP Contains some informations about the request, may not be NULL
 376  * @param exception May not be NULL
 377  * @return The same (or a manipulated/encrypted) msgRequestInfo, if NULL the exception is filled.
 378  *         If msgRequestInfoP->blob.data was changed and malloc()'d by you, the caller will free() it.
 379  *         If you return NULL you need to call removeResponseListener() to avoid a memory leak.
 380  */
 381 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
 382 {
 383    bool retBool;
 384    int retInt;
 385    XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
 386 
 387    /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */
 388    if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName))
 389       return msgRequestInfoP;
 390 
 391    /* ======== Initialize threading ====== */
 392    msgRequestInfoP->responseMutexIsValid = false; /* Only to remember if the client thread holds the lock */
 393 
 394    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 395                                 "preSendEvent(%s) occurred", msgRequestInfoP->methodName);
 396    retBool = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent);
 397    if (retBool == false) {
 398       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 399       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 400                "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__);
 401       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 402       return (MsgRequestInfo *)0;
 403    }
 404 
 405    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 406                   "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock",
 407                   msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen);
 408    pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */
 409    if ((retInt = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
 410       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 411       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 412                "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retInt);
 413       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 414       return (MsgRequestInfo *)0;
 415    }
 416    msgRequestInfoP->responseMutexIsValid = true; /* Only if the client thread holds the lock */
 417 
 418    return msgRequestInfoP;
 419 }
 420 
 421 /**
 422  * This function is called by the callback server when a response message arrived (after we send a request).
 423  * The xa->responseBlob->data is malloc()'d with the response string, you need to free it.
 424  * This method is executed by the callback server thread.
 425  * @param msgRequestInfoP May not be NULL
 426  * @param socketDataHolder is on the stack and does not need to be freed, the 'data' member is
 427  *        malloc()'d and must be freed by the caller.
 428  */
 429 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder) {
 430    int retVal;
 431    SocketDataHolder *s = (SocketDataHolder *)socketDataHolder;
 432    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
 433 
 434    if (msgRequestInfoP == 0)
 435       return;
 436 
 437    if (msgRequestInfoP->responseMutexIsValid == false)
 438       return;
 439 
 440    if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
 441       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to lock responseMutex in responseEvent() failed %d", retVal);
 442       if (msgRequestInfoP->responseMutexIsValid == false)
 443          return;
 444    }
 445    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is LOCKED");
 446 
 447    blobcpyAlloc(&msgRequestInfoP->responseBlob, s->blob.data, s->blob.dataLen);
 448    msgRequestInfoP->responseType = s->type;
 449 
 450    if ((retVal = pthread_cond_signal(&msgRequestInfoP->responseCond)) != 0) {
 451       if (retVal == EINVAL)
 452          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d EINVAL: responseCond is not valid", retVal);
 453       else if (retVal == EFAULT)
 454          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d EFAULT: responseCond points to illegal address", retVal);
 455      else
 456          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d", retVal);
 457      /*return; we need to unlock the mutex */
 458    }
 459 
 460    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 461                                 "responseEvent(requestId '%s', msgType=%c, dataLen=%d) occurred, wake up signal sent",
 462                                 s->requestId, msgRequestInfoP->responseType, msgRequestInfoP->responseBlob.dataLen);
 463 
 464    if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
 465       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to unlock responseMutex in responseEvent() failed %d", retVal);
 466       /* return; */
 467    }
 468    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is UNLOCKED");
 469 }
 470 
 471 /**
 472  * Callback function (wait for response) called directly after a message is sent.
 473  * @param msgRequestInfoP Contains some informations about the request, may not be NULL
 474  * @param exception May not be NULL
 * @return msgRequestInfoP with the response moved into msgRequestInfoP->blob.data
 *         (the caller needs to free() it), or NULL on rollback, timeout or
 *         pthread_cond_init() failure.
 477  */
 478 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
 479 {
 480    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
 481    struct timespec abstime;
 482    bool useTimeout = false;
 483    int retVal, i;
 484 
 485    if (msgRequestInfoP->rollback) {
 486       xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
 487       /* cb->shutdown(), cb->waitOnCallbackThreadTermination() */
 488       mutexUnlock(msgRequestInfoP, exception);
 489       return (MsgRequestInfo *)0;
 490    }
 491 
 492    if (xa->responseTimeout > 0 && getAbsoluteTime(xa->responseTimeout, &abstime) == true) {
 493       useTimeout = true;
 494    }
 495 
 496    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) responseMutex is LOCKED, entering wait ...", msgRequestInfoP->requestIdStr);
 497 
 498    if ((retVal = pthread_cond_init(&msgRequestInfoP->responseCond, NULL)) != 0) {
 499       xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
 500       strncpy0(exception->errorCode, "resource.exhaust", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
 501       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] pthread_cond_init() for '%s()' with requestId=%s returned %d.",
 502                __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 503       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 504       return (MsgRequestInfo *)0;
 505    }
 506 
 507    /* Wait for response, the callback server delivers it */
 508    while (msgRequestInfoP->responseType == 0) { /* Protect for spurious wake ups (e.g. by SIGUSR1) */
 509       if (useTimeout == true) {
 510          int error = pthread_cond_timedwait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex, &abstime);
 511          if (error == ETIMEDOUT) {
 512             /*
 513              * TODO: msgRequestInfoP is on the stack and if we now return
 514              * it will be invalid:
 515              * removeResponseListener() removes it from the callback thread
 516              * but what if the callback thread currently uses it?
 517              */
 518             xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
 519             strncpy0(exception->errorCode, "communication.responseTimeout", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
 520             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Waiting on response for '%s()' with requestId=%s timed out after blocking %ld millis",
 521                     __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, xa->responseTimeout);
 522             if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 523             return (MsgRequestInfo *)0;
 524          }
 525       }
 526       else {
 527          pthread_cond_wait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex); /* Wakes up from responseEvent() */
 528          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 529             "Wake up tread, response of length %d arrived", msgRequestInfoP->responseBlob.dataLen);
 530       }
 531    }
 532 
 533    for (i=0; i<10; i++) { /* Error recovery loop */
 534       if ((retVal = pthread_cond_destroy(&msgRequestInfoP->responseCond)) != 0) {
 535          if (retVal == EBUSY) { /* Is in use by another thread */
 536             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned EBUSY=%d, we try again #%d/10",
 537                 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal, i);
 538             sleepMillis(10);
 539             continue;
 540          }
 541          else {
 542              xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
 543                  msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 544          }
 545       }
 546       break;
 547    }
 548 
 549    msgRequestInfoP->blob.dataLen = msgRequestInfoP->responseBlob.dataLen;
 550    msgRequestInfoP->blob.data = msgRequestInfoP->responseBlob.data;
 551    msgRequestInfoP->responseBlob.dataLen = 0;
 552    msgRequestInfoP->responseBlob.data = 0; /* msgRequestInfoP->blob.data is now responsible to free() the data */
 553 
 554    if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
 555       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 556          "Thread #%ld woke up in postSendEvent() for msgType=%c and dataLen=%d",
 557          msgRequestInfoP->requestIdStr, msgRequestInfoP->responseType, msgRequestInfoP->blob.dataLen);
 558 
 559 
 560    if (msgRequestInfoP->responseType == (char)MSG_TYPE_EXCEPTION) {
 561       convertToXmlBlasterException(&msgRequestInfoP->blob, exception, false);
 562       freeBlobHolderContent(&msgRequestInfoP->blob);
 563       msgRequestInfoP->responseType = 0;
 564       return (MsgRequestInfo *)0;
 565    }
 566 
 567    msgRequestInfoP->responseType = 0;
 568 
 569    /* if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) i woke up, entering unlock ...", msgRequestInfoP->requestIdStr); */
 570    if (mutexUnlock(msgRequestInfoP, exception) == false)
 571       return (MsgRequestInfo *)0;
 572 
 573    return msgRequestInfoP;
 574 }
 575 
 576 /**
 577  * Free lock.
 578  * @param msgRequestInfoP Transporting data
 579  * @param exception The exception struct, can be null
 580  * @return false on error, the exception struct is filled in this case and the lock is not released
 581  */
 582 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) {
 583    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
 584    int retVal;
 585    if (msgRequestInfoP->responseMutexIsValid == false)
 586       return true;
 587    msgRequestInfoP->responseMutexIsValid = false;
 588    if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
 589       char embeddedText[XMLBLASTEREXCEPTION_MESSAGE_LEN];
 590       if (exception == 0) {
 591          if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
 592             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
 593                        msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 594          }
 595          return false;
 596       }
 597       if (*exception->errorCode != 0) {
 598          SNPRINTF(embeddedText, XMLBLASTEREXCEPTION_MESSAGE_LEN, "{%s:%s}", exception->errorCode, exception->message);
 599          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Ignoring embedded exception %s: %s", exception->errorCode, exception->message);
 600       }
 601       else
 602          *embeddedText = 0;
 603       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 604       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] ERROR trying to unlock responseMutex, return=%d. Embedded %s", __FILE__, __LINE__, retVal, embeddedText);
 605       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 606 
 607       if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
 608          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
 609                     msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 610       }
 611       return false;
 612    }
 613    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent() responseMutex is UNLOCKED");
 614 
 615    if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
 616       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
 617                  msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 618    }
 619    return true;
 620 }
 621 
 622 Dll_Export const char *xmlBlasterAccessUnparsedUsage(char *usage)
 623 {
 624    /* take care not to exceed XMLBLASTER_MAX_USAGE_LEN */
 625    SNPRINTF(usage, XMLBLASTER_MAX_USAGE_LEN, "%.800s%.800s%.400s", xmlBlasterConnectionUnparsedUsage(), callbackServerRawUsage(),
 626                   "\n   -plugin/socket/multiThreaded  [true]"
 627                   "\n                       If true the update() call to your client code is a separate thread."
 628                   "\n   -plugin/socket/responseTimeout  [60000 (one minute)]"
 629                   "\n                       The time in millis to wait on a response, 0 is forever."
 630                   "\n   -logLevel           ERROR | WARN | INFO | TRACE | DUMP [WARN]"
 631                   );
 632 
 633    return usage;
 634 }
 635 
 636 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos,
 637                                UpdateFp clientUpdateFp, XmlBlasterException *exception)
 638 {
 639    char *response = 0;
 640    char *qos_;
 641 
 642    if (checkArgs(xa, "connect", false, exception) == false) return 0;
 643 
 644    /* Is allowed, we use our default handler in this case
 645    if (clientUpdateFp == 0) {
 646       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 647       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'updateFp' to connect()", __FILE__, __LINE__);
 648       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 649       return false;
 650    }
 651    */
 652 
 653    if (qos == 0) {
 654       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 655       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'qos' to connect()", __FILE__, __LINE__);
 656       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 657       return false;
 658    }
 659 
 660    if (initialize(xa, clientUpdateFp, exception) == false) {
 661       return false;
 662    }
 663 
 664    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Invoking connect()");
 665 
 666    if (strstr(qos, "<callback") != 0) {
 667       /* User has given us a callback address */
 668       qos_ = strcpyAlloc(qos);
 669    }
 670    else {
 671       /* We add the callback sequence with our tunnel callback host and port
 672          HACK: This is error prone depending on the given qos */
 673       const char *pos;
 674       enum { SIZE=1024 };
 675       char callbackQos[SIZE];
 676       snprintf0(callbackQos, SIZE,
 677                "<queue relating='callback'>" /* maxEntries='100' maxEntriesCache='100'>" */
 678                "  <callback type='SOCKET' sessionId='%s'>"
 679                "    socket://%.120s:%d"
 680                "  </callback>"
 681                "</queue>",
 682                "NoCallbackSessionId", xa->callbackP->hostCB, xa->callbackP->portCB);
 683       qos_ = (char *)calloc(strlen(qos) + SIZE, sizeof(char *));
 684       pos = strstr(qos, "</qos>");
 685       if (pos == 0) {
 686          strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 687          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid 'qos' markup to connect()", __FILE__, __LINE__);
 688          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 689          return false;
 690       }
 691       strncpy0(qos_, qos, pos-qos+1);
 692       strncat0(qos_, callbackQos, SIZE-strlen(qos_));
 693       strncat0(qos_, "</qos>", 8);
 694    }
 695    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connecting with qos=%s", qos_);
 696 
 697    /* Register our function responseEvent() to be notified when the response arrives,
 698       this is done by preSendEvent() callback called during connect() */
 699 
 700    response = xa->connectionP->connect(xa->connectionP, qos_, exception);
 701 
 702    free(qos_);
 703    /* freeBlobHolderContent(&xa->responseBlob); */
 704 
 705    /* The response was handled by a callback to postSendEvent */
 706 
 707    if (response == 0) return response;
 708 
 709    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 710       "Got response for connect(secretSessionId=%s)", xa->connectionP->secretSessionId);
 711    return response;
 712 }
 713 
 714 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
 715 {
 716    bool p;
 717    if (checkArgs(xa, "disconnect", true, exception) == false ) return 0;
 718    p = xa->connectionP->disconnect(xa->connectionP, qos, exception);
 719    return p;
 720 }
 721 
 722 /**
 723  * Publish a message to the server.
 724  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
 725  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 726  * @see XmlBlasterConnectionUnparsed#publish() for a function documentation
 727  */
 728 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception)
 729 {
 730    char *p;
 731    if (checkArgs(xa, "publish", true, exception) == false ) return 0;
 732    p = xa->connectionP->publish(xa->connectionP, msgUnit, exception);
 733    return p;
 734 }
 735 
 736 /**
 737  * Publish a message array in a bulk to the server.
 738  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
 739  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 740  * @see XmlBlasterConnectionUnparsed#publishArr() for a function documentation
 741  */
 742 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
 743 {
 744    QosArr *p;
 745    if (checkArgs(xa, "publishArr", true, exception) == false ) return 0;
 746    p = xa->connectionP->publishArr(xa->connectionP, msgUnitArr, exception);
 747    return p;
 748 }
 749 
 750 /**
 751  * Publish a message array in a bulk to the server, we don't receive an ACK.
 752  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
 753  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 754  * @see XmlBlasterConnectionUnparsed#publishOneway() for a function documentation
 755  */
 756 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
 757 {
 758    if (checkArgs(xa, "publishOneway", true, exception) == false ) return;
 759    xa->connectionP->publishOneway(xa->connectionP, msgUnitArr, exception);
 760 }
 761 
 762 /**
 763  * Subscribe a message.
 764  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.subscribe.html
 765  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 766  */
 767 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
 768 {
 769    char *p;
 770    if (checkArgs(xa, "subscribe", true, exception) == false ) return 0;
 771    p = xa->connectionP->subscribe(xa->connectionP, key, qos, exception);
 772    return p;
 773 }
 774 
 775 /**
 776  * UnSubscribe a message from the server.
 777  * @return The raw QoS XML strings returned from xmlBlaster, only NULL if an exception is thrown
 778  *         You need to free it with freeQosArr() after usage
 779  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.unSubscribe.html
 780  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 781  */
 782 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
 783 {
 784    QosArr *p;
 785    if (checkArgs(xa, "unSubscribe", true, exception) == false ) return 0;
 786    p = xa->connectionP->unSubscribe(xa->connectionP, key, qos, exception);
 787    return p;
 788 }
 789 
 790 /**
 791  * Erase a message from the server.
 792  * @return A struct holding the raw QoS XML strings returned from xmlBlaster,
 793  *         only NULL if an exception is thrown.
 794  *         You need to freeQosArr() it
 795  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.erase.html
 796  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 797  */
 798 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
 799 {
 800    QosArr *p;
 801    if (checkArgs(xa, "erase", true, exception) == false ) return 0;
 802    p = xa->connectionP->erase(xa->connectionP, key, qos, exception);
 803    return p;
 804 }
 805 
 806 /**
 807  * Ping the server.
 808  * @param xa The 'this' pointer
 809  * @param qos The QoS or 0
 810  * @param exception *errorCode!=0 on failure
 811  * @return The ping return QoS raw xml string, you need to free() it
 812  *         or 0 on failure (in which case *exception.errorCode!=0)
 813  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 814  */
 815 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
 816 {
 817    char *p;
 818    if (checkArgs(xa, "ping", true, exception) == false ) return 0;
 819    p = xa->connectionP->ping(xa->connectionP, qos, exception);
 820    return p;
 821 }
 822 
 823 /**
 824  * Get a message.
 825  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.get.html
 826  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 827  * @return NULL on error, please check exception in such a case
 828  */
 829 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
 830 {
 831    MsgUnitArr *msgUnitArr;
 832    if (checkArgs(xa, "get", true, exception) == false ) return 0;
 833    msgUnitArr = xa->connectionP->get(xa->connectionP, key, qos, exception);
 834    return msgUnitArr;
 835 }
 836 
 837 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName,
 838             bool checkIsConnected, XmlBlasterException *exception)
 839 {
 840    if (exception != 0)
 841       initializeXmlBlasterException(exception);
 842 
 843    if (xa == 0) {
 844       char *stack = getStackTrace(10);
 845       if (exception == 0) {
 846          printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s",
 847                   __FILE__, __LINE__, methodName, stack);
 848       }
 849       else {
 850          strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 851          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 852                   "[%.100s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %.16s() %s",
 853                    __FILE__, __LINE__, methodName, stack);
 854          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
 855       }
 856       free(stack);
 857       return false;
 858    }
 859 
 860    if (exception == 0) {
 861       char *stack = getStackTrace(10);
 862       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
 863               __FILE__, __LINE__, methodName, stack);
 864       free(stack);
 865       return false;
 866    }
 867 
 868    if (xa->isShutdown || (checkIsConnected && !xa->isConnected(xa))) {
 869       char *stack = getStackTrace(10);
 870       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 871       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 872                "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
 873                 __FILE__, __LINE__, methodName, stack);
 874       free(stack);
 875       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
 876       return false;
 877    }
 878 
 879    return true;
 880 }
 881 
 882 /**
 883  * Run by the new created thread, calls the clients update method.
 884  * Leaving this pthread start routine does an implicit pthread_exit().
 885  * @param container Holding all necessary informations, we free it when we are done
 886  * @return 0 on success, 1 on error. The return value is the exit value returned by pthread_join()
 887  */
 888 static int runUpdate(UpdateContainer *container)
 889 {
 890    XmlBlasterAccessUnparsed *xa = container->xa;
 891    MsgUnitArr *msgUnitArrP = container->msgUnitArrP;
 892    void *userData = container->userData;
 893    CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
 894    XmlBlasterException *exception = &container->exception;
 895    SocketDataHolder *socketDataHolder = &container->socketDataHolder;
 896    XMLBLASTER_C_bool retVal;
 897 
 898    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Entering runUpdate()");
 899 
 900    retVal = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
 901 
 902    if (xa->lowLevelAutoAck) { /* returned already */
 903    }
 904    else {
 905       cb->sendResponseOrException(retVal, cb, socketDataHolder, msgUnitArrP, exception);
 906    }
 907 
 908    free(container);
 909 
 910    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 911                                 "runUpdate: Update thread 0x%x is exiting", get_pthread_id(pthread_self()));
 912    xa->threadCounter--;
 913    return (retVal==true) ? 0 : 1;
 914 }
 915 
 916 /**
 917  * Here we receive the callback messages from xmlBlaster, create a thread and dispatch
 918  * it to the clients update.
 919  * @see UpdateFp in CallbackServerUnparsed.h
 920  */
 921 static void interceptUpdate(MsgUnitArr *msgUnitArrP, void *userData,
 922                             XmlBlasterException *exception, void /*SocketDataHolder*/ *socketDataHolder)
 923 {
 924    CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
 925    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)cb->updateCbUserData;
 926 
 927    if (xa->clientsUpdateFp == 0) { /* Client has not registered an update() */
 928       size_t i;
 929       bool testException = false;
 930       bool success = true;
 931 
 932       for (i=0; i<msgUnitArrP->len; i++) {
 933          const char *key = msgUnitArrP->msgUnitArr[i].key;
 934          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 935              "CALLBACK update() default handler: Asynchronous message update arrived:%s id=%s, we ignore it in this default handler\n",
 936              key, ((SocketDataHolder*)socketDataHolder)->requestId);
 937          msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
 938          /* Return QoS: Everything is OK */
 939       }
 940       if (testException) {
 941          strncpy0(exception->errorCode, "user.clientCode",
 942                   XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 943          strncpy0(exception->message, "I don't want these messages",
 944                   XMLBLASTEREXCEPTION_MESSAGE_LEN);
 945          success = false;
 946       }
 947       cb->sendResponseOrException(success, cb, socketDataHolder, msgUnitArrP, exception);
 948       return;
 949    }
 950 
 951    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "interceptUpdate(): Received message");
 952 
 953    if (xa->callbackMultiThreaded == false) {
 954       XMLBLASTER_C_bool ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
 955       cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
 956       return;
 957    }
 958 
 959    {
 960       pthread_t tid;
 961       int threadRet = 0;
 962       UpdateContainer *container = (UpdateContainer*)malloc(sizeof(UpdateContainer));
 963       pthread_attr_t attr;
 964 
 965       pthread_attr_init(&attr);
 966       /* Cleanup all resources after ending the thread, instead of calling pthread_join() */
 967       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 968 
 969       container->xa = xa;
 970       container->msgUnitArrP = msgUnitArrP;
 971       container->userData = userData;
 972       memcpy(&container->exception, exception, sizeof(XmlBlasterException));
 973       memcpy(&container->socketDataHolder, socketDataHolder, sizeof(SocketDataHolder)); /* The blob pointer is freed already by CallbackServerUnparsed */
 974 
 975       if (xa->lowLevelAutoAck) {
 976          size_t i;
 977          for (i=0; i<msgUnitArrP->len; i++) {
 978             msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
 979          }
 980       }
 981 
 982       /*
 983         Guaranteed sequence:
 984         The server uses max one thread to deliver update() for each client
 985         If the update contains an array of messages those are handled as a
 986         complete bulk in the correct sequence here.
 987       */
 988 
 989       /* this thread will deliver the update message to the client code,
 990          Note: we need a thread pool cache for better performance */
 991       xa->threadCounter++;
 992       threadRet = pthread_create(&tid, &attr,
 993                         (void * (*)(void *))runUpdate, (void *)container);
 994       if (threadRet != 0) {
 995          XMLBLASTER_C_bool ret = false;
 996          free(container);
 997          strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 998          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 999                   "[%.100s:%d] Creating thread failed with error number %d, we deliver the message in the same thread",
1000                   __FILE__, __LINE__, threadRet);
1001          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
1002          ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
1003          cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
1004          xa->threadCounter--;
1005          pthread_attr_destroy(&attr);
1006          return;
1007       }
1008 
1009       /* Is done already with above PTHREAD_CREATE_DETACHED
1010          threadRet = pthread_detach(tid);
1011          if (threadRet != 0) {
1012             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching thread failed with error number %d", __LINE__, threadRet);
1013          }
1014       */
1015 
1016       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1017          "interceptUpdate: Received message and delegated it to a separate thread 0x%x to deliver", get_pthread_id(tid));
1018 
1019       pthread_attr_destroy(&attr);
1020    }
1021 
1022    if (xa->lowLevelAutoAck) {
1023       *exception->errorCode = 0;
1024       cb->sendResponseOrException(true, cb, socketDataHolder, msgUnitArrP, exception);
1025    }
1026 }
1027 
1028 /**
1029  * Write uncompressed to socket (thread safe)
1030  */
1031 static ssize_t writenPlain(void * userP, const int fd, const char *ptr, const size_t nbytes) {
1032    int rc;
1033    ssize_t ret;
1034 
1035    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1036 
1037    /* Start mutex */
1038    rc = pthread_mutex_lock(&xa->writenMutex);
1039    if (rc != 0) /* EINVAL */
1040       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
1041 
1042    /* Send data */
1043    ret = writen(fd, ptr, nbytes);
1044 
1045    /* End mutex */
1046    rc = pthread_mutex_unlock(&xa->writenMutex);
1047    if (rc != 0) /* EPERM */
1048       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
1049 
1050    return ret;
1051 
1052 }
1053 
1054 /**
1055  * Compress data and send to socket.
1056  */
1057 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) {
1058    int rc;
1059    ssize_t ret;
1060 
1061    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1062    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenCompressed(%u)", nbytes);
1063 
1064    /* Start mutex */
1065    rc = pthread_mutex_lock(&xa->writenMutex);
1066    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
1067 
1068    /* Send data */
1069    ret = xmlBlaster_writenCompressed(xa->connectionP->zlibWriteBuf, fd, ptr, nbytes);
1070 
1071    /* End mutex */
1072    rc = pthread_mutex_unlock(&xa->writenMutex);
1073    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
1074 
1075    return ret;
1076 }
1077 
1078 /**
1079  * Read uncompressed to socket (thread safe)
1080  */
1081 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1082    int rc;
1083    ssize_t ret;
1084 
1085    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1086 
1087    rc = pthread_mutex_lock(&xa->readnMutex);
1088    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
1089 
1090    ret = readn(fd, ptr, nbytes, fpNumRead, userP2);
1091 
1092    rc = pthread_mutex_unlock(&xa->readnMutex);
1093    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
1094 
1095    return ret;
1096 }
1097 
1098 /**
1099  * Read data from socket, uncompress it if necessary.
1100  */
1101 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1102    int rc;
1103    ssize_t ret;
1104 
1105    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1106    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnCompressed(%u)", nbytes);
1107 
1108    rc = pthread_mutex_lock(&xa->readnMutex);
1109    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
1110 
1111    ret = xmlBlaster_readnCompressed(xa->connectionP->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2);
1112 
1113    rc = pthread_mutex_unlock(&xa->readnMutex);
1114    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
1115 
1116    return ret;
1117 }
1118 
1119 #ifdef XmlBlasterAccessUnparsedMain /* compile a standalone test program */
1120 
1121 /**
1122  * Here we receive the callback messages from xmlBlaster
1123  * FOR TESTING ONLY
1124  * @see UpdateFp in CallbackServerUnparsed.h
1125  */
1126 static bool myUpdate(MsgUnitArr *msgUnitArrP, void *userData, XmlBlasterException *xmlBlasterException)
1127 {
1128    size_t i;
1129    bool testException = false;
1130    if (userData != 0) ; /* to avoid compiler warning (we don't need it here) */
1131    for (i=0; i<msgUnitArrP->len; i++) {
1132       char *xml = messageUnitToXml(&msgUnitArrP->msgUnitArr[i]);
1133       printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n", xml);
1134       free(xml);
1135       msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
1136       /* Return QoS: Everything is OK */
1137    }
1138    if (testException) {
1139       strncpy0(xmlBlasterException->errorCode, "user.clientCode",
1140                XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1141       strncpy0(xmlBlasterException->message, "I don't want these messages",
1142                XMLBLASTEREXCEPTION_MESSAGE_LEN);
1143       return false;
1144    }
1145    return true;
1146 }
1147 
1148 /**
1149  * Invoke: XmlBlasterAccessUnparsedMain -logLevel TRACE  -numTests 10
1150  */
1151 int main(int argc, char** argv)
1152 {
1153    int ii;
1154    int numTests = 1;
1155    bool testCallInitialize = false;
1156 
1157    for (ii=0; ii < argc-1; ii++)
1158       if (strcmp(argv[ii], "-numTests") == 0) {
1159          if (strToInt(&numTests, argv[++ii]) == false)
1160             printf("[XmlBlasterAccessUnparsed] WARN '-numTests %s' is invalid\n", argv[ii]);
1161       }
1162 
1163    for (ii=0; ii<numTests; ii++) {
1164       int iarg;
1165       char *response = (char *)0;
1166       /*
1167        * callbackSessionId:
1168        * Is created by the client and used to validate callback messages in update.
1169        * This is sent on connect in ConnectQos.
1170        * (Is different from the xmlBlaster secret session ID)
1171        */
1172       const char *callbackSessionId = "topSecret";
1173       XmlBlasterException xmlBlasterException;
1174       XmlBlasterAccessUnparsed *xa = 0;
1175 
1176       /*
1177       const char *tmp = getStackTrace(20);
1178       printf("[client] stackTrace=%s\n", tmp);
1179       free(tmp);
1180       */
1181 
1182 #     ifdef PTHREAD_THREADS_MAX
1183          printf("[client] Try option '-help' if you need usage informations, max %d"
1184                 " threads per process are supported on this OS\n", PTHREAD_THREADS_MAX);
1185 #     else
1186          printf("[client] Try option '-help' if you need usage informations\n");
1187 #     endif
1188 
1189       for (iarg=0; iarg < argc; iarg++) {
1190          if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
1191             char usage[XMLBLASTER_MAX_USAGE_LEN];
1192             const char *pp =
1193             "\n   -logLevel            ERROR | WARN | INFO | TRACE | DUMP [WARN]"
1194             "\n   -numTests            How often to run the same tests [1]"
1195             "\n\nExample:"
1196             "\n   XmlBlasterAccessUnparsedMain -logLevel TRACE"
1197                  " -dispatch/connection/plugin/socket/hostname server.mars.universe";
1198             printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
1199                    getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
1200             exit(1);
1201          }
1202       }
1203 
1204       xa = getXmlBlasterAccessUnparsed(argc, argv);
1205 
1206       if (testCallInitialize) {
1207          if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
1208             printf("[client] Connection to xmlBlaster failed,"
1209                    " please start the server or check your configuration\n");
1210             freeXmlBlasterAccessUnparsed(xa);
1211             exit(1);
1212          }
1213       }
1214 
1215       {  /* connect */
1216          char connectQos[2048];
1217          char callbackQos[1024];
1218 
1219          if (testCallInitialize) {
1220             SNPRINTF(callbackQos, 1024,
1221                      "<queue relating='callback' maxEntries='100' maxEntriesCache='100'>"
1222                      "  <callback type='SOCKET' sessionId='%s'>"
1223                      "    socket://%.120s:%d"
1224                      "  </callback>"
1225                      "</queue>",
1226                      callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
1227          }
1228          else
1229             *callbackQos = '\0';
1230 
1231          SNPRINTF(connectQos, 2048,
1232                 "<qos>"
1233                 " <securityService type='htpasswd' version='1.0'>"
1234                 "  <![CDATA["
1235                 "   <user>fritz</user>"
1236                 "   <passwd>secret</passwd>"
1237                 "  ]]>"
1238                 " </securityService>"
1239                 "%.1024s"
1240                 "</qos>", callbackQos);
1241 
1242          response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
1243          if (*xmlBlasterException.errorCode != 0) {
1244             printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
1245                    xmlBlasterException.errorCode, xmlBlasterException.message);
1246             freeXmlBlasterAccessUnparsed(xa);
1247             exit(1);
1248          }
1249          free(response);
1250          printf("[client] Connected to xmlBlaster, do some tests ...\n");
1251       }
1252 
1253       response = xa->ping(xa, 0, &xmlBlasterException);
1254       if (response == (char *)0) {
1255          printf("[client] ERROR: Pinging a connected server failed: errorCode=%s, message=%s\n",
1256             xmlBlasterException.errorCode, xmlBlasterException.message);
1257       }
1258       else {
1259          printf("[client] Pinging a connected server, response=%s\n", response);
1260          free(response);
1261       }
1262 
1263       { /* subscribe ... */
1264          const char *key = "<key oid='HelloWorld'/>";
1265          const char *qos = "<qos/>";
1266          printf("[client] Subscribe message 'HelloWorld' ...\n");
1267          response = xa->subscribe(xa, key, qos, &xmlBlasterException);
1268          if (*xmlBlasterException.errorCode != 0) {
1269             printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n",
1270                    xmlBlasterException.errorCode, xmlBlasterException.message);
1271             xa->disconnect(xa, 0, &xmlBlasterException);
1272             freeXmlBlasterAccessUnparsed(xa);
1273             exit(1);
1274          }
1275          printf("[client] Subscribe success, returned status is '%s'\n", response);
1276          free(response);
1277       }
1278 
1279       {  /* publish ... */
1280          MsgUnit msgUnit;
1281          printf("[client] Publishing message 'HelloWorld' ...\n");
1282          msgUnit.key = strcpyAlloc("<key oid='HelloWorld'/>");
1283          msgUnit.content = strcpyAlloc("Some message payload");
1284          msgUnit.contentLen = strlen(msgUnit.content);
1285          msgUnit.qos =strcpyAlloc("<qos><persistent/></qos>");
1286          response = xa->publish(xa, &msgUnit, &xmlBlasterException);
1287          freeMsgUnitData(&msgUnit);
1288          if (*xmlBlasterException.errorCode != 0) {
1289             printf("[client] Caught exception in publish errorCode=%s, message=%s\n",
1290                    xmlBlasterException.errorCode, xmlBlasterException.message);
1291             xa->disconnect(xa, 0, &xmlBlasterException);
1292             freeXmlBlasterAccessUnparsed(xa);
1293             exit(1);
1294          }
1295          printf("[client] Publish success, returned status is '%s'\n", response);
1296          free(response);
1297       }
1298 
1299       {  /* unSubscribe ... */
1300          const char *key = "<key oid='HelloWorld'/>";
1301          const char *qos = "<qos/>";
1302          printf("[client] UnSubscribe message 'HelloWorld' ...\n");
1303          response = xa->unSubscribe(xa, key, qos, &xmlBlasterException);
1304          if (response) {
1305             printf("[client] Unsubscribe success, returned status is '%s'\n", response);
1306             free(response);
1307          }
1308          else {
1309             printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n",
1310                    xmlBlasterException.errorCode, xmlBlasterException.message);
1311             xa->disconnect(xa, 0, &xmlBlasterException);
1312             freeXmlBlasterAccessUnparsed(xa);
1313             exit(1);
1314          }
1315       }
1316 
1317       {  /* get synchnronous ... */
1318          size_t i;
1319          const char *key = "<key queryType='XPATH'>//key</key>";
1320          const char *qos = "<qos/>";
1321          MsgUnitArr *msgUnitArr;
1322          printf("[client] Get synchronous messages with XPath '//key' ...\n");
1323          msgUnitArr = xa->get(xa, key, qos, &xmlBlasterException);
1324          if (*xmlBlasterException.errorCode != 0) {
1325             printf("[client] Caught exception in get errorCode=%s, message=%s\n",
1326                    xmlBlasterException.errorCode, xmlBlasterException.message);
1327             xa->disconnect(xa, 0, &xmlBlasterException);
1328             freeXmlBlasterAccessUnparsed(xa);
1329             exit(1);
1330          }
1331          if (msgUnitArr != (MsgUnitArr *)0) {
1332             for (i=0; i<msgUnitArr->len; i++) {
1333                char *contentStr = strFromBlobAlloc(msgUnitArr->msgUnitArr[i].content,
1334                                                 msgUnitArr->msgUnitArr[i].contentLen);
1335                const char *dots = (msgUnitArr->msgUnitArr[i].contentLen > 96) ?
1336                                   " ..." : "";
1337                printf("\n[client] Received message#%u/%u:\n"
1338                       "-------------------------------------"
1339                       "%s\n <content>%.100s%s</content>%s\n"
1340                       "-------------------------------------\n",
1341                       i+1, msgUnitArr->len,
1342                       msgUnitArr->msgUnitArr[i].key,
1343                       contentStr, dots,
1344                       msgUnitArr->msgUnitArr[i].qos);
1345                free(contentStr);
1346             }
1347             freeMsgUnitArr(msgUnitArr);
1348          }
1349          else {
1350             printf("[client] Caught exception in get errorCode=%s, message=%s\n",
1351                    xmlBlasterException.errorCode, xmlBlasterException.message);
1352             xa->disconnect(xa, 0, &xmlBlasterException);
1353             freeXmlBlasterAccessUnparsed(xa);
1354             exit(1);
1355          }
1356       }
1357 
1358 
1359       {  /* erase ... */
1360          const char *key = "<key oid='HelloWorld'/>";
1361          const char *qos = "<qos/>";
1362          printf("[client] Erasing message 'HelloWorld' ...\n");
1363          response = xa->erase(xa, key, qos, &xmlBlasterException);
1364          if (*xmlBlasterException.errorCode != 0) {
1365             printf("[client] Caught exception in erase errorCode=%s, message=%s\n",
1366                    xmlBlasterException.errorCode, xmlBlasterException.message);
1367             xa->disconnect(xa, 0, &xmlBlasterException);
1368             freeXmlBlasterAccessUnparsed(xa);
1369             exit(1);
1370          }
1371          printf("[client] Erase success, returned status is '%s'\n", response);
1372          free(response);
1373       }
1374 
1375       if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
1376          printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
1377                 xmlBlasterException.errorCode, xmlBlasterException.message);
1378          freeXmlBlasterAccessUnparsed(xa);
1379          exit(1);
1380       }
1381 
1382       freeXmlBlasterAccessUnparsed(xa);
1383       if (numTests > 1) {
1384          printf("[client] Successfully finished test #%d from %d\n\n", ii, numTests);
1385       }
1386    }
1387    printf("[client] Good bye.\n");
1388    return 0; /*exit(0);*/
1389 }
1390 #endif /* #ifdef XmlBlasterAccessUnparsedMain */


/* syntax highlighted by Code2HTML, v. 0.9.1 */