1 package org.xmlBlaster.test.cluster;
2
3 import java.util.logging.Logger;
4 import java.util.logging.Level;
5 import org.xmlBlaster.util.Global;
6
7 // for client connections:
8 import org.xmlBlaster.util.*;
9 import org.xmlBlaster.client.I_Callback;
10 import org.xmlBlaster.client.key.PublishKey;
11 import org.xmlBlaster.client.key.EraseKey;
12 import org.xmlBlaster.client.key.GetKey;
13 import org.xmlBlaster.client.key.SubscribeKey;
14 import org.xmlBlaster.client.key.UnSubscribeKey;
15 import org.xmlBlaster.client.key.UpdateKey;
16 import org.xmlBlaster.client.qos.PublishQos;
17 import org.xmlBlaster.client.qos.PublishReturnQos;
18 import org.xmlBlaster.client.qos.UpdateQos;
19 import org.xmlBlaster.client.qos.SubscribeQos;
20 import org.xmlBlaster.client.qos.SubscribeReturnQos;
21 import org.xmlBlaster.client.qos.UnSubscribeQos;
22 import org.xmlBlaster.client.qos.EraseQos;
23 import org.xmlBlaster.client.I_XmlBlasterAccess;
24 import org.xmlBlaster.util.MsgUnit;
25
26
27 import java.util.Vector;
28 import java.io.File;
29
30 import junit.framework.*;
31
32 /**
33 * Test publishing a message from bilbo to heron.
34 * <p />
35 * <pre>
36 * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.PublishTest
37 * </pre>
38 * NOTE: asserts() in update() methods are routed back to server and are not handled
39 * by the junit testsuite, so we check double (see code).
40 * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
41 */
42 public class PublishTest extends TestCase {
43 private String ME = "PublishTest";
44 private Global glob;
45 private static Logger log = Logger.getLogger(PublishTest.class.getName());
46 private ServerHelper serverHelper;
47
48 private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon;
49
50 private int updateCounterHeron = 0;
51 private int updateCounterFrodo = 0;
52 private int updateCounterBilbo = 0;
53 private String oid = "PublishToBilbo";
54 private String domain = "RUGBY_NEWS"; // heron is master for RUGBY_NEWS
55 private String contentStr = "We win";
56
57 private String assertInUpdate = null;
58
59 public PublishTest(String name) {
60 super(name);
61 this.glob = new Global(null, true, false);
62 }
63
64 /**
65 * Initialize the test ...
66 */
67 protected void setUp() {
68
69 log.info("Entering setUp(), test starts");
70
71 serverHelper = new ServerHelper(glob, log, ME);
72
73 // Starts a cluster node
74 serverHelper.startHeron();
75 //serverHelper.startAvalon();
76 //serverHelper.startGolan();
77 serverHelper.startFrodo();
78 serverHelper.startBilbo();
79 }
80
81 /**
82 * cleaning up ...
83 */
84 protected void tearDown() {
85 log.info("Entering tearDown(), test is finished");
86 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
87
88 if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
89 if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
90 if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
91 if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
92 if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
93
94 serverHelper.tearDown();
95 }
96
97 /**
98 * We start all nodes as described in requirement
99 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
100 * publish a message to bilbo which should be routed to heron.
101 * Than we try to access the message at heron
102 */
103 public void testPublish() {
104 System.err.println("***PublishTest: Publish a message to a cluster slave ...");
105 try {
106 bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() { // Login to xmlBlaster, register for updates
107 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
108 assertInUpdate = serverHelper.getBilboGlob().getId() + ": Should not receive the message '" + updateKey.getOid() + "'";
109 fail(assertInUpdate); // This is routed to server, not to junit
110 return "";
111 }
112 });
113 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
114 assertTrue(assertInUpdate, assertInUpdate == null);
115 assertInUpdate = null;
116
117 PublishKey pk = new PublishKey(glob, oid, "text/plain", "1.0");
118 pk.setDomain(domain);
119 PublishQos pq = new PublishQos(glob);
120 MsgUnit msgUnit = new MsgUnit(pk, contentStr, pq);
121 PublishReturnQos prq = bilboCon.publish(msgUnit);
122 log.info("Published message of domain='" + pk.getDomain() + "' and content='" + contentStr +
123 "' to xmlBlaster node bilbo with IP=" + serverHelper.getBilboGlob().getProperty().get("bootstrapPort",0) +
124 ", the returned QoS is: " + prq.getKeyOid());
125
126 heronCon = serverHelper.connect(serverHelper.getHeronGlob(), new I_Callback() { // Login to xmlBlaster, register for updates
127 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
128 log.severe("Receive message '" + updateKey.getOid() + "'");
129 assertInUpdate = serverHelper.getHeronGlob().getId() + ": Did not expect message update in default handler";
130 fail(assertInUpdate); // This is routed to server, not to junit
131 return "";
132 }
133 });
134 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
135 assertTrue(assertInUpdate, assertInUpdate == null);
136 assertInUpdate = null;
137
138 System.err.println("->Check if the message has reached the master node heron ...");
139 GetKey gk = new GetKey(glob, oid);
140 MsgUnit[] msgs = heronCon.get(gk.toXml(), null);
141 assertTrue("Invalid msgs returned", msgs != null);
142 assertEquals("Invalid number of messages returned", 1, msgs.length);
143 assertTrue("Invalid message oid returned", msgs[0].getKey().indexOf(oid) > 0);
144 log.info("SUCCESS: Got message:" + msgs[0].getKey());
145
146 System.err.println("->Check if the message is available at the slave node bilbo ...");
147 gk = new GetKey(glob, oid);
148 gk.setDomain(domain);
149 msgs = bilboCon.get(gk.toXml(), null);
150 assertTrue("Invalid msgs returned", msgs != null);
151 assertEquals("Invalid number of messages returned", 1, msgs.length);
152 log.info("SUCCESS: Got message:" + msgs[0].getKey());
153
154 System.err.println("->Trying to erase the message at the slave node ...");
155 EraseKey ek = new EraseKey(glob, oid);
156 ek.setDomain(domain);
157 EraseQos eq = new EraseQos(glob);
158 bilboCon.erase(ek.toXml(), eq.toXml());
159
160 // Check if erased ...
161 gk = new GetKey(glob, oid);
162 msgs = heronCon.get(gk.toXml(), null);
163 assertTrue("Invalid msgs returned", msgs != null);
164 assertEquals("Invalid number of messages returned", 0, msgs.length);
165 log.info("SUCCESS: Got no message after erase");
166
167 System.err.println("***PublishTest: Publish a message to a cluster slave - frodo is offline ...");
168
169 System.err.println("->Subscribe from heron, the message is currently erased ...");
170 SubscribeKey sk = new SubscribeKey(glob, oid);
171 sk.setDomain(domain);
172 SubscribeQos sq = new SubscribeQos(glob);
173 SubscribeReturnQos srq = heronCon.subscribe(sk.toXml(), sq.toXml(), new I_Callback() {
174 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
175 assertInUpdate = serverHelper.getHeronGlob().getId() + ": Receiving unexpected asynchronous update message";
176 assertEquals(assertInUpdate, oid, updateKey.getOid());
177 assertInUpdate = serverHelper.getHeronGlob().getId() + ": Receiving corrupted asynchronous update message";
178 assertEquals(assertInUpdate, contentStr, new String(content));
179 log.info("heronCon - Receiving asynchronous message '" + updateKey.getOid() + "' in " + oid + " handler, state=" + updateQos.getState());
180 updateCounterHeron++;
181 assertInUpdate = null;
182 return "";
183 }
184 }); // subscribe with our specific update handler
185 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
186 assertTrue(assertInUpdate, assertInUpdate == null);
187 assertInUpdate = null;
188
189 serverHelper.stopFrodo();
190
191 System.err.println("->Check: heron hasn't got the message ...");
192 gk = new GetKey(glob, oid);
193 msgs = heronCon.get(gk.toXml(), null);
194 assertTrue("Invalid msgs returned", msgs != null);
195 assertEquals("Invalid number of messages returned", 0, msgs.length);
196 log.info("SUCCESS: Got no message after erase");
197
198 // publish again ...
199 pk = new PublishKey(glob, oid, "text/plain", "1.0");
200 pk.setDomain(domain);
201 pq = new PublishQos(glob);
202 msgUnit = new MsgUnit(pk.toXml(), contentStr.getBytes(), pq.toXml());
203 prq = bilboCon.publish(msgUnit);
204 log.info("Published message of domain='" + pk.getDomain() + "' and content='" + contentStr +
205 "' to xmlBlaster node bilbo with IP=" + serverHelper.getBilboGlob().getProperty().get("bootstrapPort",0) +
206 ", the returned QoS is: " + prq.getKeyOid());
207
208
209 assertEquals("heron is not reachable, publish should not have come through", 0, updateCounterHeron);
210
211 serverHelper.startFrodo();
212
213 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
214 assertEquals("heron has not received message", 1, updateCounterHeron);
215 updateCounterHeron = 0;
216
217 System.err.println("->Connect to frodo ...");
218 frodoCon = serverHelper.connect(serverHelper.getFrodoGlob(), new I_Callback() {
219 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
220 assertInUpdate = serverHelper.getFrodoGlob().getId() + ": Receive unexpected message '" + updateKey.getOid() + "'";
221 return "";
222 }
223 });
224 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
225 assertTrue(assertInUpdate, assertInUpdate == null);
226 assertInUpdate = null;
227
228 System.err.println("->Subscribe from frodo, is he able to organize it?");
229 sk = new SubscribeKey(glob, oid);
230 sk.setDomain(domain);
231 sq = new SubscribeQos(glob);
232 srq = frodoCon.subscribe(sk.toXml(), sq.toXml(), new I_Callback() {
233 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
234 log.info("frodoCon - Receiving asynchronous message '" + updateKey.getOid() + "' in " + oid + " handler, state=" + updateQos.getState());
235 updateCounterFrodo++;
236 assertInUpdate = null;
237 return "";
238 }
239 }); // subscribe with our specific update handler
240 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
241 assertTrue(assertInUpdate, assertInUpdate == null);
242 assertInUpdate = null;
243
244 try { Thread.sleep(5000); } catch( InterruptedException i) {} // Wait some time
245 assertEquals("frodo is reachable again, subscribe should work", 1, updateCounterFrodo);
246
247 updateCounterHeron = 0;
248 updateCounterFrodo = 0;
249 updateCounterBilbo = 0;
250
251 System.err.println("->Check unSubscribe from client frodo ...");
252 UnSubscribeKey uk = new UnSubscribeKey(glob, srq.getSubscriptionId());
253 UnSubscribeQos uq = new UnSubscribeQos(glob);
254 frodoCon.unSubscribe(uk.toXml(), uq.toXml());
255
256 System.err.println("->Check publish, frodo should not get it ...");
257 pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
258 pq = new PublishQos(glob);
259 msgUnit = new MsgUnit(pk, contentStr, pq);
260 prq = frodoCon.publish(msgUnit);
261 log.info("Published message of domain='" + pk.getDomain() + "' and content='" + contentStr +
262 "' to xmlBlaster node frodo with IP=" + serverHelper.getFrodoGlob().getProperty().get("bootstrapPort",0) +
263 ", the returned QoS is: " + prq.getKeyOid());
264
265 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
266 assertEquals("frodo is unSubscribed and should not receive message", 0, updateCounterFrodo);
267 assertEquals("heron has not received message", 1, updateCounterHeron);
268
269
270 }
271 catch (XmlBlasterException e) {
272 e.printStackTrace();
273 fail("PublishToBilbo-Exception: " + e.toString());
274 }
275 /* is done in tearDown
276 finally {
277 if (bilboCon != null) {
278 bilboCon.disconnect(null);
279 bilboCon = null;
280 }
281 }
282 */
283
284 System.err.println("***PublishTest: testPublish [SUCCESS]");
285 }
286
287 /**
288 * setUp() and tearDown() are ivoked between each test...() method
289 */
290 /*
291 public void testDummy() {
292 System.err.println("***PublishTest: testDummy [SUCCESS]");
293 }
294 */
295 }
syntax highlighted by Code2HTML, v. 0.9.1