1 /*------------------------------------------------------------------------------
  2 Name:      TestPersistentSession.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.util.logging.Logger;
  9 import org.xmlBlaster.util.Global;
 10 import org.xmlBlaster.util.SessionName;
 11 import org.xmlBlaster.util.XmlBlasterException;
 12 import org.xmlBlaster.util.def.ErrorCode;
 13 import org.xmlBlaster.util.def.Constants;
 14 import org.xmlBlaster.util.property.PropString;
 15 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 16 import org.xmlBlaster.util.qos.address.Address;
 17 import org.xmlBlaster.util.qos.address.CallbackAddress;
 18 import org.xmlBlaster.util.MsgUnit;
 19 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 20 import org.xmlBlaster.client.qos.PublishQos;
 21 import org.xmlBlaster.client.I_Callback;
 22 import org.xmlBlaster.client.I_ConnectionStateListener;
 23 import org.xmlBlaster.client.I_XmlBlasterAccess;
 24 import org.xmlBlaster.client.key.SubscribeKey;
 25 import org.xmlBlaster.client.key.UpdateKey;
 26 import org.xmlBlaster.client.qos.*;
 27 
 28 import org.xmlBlaster.test.Util;
 29 import org.xmlBlaster.test.MsgInterceptor;
 30 
 31 import junit.framework.*;
 32 
 33 
/**
 * Tests persistent sessions.
 * <br />For a description of what persistent sessions and subscriptions are,
 * please read the requirement engine.persistence.session.
 * <p>
 * This is an interesting example, since it creates an XmlBlaster server instance
 * in the same JVM, but in a separate thread, talking over CORBA with it.
 * <p>
 * Invoke examples:<br />
 * <pre>
 *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestPersistentSession
 *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestPersistentSession
 * </pre>
 * @see org.xmlBlaster.client.I_XmlBlasterAccess
 */
 49 public class TestPersistentSession extends TestCase implements I_ConnectionStateListener, I_Callback
 50 {
 51    private static String ME = "TestPersistentSession";
 52    private static final boolean TRANSIENT = false;
 53    private static final boolean PERSISTENT = true;
 54    
 55    private Global glob;
 56    private Global origGlobal;
 57    private Global serverGlobal;
 58    private static Logger log = Logger.getLogger(TestPersistentSession.class.getName());
 59 
 60    private int serverPort = 7604;
 61    private EmbeddedXmlBlaster serverThread;
 62 
 63    private MsgInterceptor[] updateInterceptors;
 64    //private I_XmlBlasterAccess con;
 65    private String senderName;
 66 
 67    private int numPublish = 8;
 68    private int numStop = 3;
 69    private int numStart = 5;
 70    private final String contentMime = "text/plain";
 71 
 72    private final long reconnectDelay = 2000L;
 73    private boolean failsafeCallback = true;
 74    /** the session is persistent from the beginning */
 75    private boolean persistent = true;
 76    private boolean exactSubscription = false;
 77    private boolean initialUpdates = true;
 78    private int numSubscribers = 4;
 79 
 80    public TestPersistentSession(String testName) {
 81       this(null, testName);
 82    }
 83 
 84    public TestPersistentSession(Global glob, String testName) {
 85       super(testName);
 86       this.origGlobal = glob;
 87       this.senderName = testName;
 88       this.updateInterceptors = new MsgInterceptor[this.numSubscribers];
 89    }
 90 
 91    /**
 92     * Sets up the fixture.
 93     * <p />
 94     * Connect to xmlBlaster and login
 95     */
 96    protected void setUp() {
 97       setup(false);
 98    }
 99    
100    
101    private void setup(boolean restrictedEntries) {
102       this.origGlobal = (this.origGlobal == null) ? Global.instance() : this.origGlobal;
103 
104       
105       this.origGlobal.init(Util.getOtherServerPorts(serverPort));
106       this.glob = this.origGlobal.getClone(null);
107 
108       String[] args = null;
109       if (restrictedEntries) {
110          args = new String[] {"-persistence/session/maxEntriesCache", "1",
111                        "-persistence/session/maxEntries","2",
112                        "-persistence/subscribe/maxEntriesCache", "2",
113                        "-persistence/subscribe/maxEntries","3",
114                       };
115       }
116       this.serverGlobal = this.origGlobal.getClone(args);
117       serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
118       log.info("XmlBlaster is ready for testing on bootstrapPort " + serverPort);
119       
120       System.out.println("============== Connect/Disconnect for general/1 to cleanup first");
121 
122       try { // we just connect and disconnect to make sure all resources are really cleaned up
123          Global tmpGlobal = this.origGlobal.getClone(null);
124          I_XmlBlasterAccess con = tmpGlobal.getXmlBlasterAccess(); // Find orb
125 
126          String passwd = "secret";
127          ConnectQos connectQos = new ConnectQos(tmpGlobal, senderName, passwd); // == "<qos>...</qos>";
128          connectQos.setSessionName(new SessionName(tmpGlobal, "general/1"));
129          // set the persistent connection 
130          connectQos.setPersistent(this.persistent);
131          // Setup fail save handling for connection ...
132          Address addressProp = new Address(tmpGlobal);
133          addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
134          addressProp.setRetries(-1);       // -1 == forever
135          addressProp.setPingInterval(-1L); // switched off
136          con.registerConnectionListener(this);
137          connectQos.setAddress(addressProp);
138          
139          // setup failsafe handling for callback ...
140          if (this.failsafeCallback) {
141             CallbackAddress cbAddress = new CallbackAddress(tmpGlobal);
142             cbAddress.setRetries(-1);
143             cbAddress.setPingInterval(-1);
144             cbAddress.setDelay(1000L);
145             cbAddress.setSecretCbSessionId("someSecredSessionId");
146             connectQos.addCallbackAddress(cbAddress);
147          }
148          con.connect(connectQos, this);
149          DisconnectQos disconnectQos = new DisconnectQos(tmpGlobal);
150          con.disconnect(disconnectQos);
151       }
152       catch (XmlBlasterException e) {
153           log.warning("setUp() - login failed: " + e.getMessage());
154           fail("setUp() - login fail: " + e.getMessage());
155       }
156       catch (Exception e) {
157           log.severe("setUp() - login failed: " + e.toString());
158           e.printStackTrace();
159           fail("setUp() - login fail: " + e.toString());
160       }
161 
162       System.out.println("============== Connect for general/1");
163 
164       try {
165          I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess(); // Find orb
166 
167          String passwd = "secret";
168          ConnectQos connectQos = new ConnectQos(this.glob, senderName, passwd); // == "<qos>...</qos>";
169          connectQos.setSessionName(new SessionName(this.glob, "general/1"));
170          // set the persistent connection 
171          connectQos.setPersistent(this.persistent);
172          // Setup fail save handling for connection ...
173          Address addressProp = new Address(glob);
174          addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
175          addressProp.setRetries(-1);       // -1 == forever
176          addressProp.setPingInterval(-1L); // switched off
177          con.registerConnectionListener(this);
178          connectQos.setAddress(addressProp);
179          
180          // setup failsafe handling for callback ...
181          if (this.failsafeCallback) {
182             CallbackAddress cbAddress = new CallbackAddress(this.glob);
183             cbAddress.setRetries(-1);
184             cbAddress.setPingInterval(-1);
185             cbAddress.setDelay(1000L);
186             cbAddress.setSecretCbSessionId("someSecredSessionId");
187             connectQos.addCallbackAddress(cbAddress);
188          }
189 
190          con.connect(connectQos, this);  // Login to xmlBlaster, register for updates
191       }
192       catch (XmlBlasterException e) {
193           log.warning("setUp() - login failed: " + e.getMessage());
194           fail("setUp() - login fail: " + e.getMessage());
195       }
196       catch (Exception e) {
197           log.severe("setUp() - login failed: " + e.toString());
198           e.printStackTrace();
199           fail("setUp() - login fail: " + e.toString());
200       }
201    }
202 
203    /**
204     * Tears down the fixture.
205     * <p />
206     * cleaning up .... erase() the previous message OID and logout
207     */
208    protected void tearDown() {
209       System.out.println("============== Entering tearDown(), test is finished");
210       log.info("Entering tearDown(), test is finished");
211       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
212                       "   //TestPersistentSession-AGENT" +
213                       "</key>";
214 
215       String qos = "<qos><forceDestroy>true</forceDestroy></qos>";
216       I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
217       try {
218          System.out.println("============== tearDown(), erase: " + xmlKey);
219          con.erase(xmlKey, qos);
220 
221          PropString defaultPlugin = new PropString("CACHE,1.0");
222          String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
223          log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
224       }
225       catch(XmlBlasterException e) {
226          log.severe("XmlBlasterException: " + e.getMessage());
227       }
228       finally {
229          System.out.println("============== tearDown(), disconnect");
230          con.disconnect(null);
231          Util.delay(1000);
232          System.out.println("============== tearDown(), stopXmlBlaster");
233          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
234          this.serverThread = null;
235          // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
236          Util.resetPorts(this.serverGlobal);
237          Util.resetPorts(this.glob);
238          Util.resetPorts(this.origGlobal);
239          this.glob = null;
240          this.serverGlobal = null;
241          con = null;
242          Global.instance().shutdown();
243       }
244       System.out.println("============== tearDown() done");
245    }
246 
247    /**
248     * TEST: Subscribe to messages with XPATH.
249     */
250    private void doSubscribe(int num, boolean isExact, boolean isPersistent) {
251       try {
252          SubscribeKey key = null;
253          if (isExact)  key = new SubscribeKey(this.glob, "Message-1");
254          else key = new SubscribeKey(this.glob, "//TestPersistentSession-AGENT", "XPATH");
255 
256          SubscribeQos qos = new SubscribeQos(this.glob); // "<qos><persistent>true</persistent></qos>";
257          qos.setPersistent(isPersistent);
258          qos.setWantInitialUpdate(this.initialUpdates);
259          qos.setWantNotify(false); // to avoig getting erased messages
260 
261          this.updateInterceptors[num] = new MsgInterceptor(this.glob, log, null); // Collect received msgs
262          this.updateInterceptors[num].setLogPrefix("interceptor-" + num);
263          SubscribeReturnQos subscriptionId = this.glob.getXmlBlasterAccess().subscribe(key, qos, this.updateInterceptors[num]);
264 
265          log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
266          assertTrue("returned null subscriptionId", subscriptionId != null);
267       } catch(XmlBlasterException e) {
268          log.warning("XmlBlasterException: " + e.getMessage());
269          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
270       }
271    }
272  
273    /**
274     * TEST: Construct a message and publish it.
275     * <p />
276     */
277    public void doPublish(int counter) throws XmlBlasterException {
278       String oid = "Message" + "-" + counter;
279       log.info("Publishing a message " + oid + " ...");
280       String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
281                       "   <TestPersistentSession-AGENT id='192.168.124.10' subId='1' type='generic'>" +
282                       "   </TestPersistentSession-AGENT>" +
283                       "</key>";
284       String content = "" + counter;
285       PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
286       MsgUnit msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
287 
288       this.glob.getXmlBlasterAccess().publish(msgUnit);
289       log.info("Success: Publishing of " + oid + " done");
290    }
291 
292    /**
293     * TEST: <br />
294     */
295    public void persistentSession(boolean doStop) {
296       //doSubscribe(); -> see reachedAlive()
297       log.info("Going to publish " + numPublish + " messages, xmlBlaster will be down for message 3 and 4");
298       // 
299       doSubscribe(0, this.exactSubscription, TRANSIENT);
300       doSubscribe(1, this.exactSubscription, PERSISTENT);
301       
302       for (int i=0; i<numPublish; i++) {
303          try {
304             if (i == numStop) { // 3
305                if (doStop) {
306                   log.info("Stopping xmlBlaster, but continue with publishing ...");
307                   EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
308                   System.out.println("============== Stopped xmlBlaster, but continue with publishing");
309                   this.serverThread = null;
310                }
311                else {
312                   log.info("changing run level but continue with publishing ...");
313                   this.serverThread.changeRunlevel(0, true);
314                   System.out.println("============== Changed run level to 0 but continue with publishing");
315                }
316             }
317             if (i == numStart) {
318                if (doStop) {
319                   log.info("Starting xmlBlaster again, expecting the previous published two messages ...");
320                   // serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
321                   serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
322                   log.info("xmlBlaster started, waiting on tail back messsages");
323                   System.out.println("============== XmlBlaster started, waiting on two tail back messsages");
324                }
325                else {
326                   log.info("changing runlevel again to runlevel 9. Expecting the previous published two messages ...");
327                   this.serverThread.changeRunlevel(9, true);
328                   log.info("xmlBlaster runlevel 9 reached, waiting on tail back messsages");
329                   System.out.println("============== Changed runlevel again to runlevel 9. Expecting the previous published two messages");
330                }
331                
332                // Message-4 We need to wait until the client reconnected (reconnect interval)
333                // Message-5
334                assertEquals("", 2, this.updateInterceptors[1].waitOnUpdate(reconnectDelay*2L, 2));
335                assertEquals("", 2, this.updateInterceptors[3].waitOnUpdate(reconnectDelay*2L, 2));
336                
337                for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
338             }
339             doPublish(i+1);
340             if (i == 0) {
341                doSubscribe(2, this.exactSubscription, TRANSIENT);
342                doSubscribe(3, this.exactSubscription, PERSISTENT);
343             }
344 
345             if (i < numStop || i >= numStart ) {
346                int n = 1;
347                if (i == 0 && !this.initialUpdates) n = 0;
348                assertEquals("Message nr. " + (i+1), 1, this.updateInterceptors[1].waitOnUpdate(4000L, 1));
349                assertEquals("Message nr. " + (i+1), n, this.updateInterceptors[3].waitOnUpdate(4000L, n));
350             }
351             for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
352          }
353          catch(XmlBlasterException e) {
354             if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_POLLING)
355                log.warning("Lost connection, my connection layer is polling: " + e.getMessage());
356             else if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_DEAD)
357                assertTrue("Lost connection, my connection layer is NOT polling", false);
358             else
359                assertTrue("Publishing problems: " + e.getMessage(), false);
360          }
361       }
362       doSubscribe(0, this.exactSubscription, TRANSIENT);
363       doSubscribe(1, this.exactSubscription, PERSISTENT);
364    }
365 
366    /**
367     * This is the callback method invoked from I_XmlBlasterAccess
368     * informing the client in an asynchronous mode if the connection was established.
369     * <p />
370     * This method is enforced through interface I_ConnectionStateListener
371     */
372    public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
373    }
374 
375    public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
376       log.info("I_ConnectionStateListener: We were lucky, reconnected to xmlBlaster");
377       // doSubscribe();    // initialize on startup and on reconnect
378    }
379 
380    public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
381       log.warning("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);
382    }
383 
384    public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
385       log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
386    }
387 
388    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
389       String contentStr = new String(content);
390       String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
391       log.info("Receiving update of a message oid=" + updateKey.getOid() +
392                         " priority=" + updateQos.getPriority() +
393                         " state=" + updateQos.getState() +
394                         " content=" + cont);
395       log.severe("update: should never be invoked (msgInterceptors take care of it since they are passed on subscriptions), further log for receiving update of a message cbSessionId=" + cbSessionId +
396                      updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());
397       return "OK";
398    }
399 
400 
401    public void testXPathInitialStop() {
402       this.exactSubscription = false;
403       this.initialUpdates = true;
404       System.out.println("============== testXPathInitialStop");
405       persistentSession(true);
406    }
407 
408    public void testXPathNoInitialStop() {
409       this.exactSubscription = false;
410       this.initialUpdates = false;
411       System.out.println("============== testXPathNoInitialStop");
412       persistentSession(true);
413    }
414 
415    public void testXPathInitialRunlevelChange() {
416       this.persistent = true;
417       this.exactSubscription = false;
418       this.initialUpdates = true;
419       System.out.println("============== testXPathInitialRunlevelChange");
420       persistentSession(false);
421    }
422 
423    // -----------------------------------------------------------------
424    private Global createConnection(Global parentGlobal, String sessionName, boolean isPersistent, boolean expectEx) {
425       try {
426          Global ret = parentGlobal.getClone(null);
427          I_XmlBlasterAccess con = ret.getXmlBlasterAccess(); // Find orb
428          ConnectQos connectQos = new ConnectQos(glob); // == "<qos>...</qos>";
429          connectQos.setSessionName(new SessionName(ret, sessionName));
430          // set the persistent connection 
431          connectQos.setPersistent(isPersistent);
432          // Setup fail save handling for connection ...
433          Address addressProp = new Address(glob);
434          addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
435          addressProp.setRetries(-1);       // -1 == forever
436          addressProp.setPingInterval(-1L); // switched off
437          connectQos.setAddress(addressProp);
438       
439          // setup failsafe handling for callback ...
440          if (this.failsafeCallback) {
441             CallbackAddress cbAddress = new CallbackAddress(this.glob);
442             cbAddress.setRetries(-1);
443             cbAddress.setPingInterval(-1);
444             cbAddress.setDelay(1000L);
445             connectQos.addCallbackAddress(cbAddress);
446          }
447          con.connect(connectQos, this);  // Login to xmlBlaster, register for updates
448          if (expectEx) assertTrue("an exception was expected here because of overflow: Configuration of session queue probably not working", false);
449          return ret;
450       }
451       catch (XmlBlasterException ex) {
452          if (expectEx) log.info("createConnection: exception was OK since overflow was expected");
453          else assertTrue("an exception should not occur here", false);
454       }
455       return null; //to make compiler happy
456    }      
457 
458    
459    /**
460     * Tests the requirement:
461     * - If the storage for the sessions is overflown, it should throw an exception
462     *
463     */
464    public void testOverflow() {
465       System.out.println("============== testXPathNoInitialStop");
466       // to change the configuration on server side (limit the queue sizes)
467       tearDown();
468       setup(true);
469       Global[] globals = new Global[5];
470       try {
471          globals[0] = createConnection(this.origGlobal, "bjoern/1", true , false);
472          globals[1] = createConnection(this.origGlobal, "fritz/2", false, false);
473          globals[3] = createConnection(this.origGlobal, "dimitri/3", true , true); // <-- exception (since main connection also persistent)
474          globals[2] = createConnection(this.origGlobal, "pandora/4", false , false); // OK since transient
475          globals[4] = createConnection(this.origGlobal, "jonny/5", true, true);
476       }
477       finally {
478          Util.delay(2000);
479          for (int i=0; i < globals.length; i++) {
480             if (globals[i] != null) globals[i].getXmlBlasterAccess().disconnect(new DisconnectQos(globals[i]));
481          }
482       }
483    }
484 
485    /**
486     * Invoke: java org.xmlBlaster.test.client.TestPersistentSession
487     * <p />
488     * @deprecated Use the TestRunner from the testsuite to run it:<p />
489     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPersistentSession</pre>
490     */
491    public static void main(String args[])
492    {
493       Global glob = new Global();
494       if (glob.init(args) != 0) {
495          System.out.println(ME + ": Init failed");
496          System.exit(1);
497       }
498 
499       TestPersistentSession testSub = new TestPersistentSession(glob, "TestPersistentSession/1");
500 
501       testSub.setUp();
502       testSub.testXPathInitialStop();
503       testSub.tearDown();
504 
505       testSub.setUp();
506       testSub.testXPathNoInitialStop();
507       testSub.tearDown();
508 
509       testSub.setUp();
510       testSub.testXPathInitialRunlevelChange();
511       testSub.tearDown();
512 
513       testSub.setUp();
514       testSub.testOverflow();
515       testSub.tearDown();
516 
517       log.info("Main done");
518    }
519 }


// syntax highlighted by Code2HTML, v. 0.9.1