00001 /*---------------------------------------------------------------------------- 00002 Name: XmlBlasterConnectionUnparsed.c 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Wraps raw socket connection to xmlBlaster 00006 for complete synchronous xmlBlaster access, 00007 without callbacks and not threading necessary 00008 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 00009 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html 00010 -----------------------------------------------------------------------------*/ 00011 #include <stdio.h> 00012 #include <stdlib.h> 00013 #include <string.h> 00014 #include <ctype.h> /* isalpha() */ 00015 #if defined(WINCE) 00016 # if defined(XB_USE_PTHREADS) 00017 # include <pthreads/pthread.h> 00018 # else 00019 /*#include <pthreads/need_errno.h> */ 00020 static int errno=0; /* single threaded workaround*/ 00021 # endif 00022 #else 00023 # include <errno.h> 00024 # include <sys/types.h> 00025 #endif 00026 #include <socket/xmlBlasterSocket.h> 00027 #include <socket/xmlBlasterZlib.h> 00028 #include <XmlBlasterConnectionUnparsed.h> 00029 #define SOCKET_TCP false 00030 00031 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception); 00032 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception); 00033 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp); 00034 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception); 00035 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception); 00036 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception); 00037 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception); 00038 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception); 00039 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception); 00040 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception); 00041 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception); 00042 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception); 00043 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception); 00044 static bool isConnected(XmlBlasterConnectionUnparsed *xb); 00045 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb); 00046 static ssize_t writenPlain(void *xb, const int fd, const char *ptr, const size_t nbytes); 00047 static ssize_t writenCompressed(void *xb, const int fd, const char *ptr, const size_t nbytes); 00048 static ssize_t readnPlain(void *xb, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2); 00049 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2); 00050 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception); 00051 00058 XmlBlasterConnectionUnparsed *getXmlBlasterConnectionUnparsed(int argc, const char* const* argv) { 00059 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)calloc(1, sizeof(XmlBlasterConnectionUnparsed)); 00060 if (xb == 0) return xb; 00061 xb->argc = argc; 00062 xb->argv = argv; 00063 xb->props = createProperties(xb->argc, xb->argv); 00064 if (xb->props == 0) { 00065 freeXmlBlasterConnectionUnparsed(&xb); 00066 return (XmlBlasterConnectionUnparsed *)0; 00067 } 00068 xb->socketToXmlBlaster = -1; 00069 xb->socketToXmlBlasterUdp = -1; 00070 xb->isInitialized = false; 00071 xb->requestId = 0; 00072 *xb->secretSessionId = 0; 00073 xb->initConnection = initConnection; 00074 xb->initQueue = xmlBlasterInitQueue; 00075 xb->connect = xmlBlasterConnect; 00076 xb->disconnect = xmlBlasterDisconnect; 00077 xb->publish = xmlBlasterPublish; 00078 xb->publishArr = xmlBlasterPublishArr; 00079 xb->publishOneway = xmlBlasterPublishOneway; 00080 xb->subscribe = xmlBlasterSubscribe; 00081 xb->unSubscribe = xmlBlasterUnSubscribe; 00082 xb->erase = xmlBlasterErase; 00083 xb->get = xmlBlasterGet; 00084 xb->ping = xmlBlasterPing; 00085 xb->isConnected = isConnected; 00086 xb->shutdown = xmlBlasterConnectionShutdown; 00087 xb->preSendEvent = 0; 00088 xb->preSendEvent_userP = 0; 00089 xb->postSendEvent = 0; 00090 xb->postSendEvent_userP = 0; 00091 xb->queueP = 0; 00092 xb->logLevel = parseLogLevel(xb->props->getString(xb->props, "logLevel", "WARN")); 00093 xb->log = xmlBlasterDefaultLogging; 00094 xb->logUserP = 0; 00095 xb->useUdpForOneway = false; 00096 xb->writeToSocket.writeToSocketFuncP = 0; 00097 xb->writeToSocket.userP = xb; 00098 xb->zlibWriteBuf = 0; 00099 xb->readFromSocket.readFromSocketFuncP = 0; 00100 xb->readFromSocket.userP = xb; 00101 xb->zlibReadBuf = 0; 00102 return xb; 00103 } 00104 00105 void freeXmlBlasterConnectionUnparsed(XmlBlasterConnectionUnparsed **xb_) 00106 { 00107 XmlBlasterConnectionUnparsed *xb = *xb_; 00108 if (xb != 0) { 00109 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterConnectionUnparsed 0x%x", xb); 00110 freeProperties(xb->props); 00111 if (xb->zlibWriteBuf) { 00112 xmlBlaster_endZlibWriter(xb->zlibWriteBuf); 00113 free(xb->zlibWriteBuf); 00114 xb->zlibWriteBuf = 0; 00115 } 00116 if (xb->zlibReadBuf) { 00117 xmlBlaster_endZlibReader(xb->zlibReadBuf); 00118 free(xb->zlibReadBuf); 00119 xb->zlibReadBuf = 0; 00120 } 00121 xmlBlasterConnectionShutdown(xb); 00122 free(xb); 00123 *xb_ = 0; 00124 } 00125 } 00126 00131 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception) 00132 { 00133 const char *servTcpPort = 0; 00134 00135 struct sockaddr_in xmlBlasterAddr; 00136 struct hostent hostbuf, *hostP = 0; 00137 struct servent *portP = 0; 00138 00139 size_t hstbuflen=0; 00140 00141 char serverHostName[256]; 00142 char errP[MAX_ERRNO_LEN]; 00143 00144 #if defined(_WINDOWS) 00145 WORD wVersionRequested; 00146 WSADATA wsaData; 00147 int err; 00148 wVersionRequested = MAKEWORD( 2, 2 ); 00149 err = WSAStartup( wVersionRequested, &wsaData ); 00150 if ( err != 0 ) { 00151 strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00152 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL", __FILE__, __LINE__); 00153 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00154 return false; 00155 } 00156 00157 if ( LOBYTE( wsaData.wVersion ) != 2 || 00158 HIBYTE( wsaData.wVersion ) != 2 ) { 00159 WSACleanup( ); 00160 strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00161 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL which supports version 2.2", __FILE__, __LINE__); 00162 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00163 return false; 00164 } 00165 # endif 00166 *errP = 0; 00167 00168 if (xb->isInitialized) { 00169 return true; 00170 } 00171 00172 { /* Switch on compression? */ 00173 const char *compressType = xb->props->getString(xb->props, "plugin/socket/compress/type", ""); 00174 compressType = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/compress/type", compressType); 00175 00176 if (!strcmp(compressType, "zlib:stream")) { 00177 00178 xb->zlibWriteBuf = (XmlBlasterZlibWriteBuffers *)malloc(sizeof(struct XmlBlasterZlibWriteBuffers)); 00179 xb->zlibReadBuf = (XmlBlasterZlibReadBuffers *)malloc(sizeof(struct XmlBlasterZlibReadBuffers)); 00180 00181 if (xmlBlaster_initZlibWriter(xb->zlibWriteBuf) != 0/*Z_OK*/) { 00182 if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00183 "Failed switching on 'plugin/socket/compress/type=%s'", compressType); 00184 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00185 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00186 "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'", 00187 __FILE__, __LINE__, compressType); 00188 free(xb->zlibWriteBuf); 00189 xb->zlibWriteBuf = 0; 00190 free(xb->zlibReadBuf); 00191 xb->zlibReadBuf = 0; 00192 return false; 00193 } 00194 00195 if (xmlBlaster_initZlibReader(xb->zlibReadBuf) != 0/*Z_OK*/) { 00196 if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00197 "Failed switching on 'plugin/socket/compress/type=%s'", compressType); 00198 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00199 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00200 "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'", 00201 __FILE__, __LINE__, compressType); 00202 free(xb->zlibWriteBuf); 00203 xb->zlibWriteBuf = 0; 00204 free(xb->zlibReadBuf); 00205 xb->zlibReadBuf = 0; 00206 return false; 00207 } 00208 00209 if (xb->logLevel>=XMLBLASTER_LOG_DUMP) { 00210 xb->zlibWriteBuf->debug = true; 00211 xb->zlibReadBuf->debug = true; 00212 } 00213 00214 if (!xb->writeToSocket.writeToSocketFuncP) { /* Accept setting from XmlBlasterAccessUnparsed */ 00215 xb->writeToSocket.writeToSocketFuncP = writenCompressed; 00216 xb->readFromSocket.readFromSocketFuncP = readnCompressed; 00217 } 00218 } 00219 else { 00220 if (strcmp(compressType, "")) { 00221 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode.", compressType); 00222 } 00223 if (!xb->writeToSocket.writeToSocketFuncP) { /* Accept setting from XmlBlasterAccessUnparsed */ 00224 xb->writeToSocket.writeToSocketFuncP = writenPlain; 00225 xb->readFromSocket.readFromSocketFuncP = readnPlain; 00226 } 00227 } 00228 } 00229 00230 00231 servTcpPort = xb->props->getString(xb->props, "plugin/socket/port", "7607"); 00232 servTcpPort = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/port", servTcpPort); 00233 00234 strncpy0(serverHostName, "localhost", 250); 00235 gethostname(serverHostName, 250); 00236 { 00237 const char *hn = xb->props->getString(xb->props, "plugin/socket/hostname", serverHostName); 00238 memmove(serverHostName, hn, strlen(hn)+1); /* including '\0' */ 00239 hn = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/hostname", serverHostName); 00240 memmove(serverHostName, hn, strlen(hn)+1); 00241 } 00242 00243 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00244 "Lookup xmlBlaster on -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %s ...", 00245 serverHostName, servTcpPort); 00246 00247 *xb->secretSessionId = 0; 00248 memset((char *)&xmlBlasterAddr, 0, sizeof(xmlBlasterAddr)); 00249 xmlBlasterAddr.sin_family=AF_INET; 00250 00251 # if _WINDOWS_NOT_YET_PORTED /* Windows gethostbyname is deprecated */ 00252 const struct addrinfo hints; 00253 struct addrinfo** res; 00254 int getaddrinfo(serverHostName, servTcpPort, &hints, res); 00255 res->ai_next : ai_family, ai_socktype, and ai_protocol 00256 00257 ... 00258 00259 void freeaddrinfo(*res); 00260 # endif 00261 if (isalpha(serverHostName[0]) || strchr(serverHostName,':') != 0) { /* look for dns name or ipv6 */ 00262 char *tmphstbuf=0; 00263 memset((char *)&hostbuf, 0, sizeof(struct hostent)); 00264 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server DNS lookup of hostname '%s'", serverHostName); 00265 hostP = gethostbyname_re(serverHostName, &hostbuf, &tmphstbuf, &hstbuflen, errP); 00266 if (hostP == 0) { 00267 if (*errP != 0) { 00268 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00269 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00270 "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s: %s", 00271 __FILE__, __LINE__, serverHostName, servTcpPort, errP); 00272 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00273 *errP = 0; 00274 } 00275 else { 00276 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00277 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00278 "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s, errno=%d", 00279 __FILE__, __LINE__, serverHostName, servTcpPort, errno); 00280 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00281 } 00282 return false; 00283 } 00284 xmlBlasterAddr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */ 00285 free(tmphstbuf); 00286 } 00287 else { 00288 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server IP4 usage (without DNS lookup) of IP '%s'", serverHostName); 00289 /* use ip4 addr directly to avoid dns lookup */ 00290 xmlBlasterAddr.sin_addr.s_addr = inet_addr(serverHostName); 00291 } 00292 00293 portP = getservbyname(servTcpPort, "tcp"); 00294 if (portP != 0) 00295 xmlBlasterAddr.sin_port = (u_short)portP->s_port; 00296 else 00297 xmlBlasterAddr.sin_port = htons((u_short)atoi(servTcpPort)); 00298 00299 xb->socketToXmlBlaster = (int)socket(AF_INET, SOCK_STREAM, 0); 00300 if (xb->socketToXmlBlaster != -1) { 00301 int ret=0; 00302 const char *localHostName = xb->props->getString(xb->props, "plugin/socket/localHostname", 0); 00303 int localPort = xb->props->getInt(xb->props, "plugin/socket/localPort", 0); 00304 localHostName = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/localHostname", localHostName); 00305 localPort = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/localPort", localPort); 00306 00307 /* Sometimes a user may whish to force the local host/port setting (e.g. for firewall tunneling 00308 and on multi homed hosts */ 00309 if (localHostName != 0 || localPort > 0) { 00310 struct sockaddr_in localAddr; 00311 struct hostent localHostbuf, *localHostP = 0; 00312 char *tmpLocalHostbuf=0; 00313 size_t localHostbuflen=0; 00314 memset(&localAddr, 0, sizeof(localAddr)); 00315 localAddr.sin_family = AF_INET; 00316 if (localHostName) { 00317 if (isalpha(localHostName[0]) || strchr(localHostName,':') != 0) { /* look for dns name or ipv6 */ 00318 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local hostname DNS lookup of hostname '%s'", localHostName); 00319 *errP = 0; 00320 localHostP = gethostbyname_re(localHostName, &localHostbuf, &tmpLocalHostbuf, &localHostbuflen, errP); 00321 if (*errP != 0) { 00322 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00323 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00324 "[%.100s:%d] Lookup of local IP failed, %s", 00325 __FILE__, __LINE__, errP); 00326 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00327 *errP = 0; 00328 } 00329 if (localHostP != 0) { 00330 localAddr.sin_addr.s_addr = ((struct in_addr *)(localHostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */ 00331 free(tmpLocalHostbuf); 00332 } 00333 } 00334 else { 00335 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local IP4 usage (without DNS lookup) of IP '%s'", localHostName); 00336 /* use ip4 addr directly to avoid dns lookup */ 00337 localAddr.sin_addr.s_addr = inet_addr(localHostName); 00338 } 00339 } 00340 if (localPort > 0) { 00341 localAddr.sin_port = htons((unsigned short)localPort); 00342 } 00343 if (bind(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { 00344 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00345 "Failed binding local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d", 00346 localHostName, localPort); 00347 } 00348 else { 00349 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00350 "Bound local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d", 00351 localHostName, localPort); 00352 } 00353 } 00354 00355 /* int retval = fcntl(xb->socketToXmlBlaster, F_SETFL, O_NONBLOCK); */ /* Switch on none blocking mode: we then should use select() to be notified when the kernel succeeded with connect() */ 00356 00357 if ((ret=connect(xb->socketToXmlBlaster, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) != -1) { 00358 if (xb->logLevel>=XMLBLASTER_LOG_INFO) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Connected to xmlBlaster"); 00359 xb->useUdpForOneway = xb->props->getBool(xb->props, "plugin/socket/useUdpForOneway", xb->useUdpForOneway); 00360 xb->useUdpForOneway = xb->props->getBool(xb->props, "dispatch/connection/plugin/socket/useUdpForOneway", xb->useUdpForOneway); 00361 00362 if (xb->useUdpForOneway) { 00363 struct sockaddr_in localAddr; 00364 socklen_t size = (socklen_t)sizeof(localAddr); 00365 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00366 "Using UDP connection for oneway calls, see -dispatch/connection/plugin/socket/useUdpForOneway true"); 00367 00368 xb->socketToXmlBlasterUdp = (int)socket(AF_INET, SOCK_DGRAM, 0); 00369 00370 if (xb->socketToXmlBlasterUdp != -1) { 00371 if (getsockname(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, &size) == -1) { 00372 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00373 "Can't determine the local socket host and port (in UDP), errno=%d", errno); 00374 return false; 00375 } 00376 00377 if (bind(xb->socketToXmlBlasterUdp, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { 00378 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00379 "Failed binding local port (in UDP) -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d", 00380 localHostName, localPort); 00381 return false; 00382 } 00383 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00384 "Bound local UDP port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d", 00385 localHostName, localPort); 00386 00387 if ((ret=connect(xb->socketToXmlBlasterUdp, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) == -1) { 00388 char errnoStr[MAX_ERRNO_LEN]; 00389 xb_strerror(errnoStr, MAX_ERRNO_LEN, errno); 00390 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00391 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00392 "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP), ret=%d, %s", 00393 __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr); 00394 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00395 return false; 00396 } 00397 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connected to xmlBlaster with UDP"); 00398 } /* if (xb->socketToXmlBlasterUdp != -1) */ 00399 else { 00400 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00401 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00402 "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP) errno=%d", 00403 __FILE__, __LINE__, serverHostName, servTcpPort, errno); 00404 return false; 00405 } 00406 } /* if (xb->useUdpForOneway) */ 00407 00408 } 00409 else { /* connect(...) == -1 */ 00410 char errnoStr[MAX_ERRNO_LEN]; 00411 xb_strerror(errnoStr, MAX_ERRNO_LEN, errno); 00412 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00413 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00414 "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed, ret=%d, %s", 00415 __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr); 00416 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00417 return false; 00418 } 00419 } 00420 else { 00421 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00422 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00423 "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed errno=%d", 00424 __FILE__, __LINE__, serverHostName, servTcpPort, errno); 00425 return false; 00426 } 00427 xb->isInitialized = true; 00428 return true; 00429 } 00430 00431 00449 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception) 00450 { 00451 #ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST 00452 if (checkArgs(xb, "initQueue", false, exception) == false ) return false; 00453 if (xb->queueP) { 00454 char message[XMLBLASTEREXCEPTION_MESSAGE_LEN]; 00455 SNPRINTF(message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00456 "[%.100s:%d] The queue is initialized already, call to initQueue() is ignored", __FILE__, __LINE__); 00457 embedException(exception, "user.illegalArgument", message, exception); 00458 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00459 return false; 00460 } 00461 00462 { 00463 QueueProperties tmp; 00464 memset(&tmp, 0, sizeof(QueueProperties)); 00465 00466 if (queueProperties == 0) 00467 queueProperties = &tmp; 00468 00469 if (*queueProperties->dbName == 0) { 00470 strncpy0(queueProperties->dbName, xb->props->getString(xb->props, "queue/connection/dbName", "xmlBlasterClient.db"), QUEUE_DBNAME_MAX); 00471 } 00472 if (*queueProperties->nodeId == 0) { 00473 strncpy0(queueProperties->nodeId, xb->props->getString(xb->props, "queue/connection/nodeId", "client"), QUEUE_ID_MAX); 00474 } 00475 if (*queueProperties->queueName == 0) { 00476 strncpy0(queueProperties->queueName, xb->props->getString(xb->props, "queue/connection/queueName", "connection_client"), QUEUE_ID_MAX); 00477 } 00478 if (*queueProperties->tablePrefix == 0) { 00479 strncpy0(queueProperties->tablePrefix, xb->props->getString(xb->props, "queue/connection/tablePrefix", "XB_"), QUEUE_PREFIX_MAX); 00480 } 00481 if (queueProperties->maxNumOfEntries == 0) { 00482 queueProperties->maxNumOfEntries = xb->props->getInt(xb->props, "queue/connection/maxEntries", 10000000); 00483 } 00484 if (queueProperties->maxNumOfBytes == 0) { 00485 queueProperties->maxNumOfBytes = xb->props->getInt64(xb->props, "queue/connection/maxBytes", 10000000LL); 00486 } 00487 if (queueProperties->logFp == 0) queueProperties->logFp = xb->log; 00488 if (queueProperties->logLevel == 0) queueProperties->logLevel = xb->logLevel; 00489 if (queueProperties->userObject == 0) queueProperties->userObject = xb->userObject; 00490 00491 xb->queueP = createQueue(queueProperties, exception); 00492 if (*exception->errorCode != 0) { 00493 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Queue initializeation failed: [%s] %s\n", exception->errorCode, exception->message); 00494 return false; 00495 } 00496 xb->queueP->userObject = xb; 00497 } 00498 return true; 00499 #else 00500 if (queueProperties) {} /* To suppress compiler warning that not used */ 00501 strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00502 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00503 "[%.100s:%d] Queue support is not compiled into the library, please recompile with '-DXMLBLASTER_PERSISTENT_QUEUE=1 and -DXMLBLASTER_PERSISTENT_QUEUE_TEST=1", __FILE__, __LINE__); 00504 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00505 return false; 00506 #endif /* XMLBLASTER_PERSISTENT_QUEUE_TEST */ 00507 } 00508 00509 static bool isConnected(XmlBlasterConnectionUnparsed *xb) 00510 { 00511 return (xb->socketToXmlBlaster > -1) ? true : false; 00512 } 00513 00514 const char *xmlBlasterConnectionUnparsedUsage() 00515 { 00516 /* To prevent compiler warning */ 00517 /* "string length `596' is greater than the length `509' ISO C89 compilers are required to support" */ 00518 /* we have a static variable */ 00519 enum { SIZE=2048 }; 00520 static char usage[SIZE]; 00521 strncpy0(usage, 00522 "\n -dispatch/connection/plugin/socket/hostname [localhost]" 00523 "\n Where to find xmlBlaster." 00524 "\n -dispatch/connection/plugin/socket/port [7607]" 00525 "\n The port where xmlBlaster listens." 00526 "\n -dispatch/connection/plugin/socket/localHostname [NULL]", SIZE/2); 00527 strncat0(usage, 00528 "\n Force the local IP, useful on multi homed computers." 00529 "\n -dispatch/connection/plugin/socket/localPort [0]" 00530 "\n Force the local port, useful to tunnel firewalls." 00531 "\n -dispatch/connection/plugin/socket/compress/type []" 00532 #if XMLBLASTER_ZLIB==1 00533 "\n Switch on compression with 'zlib:stream'." 00534 #else 00535 "\n No compression support. Try recompiling with with '-DXMLBLASTER_ZLIB=1'." 00536 #endif 00537 "\n -dispatch/connection/plugin/socket/useUdpForOneway [false]" 00538 "\n Use UDP for publishOneway() calls.", SIZE/2); 00539 return usage; 00540 } 00541 00545 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb) 00546 { 00547 if (xb != 0 && xb->isConnected(xb)) { 00548 # if defined(_WINDOWS) 00549 int how = SD_BOTH; /* SD_BOTH requires Winsock2.h */ 00550 # else 00551 int how = SHUT_RDWR; /* enum SHUT_RDWR = 2 */ 00552 # endif 00553 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00554 "shutdown() socketToXmlBlaster=%d socketToXmlBlasterUdp=%d", xb->socketToXmlBlaster, xb->socketToXmlBlasterUdp); 00555 shutdown(xb->socketToXmlBlaster, how); 00556 closeSocket(xb->socketToXmlBlaster); 00557 xb->socketToXmlBlaster = -1; 00558 if (xb->socketToXmlBlasterUdp != -1) { 00559 shutdown(xb->socketToXmlBlasterUdp, how); 00560 closeSocket(xb->socketToXmlBlasterUdp); 00561 xb->socketToXmlBlasterUdp = -1; 00562 } 00563 } 00564 } 00565 00581 static bool sendData(XmlBlasterConnectionUnparsed *xb, 00582 const char * const methodName, 00583 enum XMLBLASTER_MSG_TYPE_ENUM msgType, 00584 const char *data_, 00585 size_t dataLen_, 00586 SocketDataHolder *responseSocketDataHolder, 00587 XmlBlasterException *exception, 00588 bool udp) 00589 { 00590 ssize_t numSent; 00591 size_t rawMsgLen = 0; 00592 char *rawMsg = (char *)0; 00593 char *rawMsgStr; 00594 MsgRequestInfo *requestInfoP; 00595 MsgRequestInfo requestInfo; 00596 memset(&requestInfo, 0, sizeof(MsgRequestInfo)); 00597 00598 if (data_ == 0) { 00599 data_ = ""; 00600 dataLen_ = 0; 00601 } 00602 00603 if (exception == 0) { 00604 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception to sendData()", __FILE__, __LINE__); 00605 return false; 00606 } 00607 initializeXmlBlasterException(exception); 00608 00609 if (responseSocketDataHolder) 00610 memset(responseSocketDataHolder, 0, sizeof(SocketDataHolder)); 00611 00612 if (!xb->isConnected(xb)) { 00613 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00614 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] No connection to xmlBlaster", __FILE__, __LINE__); 00615 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00616 return false; 00617 } 00618 00619 if (strcmp(XMLBLASTER_CONNECT, methodName) && strlen(xb->secretSessionId) < 1) { 00620 strncpy0(exception->errorCode, "user.notConnected", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00621 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please call connect() before invoking '%s'", __FILE__, __LINE__, methodName); 00622 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00623 return false; 00624 } 00625 00626 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00627 "sendData(udp=%s) requestId '%ld' increment to '%ld', dataLen=%d", 00628 ((udp==true) ? "true" : "false"), xb->requestId, xb->requestId+1, dataLen_); 00629 00630 { 00631 long tmp = ++xb->requestId; /* TODO: We need to sync requestId !!!! */ 00632 if (xb->requestId > 1000000000) xb->requestId = 0; 00633 SNPRINTF(requestInfo.requestIdStr, MAX_REQUESTID_LEN, "%-ld", tmp); 00634 } 00635 00636 requestInfo.methodName = methodName; 00637 if (xb->preSendEvent != 0) { 00638 /* A callback function pointer is registered to be notified just before sending */ 00639 XmlBlasterBlob blob; 00640 blobcpyAlloc(&blob, data_, dataLen_); /* Take a clone, the preSendEvent() function may manipulate it */ 00641 requestInfo.blob.dataLen = blob.dataLen; 00642 requestInfo.blob.data = blob.data; 00643 requestInfo.xa = xb->preSendEvent_userP; 00644 requestInfoP = xb->preSendEvent(&requestInfo, exception); 00645 if (*exception->message != 0) { 00646 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00647 "Re-throw exception from preSendEvent errorCode=%s message=%s", exception->errorCode, exception->message); 00648 return false; 00649 } 00650 if (requestInfoP == 0) { 00651 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00652 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR: returning requestInfo 0 without exception is not supported, please correct your preSendEvent() function.", __FILE__, __LINE__); 00653 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00654 return false; 00655 } 00656 if (blob.data != requestInfoP->blob.data) { 00657 /* The callback function has changed/manipulated the user data */ 00658 freeBlobHolderContent(&blob); 00659 } 00660 rawMsg = encodeSocketMessage(msgType, requestInfo.requestIdStr, requestInfo.methodName, xb->secretSessionId, 00661 requestInfoP->blob.data, requestInfoP->blob.dataLen, xb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen); 00662 freeBlobHolderContent(&requestInfoP->blob); 00663 } 00664 else { 00665 rawMsg = encodeSocketMessage(msgType, requestInfo.requestIdStr, requestInfo.methodName, xb->secretSessionId, 00666 data_, dataLen_, xb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen); 00667 } 00668 00669 /* send the header ... */ 00670 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Lowlevel writing data to socket ..."); 00671 numSent = xb->writeToSocket.writeToSocketFuncP(xb->writeToSocket.userP, udp ? xb->socketToXmlBlasterUdp : xb->socketToXmlBlaster, rawMsg, (int)rawMsgLen); 00672 if (numSent == -1) { 00673 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00674 "Lost connection to xmlBlaster server"); 00675 xmlBlasterConnectionShutdown(xb); 00676 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00677 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Lost connection to xmlBlaster server", __FILE__, __LINE__); 00678 free(rawMsg); 00679 if (xb->postSendEvent != 0) { 00680 requestInfo.rollback = true; 00681 requestInfoP = xb->postSendEvent(&requestInfo, exception); 00682 } 00683 return false; 00684 } 00685 00686 if (numSent != (int)rawMsgLen) { 00687 if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00688 "Sent only %d bytes from %u", numSent, rawMsgLen); 00689 strncpy0(exception->errorCode, "user.connect", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00690 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR Sent only %ld bytes from %lu", __FILE__, __LINE__, (long)numSent, (unsigned long)rawMsgLen); 00691 free(rawMsg); 00692 if (xb->postSendEvent != 0) { 00693 requestInfo.rollback = true; 00694 requestInfoP = xb->postSendEvent(&requestInfo, exception); 00695 } 00696 return false; 00697 } 00698 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Lowlevel writing data to socket done."); 00699 00700 free(rawMsg); 00701 rawMsg = 0; 00702 00703 if (xbl_isOneway(msgType, methodName)) 00704 return true; /* Responses and exceptions are oneway */ 00705 00706 if (responseSocketDataHolder) { /* if not oneway read the response message */ 00707 00708 if (xb->postSendEvent != 0) { 00709 /* A callback function pointer is registered to be notified just after sending */ 00710 requestInfo.responseType = 0; 00711 requestInfo.blob.dataLen = 0; 00712 requestInfo.blob.data = 0; 00713 /* Here the thread blocks until a response from CallbackServer arrives */ 00714 requestInfoP = xb->postSendEvent(&requestInfo, exception); 00715 if (*exception->message != 0) { 00716 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00717 "Re-throw exception from preSendEvent errorCode=%s message=%s", exception->errorCode, exception->message); 00718 return false; 00719 } 00720 if (requestInfoP == 0) { 00721 printf("[XmlBlasterConnectionUnparsed] TODO: returning requestInfo 0 is not implemented"); 00722 } 00723 /* TODO: Possible race condition */ 00724 responseSocketDataHolder->type = requestInfoP->responseType; 00725 responseSocketDataHolder->version = XMLBLASTER_SOCKET_VERSION; 00726 strncpy0(responseSocketDataHolder->requestId, requestInfo.requestIdStr, MAX_REQUESTID_LEN); 00727 strncpy0(responseSocketDataHolder->methodName, methodName, MAX_METHODNAME_LEN); 00728 00729 if (requestInfoP->responseType == MSG_TYPE_EXCEPTION) { /* convert XmlBlasterException thrown from remote */ 00730 convertToXmlBlasterException(&requestInfoP->blob, exception, xb->logLevel >= XMLBLASTER_LOG_DUMP); 00731 freeBlobHolderContent(&requestInfoP->blob); 00732 return false; 00733 } 00734 else { 00735 responseSocketDataHolder->blob.dataLen = requestInfoP->blob.dataLen; 00736 responseSocketDataHolder->blob.data = requestInfoP->blob.data; /* The responseSocketDataHolder is now responsible to free(responseSocketDataHolder->blob.data) */ 00737 } 00738 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00739 "requestId '%s' returns dataLen=%d", requestInfo.requestIdStr, requestInfoP->blob.dataLen); 00740 } 00741 else { 00742 /* Wait on the response ourself */ 00743 if (getResponse(xb, responseSocketDataHolder, exception, udp) == false) { /* false on EOF */ 00744 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Lost connection to xmlBlaster server"); 00745 xmlBlasterConnectionShutdown(xb); 00746 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00747 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Lost connection to xmlBlaster server", __FILE__, __LINE__); 00748 return false; 00749 } 00750 if (responseSocketDataHolder->type == MSG_TYPE_EXCEPTION) { /* convert XmlBlasterException */ 00751 convertToXmlBlasterException(&responseSocketDataHolder->blob, exception, xb->logLevel >= XMLBLASTER_LOG_DUMP); 00752 freeBlobHolderContent(&responseSocketDataHolder->blob); 00753 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00754 "Re-throw exception from response errorCode=%s message=%s", exception->errorCode, exception->message); 00755 return false; 00756 } 00757 } 00758 00759 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) { 00760 rawMsgStr = blobDump(&responseSocketDataHolder->blob); 00761 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Received response msgLen=%u type=%c version=%c requestId=%s methodName=%s dateLen=%u data='%.100s ...'", 00762 responseSocketDataHolder->msgLen, responseSocketDataHolder->type, responseSocketDataHolder->version, responseSocketDataHolder->requestId, 00763 responseSocketDataHolder->methodName, responseSocketDataHolder->blob.dataLen, rawMsgStr); 00764 freeBlobDump(rawMsgStr); 00765 } 00766 } 00767 00768 return true; 00769 } 00770 00782 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp) 00783 { 00784 bool stopListenLoop = false; 00785 return parseSocketData(xb->socketToXmlBlaster, &xb->readFromSocket, responseSocketDataHolder, exception, &stopListenLoop, udp, xb->logLevel >= XMLBLASTER_LOG_DUMP); 00786 } 00787 00799 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception) 00800 { 00801 SocketDataHolder responseSocketDataHolder; 00802 char *response; 00803 00804 if (qos == 0) { 00805 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00806 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterConnect()", __FILE__, __LINE__); 00807 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00808 return (char *)0; 00809 } 00810 00811 if (initConnection(xb, exception) == false) { 00812 return (char *)0; 00813 } 00814 00815 if (sendData(xb, XMLBLASTER_CONNECT, MSG_TYPE_INVOKE, (const char *)qos, 00816 (qos == (const char *)0) ? 0 : strlen(qos), 00817 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 00818 return (char *)0; 00819 } 00820 00821 response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen); 00822 freeBlobHolderContent(&responseSocketDataHolder.blob); 00823 00824 /* Extract secret session ID from ConnectReturnQos */ 00825 *xb->secretSessionId = 0; 00826 { 00827 const char *pEnd = (const char *)0; 00828 const char *pStart = strstr(response, "sessionId='"); 00829 if (pStart) { 00830 pStart += strlen("sessionId='"); 00831 pEnd = strstr(pStart, "'"); 00832 if (pEnd) { 00833 int len = (int)(pEnd - pStart + 1); 00834 if (len >= MAX_SECRETSESSIONID_LEN) { 00835 strncpy0(exception->errorCode, "user.response", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00836 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR Received too long secret sessionId with len=%d, please change setting MAX_SECRETSESSIONID_LEN", __FILE__, __LINE__, len); 00837 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00838 } 00839 strncpy0(xb->secretSessionId, pStart, len); 00840 } 00841 } 00842 } 00843 00844 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00845 "Got response for connect(secretSessionId=%s)", xb->secretSessionId); 00846 00847 return response; 00848 } 00849 00858 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception) 00859 { 00860 SocketDataHolder responseSocketDataHolder; 00861 00862 if (checkArgs(xb, XMLBLASTER_DISCONNECT, true, exception) == false ) return 0; 00863 00864 if (sendData(xb, XMLBLASTER_DISCONNECT, MSG_TYPE_INVOKE, (const char *)qos, 00865 (qos == (const char *)0) ? 0 : strlen(qos), 00866 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 00867 return false; 00868 } 00869 00870 freeBlobHolderContent(&responseSocketDataHolder.blob); 00871 00872 xmlBlasterConnectionShutdown(xb); 00873 *xb->secretSessionId = 0; 00874 return true; 00875 } 00876 00877 00878 #if XMLBLASTER_PERSISTENT_QUEUE_TEST==1 00879 00883 static int parsePriority(const char *qos) { 00884 char *pPrio, *pPrioEnd; 00885 /*const int PRIORITY_MAXLEN = 10;*/ 00886 #define PRIORITY_MAXLEN 10 /* To be backward compatible to C90 */ 00887 char prioStr[PRIORITY_MAXLEN]; 00888 int len = 1; 00889 int prio = 5; 00890 const int lenPrio=strlen("<priority>"); 00891 00892 if (qos == 0) return prio; 00893 00894 pPrio = strstr(qos, "<priority>"); 00895 if (pPrio == 0) return prio; 00896 00897 pPrioEnd = strstr(qos, "</priority>"); 00898 if (pPrioEnd == 0) return prio; 00899 00900 len = pPrioEnd-pPrio-lenPrio; 00901 if (len >= PRIORITY_MAXLEN) { 00902 return prio; 00903 } 00904 strncpy(prioStr, pPrio+lenPrio, len); 00905 *(prioStr+len) = 0; 00906 sscanf(prioStr, "%d", &prio); /* on error prio remains 5, white spaces are stripped by sscanf */ 00907 return prio; 00908 } 00909 00915 static char *xmlBlasterQueuePut(XmlBlasterConnectionUnparsed *xb, int priority, BlobHolder *blob, XmlBlasterException *exception) 00916 { 00917 QueueEntry queueEntry; 00918 XmlBlasterException queueException; 00919 00920 QueueProperties *queuePropertiesP = 0; /* 0: read configuration from environment */ 00921 /* 00922 QueueProperties queueProperties; 00923 memset(&queueProperties, 0, sizeof(QueueProperties)); 00924 queuePropertiesP = &queueProperties; 00925 strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX); 00926 strncpy0(queueProperties.nodeId, "clientJoe1081594557415", QUEUE_ID_MAX); 00927 strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX); 00928 strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX); 00929 queueProperties.maxNumOfEntries = 10000000L; 00930 queueProperties.maxNumOfBytes = 1000000000LL; 00931 queueProperties.logFp = xb->log; 00932 queueProperties.logLevel = xb->logLevel; 00933 queueProperties.userObject = xb->userObject; 00934 queueP = createQueue(&queueProperties, &queueException); 00935 */ 00936 00937 if (xb->queueP == 0) { 00938 if (xb->initQueue(xb, queuePropertiesP, exception) == false) 00939 return 0; 00940 } 00941 00942 queueEntry.priority = priority; 00943 queueEntry.isPersistent = true; 00944 queueEntry.uniqueId = getTimestamp(); 00945 strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN); 00946 queueEntry.embeddedBlob.data = blob->data; 00947 queueEntry.embeddedBlob.dataLen = blob->dataLen; 00948 00949 xb->queueP->put(xb->queueP, &queueEntry, &queueException); 00950 if (*queueException.errorCode != 0) { 00951 embedException(exception, queueException.errorCode, queueException.message, exception); 00952 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Put to queue failed: [%s] %s\n", exception->errorCode, exception->message); 00953 return 0; 00954 } 00955 *exception->errorCode = 0; /* Successfully queued: no error */ 00956 return strcpyAlloc("<qos><state id='OK' info='QUEUED'/></qos>"); 00957 } 00958 #endif /*XMLBLASTER_PERSISTENT_QUEUE_TEST==1*/ 00959 00967 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception) 00968 { 00969 SocketDataHolder responseSocketDataHolder; 00970 char *response = 0; 00971 00972 BlobHolder blob = encodeMsgUnit(msgUnit, xb->logLevel >= XMLBLASTER_LOG_DUMP); 00973 msgUnit->responseQos = 0; /* In case no initial memset(&msgUnit, 0, sizeof(MsgUnit)); was made */ 00974 00975 if (checkArgs(xb, "publish", true, exception) == false ) return 0; 00976 00977 msgUnit->responseQos = 0; /* Initialize properly */ 00978 00979 if (sendData(xb, XMLBLASTER_PUBLISH, MSG_TYPE_INVOKE, blob.data, blob.dataLen, 00980 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 00981 00982 # if XMLBLASTER_PERSISTENT_QUEUE_TEST==1 /* TEST CODE */ 00983 if (strstr(exception->errorCode, "user.notConnected") != 0 || 00984 strstr(exception->errorCode, "communication.noConnection") != 0) { /* On communication problem queue messages */ 00985 int priority = parsePriority(msgUnit->qos); 00986 response = xmlBlasterQueuePut(xb, priority, &blob, exception); 00987 /* NO: msgUnit->responseQos = response; otherwise a free(msgUnit) will free the response as well */ 00988 } 00989 # endif 00990 00991 free(blob.data); 00992 return response; 00993 } 00994 free(blob.data); 00995 00996 response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen); 00997 freeBlobHolderContent(&responseSocketDataHolder.blob); 00998 00999 return response; 01000 } 01001 01009 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception) 01010 { 01011 size_t i; 01012 SocketDataHolder responseSocketDataHolder; 01013 QosArr *response = 0; 01014 01015 BlobHolder blob = encodeMsgUnitArr(msgUnitArr, xb->logLevel >= XMLBLASTER_LOG_DUMP); 01016 01017 if (checkArgs(xb, "publishArr", true, exception) == false ) return 0; 01018 01019 for (i=0; i<msgUnitArr->len; i++) 01020 msgUnitArr->msgUnitArr[i].responseQos = 0; /* Initialize properly */ 01021 01022 if (sendData(xb, XMLBLASTER_PUBLISH, MSG_TYPE_INVOKE, blob.data, blob.dataLen, 01023 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 01024 free(blob.data); 01025 return 0; 01026 } 01027 free(blob.data); 01028 01029 response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data); 01030 freeBlobHolderContent(&responseSocketDataHolder.blob); 01031 01032 return response; 01033 } 01034 01040 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception) 01041 { 01042 size_t i; 01043 SocketDataHolder responseSocketDataHolder; 01044 01045 BlobHolder blob = encodeMsgUnitArr(msgUnitArr, xb->logLevel >= XMLBLASTER_LOG_DUMP); 01046 01047 if (checkArgs(xb, "publishOneway", true, exception) == false ) return; 01048 01049 for (i=0; i<msgUnitArr->len; i++) { 01050 msgUnitArr->msgUnitArr[i].responseQos = 0; /* Initialize properly */ 01051 } 01052 01053 /* 01054 if (!xb->useUdpForOneway) { 01055 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 01056 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] UDP not enabled, use -dispatch/connection/plugin/socket/enableUDP true", __FILE__, __LINE__); 01057 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 01058 free(blob.data); 01059 return; 01060 } 01061 */ 01062 01063 if (sendData(xb, XMLBLASTER_PUBLISH_ONEWAY, MSG_TYPE_INVOKE, blob.data, blob.dataLen, 01064 &responseSocketDataHolder, exception, xb->useUdpForOneway) == false) { 01065 free(blob.data); 01066 return; 01067 } 01068 free(blob.data); 01069 freeBlobHolderContent(&responseSocketDataHolder.blob); /* Could be ommitted for oneway */ 01070 } 01071 01079 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception) 01080 { 01081 size_t qosLen, keyLen, totalLen; 01082 char *data; 01083 size_t currpos = 0; 01084 SocketDataHolder responseSocketDataHolder; 01085 char *response; 01086 01087 if (checkArgs(xb, "subscribe", true, exception) == false ) return 0; 01088 01089 if (key == 0) { 01090 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 01091 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterSubscribe()", __FILE__, __LINE__); 01092 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 01093 return (char *)0; 01094 } 01095 01096 if (qos == (const char *)0) { 01097 qos = ""; 01098 } 01099 qosLen = strlen(qos); 01100 keyLen = strlen(key); 01101 01102 totalLen = qosLen + 1 + keyLen + 1; 01103 01104 data = (char *)malloc(totalLen); 01105 01106 memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */ 01107 currpos += qosLen+1; 01108 01109 memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */ 01110 currpos += keyLen+1; 01111 01112 if (sendData(xb, XMLBLASTER_SUBSCRIBE, MSG_TYPE_INVOKE, data, totalLen, 01113 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 01114 free(data); 01115 return (char *)0; 01116 } 01117 free(data); 01118 01119 response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen); 01120 freeBlobHolderContent(&responseSocketDataHolder.blob); 01121 01122 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 01123 "Got response for subscribe(): %s", response); 01124 01125 return response; 01126 } 01127 01135 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception) 01136 { 01137 size_t qosLen, keyLen, totalLen; 01138 char *data; 01139 size_t currpos = 0; 01140 SocketDataHolder responseSocketDataHolder; 01141 QosArr *response; 01142 01143 if (checkArgs(xb, "unSubscribe", true, exception) == false ) return 0; 01144 01145 if (key == 0) { 01146 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 01147 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterUnSubscribe()", __FILE__, __LINE__); 01148 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 01149 return (QosArr *)0; 01150 } 01151 01152 if (qos == (const char *)0) { 01153 qos = ""; 01154 } 01155 qosLen = strlen(qos); 01156 keyLen = strlen(key); 01157 01158 totalLen = qosLen + 1 + keyLen + 1; 01159 01160 data = (char *)malloc(totalLen); 01161 01162 memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */ 01163 currpos += qosLen+1; 01164 01165 memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */ 01166 currpos += keyLen+1; 01167 01168 if (sendData(xb, XMLBLASTER_UNSUBSCRIBE, MSG_TYPE_INVOKE, data, totalLen, 01169 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 01170 free(data); 01171 return (QosArr *)0; 01172 } 01173 free(data); 01174 01175 response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data); 01176 freeBlobHolderContent(&responseSocketDataHolder.blob); 01177 01178 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) { 01179 size_t ii; 01180 for (ii=0; ii<response->len; ii++) { 01181 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 01182 "Got response for unSubscribe(): %s", response->qosArr[ii]); 01183 } 01184 } 01185 01186 return response; 01187 } 01188 01197 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception) 01198 { 01199 size_t qosLen, keyLen, totalLen; 01200 char *data; 01201 size_t currpos = 0; 01202 SocketDataHolder responseSocketDataHolder; 01203 QosArr *response; 01204 01205 if (checkArgs(xb, "erase", true, exception) == false ) return 0; 01206 01207 if (key == 0) { 01208 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 01209 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterErase()", __FILE__, __LINE__); 01210 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 01211 return (QosArr *)0; 01212 } 01213 01214 if (qos == (const char *)0) { 01215 qos = ""; 01216 } 01217 qosLen = strlen(qos); 01218 keyLen = strlen(key); 01219 01220 totalLen = qosLen + 1 + keyLen + 1; 01221 01222 data = (char *)malloc(totalLen); 01223 01224 memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */ 01225 currpos += qosLen+1; 01226 01227 memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */ 01228 currpos += keyLen+1; 01229 01230 if (sendData(xb, XMLBLASTER_ERASE, MSG_TYPE_INVOKE, data, totalLen, 01231 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 01232 free(data); 01233 return (QosArr *)0; 01234 } 01235 free(data); 01236 01237 response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data); 01238 freeBlobHolderContent(&responseSocketDataHolder.blob); 01239 01240 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) { 01241 size_t ii; 01242 for (ii=0; ii<response->len; ii++) { 01243 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 01244 "Got response for erase(): %s", response->qosArr[ii]); 01245 } 01246 } 01247 01248 return response; 01249 } 01250 01259 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception) 01260 { 01261 SocketDataHolder responseSocketDataHolder; 01262 char *response; 01263 01264 if (checkArgs(xb, "ping", true, exception) == false ) return 0; 01265 01266 if (sendData(xb, XMLBLASTER_PING, MSG_TYPE_INVOKE, (const char *)qos, 01267 (qos == (const char *)0) ? 0 : strlen(qos), 01268 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 01269 return (char *)0; 01270 } 01271 01272 response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen); 01273 freeBlobHolderContent(&responseSocketDataHolder.blob); 01274 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 01275 "Got response for ping '%s'", response); 01276 return response; 01277 } 01278 01286 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception) 01287 { 01288 size_t qosLen, keyLen, totalLen; 01289 char *data; 01290 size_t currpos = 0; 01291 SocketDataHolder responseSocketDataHolder; 01292 MsgUnitArr *msgUnitArr = 0; 01293 01294 if (key == 0) { 01295 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 01296 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterGet()", __FILE__, __LINE__); 01297 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 01298 return (MsgUnitArr *)0; 01299 } 01300 01301 if (qos == (const char *)0) qos = ""; 01302 qosLen = strlen(qos); 01303 keyLen = strlen(key); 01304 01305 totalLen = qosLen + 1 + keyLen + 1; 01306 01307 data = (char *)malloc(totalLen); 01308 01309 memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */ 01310 currpos += qosLen+1; 01311 01312 memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */ 01313 currpos += keyLen+1; 01314 01315 if (sendData(xb, XMLBLASTER_GET, MSG_TYPE_INVOKE, data, totalLen, 01316 &responseSocketDataHolder, exception, SOCKET_TCP) == false) { 01317 free(data); 01318 return (MsgUnitArr *)0; /* exception is filled with details */ 01319 } 01320 free(data); 01321 01322 /* Now process the returned messages */ 01323 01324 msgUnitArr = parseMsgUnitArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data); 01325 freeBlobHolderContent(&responseSocketDataHolder.blob); 01326 01327 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 01328 "Returned %u messages for get()", msgUnitArr->len); 01329 01330 return msgUnitArr; 01331 } 01332 01336 static ssize_t writenPlain(void *userP, const int fd, const char *ptr, const size_t nbytes) { 01337 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP; 01338 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenPlain(%u)", nbytes); 01339 return writen(fd, ptr, nbytes); 01340 } 01341 01345 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) { 01346 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP; 01347 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenCompressed(%u)", nbytes); 01348 return xmlBlaster_writenCompressed(xb->zlibWriteBuf, fd, ptr, nbytes); 01349 } 01350 01354 static ssize_t readnPlain(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) { 01355 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP; 01356 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnPlain(%u)", nbytes); 01357 return readn(fd, ptr, nbytes, fpNumRead, userP2); 01358 } 01359 01363 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) { 01364 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP; 01365 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnCompressed(%u)", nbytes); 01366 return xmlBlaster_readnCompressed(xb->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2); 01367 } 01368 01376 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception) 01377 { 01378 if (xb == 0) { 01379 char *stack = getStackTrace(10); 01380 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s", 01381 __FILE__, __LINE__, methodName, stack); 01382 free(stack); 01383 return false; 01384 } 01385 01386 if (exception == 0) { 01387 char *stack = getStackTrace(10); 01388 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s", 01389 __FILE__, __LINE__, methodName, stack); 01390 free(stack); 01391 return false; 01392 } 01393 01394 if (checkIsConnected) { 01395 if (!xb->isConnected(xb)) { 01396 char *stack = getStackTrace(10); 01397 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 01398 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 01399 "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s", 01400 __FILE__, __LINE__, methodName, stack); 01401 free(stack); 01402 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 01403 return false; 01404 } 01405 } 01406 01407 return true; 01408 } 01409 01410