1 /*----------------------------------------------------------------------------
2 Name: xmlBlaster/demo/c/socket/Subscriber.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Demo client which connects, subscribes to a topic, logs the update callbacks and unsubscribes again.
6 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
7 Compile: cd xmlBlaster; build.sh c
8 (Win: copy xmlBlaster\src\c\socket\pthreadVC2.dll to your PATH)
9 Invoke: Subscriber -help
10 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
11 -----------------------------------------------------------------------------*/
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <XmlBlasterAccessUnparsed.h>
16 #include <util/Timestampc.h>
17
/* Exception which myUpdate() reports back; main() reads both from the
   properties "updateException.errorCode" / "updateException.message" */
static const char *updateExceptionErrorCode = NULL;
static const char *updateExceptionMessage = NULL;

/* What main() subscribed to, remembered for the later unSubscribe() */
static const char *subscribeToken = NULL;
static const char *queryType = NULL; /* main() sets "EXACT", "DOMAIN" or "XPATH" */

/* Settings which main() reads from the properties */
static bool verbose = true;
static bool reportUpdateProgress = false;
static long updateSleep = 0L;

/* Statistics maintained by myUpdate() */
static int message_counter = 1;
static int64_t startTimestamp = 0LL; /* In nano sec */
27
28 /**
29 * Here we receive the callback messages from xmlBlaster
30 * @see UpdateFp in CallbackServerUnparsed.h
31 */
32 static bool myUpdate(MsgUnitArr *msgUnitArr, void *userData,
33 XmlBlasterException *exception)
34 {
35 size_t i;
36 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
37 if (xa != 0) ; /* Supress compiler warning */
38
39 if (startTimestamp == 0ll)
40 startTimestamp = getTimestamp();
41
42 for (i=0; i<msgUnitArr->len; i++) {
43 char *xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
44
45 const int modulo = 100;
46 if ((message_counter % modulo) == 0) {
47 int64_t endTimestamp = getTimestamp();
48 int rate = (int)(((int64_t)message_counter*1000*1000*1000)/(endTimestamp-startTimestamp));
49 const char *persistent = (strstr(xml, "<persistent>true</persistent>")!=NULL||strstr(xml, "<persistent/>")!=NULL) ? "persistent" : "transient";
50 xa->log(xa->logUserP, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_INFO, __FILE__,
51 "Asynchronous %s message [%d] update arrived: average %d messages/second\n",
52 persistent, message_counter, rate);
53 }
54
55 if (verbose) {
56 printf("\n[client] CALLBACK update(): Asynchronous message [%d] update arrived:%s\n",
57 message_counter, xml);
58 }
59 else {
60 if (message_counter==1) {
61 const char *persistent = (strstr(xml, "<persistent>true</persistent>")!=NULL||strstr(xml, "<persistent/>")!=NULL) ? "persistent" : "transient";
62 xa->log(xa->logUserP, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_INFO, __FILE__,
63 "Asynchronous %s message [%d] update arrived, we log every 100 again as verbose is set to false\n",
64 persistent, message_counter);
65 }
66 }
67
68 message_counter++;
69
70 /*printf("arrived message :%d\n",message_counter);*/
71 xmlBlasterFree(xml);
72 msgUnitArr->msgUnitArr[i].responseQos =
73 strcpyAlloc("<qos><state id='OK'/></qos>");
74 /* Return QoS: Everything is OK */
75
76 if (updateSleep > 0) {
77 printf("[client] CALLBACK update(): Sleeping for %ld millis ...\n", updateSleep);
78 sleepMillis(updateSleep);
79 }
80 }
81 if (updateExceptionErrorCode) {
82 strncpy0(exception->errorCode, updateExceptionErrorCode,
83 XMLBLASTEREXCEPTION_ERRORCODE_LEN);
84 strncpy0(exception->message, updateExceptionMessage,
85 XMLBLASTEREXCEPTION_MESSAGE_LEN);
86 return false;
87 }
88
89
90 return true;
91 }
92
93 /**
94 * Access the read socket progress.
95 * You need to register this function pointer if you want to see the progress of huge messages
96 * on slow connections.
97 */
98 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) {
99 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed*)userP;
100 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
101 "Update data progress currBytesRead=%ld nbytes=%ld", (long)currBytesRead, (long)nbytes);
102 /*printf("[client] Update data progress currBytesRead=%ld nbytes=%ld\n", (long)currBytesRead, (long)nbytes);*/
103 }
104
105 #if defined(WINCE)
106 int _tmain(int argc, _TCHAR** argv_wcs) { /* wchar_t==_TCHAR */
107 char **argv = convertWcsArgv(argv_wcs, argc);
108 #else
109 /**
110 * Invoke examples:
111 *
112 * Subscriber -logLevel TRACE
113 *
114 * Subscriber -session.name Subscriber/1 -dispatch/callback/retries -1 -subscribe/qos/persistent true -interactiveSubscribe true
115 */
116 int main(int argc, const char* const* argv) {
117 #endif
118 int iarg;
119 const char *callbackSessionId = "topSecret";
120 XmlBlasterException xmlBlasterException;
121 XmlBlasterAccessUnparsed *xa = 0;
122
123 printf("[client] XmlBlaster %s C SOCKET client, try option '-help' if you need"
124 " usage informations\n", getXmlBlasterVersion());
125
126 for (iarg=0; iarg < argc; iarg++) {
127 if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
128 char usage[XMLBLASTER_MAX_USAGE_LEN];
129 const char *pp =
130 "\n\nExample:"
131 "\n Subscriber -logLevel TRACE"
132 " -dispatch/connection/plugin/socket/hostname 192.168.2.9";
133 printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
134 getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
135 exit(EXIT_FAILURE);
136 }
137 }
138
139 xa = getXmlBlasterAccessUnparsed(argc, (const char* const* )argv);
140 if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
141 printf("[client] Connection to xmlBlaster failed,"
142 " please start the server or check your configuration\n");
143 freeXmlBlasterAccessUnparsed(xa);
144 exit(EXIT_FAILURE);
145 }
146
147 verbose = xa->props->getBool(xa->props, "verbose", verbose);
148 updateSleep = xa->props->getLong(xa->props, "updateSleep", 0L);
149 reportUpdateProgress = xa->props->getBool(xa->props, "reportUpdateProgress", false); /* Report update progress */
150 updateExceptionErrorCode = xa->props->getString(xa->props, "updateException.errorCode", 0); /* "user.clientCode" */
151 updateExceptionMessage = xa->props->getString(xa->props, "updateException.message", ""); /* "I don't want these messages" */
152
153 { /* connect */
154 char *response = (char *)0;
155 char connectQos[4096];
156 char callbackQos[1024];
157 const char * const sessionName = xa->props->getString(xa->props, "session.name", "Subscriber");
158 long sessionTimeout = xa->props->getLong(xa->props, "session.timeout", 86400000L);
159 int maxSessions = xa->props->getInt(xa->props, "session.maxSessions", 10);
160 const bool persistent = xa->props->getBool(xa->props, "dispatch/connection/persistent", false);
161 const long pingInterval = xa->props->getLong(xa->props, "dispatch/callback/pingInterval", 10000L);
162 const long delay = xa->props->getLong(xa->props, "dispatch/callback/delay", 60000L);
163 const long retries = xa->props->getLong(xa->props, "dispatch/callback/retries", 0L); /* Set to -1 to keep the session on server side during a missing client */
164 callbackSessionId = xa->props->getString(xa->props, "dispatch/callback/sessionId", callbackSessionId);
165 sprintf(callbackQos,
166 "<queue relating='callback' maxEntries='10000000' maxEntriesCache='10000000'>"
167 " <callback type='SOCKET' sessionId='%.256s' pingInterval='%ld' retries='%ld' delay='%ld' oneway='false'>"
168 " socket://%.120s:%d"
169 " </callback>"
170 "</queue>",
171 callbackSessionId, pingInterval, retries, delay, xa->callbackP->hostCB, xa->callbackP->portCB);
172 sprintf(connectQos,
173 "<qos>"
174 " <securityService type='htpasswd' version='1.0'>"
175 " <![CDATA["
176 " <user>%.80s</user>"
177 " <passwd>subscriber</passwd>"
178 " ]]>"
179 " </securityService>"
180 " <session name='%.80s' timeout='%ld' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>"
181 " %.20s"
182 "%.1024s"
183 "</qos>", sessionName, sessionName, sessionTimeout, maxSessions, persistent?"<persistent/>":"", callbackQos);
184
185 response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
186 if (*xmlBlasterException.errorCode != 0) {
187 printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
188 xmlBlasterException.errorCode, xmlBlasterException.message);
189 freeXmlBlasterAccessUnparsed(xa);
190 exit(EXIT_FAILURE);
191 }
192 xmlBlasterFree(response);
193 printf("[client] Connected to xmlBlaster, do some tests ...\n");
194 }
195
196 if (reportUpdateProgress && xa->callbackP != 0) {
197 xa->callbackP->readFromSocket.numReadFuncP = callbackProgressListener;
198 xa->callbackP->readFromSocket.numReadUserP = xa;
199 }
200
201 { /* subscribe ... */
202 char *response = (char *)0;
203
204 char key[1024];
205 const char *oid = xa->props->getString(xa->props, "oid", "Hello");
206 const char *domain = xa->props->getString(xa->props, "domain", 0);
207 const char *xpath = xa->props->getString(xa->props, "xpath", 0);
208
209 char filterQos[2048];
210 char qos[4098];
211 bool multiSubscribe = xa->props->getBool(xa->props, "multiSubscribe", true);
212 bool persistent = xa->props->getBool(xa->props, "subscribe/qos/persistent", false);
213 bool notifyOnErase = xa->props->getBool(xa->props, "notifyOnErase", true);
214 bool local = xa->props->getBool(xa->props, "local", true);
215 bool initialUpdate = xa->props->getBool(xa->props, "initialUpdate", true);
216 bool updateOneway = xa->props->getBool(xa->props, "updateOneway", false);
217 int historyNumUpdates = xa->props->getInt(xa->props, "historyNumUpdates", 1);
218 bool historyNewestFirst = xa->props->getBool(xa->props, "historyNewestFirst", true);
219 bool wantContent = xa->props->getBool(xa->props, "wantContent", true);
220 const char *filterType = xa->props->getString(xa->props, "filter.type", "GnuRegexFilter");
221 const char *filterVersion = xa->props->getString(xa->props, "filter.version", "1.0");
222 const char *filterQuery = xa->props->getString(xa->props, "filter.query", 0); /* "^H.*$" */
223 bool interactiveSubscribe = xa->props->getBool(xa->props, "interactiveSubscribe", false);
224
225 if (domain) {
226 sprintf(key, "<key domain='%.512s'/>", domain);
227 subscribeToken = domain;
228 queryType = "DOMAIN";
229 }
230 else if (xpath) {
231 sprintf(key, "<key queryType='XPATH'>%.512s</key>", xpath);
232 subscribeToken = xpath;
233 queryType = "XPATH";
234 }
235 else {
236 sprintf(key, "<key oid='%.512s'/>", oid);
237 subscribeToken = oid;
238 queryType = "EXACT";
239 }
240
241 if (filterQuery) {
242 sprintf(filterQos, " <filter type='%.100s' version='%.50s'>%.1800s</filter>",
243 filterType, filterVersion, filterQuery);
244 }
245 else
246 *filterQos = 0;
247
248 sprintf(qos, "<qos>"
249 " <content>%.20s</content>"
250 " <multiSubscribe>%.20s</multiSubscribe>"
251 " <persistent>%.20s</persistent>"
252 " <local>%.20s</local>"
253 " <initialUpdate>%.20s</initialUpdate>"
254 " <updateOneway>%.20s</updateOneway>"
255 " <notify>%.20s</notify>"
256 "%.2048s"
257 " <history numEntries='%d' newestFirst='%.20s'/>"
258 "</qos>",
259 wantContent?"true":"false",
260 multiSubscribe?"true":"false",
261 persistent?"true":"false",
262 local?"true":"false",
263 initialUpdate?"true":"false",
264 updateOneway?"true":"false",
265 notifyOnErase?"true":"false",
266 filterQos,
267 historyNumUpdates,
268 historyNewestFirst?"true":"false"
269 );
270
271 printf("[client] Subscribe key: %s\n", key);
272 printf("[client] Subscribe qos: %s\n", qos);
273
274 if (interactiveSubscribe) {
275 char msg[20];
276 printf("(Hit a key to subscribe) >> ");
277 fgets(msg, 19, stdin);
278 }
279
280 response = xa->subscribe(xa, key, qos, &xmlBlasterException);
281 if (*xmlBlasterException.errorCode != 0) {
282 printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n",
283 xmlBlasterException.errorCode, xmlBlasterException.message);
284 xa->disconnect(xa, 0, &xmlBlasterException);
285 freeXmlBlasterAccessUnparsed(xa);
286 exit(EXIT_FAILURE);
287 }
288 printf("[client] Subscribe success, returned status is '%s'\n", response);
289 xmlBlasterFree(response);
290 }
291
292 while (true) {
293 char msg[20];
294
295 printf("(Enter 'q' to exit) >> ");
296 fgets(msg, 19, stdin);
297 if (*msg == 'q')
298 break;
299 }
300
301 { /* unSubscribe ... */
302 QosArr *resp;
303 char key[256];
304 const char *qos = "<qos/>";
305 /* TODO: use subscriptionId */
306 if (!strcmp(queryType, "EXACT"))
307 sprintf(key, "<key oid='%.200s'/>", subscribeToken);
308 else if (!strcmp(queryType, "DOMAIN"))
309 sprintf(key, "<key domain='%.512s'/>", subscribeToken);
310 else
311 sprintf(key, "<key queryType='XPATH'>%.512s</key>", subscribeToken);
312 printf("[client] UnSubscribe topic '%s' ...\n", subscribeToken);
313 resp = xa->unSubscribe(xa, key, qos, &xmlBlasterException);
314 if (resp) {
315 size_t i;
316 for (i=0; i<resp->len; i++) {
317 printf("[client] Unsubscribe success, returned status is '%s'\n", resp->qosArr[i]);
318 }
319 freeQosArr(resp);
320 }
321 else {
322 printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n",
323 xmlBlasterException.errorCode, xmlBlasterException.message);
324 xa->disconnect(xa, 0, &xmlBlasterException);
325 freeXmlBlasterAccessUnparsed(xa);
326 exit(EXIT_FAILURE);
327 }
328 }
329
330 if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
331 printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
332 xmlBlasterException.errorCode, xmlBlasterException.message);
333 freeXmlBlasterAccessUnparsed(xa);
334 exit(EXIT_FAILURE);
335 }
336
337 freeXmlBlasterAccessUnparsed(xa);
338 printf("[client] Good bye.\n");
339 return 0;
340 }
/* syntax highlighted by Code2HTML, v. 0.9.1 */