1 /*----------------------------------------------------------------------------
2 Name: XmlBlasterAccess.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Wraps raw socket connection to xmlBlaster
6 Implements sync connection and async callback
7 Needs pthread to compile (multi threading).
8 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
9 Compile:
10 LINUX: gcc -DXmlBlasterAccessMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessMain XmlBlasterAccess.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterAccessUnparsed.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
11 g++ -DXmlBlasterAccessMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessMain XmlBlasterAccess.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterAccessUnparsed.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
12 icc -DXmlBlasterAccessMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessMain XmlBlasterAccess.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterAccessUnparsed.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
13 WIN: cl /MT /W4 -DXmlBlasterAccessMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessMain.exe XmlBlasterAccess.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterAccessUnparsed.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib
14 (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32)
15 Solaris: cc -DXmlBlasterAccessMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessMain XmlBlasterAccess.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterAccessUnparsed.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
16 CC -DXmlBlasterAccessMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessMain XmlBlasterAccess.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterAccessUnparsed.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
17
18 Linux with libxmlBlasterC.so:
19 gcc -DXmlBlasterAccessMain -o XmlBlasterAccessMain XmlBlasterAccess.c -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT -lpthread
20 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
21 -----------------------------------------------------------------------------*/
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #if defined(WINCE)
26 # if defined(XB_USE_PTHREADS)
27 # include <pthreads/pthread.h>
28 # else
29 /*#include <pthreads/need_errno.h> */
30 static int errno=0; /* single threaded workaround*/
31 # endif
32 #else
33 # include <errno.h>
34 # include <sys/types.h>
35 #endif
36 #include <socket/xmlBlasterSocket.h>
37 #include <socket/xmlBlasterZlib.h>
38 #include <XmlBlasterAccess.h>
39 #include <util/XmlUtil.h>
40 #include <util/Timestampc.h>
41
42 static const int XBTYPE_PING=0;
43 static const int XBTYPE_POLL=1;
44
45 static XMLBLASTER_C_bool checkArgs(XmlBlasterAccess *xa, const char *methodName,
46 bool checkIsConnected, XmlBlasterException *exception);
47 static XMLBLASTER_C_bool checkPost(XmlBlasterAccess *xa, const char *methodName,
48 void *returnObj, XmlBlasterException *exception);
49
50 static XMLBLASTER_C_bool xmlBlasterIsStateOk(ReturnQos *returnQos);
51
52 static void xmlBlasterRegisterConnectionListener(struct XmlBlasterAccess *xa, ConnectionListenerCbFp cbFp, void *userData);
53
54 static XMLBLASTER_C_bool _initialize(XmlBlasterAccess *xa, XmlBlasterAccessUpdateFp update, XmlBlasterException *exception);
55 static ConnectReturnQos *xmlBlasterConnect(XmlBlasterAccess *xa, const ConnectQos * connectQos, XmlBlasterAccessUpdateFp update, XmlBlasterException *exception);
56 static XMLBLASTER_C_bool xmlBlasterDisconnect(XmlBlasterAccess *xa, const DisconnectQos * const disconnectQos, XmlBlasterException *exception);
57 static PublishReturnQos *xmlBlasterPublish(XmlBlasterAccess *xa, MsgUnit *msgUnit, XmlBlasterException *exception);
58 static PublishReturnQosArr *xmlBlasterPublishArr(XmlBlasterAccess *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
59 static void xmlBlasterPublishOneway(XmlBlasterAccess *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
60 static SubscribeReturnQos *xmlBlasterSubscribe(XmlBlasterAccess *xa, const SubscribeKey * subscribeKey, const SubscribeQos * subscribeQos, XmlBlasterException *exception);
61 static UnSubscribeReturnQosArr *xmlBlasterUnSubscribe(XmlBlasterAccess *xa, const UnSubscribeKey * unSubscribeKey, const UnSubscribeQos * unSubscribeQos, XmlBlasterException *exception);
62 static EraseReturnQosArr *xmlBlasterErase(XmlBlasterAccess *xa, const EraseKey * eraseKey, const EraseQos * eraseQos, XmlBlasterException *exception);
63 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccess *xa, const GetKey * const getKey, const GetQos * getQos, XmlBlasterException *exception);
64 static PingReturnQos *xmlBlasterPing(XmlBlasterAccess *xa, const PingQos * pingQos, XmlBlasterException *exception);
65 static XMLBLASTER_C_bool isConnected(XmlBlasterAccess *xa);
66
67 Dll_Export XmlBlasterAccess *getXmlBlasterAccess(int argc, const char* const* argv) {
68 XmlBlasterAccess * const xa = (XmlBlasterAccess *)calloc(1, sizeof(XmlBlasterAccess));
69 if (xa == 0) return xa;
70 xa->argc = argc;
71 xa->argv = argv;
72 xa->props = createProperties(xa->argc, xa->argv);
73 if (xa->props == 0) {
74 freeXmlBlasterAccess(xa);
75 return (XmlBlasterAccess *)0;
76 }
77 xa->isShutdown = false;
78 xa->connectionP = 0;
79 xa->userObject = 0; /* A client can use this pointer to point to any client specific information */
80 xa->connectionListenerCbFp = 0;
81 xa->pingPollTimer = createTimeout("PingPollTimer");
82 xa->userFp = 0;
83 xa->registerConnectionListener = xmlBlasterRegisterConnectionListener;
84 xa->connect = xmlBlasterConnect;
85 xa->disconnect = xmlBlasterDisconnect;
86 xa->publish = xmlBlasterPublish;
87 xa->publishArr = xmlBlasterPublishArr;
88 xa->publishOneway = xmlBlasterPublishOneway;
89 xa->subscribe = xmlBlasterSubscribe;
90 xa->unSubscribe = xmlBlasterUnSubscribe;
91 xa->erase = xmlBlasterErase;
92 xa->get = xmlBlasterGet;
93 xa->ping = xmlBlasterPing;
94 xa->isConnected = isConnected;
95
96 xa->pingInterval = 10000;
97 xa->retries = -1;
98 xa->delay = 5000;
99 xa->connnectionState = XBCONSTATE_UNDEF;
100
101 xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN"));
102 xa->log = xmlBlasterDefaultLogging;
103 xa->logUserP = 0;
104 return xa;
105 }
106
107 static void _freeConnectionP(XmlBlasterAccess *xa) {
108 XmlBlasterAccessUnparsed *cP = xa->connectionP;
109 if (xa == 0) return;
110 if (cP != 0) {
111 xa->connectionP = 0;
112 freeXmlBlasterAccessUnparsed(cP);
113 }
114 }
115
116 Dll_Export void freeXmlBlasterAccess(XmlBlasterAccess *xa)
117 {
118 if (xa == 0) {
119 char *stack = getStackTrace(10);
120 printf("[%s:%d] Please provide a valid XmlBlasterAccess pointer to freeXmlBlasterAccess() %s",
121 __FILE__, __LINE__, stack);
122 free(stack);
123 return;
124 }
125
126 if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */
127 xa->isShutdown = true; /* Inhibit access to xa */
128
129 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccess() conP=0x%x", xa->connectionP);
130
131 freeXmlBlasterQos(xa->connectQos);
132 freeXmlBlasterReturnQos(xa->connectReturnQos);
133
134 freeTimeout(xa->pingPollTimer);
135 xa->pingPollTimer = 0;
136
137 _freeConnectionP(xa);
138
139 freeProperties(xa->props);
140 free(xa);
141 }
142
143 static void xmlBlasterRegisterConnectionListener(struct XmlBlasterAccess *xa, ConnectionListenerCbFp cbFp, void *userData) {
144 xa->connectionListenerCbFp = cbFp;
145 xa->connectionListenerUserData = userData;
146 }
147
148 Dll_Export const char *connectionStateToStr(XBCONSTATE state) {
149 if (state == XBCONSTATE_SOCKALIVE)
150 return "SOCKALIVE";
151 else if (state == XBCONSTATE_LOGGEDINALIVE)
152 return "LOGGEDINALIVE";
153 else if (state == XBCONSTATE_POLLING)
154 return "POLLING";
155 else if (state == XBCONSTATE_DEAD)
156 return "DEAD";
157 return "UNDEF";
158 }
159
160 static int changeConnectionStateTo(XmlBlasterAccess *xa, XBCONSTATE newState, XmlBlasterException *exception) {
161 ConnectionListenerCbFp cb = xa->connectionListenerCbFp;
162 XBCONSTATE oldState = xa->connnectionState;
163 xa->connnectionState = newState;
164
165 /* Ignore same states */
166 if (oldState == newState)
167 return newState;
168
169 /* Logging only */
170 /*
171 if ((oldState == XBCONSTATE_ALIVE || oldState == XBCONSTATE_LOGGEDIN)
172 && newState != XBCONSTATE_ALIVE && newState != XBCONSTATE_LOGGEDIN) {
173 if (exception != 0)
174 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "New connectionState=%s, errorCode=%s message=%s",
175 conStateToStr(newState), exception->errorCode, exception->message);
176 }
177 */
178 if (exception != 0 && *exception->errorCode != 0)
179 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "New connectionState=%s, errorCode=%s message=%s",
180 connectionStateToStr(newState), exception->errorCode, exception->message);
181 else
182 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Transition connectionState %s to %s",
183 connectionStateToStr(oldState), connectionStateToStr(newState));
184
185 /* Notify user */
186 if (cb != 0) {
187 cb(xa, oldState, newState, exception, xa->connectionListenerUserData);
188 }
189
190 return newState;
191 }
192
193 /**
194 * Has the connect() method successfully passed?
195 * <p>
196 * Note that this contains no information about the current connection state
197 * of the protocol layer.
198 * </p>
199 * @return true If the connection() method was invoked without exception
200 */
201 static XMLBLASTER_C_bool isConnected(XmlBlasterAccess *xa)
202 {
203 if (xa == 0 || xa->isShutdown/* || xa->connectionP == 0*/) {
204 return false;
205 }
206 return xa->connectReturnQos != 0; /*xa->connectionP->isConnected(xa->connectionP);*/
207 }
208
209
210 Dll_Export const char *XmlBlasterAccessUsage(char *usage)
211 {
212 /* take care not to exceed XMLBLASTER_MAX_USAGE_LEN */
213 char tmp[XMLBLASTER_MAX_USAGE_LEN];
214 xmlBlasterAccessUnparsedUsage(tmp);
215 SNPRINTF(usage, XMLBLASTER_MAX_USAGE_LEN, "%.1600s,%.300s%.146s", tmp,
216 "\n -dispatch/connection/pingInterval"
217 "\n Pinging every given milliseconds [10000]"
218 "\n 0 switches pinging off"
219 "\n -dispatch/connection/retries"
220 "\n How often to retry if connection fails (-1 is forever) [-1]"
221 "\n Set to -1 for failsafe operation",
222 "\n -dispatch/connection/delay"
223 "\n Delay between connection retries in milliseconds [5000]"
224 "\n A delay value > 0 switches fails save mode on, 0 switches it off"
225 );
226
227 return usage;
228 }
229
230
231 Dll_Export Key *createXmlBlasterKey(const char * keyP) {
232 Key *key = (Key *)calloc(1, sizeof(Key));
233 if (key == 0) return key;
234 if (keyP != 0)
235 key->key = strcpyAlloc(keyP);
236 return key;
237 }
238
239
240 Dll_Export Qos *createXmlBlasterQos(const char * qosP) {
241 Qos *qos = (Qos *)calloc(1, sizeof(Qos));
242 if (qos == 0) return qos;
243 if (qosP != 0)
244 qos->qos = strcpyAlloc(qosP);
245 else
246 qos->qos = strcpyAlloc("<qos/>");
247 return qos;
248 }
249
250 /**
251 * Aware: Uses allocated qosXml for returnQos->returnQos
252 * so you don't need to free qosXml anymore.
253 * Freeing of ReturnQos will free qosXml
254 */
255 Dll_Export ReturnQos *createXmlBlasterReturnQos(const char * qosXml) {
256 ReturnQos *returnQos = (ReturnQos *)calloc(1, sizeof(ReturnQos));
257 if (returnQos == 0) return 0;
258 if (qosXml != 0) {
259 returnQos->returnQos = (char *)qosXml;
260 /*returnQos->returnQos = strcpyAlloc(qosXml);*/
261 returnQos->isOk = xmlBlasterIsStateOk;
262 }
263 return returnQos;
264 }
265
266
267 /**
268 * Aware: Frees QosArr
269 */
270 Dll_Export ReturnQosArr *createXmlBlasterReturnQosArr(QosArr * qosArr) {
271 ReturnQosArr *qos = (ReturnQosArr *)calloc(1, sizeof(ReturnQosArr));
272 if (qos == 0) return qos;
273 if (qosArr != 0) {
274 size_t i;
275 qos->len = qosArr->len;
276 qos->returnQosArr = (ReturnQos*)calloc(1, qos->len*sizeof(ReturnQos));
277 for (i=0; i<qos->len; i++) {
278 /*qos->returnQosArr[i] = (ReturnQos)calloc(sizeof(ReturnQos));*/
279 qos->returnQosArr[i].isOk = xmlBlasterIsStateOk;
280 qos->returnQosArr[i].returnQos = strcpyAlloc(qosArr->qosArr[i]);
281 }
282 freeQosArr(qosArr);
283 }
284 return qos;
285 }
286
287 static XMLBLASTER_C_bool xmlBlasterIsStateOk(ReturnQos *returnQos) {
288 if (returnQos == 0) return false;
289 return true; /* todo: parse qos xml markup */
290 }
291
292 Dll_Export void freeXmlBlasterKey(Key * key) {
293 if (key == 0) return;
294 if (key->key != 0) {
295 free(key->key);
296 key->key = 0;
297 }
298 free(key);
299 }
300
301 Dll_Export void freeXmlBlasterQos(Qos * qos) {
302 if (qos == 0) return;
303 if (qos->qos != 0) {
304 free(qos->qos);
305 qos->qos = 0;
306 }
307 free(qos);
308 }
309
310 static void freeXmlBlasterReturnQos_(ReturnQos * returnQos, bool freeContainerAsWell) {
311 if (returnQos == 0) return;
312 if (returnQos->returnQos != 0) {
313 free(returnQos->returnQos);
314 }
315 if (freeContainerAsWell)
316 free(returnQos);
317 }
318
319 Dll_Export void freeXmlBlasterReturnQosArr(ReturnQosArr * returnQosArr) {
320 if (returnQosArr == 0) return;
321 if (returnQosArr->returnQosArr != 0) {
322 size_t i;
323 for (i=0; i<returnQosArr->len; i++) {
324 ReturnQos *returnQos = &returnQosArr->returnQosArr[i];
325 freeXmlBlasterReturnQos_(returnQos, false);
326 }
327 free(returnQosArr->returnQosArr);
328 returnQosArr->returnQosArr = 0;
329 returnQosArr->len = 0;
330 }
331 free(returnQosArr);
332 }
333
334 Dll_Export extern void freeXmlBlasterReturnQos(ReturnQos * returnQos) {
335 freeXmlBlasterReturnQos_(returnQos, true);
336 }
337
338 /**
339 * Delegates callback to client
340 */
341 static XMLBLASTER_C_bool _myUpdate(MsgUnitArr *msgUnitArr, void *userData,
342 XmlBlasterException *exception)
343 {
344 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
345 XmlBlasterAccess *xb = (XmlBlasterAccess *)xa->userObject;
346 if (xb->clientsUpdateFp != 0)
347 return xb->clientsUpdateFp(xb, msgUnitArr, exception);
348 return true;
349 }
350
351 static XMLBLASTER_C_bool _initialize(XmlBlasterAccess *xa, XmlBlasterAccessUpdateFp clientUpdateFp, XmlBlasterException *exception)
352 {
353 UpdateFp myUpdate = 0;
354 if (checkArgs(xa, "initialize", false, exception) == false) return false;
355
356 if (xa->connectionP) {
357 return true;
358 }
359
360 xa->clientsUpdateFp = clientUpdateFp;
361 if (xa->clientsUpdateFp != 0)
362 myUpdate = _myUpdate;
363
364 _freeConnectionP(xa);
365 xa->connectionP = getXmlBlasterAccessUnparsed(xa->argc, xa->argv);
366 if (xa->connectionP == 0) {
367 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
368 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
369 "[%.100s:%d] Creating XmlBlasterAccessUnparsed failed", __FILE__, __LINE__);
370 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
371 return false;
372 }
373 xa->connectionP->log = xa->log;
374 xa->connectionP->logUserP = xa->logUserP;
375 xa->connectionP->userObject = xa;
376 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterAccessUnparsed");
377
378 /*
379 -dispatch/connection/pingInterval
380 Pinging every given milliseconds [10000]
381 0 switches pinging off
382 -dispatch/connection/retries
383 How often to retry if connection fails (-1 is forever) [-1]
384 Set to -1 for failsafe operation
385 -dispatch/connection/delay
386 Delay between connection retries in milliseconds [5000]
387 A delay value > 0 switches fails save mode on, 0 switches it off
388 */
389 /*<queue relating='connection'><address type="socket" pingInterval='0' retries='-1' delay='10000'/></queue>*/
390 xa->pingInterval = xmlBlasterExtractAttributeLong(xa->connectQos->qos, "address", "pingInterval",
391 xa->connectionP->props->getLong(xa->connectionP->props, "dispatch/connection/pingInterval", 10000));
392 xa->retries = xmlBlasterExtractAttributeLong(xa->connectQos->qos, "address", "retries",
393 xa->connectionP->props->getLong(xa->connectionP->props, "dispatch/connection/retries", -1));
394 xa->delay = xmlBlasterExtractAttributeLong(xa->connectQos->qos, "address", "delay",
395 xa->connectionP->props->getLong(xa->connectionP->props, "dispatch/connection/delay", 5000));
396
397 /* Establish low level IP connection */
398 if (xa->connectionP->initialize(xa->connectionP, myUpdate, exception) == false) {
399 checkPost(xa, "initialize", 0, exception);
400 return false;
401 }
402 checkPost(xa, "initialize", 0, exception);
403
404 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
405 "initialize() successful");
406 return true;
407 }
408
409 /** Called internally only */
410 static ConnectReturnQos *_xmlBlasterReConnect(XmlBlasterAccess *xa, XmlBlasterAccessUpdateFp clientUpdateFp, XmlBlasterException *exception)
411 {
412 UpdateFp myUpdate = 0;
413 char *response = 0;
414 if (_initialize(xa, clientUpdateFp, exception) == false) {
415 return 0;
416 }
417
418 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Invoking connect()");
419
420 xa->clientsUpdateFp = clientUpdateFp;
421 if (xa->clientsUpdateFp != 0)
422 myUpdate = _myUpdate;
423
424 response = xa->connectionP->connect(xa->connectionP, (xa->connectQos==0)?0:xa->connectQos->qos, myUpdate, exception);
425
426 freeXmlBlasterReturnQos(xa->connectReturnQos);
427 xa->connectReturnQos = 0;
428
429 if (*exception->errorCode == 0) {
430 xa->connectReturnQos = createXmlBlasterReturnQos(response); /* reuses response memory */
431 }
432
433 /* must have new connectReturnQos for the callback function */
434 if (checkPost(xa, "connect", response, exception) == false ) return 0;
435
436 return xa->connectReturnQos;
437 }
438
439 static void onPingPollTimeout(Timeout *timeout, void *userData, void *userData2) {
440 XmlBlasterAccess *xa = (XmlBlasterAccess *)userData;
441 int type = (int)(*((int*)userData2)); /* XBTYPE_PING=0 | XBTYPE_POLL=1 */
442 char timeStr[64];
443 /*ConnectionListenerCbFp cb = xa->connectionListenerCbFp;*/
444
445 if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
446 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
447 "%s Timeout occurred, timer=%s delay=%ld type=%s\n",
448 getCurrentTimeStr(timeStr, 64), timeout->name,
449 timeout->timeoutContainer.delay, (type==XBTYPE_PING?"PING":"POLL"));
450
451 if (type == XBTYPE_PING) {
452 PingQos *pingQos = 0;
453 PingReturnQos *pingReturnQos = 0;
454 XmlBlasterException exception;
455 pingReturnQos = xa->ping(xa, pingQos, &exception); /* Does error handling */
456 freeXmlBlasterReturnQos(pingReturnQos);
457 /*
458 if (*exception.errorCode != 0) {
459 if (xa->logLevel>=XMLBLASTER_LOG_WARN)
460 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN,
461 __FILE__, "Ping failed: %s %s",
462 exception.errorCode, exception.message);
463 cb(xa, XBCONSTATE_ALIVE, XBCONSTATE_POLLING, &exception, xa->connectionListenerUserData);
464 }
465 else {
466 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Ping success");
467 }
468 */
469 }
470
471 else if (type == XBTYPE_POLL) {
472 XmlBlasterException xmlBlasterException;
473 initializeXmlBlasterException(&xmlBlasterException);
474 if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
475 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE,
476 __FILE__, "Polling again (every %ld milli seconds)", xa->delay);
477 _freeConnectionP(xa);
478 _xmlBlasterReConnect(xa, xa->clientsUpdateFp, &xmlBlasterException);
479 }
480 }
481
482 /** Called by user code */
483 static ConnectReturnQos *xmlBlasterConnect(XmlBlasterAccess *xa, const ConnectQos * connectQos,
484 XmlBlasterAccessUpdateFp clientUpdateFp, XmlBlasterException *exception)
485 {
486 ConnectReturnQos *ret = 0;
487 if (checkArgs(xa, "connect", false, exception) == false) return 0;
488
489 if (connectQos == 0) {
490 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
491 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'connectQos' to connect()", __FILE__, __LINE__);
492 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
493 return false;
494 }
495
496 if (xa->connectQos != 0) freeXmlBlasterQos(xa->connectQos);
497 xa->connectQos = createXmlBlasterQos(connectQos->qos); /* take a clone */
498
499 ret = _xmlBlasterReConnect(xa, clientUpdateFp, exception);
500
501 /* return a clone */
502 return (ret==0) ? 0 : createXmlBlasterReturnQos(strcpyAlloc(ret->returnQos));
503 }
504
505 static XMLBLASTER_C_bool xmlBlasterDisconnect(XmlBlasterAccess *xa, const DisconnectQos * const disconnectQos, XmlBlasterException *exception)
506 {
507 XMLBLASTER_C_bool p;
508 if (checkArgs(xa, "disconnect", true, exception) == false ) return 0;
509 p = xa->connectionP->disconnect(xa->connectionP, (disconnectQos==0)?0:disconnectQos->qos, exception);
510 if (checkPost(xa, "disconnect", 0, exception) == false ) return 0;
511 return p;
512 }
513
514 /**
515 * Publish a message to the server.
516 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
517 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
518 * @see XmlBlasterAccessUnparsed#publish() for a function documentation
519 */
520 static PublishReturnQos *xmlBlasterPublish(XmlBlasterAccess *xa, MsgUnit *msgUnit, XmlBlasterException *exception)
521 {
522 char *p;
523 if (checkArgs(xa, "publish", true, exception) == false ) return 0;
524 p = xa->connectionP->publish(xa->connectionP, msgUnit, exception);
525 if (checkPost(xa, "publish", p, exception) == false ) return 0;
526 return createXmlBlasterReturnQos(p);
527 }
528
529 /**
530 * Publish a message array in a bulk to the server.
531 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
532 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
533 * @see XmlBlasterAccessUnparsed#publishArr() for a function documentation
534 */
535 static PublishReturnQosArr *xmlBlasterPublishArr(XmlBlasterAccess *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
536 {
537 QosArr *p;
538 if (checkArgs(xa, "publishArr", true, exception) == false ) return 0;
539 p = xa->connectionP->publishArr(xa->connectionP, msgUnitArr, exception);
540 if (checkPost(xa, "publishArr", p, exception) == false ) return 0;
541 return createXmlBlasterReturnQosArr(p);
542 }
543
544 /**
545 * Publish a message array in a bulk to the server, we don't receive an ACK.
546 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
547 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
548 * @see XmlBlasterAccessUnparsed#publishOneway() for a function documentation
549 */
550 static void xmlBlasterPublishOneway(XmlBlasterAccess *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
551 {
552 if (checkArgs(xa, "publishOneway", true, exception) == false ) return;
553 xa->connectionP->publishOneway(xa->connectionP, msgUnitArr, exception);
554 if (checkPost(xa, "publishOneway", 0, exception)) return;
555 }
556
557 /**
558 * Subscribe a message.
559 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.subscribe.html
560 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
561 */
562 static SubscribeReturnQos *xmlBlasterSubscribe(XmlBlasterAccess *xa, const SubscribeKey * subscribeKey, const SubscribeQos * subscribeQos, XmlBlasterException *exception)
563 {
564 char *p;
565 if (checkArgs(xa, "subscribe", true, exception) == false ) return 0;
566 p = xa->connectionP->subscribe(xa->connectionP, (subscribeKey==0)?0:subscribeKey->key, (subscribeQos==0)?0:subscribeQos->qos, exception);
567 if (checkPost(xa, "subscribe", p, exception) == false ) return 0;
568 return createXmlBlasterReturnQos(p);
569 }
570
571 /**
572 * UnSubscribe a message from the server.
573 * @return The raw QoS XML strings returned from xmlBlaster, only NULL if an exception is thrown
574 * You need to free it with freeQosArr() after usage
575 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.unSubscribe.html
576 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
577 */
578 static UnSubscribeReturnQosArr *xmlBlasterUnSubscribe(XmlBlasterAccess *xa, const UnSubscribeKey * unSubscribeKey, const UnSubscribeQos * unSubscribeQos, XmlBlasterException *exception)
579 {
580 QosArr *p;
581 if (checkArgs(xa, "unSubscribe", true, exception) == false ) return 0;
582 p = xa->connectionP->unSubscribe(xa->connectionP, (unSubscribeKey==0)?0:unSubscribeKey->key, (unSubscribeQos==0)?0:unSubscribeQos->qos, exception);
583 if (checkPost(xa, "unSubscribe", p, exception) == false ) return 0;
584 return createXmlBlasterReturnQosArr(p);
585 }
586
587 /**
588 * Erase a message from the server.
589 * @return A struct holding the raw QoS XML strings returned from xmlBlaster,
590 * only NULL if an exception is thrown.
591 * You need to freeQosArr() it
592 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.erase.html
593 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
594 */
595 static EraseReturnQosArr *xmlBlasterErase(XmlBlasterAccess *xa, const EraseKey * eraseKey, const EraseQos * eraseQos, XmlBlasterException *exception)
596 {
597 QosArr *p;
598 if (checkArgs(xa, "erase", true, exception) == false ) return 0;
599 p = xa->connectionP->erase(xa->connectionP, (eraseKey==0)?0:eraseKey->key, (eraseQos==0)?0:eraseQos->qos, exception);
600 if (checkPost(xa, "erase", p, exception) == false ) return 0;
601 return createXmlBlasterReturnQosArr(p);
602 }
603
604 /**
605 * Ping the server.
606 * @param xa The 'this' pointer
607 * @param qos The QoS or 0
608 * @param exception *errorCode!=0 on failure
609 * @return The ping return QoS raw xml string, you need to free() it
610 * or 0 on failure (in which case *exception.errorCode!=0)
611 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
612 */
613 static PingReturnQos *xmlBlasterPing(XmlBlasterAccess *xa, const PingQos * pingQos, XmlBlasterException *exception)
614 {
615 char *p;
616 if (checkArgs(xa, "ping", true, exception) == false ) return 0;
617 p = xa->connectionP->ping(xa->connectionP, (pingQos==0)?0:pingQos->qos, exception);
618 if (checkPost(xa, "ping", p, exception) == false ) return 0;
619 return createXmlBlasterReturnQos(p);
620 }
621
622 /**
623 * Get a message.
624 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.get.html
625 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
626 * @return NULL on error, please check exception in such a case
627 */
628 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccess *xa, const GetKey * const getKey, const GetQos * getQos, XmlBlasterException *exception)
629 {
630 MsgUnitArr *msgUnitArr;
631 if (checkArgs(xa, "get", true, exception) == false ) return 0;
632 msgUnitArr = xa->connectionP->get(xa->connectionP, (getKey==0)?0:getKey->key, (getQos==0)?0:getQos->qos, exception);
633 if (checkPost(xa, "get", msgUnitArr, exception) == false ) return 0;
634 return msgUnitArr;
635 }
636
637 /**
638 * Post processing of remote calls: check connection status and lauch ping or poll thread.
639 */
640 static XMLBLASTER_C_bool checkPost(XmlBlasterAccess *xa, const char *methodName,
641 void *returnObj, XmlBlasterException *exception)
642 {
643 /* Success: No exception */
644 if (exception == 0 || *exception->errorCode == 0) {
645 if (!strcmp("initialize", methodName)) {
646 /* raw socket connected */
647 changeConnectionStateTo(xa, XBCONSTATE_SOCKALIVE, exception);
648 return true;
649 }
650 if (xa->pingInterval > 0 && xa->connnectionState != XBCONSTATE_LOGGEDINALIVE
651 && !strcmp("connect", methodName)) {
652 if (xa->logLevel>=XMLBLASTER_LOG_INFO)
653 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO,
654 __FILE__, "Connected to xmlBlaster server, pinging now every %ld milli seconds", xa->pingInterval);
655 /* reset timer */
656 /*xa->pingPollTimer->setTimeoutListener(xa->pingPollTimer, onPingPollTimeout, 0, xa, (void*)&XBTYPE_PING);*/
657 /* start pinging */
658 xa->pingPollTimer->setTimeoutListener(xa->pingPollTimer, onPingPollTimeout, xa->pingInterval, xa, (void*)&XBTYPE_PING);
659 }
660 changeConnectionStateTo(xa, XBCONSTATE_LOGGEDINALIVE, exception);
661 return true;
662 }
663
664 /* Exception occurred */
665 if (xa->retries == -1 || xa->retries > 0) {
666 /* start polling */
667 if (xa->connnectionState != XBCONSTATE_POLLING) {
668 if (xa->logLevel>=XMLBLASTER_LOG_WARN)
669 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN,
670 __FILE__, "Lost server connection during %s, polling now every %ld milli seconds", methodName, xa->delay);
671 _freeConnectionP(xa);
672 changeConnectionStateTo(xa, XBCONSTATE_POLLING, exception);
673 /* reset timer */
674 /*xa->pingPollTimer->setTimeoutListener(xa->pingPollTimer, onPingPollTimeout, 0, xa, (void*)&XBTYPE_POLL);*/
675 /* start poller */
676 xa->pingPollTimer->setTimeoutListener(xa->pingPollTimer, onPingPollTimeout, xa->delay, xa, (void*)&XBTYPE_POLL);
677 }
678 }
679 else {
680 if (xa->logLevel>=XMLBLASTER_LOG_ERROR)
681 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR,
682 __FILE__, "Lost server connection during %s, giving up as no fail safe mode is configured (-dispatch/connection/retries -1).", methodName);
683 /* stop timer */
684 xa->pingPollTimer->setTimeoutListener(xa->pingPollTimer, onPingPollTimeout, 0, xa, (void*)&XBTYPE_POLL);
685 changeConnectionStateTo(xa, XBCONSTATE_DEAD, exception);
686 }
687
688 if (returnObj != 0)
689 printf("**** TODO Check if we shall free returnObj!!!\n");
690 return false;
691 }
692
693 static XMLBLASTER_C_bool checkArgs(XmlBlasterAccess *xa, const char *methodName,
694 bool checkIsConnected, XmlBlasterException *exception)
695 {
696 if (xa == 0) {
697 char *stack = getStackTrace(10);
698 if (exception == 0) {
699 printf("[%s:%d] Please provide a valid XmlBlasterAccess pointer to %s() %s",
700 __FILE__, __LINE__, methodName, stack);
701 }
702 else {
703 strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
704 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
705 "[%.100s:%d] Please provide a valid XmlBlasterAccess pointer to %.16s() %s",
706 __FILE__, __LINE__, methodName, stack);
707 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
708 }
709 free(stack);
710 return false;
711 }
712
713 if (exception == 0) {
714 char *stack = getStackTrace(10);
715 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
716 __FILE__, __LINE__, methodName, stack);
717 free(stack);
718 return false;
719 }
720
721 initializeXmlBlasterException(exception);
722
723 if (strcmp("connect", methodName) && strcmp("initialize", methodName) && xa->connectionP == 0) {
724 if (xa->retries == -1 || xa->retries > 0) {
725 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "[%s:%d] We are polling %s",
726 __FILE__, __LINE__, methodName);
727 }
728 else {
729 char *stack = getStackTrace(10);
730 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] No valid connectionP pointer %s() %s",
731 __FILE__, __LINE__, methodName, stack);
732 free(stack);
733 }
734 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
735 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
736 "[%.100s:%d] No connection to xmlBlaster, %s() connectionState=%s",
737 __FILE__, __LINE__, methodName, connectionStateToStr(xa->connnectionState));
738 return false;
739 }
740
741 if (xa->isShutdown || (checkIsConnected && !xa->isConnected(xa))) {
742 char *stack = getStackTrace(10);
743 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
744 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
745 "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
746 __FILE__, __LINE__, methodName, stack);
747 free(stack);
748 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
749 return false;
750 }
751
752 return true;
753 }
syntax highlighted by Code2HTML, v. 0.9.1