1 /*----------------------------------------------------------------------------
2 Name: xmlBlaster/demo/c/socket/Publisher.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Demo to publish messages from command line
6 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
7 Compile: cd xmlBlaster; build.sh c
8 (Win: copy xmlBlaster\src\c\socket\pthreadVC2.dll to your PATH)
9 Invoke: Publisher -help
10 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
11 -----------------------------------------------------------------------------*/
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <XmlBlasterAccessUnparsed.h>
16
17 static char* readFile(const char *fn);
18
19 #if defined(WINCE)
20 int _tmain(int argc, _TCHAR** argv_wcs) { /* wchar_t==_TCHAR */
21 char **argv = convertWcsArgv(argv_wcs, argc);
22 #else
23 /**
24 * Demo client to publish messages.
25 * Not all PublishQos functionality is implemented.
26 * Invoke: Publisher -logLevel TRACE
27 */
28 int main(int argc, const char* const* argv) {
29 #endif
30 int iarg, iPublish;
31 const char *callbackSessionId = "topSecret";
32 XmlBlasterException xmlBlasterException;
33 XmlBlasterAccessUnparsed *xa = 0;
34 bool disconnect = true;
35 bool erase = true;
36 const char *publishToken = 0;
37
38 printf("[client] XmlBlaster %s C SOCKET client, try option '-help' if you need"
39 " usage informations\n", getXmlBlasterVersion());
40
41 for (iarg=0; iarg < argc; iarg++) {
42 if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
43 char usage[XMLBLASTER_MAX_USAGE_LEN];
44 const char *pp =
45 "\n\nExample:"
46 "\n Publisher -logLevel TRACE"
47 " -dispatch/connection/plugin/socket/hostname 192.168.2.9";
48 printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
49 getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
50 exit(EXIT_FAILURE);
51 }
52 }
53
54 xa = getXmlBlasterAccessUnparsed(argc, (const char* const* )argv);
55 if (xa->initialize(xa, 0, &xmlBlasterException) == false) {
56 printf("[client] Connection to xmlBlaster failed,"
57 " please start the server or check your configuration\n");
58 freeXmlBlasterAccessUnparsed(xa);
59 exit(EXIT_FAILURE);
60 }
61
62 disconnect = xa->props->getBool(xa->props, "disconnect", disconnect);
63 erase = xa->props->getBool(xa->props, "erase", erase);
64
65 { /* connect */
66 char *response = (char *)0;
67 const char * const sessionName = xa->props->getString(xa->props, "session.name", "Publisher");
68 const char * const passwd = xa->props->getString(xa->props, "passwd", "publisher");
69 long sessionTimeout = xa->props->getLong(xa->props, "session.timeout", 86400000L);
70 int maxSessions = xa->props->getInt(xa->props, "session.maxSessions", 10);
71 const bool persistent = xa->props->getBool(xa->props, "persistentConnection", false);
72 char connectQos[4096];
73 char callbackQos[1024];
74 sprintf(callbackQos,
75 "<queue relating='callback' maxEntries='10000000' maxEntriesCache='10000000'>"
76 " <callback type='SOCKET' sessionId='%.256s'>"
77 " socket://%.120s:%d"
78 " </callback>"
79 "</queue>",
80 callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
81 sprintf(connectQos,
82 "<qos>"
83 " <securityService type='htpasswd' version='1.0'>"
84 " <![CDATA["
85 " <user>%.80s</user>"
86 " <passwd>%.40s</passwd>"
87 " ]]>"
88 " </securityService>"
89 " <session name='%.80s' timeout='%ld' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>"
90 " %.20s"
91 "%.1024s"
92 "<clientProperty name='__remoteProperties'>true</clientProperty>"
93 "</qos>", sessionName, passwd, sessionName, sessionTimeout, maxSessions, persistent?"<persistent/>":"", callbackQos);
94
95 response = xa->connect(xa, connectQos, 0, &xmlBlasterException);
96 if (*xmlBlasterException.errorCode != 0) {
97 printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
98 xmlBlasterException.errorCode, xmlBlasterException.message);
99 freeXmlBlasterAccessUnparsed(xa);
100 exit(EXIT_FAILURE);
101 }
102 xmlBlasterFree(response);
103 printf("[client] Connected to xmlBlaster, do some tests ...\n");
104 }
105
106 { /* publish ... */
107 char *response = (char *)0;
108
109 char key[4098];
110 const char *oid = xa->props->getString(xa->props, "oid", "Hello");
111 const char *domain = xa->props->getString(xa->props, "domain", 0);
112 bool interactive = xa->props->getBool(xa->props, "interactive", true);
113
114 char qos[4098];
115 char topicQos[2048];
116 char destinationQos[2048];
117 bool oneway = xa->props->getBool(xa->props, "oneway", false);
118 long sleep = xa->props->getLong(xa->props, "sleep", 1000L);
119 int numPublish = xa->props->getInt(xa->props, "numPublish", 1);
120 const char *clientTags = xa->props->getString(xa->props, "clientTags", "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
121 const char *content = xa->props->getString(xa->props, "content", "Hi-%counter");
122 int priority = xa->props->getInt(xa->props, "priority", 5);
123 bool persistentPublish = xa->props->getBool(xa->props, "persistent", true);
124 long lifeTime = xa->props->getLong(xa->props, "lifeTime", -1L);
125 bool verbose = xa->props->getBool(xa->props, "verbose", true);
126 bool forceUpdate = xa->props->getBool(xa->props, "forceUpdate", true);
127 bool forceDestroy = xa->props->getBool(xa->props, "forceDestroy", false);
128 bool readonly = xa->props->getBool(xa->props, "readonly", false);
129 long destroyDelay = xa->props->getLong(xa->props, "destroyDelay", -1L);
130 bool createDomEntry = xa->props->getBool(xa->props, "createDomEntry", true);
131 long historyMaxMsg = xa->props->getLong(xa->props, "queue/history/maxEntries", 10L);
132 long historyMaxBytes = xa->props->getLong(xa->props, "queue/history/maxBytes", 2147483647L);
133 bool forceQueuing = xa->props->getBool(xa->props, "forceQueuing", true);
134 bool subscribable = xa->props->getBool(xa->props, "subscribable", true);
135 const char *destination = xa->props->getString(xa->props, "destination", 0);
136 int contentSize = xa->props->getInt(xa->props, "contentSize", -1);
137 const char *contentFile = xa->props->getString(xa->props, "contentFile", 0);
138 /*Map clientPropertyMap = xa->props->getInt(xa->props, "clientProperty", (Map)0); */
139
140 publishToken = (domain == 0) ? oid : domain;
141
142 sprintf(key, "<key oid='%.512s' domain='%.100s'>%.2000s</key>",
143 oid, ((domain==0)?"":domain), clientTags);
144
145 sprintf(topicQos,
146 " <topic readonly='%.20s' destroyDelay='%ld' createDomEntry='%.20s'>"
147 " <persistence/>"
148 " <queue relating='history' type='CACHE' version='1.0' maxEntries='%ld' maxBytes='%ld'/>"
149 " </topic>",
150 readonly?"true":"false",
151 destroyDelay,
152 createDomEntry?"true":"false",
153 historyMaxMsg,
154 historyMaxBytes
155 );
156 if (destination!=0)
157 sprintf(destinationQos, " <destination queryType='EXACT' forceQueuing='%.20s'>%.512s</destination>",
158 forceQueuing?"true":"false", destination);
159 else
160 *destinationQos = 0;
161
162 for (iPublish=0; iPublish<numPublish || numPublish==-1; iPublish++) {
163 char msg[20];
164 const char *pp = strstr(key, "%counter");
165 MsgUnit msgUnit;
166 memset(&msgUnit, 0, sizeof(MsgUnit));
167
168 if (interactive) {
169 printf("[client] Hit a key to publish '%s' #%d/%d ('b' to break) >> ", oid, iPublish, numPublish);
170 fgets(msg, 19, stdin);
171 if (*msg == 'b')
172 break;
173 }
174 else {
175 if (sleep > 0) {
176 sleepMillis(sleep);
177 }
178 if (verbose) {
179 if (contentFile != 0)
180 printf("[client] Publish to topic '%s' file '%s' #%d/%d\n", oid, contentFile, iPublish, numPublish);
181 else
182 printf("[client] Publish to topic '%s' #%d/%d\n", oid, iPublish, numPublish);
183 }
184 }
185
186 if (pp) { /* Replace '%counter' token by current index */
187 char *k = (char *)malloc(strlen(key)+10);
188 strncpy(k, key, pp-key);
189 sprintf(k+(pp-key), "%d%s", iPublish, pp+strlen("%counter"));
190 msgUnit.key = k;
191 }
192 else
193 msgUnit.key = strcpyAlloc(key);
194
195 if (iPublish == 1) *topicQos = 0;
196 sprintf(qos, "<qos>"
197 " <priority>%d</priority>"
198 " <subscribable>%.20s</subscribable>"
199 " <expiration lifeTime='%ld'/>"
200 " <persistent>%.20s</persistent>"
201 " <forceUpdate>%.20s</forceUpdate>"
202 " <forceDestroy>%.20s</forceDestroy>"
203 " %.2048s"
204 " <clientProperty name='%.100s'>%.512s</clientProperty>"
205 " %.512s"
206 "</qos>",
207 priority,
208 subscribable?"true":"false",
209 lifeTime,
210 persistentPublish?"true":"false",
211 forceUpdate?"true":"false",
212 forceDestroy?"true":"false",
213 destinationQos,
214 "", "", /* ClientProperty */
215 topicQos
216 );
217
218 /*if (iPublish == 0) printf("[client] publishQos is\n%s\n", qos);*/
219
220 if (contentSize > 0) {
221 int i;
222 char *p = (char *)malloc(contentSize);
223 for (i=0; i<contentSize; i++) {
224 int ran = rand() % 100;
225 p[i] = (char)(ran+28);
226 }
227 msgUnit.content = p;
228 msgUnit.contentLen = contentSize;
229 }
230 else if (contentFile != 0) {
231 char* p = readFile(contentFile);
232 msgUnit.content = p;
233 msgUnit.contentLen = strlen(msgUnit.content);
234 }
235 else {
236 const char *pc = strstr(content, "%counter");
237 if (pc) { /* Replace '%counter' token by current index */
238 char *p = (char *)malloc(strlen(content)+10);
239 strncpy(p, content, pc-content);
240 sprintf(p+(pc-content), "%d%s", iPublish, pc+strlen("%counter"));
241 msgUnit.content = p;
242 msgUnit.contentLen = strlen(msgUnit.content);
243 }
244 else {
245 msgUnit.content = strcpyAlloc(content);
246 msgUnit.contentLen = strlen(msgUnit.content);
247 }
248 }
249 msgUnit.qos =strcpyAlloc(qos);
250 if (oneway) {
251 MsgUnitArr msgUnitArr;
252 msgUnitArr.len = 1;
253 msgUnitArr.msgUnitArr = &msgUnit;
254 xa->publishOneway(xa, &msgUnitArr, &xmlBlasterException);
255 }
256 else {
257 response = xa->publish(xa, &msgUnit, &xmlBlasterException);
258 }
259 freeMsgUnitData(&msgUnit);
260 if (*xmlBlasterException.errorCode != 0) {
261 printf("[client] Caught exception in publish errorCode=%s, message=%s\n",
262 xmlBlasterException.errorCode, xmlBlasterException.message);
263 xa->disconnect(xa, 0, &xmlBlasterException);
264 freeXmlBlasterAccessUnparsed(xa);
265 exit(EXIT_FAILURE);
266 }
267 if (verbose) {
268 printf("[client] Publish success, returned status is '%s'\n", response);
269 }
270 xmlBlasterFree(response);
271 }
272 }
273
274 while (true) {
275 char msg[20];
276 bool interactive = xa->props->getBool(xa->props, "interactiveQuit", true);
277 if (!interactive) break;
278
279 printf("(Enter 'q' to exit) >> ");
280 fgets(msg, 19, stdin);
281 if (*msg == 'q')
282 break;
283 }
284
285 if (erase) { /* erase ... */
286 QosArr *resp;
287 char key[256];
288 const char *qos = "<qos/>";
289 sprintf(key, "<key oid='%.200s'/>", publishToken); /* TODO: use subscriptionId */
290 printf("[client] Erase topic '%s' ...\n", publishToken);
291 resp = xa->erase(xa, key, qos, &xmlBlasterException);
292 if (resp) {
293 size_t i;
294 for (i=0; i<resp->len; i++) {
295 printf("[client] Erase success, returned status is '%s'\n", resp->qosArr[i]);
296 }
297 freeQosArr(resp);
298 }
299 else {
300 printf("[client] Caught exception in erase errorCode=%s, message=%s\n",
301 xmlBlasterException.errorCode, xmlBlasterException.message);
302 xa->disconnect(xa, 0, &xmlBlasterException);
303 freeXmlBlasterAccessUnparsed(xa);
304 exit(EXIT_FAILURE);
305 }
306 }
307
308 if (disconnect) {
309 if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
310 printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
311 xmlBlasterException.errorCode, xmlBlasterException.message);
312 freeXmlBlasterAccessUnparsed(xa);
313 exit(EXIT_FAILURE);
314 }
315 }
316
317 freeXmlBlasterAccessUnparsed(xa);
318 printf("[client] Good bye.\n");
319 return 0;
320 }
321
322 char* readFile(const char *fn) {
323 FILE *fp;
324 char *retbuf = NULL;
325 size_t nchmax = 0;
326 register int c;
327 size_t nchread = 0;
328 char *newbuf;
329
330 if ((fp = fopen(fn, "r")) == NULL) {
331 printf("Error Opening File %s.\n", fn);
332 return 0;
333 }
334
335 while ((c = getc(fp)) != EOF) {
336 if (nchread >= nchmax) {
337 nchmax += 1024;
338 if(nchread >= nchmax) { /* in case nchmax overflowed */
339 free(retbuf);
340 return NULL;
341 }
342 #ifdef SAFEREALLOC
343 newbuf = realloc(retbuf, nchmax + 1);
344 #else
345 if (retbuf == NULL) /* in case pre-ANSI realloc */
346 newbuf = (char *)malloc(nchmax + 1);
347 else newbuf = (char *)realloc(retbuf, nchmax + 1);
348 #endif
349 /* +1 for \0 */
350 if (newbuf == NULL) {
351 free(retbuf);
352 return NULL;
353 }
354 retbuf = newbuf;
355 }
356 retbuf[nchread++] = (char)c;
357 }
358
359 if(retbuf != NULL) {
360 retbuf[nchread] = '\0';
361 newbuf = (char *)realloc(retbuf, nchread + 1);
362 if(newbuf != NULL)
363 retbuf = newbuf;
364 }
365
366 return retbuf;
367 }
syntax highlighted by Code2HTML, v. 0.9.1