1 /*------------------------------------------------------------------------------
2 Name: TestTopicLifeCycle.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Testing some topic state transitions
6 ------------------------------------------------------------------------------*/
7 package org.xmlBlaster.test.topic;
8
9 import java.util.logging.Logger;
10 import java.util.logging.Level;
11 import org.xmlBlaster.util.Global;
12
13 import org.xmlBlaster.client.qos.ConnectQos;
14 import org.xmlBlaster.util.FileLocator;
15 import org.xmlBlaster.util.XmlBlasterException;
16 import org.xmlBlaster.util.MsgUnit;
17 import org.xmlBlaster.util.def.Constants;
18 import org.xmlBlaster.util.qos.TopicProperty;
19 import org.xmlBlaster.client.I_Callback;
20 import org.xmlBlaster.client.key.UpdateKey;
21 import org.xmlBlaster.client.key.PublishKey;
22 import org.xmlBlaster.client.key.GetKey;
23 import org.xmlBlaster.client.key.SubscribeKey;
24 import org.xmlBlaster.client.key.UnSubscribeKey;
25 import org.xmlBlaster.client.key.EraseKey;
26 import org.xmlBlaster.client.qos.GetQos;
27 import org.xmlBlaster.client.qos.PublishQos;
28 import org.xmlBlaster.client.qos.PublishReturnQos;
29 import org.xmlBlaster.client.qos.UpdateQos;
30 import org.xmlBlaster.client.qos.SubscribeQos;
31 import org.xmlBlaster.client.qos.SubscribeReturnQos;
32 import org.xmlBlaster.client.qos.EraseQos;
33 import org.xmlBlaster.client.qos.EraseReturnQos;
34 import org.xmlBlaster.client.qos.UnSubscribeQos;
35 import org.xmlBlaster.client.I_XmlBlasterAccess;
36
37 import org.xmlBlaster.util.EmbeddedXmlBlaster;
38 import org.xmlBlaster.test.Util;
39 import org.xmlBlaster.test.MsgInterceptor;
40
41 import junit.framework.Test;
42 import junit.framework.TestSuite;
43 import org.custommonkey.xmlunit.XMLTestCase;
44
45
46 /**
47 * Here we test some state transitions of a topic.
48 * <p>
49 * We traverse the possible transitions of a topic (TopicHandler.java)
50 * as described in requirement engine.message.lifecycle by sending some expiring messages (see
51 * state transition brackets in requirement)<br />
52 * Please see individual test for a description
53 * </p>
54 * <p>
55 * Invoke examples:
56 * </p>
57 * <pre>
58 * java junit.textui.TestRunner org.xmlBlaster.test.topic.TestTopicLifeCycle
59 *
60 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.topic.TestTopicLifeCycle
61 * </pre>
62 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/engine.message.lifecycle.html">The engine.message.lifecycle requirement</a>
63 * @see org.xmlBlaster.engine.TopicHandler
64 */
65 public class TestTopicLifeCycle extends XMLTestCase implements I_Callback {
66 private Global glob;
67 private static Logger log = Logger.getLogger(TestTopicLifeCycle.class.getName());
68
69 private I_XmlBlasterAccess con = null;
70 private String senderContent = "Some message content";
71 private String publishOid = "TestTopicLifeCycleMsg";
72 private final String xpathTag = "<something/>";
73 private final String xpath = "//something";
74 private SubscribeReturnQos subscribeReturnQos;
75 private long blockUpdateTime = 0L;
76
77 private EmbeddedXmlBlaster serverThread;
78 private int serverPort = 9566;
79 private boolean startEmbedded = true;
80
81 private MsgInterceptor updateInterceptor;
82
83 /**
84 * Constructs the TestTopicLifeCycle object.
85 * <p />
86 * @param testName The name used in the test suite
87 * @param loginName The name to login to the xmlBlaster
88 */
89 public TestTopicLifeCycle(Global glob, String testName) {
90 super(testName);
91 this.glob = glob;
92
93 }
94
95 /**
96 * Sets up the fixture.
97 * <p />
98 * Creates a CORBA connection and does a login.<br />
99 * - One connection for the sender client<br />
100 */
101 protected void setUp() {
102 this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
103 if (this.startEmbedded) {
104 glob.init(Util.getOtherServerPorts(serverPort));
105 String[] args = { };
106 glob.init(args);
107 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
108 log.info("XmlBlaster is ready for testing the priority dispatch plugin");
109 }
110
111 try {
112 con = glob.getXmlBlasterAccess();
113 ConnectQos qos = new ConnectQos(glob); // == "<qos></qos>";
114 this.updateInterceptor = new MsgInterceptor(this.glob, log, this);
115 con.connect(qos, this.updateInterceptor);
116 }
117 catch (Exception e) {
118 log.severe(e.toString());
119 e.printStackTrace();
120 }
121 this.updateInterceptor.clear();
122 }
123
124 /**
125 * Tears down the fixture.
126 * <p />
127 * cleaning up .... logout
128 */
129 protected void tearDown() {
130 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait 200 milli seconds, until all updates are processed ...
131
132 String xmlKey = "<key oid='" + publishOid + "' queryType='EXACT'/>";
133 try {
134 EraseReturnQos[] arr = con.erase(xmlKey, "<qos/>");
135 if (arr.length != 0) {
136 log.severe("Erased " + arr.length + " messages instead of 0");
137 }
138 assertEquals("Erase", 0, arr.length); // The volatile message schould not exist !!
139 } catch(XmlBlasterException e) { fail("Erase XmlBlasterException: " + e.getMessage()); }
140
141 con.disconnect(null);
142 con=null;
143
144 if (this.startEmbedded) {
145 try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
146 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
147 this.serverThread = null;
148 }
149
150 // reset to default server port (necessary if other tests follow in the same JVM).
151 Util.resetPorts();
152 this.glob = null;
153
154 }
155
156 public EraseReturnQos[] sendErase(boolean forceDestroy) {
157 log.info("Erasing a topic forceDestroy=" + forceDestroy);
158 try {
159 EraseQos eq = new EraseQos(glob);
160 eq.setForceDestroy(forceDestroy);
161 EraseKey ek = new EraseKey(glob, this.publishOid);
162 EraseReturnQos[] er = con.erase(ek.toXml(), eq.toXml());
163 return er;
164 } catch(XmlBlasterException e) {
165 fail("Erase XmlBlasterException: " + e.getMessage());
166 }
167 return null;
168 }
169
170 /**
171 * Publish an almost volatile message.
172 */
173 public void sendExpiringMsg(boolean initializeTopic, long topicDestroyDelay, long msgLifeTime) {
174 log.info("Sending a message initializeTopic=" + initializeTopic + " topicDestroyDelay=" + topicDestroyDelay + " msgLifeTime=" + msgLifeTime);
175 try {
176 // Publish a volatile message
177 PublishKey pk = new PublishKey(glob, publishOid, "text/xml", "1.0");
178 PublishQos pq = new PublishQos(glob);
179 pq.setLifeTime(msgLifeTime);
180 pq.setForceDestroy(false);
181 if (initializeTopic) {
182 // Configure the topic to our needs
183 TopicProperty topicProperty = new TopicProperty(glob);
184 topicProperty.setDestroyDelay(topicDestroyDelay);
185 topicProperty.setCreateDomEntry(false);
186 pq.setTopicProperty(topicProperty);
187 }
188 MsgUnit msgUnit = new MsgUnit(pk, senderContent, pq);
189 PublishReturnQos publishReturnQos = con.publish(msgUnit);
190 assertEquals("Retunred oid is invalid", publishOid, publishReturnQos.getKeyOid());
191 log.info("Sending of '" + senderContent + "' done, returned oid=" + publishOid + " " + msgUnit.toXml());
192 } catch(XmlBlasterException e) {
193 log.severe("publish() XmlBlasterException: " + e.getMessage());
194 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
195 }
196 }
197
198 /**
199 * Publish an almost volatile XPATH message.
200 * @return publishOid
201 */
202 public String sendExpiringXPathMsg(long topicDestroyDelay, long msgLifeTime) {
203 log.info("Sending a XPath message topicDestroyDelay=" + topicDestroyDelay + " msgLifeTime=" + msgLifeTime);
204 try {
205 // Publish a volatile message
206 PublishKey pk = new PublishKey(glob, "", "text/xml", "1.0");
207 pk.setClientTags(xpathTag);
208 PublishQos pq = new PublishQos(glob);
209 pq.setLifeTime(msgLifeTime);
210 pq.setForceDestroy(false);
211 // Configure the topic to our needs
212 TopicProperty topicProperty = new TopicProperty(glob);
213 topicProperty.setDestroyDelay(topicDestroyDelay);
214 topicProperty.setCreateDomEntry(true);
215 pq.setTopicProperty(topicProperty);
216 MsgUnit msgUnit = new MsgUnit(pk, senderContent, pq);
217 PublishReturnQos publishReturnQos = con.publish(msgUnit);
218 log.info("Sending of '" + senderContent + "' done, returned oid=" + publishReturnQos.getKeyOid() + " " + msgUnit.toXml());
219 return publishReturnQos.getKeyOid();
220 } catch(XmlBlasterException e) {
221 log.severe("publish() XmlBlasterException: " + e.getMessage());
222 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
223 return ""; // never reached
224 }
225 }
226
227 /**
228 * Subscribe a volatile message.
229 */
230 public void subscribeMsg() {
231 log.info("Subscribing message '" + publishOid + "'...");
232 try {
233 // Subscribe for the volatile message
234 SubscribeKey sk = new SubscribeKey(glob, publishOid);
235 SubscribeQos sq = new SubscribeQos(glob);
236 this.subscribeReturnQos = con.subscribe(sk.toXml(), sq.toXml());
237 log.info("Subscribing of '" + publishOid + "' done");
238 } catch(XmlBlasterException e) {
239 log.severe("subscribe() XmlBlasterException: " + e.getMessage());
240 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
241 }
242 }
243
244 /**
245 * Subscribe topics with XPATH.
246 * @return The subscription id
247 */
248 public String subscribeXPathMsg() {
249 log.info("Subscribing message xpath='" + xpath + "'...");
250 try {
251 // Subscribe for the volatile message
252 SubscribeKey sk = new SubscribeKey(glob, xpath, Constants.XPATH);
253 SubscribeQos sq = new SubscribeQos(glob);
254 this.subscribeReturnQos = con.subscribe(sk.toXml(), sq.toXml());
255 log.info("Subscribing of '" + xpath + "' done");
256 return this.subscribeReturnQos.getSubscriptionId();
257 } catch(XmlBlasterException e) {
258 log.severe("subscribe() XmlBlasterException: " + e.getMessage());
259 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
260 return ""; // never reached
261 }
262 }
263
264 /**
265 * unSubscribe a message.
266 */
267 public void unSubscribeMsg() {
268 log.info("unSubscribing a volatile message ...");
269 try {
270 // Subscribe for the volatile message
271 UnSubscribeKey sk = new UnSubscribeKey(glob, subscribeReturnQos.getSubscriptionId());
272 UnSubscribeQos sq = new UnSubscribeQos(glob);
273 con.unSubscribe(sk.toXml(), sq.toXml());
274 log.info("UnSubscribing of '" + publishOid + "' done");
275 } catch(XmlBlasterException e) {
276 log.severe("unSubscribe() XmlBlasterException: " + e.getMessage());
277 assertTrue("unSubscribe - XmlBlasterException: " + e.getMessage(), false);
278 }
279 }
280
281 /**
282 * Retrieve a dump of xmlBlaster to analyse
283 */
284 private String getDump() {
285 try {
286 GetKey gk = new GetKey(glob, "__cmd:?dump");
287 GetQos gq = new GetQos(glob);
288 MsgUnit[] msgs = con.get(gk.toXml(), gq.toXml());
289 assertEquals("Did not expect returned msg for get()", 1, msgs.length);
290 return msgs[0].getContentStr();
291 }
292 catch (XmlBlasterException e) {
293 fail("Didn't expect an exception in get(): " + e.getMessage());
294 }
295 return "";
296 }
297
298 /**
299 * THIS IS THE TEST
300 * <p>
301 * We traverse the transitions
302 * <pre>
303 * Start -[2]-> ALIVE (3 sec)
304 * -[6]-> UNREFERENCED (3 sec)
305 * -[11]-> DEAD
306 * <pre>
307 * as described in requirement engine.message.lifecycle by sending some expiring messages (see
308 * state transition brackets in requirement)
309 * </p>
310 */
311 public void testExpiry() {
312 log.info("Entering testExpiry ...");
313 this.updateInterceptor.clear();
314
315 { // topic transition from START -> [2] -> ALIVE (3 sec)
316 long topicDestroyDelay = 6000L;
317 long msgLifeTime = 3000L;
318 sendExpiringMsg(true, topicDestroyDelay, msgLifeTime);
319 assertEquals("numReceived after sending", 0, this.updateInterceptor.waitOnUpdate(1000L, 0)); // no message arrived?
320 String dump = getDump();
321 log.fine(dump);
322 // Expecting something like:
323 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='ALIVE'>
324 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
325 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
326 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='ALIVE'") != -1);
327 }
328
329
330 { // topic transition from ALIVE -> [6] -> UNREFERENCED (3 sec)
331 try { Thread.sleep(3500L); } catch( InterruptedException i) {}
332 String dump = getDump();
333 // Expecting something like:
334 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='UNREFERENCED'>
335 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
336 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
337 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNREFERENCED'") != -1);
338 }
339
340 { // topic transition from UNREFERENCED -> [11] -> DEAD
341 log.info("Sleeping for another 5 sec, the topic (with destroyDelay=6sec) should be dead then");
342 try { Thread.sleep(6000); } catch( InterruptedException i) {}
343 // Topic should be destroyed now
344
345 String dump = getDump();
346 log.fine("IS DEAD?"+dump);
347 assertTrue("Not expected a dead topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
348 }
349 log.info("SUCCESS testExpiry");
350 }
351
352 /**
353 * THIS IS THE TEST
354 * <p>
355 * We traverse the transitions
356 * <pre>
357 * Start -[2]-> ALIVE (3 sec)
358 * -[6]-> UNREFERENCED (3 sec)
359 * -[5]-> ALIVE (3 sec)
360 * -[11]-> DEAD
361 * <pre>
362 * as described in requirement engine.message.lifecycle by sending some expiring messages (see
363 * state transition brackets in requirement)
364 * </p>
365 */
366 public void testUnreferencedAlive() throws Exception {
367 log.info("Entering testUnreferencedAlive ...");
368 this.updateInterceptor.clear();
369
370 { log.info("topic transition from START -> [2] -> ALIVE (3 sec)");
371 long topicDestroyDelay = 6000L;
372 long msgLifeTime = 3000L;
373 sendExpiringMsg(true, topicDestroyDelay, msgLifeTime);
374 assertEquals("numReceived after sending", 0, this.updateInterceptor.waitOnUpdate(1000L, 0)); // no message arrived?
375 String dump = getDump();
376 log.fine(dump);
377 // Expecting something like:
378 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='ALIVE'>
379 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
380 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
381 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='ALIVE'") != -1);
382 }
383
384 { log.info("topic transition from ALIVE -> [6] -> UNREFERENCED (3 sec)");
385 try { Thread.sleep(3500L); } catch( InterruptedException i) {}
386 String dump = getDump();
387 // Expecting something like:
388 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='UNREFERENCED'>
389 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
390 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
391 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNREFERENCED'") != -1);
392 }
393
394 { log.info("topic transition from UNREFERENCED -> [5] -> ALIVE (3 sec)");
395 long msgLifeTime = 3000L;
396 sendExpiringMsg(true, 0L, msgLifeTime);
397 assertEquals("numReceived after sending", 0, this.updateInterceptor.waitOnUpdate(1000L, 0)); // no message arrived?
398 String dump = getDump();
399 log.fine(dump);
400 // Expecting something like:
401 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='ALIVE'>
402 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
403 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
404 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='ALIVE'") != -1);
405 //System.out.println(dump);
406 // This assert could find "__sys__UserList" instead of the wanted "TestTopicLifeCycleMsg"
407 //assertXpathEvaluatesTo(publishOid, "//uniqueKey", dump);
408 }
409
410 { log.info("topic transition from ALIVE -> [10] -> DEAD");
411 boolean forceDestroy = true;
412 EraseReturnQos[] erq = sendErase(forceDestroy);
413 assertEquals("erase failed", 1, erq.length);
414 String dump = getDump();
415 assertTrue("Not expected a dead topic:" + dump, dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
416 }
417
418 { log.info("topic transition from ALIVE -> [10] -> DEAD with XPath subscription");
419 subscribeXPathMsg();
420 long topicDestroyDelay = 0L;
421 long msgLifeTime = 0L;
422 String oid = sendExpiringXPathMsg(topicDestroyDelay, msgLifeTime);
423 assertEquals("numReceived after sending", 1, this.updateInterceptor.waitOnUpdate(1000L, oid, Constants.STATE_OK));
424 assertEquals("", 1, this.updateInterceptor.getMsgs().length);
425 String dump = getDump();
426 assertTrue("Not expected a dead topic:" + dump, dump.indexOf("<uniqueKey>"+oid+"</uniqueKey>") == -1);
427 // assert does not work because of other internal topics:
428 //assertXpathNotExists("//uniqueKey", dump);
429 unSubscribeMsg();
430 }
431
432 log.info("SUCCESS testUnreferencedAlive");
433 }
434
435 /**
436 * THIS IS THE TEST
437 * <p>
438 * We traverse the transitions
439 * <pre>
440 * Start -[2]-> ALIVE (0 sec)
441 * -[6]-> UNREFERENCED (0 sec)
442 * -[11]-> DEAD
443 * <pre>
444 * as described in requirement engine.message.lifecycle by sending some expiring messages (see
445 * state transition brackets in requirement)<br />
446 * Please see individual test for a description
447 * </p>
448 */
449 public void testVolatile() {
450 log.info("Entering testVolatile ...");
451 this.updateInterceptor.clear();
452
453 { // topic transition from START -> [2] -> ALIVE -> DEAD
454 long topicDestroyDelay = 0L;
455 long msgLifeTime = 0L;
456 sendExpiringMsg(true, topicDestroyDelay, msgLifeTime);
457 assertTrue("Not expected a dead topic", getDump().indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
458 }
459 log.info("SUCCESS testVolatile");
460 }
461
462 /**
463 * THIS IS THE TEST
464 * Transitions [1] -> [4] -> [6] -> [11]
465 */
466 public void testSubscribeVolatile() {
467 log.info("Entering testSubscribeVolatile ...");
468 this.updateInterceptor.clear();
469
470 { // topic transition from START -> [1] -> UNCONFIGURED
471 subscribeMsg();
472 if (log.isLoggable(Level.FINE)) log.fine("Retrieving initial dump=" + getDump());
473 String dump = getDump();
474 log.fine(dump);
475 // Expecting something like:
476 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='UNCONFIGURED'>
477 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
478 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
479 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNCONFIGURED'") != -1);
480 }
481
482 { // topic transition from UNCONFIGURED -> [4] -> ALIVE
483 long topicDestroyDelay = 0L;
484 long msgLifeTime = 0L;
485 sendExpiringMsg(true, topicDestroyDelay, msgLifeTime);
486 assertEquals("numReceived after sending", 1, this.updateInterceptor.waitOnUpdate(2000L, 1));
487 String dump = getDump();
488 log.fine(dump);
489 // Expecting something like:
490 // <TopicHandler id='http_192_168_1_4_3412/msg/TestTopicLifeCycleMsg' state='ALIVE'>
491 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
492 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
493 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='ALIVE'") != -1);
494 }
495
496 { // topic transition from ALIVE -> [6] -> UNREFERENCED
497 try { Thread.sleep(1000L); } catch( InterruptedException i) {}
498 unSubscribeMsg();
499 // topic transition from UNREFERENCED -> [11] DEAD (wait 200 millis as this is done by timeout thread (async))
500 try { Thread.sleep(200L); } catch( InterruptedException i) {}
501 String dump = getDump();
502 assertTrue("Not expected a dead topic:" + dump, dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
503 }
504 log.info("SUCCESS testSubscribeVolatile");
505 }
506
507 /**
508 * THIS IS THE TEST
509 * Transitions [1] -> [13] -> [9]
510 */
511 public void testUnconfiguredSubscribeSubscribe() {
512 log.info("Entering testUnconfiguredSubscribeSubscribe ...");
513 this.updateInterceptor.clear();
514
515 { // topic transition from START -> [1] -> UNCONFIGURED
516 subscribeMsg();
517 if (log.isLoggable(Level.FINE)) log.fine("Retrieving initial dump=" + getDump());
518 String dump = getDump();
519 log.fine(dump);
520 // Expecting something like:
521 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='UNCONFIGURED'>
522 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
523 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
524 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNCONFIGURED'") != -1);
525 }
526
527 { // topic transition from START -> [1] -> UNCONFIGURED
528 subscribeMsg();
529 String dump = getDump();
530 log.fine(dump);
531 // Expecting something like:
532 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='UNCONFIGURED'>
533 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
534 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
535 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNCONFIGURED'") != -1);
536 }
537
538 { // topic transition from UNCONFIGURED -> [9] -> DEAD
539 boolean forceDestroy = false;
540 this.updateInterceptor.countErased(true);
541 EraseReturnQos[] erq = sendErase(forceDestroy);
542 log.info("erase num=" + erq.length);
543 assertEquals("erase failed", 1, erq.length);
544 assertEquals("", 2, this.updateInterceptor.waitOnUpdate(1000L, publishOid, Constants.STATE_ERASED, 2)); // Expecting two erase events (for the above subscriptions)
545 try { Thread.sleep(1000L); } catch( InterruptedException i) {} // Give server a change to destroy topic after delivery of erase event messages
546 this.updateInterceptor.countErased(false);
547 String dump = getDump();
548 assertTrue("Not expected a dead topic:" + dump, dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
549 }
550 log.info("SUCCESS testUnconfiguredSubscribeSubscribe");
551 }
552
553 /**
554 * THIS IS THE TEST
555 * Transitions [1] -> [4] -> [7] -> [12]
556 */
557 public void testSoftErased() {
558 log.info("Entering testSoftErased ...");
559 this.updateInterceptor.clear();
560
561 { // topic transition from START -> [1] -> UNCONFIGURED
562 subscribeMsg();
563 String dump = getDump();
564 log.fine(dump);
565 // Expecting something like:
566 // <TopicHandler id='http_192_168_1_4_3412/msg/TestTopicLifeCycleMsg' state='UNCONFIGURED'>
567 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
568 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
569 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNCONFIGURED'") != -1);
570 }
571
572 { // topic transition from UNCONFIGURED -> [4] -> ALIVE
573 long topicDestroyDelay = 4000L;
574 long msgLifeTime = 40000000L;
575 this.blockUpdateTime = 3000L; // Blocking callback thread for 3 sec to force state SOFTERASED !!
576 sendExpiringMsg(true, topicDestroyDelay, msgLifeTime);
577 assertEquals("numReceived after sending", 1, this.updateInterceptor.waitOnUpdate(2000L, 1)); // message arrived?
578 String dump = getDump();
579 log.fine(dump);
580 // Expecting something like:
581 // <TopicHandler id='http_192_168_1_4_3412/msg/TestTopicLifeCycleMsg' state='ALIVE'>
582 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
583 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
584 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='ALIVE'") != -1);
585 }
586
587 { // topic transition from ALIVE -> [7] -> SOFTERASED
588 boolean forceDestroy = false;
589 EraseReturnQos[] erq = sendErase(forceDestroy);
590 assertEquals("erase failed", 1, erq.length);
591 this.updateInterceptor.waitOnUpdate(1000L, 1); // Expecting one erase event (for the above subscription)
592 try { Thread.sleep(1000L); } catch( InterruptedException i) {} // Give server a change to destroy topic after delivery of erase event messages
593 String dump = getDump();
594 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
595 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='SOFTERASED'") != -1);
596 }
597
598 { // topic transition from SOFTERASED -> [12] --> DEAD
599 try { Thread.sleep(4500L); } catch( InterruptedException i) {}
600 String dump = getDump();
601 assertTrue("Not expected a dead topic:" + dump, dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
602 }
603 log.info("SUCCESS testSoftErased");
604 }
605
606 /**
607 * THIS IS THE TEST
608 * Transitions [1] -> [4] -> [10]
609 */
610 public void testForcedErased() {
611 log.info("Entering testForcedErased ...");
612 this.updateInterceptor.clear();
613
614 { // topic transition from START -> [1] -> UNCONFIGURED
615 subscribeMsg();
616 if (log.isLoggable(Level.FINE)) log.fine("Retrieving initial dump=" + getDump());
617 String dump = getDump();
618 log.fine(dump);
619 // Expecting something like:
620 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='UNCONFIGURED'>
621 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
622 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
623 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNCONFIGURED'") != -1);
624 }
625
626 { // topic transition from UNCONFIGURED -> [4] -> ALIVE
627 long topicDestroyDelay = 400000L;
628 long msgLifeTime = 400000L;
629 this.blockUpdateTime = 0L;
630 sendExpiringMsg(true, topicDestroyDelay, msgLifeTime);
631 assertEquals("numReceived after sending", 1, this.updateInterceptor.waitOnUpdate(2000L, 1));
632 String dump = getDump();
633 log.fine(dump);
634 // Expecting something like:
635 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='ALIVE'>
636 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
637 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
638 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='ALIVE'") != -1);
639 }
640
641 { // topic transition from ALIVE -> [10] -> DEAD
642 boolean forceDestroy = true;
643 EraseReturnQos[] erq = sendErase(forceDestroy);
644 assertEquals("erase failed", 1, erq.length);
645 String dump = getDump();
646 assertTrue("Not expected a dead topic:" + dump, dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
647 }
648 log.info("SUCCESS testForcedErased");
649 }
650
651 /**
652 * THIS IS THE TEST
653 * Transitions [1] -> [9]
654 */
655 public void testUnconfiguredErased() {
656 log.info("Entering testUnconfiguredErased ...");
657 this.updateInterceptor.clear();
658 this.updateInterceptor.countErased(true);
659
660 { // topic transition from START -> [1] -> UNCONFIGURED
661 subscribeMsg();
662 String dump = getDump();
663 if (log.isLoggable(Level.FINE)) log.fine("Retrieving initial dump=" + dump);
664 // Expecting something like:
665 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='UNCONFIGURED'>
666 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
667 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
668 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNCONFIGURED'") != -1);
669 }
670
671 { // topic transition from UNCONFIGURED -> [9] -> DEAD
672 boolean forceDestroy = false;
673 EraseReturnQos[] erq = sendErase(forceDestroy);
674 assertEquals("erase failed", 1, erq.length);
675 this.updateInterceptor.waitOnUpdate(1000L, 1); // Expecting one erase event (for the above subscription)
676 assertEquals("Expected ERASE", 1, this.updateInterceptor.getMsgs(this.publishOid,Constants.STATE_ERASED).length);
677 try { Thread.sleep(1000L); } catch( InterruptedException i) {} // Give server a change to destroy topic after delivery of erase event messages
678 String dump = getDump();
679 assertTrue("Not expected a dead topic:" + dump, dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
680 }
681 log.info("SUCCESS testUnconfiguredErased");
682 }
683
684 /**
685 * THIS IS THE TEST
686 * Transitions [1] -> [9] (by unSubscribe)
687 */
688 public void testUnconfiguredUnSubscribe() {
689 log.info("Entering testUnconfiguredUnSubscribe ...");
690 this.updateInterceptor.clear();
691
692 { // topic transition from START -> [1] -> UNCONFIGURED
693 subscribeMsg();
694 if (log.isLoggable(Level.FINE)) log.fine("Retrieving initial dump=" + getDump());
695 String dump = getDump();
696 log.fine(dump);
697 // Expecting something like:
698 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicLifeCycleMsg' state='UNCONFIGURED'>
699 // <uniqueKey>TestTopicLifeCycleMsg</uniqueKey>
700 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
701 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicLifeCycleMsg' state='UNCONFIGURED'") != -1);
702 }
703
704 { // topic transition from UNCONFIGURED -> [9] -> DEAD
705 unSubscribeMsg();
706 String dump = getDump();
707 assertTrue("Not expected a dead topic:" + dump, dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
708 }
709 log.info("SUCCESS testUnconfiguredUnSubscribe");
710 }
711
712 /**
713 * This is the callback method invoked from xmlBlaster
714 * delivering us a new asynchronous message.
715 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
716 */
717 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
718 log.info("Receiving update of a message " + updateKey.getOid() + " " + updateQos.getState());
719
720 if (updateQos.isOk()) {
721 //assertEquals("Wrong oid of message returned", publishOid, updateKey.getOid());
722 assertEquals("Message content is corrupted", new String(senderContent), new String(content));
723 }
724
725 if (this.blockUpdateTime > 0L) {
726 log.info("Blocking the update callback for " + this.blockUpdateTime + " millis");
727 try { Thread.sleep(this.blockUpdateTime); } catch( InterruptedException i) {}
728 this.blockUpdateTime = 0L;
729 log.info("Block released, reset blockTimer");
730 }
731 return "";
732 }
733
734 /**
735 * Method is used by TestRunner to load these tests
736 */
737 public static Test suite() {
738 TestSuite suite= new TestSuite();
739 suite.addTest(new TestTopicLifeCycle(new Global(), "testSoftErased"));
740 suite.addTest(new TestTopicLifeCycle(new Global(), "testExpiry"));
741 suite.addTest(new TestTopicLifeCycle(new Global(), "testUnreferencedAlive"));
742 suite.addTest(new TestTopicLifeCycle(new Global(), "testVolatile"));
743 suite.addTest(new TestTopicLifeCycle(new Global(), "testSubscribeVolatile"));
744 suite.addTest(new TestTopicLifeCycle(new Global(), "testUnconfiguredSubscribeSubscribe"));
745 suite.addTest(new TestTopicLifeCycle(new Global(), "testSoftErased"));
746 suite.addTest(new TestTopicLifeCycle(new Global(), "testForcedErased"));
747 suite.addTest(new TestTopicLifeCycle(new Global(), "testUnconfiguredErased"));
748 suite.addTest(new TestTopicLifeCycle(new Global(), "testUnconfiguredUnSubscribe"));
749 return suite;
750 }
751
752 /**
753 * Invoke: java org.xmlBlaster.test.topic.TestTopicLifeCycle -startEmbedded false
754 */
755 public static void main(String args[]) {
756 try {
757 TestTopicLifeCycle testSub = new TestTopicLifeCycle(new Global(args), "TestTopicLifeCycle");
758 testSub.setUp();
759 //testSub.testExpiry();
760 //testSub.testUnreferencedAlive();
761 //testSub.testVolatile();
762 //testSub.testSubscribeVolatile();
763 //testSub.testUnconfiguredSubscribeSubscribe();
764 //testSub.testSoftErased();
765 //testSub.testForcedErased();
766 testSub.testUnconfiguredErased();
767 //testSub.testUnconfiguredUnSubscribe();
768 testSub.tearDown();
769 }
770 catch(Exception e) {
771 e.printStackTrace();
772 System.out.println("ERROR!!!!: " + e.toString());
773 }
774 }
775 }
syntax highlighted by Code2HTML, v. 0.9.1