testsuite/src/c++/TestSub.cpp

Go to the documentation of this file.
00001 /*-----------------------------------------------------------------------------
00002 Name:      TestSub.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Demo code for a client using xmlBlaster
00006 Version:   $Id: TestSub.cpp 12916 2004-11-18 14:55:44Z ruff $
00007 -----------------------------------------------------------------------------*/
00008 #include "TestSuite.h"
00009 #include <iostream>
00010 
00020 using namespace std;
00021 using namespace org::xmlBlaster::util;
00022 using namespace org::xmlBlaster::util::qos;
00023 using namespace org::xmlBlaster::util::thread;
00024 using namespace org::xmlBlaster::client;
00025 using namespace org::xmlBlaster::client::qos;
00026 using namespace org::xmlBlaster::client::key;
00027 using namespace org::xmlBlaster::authentication;
00028 
00029 namespace org { namespace xmlBlaster { namespace test {
00030 
00031 class SpecificCallback : public I_Callback {
00032 private:
00033    int numReceived_;
00034    string name_;
00035    I_Log& log_;
00036 
00037 public:
00038    SpecificCallback(I_Log& log, const string& name) : log_(log) {
00039       name_ = name;
00040       numReceived_ = 0;
00041    }
00042 
00043    int getCount() {
00044       return numReceived_;
00045    }
00046 
00047 
00048    string update(const string &sessionId,
00049                UpdateKey &updateKey,
00050                const unsigned char * /*content*/, long /*contentSize*/,
00051                UpdateQos &updateQos) 
00052    {
00053       log_.info("update", string("Receiving update on callback '") + name_ + "' of message oid=" +
00054                 updateKey.getOid() + " state=" + updateQos.getState() +
00055                 " authentication sessionId=" + sessionId + " ...");
00056       numReceived_++;
00057       return "<qos><state id='OK'/></qos>";
00058    }
00059 
00060 
00061 };
00062 
00063 
00064 class TestSub: public TestSuite, public virtual I_Callback 
00065 {
00066 private:
00067    bool   messageArrived_;      // = false;
00068    int    numReceived_;         //  = 0;         // error checking
00069    string subscribeOid_;
00070    string publishOid_;          // = "dummy";
00071    string senderName_;
00072    string senderContent_;
00073    string receiverName_;        // sender/receiver is here the same client
00074    string contentMime_;         // = "text/xml";
00075    string contentMimeExtended_; //  = "1.0";
00076    ConnectReturnQos returnQos_;
00077    SpecificCallback *cb1_;
00078    SpecificCallback *cb2_;
00079    SpecificCallback *cb3_;
00080 
00082    enum TestType {
00083       TEST_ONEWAY, TEST_PUBLISH, TEST_ARRAY
00084    };
00085 
00092  public:
00093    TestSub(int args, char *argc[], const string &loginName)
00094       : TestSuite(args, argc, "TestSub"), returnQos_(global_)
00095    {
00096       senderName_          = loginName;
00097       receiverName_        = loginName;
00098       numReceived_         = 0;
00099       publishOid_          = "dummy";
00100       contentMime_         = "text/xml";
00101       contentMimeExtended_ = "1.0";
00102       senderContent_       = "Yeahh, i'm the new content";
00103       cb1_ = new SpecificCallback(log_, "callback1");
00104       cb2_ = new SpecificCallback(log_, "callback2");
00105       cb3_ = new SpecificCallback(log_, "callback3");
00106    }
00107 
00108    virtual ~TestSub() 
00109    {
00110       delete cb1_;
00111       delete cb2_; 
00112       delete cb3_;
00113    }
00114 
00119    void setUp() 
00120    {
00121       log_.info(ME, "Trying to connect to xmlBlaster with C++ client lib " + Global::getVersion() + " from " + Global::getBuildTimestamp());
00122       TestSuite::setUp();
00123       try {
00124          string passwd = "secret";
00125          SecurityQos secQos(global_, senderName_, passwd);
00126          ConnectQos connQos(global_);
00127          connQos.getSessionQosRef()->setPubSessionId(3L);
00128          returnQos_ = connection_.connect(connQos, this);
00129          string name = returnQos_.getSessionQos().getAbsoluteName();
00130          string name1 = returnQos_.getSessionQosRef()->getAbsoluteName();
00131          assertEquals(log_, ME, name, name1, string("name comparison for reference"));
00132 
00133          log_.info(ME, string("connection setup: the session name is '") + name + "'");
00134          // Login to xmlBlaster
00135       }
00136       catch (XmlBlasterException &e) {
00137          log_.error(ME, string("Login failed: ") + e.toXml());
00138          usage();
00139          assert(0);
00140       }
00141    }
00142 
00143 
00148    void tearDown() 
00149    {
00150       log_.info(ME, "Cleaning up test - erasing message.");
00151 
00152       EraseKey eraseKey(global_);
00153       eraseKey.setOid(publishOid_);
00154       EraseQos eraseQos(global_);
00155 
00156       vector<EraseReturnQos> retArr;
00157       try {
00158          retArr = connection_.erase(eraseKey, eraseQos);
00159       }
00160       catch(XmlBlasterException &e) {
00161          log_.error(ME, string("XmlBlasterException: ") + e.toXml());
00162       }
00163       if (retArr.size() != 1) {
00164          log_.error(ME, "Erased " + lexical_cast<string>(retArr.size()) + " messages");
00165       }
00166       connection_.disconnect(DisconnectQos(global_));
00167       TestSuite::tearDown();
00168    }
00169 
00170 
00175    void testSubscribeXPath() 
00176    {
00177       if (log_.trace()) log_.trace(ME, "Subscribing using XPath syntax ...");
00178       SubscribeKey subKey(global_);
00179       subKey.setQueryString("//TestSub-AGENT");
00180       SubscribeQos subQos(global_);
00181       numReceived_ = 0;
00182       subscribeOid_ = "";
00183       try {
00184          subscribeOid_ = connection_.subscribe(subKey, subQos).getSubscriptionId();
00185          log_.info(ME, string("Success: Subscribe subscription-id=") +
00186                    subscribeOid_ + " done");
00187       }
00188       catch(XmlBlasterException &e) {
00189          log_.warn(ME, string("XmlBlasterException: ")
00190                       + e.toXml());
00191          cerr << "subscribe - XmlBlasterException: " << e.toXml() << endl;
00192          assert(0);
00193       }
00194       if (subscribeOid_ == "") {
00195          cerr << "returned null subscribeOid" << endl;
00196          assert(0);
00197       }
00198       if (subscribeOid_.length() == 0) {
00199          cerr << "returned subscribeOid is empty" << endl;
00200          assert(0);
00201       }
00202    }
00203 
00207    void testSubscribeSpecificCallback() 
00208    {
00209       if (log_.trace()) log_.trace(ME, "Subscribing using a specific callback pro subscription ...");
00210       string oid1("oid1");
00211       string oid2("oid2");
00212       string oid3("oid3");
00213 
00214       SubscribeKey subKey1(global_, oid1);
00215       SubscribeKey subKey2(global_, oid2);
00216       SubscribeKey subKey3(global_, oid3);
00217       SubscribeQos subQos(global_);
00218 
00219       numReceived_ = 0;
00220       subscribeOid_ = "";
00221       try {
00222          subscribeOid_ = connection_.subscribe(subKey1, subQos, cb1_).getSubscriptionId();
00223          /*string sub1 =*/ connection_.subscribe(subKey2, subQos, cb2_).getSubscriptionId();
00224          /*string sub2 =*/ connection_.subscribe(subKey3, subQos, cb3_).getSubscriptionId();
00225 
00226          log_.info(ME, string("Success: Subscribe subscription-id=") + subscribeOid_ + " done");
00227 
00228          {
00229             PublishKey pubKey1(global_);
00230             pubKey1.setOid(oid1);
00231             PublishQos pubQos(global_);
00232             MessageUnit msgUnit(pubKey1, senderContent_, pubQos);
00233             connection_.publish(msgUnit);
00234          }
00235    
00236          for (int i=0; i < 2; i++) {
00237             PublishKey pubKey2(global_);
00238             pubKey2.setOid(oid2);
00239             PublishQos pubQos(global_);
00240             MessageUnit msgUnit(pubKey2, senderContent_, pubQos);
00241             connection_.publish(msgUnit);
00242          }
00243 
00244          for (int i=0; i < 3; i++) {
00245             PublishKey pubKey3(global_);
00246             pubKey3.setOid(oid3);
00247             PublishQos pubQos(global_);
00248             MessageUnit msgUnit(pubKey3, senderContent_, pubQos);
00249             connection_.publish(msgUnit);
00250          }
00251 
00252          org::xmlBlaster::util::thread::Thread::sleep(2000L); 
00253          assertEquals(log_, "specificCallback", 1, cb1_->getCount(), string("callback 1"));
00254          assertEquals(log_, "specificCallback", 2, cb2_->getCount(), string("callback 2"));
00255          assertEquals(log_, "specificCallback", 3, cb3_->getCount(), string("callback 3"));
00256 
00257          UnSubscribeKey key(global_);
00258          key.setOid(oid1);
00259          UnSubscribeQos qos(global_);
00260          connection_.unSubscribe(key, qos);
00261          key.setOid(oid2);
00262          connection_.unSubscribe(key, qos);
00263          key.setOid(oid3);
00264          connection_.unSubscribe(key, qos);
00265       }
00266       catch(XmlBlasterException &e) {
00267          log_.warn(ME, string("XmlBlasterException: ") + e.toXml());
00268          cerr << "subscribe - XmlBlasterException: " << e.toXml() << endl;
00269          assert(0);
00270       }
00271       if (subscribeOid_ == "") {
00272          cerr << "returned null subscribeOid" << endl;
00273          assert(0);
00274       }
00275       if (subscribeOid_.length() == 0) {
00276          cerr << "returned subscribeOid is empty" << endl;
00277          assert(0);
00278       }
00279    }
00280 
00281 
00286    void testPublishCorbaMethods(TestType testType) 
00287    {
00288       if (log_.trace()) log_.trace(ME, "Publishing a message (old style) ...");
00289       numReceived_ = 0;
00290       PublishKey pubKey(global_);
00291       pubKey.setOid(publishOid_);
00292       pubKey.setContentMime(contentMime_);
00293       pubKey.setContentMimeExtended(contentMimeExtended_);
00294       string xmlKey = string("") +
00295          "   <TestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" +
00296          "      <TestSub-DRIVER id='FileProof' pollingFreq='10'>" +
00297          "      </TestSub-DRIVER>"+
00298          "   </TestSub-AGENT>";
00299       pubKey.setClientTags(xmlKey);
00300 
00301       PublishQos pubQos(global_);
00302       MessageUnit msgUnit(pubKey, senderContent_, pubQos);
00303       try {
00304 
00305          if (testType == TEST_ONEWAY) {
00306             vector<MessageUnit> msgUnitArr;
00307             msgUnitArr.insert(msgUnitArr.begin(), msgUnit);
00308             connection_.publishOneway(msgUnitArr);
00309             log_.info(ME, string("Success: Publishing oneway done (old style)"));
00310          }
00311          else if (testType == TEST_PUBLISH) {
00312             string tmp = connection_.publish(msgUnit).getKeyOid();
00313             if (tmp.find(publishOid_) == string::npos) {
00314                log_.error(ME, "Wrong publishOid: " + tmp);
00315                assert(0);
00316             }
00317             log_.info(ME, string("Success: Publishing with ACK done (old style), returned oid=") +
00318                       publishOid_);
00319          }
00320          else {
00321             vector<MessageUnit> msgUnitArr;
00322             msgUnitArr.insert(msgUnitArr.begin(), msgUnit);
00323             connection_.publishArr(msgUnitArr);
00324             log_.info(ME, string("Success: Publishing array done (old style)"));
00325          }
00326       }
00327       catch(XmlBlasterException &e) {
00328          log_.warn(ME, string("XmlBlasterException: ")+e.toXml());
00329          assert(0);
00330       }
00331    }
00332 
00333 
00338    void testPublishSTLMethods(TestType testType) 
00339    {
00340       if (log_.trace()) log_.trace(ME, "Publishing a message (the STL way) ...");
00341       numReceived_ = 0;
00342       string clientTags = string("") +
00343          "   <TestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" +
00344          "      <TestSub-DRIVER id='FileProof' pollingFreq='10'>" +
00345          "      </TestSub-DRIVER>"+
00346          "   </TestSub-AGENT>";
00347 
00348       PublishKey key(global_, publishOid_, contentMime_, contentMimeExtended_);
00349       key.setClientTags(clientTags);
00350       PublishQos pubQos(global_);
00351       MessageUnit msgUnit(key, senderContent_, pubQos);
00352       try {
00353          if (testType == TEST_ONEWAY) {
00354             vector<MessageUnit> msgVec;
00355             msgVec.push_back(msgUnit);
00356             connection_.publishOneway(msgVec);
00357             log_.info(ME, string("Success: Publishing oneway done (the STL way)"));
00358          }
00359          else if (testType == TEST_PUBLISH) {
00360             string tmp = connection_.publish(msgUnit).getKeyOid();
00361             log_.info(ME, string("the publish oid ='") + tmp + "'");
00362          }
00363          else {
00364             vector<MessageUnit> msgVec;
00365             msgVec.push_back(msgUnit);
00366             vector<PublishReturnQos> retArr = connection_.publishArr(msgVec);
00367             log_.info(ME, string("Success: Publishing array of size " + lexical_cast<string>(retArr.size())
00368                                    + " done (the STL way)"));
00369          }
00370       }
00371       catch(XmlBlasterException &e) {
00372          log_.warn(ME, string("XmlBlasterException: ")+e.toXml());
00373          assert(0);
00374       }
00375    }
00376 
00377 
00382    void testPublishAfterSubscribeXPath() 
00383    {
00384       testSubscribeXPath();
00385       waitOnUpdate(1000L);
00386       // Wait some time for callback to arrive ...
00387       if (numReceived_ != 0) {
00388          log_.error(ME, "numReceived after subscribe = " + lexical_cast<string>(numReceived_));
00389          assert(0);
00390       }
00391 
00392       /*
00393       testSubscribeXPath();
00394       waitOnUpdate(1000L);
00395       // Wait some time for callback to arrive ...
00396       if (numReceived_ != 0) {
00397          log_.error(ME, "numReceived after subscribe = " + lexical_cast<string>(numReceived_));
00398          assert(0);
00399       }
00400       */
00401 
00402 /*
00403       testPublishCorbaMethods(TEST_ONEWAY);
00404       waitOnUpdate(2000L);
00405       if (numReceived_ != 1) {
00406          log_.error(ME,"numReceived after publishing oneway = " + lexical_cast<string>(numReceived_));
00407          assert(0);
00408       }
00409 
00410       testPublishCorbaMethods(TEST_PUBLISH);
00411       waitOnUpdate(2000L);
00412       if (numReceived_ != 1) {
00413          log_.error(ME,"numReceived after publishing with ACK = " + lexical_cast<string>(numReceived_));
00414          assert(0);
00415       }
00416 
00417       testPublishCorbaMethods(TEST_ARRAY);
00418       waitOnUpdate(2000L);
00419       if (numReceived_ != 1) {
00420          log_.error(ME,"numReceived after publishing with ACK = " + lexical_cast<string>(numReceived_));
00421          assert(0);
00422       }
00423 */
00424       testPublishSTLMethods(TEST_ONEWAY);
00425       waitOnUpdate(2000L);
00426       if (numReceived_ != 1) {
00427          log_.error(ME,"numReceived after publishing STL oneway = " + lexical_cast<string>(numReceived_));
00428          assert(0);
00429       }
00430       numReceived_ = 0;
00431 
00432       testPublishSTLMethods(TEST_PUBLISH);
00433       waitOnUpdate(2000L);
00434       if (numReceived_ != 1) {
00435          log_.error(ME,"numReceived after publishing STL with ACK = " + lexical_cast<string>(numReceived_));
00436          assert(0);
00437       }
00438       numReceived_ = 0;
00439 
00440       testPublishSTLMethods(TEST_ARRAY);
00441       waitOnUpdate(2000L);
00442       if (numReceived_ != 1) {
00443          log_.error(ME,"numReceived after publishing STL with ACK = " + lexical_cast<string>(numReceived_));
00444          assert(0);
00445       }
00446       numReceived_ = 0;
00447    }
00448 
00449 
00464    string update(const string &sessionId,
00465                UpdateKey &updateKey,
00466                const unsigned char *content, long contentSize,
00467                UpdateQos &updateQos) 
00468    {
00469       log_.info(ME, string("Receiving update of message oid=") +
00470                 updateKey.getOid() + " state=" + updateQos.getState() +
00471                 " authentication sessionId=" + sessionId + " ...");
00472       numReceived_ ++;
00473 
00474       string contentStr(reinterpret_cast<char *>(const_cast<unsigned char *>(content)), contentSize);
00475 
00476       if (updateQos.getState() != Constants::STATE_OK &&
00477           updateQos.getState() != org::xmlBlaster::util::Constants::STATE_ERASED) {
00478          log_.error(ME, "Unexpected message state=" + updateQos.getState());
00479          assert(0);
00480       }
00481 
00482       string name = returnQos_.getSessionQos().getAbsoluteName();
00483       if (/*senderName_*/ name != updateQos.getSender()->getAbsoluteName()) {
00484          log_.error(ME, string("Wrong Sender, should be: '") + name + "' but is: '" + updateQos.getSender()->getAbsoluteName());
00485          assert(0);
00486       }
00487       if (subscribeOid_.find(updateQos.getSubscriptionId()) == string::npos) {
00488          log_.error(ME, string("engine.qos.update.subscriptionId: ")
00489                     + "Wrong subscriptionId, expected=" + subscribeOid_ + " received=" + updateQos.getSubscriptionId());
00490          //assert(0);
00491       }
00492       if (publishOid_ != updateKey.getOid()) {
00493          log_.error(ME, "Wrong oid of message returned");
00494          assert(0);
00495       }
00496 
00497       if (updateQos.getState() == Constants::STATE_OK && senderContent_ != contentStr) {
00498          log_.error(ME, "Corrupted content expected '" + senderContent_ + "' size=" +
00499                            lexical_cast<string>(senderContent_.size()) + " but was '" + contentStr +
00500                            "' size=" + lexical_cast<string>(contentStr.size()) + " and contentSize=" +
00501                            lexical_cast<string>(contentSize));
00502          assert(0);
00503       }
00504       if (contentMime_ != updateKey.getContentMime()) {
00505          log_.error(ME, "Message contentMime is corrupted");
00506          assert(0);
00507       }
00508       if (contentMimeExtended_ != updateKey.getContentMimeExtended()) {
00509          log_.error(ME, "Message contentMimeExtended is corrupted");
00510          assert(0);
00511       }
00512       messageArrived_ = true;
00513 
00514       log_.info(ME, "Success, message oid=" + updateKey.getOid() + " state=" + updateQos.getState() + " arrived as expected.");
00515       return "<qos><state id='OK'/></qos>";
00516    }
00517 
00518 
00524 private:
00525    void waitOnUpdate(long timeout) {
00526       long delay = timeout;
00527       Thread::sleep(delay);
00528 /*
00529       util::StopWatch stopWatch(timeout);
00530       while (stopWatch.isRunning()) {
00531          connection_.orbPerformWork();
00532          if (messageArrived_) {
00533             messageArrived_ = false;
00534             return;
00535          }
00536       }
00537 */
00538       log_.warn(ME, "Timeout of " + lexical_cast<string>(timeout) + " milliseconds occured");
00539    }
00540 
00541    void usage() const
00542    {
00543       TestSuite::usage();
00544       log_.plain(ME, "----------------------------------------------------------");
00545       log_.plain(ME, "Testing C++/CORBA access to xmlBlaster with subscribe()");
00546       log_.plain(ME, "Usage:");
00547       XmlBlasterAccess::usage();
00548       log_.usage();
00549       log_.plain(ME, "Example:");
00550       log_.plain(ME, "   TestSub -bootstrapHostname myHost.myCompany.com -bootstrapPort 3412 -trace true");
00551       log_.plain(ME, "----------------------------------------------------------");
00552    }
00553 };
00554 
00555 }}} // namespace
00556 
00557 using namespace org::xmlBlaster::test;
00558 
00559 int main(int args, char *argc[]) 
00560 {
00561    try {
00562       org::xmlBlaster::util::Object_Lifetime_Manager::init();
00563       TestSub testSub(args, argc, "Tim");
00564  
00565       testSub.setUp();
00566       testSub.testPublishAfterSubscribeXPath();
00567       testSub.testSubscribeSpecificCallback();
00568       testSub.tearDown();
00569 
00570       Thread::sleepSecs(1);
00571    }
00572    catch (XmlBlasterException& ex) {
00573       std::cout << ex.toXml() << std::endl;
00574    }
00575    catch (bad_exception& ex) {
00576       cout << "bad_exception: " << ex.what() << endl;
00577    }
00578    catch (exception& ex) {
00579       cout << " exception: " << ex.what() << endl;
00580    }
00581    catch (string& ex) {
00582       cout << "string: " << ex << endl;
00583    }
00584    catch (char* ex) {
00585       cout << "char* :  " << ex << endl;
00586    }
00587 
00588    catch (...)
00589    {
00590       cout << "unknown exception occured" << endl;
00591       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
00592       cout << e.toXml() << endl;
00593    }
00594 
00595    org::xmlBlaster::util::Object_Lifetime_Manager::fini();
00596    return 0;
00597 }
00598 
00599