1 /*------------------------------------------------------------------------------
2 Name: TestPriorizedDispatchPlugin.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.dispatch;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.util.Global;
10 import org.xmlBlaster.util.XmlBlasterException;
11 import org.xmlBlaster.util.qos.address.CallbackAddress;
12 import org.xmlBlaster.client.qos.ConnectQos;
13 import org.xmlBlaster.util.def.PriorityEnum;
14 import org.xmlBlaster.client.I_XmlBlasterAccess;
15 import org.xmlBlaster.client.qos.PublishQos;
16 import org.xmlBlaster.client.qos.PublishReturnQos;
17 import org.xmlBlaster.client.qos.SubscribeQos;
18 import org.xmlBlaster.client.qos.SubscribeReturnQos;
19 import org.xmlBlaster.client.key.SubscribeKey;
20 import org.xmlBlaster.util.MsgUnit;
21 import org.xmlBlaster.util.def.Constants;
22 import org.xmlBlaster.util.EmbeddedXmlBlaster;
23 import org.xmlBlaster.test.Util;
24 import org.xmlBlaster.test.Msg;
25 import org.xmlBlaster.test.MsgInterceptor;
26
27 import junit.framework.*;
28
29
30 /**
31 * This client tests the
32 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">dispatch.control.plugin requirement</a>
33 * <p />
34 * We start our own xmlBlaster server in a thread.
35 * This client may be invoked multiple time on the same xmlBlaster server,
36 * as it cleans up everything after his tests are done.
37 * <p>
38 * Invoke examples:<br />
39 * <pre>
40 * java junit.textui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin
41 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin
42 * </pre>
43 * @see org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin
44 */
45 public class TestPriorizedDispatchPlugin extends TestCase {
46 private Global glob;
47 private static Logger log = Logger.getLogger(TestPriorizedDispatchPlugin.class.getName());
48
49 private I_XmlBlasterAccess con = null;
50 private String name;
51 private String passwd = "secret";
52 private EmbeddedXmlBlaster serverThread;
53 private int serverPort = 9560;
54 private boolean startEmbedded = true;
55 private MsgInterceptor update; // collects updated messages
56
57 private final String msgOid = "dispatchTestMessage";
58
59 private int msgSequenceNumber = 0;
60
61 private String statusOid = "_bandwidth.status";
62 private String NORMAL_LINE = "2M";
63 private String BACKUP_LINE = "64k";
64 private String DEAD_LINE = "DOWN";
65
66 private String[] states = { NORMAL_LINE, BACKUP_LINE, DEAD_LINE };
67 private String[][] expectedActions = {
68 {"send", "send", "send", "send", "send", "send", "send", "send", "send", "send"},
69 {"destroy", "destroy", "destroy", "destroy", "queue", "queue", "queue,notifySender", "send", "send", "send"},
70 {"destroy", "destroy", "destroy", "destroy", "queue", "queue", "queue", "queue", "queue", "queue"}
71 };
72
73 /**
74 * Constructs the TestPriorizedDispatchPlugin object.
75 * <p />
76 * @param testName The name used in the test suite
77 * @param name The name to login to the xmlBlaster
78 */
79 public TestPriorizedDispatchPlugin(Global glob, String testName, String name) {
80 super(testName);
81 this.glob = glob;
82
83 this.name = name;
84 }
85
86 /**
87 * Sets up the fixture.
88 * <p />
89 * We start an own xmlBlaster server in a separate thread,
90 * it is configured to load our demo dispatch plugin.
91 * <p />
92 * Then we connect as a client
93 */
94 protected void setUp() {
95 //Global embeddedGlobal = glob.getClone(null);
96 this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
97 // We register here the demo plugin with xmlBlaster server, supplying an argument to the plugin
98 String[] args = {
99 "-DispatchPlugin[Priority][1.0]", "org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin",
100 "-DispatchPlugin/defaultPlugin", "undef",
101 "-PriorizedDispatchPlugin/user", "_PriorizedDispatchPlugin",
102 "-"+org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY, //"-PriorizedDispatchPlugin/config",
103 "<msgDispatch defaultStatus='" + BACKUP_LINE + "' defaultAction='send'>\n"+
104 " <onStatus oid='" + statusOid + "' content='" + NORMAL_LINE + "' defaultAction='send'>\n" +
105 //" <action do='send' ifPriority='0-9'/>\n" +
106 " </onStatus>\n" +
107 " <onStatus oid='" + statusOid + "' content='" + BACKUP_LINE + "' defaultAction='send'>\n" +
108 " <action do='send' ifPriority='7'/>\n" +
109 " <action do='queue,notifySender' ifPriority='6'/>\n" +
110 " <action do='queue' ifPriority='4-5'/>\n" +
111 " <action do='destroy' ifPriority='0-3'/>\n" +
112 " </onStatus>\n" +
113 " <onStatus oid='" + statusOid + "' content='" + DEAD_LINE + "' defaultAction='queue'>\n" +
114 " <action do='destroy' ifPriority='0-3'/>\n" +
115 " </onStatus>\n" +
116 "</msgDispatch>\n"
117 };
118 glob.init(args);
119
120 if (this.startEmbedded) {
121 glob.init(Util.getOtherServerPorts(serverPort));
122 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
123 log.info("XmlBlaster is ready for testing the priority dispatch plugin");
124 }
125
126 try {
127 log.info("Connecting ...");
128 this.con = glob.getXmlBlasterAccess();
129
130 // Activate plugin for callback only:
131 ConnectQos qos = new ConnectQos(glob, name, passwd);
132 CallbackAddress cbAddress = new CallbackAddress(glob);
133 cbAddress.setDispatchPlugin("Priority,1.0");
134 qos.addCallbackAddress(cbAddress);
135
136 this.update = new MsgInterceptor(glob, log, null);
137 this.con.connect(qos, update);
138 }
139 catch (Exception e) {
140 Thread.dumpStack();
141 log.severe("Can't connect to xmlBlaster: " + e.toString());
142 }
143
144 this.update.clear();
145 }
146
147 /**
148 * @param The oid of the status message
149 * @param state Choose one of "2M" or "64k"
150 */
151 private void changeStatus(String oid, String state) {
152 log.info("Changing band width state to '" + state + "'");
153 try {
154 PublishReturnQos rq = con.publish(new MsgUnit(glob, "<key oid='" + oid + "'/>", state, null));
155 log.info("SUCCESS for state change to '" + state + "', " + rq.getState());
156 // Sleep to be shure the plugin has got and processed the message
157 try { Thread.sleep(1000L); } catch( InterruptedException i) {}
158 } catch(XmlBlasterException e) {
159 log.warning("XmlBlasterException: " + e.getMessage());
160 fail("publish bandwidth state - XmlBlasterException: " + e.getMessage());
161 }
162 }
163
164 private void publish(String oid, int priority) {
165 PriorityEnum prio = PriorityEnum.toPriorityEnum(priority);
166 try {
167 msgSequenceNumber++;
168 String content = "" + msgSequenceNumber;
169 PublishQos pq = new PublishQos(glob);
170 pq.setPriority(prio);
171 PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", content.getBytes(), pq.toXml()));
172 log.info("SUCCESS publish '" + oid + "' with prio=" + prio.toString() + " content=" + content + " returned state=" + rq.getState());
173 assertEquals("Returned oid wrong", oid, rq.getKeyOid());
174 assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
175 } catch(XmlBlasterException e) {
176 log.warning("XmlBlasterException: " + e.getMessage());
177 fail("publish prio=" + prio.toString() + " - XmlBlasterException: " + e.getMessage());
178 }
179 }
180
181 private void subscribe(String oid) {
182 try {
183 SubscribeKey sk = new SubscribeKey(glob, oid);
184 SubscribeQos sq = new SubscribeQos(glob);
185 SubscribeReturnQos srq = con.subscribe(sk.toXml(), sq.toXml());
186 log.info("SUCCESS subscribe to '" + oid + "' returned state=" + srq.getState());
187 } catch(XmlBlasterException e) {
188 log.warning("XmlBlasterException: " + e.getMessage());
189 fail("subscribe - XmlBlasterException: " + e.getMessage());
190 }
191 }
192
193 /**
194 * Test all tuples of possibilities
195 */
196 public void testPriorizedDispatchPlugin() {
197 log.info("testPriorizedDispatchPlugin() ...");
198 long sleep = 1000L;
199 String text;
200
201 subscribe(msgOid);
202
203 int queueCounter = 0;
204 int destroyCounter = 0;
205
206 try {
207 for (int i=0; i<states.length; i++) {
208 changeStatus(statusOid, states[i]);
209 log.info("========================state=" + states[i]);
210 for (int priority=0; priority<expectedActions[i].length; priority++) {
211 String action = expectedActions[i][priority];
212 text = "state=" + states[i] + " action=" + action;
213 log.info("Doing " + text + " queueCounter=" + queueCounter);
214
215 boolean expectsNotify = false;
216 if (action.indexOf("notifySender") >= 0) {
217 expectsNotify = true;
218 log.info(text + ": Expecting notify");
219 }
220
221 if (action.startsWith("send")) {
222 publish(msgOid, priority);
223 assertEquals(text, 1, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
224 int count = expectsNotify ? 2 : 1;
225 assertEquals(text, count, this.update.count());
226 if (expectsNotify) {
227 String expectedState = "send,notifySender";
228 Msg msg = this.update.getMsg(msgOid, expectedState); // PtP notification
229 assertTrue("send,notifySender PtP not arrived", msg != null);
230 }
231 }
232 else if (action.startsWith("queue")) {
233 publish(msgOid, priority);
234 queueCounter++;
235 assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
236 int count = expectsNotify ? 1 : 0;
237 assertEquals(text, count, this.update.count());
238 if (expectsNotify) {
239 assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification
240 }
241 }
242 else if (action.startsWith("destroy")) {
243 publish(msgOid, priority);
244 destroyCounter++;
245 assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
246 int count = expectsNotify ? 1 : 0;
247 assertEquals(text, count, this.update.count());
248 if (expectsNotify) {
249 assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification
250 }
251 }
252 else {
253 log.severe(text + ": Action is not supported");
254 fail(text + ": Action is not supported");
255 }
256
257 this.update.clear();
258 } // for prio
259 } // for states
260
261 text = "Checking ascending sequence of flushed " + queueCounter + " messages which where hold back";
262 this.update.clear();
263 changeStatus(statusOid, NORMAL_LINE);
264 assertEquals(text, queueCounter, this.update.waitOnUpdate(2000L, msgOid, Constants.STATE_OK));
265 assertEquals(text, queueCounter, this.update.count());
266 Msg[] msgArr = this.update.getMsgs();
267 assertEquals(text, queueCounter, msgArr.length);
268 int lastNum = -1;
269 int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
270 for (int i=0; i<msgArr.length; i++) {
271 log.info("Received flushed hold back message " + msgArr[i].getUpdateKey().getOid() +
272 " priority=" + msgArr[i].getUpdateQos().getPriority() +
273 " content=" + msgArr[i].getContentStr() +
274 " state=" + msgArr[i].getUpdateQos().getState());
275 }
276 for (int i=0; i<msgArr.length; i++) {
277 int currPrio = msgArr[i].getUpdateQos().getPriority().getInt();
278 int currNum = msgArr[i].getContentInt();
279 if (lastPrio < currPrio || lastPrio == currPrio && lastNum >= currNum)
280 fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum);
281 lastNum = currNum;
282 lastPrio = currPrio;
283 }
284 this.update.clear();
285 }
286 catch (XmlBlasterException e) {
287 fail(e.toString());
288 }
289 log.info("Success in testPriorizedDispatchPlugin()");
290 }
291
292 /**
293 * Tests to change the plugin configuration and different status message oids.
294 */
295 public void testPriorizedDispatchPluginReconfigure() {
296 log.info("testPriorizedDispatchPluginReconfigure() ...");
297 String statusOid2 = statusOid+"-2";
298 String config =
299 "<msgDispatch defaultStatus='GO' defaultAction='send'>\n"+
300 " <onStatus oid='" + statusOid + "' content='GO' defaultAction='send'>\n" +
301 " <action do='send' ifPriority='0-9'/>\n" +
302 " </onStatus>\n" +
303 " <onStatus oid='" + statusOid2 + "' content='" + BACKUP_LINE + "' defaultAction='send'>\n" +
304 " <action do='queue' ifPriority='0-9'/>\n" +
305 " </onStatus>\n" +
306 "</msgDispatch>\n";
307
308 publishNewConfig(config);
309
310 String text = "Testing configuration";
311
312 long sleep = 2000L;
313
314 //try {
315 subscribe(msgOid);
316
317 int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
318
319 // check normal operation
320 changeStatus(statusOid, "GO");
321 for (int priority=0; priority < maxPrio; priority++) {
322 publish(msgOid, priority);
323 }
324 assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
325 log.info("SUCCESS, state=GO");
326 this.update.clear();
327
328 // queue messages
329 changeStatus(statusOid2, BACKUP_LINE);
330 for (int priority=0; priority < maxPrio; priority++) {
331 publish(msgOid, priority);
332 }
333 assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
334 log.info("SUCCESS, state=" + BACKUP_LINE);
335 this.update.clear();
336
337 // flush the before queued messages
338 changeStatus(statusOid, "GO");
339 assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
340 log.info("SUCCESS, state=GO");
341 this.update.clear();
342
343 // check unkown message content
344 changeStatus(statusOid, "??YYXX");
345 for (int priority=0; priority < maxPrio; priority++) {
346 publish(msgOid, priority);
347 }
348 assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
349 log.info("SUCCESS, state=GO");
350 this.update.clear();
351 /*
352 }
353 catch (XmlBlasterException e) {
354 fail(e.toString());
355 } */
356 log.info("Success in testPriorizedDispatchPluginReconfigure()");
357 }
358
359 /**
360 * Change the configuration of the plugin
361 */
362 private void publishNewConfig(String config) {
363 String configKey = org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY; // "PriorizedDispatchPlugin/config"
364 try {
365 String oid = "__cmd:sysprop/?" + configKey;
366 String contentStr = config;
367 PublishQos pq = new PublishQos(glob);
368 PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", contentStr.getBytes(), pq.toXml()));
369 log.info("SUCCESS publish new configuration '" + oid + "' returned state=" + rq.getState());
370 assertEquals("Returned oid wrong", oid, rq.getKeyOid());
371 assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
372 } catch(XmlBlasterException e) {
373 log.warning("XmlBlasterException: " + e.toString());
374 fail("publish of configuration data - XmlBlasterException: " + e.getMessage());
375 }
376 }
377
378 /**
379 * Test the notifySender message
380 * 1. subscribe to a message
381 * 2. change state to 64k
382 * 3. send a message with prio 6 which should trigger a notify PtP message
383 */
384 public void testPriorizedDispatchPluginOne() {
385 log.info("testPriorizedDispatchPluginOne() ...");
386
387 long sleep = 2000L;
388 String text = "state=" + BACKUP_LINE + " action=queue,notifySender";
389
390 // <action do='queue,notifySender' ifPriority='6'/>
391
392 subscribe(msgOid);
393
394 changeStatus(statusOid, BACKUP_LINE);
395 try { Thread.sleep(1000L); } catch( InterruptedException i) {} // Wait some time
396
397 int priority = 6;
398 log.info(text + ": Expecting notify");
399
400 this.update.clear();
401 publish(msgOid, priority);
402 assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
403 assertEquals(text, 1, this.update.count());
404 assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification
405
406 this.update.clear();
407
408 changeStatus(statusOid, NORMAL_LINE);
409 log.info(text + ": Expecting queued message");
410 assertEquals(text, 1, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
411
412 log.info("Success in testPriorizedDispatchPluginOne()");
413 }
414
415 /**
416 * Tears down the fixture.
417 * <p />
418 * cleaning up .... erase() the previous message OID and logout
419 */
420 protected void tearDown() {
421 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait some time
422
423 this.con.disconnect(null);
424 this.con = null;
425
426 if (this.startEmbedded) {
427 try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
428 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
429 this.serverThread = null;
430 }
431
432 // reset to default server port (necessary if other tests follow in the same JVM).
433 Util.resetPorts(glob);
434 this.glob = null;
435
436 this.con = null;
437 this.update = null;
438 Global.instance().shutdown();
439 }
440
441 /**
442 * Method is used by TestRunner to load these tests
443 */
444 public static Test suite() {
445 TestSuite suite= new TestSuite();
446 suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPluginOne", "PriorizedDispatchPluginOne"));
447 suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPlugin", "PriorizedDispatchPlugin"));
448 suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPluginReconfigure", "PriorizedDispatchPluginRecovery"));
449 return suite;
450 }
451
452 /**
453 * Invoke:
454 * <pre>
455 * java org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST
456 * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin
457 * <pre>
458 */
459 public static void main(String args[]) {
460 Global glob = new Global();
461 if (glob.init(args) != 0) {
462 System.exit(0);
463 }
464 TestPriorizedDispatchPlugin testSub = new TestPriorizedDispatchPlugin(glob, "TestPriorizedDispatchPlugin", "TestPriorizedDispatchPlugin");
465 testSub.setUp();
466 testSub.testPriorizedDispatchPlugin();
467 //testSub.testPriorizedDispatchPluginReconfigure();
468 //testSub.testPriorizedDispatchPluginOne();
469 testSub.tearDown();
470 }
471 }
syntax highlighted by Code2HTML, v. 0.9.1