/*----------------------------------------------------------------------------
Name:      xmlBlaster/demo/c/socket/Publisher.c
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
Comment:   Demo to publish messages from command line
Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
Compile:   cd xmlBlaster; build.sh c
           (Win: copy xmlBlaster\src\c\socket\pthreadVC2.dll to your PATH)
Invoke:    Publisher -help
See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
-----------------------------------------------------------------------------*/
 12 #include <stdio.h>
 13 #include <stdlib.h>
 14 #include <string.h>
 15 #include <XmlBlasterAccessUnparsed.h>
 16 
 17 static char* readFile(const char *fn);
 18 
 19 #if defined(WINCE)
 20 int _tmain(int argc, _TCHAR** argv_wcs) { /* wchar_t==_TCHAR */
 21    char **argv = convertWcsArgv(argv_wcs, argc);
 22 #else
 23 /**
 24  * Demo client to publish messages. 
 25  * Not all PublishQos functionality is implemented.
 26  * Invoke: Publisher -logLevel TRACE
 27  */
 28 int main(int argc, const char* const* argv) {
 29 #endif
 30    int iarg, iPublish;
 31    const char *callbackSessionId = "topSecret";
 32    XmlBlasterException xmlBlasterException;
 33    XmlBlasterAccessUnparsed *xa = 0;
 34    bool disconnect = true;
 35    bool erase = true;
 36    const char *publishToken = 0;
 37 
 38    printf("[client] XmlBlaster %s C SOCKET client, try option '-help' if you need"
 39           " usage informations\n", getXmlBlasterVersion());
 40 
 41    for (iarg=0; iarg < argc; iarg++) {
 42       if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
 43          char usage[XMLBLASTER_MAX_USAGE_LEN];
 44          const char *pp =
 45          "\n\nExample:"
 46          "\n  Publisher -logLevel TRACE"
 47          " -dispatch/connection/plugin/socket/hostname 192.168.2.9";
 48          printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
 49                   getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
 50          exit(EXIT_FAILURE);
 51       }
 52    }
 53 
 54    xa = getXmlBlasterAccessUnparsed(argc, (const char* const* )argv);
 55    if (xa->initialize(xa, 0, &xmlBlasterException) == false) {
 56       printf("[client] Connection to xmlBlaster failed,"
 57              " please start the server or check your configuration\n");
 58       freeXmlBlasterAccessUnparsed(xa);
 59       exit(EXIT_FAILURE);
 60    }
 61 
 62    disconnect = xa->props->getBool(xa->props, "disconnect", disconnect);
 63    erase = xa->props->getBool(xa->props, "erase", erase);
 64 
 65    {  /* connect */
 66       char *response = (char *)0;
 67       const char * const sessionName = xa->props->getString(xa->props, "session.name", "Publisher");
 68       const char * const passwd = xa->props->getString(xa->props, "passwd", "publisher");
 69       long sessionTimeout = xa->props->getLong(xa->props, "session.timeout", 86400000L);
 70       int maxSessions = xa->props->getInt(xa->props, "session.maxSessions", 10);
 71       const bool persistent = xa->props->getBool(xa->props, "persistentConnection", false);
 72       char connectQos[4096];
 73       char callbackQos[1024];
 74       sprintf(callbackQos,
 75                "<queue relating='callback' maxEntries='10000000' maxEntriesCache='10000000'>"
 76                "  <callback type='SOCKET' sessionId='%.256s'>"
 77                "    socket://%.120s:%d"
 78                "  </callback>"
 79                "</queue>",
 80                callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
 81       sprintf(connectQos,
 82                "<qos>"
 83                " <securityService type='htpasswd' version='1.0'>"
 84                "  <![CDATA["
 85                "   <user>%.80s</user>"
 86                "   <passwd>%.40s</passwd>"
 87                "  ]]>"
 88                " </securityService>"
 89                " <session name='%.80s' timeout='%ld' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>"
 90                " %.20s"
 91                "%.1024s"
 92                "<clientProperty name='__remoteProperties'>true</clientProperty>"
 93                "</qos>", sessionName, passwd, sessionName, sessionTimeout, maxSessions, persistent?"<persistent/>":"", callbackQos);
 94 
 95       response = xa->connect(xa, connectQos, 0, &xmlBlasterException);
 96       if (*xmlBlasterException.errorCode != 0) {
 97          printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
 98                   xmlBlasterException.errorCode, xmlBlasterException.message);
 99          freeXmlBlasterAccessUnparsed(xa);
100          exit(EXIT_FAILURE);
101       }
102       xmlBlasterFree(response);
103       printf("[client] Connected to xmlBlaster, do some tests ...\n");
104    }
105 
106    { /* publish ... */
107       char *response = (char *)0;
108 
109       char key[4098];
110       const char *oid = xa->props->getString(xa->props, "oid", "Hello");
111       const char *domain = xa->props->getString(xa->props, "domain", 0);
112       bool interactive = xa->props->getBool(xa->props, "interactive", true);
113 
114       char qos[4098];
115       char topicQos[2048];
116       char destinationQos[2048];
117       bool oneway = xa->props->getBool(xa->props, "oneway", false);
118       long sleep = xa->props->getLong(xa->props, "sleep", 1000L);
119       int numPublish = xa->props->getInt(xa->props, "numPublish", 1);
120       const char *clientTags = xa->props->getString(xa->props, "clientTags", "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
121       const char *content = xa->props->getString(xa->props, "content", "Hi-%counter");
122       int priority = xa->props->getInt(xa->props, "priority", 5);
123       bool persistentPublish = xa->props->getBool(xa->props, "persistent", true);
124       long lifeTime = xa->props->getLong(xa->props, "lifeTime", -1L);
125       bool verbose = xa->props->getBool(xa->props, "verbose", true);
126       bool forceUpdate = xa->props->getBool(xa->props, "forceUpdate", true);
127       bool forceDestroy = xa->props->getBool(xa->props, "forceDestroy", false);
128       bool readonly = xa->props->getBool(xa->props, "readonly", false);
129       long destroyDelay = xa->props->getLong(xa->props, "destroyDelay", -1L);
130       bool createDomEntry = xa->props->getBool(xa->props, "createDomEntry", true);
131       long historyMaxMsg = xa->props->getLong(xa->props, "queue/history/maxEntries", 10L);
132       long historyMaxBytes = xa->props->getLong(xa->props, "queue/history/maxBytes", 2147483647L);
133       bool forceQueuing = xa->props->getBool(xa->props, "forceQueuing", true);
134       bool subscribable = xa->props->getBool(xa->props, "subscribable", true);
135       const char *destination = xa->props->getString(xa->props, "destination", 0);
136       int contentSize = xa->props->getInt(xa->props, "contentSize", -1);
137       const char *contentFile = xa->props->getString(xa->props, "contentFile", 0);
138       /*Map clientPropertyMap = xa->props->getInt(xa->props, "clientProperty", (Map)0); */
139 
140       publishToken = (domain == 0) ? oid : domain;
141 
142       sprintf(key, "<key oid='%.512s' domain='%.100s'>%.2000s</key>",
143                   oid, ((domain==0)?"":domain), clientTags);
144 
145       sprintf(topicQos, 
146                    " <topic readonly='%.20s' destroyDelay='%ld' createDomEntry='%.20s'>"
147                    "  <persistence/>"
148                    "  <queue relating='history' type='CACHE' version='1.0' maxEntries='%ld' maxBytes='%ld'/>"
149                    " </topic>",
150                    readonly?"true":"false",
151                    destroyDelay,
152                    createDomEntry?"true":"false",
153                    historyMaxMsg,
154                    historyMaxBytes
155                    );
156       if (destination!=0)
157          sprintf(destinationQos, " <destination queryType='EXACT' forceQueuing='%.20s'>%.512s</destination>",
158                  forceQueuing?"true":"false", destination);
159       else
160          *destinationQos = 0;
161 
162       for (iPublish=0; iPublish<numPublish || numPublish==-1; iPublish++) {
163          char msg[20];
164          const char *pp = strstr(key, "%counter");
165          MsgUnit msgUnit;
166          memset(&msgUnit, 0, sizeof(MsgUnit));
167 
168          if (interactive) {
169             printf("[client] Hit a key to publish '%s' #%d/%d ('b' to break) >> ", oid, iPublish, numPublish);
170             fgets(msg, 19, stdin);
171             if (*msg == 'b') 
172                break;
173          }
174          else {
175             if (sleep > 0) {
176                sleepMillis(sleep);
177             }
178             if (verbose) {
179                if (contentFile != 0)
180                   printf("[client] Publish to topic '%s' file '%s' #%d/%d\n", oid, contentFile, iPublish, numPublish);
181                else
182                   printf("[client] Publish to topic '%s' #%d/%d\n", oid, iPublish, numPublish);
183             }
184          }
185 
186          if (pp) { /* Replace '%counter' token by current index */
187             char *k = (char *)malloc(strlen(key)+10);
188             strncpy(k, key, pp-key);
189             sprintf(k+(pp-key), "%d%s", iPublish, pp+strlen("%counter"));
190             msgUnit.key = k;
191          }
192          else
193             msgUnit.key = strcpyAlloc(key);
194          
195          if (iPublish == 1) *topicQos = 0;
196          sprintf(qos, "<qos>"
197                    " <priority>%d</priority>"
198                    " <subscribable>%.20s</subscribable>"
199                    " <expiration lifeTime='%ld'/>"
200                    " <persistent>%.20s</persistent>"
201                    " <forceUpdate>%.20s</forceUpdate>"
202                    " <forceDestroy>%.20s</forceDestroy>"
203                    " %.2048s"
204                    " <clientProperty name='%.100s'>%.512s</clientProperty>"
205                    " %.512s"
206                    "</qos>",
207                    priority,
208                    subscribable?"true":"false",
209                    lifeTime,
210                    persistentPublish?"true":"false",
211                    forceUpdate?"true":"false",
212                    forceDestroy?"true":"false",
213                    destinationQos,
214                    "", "", /* ClientProperty */
215                    topicQos
216                    );
217 
218          /*if (iPublish == 0) printf("[client] publishQos is\n%s\n", qos);*/
219 
220          if (contentSize > 0) {
221             int i;
222             char *p = (char *)malloc(contentSize);
223             for (i=0; i<contentSize; i++) {
224                int ran = rand() % 100;
225                p[i] = (char)(ran+28);
226             }
227             msgUnit.content = p;
228             msgUnit.contentLen = contentSize;
229          }
230          else if (contentFile != 0) {
231             char* p = readFile(contentFile);
232             msgUnit.content = p;
233             msgUnit.contentLen = strlen(msgUnit.content);
234          }
235          else {
236             const char *pc = strstr(content, "%counter");
237             if (pc) { /* Replace '%counter' token by current index */
238                char *p = (char *)malloc(strlen(content)+10);
239                strncpy(p, content, pc-content);
240                sprintf(p+(pc-content), "%d%s", iPublish, pc+strlen("%counter"));
241                msgUnit.content = p;
242                msgUnit.contentLen = strlen(msgUnit.content);
243             }
244             else {
245                msgUnit.content = strcpyAlloc(content);
246                msgUnit.contentLen = strlen(msgUnit.content);
247             }
248          }
249          msgUnit.qos =strcpyAlloc(qos);
250          if (oneway) {
251             MsgUnitArr msgUnitArr;
252             msgUnitArr.len = 1;
253             msgUnitArr.msgUnitArr = &msgUnit;
254             xa->publishOneway(xa, &msgUnitArr, &xmlBlasterException);
255          }
256          else {
257             response = xa->publish(xa, &msgUnit, &xmlBlasterException);
258          }
259          freeMsgUnitData(&msgUnit);
260          if (*xmlBlasterException.errorCode != 0) {
261             printf("[client] Caught exception in publish errorCode=%s, message=%s\n",
262                      xmlBlasterException.errorCode, xmlBlasterException.message);
263             xa->disconnect(xa, 0, &xmlBlasterException);
264             freeXmlBlasterAccessUnparsed(xa);
265             exit(EXIT_FAILURE);
266          }
267          if (verbose) {
268            printf("[client] Publish success, returned status is '%s'\n", response);
269          }
270          xmlBlasterFree(response);
271       }
272    }
273 
274    while (true) {
275       char msg[20];
276       bool interactive = xa->props->getBool(xa->props, "interactiveQuit", true);
277       if (!interactive) break;
278                   
279       printf("(Enter 'q' to exit) >> ");
280       fgets(msg, 19, stdin);
281       if (*msg == 'q') 
282          break;
283    }
284     
285    if (erase) {  /* erase ... */
286       QosArr *resp;
287       char key[256];
288       const char *qos = "<qos/>";
289       sprintf(key, "<key oid='%.200s'/>", publishToken); /* TODO: use subscriptionId */
290       printf("[client] Erase topic '%s' ...\n", publishToken);
291       resp = xa->erase(xa, key, qos, &xmlBlasterException);
292       if (resp) {
293          size_t i;
294          for (i=0; i<resp->len; i++) {
295             printf("[client] Erase success, returned status is '%s'\n", resp->qosArr[i]);
296          }
297          freeQosArr(resp);
298       }
299       else {
300          printf("[client] Caught exception in erase errorCode=%s, message=%s\n",
301                   xmlBlasterException.errorCode, xmlBlasterException.message);
302          xa->disconnect(xa, 0, &xmlBlasterException);
303          freeXmlBlasterAccessUnparsed(xa);
304          exit(EXIT_FAILURE);
305       }
306    }
307 
308    if (disconnect) {
309       if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
310          printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
311                   xmlBlasterException.errorCode, xmlBlasterException.message);
312          freeXmlBlasterAccessUnparsed(xa);
313          exit(EXIT_FAILURE);
314       }
315    }
316 
317    freeXmlBlasterAccessUnparsed(xa);
318    printf("[client] Good bye.\n");
319    return 0;
320 }
321 
/**
 * Read a complete file into a newly allocated, '\0' terminated buffer.
 *
 * The buffer grows in steps of 1024 bytes while reading and is shrunk to
 * the exact size before it is returned. The file is opened in text mode
 * and is always closed again before returning.
 *
 * @param fn The name of the file to read
 * @return The file content as a '\0' terminated string, the caller must
 *         free() it. An empty file yields an allocated empty string.
 *         NULL is returned if fn is NULL, the file can't be opened,
 *         reading fails or memory is exhausted.
 */
char* readFile(const char *fn) {
   FILE *fp;
   char *retbuf = NULL;
   size_t nchmax = 0;
   int c;
   size_t nchread = 0;
   char *newbuf;

   if (fn == NULL) {
      printf("Error Opening File %s.\n", "(null)");
      return NULL;
   }

   if ((fp = fopen(fn, "r")) == NULL) {
      printf("Error Opening File %s.\n", fn);
      return NULL;
   }

   while ((c = getc(fp)) != EOF) {
      if (nchread >= nchmax) {
         nchmax += 1024;
         if(nchread >= nchmax) { /* in case nchmax overflowed */
            free(retbuf);
            fclose(fp);
            return NULL;
         }
         /* realloc(NULL, n) behaves like malloc(n); +1 for \0 */
         newbuf = (char *)realloc(retbuf, nchmax + 1);
         if (newbuf == NULL) {
            free(retbuf);
            fclose(fp);
            return NULL;
         }
         retbuf = newbuf;
      }
      retbuf[nchread++] = (char)c;
   }

   /* getc() reports EOF for read errors as well, don't return partial data */
   if (ferror(fp)) {
      free(retbuf);
      fclose(fp);
      return NULL;
   }
   fclose(fp);

   if(retbuf == NULL) {
      /* Empty file: return "" so that the caller can tell it from an error */
      retbuf = (char *)malloc(1);
      if (retbuf != NULL)
         *retbuf = '\0';
      return retbuf;
   }

   retbuf[nchread] = '\0';
   /* Give back the unused tail; on failure the larger buffer stays valid */
   newbuf = (char *)realloc(retbuf, nchread + 1);
   if(newbuf != NULL)
      retbuf = newbuf;

   return retbuf;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */