/*------------------------------------------------------------------------------
Name:      TestSessionReconnect.java
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.authentication;
  7 
  8 import java.util.logging.Logger;
  9 import org.xmlBlaster.util.Global;
 10 import org.xmlBlaster.util.qos.HistoryQos;
 11 import org.xmlBlaster.util.def.PriorityEnum;
 12 import org.xmlBlaster.util.def.Constants;
 13 import org.xmlBlaster.util.qos.TopicProperty;
 14 import org.xmlBlaster.util.qos.address.CallbackAddress;
 15 import org.xmlBlaster.client.qos.ConnectQos;
 16 import org.xmlBlaster.client.qos.ConnectReturnQos;
 17 import org.xmlBlaster.util.XmlBlasterException;
 18 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 19 import org.xmlBlaster.client.key.PublishKey;
 20 import org.xmlBlaster.client.key.SubscribeKey;
 21 import org.xmlBlaster.client.qos.PublishQos;
 22 import org.xmlBlaster.client.qos.PublishReturnQos;
 23 import org.xmlBlaster.client.qos.SubscribeQos;
 24 import org.xmlBlaster.client.I_XmlBlasterAccess;
 25 import org.xmlBlaster.util.MsgUnit;
 26 
 27 import org.xmlBlaster.test.Util;
 28 import org.xmlBlaster.test.MsgInterceptor;
 29 import org.xmlBlaster.test.util.Client;
 30 
 31 import junit.framework.*;
 32 
 33 
 34 /**
 35  * This client does test if a subscriber can reconnect to its session and 
 36  * its callback queue holded the messages during downtime. 
 37  * <p>
 38  * This client may be invoked multiple time on the same xmlBlaster server,
 39  * as it cleans up everything after his tests are done.
 40  * </p>
 41  * <p>
 42  * Invoke examples:
 43  * </p>
 44  * <pre>
 45  *    java junit.textui.TestRunner org.xmlBlaster.test.authentication.TestSessionReconnect
 46  *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.authentication.TestSessionReconnect
 47  * </pre>
 48  */
 49 public class TestSessionReconnect extends TestCase
 50 {
 51    private final Global glob;
 52    private static Logger log = Logger.getLogger(TestSessionReconnect.class.getName());
 53    private String passwd = "secret";
 54    private int serverPort = 7615;
 55    private String oid = "TestSessionReconnect.Msg";
 56    private EmbeddedXmlBlaster serverThread = null;
 57    private String sessionNameSub = "TestSessionReconnectSubscriber";
 58    private I_XmlBlasterAccess conSub;
 59    private I_XmlBlasterAccess conSub2;
 60    private MsgInterceptor updateInterceptorSub;
 61 
 62    private String sessionNamePub = "TestSessionReconnectPublisher";
 63    private I_XmlBlasterAccess conPub;
 64 
 65    /** For Junit */
 66    public TestSessionReconnect() {
 67       this(new Global(), "TestSessionReconnect");
 68    }
 69 
 70    /**
 71     * Constructs the TestSessionReconnect object.
 72     * <p />
 73     * @param testName   The name used in the test suite and to login to xmlBlaster
 74     */
 75    public TestSessionReconnect(Global glob, String testName) {
 76        super(testName);
 77        this.glob = glob;
 78 
 79    }
 80 
 81    /**
 82     * Sets up the fixture.
 83     * <p />
 84     * Connect to xmlBlaster and login
 85     */
 86    protected void setUp() {
 87       glob.init(Util.getOtherServerPorts(serverPort));
 88       serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
 89       log.info("XmlBlaster is ready for testing");
 90    }
 91 
 92    /**
 93     * Cleaning up. 
 94     */
 95    protected void tearDown() {
 96       try { Thread.sleep(1000);} catch(Exception ex) {} 
 97       if (serverThread != null)
 98          serverThread.stopServer(true);
 99       // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
100       Util.resetPorts();
101    }
102 
103    /**
104     */
105    public void testSessionReconnect() {
106       log.info("testSessionReconnect("+sessionNameSub+") ...");
107 
108       try {
109          log.info("============ STEP 1: Start subscriber");
110 
111          Global globSub = glob.getClone(null);
112          // A testsuite helper to collect update messages
113          this.updateInterceptorSub = new MsgInterceptor(globSub, log, null);
114 
115          conSub = globSub.getXmlBlasterAccess();
116          
117          ConnectReturnQos crqSub = null;
118          {
119             ConnectQos qosSub = new ConnectQos(globSub, sessionNameSub, passwd);
120 
121             CallbackAddress addr = new CallbackAddress(globSub);
122             addr.setRetries(-1);
123             String secretCbSessionId = "TrustMeSub";
124             addr.setSecretCbSessionId(secretCbSessionId);
125             qosSub.getSessionCbQueueProperty().setCallbackAddress(addr);
126 
127             log.info("First subscribe connect QoS = " + qosSub.toXml());
128             crqSub = conSub.connect(qosSub, this.updateInterceptorSub); // Login to xmlBlaster
129             log.info("Connect as subscriber '" + crqSub.getSessionName() + "' success");
130          }
131 
132          SubscribeKey sk = new SubscribeKey(globSub, oid);
133          SubscribeQos sq = new SubscribeQos(globSub);
134          sq.setWantInitialUpdate(false);
135          sq.setWantLocal(true);
136          sq.setWantContent(true);
137          
138          HistoryQos historyQos = new HistoryQos(globSub);
139          historyQos.setNumEntries(1);
140          sq.setHistoryQos(historyQos);
141 
142          /*SubscribeReturnQos srq = */conSub.subscribe(sk.toXml(), sq.toXml());
143          log.info("Subscription to '" + oid + "' done");
144 
145          log.info("============ STEP 2: Start publisher");
146          Global globPub = glob.getClone(null);
147          conPub = globPub.getXmlBlasterAccess();
148          ConnectQos qosPub = new ConnectQos(globPub, sessionNamePub, passwd);
149          ConnectReturnQos crqPub = conPub.connect(qosPub, null);  // Login to xmlBlaster, no updates
150          log.info("Connect success as " + crqPub.getSessionName());
151 
152          log.info("============ STEP 3: Stop subscriber callback");
153          try {
154             Client.shutdownCb(conSub, Client.Shutdown.LEAVE_SERVER);
155             // conSub.getCbServer().shutdown();
156          }
157          catch (XmlBlasterException e) {
158             fail("ShutdownCB: " + e.getMessage());
159          }
160 
161          log.info("============ STEP 4: Publish messages");
162          int numPub = 8;
163          MsgUnit[] sentArr = new MsgUnit[numPub];
164          PublishReturnQos[] sentQos = new PublishReturnQos[numPub];
165          for(int i=0; i<numPub; i++) {
166             PublishKey pk = new PublishKey(globPub, oid, "text/xml", "1.0");
167             pk.setClientTags("<org.xmlBlaster><demo/></org.xmlBlaster>");
168             PublishQos pq = new PublishQos(globPub);
169             pq.setPriority(PriorityEnum.NORM_PRIORITY);
170             pq.setPersistent(false);
171             pq.setLifeTime(60000L);
172             if (i == 0) {
173                TopicProperty topicProperty = new TopicProperty(globPub);
174                topicProperty.setDestroyDelay(60000L);
175                topicProperty.setCreateDomEntry(true);
176                topicProperty.setReadonly(false);
177                topicProperty.getHistoryQueueProperty().setMaxEntries(numPub+5);
178                pq.setTopicProperty(topicProperty);
179                log.info("Added TopicProperty on first publish: " + topicProperty.toXml());
180             }
181 
182             byte[] content = "Hello".getBytes();
183             MsgUnit msgUnit = new MsgUnit(pk, content, pq);
184             sentArr[i] = msgUnit;
185             PublishReturnQos prq = conPub.publish(msgUnit);
186             sentQos[i] = prq;
187             log.info("Got status='" + prq.getState() + "' rcvTimestamp=" + prq.getRcvTimestamp().toString() +
188                         " for published message '" + prq.getKeyOid() + "'");
189          }
190 
191          log.info("============ STEP 5: Start subscriber callback with same public sessionId");
192          Global globSub2 = glob.getClone(null);
193          MsgInterceptor updateInterceptorSub2 = new MsgInterceptor(globSub2, log, null);
194          updateInterceptorSub2.setLogPrefix("TrustMeSub2");
195 
196          conSub2 = globSub2.getXmlBlasterAccess(); // Create a new client
197          String secretCbSessionId2 = "TrustMeSub2";
198          {
199             ConnectQos qosSub = new ConnectQos(globSub, sessionNameSub, passwd);
200             CallbackAddress addr = new CallbackAddress(globSub);
201             addr.setRetries(-1);
202             addr.setSecretCbSessionId(secretCbSessionId2);
203             qosSub.getSessionCbQueueProperty().setCallbackAddress(addr);
204             qosSub.getSessionQos().setSessionName(crqSub.getSessionQos().getSessionName());
205 
206             log.info("Second subscribe connect QoS = " + qosSub.toXml());
207             ConnectReturnQos crqSub2 = conSub2.connect(qosSub, updateInterceptorSub2); // Login to xmlBlaster
208             log.info("Connect as subscriber '" + crqSub2.getSessionName() + "' success");
209          }
210 
211          assertEquals("", 0, updateInterceptorSub.count()); // The first login session should not receive anything
212 
213          assertEquals("", numPub, updateInterceptorSub2.waitOnUpdate(4000L, oid, Constants.STATE_OK));
214          updateInterceptorSub2.compareToReceived(sentArr, secretCbSessionId2);
215          updateInterceptorSub2.compareToReceived(sentQos);
216 
217          updateInterceptorSub2.clear();
218       }
219       catch (XmlBlasterException e) {
220          log.severe(e.toString());
221          fail(e.toString());
222       }
223       finally { // clean up
224          log.info("Disconnecting '" + sessionNameSub + "'");
225          conSub.disconnect(null);
226          conSub2.disconnect(null);
227       }
228       log.info("Success in testSessionReconnect()");
229    }
230 
231    /**
232     * Method is used by TestRunner to load these tests
233     */
234    public static Test suite() {
235        TestSuite suite= new TestSuite();
236        suite.addTest(new TestSessionReconnect(Global.instance(), "testSessionReconnect"));
237        return suite;
238    }
239 
240    /**
241     * Invoke: 
242     * <pre>
243     *   java org.xmlBlaster.test.authentication.TestSessionReconnect
244     *   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.authentication.TestSessionReconnect
245     * <pre>
246     */
247    public static void main(String args[]) {
248       TestSessionReconnect testSub = new TestSessionReconnect(new Global(args), "TestSessionReconnect");
249       testSub.setUp();
250       testSub.testSessionReconnect();
251       testSub.tearDown();
252    }
253 }


// syntax highlighted by Code2HTML, v. 0.9.1