1 /*----------------------------------------------------------------------------
  2 Name:      xmlBlaster/demo/c/socket/Subscriber.c
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Example for all remote method invocations.
  6 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
  7 Compile:   cd xmlBlaster; build.sh c
  8            (Win: copy xmlBlaster\src\c\socket\pthreadVC2.dll to your PATH)
  9 Invoke:    Subscriber -help
 10 See:    http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 11 -----------------------------------------------------------------------------*/
 12 #include <stdio.h>
 13 #include <stdlib.h>
 14 #include <string.h>
 15 #include <XmlBlasterAccessUnparsed.h>
 16 #include <util/Timestampc.h>
 17 
 18 static const char *updateExceptionErrorCode = 0;
 19 static const char *updateExceptionMessage = 0;
 20 static const char *subscribeToken = 0;
 21 static const char *queryType;
 22 static int message_counter = 1;
 23 static long updateSleep = 0l;
 24 static bool reportUpdateProgress = false;
 25 static int64_t startTimestamp = 0ll; /* In nano sec */
 26 static bool verbose = true;
 27 
 28 /**
 29  * Here we receive the callback messages from xmlBlaster
 30  * @see UpdateFp in CallbackServerUnparsed.h
 31  */
 32 static bool myUpdate(MsgUnitArr *msgUnitArr, void *userData,
 33                      XmlBlasterException *exception)
 34 {
 35    size_t i;
 36    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
 37    if (xa != 0) ;  /* Supress compiler warning */
 38 
 39    if (startTimestamp == 0ll)
 40       startTimestamp = getTimestamp();
 41 
 42    for (i=0; i<msgUnitArr->len; i++) {
 43       char *xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
 44 
 45       const int modulo = 100;
 46       if ((message_counter % modulo) == 0) {
 47          int64_t endTimestamp = getTimestamp();
 48          int rate = (int)(((int64_t)message_counter*1000*1000*1000)/(endTimestamp-startTimestamp));
 49          const char *persistent = (strstr(xml, "<persistent>true</persistent>")!=NULL||strstr(xml, "<persistent/>")!=NULL) ? "persistent" : "transient";
 50          xa->log(xa->logUserP, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_INFO, __FILE__,
 51              "Asynchronous %s message [%d] update arrived: average %d messages/second\n",
 52              persistent, message_counter, rate);
 53       }
 54 
 55       if (verbose) {
 56          printf("\n[client] CALLBACK update(): Asynchronous message [%d] update arrived:%s\n",
 57                 message_counter, xml);
 58       }
 59       else {
 60          if (message_counter==1) {
 61             const char *persistent = (strstr(xml, "<persistent>true</persistent>")!=NULL||strstr(xml, "<persistent/>")!=NULL) ? "persistent" : "transient";
 62             xa->log(xa->logUserP, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_INFO, __FILE__,
 63              "Asynchronous %s message [%d] update arrived, we log every 100 again as verbose is set to false\n",
 64              persistent, message_counter);
 65          }
 66       }
 67 
 68       message_counter++;
 69 
 70       /*printf("arrived message :%d\n",message_counter);*/
 71       xmlBlasterFree(xml);
 72       msgUnitArr->msgUnitArr[i].responseQos = 
 73                   strcpyAlloc("<qos><state id='OK'/></qos>");
 74       /* Return QoS: Everything is OK */
 75 
 76       if (updateSleep > 0) {
 77          printf("[client] CALLBACK update(): Sleeping for %ld millis ...\n", updateSleep);
 78          sleepMillis(updateSleep);
 79       }
 80    }
 81    if (updateExceptionErrorCode) {
 82       strncpy0(exception->errorCode, updateExceptionErrorCode,
 83                XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 84       strncpy0(exception->message, updateExceptionMessage,
 85                XMLBLASTEREXCEPTION_MESSAGE_LEN);
 86       return false;
 87    }
 88 
 89 
 90    return true;
 91 }
 92 
 93 /**
 94  * Access the read socket progress. 
 95  * You need to register this function pointer if you want to see the progress of huge messages
 96  * on slow connections.
 97  */
 98 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) {
 99    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed*)userP;
100    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
101            "Update data progress currBytesRead=%ld nbytes=%ld", (long)currBytesRead, (long)nbytes);
102    /*printf("[client] Update data progress currBytesRead=%ld nbytes=%ld\n", (long)currBytesRead, (long)nbytes);*/
103 }
104 
105 #if defined(WINCE)
106 int _tmain(int argc, _TCHAR** argv_wcs) { /* wchar_t==_TCHAR */
107    char **argv = convertWcsArgv(argv_wcs, argc);
108 #else
109 /**
110  * Invoke examples:
111  *
112  * Subscriber -logLevel TRACE
113  *
114  * Subscriber -session.name Subscriber/1 -dispatch/callback/retries -1 -subscribe/qos/persistent true -interactiveSubscribe true 
115  */
116 int main(int argc, const char* const* argv) {
117 #endif
118    int iarg;
119    const char *callbackSessionId = "topSecret";
120    XmlBlasterException xmlBlasterException;
121    XmlBlasterAccessUnparsed *xa = 0;
122 
123    printf("[client] XmlBlaster %s C SOCKET client, try option '-help' if you need"
124           " usage informations\n", getXmlBlasterVersion());
125 
126    for (iarg=0; iarg < argc; iarg++) {
127       if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
128          char usage[XMLBLASTER_MAX_USAGE_LEN];
129          const char *pp =
130          "\n\nExample:"
131          "\n  Subscriber -logLevel TRACE"
132          " -dispatch/connection/plugin/socket/hostname 192.168.2.9";
133          printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
134                   getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
135          exit(EXIT_FAILURE);
136       }
137    }
138 
139    xa = getXmlBlasterAccessUnparsed(argc, (const char* const* )argv);
140    if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
141       printf("[client] Connection to xmlBlaster failed,"
142              " please start the server or check your configuration\n");
143       freeXmlBlasterAccessUnparsed(xa);
144       exit(EXIT_FAILURE);
145    }
146 
147    verbose = xa->props->getBool(xa->props, "verbose", verbose);
148    updateSleep = xa->props->getLong(xa->props, "updateSleep", 0L);
149    reportUpdateProgress = xa->props->getBool(xa->props, "reportUpdateProgress", false); /* Report update progress */
150    updateExceptionErrorCode = xa->props->getString(xa->props, "updateException.errorCode", 0); /* "user.clientCode" */
151    updateExceptionMessage = xa->props->getString(xa->props, "updateException.message", "");  /* "I don't want these messages" */
152 
153    {  /* connect */
154       char *response = (char *)0;
155       char connectQos[4096];
156       char callbackQos[1024];
157       const char * const sessionName = xa->props->getString(xa->props, "session.name", "Subscriber");
158       long sessionTimeout = xa->props->getLong(xa->props, "session.timeout", 86400000L);
159       int maxSessions = xa->props->getInt(xa->props, "session.maxSessions", 10);
160       const bool persistent = xa->props->getBool(xa->props, "dispatch/connection/persistent", false);
161       const long pingInterval = xa->props->getLong(xa->props, "dispatch/callback/pingInterval", 10000L);
162       const long delay = xa->props->getLong(xa->props, "dispatch/callback/delay", 60000L);
163       const long retries = xa->props->getLong(xa->props, "dispatch/callback/retries", 0L); /* Set to -1 to keep the session on server side during a missing client */
164       callbackSessionId = xa->props->getString(xa->props, "dispatch/callback/sessionId", callbackSessionId);
165       sprintf(callbackQos,
166                "<queue relating='callback' maxEntries='10000000' maxEntriesCache='10000000'>"
167                "  <callback type='SOCKET' sessionId='%.256s' pingInterval='%ld' retries='%ld' delay='%ld' oneway='false'>"
168                "    socket://%.120s:%d"
169                "  </callback>"
170                "</queue>",
171                callbackSessionId, pingInterval, retries, delay, xa->callbackP->hostCB, xa->callbackP->portCB);
172       sprintf(connectQos,
173                "<qos>"
174                " <securityService type='htpasswd' version='1.0'>"
175                "  <![CDATA["
176                "   <user>%.80s</user>"
177                "   <passwd>subscriber</passwd>"
178                "  ]]>"
179                " </securityService>"
180                " <session name='%.80s' timeout='%ld' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>"
181                " %.20s"
182                "%.1024s"
183                "</qos>", sessionName, sessionName, sessionTimeout, maxSessions, persistent?"<persistent/>":"", callbackQos);
184 
185       response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
186       if (*xmlBlasterException.errorCode != 0) {
187          printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
188                   xmlBlasterException.errorCode, xmlBlasterException.message);
189          freeXmlBlasterAccessUnparsed(xa);
190          exit(EXIT_FAILURE);
191       }
192       xmlBlasterFree(response);
193       printf("[client] Connected to xmlBlaster, do some tests ...\n");
194    }
195 
196    if (reportUpdateProgress && xa->callbackP != 0) {
197       xa->callbackP->readFromSocket.numReadFuncP = callbackProgressListener;
198       xa->callbackP->readFromSocket.numReadUserP = xa;
199    }
200 
201    { /* subscribe ... */
202       char *response = (char *)0;
203 
204       char key[1024];
205       const char *oid = xa->props->getString(xa->props, "oid", "Hello");
206       const char *domain = xa->props->getString(xa->props, "domain", 0);
207       const char *xpath = xa->props->getString(xa->props, "xpath", 0);
208 
209       char filterQos[2048];
210       char qos[4098];
211       bool multiSubscribe = xa->props->getBool(xa->props, "multiSubscribe", true);
212       bool persistent = xa->props->getBool(xa->props, "subscribe/qos/persistent", false);
213       bool notifyOnErase = xa->props->getBool(xa->props, "notifyOnErase", true);
214       bool local = xa->props->getBool(xa->props, "local", true);
215       bool initialUpdate = xa->props->getBool(xa->props, "initialUpdate", true);
216       bool updateOneway = xa->props->getBool(xa->props, "updateOneway", false);
217       int historyNumUpdates = xa->props->getInt(xa->props, "historyNumUpdates", 1);
218       bool historyNewestFirst = xa->props->getBool(xa->props, "historyNewestFirst", true);
219       bool wantContent = xa->props->getBool(xa->props, "wantContent", true);
220       const char *filterType = xa->props->getString(xa->props, "filter.type", "GnuRegexFilter");
221       const char *filterVersion = xa->props->getString(xa->props, "filter.version", "1.0");
222       const char *filterQuery = xa->props->getString(xa->props, "filter.query", 0);  /* "^H.*$" */
223       bool interactiveSubscribe = xa->props->getBool(xa->props, "interactiveSubscribe", false);
224 
225       if (domain) {
226          sprintf(key, "<key domain='%.512s'/>", domain);
227          subscribeToken = domain;
228          queryType = "DOMAIN";
229       }
230       else if (xpath) {
231          sprintf(key, "<key queryType='XPATH'>%.512s</key>", xpath);
232          subscribeToken = xpath;
233          queryType = "XPATH";
234       }
235       else {
236          sprintf(key, "<key oid='%.512s'/>", oid);
237          subscribeToken = oid;
238          queryType = "EXACT";
239       }
240 
241       if (filterQuery) {
242          sprintf(filterQos, " <filter type='%.100s' version='%.50s'>%.1800s</filter>",
243                  filterType, filterVersion, filterQuery);
244       }
245       else
246          *filterQos = 0;
247 
248       sprintf(qos, "<qos>"
249                    " <content>%.20s</content>"
250                    " <multiSubscribe>%.20s</multiSubscribe>"
251                    " <persistent>%.20s</persistent>"
252                    " <local>%.20s</local>"
253                    " <initialUpdate>%.20s</initialUpdate>"
254                    " <updateOneway>%.20s</updateOneway>"
255                    " <notify>%.20s</notify>"
256                    "%.2048s"
257                    " <history numEntries='%d' newestFirst='%.20s'/>"
258                    "</qos>",
259                    wantContent?"true":"false",
260                    multiSubscribe?"true":"false",
261                    persistent?"true":"false",
262                    local?"true":"false",
263                    initialUpdate?"true":"false",
264                    updateOneway?"true":"false",
265                    notifyOnErase?"true":"false",
266                    filterQos,
267                    historyNumUpdates,
268                    historyNewestFirst?"true":"false"
269                    );
270 
271       printf("[client] Subscribe key: %s\n", key);
272       printf("[client] Subscribe qos: %s\n", qos);
273 
274       if (interactiveSubscribe) {
275          char msg[20];
276          printf("(Hit a key to subscribe) >> ");
277          fgets(msg, 19, stdin);
278       }
279 
280       response = xa->subscribe(xa, key, qos, &xmlBlasterException);
281       if (*xmlBlasterException.errorCode != 0) {
282          printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n",
283                   xmlBlasterException.errorCode, xmlBlasterException.message);
284          xa->disconnect(xa, 0, &xmlBlasterException);
285          freeXmlBlasterAccessUnparsed(xa);
286          exit(EXIT_FAILURE);
287       }
288       printf("[client] Subscribe success, returned status is '%s'\n", response);
289       xmlBlasterFree(response);
290    }
291 
292    while (true) {
293       char msg[20];
294                   
295       printf("(Enter 'q' to exit) >> ");
296       fgets(msg, 19, stdin);
297       if (*msg == 'q') 
298          break;
299    }
300     
301    {  /* unSubscribe ... */
302       QosArr *resp;
303       char key[256];
304       const char *qos = "<qos/>";
305       /* TODO: use subscriptionId */
306       if (!strcmp(queryType, "EXACT"))
307          sprintf(key, "<key oid='%.200s'/>", subscribeToken);
308       else if (!strcmp(queryType, "DOMAIN"))
309          sprintf(key, "<key domain='%.512s'/>", subscribeToken);
310       else
311          sprintf(key, "<key queryType='XPATH'>%.512s</key>", subscribeToken);
312       printf("[client] UnSubscribe topic '%s' ...\n", subscribeToken);
313       resp = xa->unSubscribe(xa, key, qos, &xmlBlasterException);
314       if (resp) {
315          size_t i;
316          for (i=0; i<resp->len; i++) {
317             printf("[client] Unsubscribe success, returned status is '%s'\n", resp->qosArr[i]);
318          }
319          freeQosArr(resp);
320       }
321       else {
322          printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n",
323                   xmlBlasterException.errorCode, xmlBlasterException.message);
324          xa->disconnect(xa, 0, &xmlBlasterException);
325          freeXmlBlasterAccessUnparsed(xa);
326          exit(EXIT_FAILURE);
327       }
328    }
329 
330    if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
331       printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
332                xmlBlasterException.errorCode, xmlBlasterException.message);
333       freeXmlBlasterAccessUnparsed(xa);
334       exit(EXIT_FAILURE);
335    }
336 
337    freeXmlBlasterAccessUnparsed(xa);
338    printf("[client] Good bye.\n");
339    return 0;
340 }


syntax highlighted by Code2HTML, v. 0.9.1