1 /*-----------------------------------------------------------------------------
2 Name: TestFailsafe.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Testing the Timeout Features
6 -----------------------------------------------------------------------------*/
7 #include "TestSuite.h"
8 #include <iostream>
9
10 namespace org { namespace xmlBlaster { namespace test {
11
12 using namespace std;
13 using namespace org::xmlBlaster::util;
14 using namespace org::xmlBlaster::util::qos;
15 using namespace org::xmlBlaster::util::dispatch;
16 using namespace org::xmlBlaster::util::thread;
17 using namespace org::xmlBlaster::util::qos::address;
18 using namespace org::xmlBlaster::client;
19 using namespace org::xmlBlaster::client::qos;
20 using namespace org::xmlBlaster::client::key;
21
22 class TestFailsafe : public virtual I_Callback, public virtual I_ConnectionProblems, public TestSuite
23 {
24 private:
25 ConnectQos *connQos_;
26 ConnectReturnQos *connRetQos_;
27 SubscribeQos *subQos_;
28 SubscribeKey *subKey_;
29 PublishQos *pubQos_;
30 PublishKey *pubKey_;
31 Mutex updateMutex_;
32 bool isConnected_;
33 int numOfUpdates_;
34 bool useSessionMarker_; // Remove again at version 2.0
35
36 public:
37 TestFailsafe(int args, char ** argv)
38 : TestSuite(args, argv, "TestFailsafe"),
39 updateMutex_()
40 {
41 connQos_ = 0;
42 connRetQos_ = 0;
43 subQos_ = 0;
44 subKey_ = 0;
45 pubQos_ = 0;
46 pubKey_ = 0;
47 isConnected_ = false;
48 numOfUpdates_ = 0;
49
50 SessionName sn(global_, "client/dummy");
51 useSessionMarker_ = sn.useSessionMarker();
52 }
53
54
55 virtual ~TestFailsafe()
56 {
57 if (log_.call()) log_.call(ME, "destructor");
58 delete connQos_;
59 delete connRetQos_;
60 delete subQos_;
61 delete subKey_;
62 delete pubQos_;
63 delete pubKey_;
64 if (log_.trace()) log_.trace(ME, "destructor ended");
65 }
66
67 bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
68 {
69 log_.info(ME, "reconnected");
70 isConnected_ = true;
71 return true;
72 }
73
74 void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
75 {
76 log_.info(ME, "lost connection");
77 isConnected_ = false;
78 }
79
80 void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
81 {
82 log_.info(ME, "going to poll modus");
83 isConnected_ = false;
84 }
85
86 AddressBaseRef getAddress() {
87 AddressBaseRef address = new Address(global_);
88 address->setDelay(1000);
89 address->setPingInterval(1000);
90 return address;
91 }
92
93 void setUp()
94 {
95 TestSuite::setUp();
96 try {
97 connection_.initFailsafe(this);
98
99 connQos_ = new ConnectQos(global_, "guy", "secret");
100 connQos_->setAddress(getAddress());
101 log_.info(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos_->toXml());
102 // Login to xmlBlaster, register for updates
103 connRetQos_ = new ConnectReturnQos(connection_.connect(*connQos_, this));
104 log_.info(ME, "successfully connected to xmlBlaster. Return qos: " + connRetQos_->toXml());
105
106 subKey_ = new SubscribeKey(global_);
107 subKey_->setOid("TestFailsafe");
108 subQos_ = new SubscribeQos(global_);
109 log_.info(ME, string("subscribing to xmlBlaster with key: ") + subKey_->toXml() + " and qos: " + subQos_->toXml());
110
111 SubscribeReturnQos subRetQos = connection_.subscribe(*subKey_, *subQos_);
112 log_.info(ME, string("successfully subscribed to xmlBlaster. Return qos: ") + subRetQos.toXml());
113 }
114 catch (XmlBlasterException& ex) {
115 log_.error(ME, string("exception occurred in setUp. ") + ex.toXml());
116 assert(0);
117 }
118 }
119
120
121 /**
122 * This test does the following:
123 * - tears down , i.e. it erases the message 'TestFailsafe' and disconnects.
124 * - shuts down the server if embedded, otherwise waits you to shutdown for 20 s.
125 * - tries to reconnect (and should fail since the server is not connected and the session id is negative)
126 * -
127 */
128 void testReconnect()
129 {
130 log_.info(ME, "testReconnect START");
131 tearDown();
132 // DisconnectQos disconnectQos(global_);
133 // connection_.disconnect(disconnectQos);
134 Thread::sleep(500);
135 if (useEmbeddedServer_) {
136 stopEmbeddedServer();
137 Thread::sleepSecs(2);
138 }
139 else {
140 waitOnKeyboardHit("Please stop the server now and hit 'c' to continue >> ");
141 //log_.info(ME, "please stop the server now (I will wait 20 s)");
142 //Thread::sleepSecs(20);
143 }
144 log_.info(ME, "the communication is now down: ready to start the tests");
145 ConnectQos connQos(global_);
146 connQos.setAddress(getAddress());
147 SessionQos sessionQos(global_,"client/Fritz/-2");
148 connQos.setSessionQos(sessionQos);
149 bool wentInException = false;
150
151 try {
152 connection_.connect(connQos, this);
153 }
154 catch (XmlBlasterException &ex) {
155 log_.info(ME, "Exception is wanted: " + ex.toString());
156 wentInException = true;
157 }
158 assertEquals(log_, ME, true, wentInException, "reconnecting when communication down and not giving positive publicSessionId: exception must be thrown");
159
160 sessionQos = SessionQos(global_,"client/Fritz/-1");
161 connQos.setSessionQos(sessionQos);
162 wentInException = false;
163 try {
164 connection_.connect(connQos, this);
165 }
166 catch (XmlBlasterException &ex) {
167 log_.info(ME, "Exception is wanted: " + ex.toString());
168 wentInException = true;
169 }
170 assertEquals(log_, ME, true, wentInException, "reconnecting for the second time when communication down and not giving positive publicSessionId: exception must be thrown (again)");
171
172 log_.info(ME, "TESTING FAIL SAFE ...");
173 sessionQos = SessionQos(global_,"client/Fritz/7");
174 connQos.setSessionQos(sessionQos);
175 wentInException = false;
176 try {
177 ConnectReturnQos retQos = connection_.connect(connQos, this);
178 string name = retQos.getSessionQos().getRelativeName();
179 if (useSessionMarker_)
180 assertEquals(log_, ME, string("client/Fritz/session/7"), name, "checking that return qos has the correct sessionId");
181 else
182 assertEquals(log_, ME, string("client/Fritz/7"), name, "checking that return qos has the correct sessionId");
183 }
184 catch (XmlBlasterException &ex) {
185 log_.error(ME, ex.toXml());
186 wentInException = true;
187 }
188 assertEquals(log_, ME, false, wentInException, "reconnecting when communication down and giving positive publicSessionId: no exception expected");
189
190 sessionQos = SessionQos(global_,"client/Fritz/2");
191 connQos.setSessionQos(sessionQos);
192 wentInException = false;
193 try {
194 connection_.connect(connQos, this);
195 }
196 catch (XmlBlasterException &/*ex*/) {
197 wentInException = true;
198 }
199 assertEquals(log_, ME, false, wentInException, "reconnecting second time when communication down and giving positive publicSessionId: no exception expected but a warning should have come");
200
201
202 DisconnectQos discQos(global_);
203 wentInException = false;
204 try {
205 connection_.disconnect(discQos);
206 }
207 catch (XmlBlasterException &/*ex*/) {
208 wentInException = true;
209 }
210 assertEquals(log_, ME, true, wentInException, "disconnecting when no communication should give an exception");
211
212 // and now we are reconnecting ...
213 if (useEmbeddedServer_) {
214 startEmbeddedServer();
215 Thread::sleepSecs(1);
216 }
217 else {
218 for (int i=0; i < 30; i++) {
219 if (isConnected_) break;
220 log_.info(ME, "please restart the server now");
221 Thread::sleepSecs(2);
222 if (connection_.isAlive()) {
223 break;
224 }
225 }
226 }
227
228 // making a subscription now should work ...
229 SubscribeKey subKey(global_);
230 subKey.setOid("TestReconnect");
231 SubscribeQos subQos(global_);
232 wentInException = false;
233 try {
234 connection_.subscribe(subKey, subQos);
235 }
236 catch (XmlBlasterException &ex) {
237 wentInException = true;
238 log_.info(ME, string("exception when subscribing: ") + ex.toXml());
239 }
240 assertEquals(log_, ME, false, wentInException, "subscribing when communication should not give an exception");
241
242 log_.info(ME, "disconnecting now the newly established connection");
243 connection_.disconnect(DisconnectQos(global_));
244 log_.info(ME, "going to call setUp to reestablish the initial setup");
245
246 setUp();
247
248 // publishing something to make it happy
249 PublishQos pubQos(global_);
250 PublishKey pubKey(global_);
251 pubKey.setOid("TestFailsafe");
252
253 string msg = "dummy";
254 MessageUnit msgUnit(pubKey, msg, pubQos);
255 connection_.publish(msgUnit);
256
257 log_.info(ME, "testReconnect END");
258 }
259
260
261 void testFailsafe()
262 {
263 int imax = 30;
264 try {
265 pubQos_ = new PublishQos(global_);
266 pubKey_ = new PublishKey(global_);
267 pubKey_->setOid("TestFailsafe");
268
269 for (int i=0; i < imax; i++) {
270 string msg = lexical_cast<string>(i);
271 MessageUnit msgUnit(*pubKey_, msg, *pubQos_);
272 log_.info(ME, string("publishing msg '") + msg + "'");
273 /*PublishReturnQos pubRetQos =*/ connection_.publish(msgUnit);
274
275 if (i == 2) stopEmbeddedServer();
276 if (i == 12) startEmbeddedServer();
277 try {
278 Thread::sleepSecs(1);
279 }
280 catch(XmlBlasterException e) {
281 cout << e.toXml() << endl;
282 }
283
284 }
285 }
286 catch (XmlBlasterException& ex) {
287 log_.error(ME, string("exception occurred in testFailSafe. ") + ex.toXml());
288 assert(0);
289 }
290
291 int i = 0;
292 while (numOfUpdates_ < (imax-1) && i < 100) {
293 i++;
294 Thread::sleep(100);
295 }
296
297
298 }
299
300
301 void tearDown()
302 {
303 try {
304 EraseKey eraseKey(global_);
305 eraseKey.setOid("TestFailsafe");
306 EraseQos eraseQos(global_);
307 log_.info(ME, string("erasing the published message. Key: ") + eraseKey.toXml() + " qos: " + eraseQos.toXml());
308 vector<EraseReturnQos> eraseRetQos = connection_.erase(eraseKey, eraseQos);
309 for (size_t i=0; i < eraseRetQos.size(); i++ ) {
310 log_.info(ME, string("successfully erased the message. return qos: ") + eraseRetQos[i].toXml());
311 }
312
313 // log_.info(ME, "going to sleep for one minute");
314 // org::xmlBlaster::util::thread::Thread::sleep(60000);
315
316 DisconnectQos disconnectQos(global_);
317 connection_.disconnect(disconnectQos);
318 }
319 catch (XmlBlasterException& ex) {
320 log_.error(ME, string("exception occurred in tearDown. ") + ex.toXml());
321 assert(0);
322 }
323
324 delete connQos_; connQos_ = 0;
325 delete subQos_; subQos_ = 0;
326 delete subKey_; subKey_ = 0;
327 delete connRetQos_; connRetQos_ = 0;
328 delete pubQos_; pubQos_ = 0;
329 delete pubKey_; pubKey_ = 0;
330
331 TestSuite::tearDown();
332 }
333
334 string update(const string& sessionId, UpdateKey& updateKey, const unsigned char *content, long contentSize, UpdateQos& updateQos)
335 {
336 Lock lock(updateMutex_);
337 if (log_.trace()) log_.trace(ME, "update: session: " + sessionId);
338 if (log_.trace()) log_.trace(ME, "update: key : " + updateKey.toXml());
339 if (log_.trace()) log_.trace(ME, "update: qos : " + updateQos.toXml());
340 string help((char*)content, (char*)(content)+contentSize);
341 if (log_.trace()) log_.trace(ME, "update: content: " + help);
342 if (updateQos.getState() == "ERASED" ) return "";
343
344 int count = atoi(help.c_str());
345 assertEquals(log_, ME, numOfUpdates_, count, string("update check ") + help);
346 numOfUpdates_++;
347 return "";
348 }
349
350 };
351
352 }}}
353
354
355 using namespace org::xmlBlaster::test;
356
357 /**
358 * Try
359 * <pre>
360 * java TestFailsafe -help
361 * </pre>
362 * for usage help
363 *
364 * To disable the embedded server add -embeddedServer false
365 */
366 int main(int args, char ** argv)
367 {
368 TestFailsafe *testFailsafe = 0;
369 try {
370 org::xmlBlaster::util::Object_Lifetime_Manager::init();
371 testFailsafe = new TestFailsafe(args, argv);
372 testFailsafe->setUp();
373 testFailsafe->testReconnect();
374
375 // testFailsafe.testFailsafe();
376 testFailsafe->tearDown();
377 delete testFailsafe;
378 testFailsafe = 0;
379 org::xmlBlaster::util::Object_Lifetime_Manager::fini();
380 }
381 catch (XmlBlasterException& ex) {
382 std::cout << ex.toXml() << std::endl;
383 }
384 catch (bad_exception& ex) {
385 cout << "bad_exception: " << ex.what() << endl;
386 }
387 catch (exception& ex) {
388 cout << " exception: " << ex.what() << endl;
389 }
390 catch (string& ex) {
391 cout << "string: " << ex << endl;
392 }
393 catch (char* ex) {
394 cout << "char* : " << ex << endl;
395 }
396
397 catch (...)
398 {
399 cout << "unknown exception occured" << endl;
400 XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
401 cout << e.toXml() << endl;
402 }
403
404 return 0;
405 }
syntax highlighted by Code2HTML, v. 0.9.1