00001 /*----------------------------------------------------------------------------- 00002 Name: TestFailsafe.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Testing the Timeout Features 00006 -----------------------------------------------------------------------------*/ 00007 #include "TestSuite.h" 00008 #include <iostream> 00009 00010 namespace org { namespace xmlBlaster { namespace test { 00011 00012 using namespace std; 00013 using namespace org::xmlBlaster::util; 00014 using namespace org::xmlBlaster::util::qos; 00015 using namespace org::xmlBlaster::util::dispatch; 00016 using namespace org::xmlBlaster::util::thread; 00017 using namespace org::xmlBlaster::util::qos::address; 00018 using namespace org::xmlBlaster::client; 00019 using namespace org::xmlBlaster::client::qos; 00020 using namespace org::xmlBlaster::client::key; 00021 00022 class TestFailsafe : public virtual I_Callback, public virtual I_ConnectionProblems, public TestSuite 00023 { 00024 private: 00025 ConnectQos *connQos_; 00026 ConnectReturnQos *connRetQos_; 00027 SubscribeQos *subQos_; 00028 SubscribeKey *subKey_; 00029 PublishQos *pubQos_; 00030 PublishKey *pubKey_; 00031 Mutex updateMutex_; 00032 bool isConnected_; 00033 int numOfUpdates_; 00034 bool useSessionMarker_; // Remove again at version 2.0 00035 00036 public: 00037 TestFailsafe(int args, char ** argv) 00038 : TestSuite(args, argv, "TestFailsafe"), 00039 updateMutex_() 00040 { 00041 connQos_ = 0; 00042 connRetQos_ = 0; 00043 subQos_ = 0; 00044 subKey_ = 0; 00045 pubQos_ = 0; 00046 pubKey_ = 0; 00047 isConnected_ = false; 00048 numOfUpdates_ = 0; 00049 00050 SessionName sn(global_, "client/dummy"); 00051 useSessionMarker_ = sn.useSessionMarker(); 00052 } 00053 00054 00055 virtual ~TestFailsafe() 00056 { 00057 if (log_.call()) log_.call(ME, "destructor"); 00058 delete connQos_; 00059 delete connRetQos_; 00060 delete subQos_; 00061 delete subKey_; 00062 delete pubQos_; 00063 delete pubKey_; 00064 if (log_.trace()) log_.trace(ME, "destructor ended"); 00065 } 00066 00067 bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/) 00068 { 00069 log_.info(ME, "reconnected"); 00070 isConnected_ = true; 00071 return true; 00072 } 00073 00074 void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/) 00075 { 00076 log_.info(ME, "lost connection"); 00077 isConnected_ = false; 00078 } 00079 00080 void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/) 00081 { 00082 log_.info(ME, "going to poll modus"); 00083 isConnected_ = false; 00084 } 00085 00086 AddressBaseRef getAddress() { 00087 AddressBaseRef address = new Address(global_); 00088 address->setDelay(1000); 00089 address->setPingInterval(1000); 00090 return address; 00091 } 00092 00093 void setUp() 00094 { 00095 TestSuite::setUp(); 00096 try { 00097 connection_.initFailsafe(this); 00098 00099 connQos_ = new ConnectQos(global_, "guy", "secret"); 00100 connQos_->setAddress(getAddress()); 00101 log_.info(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos_->toXml()); 00102 // Login to xmlBlaster, register for updates 00103 connRetQos_ = new ConnectReturnQos(connection_.connect(*connQos_, this)); 00104 log_.info(ME, "successfully connected to xmlBlaster. Return qos: " + connRetQos_->toXml()); 00105 00106 subKey_ = new SubscribeKey(global_); 00107 subKey_->setOid("TestFailsafe"); 00108 subQos_ = new SubscribeQos(global_); 00109 log_.info(ME, string("subscribing to xmlBlaster with key: ") + subKey_->toXml() + " and qos: " + subQos_->toXml()); 00110 00111 SubscribeReturnQos subRetQos = connection_.subscribe(*subKey_, *subQos_); 00112 log_.info(ME, string("successfully subscribed to xmlBlaster. Return qos: ") + subRetQos.toXml()); 00113 } 00114 catch (XmlBlasterException& ex) { 00115 log_.error(ME, string("exception occurred in setUp. ") + ex.toXml()); 00116 assert(0); 00117 } 00118 } 00119 00120 00128 void testReconnect() 00129 { 00130 log_.info(ME, "testReconnect START"); 00131 tearDown(); 00132 // DisconnectQos disconnectQos(global_); 00133 // connection_.disconnect(disconnectQos); 00134 Thread::sleep(500); 00135 if (useEmbeddedServer_) { 00136 stopEmbeddedServer(); 00137 Thread::sleepSecs(2); 00138 } 00139 else { 00140 waitOnKeyboardHit("Please stop the server now and hit 'c' to continue >> "); 00141 //log_.info(ME, "please stop the server now (I will wait 20 s)"); 00142 //Thread::sleepSecs(20); 00143 } 00144 log_.info(ME, "the communication is now down: ready to start the tests"); 00145 ConnectQos connQos(global_); 00146 connQos.setAddress(getAddress()); 00147 SessionQos sessionQos(global_,"client/Fritz/-2"); 00148 connQos.setSessionQos(sessionQos); 00149 bool wentInException = false; 00150 00151 try { 00152 connection_.connect(connQos, this); 00153 } 00154 catch (XmlBlasterException &ex) { 00155 log_.info(ME, "Exception is wanted: " + ex.toString()); 00156 wentInException = true; 00157 } 00158 assertEquals(log_, ME, true, wentInException, "reconnecting when communication down and not giving positive publicSessionId: exception must be thrown"); 00159 00160 sessionQos = SessionQos(global_,"client/Fritz/-1"); 00161 connQos.setSessionQos(sessionQos); 00162 wentInException = false; 00163 try { 00164 connection_.connect(connQos, this); 00165 } 00166 catch (XmlBlasterException &ex) { 00167 log_.info(ME, "Exception is wanted: " + ex.toString()); 00168 wentInException = true; 00169 } 00170 assertEquals(log_, ME, true, wentInException, "reconnecting for the second time when communication down and not giving positive publicSessionId: exception must be thrown (again)"); 00171 00172 log_.info(ME, "TESTING FAIL SAFE ..."); 00173 sessionQos = SessionQos(global_,"client/Fritz/7"); 00174 connQos.setSessionQos(sessionQos); 00175 wentInException = false; 00176 try { 00177 ConnectReturnQos retQos = connection_.connect(connQos, this); 00178 string name = retQos.getSessionQos().getRelativeName(); 00179 if (useSessionMarker_) 00180 assertEquals(log_, ME, string("client/Fritz/session/7"), name, "checking that return qos has the correct sessionId"); 00181 else 00182 assertEquals(log_, ME, string("client/Fritz/7"), name, "checking that return qos has the correct sessionId"); 00183 } 00184 catch (XmlBlasterException &ex) { 00185 log_.error(ME, ex.toXml()); 00186 wentInException = true; 00187 } 00188 assertEquals(log_, ME, false, wentInException, "reconnecting when communication down and giving positive publicSessionId: no exception expected"); 00189 00190 sessionQos = SessionQos(global_,"client/Fritz/2"); 00191 connQos.setSessionQos(sessionQos); 00192 wentInException = false; 00193 try { 00194 connection_.connect(connQos, this); 00195 } 00196 catch (XmlBlasterException &/*ex*/) { 00197 wentInException = true; 00198 } 00199 assertEquals(log_, ME, false, wentInException, "reconnecting second time when communication down and giving positive publicSessionId: no exception expected but a warning should have come"); 00200 00201 00202 DisconnectQos discQos(global_); 00203 wentInException = false; 00204 try { 00205 connection_.disconnect(discQos); 00206 } 00207 catch (XmlBlasterException &/*ex*/) { 00208 wentInException = true; 00209 } 00210 assertEquals(log_, ME, true, wentInException, "disconnecting when no communication should give an exception"); 00211 00212 // and now we are reconnecting ... 00213 if (useEmbeddedServer_) { 00214 startEmbeddedServer(); 00215 Thread::sleepSecs(1); 00216 } 00217 else { 00218 for (int i=0; i < 30; i++) { 00219 if (isConnected_) break; 00220 log_.info(ME, "please restart the server now"); 00221 Thread::sleepSecs(2); 00222 if (connection_.isAlive()) { 00223 break; 00224 } 00225 } 00226 } 00227 00228 // making a subscription now should work ... 00229 SubscribeKey subKey(global_); 00230 subKey.setOid("TestReconnect"); 00231 SubscribeQos subQos(global_); 00232 wentInException = false; 00233 try { 00234 connection_.subscribe(subKey, subQos); 00235 } 00236 catch (XmlBlasterException &ex) { 00237 wentInException = true; 00238 log_.info(ME, string("exception when subscribing: ") + ex.toXml()); 00239 } 00240 assertEquals(log_, ME, false, wentInException, "subscribing when communication should not give an exception"); 00241 00242 log_.info(ME, "disconnecting now the newly established connection"); 00243 connection_.disconnect(DisconnectQos(global_)); 00244 log_.info(ME, "going to call setUp to reestablish the initial setup"); 00245 00246 setUp(); 00247 00248 // publishing something to make it happy 00249 PublishQos pubQos(global_); 00250 PublishKey pubKey(global_); 00251 pubKey.setOid("TestFailsafe"); 00252 00253 string msg = "dummy"; 00254 MessageUnit msgUnit(pubKey, msg, pubQos); 00255 connection_.publish(msgUnit); 00256 00257 log_.info(ME, "testReconnect END"); 00258 } 00259 00260 00261 void testFailsafe() 00262 { 00263 int imax = 30; 00264 try { 00265 pubQos_ = new PublishQos(global_); 00266 pubKey_ = new PublishKey(global_); 00267 pubKey_->setOid("TestFailsafe"); 00268 00269 for (int i=0; i < imax; i++) { 00270 string msg = lexical_cast<string>(i); 00271 MessageUnit msgUnit(*pubKey_, msg, *pubQos_); 00272 log_.info(ME, string("publishing msg '") + msg + "'"); 00273 /*PublishReturnQos pubRetQos =*/ connection_.publish(msgUnit); 00274 00275 if (i == 2) stopEmbeddedServer(); 00276 if (i == 12) startEmbeddedServer(); 00277 try { 00278 Thread::sleepSecs(1); 00279 } 00280 catch(XmlBlasterException e) { 00281 cout << e.toXml() << endl; 00282 } 00283 00284 } 00285 } 00286 catch (XmlBlasterException& ex) { 00287 log_.error(ME, string("exception occurred in testFailSafe. ") + ex.toXml()); 00288 assert(0); 00289 } 00290 00291 int i = 0; 00292 while (numOfUpdates_ < (imax-1) && i < 100) { 00293 i++; 00294 Thread::sleep(100); 00295 } 00296 00297 00298 } 00299 00300 00301 void tearDown() 00302 { 00303 try { 00304 EraseKey eraseKey(global_); 00305 eraseKey.setOid("TestFailsafe"); 00306 EraseQos eraseQos(global_); 00307 log_.info(ME, string("erasing the published message. Key: ") + eraseKey.toXml() + " qos: " + eraseQos.toXml()); 00308 vector<EraseReturnQos> eraseRetQos = connection_.erase(eraseKey, eraseQos); 00309 for (size_t i=0; i < eraseRetQos.size(); i++ ) { 00310 log_.info(ME, string("successfully erased the message. return qos: ") + eraseRetQos[i].toXml()); 00311 } 00312 00313 // log_.info(ME, "going to sleep for one minute"); 00314 // org::xmlBlaster::util::thread::Thread::sleep(60000); 00315 00316 DisconnectQos disconnectQos(global_); 00317 connection_.disconnect(disconnectQos); 00318 } 00319 catch (XmlBlasterException& ex) { 00320 log_.error(ME, string("exception occurred in tearDown. ") + ex.toXml()); 00321 assert(0); 00322 } 00323 00324 delete connQos_; connQos_ = 0; 00325 delete subQos_; subQos_ = 0; 00326 delete subKey_; subKey_ = 0; 00327 delete connRetQos_; connRetQos_ = 0; 00328 delete pubQos_; pubQos_ = 0; 00329 delete pubKey_; pubKey_ = 0; 00330 00331 TestSuite::tearDown(); 00332 } 00333 00334 string update(const string& sessionId, UpdateKey& updateKey, const unsigned char *content, long contentSize, UpdateQos& updateQos) 00335 { 00336 Lock lock(updateMutex_); 00337 if (log_.trace()) log_.trace(ME, "update: session: " + sessionId); 00338 if (log_.trace()) log_.trace(ME, "update: key : " + updateKey.toXml()); 00339 if (log_.trace()) log_.trace(ME, "update: qos : " + updateQos.toXml()); 00340 string help((char*)content, (char*)(content)+contentSize); 00341 if (log_.trace()) log_.trace(ME, "update: content: " + help); 00342 if (updateQos.getState() == "ERASED" ) return ""; 00343 00344 int count = atoi(help.c_str()); 00345 assertEquals(log_, ME, numOfUpdates_, count, string("update check ") + help); 00346 numOfUpdates_++; 00347 return ""; 00348 } 00349 00350 }; 00351 00352 }}} 00353 00354 00355 using namespace org::xmlBlaster::test; 00356 00366 int main(int args, char ** argv) 00367 { 00368 TestFailsafe *testFailsafe = 0; 00369 try { 00370 org::xmlBlaster::util::Object_Lifetime_Manager::init(); 00371 testFailsafe = new TestFailsafe(args, argv); 00372 testFailsafe->setUp(); 00373 testFailsafe->testReconnect(); 00374 00375 // testFailsafe.testFailsafe(); 00376 testFailsafe->tearDown(); 00377 delete testFailsafe; 00378 testFailsafe = 0; 00379 org::xmlBlaster::util::Object_Lifetime_Manager::fini(); 00380 } 00381 catch (XmlBlasterException& ex) { 00382 std::cout << ex.toXml() << std::endl; 00383 } 00384 catch (bad_exception& ex) { 00385 cout << "bad_exception: " << ex.what() << endl; 00386 } 00387 catch (exception& ex) { 00388 cout << " exception: " << ex.what() << endl; 00389 } 00390 catch (string& ex) { 00391 cout << "string: " << ex << endl; 00392 } 00393 catch (char* ex) { 00394 cout << "char* : " << ex << endl; 00395 } 00396 00397 catch (...) 00398 { 00399 cout << "unknown exception occured" << endl; 00400 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread"); 00401 cout << e.toXml() << endl; 00402 } 00403 00404 return 0; 00405 } 00406