socket/CallbackServerUnparsed.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      CallbackServerUnparsed.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Establish a listen socket for xmlBlaster callbacks
00006 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00007 Compile:
00008   LINUX:   gcc -g -Wall -DUSE_MAIN_CB -I.. -o CallbackServerUnparsed CallbackServerUnparsed.c xmlBlasterSocket.c ../util/msgUtil.c ../util/Properties.c
00009   WIN:     cl /MT -DUSE_MAIN_CB -D_WINDOWS -I.. CallbackServerUnparsed.c xmlBlasterSocket.c ../util/msgUtil.c ../util/Properties.c ws2_32.lib
00010   Solaris: cc -g -DUSE_MAIN_CB -I.. -o CallbackServerUnparsed CallbackServerUnparsed.c xmlBlasterSocket.c ../util/msgUtil.c ../util/Properties.c -lsocket -lnsl
00011 -----------------------------------------------------------------------------*/
00012 #include <stdio.h>
00013 #include <string.h>
00014 #if defined(WINCE)
00015 #  if defined(XB_USE_PTHREADS)
00016 #     include <pthreads/pthread.h>
00017 #  else
00018       /*#include <pthreads/need_errno.h> */
00019       static int errno=0; /* single threaded workaround*/
00020 #  endif
00021 #else
00022 #  include <errno.h>
00023 #endif
00024 #include <socket/xmlBlasterSocket.h> /* gethostname() */
00025 #include <CallbackServerUnparsed.h>
00026 
00027 static bool useThisSocket(CallbackServerUnparsed *cb, int socketToUse, int socketToUseUdp);
00028 static int runCallbackServer(CallbackServerUnparsed *cb);
00029 static bool createCallbackServer(CallbackServerUnparsed *cb);
00030 static bool isListening(CallbackServerUnparsed *cb);
00031 static bool readMessage(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, XmlBlasterException *exception, bool udp);
00032 static ssize_t writenPlain(void *cb, const int fd, const char *ptr, const size_t nbytes);
00033 static ssize_t readnPlain(void *cb, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
00034 static bool addResponseListener(CallbackServerUnparsed *cb, MsgRequestInfo *msgRequestInfoP, ResponseFp responseEventFp);
00035 static ResponseListener *removeResponseListener(CallbackServerUnparsed *cb, const char *requestId);
00036 static void voidSendResponse(CallbackServerUnparsed *cb, void *socketDataHolder, MsgUnitArr *msgUnitArr);
00037 static void sendResponse(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, MsgUnitArr *msgUnitArr);
00038 static void voidSendXmlBlasterException(CallbackServerUnparsed *cb, void *socketDataHolder, XmlBlasterException *exception);
00039 static void sendXmlBlasterException(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, XmlBlasterException *exception);
00040 static void voidSendResponseOrException(bool success, CallbackServerUnparsed *cb, void *socketDataHolder, MsgUnitArr *msgUnitArrP, XmlBlasterException *exception);
00041 static void sendResponseOrException(bool success, CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, MsgUnitArr *msgUnitArrP, XmlBlasterException *exception);
00042 static void shutdownCallbackServer(CallbackServerUnparsed *cb);
00043 static void closeAcceptSocket(CallbackServerUnparsed *cb);
00044 
00045 /*
00046 static void xmlBlasterNumRead_test(void *xb, const size_t currBytesRead, const size_t nbytes) {
00047    printf("xmlBlasterSocket.c: DEBUG ONLY currBytesRead=%ld nbytes=%ld\n", (long)currBytesRead, (long)nbytes);
00048 }
00049 */
00050 
00051 
00055 CallbackServerUnparsed *getCallbackServerUnparsed(int argc, const char* const* argv,
00056                         UpdateCbFp updateCb, void *updateCbUserData)
00057 {
00058    CallbackServerUnparsed *cb = (CallbackServerUnparsed *)calloc(1,
00059                                 sizeof(CallbackServerUnparsed));
00060    if (cb == 0) return cb;
00061    cb->props = createProperties(argc, argv);
00062    if (cb->props == 0) {
00063       freeCallbackServerUnparsed(&cb);
00064       return (CallbackServerUnparsed *)0;
00065    }
00066    cb->stopListenLoop = false;
00067    cb->listenSocket = -1; /* Can be reused from XmlBlasterConnectionUnparsed */
00068    cb->acceptSocket = -1; /* Can be reused from XmlBlasterConnectionUnparsed */
00069    cb->socketUdp = -1; /* Can be reused from XmlBlasterConnectionUnparsed */
00070    cb->useThisSocket = useThisSocket;
00071    cb->runCallbackServer = runCallbackServer;
00072    cb->isListening = isListening;
00073    cb->shutdown = shutdownCallbackServer;
00074    cb->reusingConnectionSocket = false; /* is true if we tunnel callback through the client connection socket */
00075    cb->logLevel = parseLogLevel(cb->props->getString(cb->props, "logLevel", "WARN"));
00076    cb->log = xmlBlasterDefaultLogging;
00077    cb->logUserP = 0;
00078    cb->hostCB = strcpyAlloc(cb->props->getString(cb->props, "dispatch/callback/plugin/socket/hostname", 0));
00079    cb->portCB = cb->props->getInt(cb->props, "dispatch/callback/plugin/socket/port", DEFAULT_CALLBACK_SERVER_PORT);
00080    cb->updateCb = updateCb;
00081    cb->updateCbUserData = updateCbUserData; /* A optional pointer from the client code which is returned to the update() function call */
00082    memset(cb->responseListener, 0, MAX_RESPONSE_LISTENER_SIZE*sizeof(ResponseListener));
00083    cb->addResponseListener = addResponseListener;
00084    cb->removeResponseListener = removeResponseListener;
00085    cb->isShutdown = false;
00086    cb->sendResponse = voidSendResponse;
00087    cb->sendXmlBlasterException = voidSendXmlBlasterException;
00088    cb->sendResponseOrException = voidSendResponseOrException;
00089 
00090    cb->writeToSocket.writeToSocketFuncP = writenPlain;
00091    cb->writeToSocket.userP = cb;
00092 
00093    cb->readFromSocket.readFromSocketFuncP = readnPlain;
00094    cb->readFromSocket.userP = cb;
00095    cb->readFromSocket.numReadFuncP = 0; /* xmlBlasterNumRead_test */
00096    cb->readFromSocket.numReadUserP = 0;
00097    return cb;
00098 }
00099 
00100 /*
00101  * @see header
00102  */
00103 bool useThisSocket(CallbackServerUnparsed *cb, int socketToUse, int socketToUseUdp)
00104 {
00105    struct sockaddr_in localAddr;
00106    socklen_t size = (socklen_t)sizeof(localAddr);
00107    memset((char *)&localAddr, 0, (size_t)size);
00108    if (getsockname(socketToUse, (struct sockaddr *)&localAddr, &size) == -1) {
00109       if (cb->logLevel>=XMLBLASTER_LOG_WARN) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00110          "Can't determine the local socket host and port, errno=%d", errno);
00111       return false;
00112    }
00113    cb->portCB = (int)ntohs(localAddr.sin_port);
00114    strcpyRealloc(&cb->hostCB, inet_ntoa(localAddr.sin_addr)); /* inet_ntoa holds the host in an internal static string */
00115 
00116    cb->listenSocket = socketToUse;
00117    cb->acceptSocket = socketToUse;
00118    cb->socketUdp = socketToUseUdp;
00119    cb->reusingConnectionSocket = true; /* we tunnel callback through the client connection socket */
00120 
00121    if (cb->logLevel>=XMLBLASTER_LOG_INFO) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00122       "Forced callback server to reuse socket descriptor '%d' on localHostname=%s localPort=%d",
00123                          socketToUse, cb->hostCB, cb->portCB);
00124    return true;
00125 }
00126 
00127 void freeCallbackServerUnparsed(CallbackServerUnparsed **cb_)
00128 {
00129    CallbackServerUnparsed *cb = *cb_;
00130    if (cb != 0) {
00131       shutdownCallbackServer(cb);
00132       freeProperties(cb->props);
00133       free(cb);
00134       *cb_ = 0;
00135    }
00136 }
00137 
/**
 * Default socket writer: forwards the bytes unmodified to writen().
 *
 * @param userP Not used by this implementation
 * @return The return value of writen()
 */
static ssize_t writenPlain(void *userP, const int fd, const char *ptr, const size_t nbytes) {
   (void)userP; /* unused, silences the compiler warning */
   return writen(fd, ptr, nbytes);
}
00145 
00149 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
00150    if (userP) userP = 0; /* To avoid compiler warning */
00151    return readn(fd, ptr, nbytes, fpNumRead, userP2);
00152 }
00153 
00154 static bool addResponseListener(CallbackServerUnparsed *cb, MsgRequestInfo *msgRequestInfoP, ResponseFp responseEventFp) {
00155    int i;
00156    if (responseEventFp == 0) {
00157       return false;
00158    }
00159    for (i=0; i<MAX_RESPONSE_LISTENER_SIZE; i++) {
00160       if (cb->responseListener[i].msgRequestInfoP == 0) {
00161          cb->responseListener[i].msgRequestInfoP = msgRequestInfoP;
00162          cb->responseListener[i].responseEventFp = responseEventFp;
00163          if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00164             "addResponseListener(i=%d, requestId=%s)", i, msgRequestInfoP->requestIdStr);
00165          return true;
00166       }
00167    }
00168    cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00169       "PANIC too many requests (%d) are waiting for a response, you are not registered", MAX_RESPONSE_LISTENER_SIZE);
00170    return false;
00171 }
00172 
00173 static ResponseListener *getResponseListener(CallbackServerUnparsed *cb, const char *requestId) {
00174    int i;
00175    if (requestId == 0) {
00176       return 0;
00177    }
00178    for (i=0; i<MAX_RESPONSE_LISTENER_SIZE; i++) {
00179       if (cb->responseListener[i].msgRequestInfoP == 0) {
00180          continue;
00181       }
00182       if (!strcmp(cb->responseListener[i].msgRequestInfoP->requestIdStr, requestId)) {
00183          return &cb->responseListener[i];
00184       }
00185    }
00186    cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "RequestId '%s' is not registered", requestId);
00187    return 0;
00188 }
00189 
00190 static ResponseListener *removeResponseListener(CallbackServerUnparsed *cb, const char *requestId) {
00191    int i;
00192    for (i=0; i<MAX_RESPONSE_LISTENER_SIZE; i++) {
00193       if (cb->responseListener[i].msgRequestInfoP == 0) {
00194          continue;
00195       }
00196       if (!strcmp(cb->responseListener[i].msgRequestInfoP->requestIdStr, requestId)) {
00197          cb->responseListener[i].msgRequestInfoP = 0;
00198          return &cb->responseListener[i];
00199       }
00200    }
00201    cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Can't remove requestId '%s', requestId is not registered", requestId);
00202    return (ResponseListener *)0;
00203 }
00204 
00205 static void handleMessage(CallbackServerUnparsed *cb, SocketDataHolder* socketDataHolder, XmlBlasterException* xmlBlasterException, bool success) {
00206 
00207    MsgUnitArr *msgUnitArrP;
00208 
00209    if (success == false) { /* EOF */
00210       int i;
00211       if (!cb->reusingConnectionSocket)
00212          cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Lost callback socket connection to xmlBlaster (EOF)");
00213       closeAcceptSocket(cb);
00214       /* Notify pending requests, otherwise they block in their mutex for a minute ... */
00215       for (i=0; i<MAX_RESPONSE_LISTENER_SIZE; i++) {
00216          if (cb->responseListener[i].msgRequestInfoP == 0) {
00217             continue;
00218          }
00219          if (true) {  /* Handle waiting MSG_TYPE_INVOKE threads (oneways are not in this list) */
00220             ResponseListener *listener = &cb->responseListener[i];
00221             MsgRequestInfo *msgRequestInfoP = listener->msgRequestInfoP;
00222             XmlBlasterException exception;
00223             initializeXmlBlasterException(&exception);
00224             
00225             cb->responseListener[i].msgRequestInfoP = 0;
00226 
00227             /* Simulate an exception on client side ... */
00228             socketDataHolder->type = (char)MSG_TYPE_EXCEPTION;
00229             strncpy0(socketDataHolder->requestId, msgRequestInfoP->requestIdStr, MAX_REQUESTID_LEN);
00230             strncpy0(socketDataHolder->methodName, msgRequestInfoP->methodName, MAX_METHODNAME_LEN);
00231 
00232             exception.remote = true;
00233             strncpy0(exception.errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00234             SNPRINTF(exception.message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00235                   "[%.100s:%d] Lost connection to xmlBlaster with server side EOF", __FILE__, __LINE__);
00236 
00237             encodeXmlBlasterException(&socketDataHolder->blob, &exception, false); 
00238             
00239             /* Takes a clone of socketDataHolder->blob */
00240             listener->responseEventFp(msgRequestInfoP, socketDataHolder);
00241             
00242             freeBlobHolderContent(&socketDataHolder->blob);
00243             if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00244                "Notified pending requestId '%s' about lost socket connection", socketDataHolder->requestId);
00245          }
00246       }
00247       return;
00248    }
00249 
00250    if (*xmlBlasterException->errorCode != 0) {
00251       cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00252          "Couldn't read message from xmlBlaster: errorCode=%s message=%s",
00253                   xmlBlasterException->errorCode, xmlBlasterException->message);
00254       return;
00255    }
00256 
00257    if (cb->reusingConnectionSocket &&
00258          (socketDataHolder->type == (char)MSG_TYPE_RESPONSE || socketDataHolder->type == (char)MSG_TYPE_EXCEPTION)) {
00259       ResponseListener *listener = getResponseListener(cb, socketDataHolder->requestId);
00260       if (listener != 0) {
00261          /* This is a response for a request (no callback for us) */
00262          MsgRequestInfo *msgRequestInfoP = listener->msgRequestInfoP;
00263          removeResponseListener(cb, socketDataHolder->requestId);
00264          listener->responseEventFp(msgRequestInfoP, socketDataHolder);
00265          freeBlobHolderContent(&socketDataHolder->blob);
00266          if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00267             "Forwarded message with requestId '%s' to response listener", socketDataHolder->requestId);
00268          return;
00269       }
00270       else {
00271          cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00272             "PANIC: Did not expect an INVOCATION '%c'='%d' as a callback",
00273                   socketDataHolder->type, (int)socketDataHolder->type);
00274          freeBlobHolderContent(&socketDataHolder->blob);
00275          return;
00276       }
00277    }
00278 
00279    msgUnitArrP = parseMsgUnitArr(socketDataHolder->blob.dataLen, socketDataHolder->blob.data);
00280    freeBlobHolderContent(&(socketDataHolder->blob));
00281 
00282    if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00283       "Received requestId '%s' callback %s()",
00284       socketDataHolder->requestId, socketDataHolder->methodName);
00285 
00286    if (strcmp(socketDataHolder->methodName, XMLBLASTER_PING) == 0) {
00287       size_t i;
00288       for (i=0; i<msgUnitArrP->len; i++) {
00289          msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos/>");
00290       }
00291       sendResponse(cb, socketDataHolder, msgUnitArrP);
00292       freeMsgUnitArr(msgUnitArrP);
00293    }
00294    else if (strcmp(socketDataHolder->methodName, XMLBLASTER_UPDATE) == 0 ||
00295             strcmp(socketDataHolder->methodName, XMLBLASTER_UPDATE_ONEWAY) == 0) {
00296       if (cb->updateCb != 0) { /* Client has registered to receive callback messages? */
00297          if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00298             "Calling client %s() for requestId '%s' ...",
00299             socketDataHolder->methodName, socketDataHolder->requestId);
00300          
00301          strncpy0(msgUnitArrP->secretSessionId, socketDataHolder->secretSessionId, MAX_SESSIONID_LEN);
00302          msgUnitArrP->isOneway = (strcmp(socketDataHolder->methodName, XMLBLASTER_UPDATE_ONEWAY) == 0);
00303          cb->updateCb(msgUnitArrP, cb, xmlBlasterException, socketDataHolder);
00304       }
00305       else { /* Unexpected update arrived, the client was not interested, see similar behavior in XmlBlasterAccess.java:update() */
00306          size_t i;
00307          for (i=0; i<msgUnitArrP->len; i++) {
00308             msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
00309             cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00310                "Ignoring unexpected %s() message as client has not registered a callback, requestId is '%s' ...",
00311                socketDataHolder->methodName, socketDataHolder->requestId);
00312          }
00313          sendResponseOrException(true, cb, socketDataHolder, msgUnitArrP, xmlBlasterException);
00314       }
00315    }
00316    else {
00317       cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00318       "Received unknown callback methodName=%s", socketDataHolder->methodName);
00319    }
00320 
00321 }
00322 
00323 
00327 static int listenLoop(ListenLoopArgs* ls)
00328 {
00329    int rc;
00330    CallbackServerUnparsed *cb = ls->cb;
00331    bool udp = ls->udp;
00332    XmlBlasterException xmlBlasterException;
00333    SocketDataHolder socketDataHolder;
00334    bool success;
00335    bool useUdpForOneway = cb->socketUdp != -1;
00336 
00337    for(;;) {
00338       memset(&xmlBlasterException, 0, sizeof(XmlBlasterException));
00339       /* Here we block until a message arrives, see parseSocketData() */
00340       if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00341          "Going to block on socket read until a new message arrives ...");
00342       if (cb->stopListenLoop) break;
00343       success = readMessage(cb, &socketDataHolder, &xmlBlasterException, udp);
00344       if (cb->stopListenLoop) break;
00345       cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "%s arrived, success=%s", udp ? "UDP" : "TCP", success ? "true" : "false -> EOF");
00346 
00347       if (useUdpForOneway) {
00348          rc = pthread_mutex_lock(&cb->listenMutex);
00349          if (rc != 0) /* EINVAL */
00350             cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock() returned %d.", rc);
00351       }
00352 
00353       handleMessage(cb, &socketDataHolder, &xmlBlasterException, success);
00354 
00355       if (useUdpForOneway) {
00356          rc = pthread_mutex_unlock(&cb->listenMutex);
00357          if (rc != 0) /* EPERM */
00358             cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock() returned %d.", rc);
00359       }
00360 
00361       if (cb->stopListenLoop || !success)
00362          break;
00363    }
00364    /*pthread_exit(NULL);*/
00365    return 0;
00366 }
00367 
00368 
00377 static int runCallbackServer(CallbackServerUnparsed *cb)
00378 {
00379    int rc;
00380    int retVal = 0;
00381    ListenLoopArgs* tcpLoop = 0;
00382    ListenLoopArgs* udpLoop = 0;
00383 
00384    bool useUdpForOneway = cb->socketUdp != -1;
00385 
00386    cb->isShutdown = false;
00387 
00388    if (cb->listenSocket == -1) {
00389       if (createCallbackServer(cb) == false)
00390          return 1;
00391    }
00392    else {
00393       if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00394          "Reusing connection socket to tunnel callback messages");
00395    }
00396 
00397    if (useUdpForOneway) {
00398       /* We need to create two threads: one for TCP and one for the UDP callback listener */
00399       pthread_t tcpListenThread, udpListenThread;
00400 
00401       rc = pthread_mutex_init(&cb->listenMutex, NULL); /* rc is always 0 */
00402 
00403       tcpLoop = (ListenLoopArgs*)malloc(sizeof(ListenLoopArgs)); tcpLoop->cb = cb; tcpLoop->udp = false;
00404       rc = pthread_create(&tcpListenThread, NULL, (void * (*)(void *))listenLoop, tcpLoop);
00405 
00406       if (useUdpForOneway) {
00407          udpLoop = (ListenLoopArgs*)malloc(sizeof(ListenLoopArgs)); udpLoop->cb = cb; udpLoop->udp = true;
00408          rc = pthread_create(&udpListenThread, NULL, (void * (*)(void *))listenLoop, udpLoop);
00409       }
00410 
00411       if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00412             "Waiting to join tcpListenThread ...");
00413       pthread_join(tcpListenThread, NULL);
00414       free(tcpLoop);
00415       if (useUdpForOneway) {
00416          if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00417             "Waiting to join udpListenThread ...");
00418          pthread_join(udpListenThread, NULL);
00419          free(udpLoop);
00420       }
00421       rc = pthread_mutex_destroy(&cb->listenMutex);
00422       if (rc != 0) /* EBUSY */
00423          cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() returned %d, we ignore it", rc);
00424    }
00425    else {
00426       /* TCP only: no separate thread is needed */
00427       tcpLoop = (ListenLoopArgs*)malloc(sizeof(ListenLoopArgs)); tcpLoop->cb = cb; tcpLoop->udp = false;
00428       retVal = listenLoop(tcpLoop);
00429       free(tcpLoop);
00430    }
00431 
00432    cb->isShutdown = true;
00433    if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00434          "Callbackserver thread is dying now ...");
00435    return retVal;
00436 }
00437 
00443 static bool createCallbackServer(CallbackServerUnparsed *cb)
00444 {
00445    socklen_t cli_len;
00446    struct hostent hostbuf, *hostP = NULL;
00447    struct sockaddr_in serv_addr, cli_addr;
00448    char *tmphstbuf=NULL;
00449    size_t hstbuflen=0;
00450    char serverHostName[256];
00451    char errP[MAX_ERRNO_LEN];
00452    if (cb->hostCB == 0) {
00453       strcpyRealloc(&cb->hostCB, "localhost");
00454       if (gethostname(serverHostName, 125) == 0)
00455          strcpyRealloc(&cb->hostCB, serverHostName);
00456    }   
00457 
00458    if (cb->logLevel>=XMLBLASTER_LOG_INFO) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00459       "Starting callback server -dispatch/callback/plugin/socket/hostname %s -dispatch/callback/plugin/socket/port %d ...",
00460                cb->hostCB, cb->portCB);
00461 
00462    /*
00463       * Get a socket to work with.
00464       */
00465    if ((cb->listenSocket = (int)socket(AF_INET, SOCK_STREAM, 0)) < 0) {
00466       if (cb->logLevel>=XMLBLASTER_LOG_WARN) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00467          "Failed creating socket for callback server -dispatch/callback/plugin/socket/hostname %s -dispatch/callback/plugin/socket/port %d",
00468             cb->hostCB, cb->portCB);
00469          cb->isShutdown = true;
00470          return false;
00471    }
00472 
00473    /*
00474     * Create the address we will be binding to.
00475     */
00476    serv_addr.sin_family = AF_INET;
00477    *errP = 0;
00478    hostP = gethostbyname_re(cb->hostCB, &hostbuf, &tmphstbuf, &hstbuflen, errP);
00479 
00480    if (*errP != 0) {
00481       char message[EXCEPTIONSTRUCT_MESSAGE_LEN];
00482       SNPRINTF(message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00483                "[%.100s:%d] Lookup xmlBlaster failed, %s",
00484                __FILE__, __LINE__, errP);
00485       cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, message);
00486       *errP = 0;
00487    }
00488 
00489    if (hostP != NULL) {
00490       serv_addr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; /*inet_addr("192.168.1.2"); */
00491       free(tmphstbuf);
00492    }
00493    else
00494       serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
00495    serv_addr.sin_port = htons((u_short)cb->portCB);
00496 
00497    if (bind(cb->listenSocket, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
00498       if (cb->logLevel>=XMLBLASTER_LOG_WARN) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00499          "Failed binding port for callback server -dispatch/callback/plugin/socket/hostname %s -dispatch/callback/plugin/socket/port %d",
00500             cb->hostCB, cb->portCB);
00501       cb->isShutdown = true;
00502       return false;
00503    }
00504 
00505    /*
00506       * Listen on the socket.
00507       */
00508    if (listen(cb->listenSocket, 5) < 0) {
00509       if (cb->logLevel>=XMLBLASTER_LOG_WARN) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00510          "Failed creating listener for callback server -dispatch/callback/plugin/socket/hostname %s -dispatch/callback/plugin/socket/port %d",
00511             cb->hostCB, cb->portCB);
00512       cb->isShutdown = true;
00513       return false;
00514    }
00515 
00516    if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00517       "[CallbackServerUnparsed] Waiting for xmlBlaster to connect ...");
00518 
00519    /*
00520       * Accept connections.  When we accept one, ns
00521       * will be connected to the client.  cli_addr will
00522       * contain the address of the client.
00523       */
00524    cli_len = (socklen_t)sizeof(cli_addr);
00525    if ((cb->acceptSocket = (int)accept(cb->listenSocket, (struct sockaddr *)&cli_addr, &cli_len)) < 0) {
00526       if (cb->logLevel>=XMLBLASTER_LOG_ERROR) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00527          "[CallbackServerUnparsed] accept failed");
00528          cb->isShutdown = true;
00529          return false;
00530    }
00531    if (cb->logLevel>=XMLBLASTER_LOG_INFO) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00532       "[CallbackServerUnparsed] XmlBlaster connected from %s:%hd",
00533                            inet_ntoa(cli_addr.sin_addr), cli_addr.sin_port);
00534    return true;
00535 }
00536 
00537 static bool isListening(CallbackServerUnparsed *cb)
00538 {
00539    if (cb->listenSocket > -1) {
00540       return true;
00541    }
00542    return false;
00543 }
00544 
00559 static bool readMessage(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, XmlBlasterException *exception, bool udp)
00560 {
00561    return parseSocketData(udp ? cb->socketUdp : cb->acceptSocket, &cb->readFromSocket, socketDataHolder,
00562                           exception, &cb->stopListenLoop, udp, cb->logLevel >= XMLBLASTER_LOG_DUMP);
00563 }
00564 
00566 static void voidSendResponse(CallbackServerUnparsed *cb, void *socketDataHolder, MsgUnitArr *msgUnitArrP)
00567 {
00568    sendResponse(cb, (SocketDataHolder *)socketDataHolder, msgUnitArrP);
00569 }
00570 
00571 static void sendResponse(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, MsgUnitArr *msgUnitArrP)
00572 {
00573    char *rawMsg;
00574    size_t rawMsgLen;
00575    size_t dataLen = 0;
00576    char *data = 0;
00577    size_t i;
00578    MsgUnit msgUnit; /* we (mis)use MsgUnit for simple transformation of the exception into a raw blob */
00579    bool allocated = false;
00580    memset(&msgUnit, 0, sizeof(MsgUnit));
00581 
00582    for (i=0; i<msgUnitArrP->len; i++) {
00583       if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00584          "Returning the UpdateReturnQos '%s' to the server.",
00585             msgUnitArrP->msgUnitArr[i].responseQos);
00586 
00587       if (msgUnitArrP->msgUnitArr[i].responseQos != 0) {
00588          msgUnit.qos = msgUnitArrP->msgUnitArr[i].responseQos;
00589       }
00590       else {
00591          msgUnit.qos = strcpyAlloc("<qos/>");
00592          allocated = true;
00593       }
00594 
00595       if (data == 0) {
00596          BlobHolder blob = encodeMsgUnit(&msgUnit, cb->logLevel >= XMLBLASTER_LOG_DUMP);
00597          data = blob.data;
00598          dataLen = blob.dataLen;
00599       }
00600       else {
00601          BlobHolder blob = encodeMsgUnit(&msgUnit, cb->logLevel >= XMLBLASTER_LOG_DUMP);
00602          data = (char *)realloc(data, dataLen+blob.dataLen);
00603          memcpy(data+dataLen, blob.data, blob.dataLen);
00604          dataLen += blob.dataLen;
00605          free(blob.data);
00606       }
00607    }
00608 
00609    rawMsg = encodeSocketMessage(MSG_TYPE_RESPONSE, socketDataHolder->requestId,
00610                              socketDataHolder->methodName, socketDataHolder->secretSessionId,
00611                              data, dataLen, cb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen);
00612    free(data);
00613 
00614    /*ssize_t numSent =*/(void) cb->writeToSocket.writeToSocketFuncP(cb->updateCbUserData, cb->acceptSocket, rawMsg, (int)rawMsgLen);
00615 
00616    free(rawMsg);
00617 
00618    if (allocated) free((char *)msgUnit.qos);
00619 }
00620 
00621 static void voidSendXmlBlasterException(CallbackServerUnparsed *cb, void *socketDataHolder, XmlBlasterException *exception)
00622 {
00623    sendXmlBlasterException(cb, (SocketDataHolder *)socketDataHolder, exception);
00624 }
00625 
00626 static void sendXmlBlasterException(CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, XmlBlasterException *exception)
00627 {
00628    size_t currpos = 0;
00629    char *rawMsg;
00630    size_t rawMsgLen;
00631    BlobHolder blob;
00632    MsgUnit msgUnit; /* we (mis)use MsgUnit for simple transformation of the exception into a raw blob */
00633    memset(&msgUnit, 0, sizeof(MsgUnit));
00634    
00635    msgUnit.qos = exception->errorCode;
00636    
00637    /* see XmlBlasterException.toByteArr() and parseByteArr() */
00638    msgUnit.contentLen = strlen(exception->errorCode) + strlen(exception->message) + 11;
00639    msgUnit.content = (char *)calloc(msgUnit.contentLen, sizeof(char));
00640    
00641    memcpy((char *)msgUnit.content, exception->errorCode, strlen(exception->errorCode));
00642    currpos = strlen(exception->errorCode) + 4;
00643    
00644    memcpy((char *)msgUnit.content+currpos, exception->message, strlen(exception->message));
00645    
00646    blob = encodeMsgUnit(&msgUnit, cb->logLevel >= XMLBLASTER_LOG_DUMP);
00647 
00648    rawMsg = encodeSocketMessage(MSG_TYPE_EXCEPTION, socketDataHolder->requestId,
00649                              socketDataHolder->methodName, socketDataHolder->secretSessionId,
00650                              blob.data, blob.dataLen, cb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen);
00651    free(blob.data);
00652    free((char *)msgUnit.content);
00653 
00654    /*ssize_t numSent =*/(void) cb->writeToSocket.writeToSocketFuncP(cb->updateCbUserData, cb->acceptSocket, rawMsg, (int)rawMsgLen);
00655 
00656    free(rawMsg);
00657 }
00658 
00663 static void voidSendResponseOrException(bool success, CallbackServerUnparsed *cb, void *socketDataHolder, MsgUnitArr *msgUnitArrP, XmlBlasterException *exception)
00664 {
00665    sendResponseOrException(success, cb, (SocketDataHolder *)socketDataHolder, msgUnitArrP, exception);
00666 }
00667 
00672 static void sendResponseOrException(bool success, CallbackServerUnparsed *cb, SocketDataHolder *socketDataHolder, MsgUnitArr *msgUnitArrP, XmlBlasterException *exception)
00673 {
00674    if (! (strcmp(socketDataHolder->methodName, XMLBLASTER_UPDATE_ONEWAY) == 0)) {
00675       if (success == true) {
00676          if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00677             "update(): Sending response for requestId '%s'", socketDataHolder->requestId);
00678          sendResponse(cb, socketDataHolder, msgUnitArrP);
00679       }
00680       else {
00681          if (*(exception->errorCode) == 0) {
00682             if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00683                "update(): We don't return anything for requestId '%s', the return message will come later by the client update dispatcher thread", socketDataHolder->requestId);
00684          }
00685          else {
00686             if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00687                "update(): Throwing the XmlBlasterException '%s' back to the server:\n%s",
00688                    exception->errorCode, exception->message);
00689             sendXmlBlasterException(cb, socketDataHolder, exception);
00690          }
00691       }
00692    }
00693 
00694    freeMsgUnitArr(msgUnitArrP);
00695 }
00696 
00700 static void closeAcceptSocket(CallbackServerUnparsed *cb)
00701 {
00702    /* We close even if cb->reusingConnectionSocket is set
00703      to react instantly on EOF from server side.
00704      Otherwise the client thread would block until socket response timeout happens (one minute)
00705    */
00706    if (cb->acceptSocket != -1) {
00707       closeSocket(cb->acceptSocket);
00708       cb->acceptSocket = -1;
00709       if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00710          "Closed accept socket");
00711    }
00712 }
00713 
00717 static void shutdownCallbackServer(CallbackServerUnparsed *cb)
00718 {
00719    if (cb == 0) return;
00720 
00721    if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00722          "Shutdown callback server stopListenLoop=%s (changes now to true), reusingConnectionSocket=%s", (cb->stopListenLoop?"true":"false"), (cb->reusingConnectionSocket?"true":"false"));
00723 
00724    cb->stopListenLoop = true;
00725 
00726    if (cb->hostCB != 0) {
00727       free(cb->hostCB);
00728       cb->hostCB = 0;
00729    }
00730 
00731    if (cb->reusingConnectionSocket) {
00732       return; /* not our duty, we only have borrowed the socket from the client side connection */
00733    }
00734 
00735 
00736    closeAcceptSocket(cb);
00737 
00738    if (isListening(cb)) {
00739       closeSocket(cb->listenSocket);
00740       cb->listenSocket = -1;
00741       if (cb->logLevel>=XMLBLASTER_LOG_TRACE) cb->log(cb->logUserP, cb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00742          "Closed listener socket");
00743    }
00744 
00745    cb->readFromSocket.numReadFuncP = 0;
00746    /*
00747    for(i=0; i<10; i++) {
00748       if (cb->isShutdown) {
00749          return;
00750       }
00751       if (cb->debug) printf("[CallbackServerUnparsed] Waiting for thread to die ...");
00752       sleepMillis(1000);
00753    }
00754    printf("[CallbackServerUnparsed] WARNING: Thread has not died after 10 sec");
00755    */
00756 }
00757 
/**
 * Help text describing the command line options of the callback server.
 *
 * The parameter list is declared as (void) so that the definition is a
 * real prototype; an empty list () leaves the parameters unspecified in C.
 *
 * @return A static string (never to be freed by the caller) listing the
 *         -dispatch/callback/plugin/socket/hostname and .../port options
 */
const char *callbackServerRawUsage(void)
{
   static const char usage[] =
      "\n   -dispatch/callback/plugin/socket/hostname [localhost]"
      "\n                       The IP where to establish the callback server."
      "\n                       Can be useful on multi homed hosts."
      "\n   -dispatch/callback/plugin/socket/port [7611]"
      "\n                       The port of the callback server.";

   return usage;
}
00767 
00768 #ifdef USE_MAIN_CB
00769 
00772 bool myUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, SocketDataHandler socketDataHandler)
00773 {
00774    size_t i;
00775    bool testException = false;
00776    for (i=0; i<msgUnitArr->len; i++) {
00777       char *xml = messageUnitToXml(&msgUnitArr->msgUnitArr[i]);
00778       printf("client.update(): Asynchronous message update arrived:%s\n", xml);
00779       free(xml);
00780       msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc("<qos></qos>"); /* Return QoS: Everything is OK */
00781    }
00782    if (testException) {
00783       strncpy0(xmlBlasterException->errorCode, "user.notWanted", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00784       strncpy0(xmlBlasterException->message, "I don't want these messages", XMLBLASTEREXCEPTION_MESSAGE_LEN);
00785       return false;
00786    }
00787    return true;
00788 }
00789 
00793 int main(int argc, char** argv)
00794 {
00795    int iarg;
00796    CallbackServerUnparsed *cb;
00797    
00798    for (iarg=0; iarg < argc; iarg++) {
00799       if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
00800          const char *pp =
00801          "\n  -logLevel            ERROR | WARN | INFO | TRACE [WARN]"
00802          "\n\nExample:"
00803          "\n  CallbackServerUnparsed -logLevel TRACE -dispatch/callback/plugin/socket/hostname server.mars.universe";
00804          printf("Usage:\n%s%s\n", callbackServerRawUsage(), pp);
00805          exit(1);
00806       }
00807    }
00808 
00809    cb = getCallbackServerUnparsed(argc, argv, myUpdate, 0);
00810    printf("[main] Created CallbackServerUnparsed instance, creating listener on socket://%s:%d...\n", cb->hostCB, cb->portCB);
00811    cb->runCallbackServer(cb); /* blocks on socket listener */
00812 
00813    /* This code is reached only on socket EOF */
00814 
00815    printf("[main] Socket listener is shutdown\n");
00816    freeCallbackServerUnparsed(&cb);
00817    return 0;
00818 }
00819 #endif /* USE_MAIN_CB */