1 /*------------------------------------------------------------------------------
2 Name: TestPersistentSession.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.client;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.util.Global;
10 import org.xmlBlaster.util.SessionName;
11 import org.xmlBlaster.util.XmlBlasterException;
12 import org.xmlBlaster.util.def.ErrorCode;
13 import org.xmlBlaster.util.def.Constants;
14 import org.xmlBlaster.util.property.PropString;
15 import org.xmlBlaster.util.EmbeddedXmlBlaster;
16 import org.xmlBlaster.util.qos.address.Address;
17 import org.xmlBlaster.util.qos.address.CallbackAddress;
18 import org.xmlBlaster.util.MsgUnit;
19 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
20 import org.xmlBlaster.client.qos.PublishQos;
21 import org.xmlBlaster.client.I_Callback;
22 import org.xmlBlaster.client.I_ConnectionStateListener;
23 import org.xmlBlaster.client.I_XmlBlasterAccess;
24 import org.xmlBlaster.client.key.SubscribeKey;
25 import org.xmlBlaster.client.key.UpdateKey;
26 import org.xmlBlaster.client.qos.*;
27
28 import org.xmlBlaster.test.Util;
29 import org.xmlBlaster.test.MsgInterceptor;
30
31 import junit.framework.*;
32
33
34 /**
35 * Tests the persistent sessions .
36 * <br />For a description of what this persistent sessions and subscriptions are
37 * please read the requirement engine.persistence.session.
38 * <p>
39 * This is an interesting example, since it creates a XmlBlaster server instance
40 * in the same JVM , but in a separate thread, talking over CORBA with it.
41 * <p>
42 * Invoke examples:<br />
43 * <pre>
44 * java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestPersistentSession
45 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestPersistentSession
46 * </pre>
47 * @see org.xmlBlaster.client.I_XmlBlasterAccess
48 */
49 public class TestPersistentSession extends TestCase implements I_ConnectionStateListener, I_Callback
50 {
51 private static String ME = "TestPersistentSession";
52 private static final boolean TRANSIENT = false;
53 private static final boolean PERSISTENT = true;
54
55 private Global glob;
56 private Global origGlobal;
57 private Global serverGlobal;
58 private static Logger log = Logger.getLogger(TestPersistentSession.class.getName());
59
60 private int serverPort = 7604;
61 private EmbeddedXmlBlaster serverThread;
62
63 private MsgInterceptor[] updateInterceptors;
64 //private I_XmlBlasterAccess con;
65 private String senderName;
66
67 private int numPublish = 8;
68 private int numStop = 3;
69 private int numStart = 5;
70 private final String contentMime = "text/plain";
71
72 private final long reconnectDelay = 2000L;
73 private boolean failsafeCallback = true;
74 /** the session is persistent from the beginning */
75 private boolean persistent = true;
76 private boolean exactSubscription = false;
77 private boolean initialUpdates = true;
78 private int numSubscribers = 4;
79
80 public TestPersistentSession(String testName) {
81 this(null, testName);
82 }
83
84 public TestPersistentSession(Global glob, String testName) {
85 super(testName);
86 this.origGlobal = glob;
87 this.senderName = testName;
88 this.updateInterceptors = new MsgInterceptor[this.numSubscribers];
89 }
90
91 /**
92 * Sets up the fixture.
93 * <p />
94 * Connect to xmlBlaster and login
95 */
96 protected void setUp() {
97 setup(false);
98 }
99
100
101 private void setup(boolean restrictedEntries) {
102 this.origGlobal = (this.origGlobal == null) ? Global.instance() : this.origGlobal;
103
104
105 this.origGlobal.init(Util.getOtherServerPorts(serverPort));
106 this.glob = this.origGlobal.getClone(null);
107
108 String[] args = null;
109 if (restrictedEntries) {
110 args = new String[] {"-persistence/session/maxEntriesCache", "1",
111 "-persistence/session/maxEntries","2",
112 "-persistence/subscribe/maxEntriesCache", "2",
113 "-persistence/subscribe/maxEntries","3",
114 };
115 }
116 this.serverGlobal = this.origGlobal.getClone(args);
117 serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
118 log.info("XmlBlaster is ready for testing on bootstrapPort " + serverPort);
119
120 System.out.println("============== Connect/Disconnect for general/1 to cleanup first");
121
122 try { // we just connect and disconnect to make sure all resources are really cleaned up
123 Global tmpGlobal = this.origGlobal.getClone(null);
124 I_XmlBlasterAccess con = tmpGlobal.getXmlBlasterAccess(); // Find orb
125
126 String passwd = "secret";
127 ConnectQos connectQos = new ConnectQos(tmpGlobal, senderName, passwd); // == "<qos>...</qos>";
128 connectQos.setSessionName(new SessionName(tmpGlobal, "general/1"));
129 // set the persistent connection
130 connectQos.setPersistent(this.persistent);
131 // Setup fail save handling for connection ...
132 Address addressProp = new Address(tmpGlobal);
133 addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
134 addressProp.setRetries(-1); // -1 == forever
135 addressProp.setPingInterval(-1L); // switched off
136 con.registerConnectionListener(this);
137 connectQos.setAddress(addressProp);
138
139 // setup failsafe handling for callback ...
140 if (this.failsafeCallback) {
141 CallbackAddress cbAddress = new CallbackAddress(tmpGlobal);
142 cbAddress.setRetries(-1);
143 cbAddress.setPingInterval(-1);
144 cbAddress.setDelay(1000L);
145 cbAddress.setSecretCbSessionId("someSecredSessionId");
146 connectQos.addCallbackAddress(cbAddress);
147 }
148 con.connect(connectQos, this);
149 DisconnectQos disconnectQos = new DisconnectQos(tmpGlobal);
150 con.disconnect(disconnectQos);
151 }
152 catch (XmlBlasterException e) {
153 log.warning("setUp() - login failed: " + e.getMessage());
154 fail("setUp() - login fail: " + e.getMessage());
155 }
156 catch (Exception e) {
157 log.severe("setUp() - login failed: " + e.toString());
158 e.printStackTrace();
159 fail("setUp() - login fail: " + e.toString());
160 }
161
162 System.out.println("============== Connect for general/1");
163
164 try {
165 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess(); // Find orb
166
167 String passwd = "secret";
168 ConnectQos connectQos = new ConnectQos(this.glob, senderName, passwd); // == "<qos>...</qos>";
169 connectQos.setSessionName(new SessionName(this.glob, "general/1"));
170 // set the persistent connection
171 connectQos.setPersistent(this.persistent);
172 // Setup fail save handling for connection ...
173 Address addressProp = new Address(glob);
174 addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
175 addressProp.setRetries(-1); // -1 == forever
176 addressProp.setPingInterval(-1L); // switched off
177 con.registerConnectionListener(this);
178 connectQos.setAddress(addressProp);
179
180 // setup failsafe handling for callback ...
181 if (this.failsafeCallback) {
182 CallbackAddress cbAddress = new CallbackAddress(this.glob);
183 cbAddress.setRetries(-1);
184 cbAddress.setPingInterval(-1);
185 cbAddress.setDelay(1000L);
186 cbAddress.setSecretCbSessionId("someSecredSessionId");
187 connectQos.addCallbackAddress(cbAddress);
188 }
189
190 con.connect(connectQos, this); // Login to xmlBlaster, register for updates
191 }
192 catch (XmlBlasterException e) {
193 log.warning("setUp() - login failed: " + e.getMessage());
194 fail("setUp() - login fail: " + e.getMessage());
195 }
196 catch (Exception e) {
197 log.severe("setUp() - login failed: " + e.toString());
198 e.printStackTrace();
199 fail("setUp() - login fail: " + e.toString());
200 }
201 }
202
203 /**
204 * Tears down the fixture.
205 * <p />
206 * cleaning up .... erase() the previous message OID and logout
207 */
208 protected void tearDown() {
209 System.out.println("============== Entering tearDown(), test is finished");
210 log.info("Entering tearDown(), test is finished");
211 String xmlKey = "<key oid='' queryType='XPATH'>\n" +
212 " //TestPersistentSession-AGENT" +
213 "</key>";
214
215 String qos = "<qos><forceDestroy>true</forceDestroy></qos>";
216 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
217 try {
218 System.out.println("============== tearDown(), erase: " + xmlKey);
219 con.erase(xmlKey, qos);
220
221 PropString defaultPlugin = new PropString("CACHE,1.0");
222 String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
223 log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
224 }
225 catch(XmlBlasterException e) {
226 log.severe("XmlBlasterException: " + e.getMessage());
227 }
228 finally {
229 System.out.println("============== tearDown(), disconnect");
230 con.disconnect(null);
231 Util.delay(1000);
232 System.out.println("============== tearDown(), stopXmlBlaster");
233 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
234 this.serverThread = null;
235 // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
236 Util.resetPorts(this.serverGlobal);
237 Util.resetPorts(this.glob);
238 Util.resetPorts(this.origGlobal);
239 this.glob = null;
240 this.serverGlobal = null;
241 con = null;
242 Global.instance().shutdown();
243 }
244 System.out.println("============== tearDown() done");
245 }
246
247 /**
248 * TEST: Subscribe to messages with XPATH.
249 */
250 private void doSubscribe(int num, boolean isExact, boolean isPersistent) {
251 try {
252 SubscribeKey key = null;
253 if (isExact) key = new SubscribeKey(this.glob, "Message-1");
254 else key = new SubscribeKey(this.glob, "//TestPersistentSession-AGENT", "XPATH");
255
256 SubscribeQos qos = new SubscribeQos(this.glob); // "<qos><persistent>true</persistent></qos>";
257 qos.setPersistent(isPersistent);
258 qos.setWantInitialUpdate(this.initialUpdates);
259 qos.setWantNotify(false); // to avoig getting erased messages
260
261 this.updateInterceptors[num] = new MsgInterceptor(this.glob, log, null); // Collect received msgs
262 this.updateInterceptors[num].setLogPrefix("interceptor-" + num);
263 SubscribeReturnQos subscriptionId = this.glob.getXmlBlasterAccess().subscribe(key, qos, this.updateInterceptors[num]);
264
265 log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
266 assertTrue("returned null subscriptionId", subscriptionId != null);
267 } catch(XmlBlasterException e) {
268 log.warning("XmlBlasterException: " + e.getMessage());
269 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
270 }
271 }
272
273 /**
274 * TEST: Construct a message and publish it.
275 * <p />
276 */
277 public void doPublish(int counter) throws XmlBlasterException {
278 String oid = "Message" + "-" + counter;
279 log.info("Publishing a message " + oid + " ...");
280 String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
281 " <TestPersistentSession-AGENT id='192.168.124.10' subId='1' type='generic'>" +
282 " </TestPersistentSession-AGENT>" +
283 "</key>";
284 String content = "" + counter;
285 PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
286 MsgUnit msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
287
288 this.glob.getXmlBlasterAccess().publish(msgUnit);
289 log.info("Success: Publishing of " + oid + " done");
290 }
291
292 /**
293 * TEST: <br />
294 */
295 public void persistentSession(boolean doStop) {
296 //doSubscribe(); -> see reachedAlive()
297 log.info("Going to publish " + numPublish + " messages, xmlBlaster will be down for message 3 and 4");
298 //
299 doSubscribe(0, this.exactSubscription, TRANSIENT);
300 doSubscribe(1, this.exactSubscription, PERSISTENT);
301
302 for (int i=0; i<numPublish; i++) {
303 try {
304 if (i == numStop) { // 3
305 if (doStop) {
306 log.info("Stopping xmlBlaster, but continue with publishing ...");
307 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
308 System.out.println("============== Stopped xmlBlaster, but continue with publishing");
309 this.serverThread = null;
310 }
311 else {
312 log.info("changing run level but continue with publishing ...");
313 this.serverThread.changeRunlevel(0, true);
314 System.out.println("============== Changed run level to 0 but continue with publishing");
315 }
316 }
317 if (i == numStart) {
318 if (doStop) {
319 log.info("Starting xmlBlaster again, expecting the previous published two messages ...");
320 // serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
321 serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
322 log.info("xmlBlaster started, waiting on tail back messsages");
323 System.out.println("============== XmlBlaster started, waiting on two tail back messsages");
324 }
325 else {
326 log.info("changing runlevel again to runlevel 9. Expecting the previous published two messages ...");
327 this.serverThread.changeRunlevel(9, true);
328 log.info("xmlBlaster runlevel 9 reached, waiting on tail back messsages");
329 System.out.println("============== Changed runlevel again to runlevel 9. Expecting the previous published two messages");
330 }
331
332 // Message-4 We need to wait until the client reconnected (reconnect interval)
333 // Message-5
334 assertEquals("", 2, this.updateInterceptors[1].waitOnUpdate(reconnectDelay*2L, 2));
335 assertEquals("", 2, this.updateInterceptors[3].waitOnUpdate(reconnectDelay*2L, 2));
336
337 for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
338 }
339 doPublish(i+1);
340 if (i == 0) {
341 doSubscribe(2, this.exactSubscription, TRANSIENT);
342 doSubscribe(3, this.exactSubscription, PERSISTENT);
343 }
344
345 if (i < numStop || i >= numStart ) {
346 int n = 1;
347 if (i == 0 && !this.initialUpdates) n = 0;
348 assertEquals("Message nr. " + (i+1), 1, this.updateInterceptors[1].waitOnUpdate(4000L, 1));
349 assertEquals("Message nr. " + (i+1), n, this.updateInterceptors[3].waitOnUpdate(4000L, n));
350 }
351 for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
352 }
353 catch(XmlBlasterException e) {
354 if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_POLLING)
355 log.warning("Lost connection, my connection layer is polling: " + e.getMessage());
356 else if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_DEAD)
357 assertTrue("Lost connection, my connection layer is NOT polling", false);
358 else
359 assertTrue("Publishing problems: " + e.getMessage(), false);
360 }
361 }
362 doSubscribe(0, this.exactSubscription, TRANSIENT);
363 doSubscribe(1, this.exactSubscription, PERSISTENT);
364 }
365
366 /**
367 * This is the callback method invoked from I_XmlBlasterAccess
368 * informing the client in an asynchronous mode if the connection was established.
369 * <p />
370 * This method is enforced through interface I_ConnectionStateListener
371 */
372 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
373 }
374
375 public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
376 log.info("I_ConnectionStateListener: We were lucky, reconnected to xmlBlaster");
377 // doSubscribe(); // initialize on startup and on reconnect
378 }
379
380 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
381 log.warning("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);
382 }
383
384 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
385 log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
386 }
387
388 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
389 String contentStr = new String(content);
390 String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
391 log.info("Receiving update of a message oid=" + updateKey.getOid() +
392 " priority=" + updateQos.getPriority() +
393 " state=" + updateQos.getState() +
394 " content=" + cont);
395 log.severe("update: should never be invoked (msgInterceptors take care of it since they are passed on subscriptions), further log for receiving update of a message cbSessionId=" + cbSessionId +
396 updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());
397 return "OK";
398 }
399
400
401 public void testXPathInitialStop() {
402 this.exactSubscription = false;
403 this.initialUpdates = true;
404 System.out.println("============== testXPathInitialStop");
405 persistentSession(true);
406 }
407
408 public void testXPathNoInitialStop() {
409 this.exactSubscription = false;
410 this.initialUpdates = false;
411 System.out.println("============== testXPathNoInitialStop");
412 persistentSession(true);
413 }
414
415 public void testXPathInitialRunlevelChange() {
416 this.persistent = true;
417 this.exactSubscription = false;
418 this.initialUpdates = true;
419 System.out.println("============== testXPathInitialRunlevelChange");
420 persistentSession(false);
421 }
422
423 // -----------------------------------------------------------------
424 private Global createConnection(Global parentGlobal, String sessionName, boolean isPersistent, boolean expectEx) {
425 try {
426 Global ret = parentGlobal.getClone(null);
427 I_XmlBlasterAccess con = ret.getXmlBlasterAccess(); // Find orb
428 ConnectQos connectQos = new ConnectQos(glob); // == "<qos>...</qos>";
429 connectQos.setSessionName(new SessionName(ret, sessionName));
430 // set the persistent connection
431 connectQos.setPersistent(isPersistent);
432 // Setup fail save handling for connection ...
433 Address addressProp = new Address(glob);
434 addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
435 addressProp.setRetries(-1); // -1 == forever
436 addressProp.setPingInterval(-1L); // switched off
437 connectQos.setAddress(addressProp);
438
439 // setup failsafe handling for callback ...
440 if (this.failsafeCallback) {
441 CallbackAddress cbAddress = new CallbackAddress(this.glob);
442 cbAddress.setRetries(-1);
443 cbAddress.setPingInterval(-1);
444 cbAddress.setDelay(1000L);
445 connectQos.addCallbackAddress(cbAddress);
446 }
447 con.connect(connectQos, this); // Login to xmlBlaster, register for updates
448 if (expectEx) assertTrue("an exception was expected here because of overflow: Configuration of session queue probably not working", false);
449 return ret;
450 }
451 catch (XmlBlasterException ex) {
452 if (expectEx) log.info("createConnection: exception was OK since overflow was expected");
453 else assertTrue("an exception should not occur here", false);
454 }
455 return null; //to make compiler happy
456 }
457
458
459 /**
460 * Tests the requirement:
461 * - If the storage for the sessions is overflown, it should throw an exception
462 *
463 */
464 public void testOverflow() {
465 System.out.println("============== testXPathNoInitialStop");
466 // to change the configuration on server side (limit the queue sizes)
467 tearDown();
468 setup(true);
469 Global[] globals = new Global[5];
470 try {
471 globals[0] = createConnection(this.origGlobal, "bjoern/1", true , false);
472 globals[1] = createConnection(this.origGlobal, "fritz/2", false, false);
473 globals[3] = createConnection(this.origGlobal, "dimitri/3", true , true); // <-- exception (since main connection also persistent)
474 globals[2] = createConnection(this.origGlobal, "pandora/4", false , false); // OK since transient
475 globals[4] = createConnection(this.origGlobal, "jonny/5", true, true);
476 }
477 finally {
478 Util.delay(2000);
479 for (int i=0; i < globals.length; i++) {
480 if (globals[i] != null) globals[i].getXmlBlasterAccess().disconnect(new DisconnectQos(globals[i]));
481 }
482 }
483 }
484
485 /**
486 * Invoke: java org.xmlBlaster.test.client.TestPersistentSession
487 * <p />
488 * @deprecated Use the TestRunner from the testsuite to run it:<p />
489 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPersistentSession</pre>
490 */
491 public static void main(String args[])
492 {
493 Global glob = new Global();
494 if (glob.init(args) != 0) {
495 System.out.println(ME + ": Init failed");
496 System.exit(1);
497 }
498
499 TestPersistentSession testSub = new TestPersistentSession(glob, "TestPersistentSession/1");
500
501 testSub.setUp();
502 testSub.testXPathInitialStop();
503 testSub.tearDown();
504
505 testSub.setUp();
506 testSub.testXPathNoInitialStop();
507 testSub.tearDown();
508
509 testSub.setUp();
510 testSub.testXPathInitialRunlevelChange();
511 testSub.tearDown();
512
513 testSub.setUp();
514 testSub.testOverflow();
515 testSub.tearDown();
516
517 log.info("Main done");
518 }
519 }
syntax highlighted by Code2HTML, v. 0.9.1