1 package org.xmlBlaster.test.cluster;
  2 
  3 import java.util.logging.Logger;
  4 import java.util.logging.Level;
  5 import org.xmlBlaster.util.Global;
  6 
  7 // for client connections:
  8 import org.xmlBlaster.util.*;
  9 import org.xmlBlaster.client.I_Callback;
 10 import org.xmlBlaster.client.key.PublishKey;
 11 import org.xmlBlaster.client.key.EraseKey;
 12 import org.xmlBlaster.client.key.GetKey;
 13 import org.xmlBlaster.client.key.SubscribeKey;
 14 import org.xmlBlaster.client.key.UnSubscribeKey;
 15 import org.xmlBlaster.client.key.UpdateKey;
 16 import org.xmlBlaster.client.qos.PublishQos;
 17 import org.xmlBlaster.client.qos.PublishReturnQos;
 18 import org.xmlBlaster.client.qos.UpdateQos;
 19 import org.xmlBlaster.client.qos.SubscribeQos;
 20 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 21 import org.xmlBlaster.client.qos.UnSubscribeQos;
 22 import org.xmlBlaster.client.qos.EraseQos;
 23 import org.xmlBlaster.client.I_XmlBlasterAccess;
 24 import org.xmlBlaster.util.MsgUnit;
 25 
 26 
 27 import java.util.Vector;
 28 import java.io.File;
 29 
 30 import junit.framework.*;
 31 
 32 /**
 33  * Test publishing a message from bilbo to heron. 
 34  * <p />
 35  * <pre>
 36  * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.PublishTest
 37  * </pre>
 38  * NOTE: asserts() in update() methods are routed back to server and are not handled
 39  *       by the junit testsuite, so we check double (see code).
 40  * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
 41  */
 42 public class PublishTest extends TestCase {
 43    private String ME = "PublishTest";
 44    private Global glob;
 45    private static Logger log = Logger.getLogger(PublishTest.class.getName());
 46    private ServerHelper serverHelper;
 47 
 48    private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon;
 49 
 50    private int updateCounterHeron = 0;
 51    private int updateCounterFrodo = 0;
 52    private int updateCounterBilbo = 0;
 53    private String oid = "PublishToBilbo";
 54    private String domain = "RUGBY_NEWS"; // heron is master for RUGBY_NEWS
 55    private String contentStr = "We win";
 56 
 57    private String assertInUpdate = null;
 58 
 59    public PublishTest(String name) {
 60       super(name);
 61       this.glob = new Global(null, true, false);
 62    }
 63 
 64    /**
 65     * Initialize the test ...
 66     */
 67    protected void setUp() {
 68 
 69       log.info("Entering setUp(), test starts");
 70 
 71       serverHelper = new ServerHelper(glob, log, ME);
 72 
 73       // Starts a cluster node
 74       serverHelper.startHeron();
 75       //serverHelper.startAvalon();
 76       //serverHelper.startGolan();
 77       serverHelper.startFrodo();
 78       serverHelper.startBilbo();
 79    }
 80 
 81    /**
 82     * cleaning up ...
 83     */
 84    protected void tearDown() {
 85       log.info("Entering tearDown(), test is finished");
 86       try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
 87 
 88       if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
 89       if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
 90       if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
 91       if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
 92       if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
 93 
 94       serverHelper.tearDown();
 95    }
 96 
 97    /**
 98     * We start all nodes as described in requirement
 99     * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
100     * publish a message to bilbo which should be routed to heron.
101     * Than we try to access the message at heron
102     */ 
103    public void testPublish() {
104       System.err.println("***PublishTest: Publish a message to a cluster slave ...");
105       try {
106          bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() {  // Login to xmlBlaster, register for updates
107                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
108                   assertInUpdate = serverHelper.getBilboGlob().getId() + ": Should not receive the message '" + updateKey.getOid() + "'";
109                   fail(assertInUpdate); // This is routed to server, not to junit
110                   return "";
111                }
112             });
113          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
114          assertTrue(assertInUpdate, assertInUpdate == null);
115          assertInUpdate = null;
116 
117          PublishKey pk = new PublishKey(glob, oid, "text/plain", "1.0");
118          pk.setDomain(domain);
119          PublishQos pq = new PublishQos(glob);
120          MsgUnit msgUnit = new MsgUnit(pk, contentStr, pq);
121          PublishReturnQos prq = bilboCon.publish(msgUnit);
122          log.info("Published message of domain='" + pk.getDomain() + "' and content='" + contentStr +
123                                     "' to xmlBlaster node bilbo with IP=" + serverHelper.getBilboGlob().getProperty().get("bootstrapPort",0) +
124                                     ", the returned QoS is: " + prq.getKeyOid());
125 
126          heronCon = serverHelper.connect(serverHelper.getHeronGlob(), new I_Callback() {  // Login to xmlBlaster, register for updates
127                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
128                   log.severe("Receive message '" + updateKey.getOid() + "'");
129                   assertInUpdate = serverHelper.getHeronGlob().getId() + ": Did not expect message update in default handler";
130                   fail(assertInUpdate); // This is routed to server, not to junit
131                   return "";
132                }
133             });
134          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
135          assertTrue(assertInUpdate, assertInUpdate == null);
136          assertInUpdate = null;
137 
138          System.err.println("->Check if the message has reached the master node heron ...");
139          GetKey gk = new GetKey(glob, oid);
140          MsgUnit[] msgs = heronCon.get(gk.toXml(), null);
141          assertTrue("Invalid msgs returned", msgs != null);
142          assertEquals("Invalid number of messages returned", 1, msgs.length);
143          assertTrue("Invalid message oid returned", msgs[0].getKey().indexOf(oid) > 0);
144          log.info("SUCCESS: Got message:" + msgs[0].getKey());
145 
146          System.err.println("->Check if the message is available at the slave node bilbo ...");
147          gk = new GetKey(glob, oid);
148          gk.setDomain(domain);
149          msgs = bilboCon.get(gk.toXml(), null);
150          assertTrue("Invalid msgs returned", msgs != null);
151          assertEquals("Invalid number of messages returned", 1, msgs.length);
152          log.info("SUCCESS: Got message:" + msgs[0].getKey());
153 
154          System.err.println("->Trying to erase the message at the slave node ...");
155          EraseKey ek = new EraseKey(glob, oid);
156          ek.setDomain(domain);
157          EraseQos eq = new EraseQos(glob);
158          bilboCon.erase(ek.toXml(), eq.toXml());
159 
160          // Check if erased ...
161          gk = new GetKey(glob, oid);
162          msgs = heronCon.get(gk.toXml(), null);
163          assertTrue("Invalid msgs returned", msgs != null);
164          assertEquals("Invalid number of messages returned", 0, msgs.length);
165          log.info("SUCCESS: Got no message after erase");
166 
167          System.err.println("***PublishTest: Publish a message to a cluster slave - frodo is offline ...");
168 
169          System.err.println("->Subscribe from heron, the message is currently erased ...");
170          SubscribeKey sk = new SubscribeKey(glob, oid);
171          sk.setDomain(domain);
172          SubscribeQos sq = new SubscribeQos(glob);
173          SubscribeReturnQos srq = heronCon.subscribe(sk.toXml(), sq.toXml(), new I_Callback() {
174             public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
175                assertInUpdate = serverHelper.getHeronGlob().getId() + ": Receiving unexpected asynchronous update message";
176                assertEquals(assertInUpdate, oid, updateKey.getOid());
177                assertInUpdate = serverHelper.getHeronGlob().getId() + ": Receiving corrupted asynchronous update message";
178                assertEquals(assertInUpdate, contentStr, new String(content));
179                log.info("heronCon - Receiving asynchronous message '" + updateKey.getOid() + "' in " + oid + " handler, state=" + updateQos.getState());
180                updateCounterHeron++;
181                assertInUpdate = null;
182                return "";
183             }
184          });  // subscribe with our specific update handler
185          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
186          assertTrue(assertInUpdate, assertInUpdate == null);
187          assertInUpdate = null;
188 
189          serverHelper.stopFrodo();
190 
191          System.err.println("->Check: heron hasn't got the message ...");
192          gk = new GetKey(glob, oid);
193          msgs = heronCon.get(gk.toXml(), null);
194          assertTrue("Invalid msgs returned", msgs != null);
195          assertEquals("Invalid number of messages returned", 0, msgs.length);
196          log.info("SUCCESS: Got no message after erase");
197 
198          // publish again ...
199          pk = new PublishKey(glob, oid, "text/plain", "1.0");
200          pk.setDomain(domain);
201          pq = new PublishQos(glob);
202          msgUnit = new MsgUnit(pk.toXml(), contentStr.getBytes(), pq.toXml());
203          prq = bilboCon.publish(msgUnit);
204          log.info("Published message of domain='" + pk.getDomain() + "' and content='" + contentStr +
205                                     "' to xmlBlaster node bilbo with IP=" + serverHelper.getBilboGlob().getProperty().get("bootstrapPort",0) +
206                                     ", the returned QoS is: " + prq.getKeyOid());
207 
208 
209          assertEquals("heron is not reachable, publish should not have come through", 0, updateCounterHeron);
210 
211          serverHelper.startFrodo();
212 
213          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
214          assertEquals("heron has not received message", 1, updateCounterHeron);
215          updateCounterHeron = 0;
216 
217          System.err.println("->Connect to frodo ...");
218          frodoCon = serverHelper.connect(serverHelper.getFrodoGlob(), new I_Callback() {
219                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
220                   assertInUpdate = serverHelper.getFrodoGlob().getId() + ": Receive unexpected message '" + updateKey.getOid() + "'";
221                   return "";
222                }
223             });
224          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
225          assertTrue(assertInUpdate, assertInUpdate == null);
226          assertInUpdate = null;
227 
228          System.err.println("->Subscribe from frodo, is he able to organize it?");
229          sk = new SubscribeKey(glob, oid);
230          sk.setDomain(domain);
231          sq = new SubscribeQos(glob);
232          srq = frodoCon.subscribe(sk.toXml(), sq.toXml(), new I_Callback() {
233             public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
234                log.info("frodoCon - Receiving asynchronous message '" + updateKey.getOid() + "' in " + oid + " handler, state=" + updateQos.getState());
235                updateCounterFrodo++;
236                assertInUpdate = null;
237                return "";
238             }
239          });  // subscribe with our specific update handler
240          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
241          assertTrue(assertInUpdate, assertInUpdate == null);
242          assertInUpdate = null;
243 
244          try { Thread.sleep(5000); } catch( InterruptedException i) {} // Wait some time
245          assertEquals("frodo is reachable again, subscribe should work", 1, updateCounterFrodo);
246         
247          updateCounterHeron = 0;
248          updateCounterFrodo = 0;
249          updateCounterBilbo = 0;
250 
251          System.err.println("->Check unSubscribe from client frodo ...");
252          UnSubscribeKey uk = new UnSubscribeKey(glob, srq.getSubscriptionId());
253          UnSubscribeQos uq = new UnSubscribeQos(glob);
254          frodoCon.unSubscribe(uk.toXml(), uq.toXml());
255 
256          System.err.println("->Check publish, frodo should not get it ...");
257          pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
258          pq = new PublishQos(glob);
259          msgUnit = new MsgUnit(pk, contentStr, pq);
260          prq = frodoCon.publish(msgUnit);
261          log.info("Published message of domain='" + pk.getDomain() + "' and content='" + contentStr +
262                                     "' to xmlBlaster node frodo with IP=" + serverHelper.getFrodoGlob().getProperty().get("bootstrapPort",0) +
263                                     ", the returned QoS is: " + prq.getKeyOid());
264 
265          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
266          assertEquals("frodo is unSubscribed and should not receive message", 0, updateCounterFrodo);
267          assertEquals("heron has not received message", 1, updateCounterHeron);
268 
269 
270       }
271       catch (XmlBlasterException e) {
272          e.printStackTrace();
273          fail("PublishToBilbo-Exception: " + e.toString());
274       }
275       /* is done in tearDown
276       finally {
277          if (bilboCon != null) {
278             bilboCon.disconnect(null);
279             bilboCon = null;
280          }
281       }
282       */
283 
284       System.err.println("***PublishTest: testPublish [SUCCESS]");
285    }
286 
287    /**
288     * setUp() and tearDown() are ivoked between each test...() method
289     */
290     /*
291    public void testDummy() {
292       System.err.println("***PublishTest: testDummy [SUCCESS]");
293    }
294      */
295 }


syntax highlighted by Code2HTML, v. 0.9.1