1 /*----------------------------------------------------------------------------
2 Name: XmlBlasterAccessUnparsed.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Wraps raw socket connection to xmlBlaster
6 Implements sync connection and async callback
7 Needs pthread to compile (multi threading).
8 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
9 Compile:
10 LINUX: gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
11 g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
12 icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
13 WIN: cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib
14 (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32)
15 Solaris: cc -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
16 CC -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
17
18 Linux with libxmlBlasterC.so:
19 gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT -lpthread
20 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
21 -----------------------------------------------------------------------------*/
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #if defined(WINCE)
26 # if defined(XB_USE_PTHREADS)
27 # include <pthreads/pthread.h>
28 # else
29 /*#include <pthreads/need_errno.h> */
30 static int errno=0; /* single threaded workaround*/
31 # endif
32 #else
33 # include <errno.h>
34 # include <sys/types.h>
35 #endif
36 #include <socket/xmlBlasterSocket.h>
37 #include <socket/xmlBlasterZlib.h>
38 #include <XmlBlasterAccessUnparsed.h>
39 #include <util/Timestampc.h>
40
41 /**
42 * Little helper to collect args for the new created thread
43 */
44 typedef struct Dll_Export UpdateContainer {
45 XmlBlasterAccessUnparsed *xa;
46 MsgUnitArr *msgUnitArrP;
47 void *userData;
48 XmlBlasterException exception; /* Holding a clone from the original as the callback thread may use it for another message */
49 SocketDataHolder socketDataHolder; /* Holding a clone from the original */
50 } UpdateContainer;
51
52 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception);
53 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception);
54 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
55 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception);
56 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
57 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
58 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
59 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
60 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
61 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
62 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
63 static bool isConnected(XmlBlasterAccessUnparsed *xa);
64 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder);
65 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
66 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
67 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
68 static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder);
69 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception);
70 static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes);
71 static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes);
72 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
73 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
74
75 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) {
76 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed));
77 if (xa == 0) return xa;
78 xa->argc = argc;
79 xa->argv = argv;
80 xa->props = createProperties(xa->argc, xa->argv);
81 if (xa->props == 0) {
82 freeXmlBlasterAccessUnparsed(xa);
83 return (XmlBlasterAccessUnparsed *)0;
84 }
85 xa->isInitialized = false;
86 xa->isShutdown = false;
87 xa->connectionP = 0;
88 xa->callbackP = 0;
89 xa->userObject = 0; /* A client can use this pointer to point to any client specific information */
90 xa->userFp = 0;
91 xa->connect = xmlBlasterConnect;
92 xa->initialize = initialize;
93 xa->disconnect = xmlBlasterDisconnect;
94 xa->publish = xmlBlasterPublish;
95 xa->publishArr = xmlBlasterPublishArr;
96 xa->publishOneway = xmlBlasterPublishOneway;
97 xa->subscribe = xmlBlasterSubscribe;
98 xa->unSubscribe = xmlBlasterUnSubscribe;
99 xa->erase = xmlBlasterErase;
100 xa->get = xmlBlasterGet;
101 xa->ping = xmlBlasterPing;
102 xa->isConnected = isConnected;
103 xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN"));
104 xa->log = xmlBlasterDefaultLogging;
105 xa->logUserP = 0;
106 xa->clientsUpdateFp = 0;
107 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true);
108 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded);
109 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */
110 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */
111 /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */
112 xa->lowLevelAutoAck = false;
113
114 /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */
115 if (xa->callbackMultiThreaded == true) {
116 if (xa->logLevel>=XMLBLASTER_LOG_DUMP)
117 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true");
118 /*xa->callbackMultiThreaded = false;*/
119 }
120 /* stdint.h: # define INT32_MAX (2147483647) */
121 xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */
122 xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout);
123 /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */
124 memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
125 xa->threadCounter = 0;
126
127 if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__,
128 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld",
129 getLogLevelStr(xa->logLevel), xa->responseTimeout);
130
131 /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */
132 pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */
133 pthread_mutex_init(&xa->readnMutex, NULL);
134 return xa;
135 }
136
137 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa)
138 {
139 int rc;
140
141 if (xa == 0) {
142 char *stack = getStackTrace(10);
143 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s",
144 __FILE__, __LINE__, stack);
145 free(stack);
146 return;
147 }
148
149 if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */
150 xa->isShutdown = true; /* Inhibit access to xa */
151
152 if (xa->callbackP != 0) {
153 xa->callbackP->shutdown(xa->callbackP);
154 }
155 if (xa->connectionP != 0) {
156 xa->connectionP->shutdown(xa->connectionP);
157 }
158
159 if (xa->callbackP != 0) {
160 /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */
161 const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true);
162 int retVal;
163 if (xa->callbackP->threadIsAlive && !USE_DETACH_MODE) {
164 /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */
165 /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */
166 /*
167 retVal = pthread_cancel(xa->callbackThreadId);
168 if (retVal != 0) {
169 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal);
170 }
171 */
172 }
173
174 if (USE_DETACH_MODE) {
175 /* Check if above xa->callbackP->shutdown(xa->callbackP) thread has finished: */
176 /*bool hasTerminated = */xa->callbackP->waitOnCallbackThreadTermination(xa->callbackP, 2000);
177
178 retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */
179 if (retVal != 0) {
180 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal);
181 }
182 else {
183 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
184 "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
185 }
186 }
187 else { /* JOIN mode */
188 retVal = pthread_join(xa->callbackThreadId, 0);
189 if (retVal != 0) {
190 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal);
191 }
192 else {
193 if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
194 "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
195 }
196 }
197
198 memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
199 }
200
201 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP);
202
203 { /* Wait for any pending update() dispatcher threads to die */
204 int i;
205 int num = 1000;
206 int interval = 10;
207 for (i=0; i<num; i++) {
208 if ((int)xa->threadCounter < 1)
209 break;
210 sleepMillis(interval);
211 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
212 "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num);
213 }
214 if (i >= num) {
215 if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
216 "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num);
217 }
218 }
219
220 if (xa->connectionP != 0) {
221 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
222 }
223
224 if (xa->callbackP != 0) {
225 freeCallbackServerUnparsed(&xa->callbackP);
226 }
227
228 freeProperties(xa->props);
229
230 rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */
231 if (rc != 0) /* EBUSY=16 "Device or resource busy": char *strerror_r(int errnum, char *buf, size_t buflen); */
232 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc);
233
234 rc = pthread_mutex_destroy(&xa->readnMutex);
235 if (rc != 0) /* EBUSY */
236 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc);
237
238 free(xa);
239 }
240
241 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception)
242 {
243 int threadRet = 0;
244 const char *compressType = 0;
245
246 if (checkArgs(xa, "initialize", false, exception) == false) return false;
247
248 if (xa->isInitialized) {
249 return true;
250 }
251
252 if (clientUpdateFp == 0) {
253 xa->clientsUpdateFp = 0;
254 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "",
255 "Your callback UpdateFp pointer is NULL, we use our default callback handler");
256 }
257 else {
258 xa->clientsUpdateFp = clientUpdateFp;
259 }
260
261 if (xa->connectionP) {
262 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
263 }
264 xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv);
265 if (xa->connectionP == 0) {
266 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
267 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
268 "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__);
269 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
270 return false;
271 }
272 xa->connectionP->log = xa->log;
273 xa->connectionP->logUserP = xa->logUserP;
274 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed");
275
276
277 /* Switch on compression? */
278 compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", "");
279 compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType);
280
281 if (!strcmp(compressType, "zlib:stream")) {
282 xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed;
283 xa->connectionP->writeToSocket.userP = xa;
284 xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed;
285 xa->connectionP->readFromSocket.userP = xa;
286 }
287 else {
288 if (strcmp(compressType, "")) {
289 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType);
290 }
291 xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain;
292 xa->connectionP->writeToSocket.userP = xa;
293 xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain;
294 xa->connectionP->readFromSocket.userP = xa;
295 }
296
297 if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */
298 return false;
299
300 /* the fourth arg 'xa' is returned as 'void *userData' in update() method */
301 if (xa->callbackP != 0) {
302 freeCallbackServerUnparsed(&xa->callbackP);
303 }
304 xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa);
305 if (xa->callbackP == 0) {
306 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
307 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
308 "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__);
309 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
310 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
311 return false;
312 }
313 xa->callbackP->log = xa->log;
314 xa->callbackP->logUserP = xa->logUserP;
315
316 if (!strcmp(compressType, "zlib:stream")) {
317 xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed;
318 xa->callbackP->writeToSocket.userP = xa;
319 xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed;
320 xa->callbackP->readFromSocket.userP = xa;
321 }
322 else {
323 xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain;
324 xa->callbackP->writeToSocket.userP = xa;
325 xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain;
326 xa->callbackP->readFromSocket.userP = xa;
327 }
328
329 xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp);
330
331 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
332 "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...",
333 (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB);
334
335 /* Register our callback funtion which is called just before sending a message */
336 xa->connectionP->preSendEvent = preSendEvent;
337 xa->connectionP->preSendEvent_userP = xa;
338
339 /* Register our callback funtion which is called just after sending a message */
340 xa->connectionP->postSendEvent = postSendEvent;
341 xa->connectionP->postSendEvent_userP = xa;
342
343 /* thread blocks on socket listener or on socket read (if useThisSocket) */
344 threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP);
345 if (threadRet != 0) {
346 strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
347 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
348 "[%.100s:%d] Creating thread failed with error number %d",
349 __FILE__, __LINE__, threadRet);
350 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
351 freeCallbackServerUnparsed(&xa->callbackP);
352 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
353 return false;
354 }
355 /* bool hasStarted = */xa->callbackP->waitOnCallbackThreadAlive(xa->callbackP, 5000);
356
357 xa->isInitialized = true;
358 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
359 "initialize() successful");
360 return xa->isInitialized;
361 }
362
363 static bool isConnected(XmlBlasterAccessUnparsed *xa)
364 {
365 if (xa == 0 || xa->isShutdown || xa->connectionP == 0) {
366 return false;
367 }
368 return xa->connectionP->isConnected(xa->connectionP);
369 }
370
371 /**
372 * Callback from #XmlBlasterConnectionUnparsed just before a message is sent,
373 * the msgRequestInfo contains the requestId used.
374 * This is the clients calling thread.
375 * @param msgRequestInfoP Contains some informations about the request, may not be NULL
376 * @param exception May not be NULL
377 * @return The same (or a manipulated/encrypted) msgRequestInfo, if NULL the exception is filled.
378 * If msgRequestInfoP->blob.data was changed and malloc()'d by you, the caller will free() it.
379 * If you return NULL you need to call removeResponseListener() to avoid a memory leak.
380 */
381 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
382 {
383 bool retBool;
384 int retInt;
385 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
386
387 /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */
388 if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName))
389 return msgRequestInfoP;
390
391 /* ======== Initialize threading ====== */
392 msgRequestInfoP->responseMutexIsValid = false; /* Only to remember if the client thread holds the lock */
393
394 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
395 "preSendEvent(%s) occurred", msgRequestInfoP->methodName);
396 retBool = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent);
397 if (retBool == false) {
398 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
399 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
400 "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__);
401 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
402 return (MsgRequestInfo *)0;
403 }
404
405 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
406 "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock",
407 msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen);
408 pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */
409 if ((retInt = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
410 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
411 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
412 "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retInt);
413 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
414 return (MsgRequestInfo *)0;
415 }
416 msgRequestInfoP->responseMutexIsValid = true; /* Only if the client thread holds the lock */
417
418 return msgRequestInfoP;
419 }
420
421 /**
422 * This function is called by the callback server when a response message arrived (after we send a request).
423 * The xa->responseBlob->data is malloc()'d with the response string, you need to free it.
424 * This method is executed by the callback server thread.
425 * @param msgRequestInfoP May not be NULL
426 * @param socketDataHolder is on the stack and does not need to be freed, the 'data' member is
427 * malloc()'d and must be freed by the caller.
428 */
429 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder) {
430 int retVal;
431 SocketDataHolder *s = (SocketDataHolder *)socketDataHolder;
432 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
433
434 if (msgRequestInfoP == 0)
435 return;
436
437 if (msgRequestInfoP->responseMutexIsValid == false)
438 return;
439
440 if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
441 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to lock responseMutex in responseEvent() failed %d", retVal);
442 if (msgRequestInfoP->responseMutexIsValid == false)
443 return;
444 }
445 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is LOCKED");
446
447 blobcpyAlloc(&msgRequestInfoP->responseBlob, s->blob.data, s->blob.dataLen);
448 msgRequestInfoP->responseType = s->type;
449
450 if ((retVal = pthread_cond_signal(&msgRequestInfoP->responseCond)) != 0) {
451 if (retVal == EINVAL)
452 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d EINVAL: responseCond is not valid", retVal);
453 else if (retVal == EFAULT)
454 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d EFAULT: responseCond points to illegal address", retVal);
455 else
456 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d", retVal);
457 /*return; we need to unlock the mutex */
458 }
459
460 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
461 "responseEvent(requestId '%s', msgType=%c, dataLen=%d) occurred, wake up signal sent",
462 s->requestId, msgRequestInfoP->responseType, msgRequestInfoP->responseBlob.dataLen);
463
464 if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
465 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to unlock responseMutex in responseEvent() failed %d", retVal);
466 /* return; */
467 }
468 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is UNLOCKED");
469 }
470
471 /**
472 * Callback function (wait for response) called directly after a message is sent.
473 * @param msgRequestInfoP Contains some informations about the request, may not be NULL
474 * @param exception May not be NULL
475 * @return The returned string from a request is written into msgRequestInfoP->data,
476 * the caller needs to free() it.
477 */
478 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
479 {
480 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
481 struct timespec abstime;
482 bool useTimeout = false;
483 int retVal, i;
484
485 if (msgRequestInfoP->rollback) {
486 xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
487 /* cb->shutdown(), cb->waitOnCallbackThreadTermination() */
488 mutexUnlock(msgRequestInfoP, exception);
489 return (MsgRequestInfo *)0;
490 }
491
492 if (xa->responseTimeout > 0 && getAbsoluteTime(xa->responseTimeout, &abstime) == true) {
493 useTimeout = true;
494 }
495
496 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) responseMutex is LOCKED, entering wait ...", msgRequestInfoP->requestIdStr);
497
498 if ((retVal = pthread_cond_init(&msgRequestInfoP->responseCond, NULL)) != 0) {
499 xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
500 strncpy0(exception->errorCode, "resource.exhaust", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
501 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] pthread_cond_init() for '%s()' with requestId=%s returned %d.",
502 __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
503 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
504 return (MsgRequestInfo *)0;
505 }
506
507 /* Wait for response, the callback server delivers it */
508 while (msgRequestInfoP->responseType == 0) { /* Protect for spurious wake ups (e.g. by SIGUSR1) */
509 if (useTimeout == true) {
510 int error = pthread_cond_timedwait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex, &abstime);
511 if (error == ETIMEDOUT) {
512 /*
513 * TODO: msgRequestInfoP is on the stack and if we now return
514 * it will be invalid:
515 * removeResponseListener() removes it from the callback thread
516 * but what if the callback thread currently uses it?
517 */
518 xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
519 strncpy0(exception->errorCode, "communication.responseTimeout", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
520 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Waiting on response for '%s()' with requestId=%s timed out after blocking %ld millis",
521 __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, xa->responseTimeout);
522 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
523 return (MsgRequestInfo *)0;
524 }
525 }
526 else {
527 pthread_cond_wait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex); /* Wakes up from responseEvent() */
528 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
529 "Wake up tread, response of length %d arrived", msgRequestInfoP->responseBlob.dataLen);
530 }
531 }
532
533 for (i=0; i<10; i++) { /* Error recovery loop */
534 if ((retVal = pthread_cond_destroy(&msgRequestInfoP->responseCond)) != 0) {
535 if (retVal == EBUSY) { /* Is in use by another thread */
536 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned EBUSY=%d, we try again #%d/10",
537 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal, i);
538 sleepMillis(10);
539 continue;
540 }
541 else {
542 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
543 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
544 }
545 }
546 break;
547 }
548
549 msgRequestInfoP->blob.dataLen = msgRequestInfoP->responseBlob.dataLen;
550 msgRequestInfoP->blob.data = msgRequestInfoP->responseBlob.data;
551 msgRequestInfoP->responseBlob.dataLen = 0;
552 msgRequestInfoP->responseBlob.data = 0; /* msgRequestInfoP->blob.data is now responsible to free() the data */
553
554 if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
555 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
556 "Thread #%ld woke up in postSendEvent() for msgType=%c and dataLen=%d",
557 msgRequestInfoP->requestIdStr, msgRequestInfoP->responseType, msgRequestInfoP->blob.dataLen);
558
559
560 if (msgRequestInfoP->responseType == (char)MSG_TYPE_EXCEPTION) {
561 convertToXmlBlasterException(&msgRequestInfoP->blob, exception, false);
562 freeBlobHolderContent(&msgRequestInfoP->blob);
563 msgRequestInfoP->responseType = 0;
564 return (MsgRequestInfo *)0;
565 }
566
567 msgRequestInfoP->responseType = 0;
568
569 /* if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) i woke up, entering unlock ...", msgRequestInfoP->requestIdStr); */
570 if (mutexUnlock(msgRequestInfoP, exception) == false)
571 return (MsgRequestInfo *)0;
572
573 return msgRequestInfoP;
574 }
575
576 /**
577 * Free lock.
578 * @param msgRequestInfoP Transporting data
579 * @param exception The exception struct, can be null
580 * @return false on error, the exception struct is filled in this case and the lock is not released
581 */
582 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) {
583 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
584 int retVal;
585 if (msgRequestInfoP->responseMutexIsValid == false)
586 return true;
587 msgRequestInfoP->responseMutexIsValid = false;
588 if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
589 char embeddedText[XMLBLASTEREXCEPTION_MESSAGE_LEN];
590 if (exception == 0) {
591 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
592 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
593 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
594 }
595 return false;
596 }
597 if (*exception->errorCode != 0) {
598 SNPRINTF(embeddedText, XMLBLASTEREXCEPTION_MESSAGE_LEN, "{%s:%s}", exception->errorCode, exception->message);
599 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Ignoring embedded exception %s: %s", exception->errorCode, exception->message);
600 }
601 else
602 *embeddedText = 0;
603 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
604 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] ERROR trying to unlock responseMutex, return=%d. Embedded %s", __FILE__, __LINE__, retVal, embeddedText);
605 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
606
607 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
608 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
609 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
610 }
611 return false;
612 }
613 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent() responseMutex is UNLOCKED");
614
615 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
616 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
617 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
618 }
619 return true;
620 }
621
622 Dll_Export const char *xmlBlasterAccessUnparsedUsage(char *usage)
623 {
624 /* take care not to exceed XMLBLASTER_MAX_USAGE_LEN */
625 SNPRINTF(usage, XMLBLASTER_MAX_USAGE_LEN, "%.800s%.800s%.400s", xmlBlasterConnectionUnparsedUsage(), callbackServerRawUsage(),
626 "\n -plugin/socket/multiThreaded [true]"
627 "\n If true the update() call to your client code is a separate thread."
628 "\n -plugin/socket/responseTimeout [60000 (one minute)]"
629 "\n The time in millis to wait on a response, 0 is forever."
630 "\n -logLevel ERROR | WARN | INFO | TRACE | DUMP [WARN]"
631 );
632
633 return usage;
634 }
635
636 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos,
637 UpdateFp clientUpdateFp, XmlBlasterException *exception)
638 {
639 char *response = 0;
640 char *qos_;
641
642 if (checkArgs(xa, "connect", false, exception) == false) return 0;
643
644 /* Is allowed, we use our default handler in this case
645 if (clientUpdateFp == 0) {
646 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
647 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'updateFp' to connect()", __FILE__, __LINE__);
648 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
649 return false;
650 }
651 */
652
653 if (qos == 0) {
654 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
655 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'qos' to connect()", __FILE__, __LINE__);
656 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
657 return false;
658 }
659
660 if (initialize(xa, clientUpdateFp, exception) == false) {
661 return false;
662 }
663
664 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Invoking connect()");
665
666 if (strstr(qos, "<callback") != 0) {
667 /* User has given us a callback address */
668 qos_ = strcpyAlloc(qos);
669 }
670 else {
671 /* We add the callback sequence with our tunnel callback host and port
672 HACK: This is error prone depending on the given qos */
673 const char *pos;
674 enum { SIZE=1024 };
675 char callbackQos[SIZE];
676 snprintf0(callbackQos, SIZE,
677 "<queue relating='callback'>" /* maxEntries='100' maxEntriesCache='100'>" */
678 " <callback type='SOCKET' sessionId='%s'>"
679 " socket://%.120s:%d"
680 " </callback>"
681 "</queue>",
682 "NoCallbackSessionId", xa->callbackP->hostCB, xa->callbackP->portCB);
683 qos_ = (char *)calloc(strlen(qos) + SIZE, sizeof(char *));
684 pos = strstr(qos, "</qos>");
685 if (pos == 0) {
686 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
687 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid 'qos' markup to connect()", __FILE__, __LINE__);
688 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
689 return false;
690 }
691 strncpy0(qos_, qos, pos-qos+1);
692 strncat0(qos_, callbackQos, SIZE-strlen(qos_));
693 strncat0(qos_, "</qos>", 8);
694 }
695 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connecting with qos=%s", qos_);
696
697 /* Register our function responseEvent() to be notified when the response arrives,
698 this is done by preSendEvent() callback called during connect() */
699
700 response = xa->connectionP->connect(xa->connectionP, qos_, exception);
701
702 free(qos_);
703 /* freeBlobHolderContent(&xa->responseBlob); */
704
705 /* The response was handled by a callback to postSendEvent */
706
707 if (response == 0) return response;
708
709 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
710 "Got response for connect(secretSessionId=%s)", xa->connectionP->secretSessionId);
711 return response;
712 }
713
714 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
715 {
716 bool p;
717 if (checkArgs(xa, "disconnect", true, exception) == false ) return 0;
718 p = xa->connectionP->disconnect(xa->connectionP, qos, exception);
719 return p;
720 }
721
722 /**
723 * Publish a message to the server.
724 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
725 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
726 * @see XmlBlasterConnectionUnparsed#publish() for a function documentation
727 */
728 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception)
729 {
730 char *p;
731 if (checkArgs(xa, "publish", true, exception) == false ) return 0;
732 p = xa->connectionP->publish(xa->connectionP, msgUnit, exception);
733 return p;
734 }
735
736 /**
737 * Publish a message array in a bulk to the server.
738 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
739 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
740 * @see XmlBlasterConnectionUnparsed#publishArr() for a function documentation
741 */
742 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
743 {
744 QosArr *p;
745 if (checkArgs(xa, "publishArr", true, exception) == false ) return 0;
746 p = xa->connectionP->publishArr(xa->connectionP, msgUnitArr, exception);
747 return p;
748 }
749
750 /**
751 * Publish a message array in a bulk to the server, we don't receive an ACK.
752 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
753 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
754 * @see XmlBlasterConnectionUnparsed#publishOneway() for a function documentation
755 */
756 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
757 {
758 if (checkArgs(xa, "publishOneway", true, exception) == false ) return;
759 xa->connectionP->publishOneway(xa->connectionP, msgUnitArr, exception);
760 }
761
762 /**
763 * Subscribe a message.
764 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.subscribe.html
765 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
766 */
767 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
768 {
769 char *p;
770 if (checkArgs(xa, "subscribe", true, exception) == false ) return 0;
771 p = xa->connectionP->subscribe(xa->connectionP, key, qos, exception);
772 return p;
773 }
774
775 /**
776 * UnSubscribe a message from the server.
777 * @return The raw QoS XML strings returned from xmlBlaster, only NULL if an exception is thrown
778 * You need to free it with freeQosArr() after usage
779 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.unSubscribe.html
780 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
781 */
782 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
783 {
784 QosArr *p;
785 if (checkArgs(xa, "unSubscribe", true, exception) == false ) return 0;
786 p = xa->connectionP->unSubscribe(xa->connectionP, key, qos, exception);
787 return p;
788 }
789
790 /**
791 * Erase a message from the server.
792 * @return A struct holding the raw QoS XML strings returned from xmlBlaster,
793 * only NULL if an exception is thrown.
794 * You need to freeQosArr() it
795 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.erase.html
796 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
797 */
798 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
799 {
800 QosArr *p;
801 if (checkArgs(xa, "erase", true, exception) == false ) return 0;
802 p = xa->connectionP->erase(xa->connectionP, key, qos, exception);
803 return p;
804 }
805
806 /**
807 * Ping the server.
808 * @param xa The 'this' pointer
809 * @param qos The QoS or 0
810 * @param exception *errorCode!=0 on failure
811 * @return The ping return QoS raw xml string, you need to free() it
812 * or 0 on failure (in which case *exception.errorCode!=0)
813 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
814 */
815 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
816 {
817 char *p;
818 if (checkArgs(xa, "ping", true, exception) == false ) return 0;
819 p = xa->connectionP->ping(xa->connectionP, qos, exception);
820 return p;
821 }
822
823 /**
824 * Get a message.
825 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.get.html
826 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
827 * @return NULL on error, please check exception in such a case
828 */
829 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
830 {
831 MsgUnitArr *msgUnitArr;
832 if (checkArgs(xa, "get", true, exception) == false ) return 0;
833 msgUnitArr = xa->connectionP->get(xa->connectionP, key, qos, exception);
834 return msgUnitArr;
835 }
836
837 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName,
838 bool checkIsConnected, XmlBlasterException *exception)
839 {
840 if (exception != 0)
841 initializeXmlBlasterException(exception);
842
843 if (xa == 0) {
844 char *stack = getStackTrace(10);
845 if (exception == 0) {
846 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s",
847 __FILE__, __LINE__, methodName, stack);
848 }
849 else {
850 strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
851 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
852 "[%.100s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %.16s() %s",
853 __FILE__, __LINE__, methodName, stack);
854 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
855 }
856 free(stack);
857 return false;
858 }
859
860 if (exception == 0) {
861 char *stack = getStackTrace(10);
862 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
863 __FILE__, __LINE__, methodName, stack);
864 free(stack);
865 return false;
866 }
867
868 if (xa->isShutdown || (checkIsConnected && !xa->isConnected(xa))) {
869 char *stack = getStackTrace(10);
870 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
871 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
872 "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
873 __FILE__, __LINE__, methodName, stack);
874 free(stack);
875 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
876 return false;
877 }
878
879 return true;
880 }
881
882 /**
883 * Run by the new created thread, calls the clients update method.
884 * Leaving this pthread start routine does an implicit pthread_exit().
885 * @param container Holding all necessary informations, we free it when we are done
886 * @return 0 on success, 1 on error. The return value is the exit value returned by pthread_join()
887 */
888 static int runUpdate(UpdateContainer *container)
889 {
890 XmlBlasterAccessUnparsed *xa = container->xa;
891 MsgUnitArr *msgUnitArrP = container->msgUnitArrP;
892 void *userData = container->userData;
893 CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
894 XmlBlasterException *exception = &container->exception;
895 SocketDataHolder *socketDataHolder = &container->socketDataHolder;
896 XMLBLASTER_C_bool retVal;
897
898 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Entering runUpdate()");
899
900 retVal = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
901
902 if (xa->lowLevelAutoAck) { /* returned already */
903 }
904 else {
905 cb->sendResponseOrException(retVal, cb, socketDataHolder, msgUnitArrP, exception);
906 }
907
908 free(container);
909
910 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
911 "runUpdate: Update thread 0x%x is exiting", get_pthread_id(pthread_self()));
912 xa->threadCounter--;
913 return (retVal==true) ? 0 : 1;
914 }
915
916 /**
917 * Here we receive the callback messages from xmlBlaster, create a thread and dispatch
918 * it to the clients update.
919 * @see UpdateFp in CallbackServerUnparsed.h
920 */
921 static void interceptUpdate(MsgUnitArr *msgUnitArrP, void *userData,
922 XmlBlasterException *exception, void /*SocketDataHolder*/ *socketDataHolder)
923 {
924 CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
925 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)cb->updateCbUserData;
926
927 if (xa->clientsUpdateFp == 0) { /* Client has not registered an update() */
928 size_t i;
929 bool testException = false;
930 bool success = true;
931
932 for (i=0; i<msgUnitArrP->len; i++) {
933 const char *key = msgUnitArrP->msgUnitArr[i].key;
934 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
935 "CALLBACK update() default handler: Asynchronous message update arrived:%s id=%s, we ignore it in this default handler\n",
936 key, ((SocketDataHolder*)socketDataHolder)->requestId);
937 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
938 /* Return QoS: Everything is OK */
939 }
940 if (testException) {
941 strncpy0(exception->errorCode, "user.clientCode",
942 XMLBLASTEREXCEPTION_ERRORCODE_LEN);
943 strncpy0(exception->message, "I don't want these messages",
944 XMLBLASTEREXCEPTION_MESSAGE_LEN);
945 success = false;
946 }
947 cb->sendResponseOrException(success, cb, socketDataHolder, msgUnitArrP, exception);
948 return;
949 }
950
951 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "interceptUpdate(): Received message");
952
953 if (xa->callbackMultiThreaded == false) {
954 XMLBLASTER_C_bool ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
955 cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
956 return;
957 }
958
959 {
960 pthread_t tid;
961 int threadRet = 0;
962 UpdateContainer *container = (UpdateContainer*)malloc(sizeof(UpdateContainer));
963 pthread_attr_t attr;
964
965 pthread_attr_init(&attr);
966 /* Cleanup all resources after ending the thread, instead of calling pthread_join() */
967 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
968
969 container->xa = xa;
970 container->msgUnitArrP = msgUnitArrP;
971 container->userData = userData;
972 memcpy(&container->exception, exception, sizeof(XmlBlasterException));
973 memcpy(&container->socketDataHolder, socketDataHolder, sizeof(SocketDataHolder)); /* The blob pointer is freed already by CallbackServerUnparsed */
974
975 if (xa->lowLevelAutoAck) {
976 size_t i;
977 for (i=0; i<msgUnitArrP->len; i++) {
978 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
979 }
980 }
981
982 /*
983 Guaranteed sequence:
984 The server uses max one thread to deliver update() for each client
985 If the update contains an array of messages those are handled as a
986 complete bulk in the correct sequence here.
987 */
988
989 /* this thread will deliver the update message to the client code,
990 Note: we need a thread pool cache for better performance */
991 xa->threadCounter++;
992 threadRet = pthread_create(&tid, &attr,
993 (void * (*)(void *))runUpdate, (void *)container);
994 if (threadRet != 0) {
995 XMLBLASTER_C_bool ret = false;
996 free(container);
997 strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
998 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
999 "[%.100s:%d] Creating thread failed with error number %d, we deliver the message in the same thread",
1000 __FILE__, __LINE__, threadRet);
1001 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
1002 ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
1003 cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
1004 xa->threadCounter--;
1005 pthread_attr_destroy(&attr);
1006 return;
1007 }
1008
1009 /* Is done already with above PTHREAD_CREATE_DETACHED
1010 threadRet = pthread_detach(tid);
1011 if (threadRet != 0) {
1012 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching thread failed with error number %d", __LINE__, threadRet);
1013 }
1014 */
1015
1016 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1017 "interceptUpdate: Received message and delegated it to a separate thread 0x%x to deliver", get_pthread_id(tid));
1018
1019 pthread_attr_destroy(&attr);
1020 }
1021
1022 if (xa->lowLevelAutoAck) {
1023 *exception->errorCode = 0;
1024 cb->sendResponseOrException(true, cb, socketDataHolder, msgUnitArrP, exception);
1025 }
1026 }
1027
1028 /**
1029 * Write uncompressed to socket (thread safe)
1030 */
1031 static ssize_t writenPlain(void * userP, const int fd, const char *ptr, const size_t nbytes) {
1032 int rc;
1033 ssize_t ret;
1034
1035 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1036
1037 /* Start mutex */
1038 rc = pthread_mutex_lock(&xa->writenMutex);
1039 if (rc != 0) /* EINVAL */
1040 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
1041
1042 /* Send data */
1043 ret = writen(fd, ptr, nbytes);
1044
1045 /* End mutex */
1046 rc = pthread_mutex_unlock(&xa->writenMutex);
1047 if (rc != 0) /* EPERM */
1048 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
1049
1050 return ret;
1051
1052 }
1053
1054 /**
1055 * Compress data and send to socket.
1056 */
1057 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) {
1058 int rc;
1059 ssize_t ret;
1060
1061 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1062 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenCompressed(%u)", nbytes);
1063
1064 /* Start mutex */
1065 rc = pthread_mutex_lock(&xa->writenMutex);
1066 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
1067
1068 /* Send data */
1069 ret = xmlBlaster_writenCompressed(xa->connectionP->zlibWriteBuf, fd, ptr, nbytes);
1070
1071 /* End mutex */
1072 rc = pthread_mutex_unlock(&xa->writenMutex);
1073 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
1074
1075 return ret;
1076 }
1077
1078 /**
1079 * Read uncompressed to socket (thread safe)
1080 */
1081 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1082 int rc;
1083 ssize_t ret;
1084
1085 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1086
1087 rc = pthread_mutex_lock(&xa->readnMutex);
1088 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
1089
1090 ret = readn(fd, ptr, nbytes, fpNumRead, userP2);
1091
1092 rc = pthread_mutex_unlock(&xa->readnMutex);
1093 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
1094
1095 return ret;
1096 }
1097
1098 /**
1099 * Read data from socket, uncompress it if necessary.
1100 */
1101 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1102 int rc;
1103 ssize_t ret;
1104
1105 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1106 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnCompressed(%u)", nbytes);
1107
1108 rc = pthread_mutex_lock(&xa->readnMutex);
1109 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
1110
1111 ret = xmlBlaster_readnCompressed(xa->connectionP->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2);
1112
1113 rc = pthread_mutex_unlock(&xa->readnMutex);
1114 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
1115
1116 return ret;
1117 }
1118
1119 #ifdef XmlBlasterAccessUnparsedMain /* compile a standalone test program */
1120
1121 /**
1122 * Here we receive the callback messages from xmlBlaster
1123 * FOR TESTING ONLY
1124 * @see UpdateFp in CallbackServerUnparsed.h
1125 */
1126 static bool myUpdate(MsgUnitArr *msgUnitArrP, void *userData, XmlBlasterException *xmlBlasterException)
1127 {
1128 size_t i;
1129 bool testException = false;
1130 if (userData != 0) ; /* to avoid compiler warning (we don't need it here) */
1131 for (i=0; i<msgUnitArrP->len; i++) {
1132 char *xml = messageUnitToXml(&msgUnitArrP->msgUnitArr[i]);
1133 printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n", xml);
1134 free(xml);
1135 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
1136 /* Return QoS: Everything is OK */
1137 }
1138 if (testException) {
1139 strncpy0(xmlBlasterException->errorCode, "user.clientCode",
1140 XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1141 strncpy0(xmlBlasterException->message, "I don't want these messages",
1142 XMLBLASTEREXCEPTION_MESSAGE_LEN);
1143 return false;
1144 }
1145 return true;
1146 }
1147
1148 /**
1149 * Invoke: XmlBlasterAccessUnparsedMain -logLevel TRACE -numTests 10
1150 */
1151 int main(int argc, char** argv)
1152 {
1153 int ii;
1154 int numTests = 1;
1155 bool testCallInitialize = false;
1156
1157 for (ii=0; ii < argc-1; ii++)
1158 if (strcmp(argv[ii], "-numTests") == 0) {
1159 if (strToInt(&numTests, argv[++ii]) == false)
1160 printf("[XmlBlasterAccessUnparsed] WARN '-numTests %s' is invalid\n", argv[ii]);
1161 }
1162
1163 for (ii=0; ii<numTests; ii++) {
1164 int iarg;
1165 char *response = (char *)0;
1166 /*
1167 * callbackSessionId:
1168 * Is created by the client and used to validate callback messages in update.
1169 * This is sent on connect in ConnectQos.
1170 * (Is different from the xmlBlaster secret session ID)
1171 */
1172 const char *callbackSessionId = "topSecret";
1173 XmlBlasterException xmlBlasterException;
1174 XmlBlasterAccessUnparsed *xa = 0;
1175
1176 /*
1177 const char *tmp = getStackTrace(20);
1178 printf("[client] stackTrace=%s\n", tmp);
1179 free(tmp);
1180 */
1181
1182 # ifdef PTHREAD_THREADS_MAX
1183 printf("[client] Try option '-help' if you need usage informations, max %d"
1184 " threads per process are supported on this OS\n", PTHREAD_THREADS_MAX);
1185 # else
1186 printf("[client] Try option '-help' if you need usage informations\n");
1187 # endif
1188
1189 for (iarg=0; iarg < argc; iarg++) {
1190 if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
1191 char usage[XMLBLASTER_MAX_USAGE_LEN];
1192 const char *pp =
1193 "\n -logLevel ERROR | WARN | INFO | TRACE | DUMP [WARN]"
1194 "\n -numTests How often to run the same tests [1]"
1195 "\n\nExample:"
1196 "\n XmlBlasterAccessUnparsedMain -logLevel TRACE"
1197 " -dispatch/connection/plugin/socket/hostname server.mars.universe";
1198 printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
1199 getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
1200 exit(1);
1201 }
1202 }
1203
1204 xa = getXmlBlasterAccessUnparsed(argc, argv);
1205
1206 if (testCallInitialize) {
1207 if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
1208 printf("[client] Connection to xmlBlaster failed,"
1209 " please start the server or check your configuration\n");
1210 freeXmlBlasterAccessUnparsed(xa);
1211 exit(1);
1212 }
1213 }
1214
1215 { /* connect */
1216 char connectQos[2048];
1217 char callbackQos[1024];
1218
1219 if (testCallInitialize) {
1220 SNPRINTF(callbackQos, 1024,
1221 "<queue relating='callback' maxEntries='100' maxEntriesCache='100'>"
1222 " <callback type='SOCKET' sessionId='%s'>"
1223 " socket://%.120s:%d"
1224 " </callback>"
1225 "</queue>",
1226 callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
1227 }
1228 else
1229 *callbackQos = '\0';
1230
1231 SNPRINTF(connectQos, 2048,
1232 "<qos>"
1233 " <securityService type='htpasswd' version='1.0'>"
1234 " <![CDATA["
1235 " <user>fritz</user>"
1236 " <passwd>secret</passwd>"
1237 " ]]>"
1238 " </securityService>"
1239 "%.1024s"
1240 "</qos>", callbackQos);
1241
1242 response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
1243 if (*xmlBlasterException.errorCode != 0) {
1244 printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
1245 xmlBlasterException.errorCode, xmlBlasterException.message);
1246 freeXmlBlasterAccessUnparsed(xa);
1247 exit(1);
1248 }
1249 free(response);
1250 printf("[client] Connected to xmlBlaster, do some tests ...\n");
1251 }
1252
1253 response = xa->ping(xa, 0, &xmlBlasterException);
1254 if (response == (char *)0) {
1255 printf("[client] ERROR: Pinging a connected server failed: errorCode=%s, message=%s\n",
1256 xmlBlasterException.errorCode, xmlBlasterException.message);
1257 }
1258 else {
1259 printf("[client] Pinging a connected server, response=%s\n", response);
1260 free(response);
1261 }
1262
1263 { /* subscribe ... */
1264 const char *key = "<key oid='HelloWorld'/>";
1265 const char *qos = "<qos/>";
1266 printf("[client] Subscribe message 'HelloWorld' ...\n");
1267 response = xa->subscribe(xa, key, qos, &xmlBlasterException);
1268 if (*xmlBlasterException.errorCode != 0) {
1269 printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n",
1270 xmlBlasterException.errorCode, xmlBlasterException.message);
1271 xa->disconnect(xa, 0, &xmlBlasterException);
1272 freeXmlBlasterAccessUnparsed(xa);
1273 exit(1);
1274 }
1275 printf("[client] Subscribe success, returned status is '%s'\n", response);
1276 free(response);
1277 }
1278
1279 { /* publish ... */
1280 MsgUnit msgUnit;
1281 printf("[client] Publishing message 'HelloWorld' ...\n");
1282 msgUnit.key = strcpyAlloc("<key oid='HelloWorld'/>");
1283 msgUnit.content = strcpyAlloc("Some message payload");
1284 msgUnit.contentLen = strlen(msgUnit.content);
1285 msgUnit.qos =strcpyAlloc("<qos><persistent/></qos>");
1286 response = xa->publish(xa, &msgUnit, &xmlBlasterException);
1287 freeMsgUnitData(&msgUnit);
1288 if (*xmlBlasterException.errorCode != 0) {
1289 printf("[client] Caught exception in publish errorCode=%s, message=%s\n",
1290 xmlBlasterException.errorCode, xmlBlasterException.message);
1291 xa->disconnect(xa, 0, &xmlBlasterException);
1292 freeXmlBlasterAccessUnparsed(xa);
1293 exit(1);
1294 }
1295 printf("[client] Publish success, returned status is '%s'\n", response);
1296 free(response);
1297 }
1298
1299 { /* unSubscribe ... */
1300 const char *key = "<key oid='HelloWorld'/>";
1301 const char *qos = "<qos/>";
1302 printf("[client] UnSubscribe message 'HelloWorld' ...\n");
1303 response = xa->unSubscribe(xa, key, qos, &xmlBlasterException);
1304 if (response) {
1305 printf("[client] Unsubscribe success, returned status is '%s'\n", response);
1306 free(response);
1307 }
1308 else {
1309 printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n",
1310 xmlBlasterException.errorCode, xmlBlasterException.message);
1311 xa->disconnect(xa, 0, &xmlBlasterException);
1312 freeXmlBlasterAccessUnparsed(xa);
1313 exit(1);
1314 }
1315 }
1316
1317 { /* get synchnronous ... */
1318 size_t i;
1319 const char *key = "<key queryType='XPATH'>//key</key>";
1320 const char *qos = "<qos/>";
1321 MsgUnitArr *msgUnitArr;
1322 printf("[client] Get synchronous messages with XPath '//key' ...\n");
1323 msgUnitArr = xa->get(xa, key, qos, &xmlBlasterException);
1324 if (*xmlBlasterException.errorCode != 0) {
1325 printf("[client] Caught exception in get errorCode=%s, message=%s\n",
1326 xmlBlasterException.errorCode, xmlBlasterException.message);
1327 xa->disconnect(xa, 0, &xmlBlasterException);
1328 freeXmlBlasterAccessUnparsed(xa);
1329 exit(1);
1330 }
1331 if (msgUnitArr != (MsgUnitArr *)0) {
1332 for (i=0; i<msgUnitArr->len; i++) {
1333 char *contentStr = strFromBlobAlloc(msgUnitArr->msgUnitArr[i].content,
1334 msgUnitArr->msgUnitArr[i].contentLen);
1335 const char *dots = (msgUnitArr->msgUnitArr[i].contentLen > 96) ?
1336 " ..." : "";
1337 printf("\n[client] Received message#%u/%u:\n"
1338 "-------------------------------------"
1339 "%s\n <content>%.100s%s</content>%s\n"
1340 "-------------------------------------\n",
1341 i+1, msgUnitArr->len,
1342 msgUnitArr->msgUnitArr[i].key,
1343 contentStr, dots,
1344 msgUnitArr->msgUnitArr[i].qos);
1345 free(contentStr);
1346 }
1347 freeMsgUnitArr(msgUnitArr);
1348 }
1349 else {
1350 printf("[client] Caught exception in get errorCode=%s, message=%s\n",
1351 xmlBlasterException.errorCode, xmlBlasterException.message);
1352 xa->disconnect(xa, 0, &xmlBlasterException);
1353 freeXmlBlasterAccessUnparsed(xa);
1354 exit(1);
1355 }
1356 }
1357
1358
1359 { /* erase ... */
1360 const char *key = "<key oid='HelloWorld'/>";
1361 const char *qos = "<qos/>";
1362 printf("[client] Erasing message 'HelloWorld' ...\n");
1363 response = xa->erase(xa, key, qos, &xmlBlasterException);
1364 if (*xmlBlasterException.errorCode != 0) {
1365 printf("[client] Caught exception in erase errorCode=%s, message=%s\n",
1366 xmlBlasterException.errorCode, xmlBlasterException.message);
1367 xa->disconnect(xa, 0, &xmlBlasterException);
1368 freeXmlBlasterAccessUnparsed(xa);
1369 exit(1);
1370 }
1371 printf("[client] Erase success, returned status is '%s'\n", response);
1372 free(response);
1373 }
1374
1375 if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
1376 printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
1377 xmlBlasterException.errorCode, xmlBlasterException.message);
1378 freeXmlBlasterAccessUnparsed(xa);
1379 exit(1);
1380 }
1381
1382 freeXmlBlasterAccessUnparsed(xa);
1383 if (numTests > 1) {
1384 printf("[client] Successfully finished test #%d from %d\n\n", ii, numTests);
1385 }
1386 }
1387 printf("[client] Good bye.\n");
1388 return 0; /*exit(0);*/
1389 }
1390 #endif /* #ifdef XmlBlasterAccessUnparsedMain */
syntax highlighted by Code2HTML, v. 0.9.1