00001 /*---------------------------------------------------------------------------- 00002 Name: xmlBlaster/demo/c/socket/Subscriber.c 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Example for all remote method invocations. 00006 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 00007 Compile: cd xmlBlaster; build.sh c 00008 (Win: copy xmlBlaster\src\c\socket\pthreadVC2.dll to your PATH) 00009 Invoke: Subscriber -help 00010 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html 00011 -----------------------------------------------------------------------------*/ 00012 #include <stdio.h> 00013 #include <stdlib.h> 00014 #include <string.h> 00015 #include <XmlBlasterAccessUnparsed.h> 00016 00017 static const char *updateExceptionErrorCode = 0; 00018 static const char *updateExceptionMessage = 0; 00019 static const char *subscribeToken = 0; 00020 static const char *queryType; 00021 static int message_counter = 1; 00022 static long updateSleep = 0l; 00023 static bool reportUpdateProgress = false; 00024 static int64_t startTimestamp = 0ll; /* In nano sec */ 00025 static bool verbose = true; 00026 00031 static bool myUpdate(MsgUnitArr *msgUnitArr, void *userData, 00032 XmlBlasterException *exception) 00033 { 00034 size_t i; 00035 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData; 00036 if (xa != 0) ; /* Supress compiler warning */ 00037 00038 if (startTimestamp == 0ll) 00039 startTimestamp = getTimestamp(); 00040 00041 for (i=0; i<msgUnitArr->len; i++) { 00042 char *xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100); 00043 00044 const int modulo = 100; 00045 if ((message_counter % modulo) == 0) { 00046 int64_t endTimestamp = getTimestamp(); 00047 int rate = (int)(((int64_t)message_counter*1000*1000*1000)/(endTimestamp-startTimestamp)); 00048 const char *persistent = (strstr(xml, "<persistent>true</persistent>")!=NULL||strstr(xml, "<persistent/>")!=NULL) ? "persistent" : "transient"; 00049 xa->log(xa->logUserP, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_INFO, __FILE__, 00050 "Asynchronous %s message [%d] update arrived: average %d messages/second\n", 00051 persistent, message_counter, rate); 00052 } 00053 00054 if (verbose) { 00055 printf("\n[client] CALLBACK update(): Asynchronous message [%d] update arrived:%s\n", 00056 message_counter, xml); 00057 } 00058 else { 00059 if (message_counter==1) { 00060 const char *persistent = (strstr(xml, "<persistent>true</persistent>")!=NULL||strstr(xml, "<persistent/>")!=NULL) ? "persistent" : "transient"; 00061 xa->log(xa->logUserP, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_INFO, __FILE__, 00062 "Asynchronous %s message [%d] update arrived, we log every 100 again as verbose is set to false\n", 00063 persistent, message_counter); 00064 } 00065 } 00066 00067 message_counter++; 00068 00069 /*printf("arrived message :%d\n",message_counter);*/ 00070 xmlBlasterFree(xml); 00071 msgUnitArr->msgUnitArr[i].responseQos = 00072 strcpyAlloc("<qos><state id='OK'/></qos>"); 00073 /* Return QoS: Everything is OK */ 00074 00075 if (updateSleep > 0) { 00076 printf("[client] CALLBACK update(): Sleeping for %ld millis ...\n", updateSleep); 00077 sleepMillis(updateSleep); 00078 } 00079 } 00080 if (updateExceptionErrorCode) { 00081 strncpy0(exception->errorCode, updateExceptionErrorCode, 00082 XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00083 strncpy0(exception->message, updateExceptionMessage, 00084 XMLBLASTEREXCEPTION_MESSAGE_LEN); 00085 return false; 00086 } 00087 00088 00089 return true; 00090 } 00091 00097 static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) { 00098 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed*)userP; 00099 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00100 "Update data progress currBytesRead=%ld nbytes=%ld", (long)currBytesRead, (long)nbytes); 00101 /*printf("[client] Update data progress currBytesRead=%ld nbytes=%ld\n", (long)currBytesRead, (long)nbytes);*/ 00102 } 00103 00104 #if defined(WINCE) 00105 int _tmain(int argc, _TCHAR** argv_wcs) { /* wchar_t==_TCHAR */ 00106 char **argv = convertWcsArgv(argv_wcs, argc); 00107 #else 00108 00115 int main(int argc, const char* const* argv) { 00116 #endif 00117 int iarg; 00118 const char *callbackSessionId = "topSecret"; 00119 XmlBlasterException xmlBlasterException; 00120 XmlBlasterAccessUnparsed *xa = 0; 00121 00122 printf("[client] XmlBlaster %s C SOCKET client, try option '-help' if you need" 00123 " usage informations\n", getXmlBlasterVersion()); 00124 00125 for (iarg=0; iarg < argc; iarg++) { 00126 if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) { 00127 char usage[XMLBLASTER_MAX_USAGE_LEN]; 00128 const char *pp = 00129 "\n\nExample:" 00130 "\n Subscriber -logLevel TRACE" 00131 " -dispatch/connection/plugin/socket/hostname 192.168.2.9"; 00132 printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n", 00133 getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp); 00134 exit(EXIT_FAILURE); 00135 } 00136 } 00137 00138 xa = getXmlBlasterAccessUnparsed(argc, (const char* const* )argv); 00139 if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) { 00140 printf("[client] Connection to xmlBlaster failed," 00141 " please start the server or check your configuration\n"); 00142 freeXmlBlasterAccessUnparsed(xa); 00143 exit(EXIT_FAILURE); 00144 } 00145 00146 verbose = xa->props->getBool(xa->props, "verbose", verbose); 00147 updateSleep = xa->props->getLong(xa->props, "updateSleep", 0L); 00148 reportUpdateProgress = xa->props->getBool(xa->props, "reportUpdateProgress", false); /* Report update progress */ 00149 updateExceptionErrorCode = xa->props->getString(xa->props, "updateException.errorCode", 0); /* "user.clientCode" */ 00150 updateExceptionMessage = xa->props->getString(xa->props, "updateException.message", ""); /* "I don't want these messages" */ 00151 00152 { /* connect */ 00153 char *response = (char *)0; 00154 char connectQos[4096]; 00155 char callbackQos[1024]; 00156 const char * const sessionName = xa->props->getString(xa->props, "session.name", "Subscriber"); 00157 long sessionTimeout = xa->props->getLong(xa->props, "session.timeout", 86400000L); 00158 int maxSessions = xa->props->getInt(xa->props, "session.maxSessions", 10); 00159 const bool persistent = xa->props->getBool(xa->props, "dispatch/connection/persistent", false); 00160 const long pingInterval = xa->props->getLong(xa->props, "dispatch/callback/pingInterval", 10000L); 00161 const long delay = xa->props->getLong(xa->props, "dispatch/callback/delay", 60000L); 00162 const long retries = xa->props->getLong(xa->props, "dispatch/callback/retries", 0L); /* Set to -1 to keep the session on server side during a missing client */ 00163 callbackSessionId = xa->props->getString(xa->props, "dispatch/callback/sessionId", callbackSessionId); 00164 sprintf(callbackQos, 00165 "<queue relating='callback' maxEntries='10000000' maxEntriesCache='10000000'>" 00166 " <callback type='SOCKET' sessionId='%.256s' pingInterval='%ld' retries='%ld' delay='%ld' oneway='false'>" 00167 " socket://%.120s:%d" 00168 " </callback>" 00169 "</queue>", 00170 callbackSessionId, pingInterval, retries, delay, xa->callbackP->hostCB, xa->callbackP->portCB); 00171 sprintf(connectQos, 00172 "<qos>" 00173 " <securityService type='htpasswd' version='1.0'>" 00174 " <![CDATA[" 00175 " <user>%.80s</user>" 00176 " <passwd>subscriber</passwd>" 00177 " ]]>" 00178 " </securityService>" 00179 " <session name='%.80s' timeout='%ld' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>" 00180 " %.20s" 00181 "%.1024s" 00182 "</qos>", sessionName, sessionName, sessionTimeout, maxSessions, persistent?"<persistent/>":"", callbackQos); 00183 00184 response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException); 00185 if (*xmlBlasterException.errorCode != 0) { 00186 printf("[client] Caught exception during connect errorCode=%s, message=%s\n", 00187 xmlBlasterException.errorCode, xmlBlasterException.message); 00188 freeXmlBlasterAccessUnparsed(xa); 00189 exit(EXIT_FAILURE); 00190 } 00191 xmlBlasterFree(response); 00192 printf("[client] Connected to xmlBlaster, do some tests ...\n"); 00193 } 00194 00195 if (reportUpdateProgress && xa->callbackP != 0) { 00196 xa->callbackP->readFromSocket.numReadFuncP = callbackProgressListener; 00197 xa->callbackP->readFromSocket.numReadUserP = xa; 00198 } 00199 00200 { /* subscribe ... */ 00201 char *response = (char *)0; 00202 00203 char key[1024]; 00204 const char *oid = xa->props->getString(xa->props, "oid", "Hello"); 00205 const char *domain = xa->props->getString(xa->props, "domain", 0); 00206 const char *xpath = xa->props->getString(xa->props, "xpath", 0); 00207 00208 char filterQos[2048]; 00209 char qos[4098]; 00210 bool multiSubscribe = xa->props->getBool(xa->props, "multiSubscribe", true); 00211 bool persistent = xa->props->getBool(xa->props, "subscribe/qos/persistent", false); 00212 bool notifyOnErase = xa->props->getBool(xa->props, "notifyOnErase", true); 00213 bool local = xa->props->getBool(xa->props, "local", true); 00214 bool initialUpdate = xa->props->getBool(xa->props, "initialUpdate", true); 00215 bool updateOneway = xa->props->getBool(xa->props, "updateOneway", false); 00216 int historyNumUpdates = xa->props->getInt(xa->props, "historyNumUpdates", 1); 00217 bool historyNewestFirst = xa->props->getBool(xa->props, "historyNewestFirst", true); 00218 bool wantContent = xa->props->getBool(xa->props, "wantContent", true); 00219 const char *filterType = xa->props->getString(xa->props, "filter.type", "GnuRegexFilter"); 00220 const char *filterVersion = xa->props->getString(xa->props, "filter.version", "1.0"); 00221 const char *filterQuery = xa->props->getString(xa->props, "filter.query", 0); /* "^H.*$" */ 00222 bool interactiveSubscribe = xa->props->getBool(xa->props, "interactiveSubscribe", false); 00223 00224 if (domain) { 00225 sprintf(key, "<key domain='%.512s'/>", domain); 00226 subscribeToken = domain; 00227 queryType = "DOMAIN"; 00228 } 00229 else if (xpath) { 00230 sprintf(key, "<key queryType='XPATH'>%.512s</key>", xpath); 00231 subscribeToken = xpath; 00232 queryType = "XPATH"; 00233 } 00234 else { 00235 sprintf(key, "<key oid='%.512s'/>", oid); 00236 subscribeToken = oid; 00237 queryType = "EXACT"; 00238 } 00239 00240 if (filterQuery) { 00241 sprintf(filterQos, " <filter type='%.100s' version='%.50s'>%.1800s</filter>", 00242 filterType, filterVersion, filterQuery); 00243 } 00244 else 00245 *filterQos = 0; 00246 00247 sprintf(qos, "<qos>" 00248 " <content>%.20s</content>" 00249 " <multiSubscribe>%.20s</multiSubscribe>" 00250 " <persistent>%.20s</persistent>" 00251 " <local>%.20s</local>" 00252 " <initialUpdate>%.20s</initialUpdate>" 00253 " <updateOneway>%.20s</updateOneway>" 00254 " <notify>%.20s</notify>" 00255 "%.2048s" 00256 " <history numEntries='%d' newestFirst='%.20s'/>" 00257 "</qos>", 00258 wantContent?"true":"false", 00259 multiSubscribe?"true":"false", 00260 persistent?"true":"false", 00261 local?"true":"false", 00262 initialUpdate?"true":"false", 00263 updateOneway?"true":"false", 00264 notifyOnErase?"true":"false", 00265 filterQos, 00266 historyNumUpdates, 00267 historyNewestFirst?"true":"false" 00268 ); 00269 00270 printf("[client] Subscribe key: %s\n", key); 00271 printf("[client] Subscribe qos: %s\n", qos); 00272 00273 if (interactiveSubscribe) { 00274 char msg[20]; 00275 printf("(Hit a key to subscribe) >> "); 00276 fgets(msg, 19, stdin); 00277 } 00278 00279 response = xa->subscribe(xa, key, qos, &xmlBlasterException); 00280 if (*xmlBlasterException.errorCode != 0) { 00281 printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n", 00282 xmlBlasterException.errorCode, xmlBlasterException.message); 00283 xa->disconnect(xa, 0, &xmlBlasterException); 00284 freeXmlBlasterAccessUnparsed(xa); 00285 exit(EXIT_FAILURE); 00286 } 00287 printf("[client] Subscribe success, returned status is '%s'\n", response); 00288 xmlBlasterFree(response); 00289 } 00290 00291 while (true) { 00292 char msg[20]; 00293 00294 printf("(Enter 'q' to exit) >> "); 00295 fgets(msg, 19, stdin); 00296 if (*msg == 'q') 00297 break; 00298 } 00299 00300 { /* unSubscribe ... */ 00301 QosArr *resp; 00302 char key[256]; 00303 const char *qos = "<qos/>"; 00304 /* TODO: use subscriptionId */ 00305 if (!strcmp(queryType, "EXACT")) 00306 sprintf(key, "<key oid='%.200s'/>", subscribeToken); 00307 else if (!strcmp(queryType, "DOMAIN")) 00308 sprintf(key, "<key domain='%.512s'/>", subscribeToken); 00309 else 00310 sprintf(key, "<key queryType='XPATH'>%.512s</key>", subscribeToken); 00311 printf("[client] UnSubscribe topic '%s' ...\n", subscribeToken); 00312 resp = xa->unSubscribe(xa, key, qos, &xmlBlasterException); 00313 if (resp) { 00314 size_t i; 00315 for (i=0; i<resp->len; i++) { 00316 printf("[client] Unsubscribe success, returned status is '%s'\n", resp->qosArr[i]); 00317 } 00318 freeQosArr(resp); 00319 } 00320 else { 00321 printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n", 00322 xmlBlasterException.errorCode, xmlBlasterException.message); 00323 xa->disconnect(xa, 0, &xmlBlasterException); 00324 freeXmlBlasterAccessUnparsed(xa); 00325 exit(EXIT_FAILURE); 00326 } 00327 } 00328 00329 if (xa->disconnect(xa, 0, &xmlBlasterException) == false) { 00330 printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n", 00331 xmlBlasterException.errorCode, xmlBlasterException.message); 00332 freeXmlBlasterAccessUnparsed(xa); 00333 exit(EXIT_FAILURE); 00334 } 00335 00336 freeXmlBlasterAccessUnparsed(xa); 00337 printf("[client] Good bye.\n"); 00338 return 0; 00339 } 00340