1 /*----------------------------------------------------------------------------
2 Name: xmlBlaster/testsuite/src/c/TestStress.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Test C client library
6 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
7 Compile: cd xmlBlaster; build c
8 Invoke: Start 'java org.xmlBlaster.Main' and then 'TestStress'
9 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/c.client.socket.html
10 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
11 -----------------------------------------------------------------------------*/
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <XmlBlasterAccessUnparsed.h>
16 #include "test.h"
17
18 static int argc = 0;
19 static char** argv = 0;
20 #define ERRORSTR_LEN 4096
21 static char errorString[ERRORSTR_LEN+1];
22 static char updateContent[256];
23 static void *updateUserData;
24 static const char *CONTENT = "Some message payload";
25 static size_t updateCounter = 0;
26
27 /**
28 * Here we receive the callback messages from xmlBlaster
29 * mu_assert() does not help here as it is another thread
30 */
31 static bool myUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException)
32 {
33 size_t i;
34 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
35 if (xmlBlasterException != 0) ; /* Supress compiler warning */
36 updateUserData = xa;
37 updateCounter += msgUnitArr->len;
38 for (i=0; i<msgUnitArr->len; i++) {
39 MsgUnit *msg = &msgUnitArr->msgUnitArr[i];
40 if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
41 xa->log(0, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "CALLBACK update(): Asynchronous message update arrived\n");
42 strncpy0(updateContent, msg->content, msg->contentLen+1); /* Adds '\0' to the end */
43 msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
44 }
45 return true;
46 }
47
48 /**
49 * Invoke: TestStress -logLevel TRACE
50 */
51 static const char * test_stress()
52 {
53 char *response = (char *)0;
54 /*
55 * callbackSessionId:
56 * Is created by the client and used to validate callback messages in update.
57 * This is sent on connect in ConnectQos.
58 * (Is different from the xmlBlaster secret session ID)
59 */
60 const char *callbackSessionId = "topSecret";
61 XmlBlasterException xmlBlasterException;
62 XmlBlasterAccessUnparsed *xa = 0;
63 bool retBool;
64 int iPub, iWait, numPublish, maxQueueEntries, maxSessions=10;
65 const char *sessionName = "joe";
66
67 xa = getXmlBlasterAccessUnparsed(argc, (const char* const*)argv);
68 if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
69 freeXmlBlasterAccessUnparsed(xa);
70 mu_fail("[TEST FAIL] Connection to xmlBlaster failed, please start the server or check your configuration");
71 }
72
73 numPublish = xa->props->getInt(xa->props, "numPublish", 2500);
74 maxQueueEntries = xa->props->getInt(xa->props, "queue/callback/maxEntries", numPublish);
75 sessionName = xa->props->getString(xa->props, "session.name", sessionName);
76 maxSessions = xa->props->getInt(xa->props, "session.maxSessions", maxSessions);
77
78 { /* connect */
79 char connectQos[2048];
80 char callbackQos[1024];
81 sprintf(callbackQos,
82 "<queue relating='callback' maxEntries='%d' maxEntriesCache='%d'>"
83 " <callback type='SOCKET' sessionId='%s'>"
84 " socket://%.120s:%d"
85 " </callback>"
86 "</queue>",
87 maxQueueEntries, maxQueueEntries, callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
88 sprintf(connectQos,
89 "<qos>"
90 " <securityService type='htpasswd' version='1.0'>"
91 " <![CDATA["
92 " <user>fritz</user>"
93 " <passwd>secret</passwd>"
94 " ]]>"
95 " </securityService>"
96 " <session name='%.120s' timeout='3600000' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>"
97 "%.1024s"
98 "</qos>", sessionName, maxSessions, callbackQos);
99
100 response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
101 if (*xmlBlasterException.errorCode != '\0') {
102 SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception during connect errorCode=%s, message=%s\n",
103 xmlBlasterException.errorCode, xmlBlasterException.message);
104 freeXmlBlasterAccessUnparsed(xa);
105 mu_assert(errorString, false);
106 }
107 xmlBlasterFree(response);
108 printf("[client] Connected to xmlBlaster, do some tests ...\n");
109 }
110
111 { /* subscribe ... */
112 const char *key = "<key oid='TestStress'/>";
113 const char *qos = "<qos/>";
114 printf("[client] Subscribe message 'TestStress' ...\n");
115 response = xa->subscribe(xa, key, qos, &xmlBlasterException);
116 if (*xmlBlasterException.errorCode != 0) {
117 SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception in subscribe errorCode=%s, message=%s\n",
118 xmlBlasterException.errorCode, xmlBlasterException.message);
119 freeXmlBlasterAccessUnparsed(xa);
120 mu_assert(errorString, false);
121 }
122 printf("[client] Subscribe success\n");
123 mu_assert("Subscribe response is invalid", strstr(response, "subscribe id=")!=0);
124 mu_assert("Subscribe response is invalid", strstr(response, "WARNING")==0);
125 mu_assert("Subscribe response is invalid", strstr(response, "ERROR")==0);
126 xmlBlasterFree(response);
127 }
128
129 printf("[client] Publishing %d messages 'TestStress' ...\n", numPublish);
130 for (iPub=0; iPub<numPublish; iPub++) {
131 char tmp[200];
132 MsgUnit msgUnit;
133 memset(&msgUnit, 0, sizeof(MsgUnit));
134 msgUnit.key = strcpyAlloc("<key oid='TestStress'/>");
135 sprintf(tmp, "#%d %s", (iPub+1), CONTENT);
136 msgUnit.content = strcpyAlloc(tmp);
137 msgUnit.contentLen = strlen(msgUnit.content);
138 msgUnit.qos =strcpyAlloc("<qos><persistent>false</persistent></qos>");
139 response = xa->publish(xa, &msgUnit, &xmlBlasterException);
140 freeMsgUnitData(&msgUnit);
141 if (*xmlBlasterException.errorCode != '\0') {
142 SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception in publish #%d errorCode=%s, message=%s\n",
143 iPub, xmlBlasterException.errorCode, xmlBlasterException.message);
144 freeXmlBlasterAccessUnparsed(xa);
145 mu_assert(errorString, false);
146 }
147 if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
148 xa->log(0, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Publish #%d messages success\n", iPub);
149 mu_assert("Publish response is invalid", strstr(response, "rcvTimestamp nanos=")!=0);
150 xmlBlasterFree(response);
151 }
152
153 for (iWait=0; iWait<10; iWait++) {
154 printf("[client] Publish of %d messages success, received %d updates\n", numPublish, (int32_t)updateCounter);
155 if ((int)updateCounter >= numPublish)
156 break;
157 sleepMillis(500);
158 }
159
160 mu_assert("No update arrived", *updateContent != '\0');
161 if ((int)updateCounter < numPublish) {
162 freeXmlBlasterAccessUnparsed(xa);
163 mu_assert("Missing updates", (int)updateCounter == numPublish);
164 }
165 else if ((int)updateCounter > numPublish) {
166 printf("[client] WARN: Publish of %d messages but received %d updates\n", numPublish, (int32_t)updateCounter);
167 }
168 printf("[client] updateContent = %s, CONTENT = %s\n", updateContent, CONTENT);
169 mu_assert("Received wrong message in update()", strstr(updateContent, CONTENT) != 0);
170 *updateContent = '\0';
171
172 mu_assert("UserData from update() is invalid", updateUserData == xa);
173
174
175 { /* erase ... */
176 QosArr* responseArrP;
177 const char *key = "<key oid='TestStress'/>";
178 const char *qos = "<qos/>";
179 printf("[client] Erasing message 'TestStress' ...\n");
180 responseArrP = xa->erase(xa, key, qos, &xmlBlasterException);
181 if (*xmlBlasterException.errorCode != '\0') {
182 SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception in erase() errorCode=%s, message=%s\n",
183 xmlBlasterException.errorCode, xmlBlasterException.message);
184 freeXmlBlasterAccessUnparsed(xa);
185 mu_assert(errorString, false);
186 }
187 printf("[client] Erase success\n");
188 freeQosArr(responseArrP);
189 }
190
191 retBool = xa->disconnect(xa, 0, &xmlBlasterException);
192 if (*xmlBlasterException.errorCode != '\0') {
193 SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception in erase() errorCode=%s, message=%s\n",
194 xmlBlasterException.errorCode, xmlBlasterException.message);
195 freeXmlBlasterAccessUnparsed(xa);
196 mu_assert(errorString, false);
197 }
198 mu_assert("disconnect() returned false", retBool == true);
199
200 if (*updateContent != '\0') { /* The erase event is sent as update as well */
201 *updateContent = '\0';
202 }
203
204 freeXmlBlasterAccessUnparsed(xa);
205 printf("[client] Good bye.\n");
206 return 0;
207 }
208
209
210 static const char *all_tests()
211 {
212 mu_run_test(test_stress);
213 return 0;
214 }
215
216 int main(int argc_, char **argv_)
217 {
218 const char *result;
219 argc = argc_;
220 argv = argv_;
221
222 result = all_tests();
223
224 if (result != 0) {
225 printf("%s\n", result);
226 }
227 else {
228 printf("ALL TESTS PASSED\n");
229 }
230 printf("Tests run: %d\n", tests_run);
231
232 return result != 0;
233 }
syntax highlighted by Code2HTML, v. 0.9.1