/*------------------------------------------------------------------------------
Name:      TestPriorizedDispatchPlugin.java
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.dispatch;
  7 
  8 import java.util.logging.Logger;
  9 import org.xmlBlaster.util.Global;
 10 import org.xmlBlaster.util.XmlBlasterException;
 11 import org.xmlBlaster.util.qos.address.CallbackAddress;
 12 import org.xmlBlaster.client.qos.ConnectQos;
 13 import org.xmlBlaster.util.def.PriorityEnum;
 14 import org.xmlBlaster.client.I_XmlBlasterAccess;
 15 import org.xmlBlaster.client.qos.PublishQos;
 16 import org.xmlBlaster.client.qos.PublishReturnQos;
 17 import org.xmlBlaster.client.qos.SubscribeQos;
 18 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 19 import org.xmlBlaster.client.key.SubscribeKey;
 20 import org.xmlBlaster.util.MsgUnit;
 21 import org.xmlBlaster.util.def.Constants;
 22 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 23 import org.xmlBlaster.test.Util;
 24 import org.xmlBlaster.test.Msg;
 25 import org.xmlBlaster.test.MsgInterceptor;
 26 
 27 import junit.framework.*;
 28 
 29 
 30 /**
 31  * This client tests the
 32  * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">dispatch.control.plugin requirement</a>
 33  * <p />
 34  * We start our own xmlBlaster server in a thread.
 35  * This client may be invoked multiple time on the same xmlBlaster server,
 36  * as it cleans up everything after his tests are done.
 37  * <p>
 38  * Invoke examples:<br />
 39  * <pre>
 40  *    java junit.textui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin
 41  *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin
 42  * </pre>
 43  * @see org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin
 44  */
 45 public class TestPriorizedDispatchPlugin extends TestCase {
 46    private Global glob;
 47    private static Logger log = Logger.getLogger(TestPriorizedDispatchPlugin.class.getName());
 48 
 49    private I_XmlBlasterAccess con = null;
 50    private String name;
 51    private String passwd = "secret";
 52    private EmbeddedXmlBlaster serverThread;
 53    private int serverPort = 9560;
 54    private boolean startEmbedded = true;
 55    private MsgInterceptor update; // collects updated messages
 56 
 57    private final String msgOid = "dispatchTestMessage";
 58 
 59    private int msgSequenceNumber = 0;
 60 
 61    private String statusOid = "_bandwidth.status";
 62    private String NORMAL_LINE = "2M";
 63    private String BACKUP_LINE = "64k";
 64    private String DEAD_LINE = "DOWN";
 65 
 66    private String[] states = { NORMAL_LINE, BACKUP_LINE, DEAD_LINE };
 67    private String[][] expectedActions = { 
 68       {"send", "send", "send", "send", "send", "send", "send", "send", "send", "send"},
 69       {"destroy", "destroy", "destroy", "destroy", "queue", "queue", "queue,notifySender", "send", "send", "send"},
 70       {"destroy", "destroy", "destroy", "destroy", "queue", "queue", "queue", "queue", "queue", "queue"}
 71     };
 72 
 73    /**
 74     * Constructs the TestPriorizedDispatchPlugin object.
 75     * <p />
 76     * @param testName   The name used in the test suite
 77     * @param name       The name to login to the xmlBlaster
 78     */
 79    public TestPriorizedDispatchPlugin(Global glob, String testName, String name) {
 80       super(testName);
 81       this.glob = glob;
 82 
 83       this.name = name;
 84    }
 85 
 86    /**
 87     * Sets up the fixture.
 88     * <p />
 89     * We start an own xmlBlaster server in a separate thread,
 90     * it is configured to load our demo dispatch plugin.
 91     * <p />
 92     * Then we connect as a client
 93     */
 94    protected void setUp() {
 95       //Global embeddedGlobal = glob.getClone(null);  
 96       this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
 97       // We register here the demo plugin with xmlBlaster server, supplying an argument to the plugin
 98       String[] args = {
 99         "-DispatchPlugin[Priority][1.0]", "org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin",
100         "-DispatchPlugin/defaultPlugin", "undef", 
101         "-PriorizedDispatchPlugin/user", "_PriorizedDispatchPlugin",
102         "-"+org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY, //"-PriorizedDispatchPlugin/config", 
103             "<msgDispatch defaultStatus='" + BACKUP_LINE + "' defaultAction='send'>\n"+
104             "  <onStatus oid='" + statusOid + "' content='" + NORMAL_LINE + "' defaultAction='send'>\n" +
105             //"    <action do='send'  ifPriority='0-9'/>\n" +
106             "  </onStatus>\n" +
107             "  <onStatus oid='" + statusOid + "' content='" + BACKUP_LINE + "' defaultAction='send'>\n" +
108             "     <action do='send'  ifPriority='7'/>\n" +
109             "     <action do='queue,notifySender'  ifPriority='6'/>\n" +
110             "     <action do='queue'  ifPriority='4-5'/>\n" +
111             "     <action do='destroy'  ifPriority='0-3'/>\n" +
112             "  </onStatus>\n" +
113             "  <onStatus oid='" + statusOid + "' content='" + DEAD_LINE + "' defaultAction='queue'>\n" +
114             "    <action do='destroy'  ifPriority='0-3'/>\n" +
115             "  </onStatus>\n" +
116             "</msgDispatch>\n"
117          };
118       glob.init(args);
119 
120       if (this.startEmbedded) {
121          glob.init(Util.getOtherServerPorts(serverPort));
122          serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
123          log.info("XmlBlaster is ready for testing the priority dispatch plugin");
124       }
125 
126       try {
127          log.info("Connecting ...");
128          this.con = glob.getXmlBlasterAccess();
129 
130          // Activate plugin for callback only:
131          ConnectQos qos = new ConnectQos(glob, name, passwd);
132          CallbackAddress cbAddress = new CallbackAddress(glob);
133          cbAddress.setDispatchPlugin("Priority,1.0");
134          qos.addCallbackAddress(cbAddress);
135 
136          this.update = new MsgInterceptor(glob, log, null);
137          this.con.connect(qos, update);
138       }
139       catch (Exception e) {
140          Thread.dumpStack();
141          log.severe("Can't connect to xmlBlaster: " + e.toString());
142       }
143 
144       this.update.clear();
145    }
146 
147    /**
148     * @param The oid of the status message 
149     * @param state Choose one of "2M" or "64k"
150     */
151    private void changeStatus(String oid, String state) {
152       log.info("Changing band width state to '" + state + "'");
153       try {
154          PublishReturnQos rq = con.publish(new MsgUnit(glob, "<key oid='" + oid + "'/>", state, null));
155          log.info("SUCCESS for state change to '" + state + "', " + rq.getState());
156          // Sleep to be shure the plugin has got and processed the message
157          try { Thread.sleep(1000L); } catch( InterruptedException i) {}
158       } catch(XmlBlasterException e) {
159          log.warning("XmlBlasterException: " + e.getMessage());
160          fail("publish bandwidth state - XmlBlasterException: " + e.getMessage());
161       }
162    }
163 
164    private void publish(String oid, int priority) {
165       PriorityEnum prio = PriorityEnum.toPriorityEnum(priority);
166       try {
167          msgSequenceNumber++;
168          String content = "" + msgSequenceNumber;
169          PublishQos pq = new PublishQos(glob);
170          pq.setPriority(prio);
171          PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", content.getBytes(), pq.toXml()));
172          log.info("SUCCESS publish '" + oid + "' with prio=" + prio.toString() + " content=" + content + " returned state=" + rq.getState());
173          assertEquals("Returned oid wrong", oid, rq.getKeyOid());
174          assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
175       } catch(XmlBlasterException e) {
176          log.warning("XmlBlasterException: " + e.getMessage());
177          fail("publish prio=" + prio.toString() + " - XmlBlasterException: " + e.getMessage());
178       }
179    }
180 
181    private void subscribe(String oid) {
182       try {
183          SubscribeKey sk = new SubscribeKey(glob, oid);
184          SubscribeQos sq = new SubscribeQos(glob);
185          SubscribeReturnQos srq = con.subscribe(sk.toXml(), sq.toXml());
186          log.info("SUCCESS subscribe to '" + oid + "' returned state=" + srq.getState());
187       } catch(XmlBlasterException e) {
188          log.warning("XmlBlasterException: " + e.getMessage());
189          fail("subscribe - XmlBlasterException: " + e.getMessage());
190       }
191    }
192 
193    /**
194     * Test all tuples of possibilities
195     */
196    public void testPriorizedDispatchPlugin() {
197       log.info("testPriorizedDispatchPlugin() ...");
198       long sleep = 1000L;
199       String text;
200 
201       subscribe(msgOid);
202 
203       int queueCounter = 0;
204       int destroyCounter = 0;
205 
206       try {
207          for (int i=0; i<states.length; i++) {
208             changeStatus(statusOid, states[i]);
209             log.info("========================state=" + states[i]);
210             for (int priority=0; priority<expectedActions[i].length; priority++) {
211                String action = expectedActions[i][priority];
212                text = "state=" + states[i] + " action=" + action;
213                log.info("Doing " + text + " queueCounter=" + queueCounter);
214 
215                boolean expectsNotify = false;
216                if (action.indexOf("notifySender") >= 0) {
217                   expectsNotify = true;
218                   log.info(text + ": Expecting notify");
219                }
220 
221                if (action.startsWith("send")) {
222                   publish(msgOid, priority);
223                   assertEquals(text, 1, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
224                   int count = expectsNotify ? 2 : 1;
225                   assertEquals(text, count, this.update.count());
226                   if (expectsNotify) {
227                      String expectedState = "send,notifySender";
228                      Msg msg = this.update.getMsg(msgOid, expectedState); // PtP notification
229                      assertTrue("send,notifySender PtP not arrived", msg != null);
230                   }
231                }
232                else if (action.startsWith("queue")) {
233                   publish(msgOid, priority);
234                   queueCounter++;
235                   assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
236                   int count = expectsNotify ? 1 : 0;
237                   assertEquals(text, count, this.update.count());
238                   if (expectsNotify) {
239                      assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification
240                   }
241                }
242                else if (action.startsWith("destroy")) {
243                   publish(msgOid, priority);
244                   destroyCounter++;
245                   assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
246                   int count = expectsNotify ? 1 : 0;
247                   assertEquals(text, count, this.update.count());
248                   if (expectsNotify) {
249                      assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification
250                   }
251                }
252                else {
253                   log.severe(text + ": Action is not supported");
254                   fail(text + ": Action is not supported");
255                }
256 
257                this.update.clear();
258             } // for prio
259          } // for states
260 
261          text = "Checking ascending sequence of flushed " + queueCounter + " messages which where hold back";
262          this.update.clear();
263          changeStatus(statusOid, NORMAL_LINE);
264          assertEquals(text, queueCounter, this.update.waitOnUpdate(2000L, msgOid, Constants.STATE_OK));
265          assertEquals(text, queueCounter, this.update.count());
266          Msg[] msgArr = this.update.getMsgs();
267          assertEquals(text, queueCounter, msgArr.length);
268          int lastNum = -1;
269          int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
270          for (int i=0; i<msgArr.length; i++) {
271             log.info("Received flushed hold back message " + msgArr[i].getUpdateKey().getOid() + 
272                          " priority=" + msgArr[i].getUpdateQos().getPriority() +
273                          " content=" + msgArr[i].getContentStr() +
274                          " state=" + msgArr[i].getUpdateQos().getState());
275          }
276          for (int i=0; i<msgArr.length; i++) {
277             int currPrio = msgArr[i].getUpdateQos().getPriority().getInt();
278             int currNum = msgArr[i].getContentInt();
279             if (lastPrio < currPrio || lastPrio == currPrio && lastNum >= currNum)
280                fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum);
281             lastNum = currNum;
282             lastPrio = currPrio;
283          }
284          this.update.clear();
285       }
286       catch (XmlBlasterException e) {
287          fail(e.toString());
288       }
289       log.info("Success in testPriorizedDispatchPlugin()");
290    }
291 
292    /**
293     * Tests to change the plugin configuration and different status message oids. 
294     */
295    public void testPriorizedDispatchPluginReconfigure() {
296       log.info("testPriorizedDispatchPluginReconfigure() ...");
297       String statusOid2 = statusOid+"-2";
298       String config = 
299             "<msgDispatch defaultStatus='GO' defaultAction='send'>\n"+
300             "  <onStatus oid='" + statusOid + "' content='GO' defaultAction='send'>\n" +
301             "    <action do='send'  ifPriority='0-9'/>\n" +
302             "  </onStatus>\n" +
303             "  <onStatus oid='" + statusOid2 + "' content='" + BACKUP_LINE + "' defaultAction='send'>\n" +
304             "     <action do='queue'  ifPriority='0-9'/>\n" +
305             "  </onStatus>\n" +
306             "</msgDispatch>\n";
307 
308       publishNewConfig(config);
309 
310       String text = "Testing configuration";
311 
312       long sleep = 2000L;
313 
314       //try {
315          subscribe(msgOid);
316 
317          int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
318 
319          // check normal operation
320          changeStatus(statusOid, "GO");
321          for (int priority=0; priority < maxPrio; priority++) {
322             publish(msgOid, priority);
323          }
324          assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
325          log.info("SUCCESS, state=GO");
326          this.update.clear();
327 
328          // queue messages
329          changeStatus(statusOid2, BACKUP_LINE);
330          for (int priority=0; priority < maxPrio; priority++) {
331             publish(msgOid, priority);
332          }
333          assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
334          log.info("SUCCESS, state=" + BACKUP_LINE);
335          this.update.clear();
336 
337          // flush the before queued messages
338          changeStatus(statusOid, "GO");
339          assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
340          log.info("SUCCESS, state=GO");
341          this.update.clear();
342 
343          // check unkown message content
344          changeStatus(statusOid, "??YYXX");
345          for (int priority=0; priority < maxPrio; priority++) {
346             publish(msgOid, priority);
347          }
348          assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
349          log.info("SUCCESS, state=GO");
350          this.update.clear();
351          /*
352       }
353       catch (XmlBlasterException e) {
354          fail(e.toString());
355       }    */
356       log.info("Success in testPriorizedDispatchPluginReconfigure()");
357    }
358 
359    /**
360     * Change the configuration of the plugin
361     */
362    private void publishNewConfig(String config) {
363       String configKey = org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY; // "PriorizedDispatchPlugin/config"
364       try {
365          String oid = "__cmd:sysprop/?" + configKey;
366          String contentStr = config;
367          PublishQos pq = new PublishQos(glob);
368          PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", contentStr.getBytes(), pq.toXml()));
369          log.info("SUCCESS publish new configuration '" + oid + "' returned state=" + rq.getState());
370          assertEquals("Returned oid wrong", oid, rq.getKeyOid());
371          assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
372       } catch(XmlBlasterException e) {
373          log.warning("XmlBlasterException: " + e.toString());
374          fail("publish of configuration data - XmlBlasterException: " + e.getMessage());
375       }
376    }
377 
378    /**
379     * Test the notifySender message
380     * 1. subscribe to a message
381     * 2. change state to 64k
382     * 3. send a message with prio 6 which should trigger a notify PtP message
383     */
384    public void testPriorizedDispatchPluginOne() {
385       log.info("testPriorizedDispatchPluginOne() ...");
386 
387       long sleep = 2000L;
388       String text = "state=" + BACKUP_LINE + " action=queue,notifySender";
389 
390       // <action do='queue,notifySender'  ifPriority='6'/>
391 
392       subscribe(msgOid);
393 
394       changeStatus(statusOid, BACKUP_LINE);
395       try { Thread.sleep(1000L); } catch( InterruptedException i) {} // Wait some time
396 
397       int priority = 6;
398       log.info(text + ": Expecting notify");
399 
400       this.update.clear();
401       publish(msgOid, priority);
402       assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
403       assertEquals(text, 1, this.update.count());
404       assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification
405 
406       this.update.clear();
407 
408       changeStatus(statusOid, NORMAL_LINE);
409       log.info(text + ": Expecting queued message");
410       assertEquals(text, 1, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
411 
412       log.info("Success in testPriorizedDispatchPluginOne()");
413    }
414 
415    /**
416     * Tears down the fixture.
417     * <p />
418     * cleaning up .... erase() the previous message OID and logout
419     */
420    protected void tearDown() {
421       try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait some time
422 
423       this.con.disconnect(null);
424       this.con = null;
425 
426       if (this.startEmbedded) {
427          try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
428          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
429          this.serverThread = null;
430       }
431 
432       // reset to default server port (necessary if other tests follow in the same JVM).
433       Util.resetPorts(glob);
434       this.glob = null;
435      
436       this.con = null;
437       this.update = null;
438       Global.instance().shutdown();
439    }
440 
441    /**
442     * Method is used by TestRunner to load these tests
443     */
444    public static Test suite() {
445        TestSuite suite= new TestSuite();
446        suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPluginOne", "PriorizedDispatchPluginOne"));
447        suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPlugin", "PriorizedDispatchPlugin"));
448        suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPluginReconfigure", "PriorizedDispatchPluginRecovery"));
449        return suite;
450    }
451 
452    /**
453     * Invoke: 
454     * <pre>
455     *  java org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin  -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST
456     *  java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin
457     * <pre>
458     */
459    public static void main(String args[]) {
460       Global glob = new Global();
461       if (glob.init(args) != 0) {
462          System.exit(0);
463       }
464       TestPriorizedDispatchPlugin testSub = new TestPriorizedDispatchPlugin(glob, "TestPriorizedDispatchPlugin", "TestPriorizedDispatchPlugin");
465       testSub.setUp();
466       testSub.testPriorizedDispatchPlugin();
467       //testSub.testPriorizedDispatchPluginReconfigure();
468       //testSub.testPriorizedDispatchPluginOne();
469       testSub.tearDown();
470    }
471 }


// syntax highlighted by Code2HTML, v. 0.9.1