00001 /*---------------------------------------------------------------------------- 00002 Name: xmlBlaster/demo/c/socket/Publisher.c 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Demo to publish messages from command line 00006 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 00007 Compile: cd xmlBlaster; build.sh c 00008 (Win: copy xmlBlaster\src\c\socket\pthreadVC2.dll to your PATH) 00009 Invoke: Publisher -help 00010 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html 00011 -----------------------------------------------------------------------------*/ 00012 #include <stdio.h> 00013 #include <stdlib.h> 00014 #include <string.h> 00015 #include <XmlBlasterAccessUnparsed.h> 00016 00017 static char* readFile(const char *fn); 00018 00019 #if defined(WINCE) 00020 int _tmain(int argc, _TCHAR** argv_wcs) { /* wchar_t==_TCHAR */ 00021 char **argv = convertWcsArgv(argv_wcs, argc); 00022 #else 00023 00028 int main(int argc, const char* const* argv) { 00029 #endif 00030 int iarg, iPublish; 00031 const char *callbackSessionId = "topSecret"; 00032 XmlBlasterException xmlBlasterException; 00033 XmlBlasterAccessUnparsed *xa = 0; 00034 bool disconnect = true; 00035 bool erase = true; 00036 const char *publishToken = 0; 00037 00038 printf("[client] XmlBlaster %s C SOCKET client, try option '-help' if you need" 00039 " usage informations\n", getXmlBlasterVersion()); 00040 00041 for (iarg=0; iarg < argc; iarg++) { 00042 if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) { 00043 char usage[XMLBLASTER_MAX_USAGE_LEN]; 00044 const char *pp = 00045 "\n\nExample:" 00046 "\n Publisher -logLevel TRACE" 00047 " -dispatch/connection/plugin/socket/hostname 192.168.2.9"; 00048 printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n", 00049 getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp); 00050 exit(EXIT_FAILURE); 00051 } 00052 } 00053 00054 xa = getXmlBlasterAccessUnparsed(argc, (const char* const* )argv); 00055 if (xa->initialize(xa, 0, &xmlBlasterException) == false) { 00056 printf("[client] Connection to xmlBlaster failed," 00057 " please start the server or check your configuration\n"); 00058 freeXmlBlasterAccessUnparsed(xa); 00059 exit(EXIT_FAILURE); 00060 } 00061 00062 disconnect = xa->props->getBool(xa->props, "disconnect", disconnect); 00063 erase = xa->props->getBool(xa->props, "erase", erase); 00064 00065 { /* connect */ 00066 char *response = (char *)0; 00067 const char * const sessionName = xa->props->getString(xa->props, "session.name", "Publisher"); 00068 const char * const passwd = xa->props->getString(xa->props, "passwd", "publisher"); 00069 long sessionTimeout = xa->props->getLong(xa->props, "session.timeout", 86400000L); 00070 int maxSessions = xa->props->getInt(xa->props, "session.maxSessions", 10); 00071 const bool persistent = xa->props->getBool(xa->props, "persistentConnection", false); 00072 char connectQos[4096]; 00073 char callbackQos[1024]; 00074 sprintf(callbackQos, 00075 "<queue relating='callback' maxEntries='10000000' maxEntriesCache='10000000'>" 00076 " <callback type='SOCKET' sessionId='%.256s'>" 00077 " socket://%.120s:%d" 00078 " </callback>" 00079 "</queue>", 00080 callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB); 00081 sprintf(connectQos, 00082 "<qos>" 00083 " <securityService type='htpasswd' version='1.0'>" 00084 " <![CDATA[" 00085 " <user>%.80s</user>" 00086 " <passwd>%.40s</passwd>" 00087 " ]]>" 00088 " </securityService>" 00089 " <session name='%.80s' timeout='%ld' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>" 00090 " %.20s" 00091 "%.1024s" 00092 "</qos>", sessionName, passwd, sessionName, sessionTimeout, maxSessions, persistent?"<persistent/>":"", callbackQos); 00093 00094 response = xa->connect(xa, connectQos, 0, &xmlBlasterException); 00095 if (*xmlBlasterException.errorCode != 0) { 00096 printf("[client] Caught exception during connect errorCode=%s, message=%s\n", 00097 xmlBlasterException.errorCode, xmlBlasterException.message); 00098 freeXmlBlasterAccessUnparsed(xa); 00099 exit(EXIT_FAILURE); 00100 } 00101 xmlBlasterFree(response); 00102 printf("[client] Connected to xmlBlaster, do some tests ...\n"); 00103 } 00104 00105 { /* publish ... */ 00106 char *response = (char *)0; 00107 00108 char key[4098]; 00109 const char *oid = xa->props->getString(xa->props, "oid", "Hello"); 00110 const char *domain = xa->props->getString(xa->props, "domain", 0); 00111 bool interactive = xa->props->getBool(xa->props, "interactive", true); 00112 00113 char qos[4098]; 00114 char topicQos[2048]; 00115 char destinationQos[2048]; 00116 bool oneway = xa->props->getBool(xa->props, "oneway", false); 00117 long sleep = xa->props->getLong(xa->props, "sleep", 1000L); 00118 int numPublish = xa->props->getInt(xa->props, "numPublish", 1); 00119 const char *clientTags = xa->props->getString(xa->props, "clientTags", "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>"); 00120 const char *content = xa->props->getString(xa->props, "content", "Hi-%counter"); 00121 int priority = xa->props->getInt(xa->props, "priority", 5); 00122 bool persistentPublish = xa->props->getBool(xa->props, "persistent", true); 00123 long lifeTime = xa->props->getLong(xa->props, "lifeTime", -1L); 00124 bool verbose = xa->props->getBool(xa->props, "verbose", true); 00125 bool forceUpdate = xa->props->getBool(xa->props, "forceUpdate", true); 00126 bool forceDestroy = xa->props->getBool(xa->props, "forceDestroy", false); 00127 bool readonly = xa->props->getBool(xa->props, "readonly", false); 00128 long destroyDelay = xa->props->getLong(xa->props, "destroyDelay", -1L); 00129 bool createDomEntry = xa->props->getBool(xa->props, "createDomEntry", true); 00130 long historyMaxMsg = xa->props->getLong(xa->props, "queue/history/maxEntries", 10L); 00131 long historyMaxBytes = xa->props->getLong(xa->props, "queue/history/maxBytes", 2147483647L); 00132 bool forceQueuing = xa->props->getBool(xa->props, "forceQueuing", true); 00133 bool subscribable = xa->props->getBool(xa->props, "subscribable", true); 00134 const char *destination = xa->props->getString(xa->props, "destination", 0); 00135 int contentSize = xa->props->getInt(xa->props, "contentSize", -1); 00136 const char *contentFile = xa->props->getString(xa->props, "contentFile", 0); 00137 /*Map clientPropertyMap = xa->props->getInt(xa->props, "clientProperty", (Map)0); */ 00138 00139 publishToken = (domain == 0) ? oid : domain; 00140 00141 sprintf(key, "<key oid='%.512s' domain='%.100s'>%.2000s</key>", 00142 oid, ((domain==0)?"":domain), clientTags); 00143 00144 sprintf(topicQos, 00145 " <topic readonly='%.20s' destroyDelay='%ld' createDomEntry='%.20s'>" 00146 " <persistence/>" 00147 " <queue relating='history' type='CACHE' version='1.0' maxEntries='%ld' maxBytes='%ld'/>" 00148 " </topic>", 00149 readonly?"true":"false", 00150 destroyDelay, 00151 createDomEntry?"true":"false", 00152 historyMaxMsg, 00153 historyMaxBytes 00154 ); 00155 if (destination!=0) 00156 sprintf(destinationQos, " <destination queryType='EXACT' forceQueuing='%.20s'>%.512s</destination>", 00157 forceQueuing?"true":"false", destination); 00158 else 00159 *destinationQos = 0; 00160 00161 for (iPublish=0; iPublish<numPublish || numPublish==-1; iPublish++) { 00162 char msg[20]; 00163 const char *pp = strstr(key, "%counter"); 00164 MsgUnit msgUnit; 00165 memset(&msgUnit, 0, sizeof(MsgUnit)); 00166 00167 if (interactive) { 00168 printf("[client] Hit a key to publish '%s' #%d/%d ('b' to break) >> ", oid, iPublish, numPublish); 00169 fgets(msg, 19, stdin); 00170 if (*msg == 'b') 00171 break; 00172 } 00173 else { 00174 if (sleep > 0) { 00175 sleepMillis(sleep); 00176 } 00177 if (verbose) { 00178 if (contentFile != 0) 00179 printf("[client] Publish to topic '%s' file '%s' #%d/%d\n", oid, contentFile, iPublish, numPublish); 00180 else 00181 printf("[client] Publish to topic '%s' #%d/%d\n", oid, iPublish, numPublish); 00182 } 00183 } 00184 00185 if (pp) { /* Replace '%counter' token by current index */ 00186 char *k = (char *)malloc(strlen(key)+10); 00187 strncpy(k, key, pp-key); 00188 sprintf(k+(pp-key), "%d%s", iPublish, pp+strlen("%counter")); 00189 msgUnit.key = k; 00190 } 00191 else 00192 msgUnit.key = strcpyAlloc(key); 00193 00194 if (iPublish == 1) *topicQos = 0; 00195 sprintf(qos, "<qos>" 00196 " <priority>%d</priority>" 00197 " <subscribable>%.20s</subscribable>" 00198 " <expiration lifeTime='%ld'/>" 00199 " <persistent>%.20s</persistent>" 00200 " <forceUpdate>%.20s</forceUpdate>" 00201 " <forceDestroy>%.20s</forceDestroy>" 00202 " %.2048s" 00203 " <clientProperty name='%.100s'>%.512s</clientProperty>" 00204 " %.512s" 00205 "</qos>", 00206 priority, 00207 subscribable?"true":"false", 00208 lifeTime, 00209 persistentPublish?"true":"false", 00210 forceUpdate?"true":"false", 00211 forceDestroy?"true":"false", 00212 destinationQos, 00213 "", "", /* ClientProperty */ 00214 topicQos 00215 ); 00216 00217 /*if (iPublish == 0) printf("[client] publishQos is\n%s\n", qos);*/ 00218 00219 if (contentSize > 0) { 00220 int i; 00221 char *p = (char *)malloc(contentSize); 00222 for (i=0; i<contentSize; i++) { 00223 int ran = rand() % 100; 00224 p[i] = (char)(ran+28); 00225 } 00226 msgUnit.content = p; 00227 msgUnit.contentLen = contentSize; 00228 } 00229 else if (contentFile != 0) { 00230 char* p = readFile(contentFile); 00231 msgUnit.content = p; 00232 msgUnit.contentLen = strlen(msgUnit.content); 00233 } 00234 else { 00235 const char *pc = strstr(content, "%counter"); 00236 if (pc) { /* Replace '%counter' token by current index */ 00237 char *p = (char *)malloc(strlen(content)+10); 00238 strncpy(p, content, pc-content); 00239 sprintf(p+(pc-content), "%d%s", iPublish, pc+strlen("%counter")); 00240 msgUnit.content = p; 00241 msgUnit.contentLen = strlen(msgUnit.content); 00242 } 00243 else { 00244 msgUnit.content = strcpyAlloc(content); 00245 msgUnit.contentLen = strlen(msgUnit.content); 00246 } 00247 } 00248 msgUnit.qos =strcpyAlloc(qos); 00249 if (oneway) { 00250 MsgUnitArr msgUnitArr; 00251 msgUnitArr.len = 1; 00252 msgUnitArr.msgUnitArr = &msgUnit; 00253 xa->publishOneway(xa, &msgUnitArr, &xmlBlasterException); 00254 } 00255 else { 00256 response = xa->publish(xa, &msgUnit, &xmlBlasterException); 00257 } 00258 freeMsgUnitData(&msgUnit); 00259 if (*xmlBlasterException.errorCode != 0) { 00260 printf("[client] Caught exception in publish errorCode=%s, message=%s\n", 00261 xmlBlasterException.errorCode, xmlBlasterException.message); 00262 xa->disconnect(xa, 0, &xmlBlasterException); 00263 freeXmlBlasterAccessUnparsed(xa); 00264 exit(EXIT_FAILURE); 00265 } 00266 if (verbose) { 00267 printf("[client] Publish success, returned status is '%s'\n", response); 00268 } 00269 xmlBlasterFree(response); 00270 } 00271 } 00272 00273 while (true) { 00274 char msg[20]; 00275 bool interactive = xa->props->getBool(xa->props, "interactiveQuit", true); 00276 if (!interactive) break; 00277 00278 printf("(Enter 'q' to exit) >> "); 00279 fgets(msg, 19, stdin); 00280 if (*msg == 'q') 00281 break; 00282 } 00283 00284 if (erase) { /* erase ... */ 00285 QosArr *resp; 00286 char key[256]; 00287 const char *qos = "<qos/>"; 00288 sprintf(key, "<key oid='%.200s'/>", publishToken); /* TODO: use subscriptionId */ 00289 printf("[client] Erase topic '%s' ...\n", publishToken); 00290 resp = xa->erase(xa, key, qos, &xmlBlasterException); 00291 if (resp) { 00292 size_t i; 00293 for (i=0; i<resp->len; i++) { 00294 printf("[client] Erase success, returned status is '%s'\n", resp->qosArr[i]); 00295 } 00296 freeQosArr(resp); 00297 } 00298 else { 00299 printf("[client] Caught exception in erase errorCode=%s, message=%s\n", 00300 xmlBlasterException.errorCode, xmlBlasterException.message); 00301 xa->disconnect(xa, 0, &xmlBlasterException); 00302 freeXmlBlasterAccessUnparsed(xa); 00303 exit(EXIT_FAILURE); 00304 } 00305 } 00306 00307 if (disconnect) { 00308 if (xa->disconnect(xa, 0, &xmlBlasterException) == false) { 00309 printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n", 00310 xmlBlasterException.errorCode, xmlBlasterException.message); 00311 freeXmlBlasterAccessUnparsed(xa); 00312 exit(EXIT_FAILURE); 00313 } 00314 } 00315 00316 freeXmlBlasterAccessUnparsed(xa); 00317 printf("[client] Good bye.\n"); 00318 return 0; 00319 } 00320 00321 char* readFile(const char *fn) { 00322 FILE *fp; 00323 char *retbuf = NULL; 00324 size_t nchmax = 0; 00325 register int c; 00326 size_t nchread = 0; 00327 char *newbuf; 00328 00329 if ((fp = fopen(fn, "r")) == NULL) { 00330 printf("Error Opening File %s.\n", fn); 00331 return 0; 00332 } 00333 00334 while ((c = getc(fp)) != EOF) { 00335 if (nchread >= nchmax) { 00336 nchmax += 1024; 00337 if(nchread >= nchmax) { /* in case nchmax overflowed */ 00338 free(retbuf); 00339 return NULL; 00340 } 00341 #ifdef SAFEREALLOC 00342 newbuf = realloc(retbuf, nchmax + 1); 00343 #else 00344 if (retbuf == NULL) /* in case pre-ANSI realloc */ 00345 newbuf = (char *)malloc(nchmax + 1); 00346 else newbuf = (char *)realloc(retbuf, nchmax + 1); 00347 #endif 00348 /* +1 for \0 */ 00349 if (newbuf == NULL) { 00350 free(retbuf); 00351 return NULL; 00352 } 00353 retbuf = newbuf; 00354 } 00355 retbuf[nchread++] = c; 00356 } 00357 00358 if(retbuf != NULL) { 00359 retbuf[nchread] = '\0'; 00360 newbuf = (char *)realloc(retbuf, nchread + 1); 00361 if(newbuf != NULL) 00362 retbuf = newbuf; 00363 } 00364 00365 return retbuf; 00366 }