00001 /*---------------------------------------------------------------------------- 00002 Name: CallbackServerUnparsed.c 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Establish a listen socket for xmlBlaster callbacks 00006 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 00007 Compile: 00008 LINUX: gcc -g -Wall -DUSE_MAIN_CB -I.. -o CallbackServerUnparsed CallbackServerUnparsed.c xmlBlasterSocket.c ../util/msgUtil.c ../util/Properties.c 00009 WIN: cl /MT -DUSE_MAIN_CB -D_WINDOWS -I.. CallbackServerUnparsed.c xmlBlasterSocket.c ../util/msgUtil.c ../util/Properties.c ws2_32.lib 00010 Solaris: cc -g -DUSE_MAIN_CB -I.. -o CallbackServerUnparsed CallbackServerUnparsed.c xmlBlasterSocket.c ../util/msgUtil.c ../util/Properties.c -lsocket -lnsl 00011 -----------------------------------------------------------------------------*/ 00012 #include <stdio.h> 00013 #include <string.h> 00014 #if defined(WINCE) 00015 # if defined(XB_USE_PTHREADS) 00016 # include <pthreads/pthread.h> 00017 # else 00018 /*#include <pthreads/need_errno.h> */ 00019 static int errno=0; /* single threaded workaround*/ 00020 # endif 00021 #else 00022 # include <errno.h> 00023 #endif 00024 #include <socket/xmlBlasterSocket.h> /* gethostname() */ 00025 #include <CallbackServerUnparsed.h> 00026 00027 static bool useThisSocket(CallbackServerUnparsed *cb, int socketToUse, int socketToUseUdp); 00028 static int runCallbackServer(CallbackServerUnparsed *cb); 00029 static bool createCallbackServer(CallbackServerUnparsed *cb); 00030 static bool isListening(CallbackServerUnparsed *cb); 00031 static bool readMessage(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, XmlBlasterException *exception, bool udp); 00032 static ssize_t writenPlain(void *cb, const int fd, const char *ptr, const size_t nbytes); 00033 static ssize_t readnPlain(void *cb, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2); 00034 static bool addResponseListener(CallbackServerUnparsed *cb, MsgRequestInfo *msgRequestInfoP, ResponseFp responseEventFp); 00035 static ResponseListener *removeResponseListener(CallbackServerUnparsed *cb, const char *requestId); 00036 static void voidSendResponse(CallbackServerUnparsed *cb, void *socketDataHolder, MsgUnitArr *msgUnitArr); 00037 static void sendResponse(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, MsgUnitArr *msgUnitArr); 00038 static void voidSendXmlBlasterException(CallbackServerUnparsed *cb, void *socketDataHolder, XmlBlasterException *exception); 00039 static void sendXmlBlasterException(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, XmlBlasterException *exception); 00040 static void voidSendResponseOrException(bool success, CallbackServerUnparsed *cb, void *socketDataHolder, MsgUnitArr *msgUnitArrP, XmlBlasterException *exception); 00041 static void sendResponseOrException(bool success, CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, MsgUnitArr *msgUnitArrP, XmlBlasterException *exception); 00042 static void shutdownCallbackServer(CallbackServerUnparsed *cb); 00043 static void closeAcceptSocket(CallbackServerUnparsed *cb); 00044 00045 /* 00046 static void xmlBlasterNumRead_test(void *xb, const size_t currBytesRead, const size_t nbytes) { 00047 printf("xmlBlasterSocket.c: DEUBG ONLY currBytesRead=%ld nbytes=%ld\n", (long)currBytesRead, (long)nbytes); 00048 } 00049 */ 00050 00051 00055 CallbackServerUnparsed *getCallbackServerUnparsed(int argc, const char* const* argv, 00056 UpdateCbFp updateCb, void *updateCbUserData) 00057 { 00058 CallbackServerUnparsed *cb = (CallbackServerUnparsed *)calloc(1, 00059 sizeof(CallbackServerUnparsed)); 00060 if (cb == 0) return cb; 00061 cb->props = createProperties(argc, argv); 00062 if (cb->props == 0) { 00063 freeCallbackServerUnparsed(&cb); 00064 return (CallbackServerUnparsed *)0; 00065 } 00066 cb->stopListenLoop = false; 00067 cb->listenSocket = -1; /* Can be reused from XmlBlasterConnectionUnparsed */ 00068 cb->acceptSocket = -1; /* Can be reused from XmlBlasterConnectionUnparsed */ 00069 cb->socketUdp = -1; /* Can be reused from XmlBlasterConnectionUnparsed */ 00070 cb->useThisSocket = useThisSocket; 00071 cb->runCallbackServer = runCallbackServer; 00072 cb->isListening = isListening; 00073 cb->shutdown = shutdownCallbackServer; 00074 cb->reusingConnectionSocket = false; /* is true if we tunnel callback through the client connection socket */ 00075 cb->logLevel = parseLogLevel(cb->props->getString(cb->props, "logLevel", "WARN")); 00076 cb->log = xmlBlasterDefaultLogging; 00077 cb->logUserP = 0; 00078 cb->hostCB = strcpyAlloc(cb->props->getString(cb->props, "dispatch/callback/plugin/socket/hostname", 0)); 00079 cb->portCB = cb->props->getInt(cb->props, "dispatch/callback/plugin/socket/port", DEFAULT_CALLBACK_SERVER_PORT); 00080 cb->updateCb = updateCb; 00081 cb->updateCbUserData = updateCbUserData; /* A optional pointer from the client code which is returned to the update() function call */ 00082 memset(cb->responseListener, 0, MAX_RESPONSE_LISTENER_SIZE*sizeof(ResponseListener)); 00083 cb->addResponseListener = addResponseListener; 00084 cb->removeResponseListener = removeResponseListener; 00085 cb->isShutdown = false; 00086 cb->sendResponse = voidSendResponse; 00087 cb->sendXmlBlasterException = voidSendXmlBlasterException; 00088 cb->sendResponseOrException = voidSendResponseOrException; 00089 00090 cb->writeToSocket.writeToSocketFuncP = writenPlain; 00091 cb->writeToSocket.userP = cb; 00092 00093 cb->readFromSocket.readFromSocketFuncP = readnPlain; 00094 cb->readFromSocket.userP = cb; 00095 cb->readFromSocket.numReadFuncP = 0; /* xmlBlasterNumRead_test */ 00096 cb->readFromSocket.numReadUserP = 0; 00097 return cb; 00098 } 00099 00100 /* 00101 * @see header 00102 */ 00103 bool useThisSocket(CallbackServerUnparsed *cb, int socketToUse, int socketToUseUdp) 00104 { 00105 struct sockaddr_in localAddr; 00106 socklen_t size = (socklen_t)sizeof(localAddr); 00107 memset((char *)&localAddr, 0, (size_t)size); 00108 if (getsockname(socketToUse, (struct sockaddr *)&localAddr, &size) == -1) { 00109 if (cb->logLevel>=XMLBLASTER_LOG_WARN) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00110 "Can't determine the local socket host and port, errno=%d", errno); 00111 return false; 00112 } 00113 cb->portCB = (int)ntohs(localAddr.sin_port); 00114 strcpyRealloc(&cb->hostCB, inet_ntoa(localAddr.sin_addr)); /* inet_ntoa holds the host in an internal static string */ 00115 00116 cb->listenSocket = socketToUse; 00117 cb->acceptSocket = socketToUse; 00118 cb->socketUdp = socketToUseUdp; 00119 cb->reusingConnectionSocket = true; /* we tunnel callback through the client connection socket */ 00120 00121 if (cb->logLevel>=XMLBLASTER_LOG_INFO) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00122 "Forced callback server to reuse socket descriptor '%d' on localHostname=%s localPort=%d", 00123 socketToUse, cb->hostCB, cb->portCB); 00124 return true; 00125 } 00126 00127 void freeCallbackServerUnparsed(CallbackServerUnparsed **cb_) 00128 { 00129 CallbackServerUnparsed *cb = *cb_; 00130 if (cb != 0) { 00131 shutdownCallbackServer(cb); 00132 freeProperties(cb->props); 00133 free(cb); 00134 *cb_ = 0; 00135 } 00136 } 00137 00141 static ssize_t writenPlain(void *userP, const int fd, const char *ptr, const size_t nbytes) { 00142 if (userP) userP = 0; /* To avoid compiler warning */ 00143 return writen(fd, ptr, nbytes); 00144 } 00145 00149 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) { 00150 if (userP) userP = 0; /* To avoid compiler warning */ 00151 return readn(fd, ptr, nbytes, fpNumRead, userP2); 00152 } 00153 00154 static bool addResponseListener(CallbackServerUnparsed *cb, MsgRequestInfo *msgRequestInfoP, ResponseFp responseEventFp) { 00155 int i; 00156 if (responseEventFp == 0) { 00157 return false; 00158 } 00159 for (i=0; i<MAX_RESPONSE_LISTENER_SIZE; i++) { 00160 if (cb->responseListener[i].msgRequestInfoP == 0) { 00161 cb->responseListener[i].msgRequestInfoP = msgRequestInfoP; 00162 cb->responseListener[i].responseEventFp = responseEventFp; 00163 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00164 "addResponseListener(i=%d, requestId=%s)", i, msgRequestInfoP->requestIdStr); 00165 return true; 00166 } 00167 } 00168 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00169 "PANIC too many requests (%d) are waiting for a response, you are not registered", MAX_RESPONSE_LISTENER_SIZE); 00170 return false; 00171 } 00172 00173 static ResponseListener *getResponseListener(CallbackServerUnparsed *cb, const char *requestId) { 00174 int i; 00175 if (requestId == 0) { 00176 return 0; 00177 } 00178 for (i=0; i<MAX_RESPONSE_LISTENER_SIZE; i++) { 00179 if (cb->responseListener[i].msgRequestInfoP == 0) { 00180 continue; 00181 } 00182 if (!strcmp(cb->responseListener[i].msgRequestInfoP->requestIdStr, requestId)) { 00183 return &cb->responseListener[i]; 00184 } 00185 } 00186 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "RequestId '%s' is not registered", requestId); 00187 return 0; 00188 } 00189 00190 static ResponseListener *removeResponseListener(CallbackServerUnparsed *cb, const char *requestId) { 00191 int i; 00192 for (i=0; i<MAX_RESPONSE_LISTENER_SIZE; i++) { 00193 if (cb->responseListener[i].msgRequestInfoP == 0) { 00194 continue; 00195 } 00196 if (!strcmp(cb->responseListener[i].msgRequestInfoP->requestIdStr, requestId)) { 00197 cb->responseListener[i].msgRequestInfoP = 0; 00198 return &cb->responseListener[i]; 00199 } 00200 } 00201 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Can't remove requestId '%s', requestId is not registered", requestId); 00202 return (ResponseListener *)0; 00203 } 00204 00205 static void handleMessage(CallbackServerUnparsed *cb, SocketDataHolder* socketDataHolder, XmlBlasterException* xmlBlasterException, bool success) { 00206 00207 MsgUnitArr *msgUnitArrP; 00208 00209 if (success == false) { /* EOF */ 00210 int i; 00211 if (!cb->reusingConnectionSocket) 00212 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Lost callback socket connection to xmlBlaster (EOF)"); 00213 closeAcceptSocket(cb); 00214 /* Notify pending requests, otherwise they block in their mutex for a minute ... */ 00215 for (i=0; i<MAX_RESPONSE_LISTENER_SIZE; i++) { 00216 if (cb->responseListener[i].msgRequestInfoP == 0) { 00217 continue; 00218 } 00219 if (true) { /* Handle waiting MSG_TYPE_INVOKE threads (oneways are not in this list) */ 00220 ResponseListener *listener = &cb->responseListener[i]; 00221 MsgRequestInfo *msgRequestInfoP = listener->msgRequestInfoP; 00222 XmlBlasterException exception; 00223 initializeXmlBlasterException(&exception); 00224 00225 cb->responseListener[i].msgRequestInfoP = 0; 00226 00227 /* Simulate an exception on client side ... */ 00228 socketDataHolder->type = (char)MSG_TYPE_EXCEPTION; 00229 strncpy0(socketDataHolder->requestId, msgRequestInfoP->requestIdStr, MAX_REQUESTID_LEN); 00230 strncpy0(socketDataHolder->methodName, msgRequestInfoP->methodName, MAX_METHODNAME_LEN); 00231 00232 exception.remote = true; 00233 strncpy0(exception.errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00234 SNPRINTF(exception.message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00235 "[%.100s:%d] Lost connection to xmlBlaster with server side EOF", __FILE__, __LINE__); 00236 00237 encodeXmlBlasterException(&socketDataHolder->blob, &exception, false); 00238 00239 /* Takes a clone of socketDataHolder->blob */ 00240 listener->responseEventFp(msgRequestInfoP, socketDataHolder); 00241 00242 freeBlobHolderContent(&socketDataHolder->blob); 00243 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00244 "Notified pending requestId '%s' about lost socket connection", socketDataHolder->requestId); 00245 } 00246 } 00247 return; 00248 } 00249 00250 if (*xmlBlasterException->errorCode != 0) { 00251 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00252 "Couldn't read message from xmlBlaster: errorCode=%s message=%s", 00253 xmlBlasterException->errorCode, xmlBlasterException->message); 00254 return; 00255 } 00256 00257 if (cb->reusingConnectionSocket && 00258 (socketDataHolder->type == (char)MSG_TYPE_RESPONSE || socketDataHolder->type == (char)MSG_TYPE_EXCEPTION)) { 00259 ResponseListener *listener = getResponseListener(cb, socketDataHolder->requestId); 00260 if (listener != 0) { 00261 /* This is a response for a request (no callback for us) */ 00262 MsgRequestInfo *msgRequestInfoP = listener->msgRequestInfoP; 00263 removeResponseListener(cb, socketDataHolder->requestId); 00264 listener->responseEventFp(msgRequestInfoP, socketDataHolder); 00265 freeBlobHolderContent(&socketDataHolder->blob); 00266 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00267 "Forwarded message with requestId '%s' to response listener", socketDataHolder->requestId); 00268 return; 00269 } 00270 else { 00271 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00272 "PANIC: Did not expect an INVOCATION '%c'='%d' as a callback", 00273 socketDataHolder->type, (int)socketDataHolder->type); 00274 freeBlobHolderContent(&socketDataHolder->blob); 00275 return; 00276 } 00277 } 00278 00279 msgUnitArrP = parseMsgUnitArr(socketDataHolder->blob.dataLen, socketDataHolder->blob.data); 00280 freeBlobHolderContent(&(socketDataHolder->blob)); 00281 00282 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00283 "Received requestId '%s' callback %s()", 00284 socketDataHolder->requestId, socketDataHolder->methodName); 00285 00286 if (strcmp(socketDataHolder->methodName, XMLBLASTER_PING) == 0) { 00287 size_t i; 00288 for (i=0; i<msgUnitArrP->len; i++) { 00289 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos/>"); 00290 } 00291 sendResponse(cb, socketDataHolder, msgUnitArrP); 00292 freeMsgUnitArr(msgUnitArrP); 00293 } 00294 else if (strcmp(socketDataHolder->methodName, XMLBLASTER_UPDATE) == 0 || 00295 strcmp(socketDataHolder->methodName, XMLBLASTER_UPDATE_ONEWAY) == 0) { 00296 if (cb->updateCb != 0) { /* Client has registered to receive callback messages? */ 00297 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00298 "Calling client %s() for requestId '%s' ...", 00299 socketDataHolder->methodName, socketDataHolder->requestId); 00300 00301 strncpy0(msgUnitArrP->secretSessionId, socketDataHolder->secretSessionId, MAX_SESSIONID_LEN); 00302 msgUnitArrP->isOneway = (strcmp(socketDataHolder->methodName, XMLBLASTER_UPDATE_ONEWAY) == 0); 00303 cb->updateCb(msgUnitArrP, cb, xmlBlasterException, socketDataHolder); 00304 } 00305 else { /* Unexpected update arrived, the client was not interested, see similar behavior in XmlBlasterAccess.java:update() */ 00306 size_t i; 00307 for (i=0; i<msgUnitArrP->len; i++) { 00308 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>"); 00309 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00310 "Ignoring unexpected %s() message as client has not registered a callback, requestId is '%s' ...", 00311 socketDataHolder->methodName, socketDataHolder->requestId); 00312 } 00313 sendResponseOrException(true, cb, socketDataHolder, msgUnitArrP, xmlBlasterException); 00314 } 00315 } 00316 else { 00317 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00318 "Received unknown callback methodName=%s", socketDataHolder->methodName); 00319 } 00320 00321 } 00322 00323 00327 static int listenLoop(ListenLoopArgs* ls) 00328 { 00329 int rc; 00330 CallbackServerUnparsed *cb = ls->cb; 00331 bool udp = ls->udp; 00332 XmlBlasterException xmlBlasterException; 00333 SocketDataHolder socketDataHolder; 00334 bool success; 00335 bool useUdpForOneway = cb->socketUdp != -1; 00336 00337 for(;;) { 00338 memset(&xmlBlasterException, 0, sizeof(XmlBlasterException)); 00339 /* Here we block until a message arrives, see parseSocketData() */ 00340 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00341 "Going to block on socket read until a new message arrives ..."); 00342 if (cb->stopListenLoop) break; 00343 success = readMessage(cb, &socketDataHolder, &xmlBlasterException, udp); 00344 if (cb->stopListenLoop) break; 00345 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "%s arrived, success=%s", udp ? "UDP" : "TCP", success ? "true" : "false -> EOF"); 00346 00347 if (useUdpForOneway) { 00348 rc = pthread_mutex_lock(&cb->listenMutex); 00349 if (rc != 0) /* EINVAL */ 00350 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock() returned %d.", rc); 00351 } 00352 00353 handleMessage(cb, &socketDataHolder, &xmlBlasterException, success); 00354 00355 if (useUdpForOneway) { 00356 rc = pthread_mutex_unlock(&cb->listenMutex); 00357 if (rc != 0) /* EPERM */ 00358 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock() returned %d.", rc); 00359 } 00360 00361 if (cb->stopListenLoop || !success) 00362 break; 00363 } 00364 /*pthread_exit(NULL);*/ 00365 return 0; 00366 } 00367 00368 00377 static int runCallbackServer(CallbackServerUnparsed *cb) 00378 { 00379 int rc; 00380 int retVal = 0; 00381 ListenLoopArgs* tcpLoop = 0; 00382 ListenLoopArgs* udpLoop = 0; 00383 00384 bool useUdpForOneway = cb->socketUdp != -1; 00385 00386 cb->isShutdown = false; 00387 00388 if (cb->listenSocket == -1) { 00389 if (createCallbackServer(cb) == false) 00390 return 1; 00391 } 00392 else { 00393 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00394 "Reusing connection socket to tunnel callback messages"); 00395 } 00396 00397 if (useUdpForOneway) { 00398 /* We need to create two threads: one for TCP and one for the UDP callback listener */ 00399 pthread_t tcpListenThread, udpListenThread; 00400 00401 rc = pthread_mutex_init(&cb->listenMutex, NULL); /* rc is always 0 */ 00402 00403 tcpLoop = (ListenLoopArgs*)malloc(sizeof(ListenLoopArgs)); tcpLoop->cb = cb; tcpLoop->udp = false; 00404 rc = pthread_create(&tcpListenThread, NULL, (void * (*)(void *))listenLoop, tcpLoop); 00405 00406 if (useUdpForOneway) { 00407 udpLoop = (ListenLoopArgs*)malloc(sizeof(ListenLoopArgs)); udpLoop->cb = cb; udpLoop->udp = true; 00408 rc = pthread_create(&udpListenThread, NULL, (void * (*)(void *))listenLoop, udpLoop); 00409 } 00410 00411 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00412 "Waiting to join tcpListenThread ..."); 00413 pthread_join(tcpListenThread, NULL); 00414 free(tcpLoop); 00415 if (useUdpForOneway) { 00416 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00417 "Waiting to join udpListenThread ..."); 00418 pthread_join(udpListenThread, NULL); 00419 free(udpLoop); 00420 } 00421 rc = pthread_mutex_destroy(&cb->listenMutex); 00422 if (rc != 0) /* EBUSY */ 00423 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() returned %d, we ignore it", rc); 00424 } 00425 else { 00426 /* TCP only: no separate thread is needed */ 00427 tcpLoop = (ListenLoopArgs*)malloc(sizeof(ListenLoopArgs)); tcpLoop->cb = cb; tcpLoop->udp = false; 00428 retVal = listenLoop(tcpLoop); 00429 free(tcpLoop); 00430 } 00431 00432 cb->isShutdown = true; 00433 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00434 "Callbackserver thread is dying now ..."); 00435 return retVal; 00436 } 00437 00443 static bool createCallbackServer(CallbackServerUnparsed *cb) 00444 { 00445 socklen_t cli_len; 00446 struct hostent hostbuf, *hostP = NULL; 00447 struct sockaddr_in serv_addr, cli_addr; 00448 char *tmphstbuf=NULL; 00449 size_t hstbuflen=0; 00450 char serverHostName[256]; 00451 char errP[MAX_ERRNO_LEN]; 00452 if (cb->hostCB == 0) { 00453 strcpyRealloc(&cb->hostCB, "localhost"); 00454 if (gethostname(serverHostName, 125) == 0) 00455 strcpyRealloc(&cb->hostCB, serverHostName); 00456 } 00457 00458 if (cb->logLevel>=XMLBLASTER_LOG_INFO) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00459 "Starting callback server -dispatch/callback/plugin/socket/hostname %s -dispatch/callback/plugin/socket/port %d ...", 00460 cb->hostCB, cb->portCB); 00461 00462 /* 00463 * Get a socket to work with. 00464 */ 00465 if ((cb->listenSocket = (int)socket(AF_INET, SOCK_STREAM, 0)) < 0) { 00466 if (cb->logLevel>=XMLBLASTER_LOG_WARN) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00467 "Failed creating socket for callback server -dispatch/callback/plugin/socket/hostname %s -dispatch/callback/plugin/socket/port %d", 00468 cb->hostCB, cb->portCB); 00469 cb->isShutdown = true; 00470 return false; 00471 } 00472 00473 /* 00474 * Create the address we will be binding to. 00475 */ 00476 serv_addr.sin_family = AF_INET; 00477 *errP = 0; 00478 hostP = gethostbyname_re(cb->hostCB, &hostbuf, &tmphstbuf, &hstbuflen, errP); 00479 00480 if (*errP != 0) { 00481 char message[EXCEPTIONSTRUCT_MESSAGE_LEN]; 00482 SNPRINTF(message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00483 "[%.100s:%d] Lookup xmlBlaster failed, %s", 00484 __FILE__, __LINE__, errP); 00485 cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, message); 00486 *errP = 0; 00487 } 00488 00489 if (hostP != NULL) { 00490 serv_addr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; /*inet_addr("192.168.1.2"); */ 00491 free(tmphstbuf); 00492 } 00493 else 00494 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); 00495 serv_addr.sin_port = htons((u_short)cb->portCB); 00496 00497 if (bind(cb->listenSocket, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { 00498 if (cb->logLevel>=XMLBLASTER_LOG_WARN) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00499 "Failed binding port for callback server -dispatch/callback/plugin/socket/hostname %s -dispatch/callback/plugin/socket/port %d", 00500 cb->hostCB, cb->portCB); 00501 cb->isShutdown = true; 00502 return false; 00503 } 00504 00505 /* 00506 * Listen on the socket. 00507 */ 00508 if (listen(cb->listenSocket, 5) < 0) { 00509 if (cb->logLevel>=XMLBLASTER_LOG_WARN) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00510 "Failed creating listener for callback server -dispatch/callback/plugin/socket/hostname %s -dispatch/callback/plugin/socket/port %d", 00511 cb->hostCB, cb->portCB); 00512 cb->isShutdown = true; 00513 return false; 00514 } 00515 00516 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00517 "[CallbackServerUnparsed] Waiting for xmlBlaster to connect ..."); 00518 00519 /* 00520 * Accept connections. When we accept one, ns 00521 * will be connected to the client. cli_addr will 00522 * contain the address of the client. 00523 */ 00524 cli_len = (socklen_t)sizeof(cli_addr); 00525 if ((cb->acceptSocket = (int)accept(cb->listenSocket, (struct sockaddr *)&cli_addr, &cli_len)) < 0) { 00526 if (cb->logLevel>=XMLBLASTER_LOG_ERROR) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00527 "[CallbackServerUnparsed] accept failed"); 00528 cb->isShutdown = true; 00529 return false; 00530 } 00531 if (cb->logLevel>=XMLBLASTER_LOG_INFO) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00532 "[CallbackServerUnparsed] XmlBlaster connected from %s:%hd", 00533 inet_ntoa(cli_addr.sin_addr), cli_addr.sin_port); 00534 return true; 00535 } 00536 00537 static bool isListening(CallbackServerUnparsed *cb) 00538 { 00539 if (cb->listenSocket > -1) { 00540 return true; 00541 } 00542 return false; 00543 } 00544 00559 static bool readMessage(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, XmlBlasterException *exception, bool udp) 00560 { 00561 return parseSocketData(udp ? cb->socketUdp : cb->acceptSocket, &cb->readFromSocket, socketDataHolder, 00562 exception, &cb->stopListenLoop, udp, cb->logLevel >= XMLBLASTER_LOG_DUMP); 00563 } 00564 00566 static void voidSendResponse(CallbackServerUnparsed *cb, void *socketDataHolder, MsgUnitArr *msgUnitArrP) 00567 { 00568 sendResponse(cb, (SocketDataHolder *)socketDataHolder, msgUnitArrP); 00569 } 00570 00571 static void sendResponse(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, MsgUnitArr *msgUnitArrP) 00572 { 00573 char *rawMsg; 00574 size_t rawMsgLen; 00575 size_t dataLen = 0; 00576 char *data = 0; 00577 size_t i; 00578 MsgUnit msgUnit; /* we (mis)use MsgUnit for simple transformation of the exception into a raw blob */ 00579 bool allocated = false; 00580 memset(&msgUnit, 0, sizeof(MsgUnit)); 00581 00582 for (i=0; i<msgUnitArrP->len; i++) { 00583 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00584 "Returning the UpdateReturnQos '%s' to the server.", 00585 msgUnitArrP->msgUnitArr[i].responseQos); 00586 00587 if (msgUnitArrP->msgUnitArr[i].responseQos != 0) { 00588 msgUnit.qos = msgUnitArrP->msgUnitArr[i].responseQos; 00589 } 00590 else { 00591 msgUnit.qos = strcpyAlloc("<qos/>"); 00592 allocated = true; 00593 } 00594 00595 if (data == 0) { 00596 BlobHolder blob = encodeMsgUnit(&msgUnit, cb->logLevel >= XMLBLASTER_LOG_DUMP); 00597 data = blob.data; 00598 dataLen = blob.dataLen; 00599 } 00600 else { 00601 BlobHolder blob = encodeMsgUnit(&msgUnit, cb->logLevel >= XMLBLASTER_LOG_DUMP); 00602 data = (char *)realloc(data, dataLen+blob.dataLen); 00603 memcpy(data+dataLen, blob.data, blob.dataLen); 00604 dataLen += blob.dataLen; 00605 free(blob.data); 00606 } 00607 } 00608 00609 rawMsg = encodeSocketMessage(MSG_TYPE_RESPONSE, socketDataHolder->requestId, 00610 socketDataHolder->methodName, socketDataHolder->secretSessionId, 00611 data, dataLen, cb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen); 00612 free(data); 00613 00614 /*ssize_t numSent =*/(void) cb->writeToSocket.writeToSocketFuncP(cb->updateCbUserData, cb->acceptSocket, rawMsg, (int)rawMsgLen); 00615 00616 free(rawMsg); 00617 00618 if (allocated) free((char *)msgUnit.qos); 00619 } 00620 00621 static void voidSendXmlBlasterException(CallbackServerUnparsed *cb, void *socketDataHolder, XmlBlasterException *exception) 00622 { 00623 sendXmlBlasterException(cb, (SocketDataHolder *)socketDataHolder, exception); 00624 } 00625 00626 static void sendXmlBlasterException(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, XmlBlasterException *exception) 00627 { 00628 size_t currpos = 0; 00629 char *rawMsg; 00630 size_t rawMsgLen; 00631 BlobHolder blob; 00632 MsgUnit msgUnit; /* we (mis)use MsgUnit for simple transformation of the exception into a raw blob */ 00633 memset(&msgUnit, 0, sizeof(MsgUnit)); 00634 00635 msgUnit.qos = exception->errorCode; 00636 00637 /* see XmlBlasterException.toByteArr() and parseByteArr() */ 00638 msgUnit.contentLen = strlen(exception->errorCode) + strlen(exception->message) + 11; 00639 msgUnit.content = (char *)calloc(msgUnit.contentLen, sizeof(char)); 00640 00641 memcpy((char *)msgUnit.content, exception->errorCode, strlen(exception->errorCode)); 00642 currpos = strlen(exception->errorCode) + 4; 00643 00644 memcpy((char *)msgUnit.content+currpos, exception->message, strlen(exception->message)); 00645 00646 blob = encodeMsgUnit(&msgUnit, cb->logLevel >= XMLBLASTER_LOG_DUMP); 00647 00648 rawMsg = encodeSocketMessage(MSG_TYPE_EXCEPTION, socketDataHolder->requestId, 00649 socketDataHolder->methodName, socketDataHolder->secretSessionId, 00650 blob.data, blob.dataLen, cb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen); 00651 free(blob.data); 00652 free((char *)msgUnit.content); 00653 00654 /*ssize_t numSent =*/(void) cb->writeToSocket.writeToSocketFuncP(cb->updateCbUserData, cb->acceptSocket, rawMsg, (int)rawMsgLen); 00655 00656 free(rawMsg); 00657 } 00658 00663 static void voidSendResponseOrException(bool success, CallbackServerUnparsed *cb, void *socketDataHolder, MsgUnitArr *msgUnitArrP, XmlBlasterException *exception) 00664 { 00665 sendResponseOrException(success, cb, (SocketDataHolder *)socketDataHolder, msgUnitArrP, exception); 00666 } 00667 00672 static void sendResponseOrException(bool success, CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, MsgUnitArr *msgUnitArrP, XmlBlasterException *exception) 00673 { 00674 if (! (strcmp(socketDataHolder->methodName, XMLBLASTER_UPDATE_ONEWAY) == 0)) { 00675 if (success == true) { 00676 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00677 "update(): Sending response for requestId '%s'", socketDataHolder->requestId); 00678 sendResponse(cb, socketDataHolder, msgUnitArrP); 00679 } 00680 else { 00681 if (*(exception->errorCode) == 0) { 00682 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00683 "update(): We don't return anything for requestId '%s', the return message will come later by the client update dispatcher thread", socketDataHolder->requestId); 00684 } 00685 else { 00686 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00687 "update(): Throwing the XmlBlasterException '%s' back to the server:\n%s", 00688 exception->errorCode, exception->message); 00689 sendXmlBlasterException(cb, socketDataHolder, exception); 00690 } 00691 } 00692 } 00693 00694 freeMsgUnitArr(msgUnitArrP); 00695 } 00696 00700 static void closeAcceptSocket(CallbackServerUnparsed *cb) 00701 { 00702 /* We close even if cb->reusingConnectionSocket is set 00703 to react instantly on EOF from server side. 00704 Otherwise the client thread would block until socket response timeout happens (one minute) 00705 */ 00706 if (cb->acceptSocket != -1) { 00707 closeSocket(cb->acceptSocket); 00708 cb->acceptSocket = -1; 00709 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00710 "Closed accept socket"); 00711 } 00712 } 00713 00717 static void shutdownCallbackServer(CallbackServerUnparsed *cb) 00718 { 00719 if (cb == 0) return; 00720 00721 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00722 "Shutdown callback server stopListenLoop=%s (changes now to true), reusingConnectionSocket=%s", (cb->stopListenLoop?"true":"false"), (cb->reusingConnectionSocket?"true":"false")); 00723 00724 cb->stopListenLoop = true; 00725 00726 if (cb->hostCB != 0) { 00727 free(cb->hostCB); 00728 cb->hostCB = 0; 00729 } 00730 00731 if (cb->reusingConnectionSocket) { 00732 return; /* not our duty, we only have borrowed the socket from the client side connection */ 00733 } 00734 00735 00736 closeAcceptSocket(cb); 00737 00738 if (isListening(cb)) { 00739 closeSocket(cb->listenSocket); 00740 cb->listenSocket = -1; 00741 if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00742 "Closed listener socket"); 00743 } 00744 00745 cb->readFromSocket.numReadFuncP = 0; 00746 /* 00747 for(i=0; i<10; i++) { 00748 if (cb->isShutdown) { 00749 return; 00750 } 00751 if (cb->debug) printf("[CallbackServerUnparsed] Waiting for thread to die ..."); 00752 sleepMillis(1000); 00753 } 00754 printf("[CallbackServerUnparsed] WARNING: Thread has not died after 10 sec"); 00755 */ 00756 } 00757 00758 const char *callbackServerRawUsage() 00759 { 00760 return 00761 "\n -dispatch/callback/plugin/socket/hostname [localhost]" 00762 "\n The IP where to establish the callback server." 00763 "\n Can be useful on multi homed hosts." 00764 "\n -dispatch/callback/plugin/socket/port [7611]" 00765 "\n The port of the callback server."; 00766 } 00767 00768 #ifdef USE_MAIN_CB 00769 00772 bool myUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, SocketDataHandler socketDataHandler) 00773 { 00774 size_t i; 00775 bool testException = false; 00776 for (i=0; i<msgUnitArr->len; i++) { 00777 char *xml = messageUnitToXml(&msgUnitArr->msgUnitArr[i]); 00778 printf("client.update(): Asynchronous message update arrived:%s\n", xml); 00779 free(xml); 00780 msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc("<qos></qos>"); /* Return QoS: Everything is OK */ 00781 } 00782 if (testException) { 00783 strncpy0(xmlBlasterException->errorCode, "user.notWanted", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00784 strncpy0(xmlBlasterException->message, "I don't want these messages", XMLBLASTEREXCEPTION_MESSAGE_LEN); 00785 return false; 00786 } 00787 return true; 00788 } 00789 00793 int main(int argc, char** argv) 00794 { 00795 int iarg; 00796 CallbackServerUnparsed *cb; 00797 00798 for (iarg=0; iarg < argc; iarg++) { 00799 if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) { 00800 const char *pp = 00801 "\n -logLevel ERROR | WARN | INFO | TRACE [WARN]" 00802 "\n\nExample:" 00803 "\n CallbackServerUnparsed -logLevel TRACE -dispatch/callback/plugin/socket/hostname server.mars.universe"; 00804 printf("Usage:\n%s%s\n", callbackServerRawUsage(), pp); 00805 exit(1); 00806 } 00807 } 00808 00809 cb = getCallbackServerUnparsed(argc, argv, myUpdate, 0); 00810 printf("[main] Created CallbackServerUnparsed instance, creating listener on socket://%s:%d...\n", cb->hostCB, cb->portCB); 00811 cb->runCallbackServer(cb); /* blocks on socket listener */ 00812 00813 /* This code is reached only on socket EOF */ 00814 00815 printf("[main] Socket listener is shutdown\n"); 00816 freeCallbackServerUnparsed(&cb); 00817 return 0; 00818 } 00819 #endif /* USE_MAIN_CB */