1 /*------------------------------------------------------------------------------
2 Name: TestSessionReconnect.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.authentication;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.util.Global;
10 import org.xmlBlaster.util.qos.HistoryQos;
11 import org.xmlBlaster.util.def.PriorityEnum;
12 import org.xmlBlaster.util.def.Constants;
13 import org.xmlBlaster.util.qos.TopicProperty;
14 import org.xmlBlaster.util.qos.address.CallbackAddress;
15 import org.xmlBlaster.client.qos.ConnectQos;
16 import org.xmlBlaster.client.qos.ConnectReturnQos;
17 import org.xmlBlaster.util.XmlBlasterException;
18 import org.xmlBlaster.util.EmbeddedXmlBlaster;
19 import org.xmlBlaster.client.key.PublishKey;
20 import org.xmlBlaster.client.key.SubscribeKey;
21 import org.xmlBlaster.client.qos.PublishQos;
22 import org.xmlBlaster.client.qos.PublishReturnQos;
23 import org.xmlBlaster.client.qos.SubscribeQos;
24 import org.xmlBlaster.client.I_XmlBlasterAccess;
25 import org.xmlBlaster.util.MsgUnit;
26
27 import org.xmlBlaster.test.Util;
28 import org.xmlBlaster.test.MsgInterceptor;
29 import org.xmlBlaster.test.util.Client;
30
31 import junit.framework.*;
32
33
34 /**
35 * This client does test if a subscriber can reconnect to its session and
36 * its callback queue holded the messages during downtime.
37 * <p>
38 * This client may be invoked multiple time on the same xmlBlaster server,
39 * as it cleans up everything after his tests are done.
40 * </p>
41 * <p>
42 * Invoke examples:
43 * </p>
44 * <pre>
45 * java junit.textui.TestRunner org.xmlBlaster.test.authentication.TestSessionReconnect
46 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.authentication.TestSessionReconnect
47 * </pre>
48 */
49 public class TestSessionReconnect extends TestCase
50 {
51 private final Global glob;
52 private static Logger log = Logger.getLogger(TestSessionReconnect.class.getName());
53 private String passwd = "secret";
54 private int serverPort = 7615;
55 private String oid = "TestSessionReconnect.Msg";
56 private EmbeddedXmlBlaster serverThread = null;
57 private String sessionNameSub = "TestSessionReconnectSubscriber";
58 private I_XmlBlasterAccess conSub;
59 private I_XmlBlasterAccess conSub2;
60 private MsgInterceptor updateInterceptorSub;
61
62 private String sessionNamePub = "TestSessionReconnectPublisher";
63 private I_XmlBlasterAccess conPub;
64
65 /** For Junit */
66 public TestSessionReconnect() {
67 this(new Global(), "TestSessionReconnect");
68 }
69
70 /**
71 * Constructs the TestSessionReconnect object.
72 * <p />
73 * @param testName The name used in the test suite and to login to xmlBlaster
74 */
75 public TestSessionReconnect(Global glob, String testName) {
76 super(testName);
77 this.glob = glob;
78
79 }
80
81 /**
82 * Sets up the fixture.
83 * <p />
84 * Connect to xmlBlaster and login
85 */
86 protected void setUp() {
87 glob.init(Util.getOtherServerPorts(serverPort));
88 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
89 log.info("XmlBlaster is ready for testing");
90 }
91
92 /**
93 * Cleaning up.
94 */
95 protected void tearDown() {
96 try { Thread.sleep(1000);} catch(Exception ex) {}
97 if (serverThread != null)
98 serverThread.stopServer(true);
99 // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
100 Util.resetPorts();
101 }
102
103 /**
104 */
105 public void testSessionReconnect() {
106 log.info("testSessionReconnect("+sessionNameSub+") ...");
107
108 try {
109 log.info("============ STEP 1: Start subscriber");
110
111 Global globSub = glob.getClone(null);
112 // A testsuite helper to collect update messages
113 this.updateInterceptorSub = new MsgInterceptor(globSub, log, null);
114
115 conSub = globSub.getXmlBlasterAccess();
116
117 ConnectReturnQos crqSub = null;
118 {
119 ConnectQos qosSub = new ConnectQos(globSub, sessionNameSub, passwd);
120
121 CallbackAddress addr = new CallbackAddress(globSub);
122 addr.setRetries(-1);
123 String secretCbSessionId = "TrustMeSub";
124 addr.setSecretCbSessionId(secretCbSessionId);
125 qosSub.getSessionCbQueueProperty().setCallbackAddress(addr);
126
127 log.info("First subscribe connect QoS = " + qosSub.toXml());
128 crqSub = conSub.connect(qosSub, this.updateInterceptorSub); // Login to xmlBlaster
129 log.info("Connect as subscriber '" + crqSub.getSessionName() + "' success");
130 }
131
132 SubscribeKey sk = new SubscribeKey(globSub, oid);
133 SubscribeQos sq = new SubscribeQos(globSub);
134 sq.setWantInitialUpdate(false);
135 sq.setWantLocal(true);
136 sq.setWantContent(true);
137
138 HistoryQos historyQos = new HistoryQos(globSub);
139 historyQos.setNumEntries(1);
140 sq.setHistoryQos(historyQos);
141
142 /*SubscribeReturnQos srq = */conSub.subscribe(sk.toXml(), sq.toXml());
143 log.info("Subscription to '" + oid + "' done");
144
145 log.info("============ STEP 2: Start publisher");
146 Global globPub = glob.getClone(null);
147 conPub = globPub.getXmlBlasterAccess();
148 ConnectQos qosPub = new ConnectQos(globPub, sessionNamePub, passwd);
149 ConnectReturnQos crqPub = conPub.connect(qosPub, null); // Login to xmlBlaster, no updates
150 log.info("Connect success as " + crqPub.getSessionName());
151
152 log.info("============ STEP 3: Stop subscriber callback");
153 try {
154 Client.shutdownCb(conSub, Client.Shutdown.LEAVE_SERVER);
155 // conSub.getCbServer().shutdown();
156 }
157 catch (XmlBlasterException e) {
158 fail("ShutdownCB: " + e.getMessage());
159 }
160
161 log.info("============ STEP 4: Publish messages");
162 int numPub = 8;
163 MsgUnit[] sentArr = new MsgUnit[numPub];
164 PublishReturnQos[] sentQos = new PublishReturnQos[numPub];
165 for(int i=0; i<numPub; i++) {
166 PublishKey pk = new PublishKey(globPub, oid, "text/xml", "1.0");
167 pk.setClientTags("<org.xmlBlaster><demo/></org.xmlBlaster>");
168 PublishQos pq = new PublishQos(globPub);
169 pq.setPriority(PriorityEnum.NORM_PRIORITY);
170 pq.setPersistent(false);
171 pq.setLifeTime(60000L);
172 if (i == 0) {
173 TopicProperty topicProperty = new TopicProperty(globPub);
174 topicProperty.setDestroyDelay(60000L);
175 topicProperty.setCreateDomEntry(true);
176 topicProperty.setReadonly(false);
177 topicProperty.getHistoryQueueProperty().setMaxEntries(numPub+5);
178 pq.setTopicProperty(topicProperty);
179 log.info("Added TopicProperty on first publish: " + topicProperty.toXml());
180 }
181
182 byte[] content = "Hello".getBytes();
183 MsgUnit msgUnit = new MsgUnit(pk, content, pq);
184 sentArr[i] = msgUnit;
185 PublishReturnQos prq = conPub.publish(msgUnit);
186 sentQos[i] = prq;
187 log.info("Got status='" + prq.getState() + "' rcvTimestamp=" + prq.getRcvTimestamp().toString() +
188 " for published message '" + prq.getKeyOid() + "'");
189 }
190
191 log.info("============ STEP 5: Start subscriber callback with same public sessionId");
192 Global globSub2 = glob.getClone(null);
193 MsgInterceptor updateInterceptorSub2 = new MsgInterceptor(globSub2, log, null);
194 updateInterceptorSub2.setLogPrefix("TrustMeSub2");
195
196 conSub2 = globSub2.getXmlBlasterAccess(); // Create a new client
197 String secretCbSessionId2 = "TrustMeSub2";
198 {
199 ConnectQos qosSub = new ConnectQos(globSub, sessionNameSub, passwd);
200 CallbackAddress addr = new CallbackAddress(globSub);
201 addr.setRetries(-1);
202 addr.setSecretCbSessionId(secretCbSessionId2);
203 qosSub.getSessionCbQueueProperty().setCallbackAddress(addr);
204 qosSub.getSessionQos().setSessionName(crqSub.getSessionQos().getSessionName());
205
206 log.info("Second subscribe connect QoS = " + qosSub.toXml());
207 ConnectReturnQos crqSub2 = conSub2.connect(qosSub, updateInterceptorSub2); // Login to xmlBlaster
208 log.info("Connect as subscriber '" + crqSub2.getSessionName() + "' success");
209 }
210
211 assertEquals("", 0, updateInterceptorSub.count()); // The first login session should not receive anything
212
213 assertEquals("", numPub, updateInterceptorSub2.waitOnUpdate(4000L, oid, Constants.STATE_OK));
214 updateInterceptorSub2.compareToReceived(sentArr, secretCbSessionId2);
215 updateInterceptorSub2.compareToReceived(sentQos);
216
217 updateInterceptorSub2.clear();
218 }
219 catch (XmlBlasterException e) {
220 log.severe(e.toString());
221 fail(e.toString());
222 }
223 finally { // clean up
224 log.info("Disconnecting '" + sessionNameSub + "'");
225 conSub.disconnect(null);
226 conSub2.disconnect(null);
227 }
228 log.info("Success in testSessionReconnect()");
229 }
230
231 /**
232 * Method is used by TestRunner to load these tests
233 */
234 public static Test suite() {
235 TestSuite suite= new TestSuite();
236 suite.addTest(new TestSessionReconnect(Global.instance(), "testSessionReconnect"));
237 return suite;
238 }
239
240 /**
241 * Invoke:
242 * <pre>
243 * java org.xmlBlaster.test.authentication.TestSessionReconnect
244 * java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.authentication.TestSessionReconnect
245 * <pre>
246 */
247 public static void main(String args[]) {
248 TestSessionReconnect testSub = new TestSessionReconnect(new Global(args), "TestSessionReconnect");
249 testSub.setUp();
250 testSub.testSessionReconnect();
251 testSub.tearDown();
252 }
253 }
syntax highlighted by Code2HTML, v. 0.9.1