1 /*-----------------------------------------------------------------------------
2 Name: TestSub.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Demo code for a client using xmlBlaster
6 Version: $Id: TestSub.cpp 12916 2004-11-18 14:55:44Z ruff $
7 -----------------------------------------------------------------------------*/
8 #include "TestSuite.h"
9 #include <iostream>
10
/**
 * This client tests the method subscribe() with a later publish() with XPath
 * query.<br />
 * The subscribe() should be recognized for this later arriving publish()<p>
 * This client may be invoked multiple times on the same xmlBlaster server,
 * as it cleans up everything after its tests are done.
 * <p>
 */
19
20 using namespace std;
21 using namespace org::xmlBlaster::util;
22 using namespace org::xmlBlaster::util::qos;
23 using namespace org::xmlBlaster::util::thread;
24 using namespace org::xmlBlaster::client;
25 using namespace org::xmlBlaster::client::qos;
26 using namespace org::xmlBlaster::client::key;
27 using namespace org::xmlBlaster::authentication;
28
29 namespace org { namespace xmlBlaster { namespace test {
30
31 class SpecificCallback : public I_Callback {
32 private:
33 int numReceived_;
34 string name_;
35 I_Log& log_;
36
37 public:
38 SpecificCallback(I_Log& log, const string& name) : log_(log) {
39 name_ = name;
40 numReceived_ = 0;
41 }
42
43 int getCount() {
44 return numReceived_;
45 }
46
47
48 string update(const string &sessionId,
49 UpdateKey &updateKey,
50 const unsigned char * /*content*/, long /*contentSize*/,
51 UpdateQos &updateQos)
52 {
53 log_.info("update", string("Receiving update on callback '") + name_ + "' of message oid=" +
54 updateKey.getOid() + " state=" + updateQos.getState() +
55 " authentication sessionId=" + sessionId + " ...");
56 numReceived_++;
57 return "<qos><state id='OK'/></qos>";
58 }
59
60
61 };
62
63
64 class TestSub: public TestSuite, public virtual I_Callback
65 {
66 private:
67 bool messageArrived_; // = false;
68 int numReceived_; // = 0; // error checking
69 string subscribeOid_;
70 string publishOid_; // = "dummy";
71 string senderName_;
72 string senderContent_;
73 string receiverName_; // sender/receiver is here the same client
74 string contentMime_; // = "text/xml";
75 string contentMimeExtended_; // = "1.0";
76 ConnectReturnQos returnQos_;
77 SpecificCallback *cb1_;
78 SpecificCallback *cb2_;
79 SpecificCallback *cb3_;
80
81 /** Publish tests */
enum TestType {
   TEST_ONEWAY,    // publish with publishOneway()
   TEST_PUBLISH,   // publish a single message with publish()
   TEST_ARRAY      // publish with publishArr()
};
85
/**
 * Constructs the TestSub object.
 * <p />
 * @param args The number of command line arguments
 * @param argc The command line arguments
 * @param loginName The name to login to the xmlBlaster
 */
92 public:
93 TestSub(int args, char *argc[], const string &loginName)
94 : TestSuite(args, argc, "TestSub"), returnQos_(global_)
95 {
96 senderName_ = loginName;
97 receiverName_ = loginName;
98 numReceived_ = 0;
99 publishOid_ = "dummy";
100 contentMime_ = "text/xml";
101 contentMimeExtended_ = "1.0";
102 senderContent_ = "Yeahh, i'm the new content";
103 cb1_ = new SpecificCallback(log_, "callback1");
104 cb2_ = new SpecificCallback(log_, "callback2");
105 cb3_ = new SpecificCallback(log_, "callback3");
106 }
107
108 virtual ~TestSub()
109 {
110 delete cb1_;
111 delete cb2_;
112 delete cb3_;
113 }
114
115 /**
116 * Sets up the fixture. <p />
117 * Connect to xmlBlaster and login
118 */
119 void setUp()
120 {
121 log_.info(ME, "Trying to connect to xmlBlaster with C++ client lib " + Global::getVersion() + " from " + Global::getBuildTimestamp());
122 TestSuite::setUp();
123 try {
124 string passwd = "secret";
125 SecurityQos secQos(global_, senderName_, passwd);
126 ConnectQos connQos(global_);
127 connQos.getSessionQosRef()->setPubSessionId(3L);
128 returnQos_ = connection_.connect(connQos, this);
129 string name = returnQos_.getSessionQos().getAbsoluteName();
130 string name1 = returnQos_.getSessionQosRef()->getAbsoluteName();
131 assertEquals(log_, ME, name, name1, string("name comparison for reference"));
132
133 log_.info(ME, string("connection setup: the session name is '") + name + "'");
134 // Login to xmlBlaster
135 }
136 catch (XmlBlasterException &e) {
137 log_.error(ME, string("Login failed: ") + e.toXml());
138 usage();
139 assert(0);
140 }
141 }
142
143
144 /**
145 * Tears down the fixture. <p />
146 * cleaning up .... erase() the previous message OID and logout
147 */
148 void tearDown()
149 {
150 log_.info(ME, "Cleaning up test - erasing message.");
151
152 EraseKey eraseKey(global_);
153 eraseKey.setOid(publishOid_);
154 EraseQos eraseQos(global_);
155
156 vector<EraseReturnQos> retArr;
157 try {
158 retArr = connection_.erase(eraseKey, eraseQos);
159 }
160 catch(XmlBlasterException &e) {
161 log_.error(ME, string("XmlBlasterException: ") + e.toXml());
162 }
163 if (retArr.size() != 1) {
164 log_.error(ME, "Erased " + lexical_cast<string>(retArr.size()) + " messages");
165 }
166 connection_.disconnect(DisconnectQos(global_));
167 TestSuite::tearDown();
168 }
169
170
171 /**
172 * TEST: Subscribe to messages with XPATH.<p />
173 * The returned subscribeOid is checked
174 */
175 void testSubscribeXPath()
176 {
177 if (log_.trace()) log_.trace(ME, "Subscribing using XPath syntax ...");
178 SubscribeKey subKey(global_);
179 subKey.setQueryString("//TestSub-AGENT");
180 SubscribeQos subQos(global_);
181 numReceived_ = 0;
182 subscribeOid_ = "";
183 try {
184 subscribeOid_ = connection_.subscribe(subKey, subQos).getSubscriptionId();
185 log_.info(ME, string("Success: Subscribe subscription-id=") +
186 subscribeOid_ + " done");
187 }
188 catch(XmlBlasterException &e) {
189 log_.warn(ME, string("XmlBlasterException: ")
190 + e.toXml());
191 cerr << "subscribe - XmlBlasterException: " << e.toXml() << endl;
192 assert(0);
193 }
194 if (subscribeOid_ == "") {
195 cerr << "returned null subscribeOid" << endl;
196 assert(0);
197 }
198 if (subscribeOid_.length() == 0) {
199 cerr << "returned subscribeOid is empty" << endl;
200 assert(0);
201 }
202 }
203
204 /**
205 * TEST: Subscribe to messages with specific callback<p />
206 */
207 void testSubscribeSpecificCallback()
208 {
209 if (log_.trace()) log_.trace(ME, "Subscribing using a specific callback pro subscription ...");
210 string oid1("oid1");
211 string oid2("oid2");
212 string oid3("oid3");
213
214 SubscribeKey subKey1(global_, oid1);
215 SubscribeKey subKey2(global_, oid2);
216 SubscribeKey subKey3(global_, oid3);
217 SubscribeQos subQos(global_);
218
219 numReceived_ = 0;
220 subscribeOid_ = "";
221 try {
222 subscribeOid_ = connection_.subscribe(subKey1, subQos, cb1_).getSubscriptionId();
223 /*string sub1 =*/ connection_.subscribe(subKey2, subQos, cb2_).getSubscriptionId();
224 /*string sub2 =*/ connection_.subscribe(subKey3, subQos, cb3_).getSubscriptionId();
225
226 log_.info(ME, string("Success: Subscribe subscription-id=") + subscribeOid_ + " done");
227
228 {
229 PublishKey pubKey1(global_);
230 pubKey1.setOid(oid1);
231 PublishQos pubQos(global_);
232 MessageUnit msgUnit(pubKey1, senderContent_, pubQos);
233 connection_.publish(msgUnit);
234 }
235
236 for (int i=0; i < 2; i++) {
237 PublishKey pubKey2(global_);
238 pubKey2.setOid(oid2);
239 PublishQos pubQos(global_);
240 MessageUnit msgUnit(pubKey2, senderContent_, pubQos);
241 connection_.publish(msgUnit);
242 }
243
244 for (int i=0; i < 3; i++) {
245 PublishKey pubKey3(global_);
246 pubKey3.setOid(oid3);
247 PublishQos pubQos(global_);
248 MessageUnit msgUnit(pubKey3, senderContent_, pubQos);
249 connection_.publish(msgUnit);
250 }
251
252 org::xmlBlaster::util::thread::Thread::sleep(2000L);
253 assertEquals(log_, "specificCallback", 1, cb1_->getCount(), string("callback 1"));
254 assertEquals(log_, "specificCallback", 2, cb2_->getCount(), string("callback 2"));
255 assertEquals(log_, "specificCallback", 3, cb3_->getCount(), string("callback 3"));
256
257 UnSubscribeKey key(global_);
258 key.setOid(oid1);
259 UnSubscribeQos qos(global_);
260 connection_.unSubscribe(key, qos);
261 key.setOid(oid2);
262 connection_.unSubscribe(key, qos);
263 key.setOid(oid3);
264 connection_.unSubscribe(key, qos);
265 }
266 catch(XmlBlasterException &e) {
267 log_.warn(ME, string("XmlBlasterException: ") + e.toXml());
268 cerr << "subscribe - XmlBlasterException: " << e.toXml() << endl;
269 assert(0);
270 }
271 if (subscribeOid_ == "") {
272 cerr << "returned null subscribeOid" << endl;
273 assert(0);
274 }
275 if (subscribeOid_.length() == 0) {
276 cerr << "returned subscribeOid is empty" << endl;
277 assert(0);
278 }
279 }
280
281
282 /**
283 * TEST: Construct a message and publish it. <p />
284 * The returned publishOid is checked
285 */
286 void testPublishCorbaMethods(TestType testType)
287 {
288 if (log_.trace()) log_.trace(ME, "Publishing a message (old style) ...");
289 numReceived_ = 0;
290 PublishKey pubKey(global_);
291 pubKey.setOid(publishOid_);
292 pubKey.setContentMime(contentMime_);
293 pubKey.setContentMimeExtended(contentMimeExtended_);
294 string xmlKey = string("") +
295 " <TestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" +
296 " <TestSub-DRIVER id='FileProof' pollingFreq='10'>" +
297 " </TestSub-DRIVER>"+
298 " </TestSub-AGENT>";
299 pubKey.setClientTags(xmlKey);
300
301 PublishQos pubQos(global_);
302 MessageUnit msgUnit(pubKey, senderContent_, pubQos);
303 try {
304
305 if (testType == TEST_ONEWAY) {
306 vector<MessageUnit> msgUnitArr;
307 msgUnitArr.insert(msgUnitArr.begin(), msgUnit);
308 connection_.publishOneway(msgUnitArr);
309 log_.info(ME, string("Success: Publishing oneway done (old style)"));
310 }
311 else if (testType == TEST_PUBLISH) {
312 string tmp = connection_.publish(msgUnit).getKeyOid();
313 if (tmp.find(publishOid_) == string::npos) {
314 log_.error(ME, "Wrong publishOid: " + tmp);
315 assert(0);
316 }
317 log_.info(ME, string("Success: Publishing with ACK done (old style), returned oid=") +
318 publishOid_);
319 }
320 else {
321 vector<MessageUnit> msgUnitArr;
322 msgUnitArr.insert(msgUnitArr.begin(), msgUnit);
323 connection_.publishArr(msgUnitArr);
324 log_.info(ME, string("Success: Publishing array done (old style)"));
325 }
326 }
327 catch(XmlBlasterException &e) {
328 log_.warn(ME, string("XmlBlasterException: ")+e.toXml());
329 assert(0);
330 }
331 }
332
333
334 /**
335 * TEST: Construct a message and publish it. <p />
336 * The returned publishOid is checked
337 */
338 void testPublishSTLMethods(TestType testType)
339 {
340 if (log_.trace()) log_.trace(ME, "Publishing a message (the STL way) ...");
341 numReceived_ = 0;
342 string clientTags = string("") +
343 " <TestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" +
344 " <TestSub-DRIVER id='FileProof' pollingFreq='10'>" +
345 " </TestSub-DRIVER>"+
346 " </TestSub-AGENT>";
347
348 PublishKey key(global_, publishOid_, contentMime_, contentMimeExtended_);
349 key.setClientTags(clientTags);
350 PublishQos pubQos(global_);
351 MessageUnit msgUnit(key, senderContent_, pubQos);
352 try {
353 if (testType == TEST_ONEWAY) {
354 vector<MessageUnit> msgVec;
355 msgVec.push_back(msgUnit);
356 connection_.publishOneway(msgVec);
357 log_.info(ME, string("Success: Publishing oneway done (the STL way)"));
358 }
359 else if (testType == TEST_PUBLISH) {
360 string tmp = connection_.publish(msgUnit).getKeyOid();
361 log_.info(ME, string("the publish oid ='") + tmp + "'");
362 }
363 else {
364 vector<MessageUnit> msgVec;
365 msgVec.push_back(msgUnit);
366 vector<PublishReturnQos> retArr = connection_.publishArr(msgVec);
367 log_.info(ME, string("Success: Publishing array of size " + lexical_cast<string>(retArr.size())
368 + " done (the STL way)"));
369 }
370 }
371 catch(XmlBlasterException &e) {
372 log_.warn(ME, string("XmlBlasterException: ")+e.toXml());
373 assert(0);
374 }
375 }
376
377
378 /**
379 * TEST: Construct a message and publish it,<br />
380 * the previous XPath subscription should match and send an update.
381 */
382 void testPublishAfterSubscribeXPath()
383 {
384 testSubscribeXPath();
385 waitOnUpdate(1000L);
386 // Wait some time for callback to arrive ...
387 if (numReceived_ != 0) {
388 log_.error(ME, "numReceived after subscribe = " + lexical_cast<string>(numReceived_));
389 assert(0);
390 }
391
392 /*
393 testSubscribeXPath();
394 waitOnUpdate(1000L);
395 // Wait some time for callback to arrive ...
396 if (numReceived_ != 0) {
397 log_.error(ME, "numReceived after subscribe = " + lexical_cast<string>(numReceived_));
398 assert(0);
399 }
400 */
401
402 /*
403 testPublishCorbaMethods(TEST_ONEWAY);
404 waitOnUpdate(2000L);
405 if (numReceived_ != 1) {
406 log_.error(ME,"numReceived after publishing oneway = " + lexical_cast<string>(numReceived_));
407 assert(0);
408 }
409
410 testPublishCorbaMethods(TEST_PUBLISH);
411 waitOnUpdate(2000L);
412 if (numReceived_ != 1) {
413 log_.error(ME,"numReceived after publishing with ACK = " + lexical_cast<string>(numReceived_));
414 assert(0);
415 }
416
417 testPublishCorbaMethods(TEST_ARRAY);
418 waitOnUpdate(2000L);
419 if (numReceived_ != 1) {
420 log_.error(ME,"numReceived after publishing with ACK = " + lexical_cast<string>(numReceived_));
421 assert(0);
422 }
423 */
424 testPublishSTLMethods(TEST_ONEWAY);
425 waitOnUpdate(2000L);
426 if (numReceived_ != 1) {
427 log_.error(ME,"numReceived after publishing STL oneway = " + lexical_cast<string>(numReceived_));
428 assert(0);
429 }
430 numReceived_ = 0;
431
432 testPublishSTLMethods(TEST_PUBLISH);
433 waitOnUpdate(2000L);
434 if (numReceived_ != 1) {
435 log_.error(ME,"numReceived after publishing STL with ACK = " + lexical_cast<string>(numReceived_));
436 assert(0);
437 }
438 numReceived_ = 0;
439
440 testPublishSTLMethods(TEST_ARRAY);
441 waitOnUpdate(2000L);
442 if (numReceived_ != 1) {
443 log_.error(ME,"numReceived after publishing STL with ACK = " + lexical_cast<string>(numReceived_));
444 assert(0);
445 }
446 numReceived_ = 0;
447 }
448
449
/**
 * This is the callback method (I_Callback) invoked from XmlBlasterAccess
 * informing the client in an asynchronous mode about a new message.
 * <p />
 * The raw CORBA-BlasterCallback.update() is unpacked and for each arrived
 * message this update is called.
 *
 * @param sessionId The sessionId to authenticate the callback
 *                  This sessionId was passed on subscription
 *                  we can use it to decide if we trust this update()
 * @param updateKey The arrived key
 * @param content The arrived message content
 * @param contentSize The number of bytes in content
 * @param updateQos Quality of Service of the MessageUnit
 */
464 string update(const string &sessionId,
465 UpdateKey &updateKey,
466 const unsigned char *content, long contentSize,
467 UpdateQos &updateQos)
468 {
469 log_.info(ME, string("Receiving update of message oid=") +
470 updateKey.getOid() + " state=" + updateQos.getState() +
471 " authentication sessionId=" + sessionId + " ...");
472 numReceived_ ++;
473
474 string contentStr(reinterpret_cast<char *>(const_cast<unsigned char *>(content)), contentSize);
475
476 if (updateQos.getState() != Constants::STATE_OK &&
477 updateQos.getState() != org::xmlBlaster::util::Constants::STATE_ERASED) {
478 log_.error(ME, "Unexpected message state=" + updateQos.getState());
479 assert(0);
480 }
481
482 string name = returnQos_.getSessionQos().getAbsoluteName();
483 if (/*senderName_*/ name != updateQos.getSender()->getAbsoluteName()) {
484 log_.error(ME, string("Wrong Sender, should be: '") + name + "' but is: '" + updateQos.getSender()->getAbsoluteName());
485 assert(0);
486 }
487 if (subscribeOid_.find(updateQos.getSubscriptionId()) == string::npos) {
488 log_.error(ME, string("engine.qos.update.subscriptionId: ")
489 + "Wrong subscriptionId, expected=" + subscribeOid_ + " received=" + updateQos.getSubscriptionId());
490 //assert(0);
491 }
492 if (publishOid_ != updateKey.getOid()) {
493 log_.error(ME, "Wrong oid of message returned");
494 assert(0);
495 }
496
497 if (updateQos.getState() == Constants::STATE_OK && senderContent_ != contentStr) {
498 log_.error(ME, "Corrupted content expected '" + senderContent_ + "' size=" +
499 lexical_cast<string>(senderContent_.size()) + " but was '" + contentStr +
500 "' size=" + lexical_cast<string>(contentStr.size()) + " and contentSize=" +
501 lexical_cast<string>(contentSize));
502 assert(0);
503 }
504 if (contentMime_ != updateKey.getContentMime()) {
505 log_.error(ME, "Message contentMime is corrupted");
506 assert(0);
507 }
508 if (contentMimeExtended_ != updateKey.getContentMimeExtended()) {
509 log_.error(ME, "Message contentMimeExtended is corrupted");
510 assert(0);
511 }
512 messageArrived_ = true;
513
514 log_.info(ME, "Success, message oid=" + updateKey.getOid() + " state=" + updateQos.getState() + " arrived as expected.");
515 return "<qos><state id='OK'/></qos>";
516 }
517
518
/**
 * Little helper, sleeps for the given timeout to give pending updates the
 * time to arrive. The variable 'messageArrived_' is currently not polled
 * (that code is commented out), so the full timeout always elapses.
 * @param timeout in milliseconds
 */
524 private:
525 void waitOnUpdate(long timeout) {
526 long delay = timeout;
527 Thread::sleep(delay);
528 /*
529 util::StopWatch stopWatch(timeout);
530 while (stopWatch.isRunning()) {
531 connection_.orbPerformWork();
532 if (messageArrived_) {
533 messageArrived_ = false;
534 return;
535 }
536 }
537 */
538 log_.warn(ME, "Timeout of " + lexical_cast<string>(timeout) + " milliseconds occured");
539 }
540
541 void usage() const
542 {
543 TestSuite::usage();
544 log_.plain(ME, "----------------------------------------------------------");
545 log_.plain(ME, "Testing C++/CORBA access to xmlBlaster with subscribe()");
546 log_.plain(ME, "Usage:");
547 XmlBlasterAccess::usage();
548 log_.usage();
549 log_.plain(ME, "Example:");
550 log_.plain(ME, " TestSub -bootstrapHostname myHost.myCompany.com -bootstrapPort 3412 -trace true");
551 log_.plain(ME, "----------------------------------------------------------");
552 }
553 };
554
555 }}} // namespace
556
557 using namespace org::xmlBlaster::test;
558
559 int main(int args, char *argc[])
560 {
561 try {
562 org::xmlBlaster::util::Object_Lifetime_Manager::init();
563 TestSub testSub(args, argc, "Tim");
564
565 testSub.setUp();
566 testSub.testPublishAfterSubscribeXPath();
567 testSub.testSubscribeSpecificCallback();
568 testSub.tearDown();
569
570 Thread::sleepSecs(1);
571 }
572 catch (XmlBlasterException& ex) {
573 std::cout << ex.toXml() << std::endl;
574 }
575 catch (bad_exception& ex) {
576 cout << "bad_exception: " << ex.what() << endl;
577 }
578 catch (exception& ex) {
579 cout << " exception: " << ex.what() << endl;
580 }
581 catch (string& ex) {
582 cout << "string: " << ex << endl;
583 }
584 catch (char* ex) {
585 cout << "char* : " << ex << endl;
586 }
587
588 catch (...)
589 {
590 cout << "unknown exception occured" << endl;
591 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
592 cout << e.toXml() << endl;
593 }
594
595 org::xmlBlaster::util::Object_Lifetime_Manager::fini();
596 return 0;
597 }
// syntax highlighted by Code2HTML, v. 0.9.1