1 package org.xmlBlaster.test.cluster;
  2 
  3 import java.util.logging.Logger;
  4 
  5 import junit.framework.TestCase;
  6 
  7 import org.xmlBlaster.client.I_Callback;
  8 import org.xmlBlaster.client.I_XmlBlasterAccess;
  9 import org.xmlBlaster.client.key.PublishKey;
 10 import org.xmlBlaster.client.key.UpdateKey;
 11 import org.xmlBlaster.client.qos.PublishQos;
 12 import org.xmlBlaster.client.qos.PublishReturnQos;
 13 import org.xmlBlaster.client.qos.UpdateQos;
 14 import org.xmlBlaster.util.Global;
 15 import org.xmlBlaster.util.MsgUnit;
 16 // for client connections:
 17 import org.xmlBlaster.util.SessionName;
 18 import org.xmlBlaster.util.XmlBlasterException;
 19 import org.xmlBlaster.util.qos.address.Destination;
 20 
 21 /**
 22  * Test publishing a message from bilbo to heron. 
 23  * <p />
 24  * <pre>
 25  * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.PtPTest
 26  * </pre>
 27  * NOTE: asserts() in update() methods are routed back to server and are not handled
 28  *       by the junit testsuite, so we check double (see code).
 29  * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
 30  */
 31 public class PtPTest extends TestCase {
 32    private String ME = "PtPTest";
 33    private Global glob;
 34    private static Logger log = Logger.getLogger(PtPTest.class.getName());
 35    private ServerHelper serverHelper;
 36 
 37    private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon;
 38 
 39    private int updateCounterHeron = 0;
 40    private String oid = "PublishToBilbo";
 41    private String contentStr = "We win";
 42 
 43    private String assertInUpdateHeron = null;
 44    private String assertInUpdateBilbo = null;
 45 
 46    public PtPTest(String name) {
 47       super(name);
 48       this.glob = new Global(null, true, false);
 49    }
 50 
 51    /**
 52     * Initialize the test ...
 53     */
 54    protected void setUp() {
 55 
 56       log.info("Entering setUp(), test starts");
 57 
 58       serverHelper = new ServerHelper(glob, log, ME);
 59 
 60       // Starts a cluster node
 61       serverHelper.startHeron();
 62       //serverHelper.startAvalon();
 63       //serverHelper.startGolan();
 64       serverHelper.startFrodo();
 65       serverHelper.startBilbo();
 66    }
 67 
 68    /**
 69     * cleaning up ...
 70     */
 71    protected void tearDown() {
 72       log.info("Entering tearDown(), test is finished");
 73       try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
 74 
 75       if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
 76       if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
 77       if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
 78       if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
 79       if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
 80 
 81       serverHelper.tearDown();
 82    }
 83 
 84    /**
 85     * We start bilbo, frodo and heron nodes as described in requirement
 86     * PtP messages are routed from ClientTo[bilbo] -> bilbo -> frodo -> heron -> ClientTo[heron]
 87     * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
 88     * publish a message to bilbo which should be routed to client XX which is logged in to heron.
 89     */ 
 90    public void testPublishPtP() {
 91       System.err.println("***PtPTest: Publish a message to a cluster slave ...");
 92       try {
 93          log.info("Login to heron and wait for PtP message ...");
 94          heronCon = serverHelper.connect(serverHelper.getHeronGlob(), new I_Callback() {  // Login to xmlBlaster, register for updates
 95                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
 96                   log.info("Received message '" + updateKey.getOid() + "' state=" +
 97                            updateQos.getState() + " from '" + updateQos.getSender() + "'");
 98                   if (!updateQos.getSender().equalsAbsolute(bilboCon.getConnectReturnQos().getSessionName())) {
 99                      assertInUpdateHeron = serverHelper.getHeronGlob().getId() + ": Did not expect message update in default handler";
100                   }
101                   updateCounterHeron++;
102                   return "";
103                }
104             });
105          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
106          assertTrue(assertInUpdateHeron, assertInUpdateHeron == null);
107          assertInUpdateHeron = null;
108 
109          log.info("Login to bilbo to send PtP message ...");
110          bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() {  // Login to xmlBlaster, register for updates
111                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
112                   assertInUpdateBilbo = serverHelper.getBilboGlob().getId() + ": Should not receive the message '" + updateKey.getOid() + "'";
113                   fail(assertInUpdateBilbo); // This is routed to server, not to junit
114                   return "";
115                }
116             });
117          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
118          assertTrue(assertInUpdateBilbo, assertInUpdateBilbo == null);
119          assertInUpdateBilbo = null;
120 
121          int num = 5;
122          for (int i=0; i<num; i++) {
123             PublishKey pk = new PublishKey(glob, oid, "text/plain", "1.0");
124             pk.setDomain("RUGBY_NEWS"); // heron is master: need for routing as heron is not directly connected to us
125             PublishQos pq = new PublishQos(glob);
126             SessionName sessionName = heronCon.getConnectReturnQos().getSessionName(); // destination client
127             Destination destination = new Destination(sessionName);
128             destination.forceQueuing(true);
129             pq.addDestination(destination);
130             log.info("Sending PtP message '" + oid + "' from bilbo to '" + sessionName + "' :" + pq.toXml());
131             MsgUnit msgUnit = new MsgUnit(pk, (contentStr+"-"+i).getBytes(), pq);
132             PublishReturnQos prq = bilboCon.publish(msgUnit);
133             log.info("Published message to destination='" + sessionName +
134                                        "' content='" + (contentStr+"-"+i) +
135                                        "' to xmlBlaster node with IP=" + serverHelper.getBilboGlob().getProperty().get("bootstrapPort",0) +
136                                        ", the returned QoS is: " + prq.getKeyOid());
137          }
138 
139          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
140          assertTrue(assertInUpdateHeron, assertInUpdateHeron == null);
141          assertEquals("Heron client did not receive PtP message", num, updateCounterHeron);
142       }
143       catch (XmlBlasterException e) {
144          e.printStackTrace();
145          fail("PublishToBilbo-Exception: " + e.getMessage());
146       }
147 
148       System.err.println("***PtPTest: testPublishPtP [SUCCESS]");
149    }
150 
151    /**
152     * Invoke: 
153     * <pre>
154     *  java -Dtrace[cluster]=true -Dcall[cluster]=true -Dcall[core]=true org.xmlBlaster.test.cluster.PtPTest
155     *  java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.PtPTest
156     * <pre>
157     */
158    public static void main(String args[]) {
159       Global glob = new Global(null, true, false);
160       if (glob.init(args) != 0) {
161          System.exit(0);
162       }
163       PtPTest testSub = new PtPTest("PtPTest");
164       testSub.setUp();
165       testSub.testPublishPtP();
166       testSub.tearDown();
167    }
168 }


syntax highlighted by Code2HTML, v. 0.9.1