1 /*----------------------------------------------------------------------------
2 Name: XmlBlasterConnectionUnparsed.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Wraps raw socket connection to xmlBlaster
6 for complete synchronous xmlBlaster access,
7 without callbacks and not threading necessary
8 socket connect timeout may be specified ja, bj
9 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
10 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
11 -----------------------------------------------------------------------------*/
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <ctype.h> /* isalpha() */
16 #if defined(WINCE)
17 # if defined(XB_USE_PTHREADS)
18 # include <pthreads/pthread.h>
19 # else
20 /*#include <pthreads/need_errno.h> */
21 static int errno=0; /* single threaded workaround*/
22 # endif
23 #else
24 # include <errno.h>
25 # include <sys/types.h>
26 #endif
27 #include <socket/xmlBlasterSocket.h>
28 #include <socket/xmlBlasterZlib.h>
29 #include <XmlBlasterConnectionUnparsed.h>
30 #include <util/Timestampc.h>
31 #define SOCKET_TCP false
32
33 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception);
34 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception);
35 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp);
36 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
37 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
38 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception);
39 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
40 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
41 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
42 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
43 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
44 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
45 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
46 static bool isConnected(XmlBlasterConnectionUnparsed *xb);
47 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb);
48 static ssize_t writenPlain(void *xb, const int fd, const char *ptr, const size_t nbytes);
49 static ssize_t writenCompressed(void *xb, const int fd, const char *ptr, const size_t nbytes);
50 static ssize_t readnPlain(void *xb, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
51 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
52 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
53
54 /**
55 * Create a new instance to handle a synchronous connection to the server.
56 * This is usually the first call of a client.
57 * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done
58 * usually by calling freeXmlBlasterConnectionUnparsed().
59 */
60 XmlBlasterConnectionUnparsed *getXmlBlasterConnectionUnparsed(int argc, const char* const* argv) {
61 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)calloc(1, sizeof(XmlBlasterConnectionUnparsed));
62 if (xb == 0) return xb;
63 xb->argc = argc;
64 xb->argv = argv;
65 xb->props = createProperties(xb->argc, xb->argv);
66 if (xb->props == 0) {
67 freeXmlBlasterConnectionUnparsed(&xb);
68 return (XmlBlasterConnectionUnparsed *)0;
69 }
70 #ifdef __IPhoneOS__
71 xb->cfSocketRef = nil;
72 xb->readStream = nil;
73 xb->writeStream = nil;
74 #endif
75 xb->socketToXmlBlaster = -1;
76 xb->socketToXmlBlasterUdp = -1;
77 xb->isInitialized = false;
78 *xb->secretSessionId = 0;
79 xb->initConnection = initConnection;
80 xb->initQueue = xmlBlasterInitQueue;
81 xb->connect = xmlBlasterConnect;
82 xb->disconnect = xmlBlasterDisconnect;
83 xb->publish = xmlBlasterPublish;
84 xb->publishArr = xmlBlasterPublishArr;
85 xb->publishOneway = xmlBlasterPublishOneway;
86 xb->subscribe = xmlBlasterSubscribe;
87 xb->unSubscribe = xmlBlasterUnSubscribe;
88 xb->erase = xmlBlasterErase;
89 xb->get = xmlBlasterGet;
90 xb->ping = xmlBlasterPing;
91 xb->isConnected = isConnected;
92 xb->shutdown = xmlBlasterConnectionShutdown;
93 xb->preSendEvent = 0;
94 xb->preSendEvent_userP = 0;
95 xb->postSendEvent = 0;
96 xb->postSendEvent_userP = 0;
97 xb->queueP = 0;
98 xb->logLevel = parseLogLevel(xb->props->getString(xb->props, "logLevel", "WARN"));
99 xb->log = xmlBlasterDefaultLogging;
100 xb->logUserP = 0;
101 xb->useUdpForOneway = false;
102 xb->writeToSocket.writeToSocketFuncP = 0;
103 xb->writeToSocket.userP = xb;
104 xb->zlibWriteBuf = 0;
105 xb->readFromSocket.readFromSocketFuncP = 0;
106 xb->readFromSocket.userP = xb;
107 xb->zlibReadBuf = 0;
108 return xb;
109 }
110
111 void freeXmlBlasterConnectionUnparsed(XmlBlasterConnectionUnparsed **xb_)
112 {
113 XmlBlasterConnectionUnparsed *xb = *xb_;
114 if (xb != 0) {
115 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterConnectionUnparsed 0x%x", xb);
116 freeProperties(xb->props);
117 if (xb->zlibWriteBuf) {
118 xmlBlaster_endZlibWriter(xb->zlibWriteBuf);
119 free(xb->zlibWriteBuf);
120 xb->zlibWriteBuf = 0;
121 }
122 if (xb->zlibReadBuf) {
123 xmlBlaster_endZlibReader(xb->zlibReadBuf);
124 free(xb->zlibReadBuf);
125 xb->zlibReadBuf = 0;
126 }
127 xmlBlasterConnectionShutdown(xb);
128 free(xb);
129 *xb_ = 0;
130 }
131 }
132
133 /**
134 * Connects on TCP/IP level to xmlBlaster
135 * @return true If the low level TCP/IP connect to xmlBlaster succeeded
136 */
137 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception)
138 {
139 const char *servTcpPort = 0;
140
141 struct sockaddr_in xmlBlasterAddr;
142 struct hostent hostbuf, *hostP = 0;
143 struct servent *portP = 0;
144
145 size_t hstbuflen=0;
146
147 char serverHostName[256];
148 char errP[MAX_ERRNO_LEN];
149
150 #if defined(_WINDOWS)
151 WORD wVersionRequested;
152 WSADATA wsaData;
153 int err;
154 wVersionRequested = MAKEWORD( 2, 2 );
155 err = WSAStartup( wVersionRequested, &wsaData );
156 if ( err != 0 ) {
157 strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
158 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL", __FILE__, __LINE__);
159 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
160 return false;
161 }
162
163 if ( LOBYTE( wsaData.wVersion ) != 2 ||
164 HIBYTE( wsaData.wVersion ) != 2 ) {
165 WSACleanup( );
166 strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
167 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL which supports version 2.2", __FILE__, __LINE__);
168 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
169 return false;
170 }
171 # endif
172 *errP = 0;
173
174 if (xb->isInitialized) {
175 return true;
176 }
177
178 { /* Switch on compression? */
179 const char *compressType = xb->props->getString(xb->props, "plugin/socket/compress/type", "");
180 compressType = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/compress/type", compressType);
181
182 if (!strcmp(compressType, "zlib:stream")) {
183
184 xb->zlibWriteBuf = (XmlBlasterZlibWriteBuffers *)malloc(sizeof(struct XmlBlasterZlibWriteBuffers));
185 xb->zlibReadBuf = (XmlBlasterZlibReadBuffers *)malloc(sizeof(struct XmlBlasterZlibReadBuffers));
186
187 if (xmlBlaster_initZlibWriter(xb->zlibWriteBuf) != 0/*Z_OK*/) {
188 if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
189 "Failed switching on 'plugin/socket/compress/type=%s'", compressType);
190 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
191 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
192 "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'",
193 __FILE__, __LINE__, compressType);
194 free(xb->zlibWriteBuf);
195 xb->zlibWriteBuf = 0;
196 free(xb->zlibReadBuf);
197 xb->zlibReadBuf = 0;
198 return false;
199 }
200
201 if (xmlBlaster_initZlibReader(xb->zlibReadBuf) != 0/*Z_OK*/) {
202 if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
203 "Failed switching on 'plugin/socket/compress/type=%s'", compressType);
204 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
205 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
206 "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'",
207 __FILE__, __LINE__, compressType);
208 free(xb->zlibWriteBuf);
209 xb->zlibWriteBuf = 0;
210 free(xb->zlibReadBuf);
211 xb->zlibReadBuf = 0;
212 return false;
213 }
214
215 if (xb->logLevel>=XMLBLASTER_LOG_DUMP) {
216 xb->zlibWriteBuf->debug = true;
217 xb->zlibReadBuf->debug = true;
218 }
219
220 if (!xb->writeToSocket.writeToSocketFuncP) { /* Accept setting from XmlBlasterAccessUnparsed */
221 xb->writeToSocket.writeToSocketFuncP = writenCompressed;
222 xb->readFromSocket.readFromSocketFuncP = readnCompressed;
223 }
224 }
225 else {
226 if (strcmp(compressType, "")) {
227 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode.", compressType);
228 }
229 if (!xb->writeToSocket.writeToSocketFuncP) { /* Accept setting from XmlBlasterAccessUnparsed */
230 xb->writeToSocket.writeToSocketFuncP = writenPlain;
231 xb->readFromSocket.readFromSocketFuncP = readnPlain;
232 }
233 }
234 }
235
236
237 servTcpPort = xb->props->getString(xb->props, "plugin/socket/port", "7607");
238 servTcpPort = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/port", servTcpPort);
239
240 strncpy0(serverHostName, "localhost", 250);
241 gethostname(serverHostName, 250);
242 {
243 const char *hn = xb->props->getString(xb->props, "plugin/socket/hostname", serverHostName);
244 memmove(serverHostName, hn, strlen(hn)+1); /* including '\0' */
245 hn = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/hostname", serverHostName);
246 memmove(serverHostName, hn, strlen(hn)+1);
247 }
248
249 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
250 "Lookup xmlBlaster on -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %s ...",
251 serverHostName, servTcpPort);
252
253 *xb->secretSessionId = 0;
254 memset((char *)&xmlBlasterAddr, 0, sizeof(xmlBlasterAddr));
255 xmlBlasterAddr.sin_family=AF_INET;
256
257 # ifdef _WINDOWS_NOT_YET_PORTED /* Windows gethostbyname is deprecated */
258 const struct addrinfo hints;
259 struct addrinfo** res;
260 int getaddrinfo(serverHostName, servTcpPort, &hints, res);
261 res->ai_next : ai_family, ai_socktype, and ai_protocol
262
263 ...
264
265 void freeaddrinfo(*res);
266 # endif
267 if (isalpha(serverHostName[0]) || strchr(serverHostName,':') != 0) { /* look for dns name or ipv6 */
268 char *tmphstbuf=0;
269 memset((char *)&hostbuf, 0, sizeof(struct hostent));
270 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server DNS lookup of hostname '%s'", serverHostName);
271 hostP = gethostbyname_re(serverHostName, &hostbuf, &tmphstbuf, &hstbuflen, errP);
272 if (hostP == 0) {
273 if (*errP != 0) {
274 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
275 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
276 "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s: %s",
277 __FILE__, __LINE__, serverHostName, servTcpPort, errP);
278 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
279 *errP = 0;
280 }
281 else {
282 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
283 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
284 "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s, errno=%d",
285 __FILE__, __LINE__, serverHostName, servTcpPort, errno);
286 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
287 }
288 return false;
289 }
290 xmlBlasterAddr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */
291 free(tmphstbuf);
292 }
293 else {
294 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server IP4 usage (without DNS lookup) of IP '%s'", serverHostName);
295 /* use ip4 addr directly to avoid dns lookup */
296 xmlBlasterAddr.sin_addr.s_addr = inet_addr(serverHostName);
297 }
298
299 portP = getservbyname(servTcpPort, "tcp");
300 if (portP != 0)
301 xmlBlasterAddr.sin_port = (u_short)portP->s_port;
302 else
303 xmlBlasterAddr.sin_port = htons((u_short)atoi(servTcpPort));
304 #ifdef __IPhoneOS__
305 xb->socketToXmlBlaster = 0;
306 #else
307 xb->socketToXmlBlaster = (int)socket(AF_INET, SOCK_STREAM, 0);
308 #endif
309 if (xb->socketToXmlBlaster != -1) {
310 int ret=0;
311 const char *localHostName = xb->props->getString(xb->props, "plugin/socket/localHostname", 0);
312 int localPort = xb->props->getInt(xb->props, "plugin/socket/localPort", 0);
313 localHostName = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/localHostname", localHostName);
314 localPort = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/localPort", localPort);
315
316 /* Sometimes a user may whish to force the local host/port setting (e.g. for firewall tunneling
317 and on multi homed hosts */
318 if (localHostName != 0 || localPort > 0) {
319 struct sockaddr_in localAddr;
320 struct hostent localHostbuf, *localHostP = 0;
321 char *tmpLocalHostbuf=0;
322 size_t localHostbuflen=0;
323 memset(&localAddr, 0, sizeof(localAddr));
324 localAddr.sin_family = AF_INET;
325 if (localHostName) {
326 if (isalpha(localHostName[0]) || strchr(localHostName,':') != 0) { /* look for dns name or ipv6 */
327 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local hostname DNS lookup of hostname '%s'", localHostName);
328 *errP = 0;
329 localHostP = gethostbyname_re(localHostName, &localHostbuf, &tmpLocalHostbuf, &localHostbuflen, errP);
330 if (*errP != 0) {
331 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
332 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
333 "[%.100s:%d] Lookup of local IP failed, %s",
334 __FILE__, __LINE__, errP);
335 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
336 *errP = 0;
337 }
338 if (localHostP != 0) {
339 localAddr.sin_addr.s_addr = ((struct in_addr *)(localHostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */
340 free(tmpLocalHostbuf);
341 }
342 }
343 else {
344 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local IP4 usage (without DNS lookup) of IP '%s'", localHostName);
345 /* use ip4 addr directly to avoid dns lookup */
346 localAddr.sin_addr.s_addr = inet_addr(localHostName);
347 }
348 }
349 if (localPort > 0) {
350 localAddr.sin_port = htons((unsigned short)localPort);
351 }
352 if (bind(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
353 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
354 "Failed binding local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
355 localHostName, localPort);
356 }
357 else {
358 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
359 "Bound local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
360 localHostName, localPort);
361 }
362 }
363
364 /* int retval = fcntl(xb->socketToXmlBlaster, F_SETFL, O_NONBLOCK); */ /* Switch on none blocking mode: we then should use select() to be notified when the kernel succeeded with connect() */
365 #ifdef __IPhoneOS__
366 globalIPhoneXb = xb;
367 {
368 CFStringRef hostnameRef =CFStringCreateWithCString (kCFAllocatorDefault, serverHostName, kCFStringEncodingUTF8);
369 CFHostRef hostRef = CFHostCreateWithName (kCFAllocatorDefault, hostnameRef);
370 CFStreamCreatePairWithSocketToCFHost (kCFAllocatorDefault,hostRef, atoi(servTcpPort), &xb->readStream, &xb->writeStream);
371 }
372 if(xb->readStream != nil && xb->writeStream != nil)
373 {
374 ret = 0;
375 if(!CFWriteStreamOpen (xb->writeStream)) /* true if stream was successfully opened */
376 ret = -1;
377 if(!CFReadStreamOpen (xb->readStream))
378 ret = -1;
379 if (ret != -1) {
380 if (!isIPhoneSocketConnectionEstablished(xb))
381 ret = -1;
382 }
383 }
384 else
385 {
386 ret = -1;
387 }
388 #else
389 {
390 /*
391 xmlBlasterProps:
392 dispatch/connection/useSelect=1
393 dispatch/connection/plugin/socket/connectTimeout=3
394 Johannes Ahlert:
395 beides ist so laufzeitabh�ngig, find ich noch besser
396 default f�r useSelect ist 0
397 */
398 int useSelect = 0;
399 useSelect = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/useSelect", useSelect);
400
401 if ( useSelect )
402 {
403 #if defined(_WINDOWS)
404 /* Die Variante mit select erfordert, dass der Socket nicht-blockierend
405 gemacht wird. Dies kann (und wird in diesem Beispiel) nach dem Verbinden
406 wieder rueckgaengig gemacht, sodass man wie gewohnt mit dem Socket arbeiten kann.
407 */
408 fd_set fds;
409 int connectTimeout = 3;
410 unsigned long opt = 1;
411 struct timeval timeout;
412 int conret, wsaret, rets;
413
414 ioctlsocket( xb->socketToXmlBlaster, FIONBIO, &opt );
415
416 /*
417 Den Verbindungsaufbau anstossen
418 returns 0 or SOCKET_ERROR only
419 */
420 if ( (conret = connect(xb->socketToXmlBlaster, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) == SOCKET_ERROR )
421 {
422 /*
423 Das schlaegt normalerweise fehl, wobei der Fehler WSAEWOULDBLOCK
424 darauf hinweist, dass der Verbindungsaufbau durchaus noch Erfolgreich
425 sein kann, und der Aufruf nur fehlgeschlagen ist, weil er andernfalls
426 blockieren wuerde, was ja absichtlich deaktiviert wurde.
427 */
428 if ( (wsaret = WSAGetLastError()) != WSAEWOULDBLOCK ) {
429 return false; /* kein logging und socket wird nicht wieder blockierend? TODO ?? */
430 }
431
432 /* Deskriptor-Set zuruecksetzen und mit dem zu verbindenden Socket belegen */
433 FD_ZERO( &fds );
434 FD_SET( xb->socketToXmlBlaster, &fds );
435
436 /* Den gewaehlte timeout-Wert einsetzen */
437 connectTimeout = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/connectTimeout", connectTimeout);
438 timeout.tv_sec = connectTimeout;
439 timeout.tv_usec = 0;
440
441 /* Nun select aufrufen; dieses kehrt entweder nach Ablauf des Timeouts
442 zurueck, oder wenn der Socket zum Schreiben bereit ist, was genau dann
443 passiert, wenn er erfolgreich verbunden wurde.
444
445 The select function returns the total number of socket handles that are ready and contained in the fd_set structures,
446 zero if the time limit expired, or SOCKET_ERROR if an error occurred.
447 If the return value is SOCKET_ERROR, WSAGetLastError can be used to retrieve a specific error code.
448 */
449 rets = select( xb->socketToXmlBlaster + 1, 0, &fds, 0, &timeout );
450 if ( rets == SOCKET_ERROR )
451 ret = -1;
452
453 if (rets == 0)
454 ret = -1; /* timeout */
455
456 /* Falls select zurueckgekehrt ist, aber der zu verbindende Socket nicht
457 im Deskriptor-Set vorhanden ist, war das Verbinden in der gegebenen
458 Zeit nicht erfolgreich.
459 */
460 if ( FD_ISSET(xb->socketToXmlBlaster, &fds) == 0 )
461 ret = -1; /* timeout anderer test */
462 }
463
464 /*
465 Der Socket kann nun wieder blockierend gemacht werden
466 */
467 opt = 0;
468 ioctlsocket( xb->socketToXmlBlaster, FIONBIO, &opt );
469 #else
470 printf("dispatch/connection/plugin/socket/useSelect is not supported on Linux, exit! (For Linux porting you could use fcntl)");
471 exit(1);
472 #endif
473 }
474 else {
475 ret=connect(xb->socketToXmlBlaster, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr));
476 }
477 }
478 #endif
479 if (ret == -1) {
480 char errnoStr[MAX_ERRNO_LEN];
481 xb_strerror(errnoStr, MAX_ERRNO_LEN, errno);
482 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
483 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
484 "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed, ret=%d, %s",
485 __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr);
486 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
487 return false;
488 }
489
490 if (xb->logLevel>=XMLBLASTER_LOG_INFO) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Connected to xmlBlaster");
491 xb->useUdpForOneway = xb->props->getBool(xb->props, "plugin/socket/useUdpForOneway", xb->useUdpForOneway);
492 xb->useUdpForOneway = xb->props->getBool(xb->props, "dispatch/connection/plugin/socket/useUdpForOneway", xb->useUdpForOneway);
493
494 if (xb->useUdpForOneway) {
495 struct sockaddr_in localAddr;
496 socklen_t size = (socklen_t)sizeof(localAddr);
497 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
498 "Using UDP connection for oneway calls, see -dispatch/connection/plugin/socket/useUdpForOneway true");
499
500 xb->socketToXmlBlasterUdp = (int)socket(AF_INET, SOCK_DGRAM, 0);
501
502 if (xb->socketToXmlBlasterUdp != -1) {
503 if (getsockname(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, &size) == -1) {
504 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
505 "Can't determine the local socket host and port (in UDP), errno=%d", errno);
506 return false;
507 }
508
509 if (bind(xb->socketToXmlBlasterUdp, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
510 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
511 "Failed binding local port (in UDP) -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
512 localHostName, localPort);
513 return false;
514 }
515 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
516 "Bound local UDP port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
517 localHostName, localPort);
518
519 if ((ret=connect(xb->socketToXmlBlasterUdp, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) == -1) {
520 char errnoStr[MAX_ERRNO_LEN];
521 xb_strerror(errnoStr, MAX_ERRNO_LEN, errno);
522 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
523 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
524 "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP), ret=%d, %s",
525 __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr);
526 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
527 return false;
528 }
529 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connected to xmlBlaster with UDP");
530 } /* if (xb->socketToXmlBlasterUdp != -1) */
531 else {
532 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
533 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
534 "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP) errno=%d",
535 __FILE__, __LINE__, serverHostName, servTcpPort, errno);
536 return false;
537 }
538 } /* if (xb->useUdpForOneway) */
539
540 }
541 else {
542 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
543 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
544 "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed errno=%d",
545 __FILE__, __LINE__, serverHostName, servTcpPort, errno);
546 return false;
547 }
548 xb->isInitialized = true;
549 return true;
550 }
551
552
553 /**
554 * Set the queue properties.
555 * Example:
556 * <pre>
557 QueueProperties queueProperties;
558 strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
559 strncpy0(queueProperties.nodeId, "clientJoe1081594557415", QUEUE_ID_MAX);
560 strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
561 strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
562 queueProperties.maxNumOfEntries = 10000000L;
563 queueProperties.maxNumOfBytes = 1000000000LL;
564 * <pre>
565 * @param queueProperties The queue configuration,
566 * if 0 or parts of it are empty it will be initialized by environment settings
567 * @return true on success
568 * @throws exception if already initialized or if initialization fails
569 */
570 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception)
571 {
572 #ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST
573 if (checkArgs(xb, "initQueue", false, exception) == false ) return false;
574 if (xb->queueP) {
575 char message[XMLBLASTEREXCEPTION_MESSAGE_LEN];
576 SNPRINTF(message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
577 "[%.100s:%d] The queue is initialized already, call to initQueue() is ignored", __FILE__, __LINE__);
578 embedException(exception, "user.illegalArgument", message, exception);
579 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
580 return false;
581 }
582
583 {
584 QueueProperties tmp;
585 memset(&tmp, 0, sizeof(QueueProperties));
586
587 if (queueProperties == 0)
588 queueProperties = &tmp;
589
590 if (*queueProperties->dbName == 0) {
591 strncpy0(queueProperties->dbName, xb->props->getString(xb->props, "queue/connection/dbName", "xmlBlasterClient.db"), QUEUE_DBNAME_MAX);
592 }
593 if (*queueProperties->nodeId == 0) {
594 strncpy0(queueProperties->nodeId, xb->props->getString(xb->props, "queue/connection/nodeId", "client"), QUEUE_ID_MAX);
595 }
596 if (*queueProperties->queueName == 0) {
597 strncpy0(queueProperties->queueName, xb->props->getString(xb->props, "queue/connection/queueName", "connection_client"), QUEUE_ID_MAX);
598 }
599 if (*queueProperties->tablePrefix == 0) {
600 strncpy0(queueProperties->tablePrefix, xb->props->getString(xb->props, "queue/connection/tablePrefix", "XB_"), QUEUE_PREFIX_MAX);
601 }
602 if (queueProperties->maxNumOfEntries == 0) {
603 queueProperties->maxNumOfEntries = xb->props->getInt(xb->props, "queue/connection/maxEntries", 10000000);
604 }
605 if (queueProperties->maxNumOfBytes == 0) {
606 queueProperties->maxNumOfBytes = xb->props->getInt64(xb->props, "queue/connection/maxBytes", 10000000LL);
607 }
608 if (queueProperties->logFp == 0) queueProperties->logFp = xb->log;
609 if (queueProperties->logLevel == 0) queueProperties->logLevel = xb->logLevel;
610 if (queueProperties->userObject == 0) queueProperties->userObject = xb->userObject;
611
612 xb->queueP = createQueue(queueProperties, exception);
613 if (*exception->errorCode != 0) {
614 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Queue initializeation failed: [%s] %s\n", exception->errorCode, exception->message);
615 return false;
616 }
617 xb->queueP->userObject = xb;
618 }
619 return true;
620 #else
621 if (queueProperties) {} /* To suppress compiler warning that not used */
622 strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
623 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
624 "[%.100s:%d] Queue support is not compiled into the library, please recompile with '-DXMLBLASTER_PERSISTENT_QUEUE=1 and -DXMLBLASTER_PERSISTENT_QUEUE_TEST=1", __FILE__, __LINE__);
625 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
626 return false;
627 #endif /* XMLBLASTER_PERSISTENT_QUEUE_TEST */
628 }
629
630 static bool isConnected(XmlBlasterConnectionUnparsed *xb)
631 {
632 return (xb->socketToXmlBlaster > -1) ? true : false;
633 }
634
635 const char *xmlBlasterConnectionUnparsedUsage()
636 {
637 /* To prevent compiler warning */
638 /* "string length `596' is greater than the length `509' ISO C89 compilers are required to support" */
639 /* we have a static variable */
640 enum { SIZE=2048 };
641 static char usage[SIZE];
642 strncpy0(usage,
643 "\n -dispatch/connection/plugin/socket/hostname [localhost]"
644 "\n Where to find xmlBlaster."
645 "\n -dispatch/connection/plugin/socket/port [7607]"
646 "\n The port where xmlBlaster listens."
647 "\n -dispatch/connection/plugin/socket/localHostname [NULL]", SIZE/2);
648 strncat0(usage,
649 "\n Force the local IP, useful on multi homed computers."
650 "\n -dispatch/connection/plugin/socket/localPort [0]"
651 "\n Force the local port, useful to tunnel firewalls."
652 "\n -dispatch/connection/plugin/socket/compress/type []"
653 #if XMLBLASTER_ZLIB==1
654 "\n Switch on compression with 'zlib:stream'."
655 #else
656 "\n No compression support. Try recompiling with with '-DXMLBLASTER_ZLIB=1'."
657 #endif
658 "\n -dispatch/connection/plugin/socket/useUdpForOneway [false]"
659 "\n Use UDP for publishOneway() calls.", SIZE/2);
660 return usage;
661 }
662
663 /**
664 * Used internally only, does no disconnect, only cleanup of socket
665 */
666 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb)
667 {
668 if (xb != 0 && xb->isConnected(xb)) {
669 # if defined(_WINDOWS)
670 int how = SD_BOTH; /* SD_BOTH requires Winsock2.h */
671 # elif defined(__IPhoneOS__)
672 # else
673 int how = SHUT_RDWR; /* enum SHUT_RDWR = 2 */
674 # endif
675
676 #ifdef __IPhoneOS__
677 {
678 CFReadStreamRef readStream = xb->readStream;
679 if (readStream != nil) {
680 xb->readStream = nil;
681 CFReadStreamClose(readStream);
682 CFRelease(readStream);
683 }
684 }
685 {
686 CFWriteStreamRef writeStream = xb->writeStream;
687 if (writeStream != nil) {
688 xb->writeStream = nil;
689 CFWriteStreamClose(writeStream);
690 CFRelease(writeStream);
691 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
692 "shutdown() CFStreams were cosed=%d", writeStream);
693 }
694 }
695 #else
696 if (xb->socketToXmlBlaster != -1 && xb->socketToXmlBlaster != 0) {
697 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
698 "shutdown() socketToXmlBlaster=%d socketToXmlBlasterUdp=%d", xb->socketToXmlBlaster, xb->socketToXmlBlasterUdp);
699 shutdown(xb->socketToXmlBlaster, how);
700 closeSocket(xb->socketToXmlBlaster);
701 xb->socketToXmlBlaster = -1;
702 }
703 if (xb->socketToXmlBlasterUdp != -1) {
704 shutdown(xb->socketToXmlBlasterUdp, how);
705 closeSocket(xb->socketToXmlBlasterUdp);
706 xb->socketToXmlBlasterUdp = -1;
707 }
708 #endif
709 }
710
711
712 }
713
714 /**
715 * Send a message over the socket to xmlBlaster.
716 * @param xb The this pointer
717 * @param methodName The name of the remote method to invoke e.g. "connect"
718 * @param msgType The type of message: INVOKE, RESPONSE, EXCEPTION
719 * @param data The message payload to send, we take a clone so you can do with it what you want
720 * @param dataLen The length of data in bytes
721 * @param responseSocketDataHolder The returned data, you need to free it with free(response->data) if we returned true.
722 * Supply NULL for oneway messages.
723 * @param exception The exception struct, exception->errorCode is filled on exception.
724 * You need to supply it.
725 * @param udp Whether to use UDP or TCP. Supply true for UDP.
726 * @return true if OK and response is filled (if not oneway or exception or response itself)<br />
727 false on error and exception is filled
728 */
729 static bool sendData(XmlBlasterConnectionUnparsed *xb,
730 const char * const methodName,
731 enum XMLBLASTER_MSG_TYPE_ENUM msgType,
732 const char *data_,
733 size_t dataLen_,
734 SocketDataHolder *responseSocketDataHolder,
735 XmlBlasterException *exception,
736 bool udp)
737 {
738 ssize_t numSent;
739 size_t rawMsgLen = 0;
740 char *rawMsg = (char *)0;
741 char *rawMsgStr;
742 MsgRequestInfo *requestInfoP;
743 MsgRequestInfo requestInfo;
744 memset(&requestInfo, 0, sizeof(MsgRequestInfo));
745 requestInfo.responseMutexIsValid = false; /* Remember when the client thread has created the mutex */
746 if (data_ == 0) {
747 data_ = "";
748 dataLen_ = 0;
749 }
750
751 if (exception == 0) {
752 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception to sendData()", __FILE__, __LINE__);
753 return false;
754 }
755 initializeXmlBlasterException(exception);
756
757 if (responseSocketDataHolder)
758 memset(responseSocketDataHolder, 0, sizeof(SocketDataHolder));
759
760 if (!xb->isConnected(xb)) {
761 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
762 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] No connection to xmlBlaster", __FILE__, __LINE__);
763 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
764 return false;
765 }
766
767 if (strcmp(XMLBLASTER_CONNECT, methodName) && strlen(xb->secretSessionId) < 1) {
768 strncpy0(exception->errorCode, "user.notConnected", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
769 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please call connect() before invoking '%s'", __FILE__, __LINE__, methodName);
770 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
771 return false;
772 }
773
774 getTimestampStr(requestInfo.requestIdStr, MAX_REQUESTID_LEN);
775
776 requestInfo.methodName = methodName;
777 if (xb->preSendEvent != 0) {
778 /* A callback function pointer is registered to be notified just before sending */
779 XmlBlasterBlob blob;
780 blobcpyAlloc(&blob, data_, dataLen_); /* Take a clone, the preSendEvent() function may manipulate it */
781 requestInfo.blob.dataLen = blob.dataLen;
782 requestInfo.blob.data = blob.data;
783 requestInfo.xa = xb->preSendEvent_userP;
784 requestInfoP = xb->preSendEvent(&requestInfo, exception);
785 if (*exception->message != 0) {
786 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
787 "Re-throw exception from preSendEvent errorCode=%s message=%s", exception->errorCode, exception->message);
788 return false;
789 }
790 if (requestInfoP == 0) {
791 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
792 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR: returning requestInfo 0 without exception is not supported, please correct your preSendEvent() function.", __FILE__, __LINE__);
793 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
794 return false;
795 }
796 if (blob.data != requestInfoP->blob.data) {
797 /* The callback function has changed/manipulated the user data */
798 freeBlobHolderContent(&blob);
799 }
800 rawMsg = encodeSocketMessage(msgType, requestInfo.requestIdStr, requestInfo.methodName, xb->secretSessionId,
801 requestInfoP->blob.data, requestInfoP->blob.dataLen, xb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen);
802 freeBlobHolderContent(&requestInfoP->blob);
803 }
804 else {
805 rawMsg = encodeSocketMessage(msgType, requestInfo.requestIdStr, requestInfo.methodName, xb->secretSessionId,
806 data_, dataLen_, xb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen);
807 }
808
809 /* AWARE:
810 * From now on requestInfoP is used by callback thread
811 * (was added by successful xb->preSendEvent() above)
812 * If we leave sendData() this is destroyed (requestInfoP is on stack!)
813 */
814
815 /* send the header ... */
816 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Lowlevel writing data to socket ...");
817 numSent = xb->writeToSocket.writeToSocketFuncP(xb->writeToSocket.userP, udp ? xb->socketToXmlBlasterUdp : xb->socketToXmlBlaster, rawMsg, (int)rawMsgLen);
818 if (numSent == -1) {
819 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
820 "Lost connection to xmlBlaster server");
821 xmlBlasterConnectionShutdown(xb);
822 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
823 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Lost connection to xmlBlaster server", __FILE__, __LINE__);
824 free(rawMsg);
825 if (xb->postSendEvent != 0) {
826 requestInfo.rollback = true;
827 requestInfoP = xb->postSendEvent(&requestInfo, exception);
828 }
829 return false;
830 }
831
832 if (numSent != (int)rawMsgLen) {
833 if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
834 "Sent only %d bytes from %u", numSent, rawMsgLen);
835 strncpy0(exception->errorCode, "user.connect", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
836 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR Sent only %ld bytes from %lu", __FILE__, __LINE__, (long)numSent, (unsigned long)rawMsgLen);
837 free(rawMsg);
838 if (xb->postSendEvent != 0) {
839 requestInfo.rollback = true;
840 requestInfoP = xb->postSendEvent(&requestInfo, exception);
841 }
842 return false;
843 }
844 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Lowlevel writing data to socket done.");
845
846 free(rawMsg);
847 rawMsg = 0;
848
849 if (xbl_isOneway(msgType, methodName))
850 return true; /* Responses and exceptions are oneway */
851
852 if (responseSocketDataHolder) { /* if not oneway read the response message */
853
854 if (xb->postSendEvent != 0) {
855 /* A callback function pointer is registered to be notified just after sending */
856 requestInfo.responseType = 0;
857 requestInfo.blob.dataLen = 0;
858 requestInfo.blob.data = 0;
859
860 /* !!! Here the thread blocks until a response from CallbackServer arrives !!! */
861 requestInfoP = xb->postSendEvent(&requestInfo, exception);
862 if (*exception->message != 0) {
863 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
864 "Re-throw exception from preSendEvent errorCode=%s message=%s", exception->errorCode, exception->message);
865 return false;
866 }
867 if (requestInfoP == 0) {
868 printf("[XmlBlasterConnectionUnparsed] TODO: returning requestInfo 0 is not implemented");
869 }
870 /* TODO: Possible race condition */
871 responseSocketDataHolder->type = requestInfoP->responseType;
872 responseSocketDataHolder->version = XMLBLASTER_SOCKET_VERSION;
873 strncpy0(responseSocketDataHolder->requestId, requestInfo.requestIdStr, MAX_REQUESTID_LEN);
874 strncpy0(responseSocketDataHolder->methodName, methodName, MAX_METHODNAME_LEN);
875
876 if (requestInfoP->responseType == MSG_TYPE_EXCEPTION) { /* convert XmlBlasterException thrown from remote */
877 convertToXmlBlasterException(&requestInfoP->blob, exception, xb->logLevel >= XMLBLASTER_LOG_DUMP);
878 freeBlobHolderContent(&requestInfoP->blob);
879 return false;
880 }
881 else {
882 responseSocketDataHolder->blob.dataLen = requestInfoP->blob.dataLen;
883 responseSocketDataHolder->blob.data = requestInfoP->blob.data; /* The responseSocketDataHolder is now responsible to free(responseSocketDataHolder->blob.data) */
884 }
885 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
886 "requestId '%s' returns dataLen=%d", requestInfo.requestIdStr, requestInfoP->blob.dataLen);
887 }
888 else {
889 /* Wait on the response ourself */
890 if (getResponse(xb, responseSocketDataHolder, exception, udp) == false) { /* false on EOF */
891 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Lost connection to xmlBlaster server");
892 xmlBlasterConnectionShutdown(xb);
893 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
894 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Lost connection to xmlBlaster server", __FILE__, __LINE__);
895 return false;
896 }
897 if (responseSocketDataHolder->type == MSG_TYPE_EXCEPTION) { /* convert XmlBlasterException */
898 convertToXmlBlasterException(&responseSocketDataHolder->blob, exception, xb->logLevel >= XMLBLASTER_LOG_DUMP);
899 freeBlobHolderContent(&responseSocketDataHolder->blob);
900 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
901 "Re-throw exception from response errorCode=%s message=%s", exception->errorCode, exception->message);
902 return false;
903 }
904 }
905
906 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
907 rawMsgStr = blobDump(&responseSocketDataHolder->blob);
908 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Received response msgLen=%u type=%c version=%c requestId=%s methodName=%s dateLen=%u data='%.100s ...'",
909 responseSocketDataHolder->msgLen, responseSocketDataHolder->type, responseSocketDataHolder->version, responseSocketDataHolder->requestId,
910 responseSocketDataHolder->methodName, responseSocketDataHolder->blob.dataLen, rawMsgStr);
911 freeBlobDump(rawMsgStr);
912 }
913 }
914
915 return true;
916 }
917
918 /**
919 * Parse the returned message from xmlBlaster.
920 * This method blocks until data arrives.
921 * <br />
922 * The responseSocketDataHolder holds all informations about the returned data from xmlBlaster,
923 * on error the exception struct is filled.
924 *
925 * @param responseSocketDataHolder You need to free(responseSocketDataHolder->data) if return is 'true'.
926 * @param exception Contains the exception thrown (on error only *exception->errorCode!=0)
927 * @return true if OK or on exception, false on EOF
928 */
929 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp)
930 {
931 bool stopListenLoop = false;
932 return parseSocketData(xb->socketToXmlBlaster, &xb->readFromSocket, responseSocketDataHolder, exception, &stopListenLoop, udp, xb->logLevel >= XMLBLASTER_LOG_DUMP);
933 }
934
935 /**
936 * Connect to the server.
937 * @param qos The QoS to connect
938 * @param The exception struct, exception->errorCode is filled on exception
939 * @return The raw ConnectReturnQos XML string returned from xmlBlaster,
940 * only NULL if an exception is thrown.
941 * You need to free() it
942 * @return The ConnectReturnQos raw xml string, you need to free() it
943 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.connect.html
944 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
945 */
946 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
947 {
948 SocketDataHolder responseSocketDataHolder;
949 char *response;
950 char *qos2;
951 char timestampStr[256];
952
953 if (qos == 0) {
954 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
955 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterConnect()", __FILE__, __LINE__);
956 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
957 return (char *)0;
958 }
959
960 if (initConnection(xb, exception) == false) {
961 return (char *)0;
962 }
963
964 /** Append current client UTC timestamp */
965 qos2 = strcpyAlloc(qos);
966 trimEnd(qos2);
967 if (endsWith(qos2, "</qos>")) {
968 getCurrentLocalIsoTimestampStr(timestampStr, 200);
969 qos2[strlen(qos2) - 6] = 0;
970 strcatAlloc(&qos2, "<clientProperty name='__UTC'>");
971 strcatAlloc(&qos2, timestampStr);
972 strcatAlloc(&qos2, "</clientProperty></qos>");
973 }
974
975 if (sendData(xb, XMLBLASTER_CONNECT, MSG_TYPE_INVOKE, (const char *)qos2,
976 (qos2 == (const char *)0) ? 0 : strlen(qos2),
977 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
978 free(qos2);
979 return (char *)0;
980 }
981 free(qos2);
982
983 response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
984 freeBlobHolderContent(&responseSocketDataHolder.blob);
985
986 /* Extract secret session ID from ConnectReturnQos */
987 *xb->secretSessionId = 0;
988 {
989 const char *pEnd = (const char *)0;
990 const char *pStart = strstr(response, "sessionId='");
991 if (pStart) {
992 pStart += strlen("sessionId='");
993 pEnd = strstr(pStart, "'");
994 if (pEnd) {
995 int len = (int)(pEnd - pStart + 1);
996 if (len >= MAX_SECRETSESSIONID_LEN) {
997 strncpy0(exception->errorCode, "user.response", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
998 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR Received too long secret sessionId with len=%d, please change setting MAX_SECRETSESSIONID_LEN", __FILE__, __LINE__, len);
999 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1000 }
1001 strncpy0(xb->secretSessionId, pStart, len);
1002 }
1003 }
1004 }
1005
1006 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1007 "Got response for connect(secretSessionId=%s)", xb->secretSessionId);
1008
1009 return response;
1010 }
1011
1012 /**
1013 * Disconnect from server.
1014 * @param qos The QoS to disconnect
1015 * @param The exception struct, exception->errorCode is filled on exception
1016 * @return false on exception
1017 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.disconnect.html
1018 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1019 */
1020 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
1021 {
1022 SocketDataHolder responseSocketDataHolder;
1023
1024 if (checkArgs(xb, XMLBLASTER_DISCONNECT, true, exception) == false ) return 0;
1025
1026 if (sendData(xb, XMLBLASTER_DISCONNECT, MSG_TYPE_INVOKE, (const char *)qos,
1027 (qos == (const char *)0) ? 0 : strlen(qos),
1028 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1029 return false;
1030 }
1031
1032 freeBlobHolderContent(&responseSocketDataHolder.blob);
1033
1034 xmlBlasterConnectionShutdown(xb);
1035 *xb->secretSessionId = 0;
1036 return true;
1037 }
1038
1039
1040 #ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST
1041 /**
1042 * Extracts the priority from the given QoS.
1043 * @return NORM=5 on error
1044 */
1045 static int parsePriority(const char *qos) {
1046 char *pPrio, *pPrioEnd;
1047 /*const int PRIORITY_MAXLEN = 10;*/
1048 #define PRIORITY_MAXLEN 10 /* To be backward compatible to C90 */
1049 char prioStr[PRIORITY_MAXLEN];
1050 int len = 1;
1051 int prio = 5;
1052 const int lenPrio=strlen("<priority>");
1053
1054 if (qos == 0) return prio;
1055
1056 pPrio = strstr(qos, "<priority>");
1057 if (pPrio == 0) return prio;
1058
1059 pPrioEnd = strstr(qos, "</priority>");
1060 if (pPrioEnd == 0) return prio;
1061
1062 len = pPrioEnd-pPrio-lenPrio;
1063 if (len >= PRIORITY_MAXLEN) {
1064 return prio;
1065 }
1066 strncpy(prioStr, pPrio+lenPrio, len);
1067 *(prioStr+len) = 0;
1068 sscanf(prioStr, "%d", &prio); /* on error prio remains 5, white spaces are stripped by sscanf */
1069 return prio;
1070 }
1071
1072 /**
1073 * Puts an entry into the client side queue.
1074 * @param exception Can be prefilled with an original exception which will be embedded
1075 * @return 0 on failure, else an allocated "<qos><state id='OK' info='QUEUED'/></qos>" which the caller needs to free()
1076 */
1077 static char *xmlBlasterQueuePut(XmlBlasterConnectionUnparsed *xb, int priority, BlobHolder *blob, XmlBlasterException *exception)
1078 {
1079 QueueEntry queueEntry;
1080 XmlBlasterException queueException;
1081
1082 QueueProperties *queuePropertiesP = 0; /* 0: read configuration from environment */
1083 /*
1084 QueueProperties queueProperties;
1085 memset(&queueProperties, 0, sizeof(QueueProperties));
1086 queuePropertiesP = &queueProperties;
1087 strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
1088 strncpy0(queueProperties.nodeId, "clientJoe1081594557415", QUEUE_ID_MAX);
1089 strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
1090 strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
1091 queueProperties.maxNumOfEntries = 10000000L;
1092 queueProperties.maxNumOfBytes = 1000000000LL;
1093 queueProperties.logFp = xb->log;
1094 queueProperties.logLevel = xb->logLevel;
1095 queueProperties.userObject = xb->userObject;
1096 queueP = createQueue(&queueProperties, &queueException);
1097 */
1098
1099 if (xb->queueP == 0) {
1100 if (xb->initQueue(xb, queuePropertiesP, exception) == false)
1101 return 0;
1102 }
1103
1104 queueEntry.priority = priority;
1105 queueEntry.isPersistent = true;
1106 queueEntry.uniqueId = getTimestamp();
1107 strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
1108 queueEntry.embeddedBlob.data = blob->data;
1109 queueEntry.embeddedBlob.dataLen = blob->dataLen;
1110
1111 xb->queueP->put(xb->queueP, &queueEntry, &queueException);
1112 if (*queueException.errorCode != 0) {
1113 embedException(exception, queueException.errorCode, queueException.message, exception);
1114 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Put to queue failed: [%s] %s\n", exception->errorCode, exception->message);
1115 return 0;
1116 }
1117 *exception->errorCode = 0; /* Successfully queued: no error */
1118 return strcpyAlloc("<qos><state id='OK' info='QUEUED'/></qos>");
1119 }
1120 #endif /*XMLBLASTER_PERSISTENT_QUEUE_TEST==1*/
1121
1122 /**
1123 * Publish a message to the server.
1124 * @return The raw XML string returned from xmlBlaster, only NULL if an exception is thrown
1125 * You need to free() it
1126 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
1127 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1128 */
1129 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception)
1130 {
1131 SocketDataHolder responseSocketDataHolder;
1132 char *response = 0;
1133
1134 BlobHolder blob = encodeMsgUnit(msgUnit, xb->logLevel >= XMLBLASTER_LOG_DUMP);
1135 msgUnit->responseQos = 0; /* In case no initial memset(&msgUnit, 0, sizeof(MsgUnit)); was made */
1136
1137 if (checkArgs(xb, "publish", true, exception) == false ) return 0;
1138
1139 msgUnit->responseQos = 0; /* Initialize properly */
1140
1141 if (sendData(xb, XMLBLASTER_PUBLISH, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
1142 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1143
1144 # ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST /* TEST CODE */
1145 if (strstr(exception->errorCode, "user.notConnected") != 0 ||
1146 strstr(exception->errorCode, "communication.noConnection") != 0) { /* On communication problem queue messages */
1147 int priority = parsePriority(msgUnit->qos);
1148 response = xmlBlasterQueuePut(xb, priority, &blob, exception);
1149 /* NO: msgUnit->responseQos = response; otherwise a free(msgUnit) will free the response as well */
1150 }
1151 # endif
1152
1153 free(blob.data);
1154 return response;
1155 }
1156 free(blob.data);
1157
1158 response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
1159 freeBlobHolderContent(&responseSocketDataHolder.blob);
1160
1161 return response;
1162 }
1163
1164 /**
1165 * Publish a message array in a bulk to the server.
1166 * @return The raw XML string array returned from xmlBlaster, only NULL if an exception is thrown
1167 * You need to free() it
1168 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
1169 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1170 */
1171 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
1172 {
1173 size_t i;
1174 SocketDataHolder responseSocketDataHolder;
1175 QosArr *response = 0;
1176
1177 BlobHolder blob = encodeMsgUnitArr(msgUnitArr, xb->logLevel >= XMLBLASTER_LOG_DUMP);
1178
1179 if (checkArgs(xb, "publishArr", true, exception) == false ) return 0;
1180
1181 for (i=0; i<msgUnitArr->len; i++)
1182 msgUnitArr->msgUnitArr[i].responseQos = 0; /* Initialize properly */
1183
1184 if (sendData(xb, XMLBLASTER_PUBLISH, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
1185 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1186 free(blob.data);
1187 return 0;
1188 }
1189 free(blob.data);
1190
1191 response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
1192 freeBlobHolderContent(&responseSocketDataHolder.blob);
1193
1194 return response;
1195 }
1196
1197 /**
1198 * Publish oneway a message array in a bulk to the server without receiving an ACK.
1199 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
1200 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1201 */
1202 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
1203 {
1204 size_t i;
1205 SocketDataHolder responseSocketDataHolder;
1206
1207 BlobHolder blob = encodeMsgUnitArr(msgUnitArr, xb->logLevel >= XMLBLASTER_LOG_DUMP);
1208
1209 if (checkArgs(xb, "publishOneway", true, exception) == false ) return;
1210
1211 for (i=0; i<msgUnitArr->len; i++) {
1212 msgUnitArr->msgUnitArr[i].responseQos = 0; /* Initialize properly */
1213 }
1214
1215 /*
1216 if (!xb->useUdpForOneway) {
1217 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1218 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] UDP not enabled, use -dispatch/connection/plugin/socket/enableUDP true", __FILE__, __LINE__);
1219 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1220 free(blob.data);
1221 return;
1222 }
1223 */
1224
1225 if (sendData(xb, XMLBLASTER_PUBLISH_ONEWAY, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
1226 &responseSocketDataHolder, exception, xb->useUdpForOneway) == false) {
1227 free(blob.data);
1228 return;
1229 }
1230 free(blob.data);
1231 freeBlobHolderContent(&responseSocketDataHolder.blob); /* Could be ommitted for oneway */
1232 }
1233
1234 /**
1235 * Subscribe a message.
1236 * @return The raw XML string returned from xmlBlaster, only NULL if an exception is thrown
1237 * You need to free() it
1238 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.subscribe.html
1239 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1240 */
1241 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
1242 {
1243 size_t qosLen, keyLen, totalLen;
1244 char *data;
1245 size_t currpos = 0;
1246 SocketDataHolder responseSocketDataHolder;
1247 char *response;
1248
1249 if (checkArgs(xb, "subscribe", true, exception) == false ) return 0;
1250
1251 if (key == 0) {
1252 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1253 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterSubscribe()", __FILE__, __LINE__);
1254 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1255 return (char *)0;
1256 }
1257
1258 if (qos == (const char *)0) {
1259 qos = "";
1260 }
1261 qosLen = strlen(qos);
1262 keyLen = strlen(key);
1263
1264 totalLen = qosLen + 1 + keyLen + 1;
1265
1266 data = (char *)malloc(totalLen);
1267
1268 memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
1269 currpos += qosLen+1;
1270
1271 memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
1272 currpos += keyLen+1;
1273
1274 if (sendData(xb, XMLBLASTER_SUBSCRIBE, MSG_TYPE_INVOKE, data, totalLen,
1275 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1276 free(data);
1277 return (char *)0;
1278 }
1279 free(data);
1280
1281 response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
1282 freeBlobHolderContent(&responseSocketDataHolder.blob);
1283
1284 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1285 "Got response for subscribe(): %s", response);
1286
1287 return response;
1288 }
1289
1290 /**
1291 * UnSubscribe a message from the server.
1292 * @return The raw QoS XML strings returned from xmlBlaster, only NULL if an exception is thrown
1293 * You need to free it with freeQosArr() after usage
1294 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.unSubscribe.html
1295 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1296 */
1297 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
1298 {
1299 size_t qosLen, keyLen, totalLen;
1300 char *data;
1301 size_t currpos = 0;
1302 SocketDataHolder responseSocketDataHolder;
1303 QosArr *response;
1304
1305 if (checkArgs(xb, "unSubscribe", true, exception) == false ) return 0;
1306
1307 if (key == 0) {
1308 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1309 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterUnSubscribe()", __FILE__, __LINE__);
1310 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1311 return (QosArr *)0;
1312 }
1313
1314 if (qos == (const char *)0) {
1315 qos = "";
1316 }
1317 qosLen = strlen(qos);
1318 keyLen = strlen(key);
1319
1320 totalLen = qosLen + 1 + keyLen + 1;
1321
1322 data = (char *)malloc(totalLen);
1323
1324 memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
1325 currpos += qosLen+1;
1326
1327 memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
1328 currpos += keyLen+1;
1329
1330 if (sendData(xb, XMLBLASTER_UNSUBSCRIBE, MSG_TYPE_INVOKE, data, totalLen,
1331 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1332 free(data);
1333 return (QosArr *)0;
1334 }
1335 free(data);
1336
1337 response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
1338 freeBlobHolderContent(&responseSocketDataHolder.blob);
1339
1340 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
1341 size_t ii;
1342 for (ii=0; ii<response->len; ii++) {
1343 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1344 "Got response for unSubscribe(): %s", response->qosArr[ii]);
1345 }
1346 }
1347
1348 return response;
1349 }
1350
1351 /**
1352 * Erase a message from the server.
1353 * @return A struct holding the raw QoS XML strings returned from xmlBlaster,
1354 * only NULL if an exception is thrown.
1355 * You need to freeQosArr() it
1356 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.erase.html
1357 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1358 */
1359 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
1360 {
1361 size_t qosLen, keyLen, totalLen;
1362 char *data;
1363 size_t currpos = 0;
1364 SocketDataHolder responseSocketDataHolder;
1365 QosArr *response;
1366
1367 if (checkArgs(xb, "erase", true, exception) == false ) return 0;
1368
1369 if (key == 0) {
1370 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1371 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterErase()", __FILE__, __LINE__);
1372 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1373 return (QosArr *)0;
1374 }
1375
1376 if (qos == (const char *)0) {
1377 qos = "";
1378 }
1379 qosLen = strlen(qos);
1380 keyLen = strlen(key);
1381
1382 totalLen = qosLen + 1 + keyLen + 1;
1383
1384 data = (char *)malloc(totalLen);
1385
1386 memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
1387 currpos += qosLen+1;
1388
1389 memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
1390 currpos += keyLen+1;
1391
1392 if (sendData(xb, XMLBLASTER_ERASE, MSG_TYPE_INVOKE, data, totalLen,
1393 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1394 free(data);
1395 return (QosArr *)0;
1396 }
1397 free(data);
1398
1399 response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
1400 freeBlobHolderContent(&responseSocketDataHolder.blob);
1401
1402 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
1403 size_t ii;
1404 for (ii=0; ii<response->len; ii++) {
1405 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1406 "Got response for erase(): %s", response->qosArr[ii]);
1407 }
1408 }
1409
1410 return response;
1411 }
1412
1413 /**
1414 * Ping the server.
1415 * @param qos The QoS or 0
1416 * @param exception *errorCode!=0 on failure
1417 * @return The ping return QoS raw xml string, you need to free() it
1418 * or 0 on failure (in which case *exception.errorCode!='\0')
1419 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1420 */
1421 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
1422 {
1423 SocketDataHolder responseSocketDataHolder;
1424 char *response;
1425
1426 if (checkArgs(xb, "ping", true, exception) == false ) return 0;
1427
1428 if (sendData(xb, XMLBLASTER_PING, MSG_TYPE_INVOKE, (const char *)qos,
1429 (qos == (const char *)0) ? 0 : strlen(qos),
1430 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1431 return (char *)0;
1432 }
1433
1434 response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
1435 freeBlobHolderContent(&responseSocketDataHolder.blob);
1436 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1437 "Got response for ping '%s'", response);
1438 return response;
1439 }
1440
1441 /**
1442 * Get a message.
1443 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.get.html
1444 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1445 * @return NULL on error, please check exception in such a case, you need to
1446 * call freeMsgUnitArr(msgUnitArr); after usage.
1447 */
1448 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
1449 {
1450 size_t qosLen, keyLen, totalLen;
1451 char *data;
1452 size_t currpos = 0;
1453 SocketDataHolder responseSocketDataHolder;
1454 MsgUnitArr *msgUnitArr = 0;
1455
1456 if (key == 0) {
1457 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1458 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterGet()", __FILE__, __LINE__);
1459 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1460 return (MsgUnitArr *)0;
1461 }
1462
1463 if (qos == (const char *)0) qos = "";
1464 qosLen = strlen(qos);
1465 keyLen = strlen(key);
1466
1467 totalLen = qosLen + 1 + keyLen + 1;
1468
1469 data = (char *)malloc(totalLen);
1470
1471 memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
1472 currpos += qosLen+1;
1473
1474 memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
1475 currpos += keyLen+1;
1476
1477 if (sendData(xb, XMLBLASTER_GET, MSG_TYPE_INVOKE, data, totalLen,
1478 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1479 free(data);
1480 return (MsgUnitArr *)0; /* exception is filled with details */
1481 }
1482 free(data);
1483
1484 /* Now process the returned messages */
1485
1486 msgUnitArr = parseMsgUnitArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
1487 freeBlobHolderContent(&responseSocketDataHolder.blob);
1488
1489 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1490 "Returned %u messages for get()", msgUnitArr->len);
1491
1492 return msgUnitArr;
1493 }
1494
1495 /**
1496 * Write uncompressed to socket (not thread safe)
1497 */
1498 static ssize_t writenPlain(void *userP, const int fd, const char *ptr, const size_t nbytes) {
1499 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
1500 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenPlain(%u)", nbytes);
1501 return writen(fd, ptr, nbytes);
1502 }
1503
1504 /**
1505 * Compress data and send to socket.
1506 */
1507 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) {
1508 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
1509 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenCompressed(%u)", nbytes);
1510 return xmlBlaster_writenCompressed(xb->zlibWriteBuf, fd, ptr, nbytes);
1511 }
1512
1513 /**
1514 * Write uncompressed to socket (not thread safe)
1515 */
1516 static ssize_t readnPlain(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1517 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
1518 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnPlain(%u)", nbytes);
1519 return readn(fd, ptr, nbytes, fpNumRead, userP2);
1520 }
1521
1522 /**
1523 * Compress data and send to socket.
1524 */
1525 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1526 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
1527 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnCompressed(%u)", nbytes);
1528 return xmlBlaster_readnCompressed(xb->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2);
1529 }
1530
1531 /**
1532 * Checks the given arguments to be valid.
1533 * @param methodName For logging
1534 * @param checkIsConnected If true does check the connection state as well
1535 * @return false if the parameters are not usable,
1536 * in this case 'exception' is filled with detail informations
1537 */
1538 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception)
1539 {
1540 if (xb == 0) {
1541 char *stack = getStackTrace(10);
1542 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s",
1543 __FILE__, __LINE__, methodName, stack);
1544 free(stack);
1545 return false;
1546 }
1547
1548 if (exception == 0) {
1549 char *stack = getStackTrace(10);
1550 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
1551 __FILE__, __LINE__, methodName, stack);
1552 free(stack);
1553 return false;
1554 }
1555
1556 if (checkIsConnected) {
1557 if (!xb->isConnected(xb)) {
1558 char *stack = getStackTrace(10);
1559 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1560 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
1561 "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
1562 __FILE__, __LINE__, methodName, stack);
1563 free(stack);
1564 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
1565 return false;
1566 }
1567 }
1568
1569 return true;
1570 }
syntax highlighted by Code2HTML, v. 0.9.1