00001 /*----------------------------------------------------------------------------- 00002 Name: TestSub.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Demo code for a client using xmlBlaster 00006 Version: $Id: TestSub.cpp 12916 2004-11-18 14:55:44Z ruff $ 00007 -----------------------------------------------------------------------------*/ 00008 #include "TestSuite.h" 00009 #include <iostream> 00010 00020 using namespace std; 00021 using namespace org::xmlBlaster::util; 00022 using namespace org::xmlBlaster::util::qos; 00023 using namespace org::xmlBlaster::util::thread; 00024 using namespace org::xmlBlaster::client; 00025 using namespace org::xmlBlaster::client::qos; 00026 using namespace org::xmlBlaster::client::key; 00027 using namespace org::xmlBlaster::authentication; 00028 00029 namespace org { namespace xmlBlaster { namespace test { 00030 00031 class SpecificCallback : public I_Callback { 00032 private: 00033 int numReceived_; 00034 string name_; 00035 I_Log& log_; 00036 00037 public: 00038 SpecificCallback(I_Log& log, const string& name) : log_(log) { 00039 name_ = name; 00040 numReceived_ = 0; 00041 } 00042 00043 int getCount() { 00044 return numReceived_; 00045 } 00046 00047 00048 string update(const string &sessionId, 00049 UpdateKey &updateKey, 00050 const unsigned char * /*content*/, long /*contentSize*/, 00051 UpdateQos &updateQos) 00052 { 00053 log_.info("update", string("Receiving update on callback '") + name_ + "' of message oid=" + 00054 updateKey.getOid() + " state=" + updateQos.getState() + 00055 " authentication sessionId=" + sessionId + " ..."); 00056 numReceived_++; 00057 return "<qos><state id='OK'/></qos>"; 00058 } 00059 00060 00061 }; 00062 00063 00064 class TestSub: public TestSuite, public virtual I_Callback 00065 { 00066 private: 00067 bool messageArrived_; // = false; 00068 int numReceived_; // = 0; // error checking 00069 string subscribeOid_; 00070 string publishOid_; // = "dummy"; 00071 string senderName_; 00072 string senderContent_; 00073 string receiverName_; // sender/receiver is here the same client 00074 string contentMime_; // = "text/xml"; 00075 string contentMimeExtended_; // = "1.0"; 00076 ConnectReturnQos returnQos_; 00077 SpecificCallback *cb1_; 00078 SpecificCallback *cb2_; 00079 SpecificCallback *cb3_; 00080 00082 enum TestType { 00083 TEST_ONEWAY, TEST_PUBLISH, TEST_ARRAY 00084 }; 00085 00092 public: 00093 TestSub(int args, char *argc[], const string &loginName) 00094 : TestSuite(args, argc, "TestSub"), returnQos_(global_) 00095 { 00096 senderName_ = loginName; 00097 receiverName_ = loginName; 00098 numReceived_ = 0; 00099 publishOid_ = "dummy"; 00100 contentMime_ = "text/xml"; 00101 contentMimeExtended_ = "1.0"; 00102 senderContent_ = "Yeahh, i'm the new content"; 00103 cb1_ = new SpecificCallback(log_, "callback1"); 00104 cb2_ = new SpecificCallback(log_, "callback2"); 00105 cb3_ = new SpecificCallback(log_, "callback3"); 00106 } 00107 00108 virtual ~TestSub() 00109 { 00110 delete cb1_; 00111 delete cb2_; 00112 delete cb3_; 00113 } 00114 00119 void setUp() 00120 { 00121 log_.info(ME, "Trying to connect to xmlBlaster with C++ client lib " + Global::getVersion() + " from " + Global::getBuildTimestamp()); 00122 TestSuite::setUp(); 00123 try { 00124 string passwd = "secret"; 00125 SecurityQos secQos(global_, senderName_, passwd); 00126 ConnectQos connQos(global_); 00127 connQos.getSessionQosRef()->setPubSessionId(3L); 00128 returnQos_ = connection_.connect(connQos, this); 00129 string name = returnQos_.getSessionQos().getAbsoluteName(); 00130 string name1 = returnQos_.getSessionQosRef()->getAbsoluteName(); 00131 assertEquals(log_, ME, name, name1, string("name comparison for reference")); 00132 00133 log_.info(ME, string("connection setup: the session name is '") + name + "'"); 00134 // Login to xmlBlaster 00135 } 00136 catch (XmlBlasterException &e) { 00137 log_.error(ME, string("Login failed: ") + e.toXml()); 00138 usage(); 00139 assert(0); 00140 } 00141 } 00142 00143 00148 void tearDown() 00149 { 00150 log_.info(ME, "Cleaning up test - erasing message."); 00151 00152 EraseKey eraseKey(global_); 00153 eraseKey.setOid(publishOid_); 00154 EraseQos eraseQos(global_); 00155 00156 vector<EraseReturnQos> retArr; 00157 try { 00158 retArr = connection_.erase(eraseKey, eraseQos); 00159 } 00160 catch(XmlBlasterException &e) { 00161 log_.error(ME, string("XmlBlasterException: ") + e.toXml()); 00162 } 00163 if (retArr.size() != 1) { 00164 log_.error(ME, "Erased " + lexical_cast<string>(retArr.size()) + " messages"); 00165 } 00166 connection_.disconnect(DisconnectQos(global_)); 00167 TestSuite::tearDown(); 00168 } 00169 00170 00175 void testSubscribeXPath() 00176 { 00177 if (log_.trace()) log_.trace(ME, "Subscribing using XPath syntax ..."); 00178 SubscribeKey subKey(global_); 00179 subKey.setQueryString("//TestSub-AGENT"); 00180 SubscribeQos subQos(global_); 00181 numReceived_ = 0; 00182 subscribeOid_ = ""; 00183 try { 00184 subscribeOid_ = connection_.subscribe(subKey, subQos).getSubscriptionId(); 00185 log_.info(ME, string("Success: Subscribe subscription-id=") + 00186 subscribeOid_ + " done"); 00187 } 00188 catch(XmlBlasterException &e) { 00189 log_.warn(ME, string("XmlBlasterException: ") 00190 + e.toXml()); 00191 cerr << "subscribe - XmlBlasterException: " << e.toXml() << endl; 00192 assert(0); 00193 } 00194 if (subscribeOid_ == "") { 00195 cerr << "returned null subscribeOid" << endl; 00196 assert(0); 00197 } 00198 if (subscribeOid_.length() == 0) { 00199 cerr << "returned subscribeOid is empty" << endl; 00200 assert(0); 00201 } 00202 } 00203 00207 void testSubscribeSpecificCallback() 00208 { 00209 if (log_.trace()) log_.trace(ME, "Subscribing using a specific callback pro subscription ..."); 00210 string oid1("oid1"); 00211 string oid2("oid2"); 00212 string oid3("oid3"); 00213 00214 SubscribeKey subKey1(global_, oid1); 00215 SubscribeKey subKey2(global_, oid2); 00216 SubscribeKey subKey3(global_, oid3); 00217 SubscribeQos subQos(global_); 00218 00219 numReceived_ = 0; 00220 subscribeOid_ = ""; 00221 try { 00222 subscribeOid_ = connection_.subscribe(subKey1, subQos, cb1_).getSubscriptionId(); 00223 /*string sub1 =*/ connection_.subscribe(subKey2, subQos, cb2_).getSubscriptionId(); 00224 /*string sub2 =*/ connection_.subscribe(subKey3, subQos, cb3_).getSubscriptionId(); 00225 00226 log_.info(ME, string("Success: Subscribe subscription-id=") + subscribeOid_ + " done"); 00227 00228 { 00229 PublishKey pubKey1(global_); 00230 pubKey1.setOid(oid1); 00231 PublishQos pubQos(global_); 00232 MessageUnit msgUnit(pubKey1, senderContent_, pubQos); 00233 connection_.publish(msgUnit); 00234 } 00235 00236 for (int i=0; i < 2; i++) { 00237 PublishKey pubKey2(global_); 00238 pubKey2.setOid(oid2); 00239 PublishQos pubQos(global_); 00240 MessageUnit msgUnit(pubKey2, senderContent_, pubQos); 00241 connection_.publish(msgUnit); 00242 } 00243 00244 for (int i=0; i < 3; i++) { 00245 PublishKey pubKey3(global_); 00246 pubKey3.setOid(oid3); 00247 PublishQos pubQos(global_); 00248 MessageUnit msgUnit(pubKey3, senderContent_, pubQos); 00249 connection_.publish(msgUnit); 00250 } 00251 00252 org::xmlBlaster::util::thread::Thread::sleep(2000L); 00253 assertEquals(log_, "specificCallback", 1, cb1_->getCount(), string("callback 1")); 00254 assertEquals(log_, "specificCallback", 2, cb2_->getCount(), string("callback 2")); 00255 assertEquals(log_, "specificCallback", 3, cb3_->getCount(), string("callback 3")); 00256 00257 UnSubscribeKey key(global_); 00258 key.setOid(oid1); 00259 UnSubscribeQos qos(global_); 00260 connection_.unSubscribe(key, qos); 00261 key.setOid(oid2); 00262 connection_.unSubscribe(key, qos); 00263 key.setOid(oid3); 00264 connection_.unSubscribe(key, qos); 00265 } 00266 catch(XmlBlasterException &e) { 00267 log_.warn(ME, string("XmlBlasterException: ") + e.toXml()); 00268 cerr << "subscribe - XmlBlasterException: " << e.toXml() << endl; 00269 assert(0); 00270 } 00271 if (subscribeOid_ == "") { 00272 cerr << "returned null subscribeOid" << endl; 00273 assert(0); 00274 } 00275 if (subscribeOid_.length() == 0) { 00276 cerr << "returned subscribeOid is empty" << endl; 00277 assert(0); 00278 } 00279 } 00280 00281 00286 void testPublishCorbaMethods(TestType testType) 00287 { 00288 if (log_.trace()) log_.trace(ME, "Publishing a message (old style) ..."); 00289 numReceived_ = 0; 00290 PublishKey pubKey(global_); 00291 pubKey.setOid(publishOid_); 00292 pubKey.setContentMime(contentMime_); 00293 pubKey.setContentMimeExtended(contentMimeExtended_); 00294 string xmlKey = string("") + 00295 " <TestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" + 00296 " <TestSub-DRIVER id='FileProof' pollingFreq='10'>" + 00297 " </TestSub-DRIVER>"+ 00298 " </TestSub-AGENT>"; 00299 pubKey.setClientTags(xmlKey); 00300 00301 PublishQos pubQos(global_); 00302 MessageUnit msgUnit(pubKey, senderContent_, pubQos); 00303 try { 00304 00305 if (testType == TEST_ONEWAY) { 00306 vector<MessageUnit> msgUnitArr; 00307 msgUnitArr.insert(msgUnitArr.begin(), msgUnit); 00308 connection_.publishOneway(msgUnitArr); 00309 log_.info(ME, string("Success: Publishing oneway done (old style)")); 00310 } 00311 else if (testType == TEST_PUBLISH) { 00312 string tmp = connection_.publish(msgUnit).getKeyOid(); 00313 if (tmp.find(publishOid_) == string::npos) { 00314 log_.error(ME, "Wrong publishOid: " + tmp); 00315 assert(0); 00316 } 00317 log_.info(ME, string("Success: Publishing with ACK done (old style), returned oid=") + 00318 publishOid_); 00319 } 00320 else { 00321 vector<MessageUnit> msgUnitArr; 00322 msgUnitArr.insert(msgUnitArr.begin(), msgUnit); 00323 connection_.publishArr(msgUnitArr); 00324 log_.info(ME, string("Success: Publishing array done (old style)")); 00325 } 00326 } 00327 catch(XmlBlasterException &e) { 00328 log_.warn(ME, string("XmlBlasterException: ")+e.toXml()); 00329 assert(0); 00330 } 00331 } 00332 00333 00338 void testPublishSTLMethods(TestType testType) 00339 { 00340 if (log_.trace()) log_.trace(ME, "Publishing a message (the STL way) ..."); 00341 numReceived_ = 0; 00342 string clientTags = string("") + 00343 " <TestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" + 00344 " <TestSub-DRIVER id='FileProof' pollingFreq='10'>" + 00345 " </TestSub-DRIVER>"+ 00346 " </TestSub-AGENT>"; 00347 00348 PublishKey key(global_, publishOid_, contentMime_, contentMimeExtended_); 00349 key.setClientTags(clientTags); 00350 PublishQos pubQos(global_); 00351 MessageUnit msgUnit(key, senderContent_, pubQos); 00352 try { 00353 if (testType == TEST_ONEWAY) { 00354 vector<MessageUnit> msgVec; 00355 msgVec.push_back(msgUnit); 00356 connection_.publishOneway(msgVec); 00357 log_.info(ME, string("Success: Publishing oneway done (the STL way)")); 00358 } 00359 else if (testType == TEST_PUBLISH) { 00360 string tmp = connection_.publish(msgUnit).getKeyOid(); 00361 log_.info(ME, string("the publish oid ='") + tmp + "'"); 00362 } 00363 else { 00364 vector<MessageUnit> msgVec; 00365 msgVec.push_back(msgUnit); 00366 vector<PublishReturnQos> retArr = connection_.publishArr(msgVec); 00367 log_.info(ME, string("Success: Publishing array of size " + lexical_cast<string>(retArr.size()) 00368 + " done (the STL way)")); 00369 } 00370 } 00371 catch(XmlBlasterException &e) { 00372 log_.warn(ME, string("XmlBlasterException: ")+e.toXml()); 00373 assert(0); 00374 } 00375 } 00376 00377 00382 void testPublishAfterSubscribeXPath() 00383 { 00384 testSubscribeXPath(); 00385 waitOnUpdate(1000L); 00386 // Wait some time for callback to arrive ... 00387 if (numReceived_ != 0) { 00388 log_.error(ME, "numReceived after subscribe = " + lexical_cast<string>(numReceived_)); 00389 assert(0); 00390 } 00391 00392 /* 00393 testSubscribeXPath(); 00394 waitOnUpdate(1000L); 00395 // Wait some time for callback to arrive ... 00396 if (numReceived_ != 0) { 00397 log_.error(ME, "numReceived after subscribe = " + lexical_cast<string>(numReceived_)); 00398 assert(0); 00399 } 00400 */ 00401 00402 /* 00403 testPublishCorbaMethods(TEST_ONEWAY); 00404 waitOnUpdate(2000L); 00405 if (numReceived_ != 1) { 00406 log_.error(ME,"numReceived after publishing oneway = " + lexical_cast<string>(numReceived_)); 00407 assert(0); 00408 } 00409 00410 testPublishCorbaMethods(TEST_PUBLISH); 00411 waitOnUpdate(2000L); 00412 if (numReceived_ != 1) { 00413 log_.error(ME,"numReceived after publishing with ACK = " + lexical_cast<string>(numReceived_)); 00414 assert(0); 00415 } 00416 00417 testPublishCorbaMethods(TEST_ARRAY); 00418 waitOnUpdate(2000L); 00419 if (numReceived_ != 1) { 00420 log_.error(ME,"numReceived after publishing with ACK = " + lexical_cast<string>(numReceived_)); 00421 assert(0); 00422 } 00423 */ 00424 testPublishSTLMethods(TEST_ONEWAY); 00425 waitOnUpdate(2000L); 00426 if (numReceived_ != 1) { 00427 log_.error(ME,"numReceived after publishing STL oneway = " + lexical_cast<string>(numReceived_)); 00428 assert(0); 00429 } 00430 numReceived_ = 0; 00431 00432 testPublishSTLMethods(TEST_PUBLISH); 00433 waitOnUpdate(2000L); 00434 if (numReceived_ != 1) { 00435 log_.error(ME,"numReceived after publishing STL with ACK = " + lexical_cast<string>(numReceived_)); 00436 assert(0); 00437 } 00438 numReceived_ = 0; 00439 00440 testPublishSTLMethods(TEST_ARRAY); 00441 waitOnUpdate(2000L); 00442 if (numReceived_ != 1) { 00443 log_.error(ME,"numReceived after publishing STL with ACK = " + lexical_cast<string>(numReceived_)); 00444 assert(0); 00445 } 00446 numReceived_ = 0; 00447 } 00448 00449 00464 string update(const string &sessionId, 00465 UpdateKey &updateKey, 00466 const unsigned char *content, long contentSize, 00467 UpdateQos &updateQos) 00468 { 00469 log_.info(ME, string("Receiving update of message oid=") + 00470 updateKey.getOid() + " state=" + updateQos.getState() + 00471 " authentication sessionId=" + sessionId + " ..."); 00472 numReceived_ ++; 00473 00474 string contentStr(reinterpret_cast<char *>(const_cast<unsigned char *>(content)), contentSize); 00475 00476 if (updateQos.getState() != Constants::STATE_OK && 00477 updateQos.getState() != org::xmlBlaster::util::Constants::STATE_ERASED) { 00478 log_.error(ME, "Unexpected message state=" + updateQos.getState()); 00479 assert(0); 00480 } 00481 00482 string name = returnQos_.getSessionQos().getAbsoluteName(); 00483 if (/*senderName_*/ name != updateQos.getSender()->getAbsoluteName()) { 00484 log_.error(ME, string("Wrong Sender, should be: '") + name + "' but is: '" + updateQos.getSender()->getAbsoluteName()); 00485 assert(0); 00486 } 00487 if (subscribeOid_.find(updateQos.getSubscriptionId()) == string::npos) { 00488 log_.error(ME, string("engine.qos.update.subscriptionId: ") 00489 + "Wrong subscriptionId, expected=" + subscribeOid_ + " received=" + updateQos.getSubscriptionId()); 00490 //assert(0); 00491 } 00492 if (publishOid_ != updateKey.getOid()) { 00493 log_.error(ME, "Wrong oid of message returned"); 00494 assert(0); 00495 } 00496 00497 if (updateQos.getState() == Constants::STATE_OK && senderContent_ != contentStr) { 00498 log_.error(ME, "Corrupted content expected '" + senderContent_ + "' size=" + 00499 lexical_cast<string>(senderContent_.size()) + " but was '" + contentStr + 00500 "' size=" + lexical_cast<string>(contentStr.size()) + " and contentSize=" + 00501 lexical_cast<string>(contentSize)); 00502 assert(0); 00503 } 00504 if (contentMime_ != updateKey.getContentMime()) { 00505 log_.error(ME, "Message contentMime is corrupted"); 00506 assert(0); 00507 } 00508 if (contentMimeExtended_ != updateKey.getContentMimeExtended()) { 00509 log_.error(ME, "Message contentMimeExtended is corrupted"); 00510 assert(0); 00511 } 00512 messageArrived_ = true; 00513 00514 log_.info(ME, "Success, message oid=" + updateKey.getOid() + " state=" + updateQos.getState() + " arrived as expected."); 00515 return "<qos><state id='OK'/></qos>"; 00516 } 00517 00518 00524 private: 00525 void waitOnUpdate(long timeout) { 00526 long delay = timeout; 00527 Thread::sleep(delay); 00528 /* 00529 util::StopWatch stopWatch(timeout); 00530 while (stopWatch.isRunning()) { 00531 connection_.orbPerformWork(); 00532 if (messageArrived_) { 00533 messageArrived_ = false; 00534 return; 00535 } 00536 } 00537 */ 00538 log_.warn(ME, "Timeout of " + lexical_cast<string>(timeout) + " milliseconds occured"); 00539 } 00540 00541 void usage() const 00542 { 00543 TestSuite::usage(); 00544 log_.plain(ME, "----------------------------------------------------------"); 00545 log_.plain(ME, "Testing C++/CORBA access to xmlBlaster with subscribe()"); 00546 log_.plain(ME, "Usage:"); 00547 XmlBlasterAccess::usage(); 00548 log_.usage(); 00549 log_.plain(ME, "Example:"); 00550 log_.plain(ME, " TestSub -bootstrapHostname myHost.myCompany.com -bootstrapPort 3412 -trace true"); 00551 log_.plain(ME, "----------------------------------------------------------"); 00552 } 00553 }; 00554 00555 }}} // namespace 00556 00557 using namespace org::xmlBlaster::test; 00558 00559 int main(int args, char *argc[]) 00560 { 00561 try { 00562 org::xmlBlaster::util::Object_Lifetime_Manager::init(); 00563 TestSub testSub(args, argc, "Tim"); 00564 00565 testSub.setUp(); 00566 testSub.testPublishAfterSubscribeXPath(); 00567 testSub.testSubscribeSpecificCallback(); 00568 testSub.tearDown(); 00569 00570 Thread::sleepSecs(1); 00571 } 00572 catch (XmlBlasterException& ex) { 00573 std::cout << ex.toXml() << std::endl; 00574 } 00575 catch (bad_exception& ex) { 00576 cout << "bad_exception: " << ex.what() << endl; 00577 } 00578 catch (exception& ex) { 00579 cout << " exception: " << ex.what() << endl; 00580 } 00581 catch (string& ex) { 00582 cout << "string: " << ex << endl; 00583 } 00584 catch (char* ex) { 00585 cout << "char* : " << ex << endl; 00586 } 00587 00588 catch (...) 00589 { 00590 cout << "unknown exception occured" << endl; 00591 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread"); 00592 cout << e.toXml() << endl; 00593 } 00594 00595 org::xmlBlaster::util::Object_Lifetime_Manager::fini(); 00596 return 0; 00597 } 00598 00599