demo/c/socket/Subscriber.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      xmlBlaster/demo/c/socket/Subscriber.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Example subscribing to messages and receiving asynchronous updates.
00006 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00007 Compile:   cd xmlBlaster; build.sh c
00008            (Win: copy xmlBlaster\src\c\socket\pthreadVC2.dll to your PATH)
00009 Invoke:    Subscriber -help
00010 See:    http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
00011 -----------------------------------------------------------------------------*/
00012 #include <stdio.h>
00013 #include <stdlib.h>
00014 #include <string.h>
00015 #include <XmlBlasterAccessUnparsed.h>
00016 
00017 static const char *updateExceptionErrorCode = 0; /* Property "updateException.errorCode"; if set, myUpdate() reports it as an exception and returns false */
00018 static const char *updateExceptionMessage = 0; /* Property "updateException.message"; message text used together with updateExceptionErrorCode */
00019 static const char *subscribeToken = 0; /* The domain, xpath or oid used for subscribe(), reused to build the unSubscribe() key */
00020 static const char *queryType; /* Set in main() to "DOMAIN", "XPATH" or "EXACT" depending on which subscribe key was built */
00021 static int message_counter = 1; /* Number of the message currently handled by myUpdate(), starts at 1 and is incremented per message */
00022 static long updateSleep = 0l; /* Property "updateSleep"; if > 0 myUpdate() sleeps this many millis after each message */
00023 static bool reportUpdateProgress = false; /* Property "reportUpdateProgress"; if true main() registers callbackProgressListener() */
00024 static int64_t startTimestamp = 0ll; /* getTimestamp() value taken at the first myUpdate() call, in nano sec; 0 means "not yet set" */
00025 static bool verbose = true; /* Property "verbose"; if true myUpdate() prints every message, otherwise it only logs the first and every 100th */
00026 
00031 static bool myUpdate(MsgUnitArr *msgUnitArr, void *userData,
00032                      XmlBlasterException *exception)
00033 {
00034    size_t i;
00035    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
00036    if (xa != 0) ;  /* Supress compiler warning */
00037 
00038    if (startTimestamp == 0ll)
00039       startTimestamp = getTimestamp();
00040 
00041    for (i=0; i<msgUnitArr->len; i++) {
00042       char *xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
00043 
00044       const int modulo = 100;
00045       if ((message_counter % modulo) == 0) {
00046          int64_t endTimestamp = getTimestamp();
00047          int rate = (int)(((int64_t)message_counter*1000*1000*1000)/(endTimestamp-startTimestamp));
00048          const char *persistent = (strstr(xml, "<persistent>true</persistent>")!=NULL||strstr(xml, "<persistent/>")!=NULL) ? "persistent" : "transient";
00049          xa->log(xa->logUserP, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_INFO, __FILE__,
00050              "Asynchronous %s message [%d] update arrived: average %d messages/second\n",
00051              persistent, message_counter, rate);
00052       }
00053 
00054       if (verbose) {
00055          printf("\n[client] CALLBACK update(): Asynchronous message [%d] update arrived:%s\n",
00056                 message_counter, xml);
00057       }
00058       else {
00059          if (message_counter==1) {
00060             const char *persistent = (strstr(xml, "<persistent>true</persistent>")!=NULL||strstr(xml, "<persistent/>")!=NULL) ? "persistent" : "transient";
00061             xa->log(xa->logUserP, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_INFO, __FILE__,
00062              "Asynchronous %s message [%d] update arrived, we log every 100 again as verbose is set to false\n",
00063              persistent, message_counter);
00064          }
00065       }
00066 
00067       message_counter++;
00068 
00069       /*printf("arrived message :%d\n",message_counter);*/
00070       xmlBlasterFree(xml);
00071       msgUnitArr->msgUnitArr[i].responseQos = 
00072                   strcpyAlloc("<qos><state id='OK'/></qos>");
00073       /* Return QoS: Everything is OK */
00074 
00075       if (updateSleep > 0) {
00076          printf("[client] CALLBACK update(): Sleeping for %ld millis ...\n", updateSleep);
00077          sleepMillis(updateSleep);
00078       }
00079    }
00080    if (updateExceptionErrorCode) {
00081       strncpy0(exception->errorCode, updateExceptionErrorCode,
00082                XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00083       strncpy0(exception->message, updateExceptionMessage,
00084                XMLBLASTEREXCEPTION_MESSAGE_LEN);
00085       return false;
00086    }
00087 
00088 
00089    return true;
00090 }
00091 
00097 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) {
00098    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed*)userP;
00099    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00100            "Update data progress currBytesRead=%ld nbytes=%ld", (long)currBytesRead, (long)nbytes);
00101    /*printf("[client] Update data progress currBytesRead=%ld nbytes=%ld\n", (long)currBytesRead, (long)nbytes);*/
00102 }
00103 
00104 #if defined(WINCE)
00105 int _tmain(int argc, _TCHAR** argv_wcs) { /* wchar_t==_TCHAR */
00106    char **argv = convertWcsArgv(argv_wcs, argc);
00107 #else
00108 
00115 int main(int argc, const char* const* argv) {
00116 #endif
00117    int iarg;
00118    const char *callbackSessionId = "topSecret";
00119    XmlBlasterException xmlBlasterException;
00120    XmlBlasterAccessUnparsed *xa = 0;
00121 
00122    printf("[client] XmlBlaster %s C SOCKET client, try option '-help' if you need"
00123           " usage informations\n", getXmlBlasterVersion());
00124 
00125    for (iarg=0; iarg < argc; iarg++) {
00126       if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
00127          char usage[XMLBLASTER_MAX_USAGE_LEN];
00128          const char *pp =
00129          "\n\nExample:"
00130          "\n  Subscriber -logLevel TRACE"
00131          " -dispatch/connection/plugin/socket/hostname 192.168.2.9";
00132          printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
00133                   getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
00134          exit(EXIT_FAILURE);
00135       }
00136    }
00137 
00138    xa = getXmlBlasterAccessUnparsed(argc, (const char* const* )argv);
00139    if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
00140       printf("[client] Connection to xmlBlaster failed,"
00141              " please start the server or check your configuration\n");
00142       freeXmlBlasterAccessUnparsed(xa);
00143       exit(EXIT_FAILURE);
00144    }
00145 
00146    verbose = xa->props->getBool(xa->props, "verbose", verbose);
00147    updateSleep = xa->props->getLong(xa->props, "updateSleep", 0L);
00148    reportUpdateProgress = xa->props->getBool(xa->props, "reportUpdateProgress", false); /* Report update progress */
00149    updateExceptionErrorCode = xa->props->getString(xa->props, "updateException.errorCode", 0); /* "user.clientCode" */
00150    updateExceptionMessage = xa->props->getString(xa->props, "updateException.message", "");  /* "I don't want these messages" */
00151 
00152    {  /* connect */
00153       char *response = (char *)0;
00154       char connectQos[4096];
00155       char callbackQos[1024];
00156       const char * const sessionName = xa->props->getString(xa->props, "session.name", "Subscriber");
00157       long sessionTimeout = xa->props->getLong(xa->props, "session.timeout", 86400000L);
00158       int maxSessions = xa->props->getInt(xa->props, "session.maxSessions", 10);
00159       const bool persistent = xa->props->getBool(xa->props, "dispatch/connection/persistent", false);
00160       const long pingInterval = xa->props->getLong(xa->props, "dispatch/callback/pingInterval", 10000L);
00161       const long delay = xa->props->getLong(xa->props, "dispatch/callback/delay", 60000L);
00162       const long retries = xa->props->getLong(xa->props, "dispatch/callback/retries", 0L); /* Set to -1 to keep the session on server side during a missing client */
00163       callbackSessionId = xa->props->getString(xa->props, "dispatch/callback/sessionId", callbackSessionId);
00164       sprintf(callbackQos,
00165                "<queue relating='callback' maxEntries='10000000' maxEntriesCache='10000000'>"
00166                "  <callback type='SOCKET' sessionId='%.256s' pingInterval='%ld' retries='%ld' delay='%ld' oneway='false'>"
00167                "    socket://%.120s:%d"
00168                "  </callback>"
00169                "</queue>",
00170                callbackSessionId, pingInterval, retries, delay, xa->callbackP->hostCB, xa->callbackP->portCB);
00171       sprintf(connectQos,
00172                "<qos>"
00173                " <securityService type='htpasswd' version='1.0'>"
00174                "  <![CDATA["
00175                "   <user>%.80s</user>"
00176                "   <passwd>subscriber</passwd>"
00177                "  ]]>"
00178                " </securityService>"
00179                " <session name='%.80s' timeout='%ld' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>"
00180                " %.20s"
00181                "%.1024s"
00182                "</qos>", sessionName, sessionName, sessionTimeout, maxSessions, persistent?"<persistent/>":"", callbackQos);
00183 
00184       response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
00185       if (*xmlBlasterException.errorCode != 0) {
00186          printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
00187                   xmlBlasterException.errorCode, xmlBlasterException.message);
00188          freeXmlBlasterAccessUnparsed(xa);
00189          exit(EXIT_FAILURE);
00190       }
00191       xmlBlasterFree(response);
00192       printf("[client] Connected to xmlBlaster, do some tests ...\n");
00193    }
00194 
00195    if (reportUpdateProgress && xa->callbackP != 0) {
00196       xa->callbackP->readFromSocket.numReadFuncP = callbackProgressListener;
00197       xa->callbackP->readFromSocket.numReadUserP = xa;
00198    }
00199 
00200    { /* subscribe ... */
00201       char *response = (char *)0;
00202 
00203       char key[1024];
00204       const char *oid = xa->props->getString(xa->props, "oid", "Hello");
00205       const char *domain = xa->props->getString(xa->props, "domain", 0);
00206       const char *xpath = xa->props->getString(xa->props, "xpath", 0);
00207 
00208       char filterQos[2048];
00209       char qos[4098];
00210       bool multiSubscribe = xa->props->getBool(xa->props, "multiSubscribe", true);
00211       bool persistent = xa->props->getBool(xa->props, "subscribe/qos/persistent", false);
00212       bool notifyOnErase = xa->props->getBool(xa->props, "notifyOnErase", true);
00213       bool local = xa->props->getBool(xa->props, "local", true);
00214       bool initialUpdate = xa->props->getBool(xa->props, "initialUpdate", true);
00215       bool updateOneway = xa->props->getBool(xa->props, "updateOneway", false);
00216       int historyNumUpdates = xa->props->getInt(xa->props, "historyNumUpdates", 1);
00217       bool historyNewestFirst = xa->props->getBool(xa->props, "historyNewestFirst", true);
00218       bool wantContent = xa->props->getBool(xa->props, "wantContent", true);
00219       const char *filterType = xa->props->getString(xa->props, "filter.type", "GnuRegexFilter");
00220       const char *filterVersion = xa->props->getString(xa->props, "filter.version", "1.0");
00221       const char *filterQuery = xa->props->getString(xa->props, "filter.query", 0);  /* "^H.*$" */
00222       bool interactiveSubscribe = xa->props->getBool(xa->props, "interactiveSubscribe", false);
00223 
00224       if (domain) {
00225          sprintf(key, "<key domain='%.512s'/>", domain);
00226          subscribeToken = domain;
00227          queryType = "DOMAIN";
00228       }
00229       else if (xpath) {
00230          sprintf(key, "<key queryType='XPATH'>%.512s</key>", xpath);
00231          subscribeToken = xpath;
00232          queryType = "XPATH";
00233       }
00234       else {
00235          sprintf(key, "<key oid='%.512s'/>", oid);
00236          subscribeToken = oid;
00237          queryType = "EXACT";
00238       }
00239 
00240       if (filterQuery) {
00241          sprintf(filterQos, " <filter type='%.100s' version='%.50s'>%.1800s</filter>",
00242                  filterType, filterVersion, filterQuery);
00243       }
00244       else
00245          *filterQos = 0;
00246 
00247       sprintf(qos, "<qos>"
00248                    " <content>%.20s</content>"
00249                    " <multiSubscribe>%.20s</multiSubscribe>"
00250                    " <persistent>%.20s</persistent>"
00251                    " <local>%.20s</local>"
00252                    " <initialUpdate>%.20s</initialUpdate>"
00253                    " <updateOneway>%.20s</updateOneway>"
00254                    " <notify>%.20s</notify>"
00255                    "%.2048s"
00256                    " <history numEntries='%d' newestFirst='%.20s'/>"
00257                    "</qos>",
00258                    wantContent?"true":"false",
00259                    multiSubscribe?"true":"false",
00260                    persistent?"true":"false",
00261                    local?"true":"false",
00262                    initialUpdate?"true":"false",
00263                    updateOneway?"true":"false",
00264                    notifyOnErase?"true":"false",
00265                    filterQos,
00266                    historyNumUpdates,
00267                    historyNewestFirst?"true":"false"
00268                    );
00269 
00270       printf("[client] Subscribe key: %s\n", key);
00271       printf("[client] Subscribe qos: %s\n", qos);
00272 
00273       if (interactiveSubscribe) {
00274          char msg[20];
00275          printf("(Hit a key to subscribe) >> ");
00276          fgets(msg, 19, stdin);
00277       }
00278 
00279       response = xa->subscribe(xa, key, qos, &xmlBlasterException);
00280       if (*xmlBlasterException.errorCode != 0) {
00281          printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n",
00282                   xmlBlasterException.errorCode, xmlBlasterException.message);
00283          xa->disconnect(xa, 0, &xmlBlasterException);
00284          freeXmlBlasterAccessUnparsed(xa);
00285          exit(EXIT_FAILURE);
00286       }
00287       printf("[client] Subscribe success, returned status is '%s'\n", response);
00288       xmlBlasterFree(response);
00289    }
00290 
00291    while (true) {
00292       char msg[20];
00293                   
00294       printf("(Enter 'q' to exit) >> ");
00295       fgets(msg, 19, stdin);
00296       if (*msg == 'q') 
00297          break;
00298    }
00299     
00300    {  /* unSubscribe ... */
00301       QosArr *resp;
00302       char key[256];
00303       const char *qos = "<qos/>";
00304       /* TODO: use subscriptionId */
00305       if (!strcmp(queryType, "EXACT"))
00306          sprintf(key, "<key oid='%.200s'/>", subscribeToken);
00307       else if (!strcmp(queryType, "DOMAIN"))
00308          sprintf(key, "<key domain='%.512s'/>", subscribeToken);
00309       else
00310          sprintf(key, "<key queryType='XPATH'>%.512s</key>", subscribeToken);
00311       printf("[client] UnSubscribe topic '%s' ...\n", subscribeToken);
00312       resp = xa->unSubscribe(xa, key, qos, &xmlBlasterException);
00313       if (resp) {
00314          size_t i;
00315          for (i=0; i<resp->len; i++) {
00316             printf("[client] Unsubscribe success, returned status is '%s'\n", resp->qosArr[i]);
00317          }
00318          freeQosArr(resp);
00319       }
00320       else {
00321          printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n",
00322                   xmlBlasterException.errorCode, xmlBlasterException.message);
00323          xa->disconnect(xa, 0, &xmlBlasterException);
00324          freeXmlBlasterAccessUnparsed(xa);
00325          exit(EXIT_FAILURE);
00326       }
00327    }
00328 
00329    if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
00330       printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
00331                xmlBlasterException.errorCode, xmlBlasterException.message);
00332       freeXmlBlasterAccessUnparsed(xa);
00333       exit(EXIT_FAILURE);
00334    }
00335 
00336    freeXmlBlasterAccessUnparsed(xa);
00337    printf("[client] Good bye.\n");
00338    return 0;
00339 }
00340