testsuite/src/c++/TestFailsafe.cpp

Go to the documentation of this file.
00001 /*-----------------------------------------------------------------------------
00002 Name:      TestFailsafe.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Testing the failsafe (reconnect) features
00006 -----------------------------------------------------------------------------*/
00007 #include "TestSuite.h"
00008 #include <iostream>
00009 
00010 namespace org { namespace xmlBlaster { namespace test {
00011 
00012 using namespace std;
00013 using namespace org::xmlBlaster::util;
00014 using namespace org::xmlBlaster::util::qos;
00015 using namespace org::xmlBlaster::util::dispatch;
00016 using namespace org::xmlBlaster::util::thread;
00017 using namespace org::xmlBlaster::util::qos::address;
00018 using namespace org::xmlBlaster::client;
00019 using namespace org::xmlBlaster::client::qos;
00020 using namespace org::xmlBlaster::client::key;
00021 
00022 class TestFailsafe : public virtual I_Callback, public virtual I_ConnectionProblems, public TestSuite
00023 {
00024 private:
00025    ConnectQos       *connQos_;
00026    ConnectReturnQos *connRetQos_;
00027    SubscribeQos     *subQos_;
00028    SubscribeKey     *subKey_;
00029    PublishQos       *pubQos_;
00030    PublishKey       *pubKey_;
00031    Mutex            updateMutex_;
00032    bool             isConnected_;
00033    int              numOfUpdates_;
00034    bool useSessionMarker_;  // Remove again at version 2.0
00035 
00036 public:
00037    TestFailsafe(int args, char ** argv) 
00038       : TestSuite(args, argv, "TestFailsafe"),
00039         updateMutex_()
00040    {
00041       connQos_        = 0;
00042       connRetQos_     = 0;
00043       subQos_         = 0;
00044       subKey_         = 0;
00045       pubQos_         = 0;
00046       pubKey_         = 0;
00047       isConnected_    = false;
00048       numOfUpdates_   = 0;
00049 
00050       SessionName sn(global_, "client/dummy");
00051       useSessionMarker_ = sn.useSessionMarker();
00052    }
00053 
00054 
00055    virtual ~TestFailsafe()
00056    {
00057       if (log_.call()) log_.call(ME, "destructor");
00058       delete connQos_;
00059       delete connRetQos_;
00060       delete subQos_;
00061       delete subKey_;
00062       delete pubQos_;
00063       delete pubKey_;
00064       if (log_.trace()) log_.trace(ME, "destructor ended");
00065    }
00066 
00067    bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
00068    {
00069       log_.info(ME, "reconnected");
00070       isConnected_ = true;
00071       return true;
00072    }
00073 
00074    void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
00075    {
00076       log_.info(ME, "lost connection");
00077       isConnected_ = false;
00078    }
00079 
00080    void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
00081    {
00082       log_.info(ME, "going to poll modus");
00083       isConnected_ = false;
00084    }
00085 
00086    AddressBaseRef getAddress() {
00087       AddressBaseRef address = new Address(global_);
00088       address->setDelay(1000);
00089       address->setPingInterval(1000);
00090       return address;
00091    }
00092 
00093    void setUp()
00094    {
00095       TestSuite::setUp();
00096       try {   
00097          connection_.initFailsafe(this);
00098 
00099          connQos_ = new ConnectQos(global_, "guy", "secret");
00100          connQos_->setAddress(getAddress());
00101          log_.info(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos_->toXml());
00102          // Login to xmlBlaster, register for updates
00103          connRetQos_ = new ConnectReturnQos(connection_.connect(*connQos_, this));  
00104          log_.info(ME, "successfully connected to xmlBlaster. Return qos: " + connRetQos_->toXml());
00105 
00106          subKey_ = new SubscribeKey(global_);
00107          subKey_->setOid("TestFailsafe");
00108          subQos_ = new SubscribeQos(global_);
00109          log_.info(ME, string("subscribing to xmlBlaster with key: ") + subKey_->toXml() + " and qos: " + subQos_->toXml());
00110 
00111          SubscribeReturnQos subRetQos = connection_.subscribe(*subKey_, *subQos_);
00112          log_.info(ME, string("successfully subscribed to xmlBlaster. Return qos: ") + subRetQos.toXml());
00113       }
00114       catch (XmlBlasterException& ex) {
00115          log_.error(ME, string("exception occurred in setUp. ") + ex.toXml());
00116          assert(0);
00117       }
00118    }
00119 
00120 
00128    void testReconnect()
00129    {
00130       log_.info(ME, "testReconnect START");
00131       tearDown();
00132       // DisconnectQos disconnectQos(global_);
00133       // connection_.disconnect(disconnectQos);
00134       Thread::sleep(500);
00135       if (useEmbeddedServer_) {
00136          stopEmbeddedServer();
00137          Thread::sleepSecs(2);
00138       }
00139       else {
00140          waitOnKeyboardHit("Please stop the server now and hit 'c' to continue >> ");
00141          //log_.info(ME, "please stop the server now (I will wait 20 s)");
00142          //Thread::sleepSecs(20);
00143       }
00144       log_.info(ME, "the communication is now down: ready to start the tests");
00145       ConnectQos connQos(global_);
00146       connQos.setAddress(getAddress());
00147       SessionQos sessionQos(global_,"client/Fritz/-2");
00148       connQos.setSessionQos(sessionQos);
00149       bool wentInException = false;
00150 
00151       try {
00152          connection_.connect(connQos, this);
00153       }
00154       catch (XmlBlasterException &ex) {
00155          log_.info(ME, "Exception is wanted: " + ex.toString());
00156          wentInException = true;
00157       }   
00158       assertEquals(log_, ME, true, wentInException, "reconnecting when communication down and not giving positive publicSessionId: exception must be thrown");
00159 
00160       sessionQos = SessionQos(global_,"client/Fritz/-1");
00161       connQos.setSessionQos(sessionQos);
00162       wentInException = false;
00163       try {
00164          connection_.connect(connQos, this);
00165       }
00166       catch (XmlBlasterException &ex) {
00167          log_.info(ME, "Exception is wanted: " + ex.toString());
00168          wentInException = true;
00169       }   
00170       assertEquals(log_, ME, true, wentInException, "reconnecting for the second time when communication down and not giving positive publicSessionId: exception must be thrown (again)");
00171 
00172       log_.info(ME, "TESTING FAIL SAFE ...");
00173       sessionQos = SessionQos(global_,"client/Fritz/7");
00174       connQos.setSessionQos(sessionQos);
00175       wentInException = false;
00176       try {
00177          ConnectReturnQos retQos = connection_.connect(connQos, this);
00178          string name = retQos.getSessionQos().getRelativeName();
00179          if (useSessionMarker_)
00180             assertEquals(log_, ME, string("client/Fritz/session/7"), name, "checking that return qos has the correct sessionId");
00181          else
00182             assertEquals(log_, ME, string("client/Fritz/7"), name, "checking that return qos has the correct sessionId");
00183       }
00184       catch (XmlBlasterException &ex) {
00185          log_.error(ME, ex.toXml());
00186          wentInException = true;
00187       }   
00188       assertEquals(log_, ME, false, wentInException, "reconnecting when communication down and giving positive publicSessionId: no exception expected");
00189 
00190       sessionQos = SessionQos(global_,"client/Fritz/2");
00191       connQos.setSessionQos(sessionQos);
00192       wentInException = false;
00193       try {
00194          connection_.connect(connQos, this);
00195       }
00196       catch (XmlBlasterException &/*ex*/) {
00197          wentInException = true;
00198       }   
00199       assertEquals(log_, ME, false, wentInException, "reconnecting second time when communication down and giving positive publicSessionId: no exception expected but a warning should have come");
00200 
00201 
00202       DisconnectQos discQos(global_);
00203       wentInException = false;
00204       try {
00205          connection_.disconnect(discQos);
00206       }
00207       catch (XmlBlasterException &/*ex*/) {
00208          wentInException = true;
00209       }   
00210       assertEquals(log_, ME, true, wentInException, "disconnecting when no communication should give an exception");
00211 
00212       // and now we are reconnecting ...
00213       if (useEmbeddedServer_) {
00214          startEmbeddedServer();
00215          Thread::sleepSecs(1);
00216       }
00217       else {
00218          for (int i=0; i < 30; i++) {
00219             if (isConnected_) break;
00220             log_.info(ME, "please restart the server now");
00221             Thread::sleepSecs(2);
00222             if (connection_.isAlive()) {
00223                break;
00224             }
00225          }
00226       }
00227 
00228       // making  a subscription now should work ...
00229       SubscribeKey subKey(global_);
00230       subKey.setOid("TestReconnect");
00231       SubscribeQos subQos(global_);
00232       wentInException = false;
00233       try {
00234          connection_.subscribe(subKey, subQos);
00235       }
00236       catch (XmlBlasterException &ex) {
00237          wentInException = true;
00238          log_.info(ME, string("exception when subscribing: ") + ex.toXml());
00239       }   
00240       assertEquals(log_, ME, false, wentInException, "subscribing when communication should not give an exception");
00241 
00242       log_.info(ME, "disconnecting now the newly established connection");
00243       connection_.disconnect(DisconnectQos(global_));
00244       log_.info(ME, "going to call setUp to reestablish the initial setup");
00245 
00246       setUp();
00247 
00248       // publishing something to make it happy
00249       PublishQos pubQos(global_);
00250       PublishKey pubKey(global_);
00251       pubKey.setOid("TestFailsafe");
00252 
00253       string msg = "dummy";
00254       MessageUnit msgUnit(pubKey, msg, pubQos);
00255       connection_.publish(msgUnit);
00256 
00257       log_.info(ME, "testReconnect END");
00258    }
00259 
00260 
00261    void testFailsafe() 
00262    {
00263       int imax = 30;
00264       try {
00265          pubQos_ = new PublishQos(global_);
00266          pubKey_ = new PublishKey(global_);
00267          pubKey_->setOid("TestFailsafe");
00268 
00269          for (int i=0; i < imax; i++) {
00270             string msg = lexical_cast<string>(i);
00271             MessageUnit msgUnit(*pubKey_, msg, *pubQos_);
00272             log_.info(ME, string("publishing msg '") + msg + "'");
00273             /*PublishReturnQos pubRetQos =*/ connection_.publish(msgUnit);
00274 
00275             if (i == 2) stopEmbeddedServer();
00276             if (i == 12) startEmbeddedServer();
00277             try {
00278                Thread::sleepSecs(1);
00279             }
00280             catch(XmlBlasterException e) {
00281                cout << e.toXml() << endl;
00282             }
00283 
00284          }
00285       }
00286       catch (XmlBlasterException& ex) {
00287          log_.error(ME, string("exception occurred in testFailSafe. ") + ex.toXml());
00288          assert(0);
00289       }
00290 
00291       int i = 0;
00292       while (numOfUpdates_ < (imax-1) && i < 100) {
00293          i++;
00294          Thread::sleep(100);
00295       }
00296 
00297 
00298    }
00299 
00300 
00301    void tearDown()
00302    {
00303       try {
00304          EraseKey eraseKey(global_);
00305          eraseKey.setOid("TestFailsafe");
00306          EraseQos eraseQos(global_);
00307          log_.info(ME, string("erasing the published message. Key: ") + eraseKey.toXml() + " qos: " + eraseQos.toXml());
00308          vector<EraseReturnQos> eraseRetQos = connection_.erase(eraseKey, eraseQos);
00309          for (size_t i=0; i < eraseRetQos.size(); i++ ) {
00310             log_.info(ME, string("successfully erased the message. return qos: ") + eraseRetQos[i].toXml());
00311          }
00312 
00313          // log_.info(ME, "going to sleep for one minute");
00314          // org::xmlBlaster::util::thread::Thread::sleep(60000);
00315 
00316          DisconnectQos disconnectQos(global_);
00317          connection_.disconnect(disconnectQos);
00318       }
00319       catch (XmlBlasterException& ex) {
00320          log_.error(ME, string("exception occurred in tearDown. ") + ex.toXml());
00321          assert(0);
00322       }
00323 
00324       delete connQos_; connQos_ = 0;
00325       delete subQos_; subQos_ = 0;
00326       delete subKey_; subKey_ = 0;
00327       delete connRetQos_; connRetQos_ = 0;
00328       delete pubQos_; pubQos_ = 0;
00329       delete pubKey_; pubKey_ = 0;
00330 
00331       TestSuite::tearDown();
00332    }
00333 
00334    string update(const string& sessionId, UpdateKey& updateKey, const unsigned char *content, long contentSize, UpdateQos& updateQos)
00335    {
00336       Lock lock(updateMutex_);
00337       if (log_.trace()) log_.trace(ME, "update: session: " + sessionId);
00338       if (log_.trace()) log_.trace(ME, "update: key    : " + updateKey.toXml());
00339       if (log_.trace()) log_.trace(ME, "update: qos    : " + updateQos.toXml());
00340       string help((char*)content, (char*)(content)+contentSize);
00341       if (log_.trace()) log_.trace(ME, "update: content: " + help);
00342       if (updateQos.getState() == "ERASED" ) return "";
00343 
00344       int count = atoi(help.c_str());
00345       assertEquals(log_, ME, numOfUpdates_, count, string("update check ") + help);
00346       numOfUpdates_++;
00347       return "";
00348    }
00349 
00350 };
00351 
00352 }}}
00353 
00354 
00355 using namespace org::xmlBlaster::test;
00356 
00366 int main(int args, char ** argv)
00367 {
00368    TestFailsafe *testFailsafe = 0;
00369    try {
00370       org::xmlBlaster::util::Object_Lifetime_Manager::init();
00371       testFailsafe = new TestFailsafe(args, argv);
00372       testFailsafe->setUp();
00373       testFailsafe->testReconnect();
00374       
00375       // testFailsafe.testFailsafe();
00376       testFailsafe->tearDown();
00377       delete testFailsafe;
00378       testFailsafe = 0; 
00379       org::xmlBlaster::util::Object_Lifetime_Manager::fini();
00380    }
00381    catch (XmlBlasterException& ex) {
00382       std::cout << ex.toXml() << std::endl;
00383    }
00384    catch (bad_exception& ex) {
00385       cout << "bad_exception: " << ex.what() << endl;
00386    }
00387    catch (exception& ex) {
00388       cout << " exception: " << ex.what() << endl;
00389    }
00390    catch (string& ex) {
00391       cout << "string: " << ex << endl;
00392    }
00393    catch (char* ex) {
00394       cout << "char* :  " << ex << endl;
00395    }
00396 
00397    catch (...)
00398    {
00399       cout << "unknown exception occured" << endl;
00400       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
00401       cout << e.toXml() << endl;
00402    }
00403 
00404    return 0;
00405 }
00406