1 package org.xmlBlaster.test.cluster;
  2 
  3 import java.util.logging.Logger;
  4 
  5 import org.xmlBlaster.test.util.Client;
  6 import org.xmlBlaster.util.Global;
  7 
  8 // for client connections:
  9 import org.xmlBlaster.util.*;
 10 import org.xmlBlaster.client.I_Callback;
 11 import org.xmlBlaster.client.key.PublishKey;
 12 import org.xmlBlaster.client.key.EraseKey;
 13 import org.xmlBlaster.client.key.SubscribeKey;
 14 import org.xmlBlaster.client.key.UnSubscribeKey;
 15 import org.xmlBlaster.client.key.UpdateKey;
 16 import org.xmlBlaster.client.qos.PublishQos;
 17 import org.xmlBlaster.client.qos.PublishReturnQos;
 18 import org.xmlBlaster.client.qos.UpdateQos;
 19 import org.xmlBlaster.client.qos.SubscribeQos;
 20 import org.xmlBlaster.client.qos.UnSubscribeQos;
 21 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
 22 import org.xmlBlaster.client.qos.EraseQos;
 23 import org.xmlBlaster.client.qos.EraseReturnQos;
 24 import org.xmlBlaster.client.I_XmlBlasterAccess;
 25 import org.xmlBlaster.util.MsgUnit;
 26 
 27 import junit.framework.*;
 28 
 29 /**
 30  * Test publishing a message from bilbo to heron. 
 31  * <p />
 32  * <pre>
 33  * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.SubscribeTest
 34  * </pre>
 35  * NOTE: asserts() in update() methods are routed back to server and are not handled
 36  *       by the junit testsuite, so we check double (see code).
 37  * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
 38  */
 39 public class SubscribeTest extends TestCase {
 40    private String ME = "SubscribeTest";
 41    private Global glob;
 42    private static Logger log = Logger.getLogger(SubscribeTest.class.getName());
 43    private ServerHelper serverHelper;
 44 
 45    private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon, bilboCon2;
 46 
 47    private int updateCounterBilbo = 0;
 48    private int updateCounterBilbo2 = 0;
 49    private String oid = "SubscribeToBilbo";
 50    private String domain = "RUGBY_NEWS"; // heron is master for RUGBY_NEWS
 51    private String contentStr = "We win";
 52 
 53    public SubscribeTest(String name) {
 54       super(name);
 55       this.glob = new Global(null, true, false);
 56    }
 57 
 58    /**
 59     * Initialize the test ...
 60     */
 61    protected void setUp() {
 62 
 63       log.info("Entering setUp(), test starts");
 64 
 65       updateCounterBilbo = 0;
 66       updateCounterBilbo2 = 0;
 67 
 68 
 69       serverHelper = new ServerHelper(glob, log, ME);
 70 
 71       // Starts a cluster node
 72       serverHelper.startHeron();
 73       serverHelper.startAvalon();
 74       //serverHelper.startGolan();
 75       serverHelper.startFrodo();
 76       serverHelper.startBilbo();
 77    }
 78 
 79    /**
 80     * cleaning up ...
 81     */
 82    protected void tearDown() {
 83       log.info("Entering tearDown(), test is finished");
 84       try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
 85 
 86       if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
 87       if (bilboCon2 != null) { bilboCon2.disconnect(null); bilboCon2 = null; }
 88       if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
 89       if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
 90       if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
 91       if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
 92 
 93       serverHelper.tearDown();
 94    }
 95 
 96    /**
 97     * We start all nodes as described in requirement
 98     * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
 99     * <p />
100     * - Subscribe to RUGBY messages from bilbo twice<br />
101     * - publish RUGBY messages to avalon (heron is the master)<br />
102     * - Kill bilbo, restart bilbo and check if we still get them
103     */ 
104    public void testSubscribeTwice() {
105       System.err.println("***SubscribeTest.testSubscribeTwice: Subscribe a message from a cluster slave ...");
106       try {
107          System.err.println("->Connect to avalon ...");
108          avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
109 
110          {
111             System.err.println("->Connect to bilbo ...");
112             bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() {  // Login to xmlBlaster, register for updates
113                   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
114                      if (updateQos.isErased()) {
115                         log.info("Ignoring erase message");
116                         return "";
117                      }
118                      updateCounterBilbo++;
119                      log.info(
120                               "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
121                      assertEquals("Wrong message updated", oid, updateKey.getOid());
122                      return "";
123                   }
124                });
125 
126             System.err.println("->Subscribe from bilbo ...");
127             SubscribeKey sk = new SubscribeKey(glob, oid);
128             sk.setDomain(domain);
129             SubscribeQos sq = new SubscribeQos(glob);
130             bilboCon.subscribe(sk.toXml(), sq.toXml());
131          }
132 
133          {
134             System.err.println("->Connect to bilbo 2 ...");
135             final Global bilboGlob2 = serverHelper.getBilboGlob().getClone(null);
136             bilboCon2 = serverHelper.connect(bilboGlob2, new I_Callback() {  // Login to xmlBlaster, register for updates
137                   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
138                      if (updateQos.isErased()) {
139                         log.info("Ignoring erase message");
140                         return "";
141                      }
142                      updateCounterBilbo2++;
143                      log.info(
144                               "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo2 + " ...");
145                      assertEquals("#2 Wrong message updated", oid, updateKey.getOid());
146                      return "";
147                   }
148                });
149 
150             System.err.println("->Subscribe from bilbo 2 ...");
151             SubscribeKey sk = new SubscribeKey(glob, oid);
152             sk.setDomain(domain);
153             SubscribeQos sq = new SubscribeQos(glob);
154             bilboCon2.subscribe(sk.toXml(), sq.toXml());
155          }
156 
157          // First test subscribe ...
158          {
159             System.err.println("->Publish to avalon ...");
160             PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
161             PublishQos avalon_pq = new PublishQos(glob);
162             MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
163             PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
164             assertEquals("oid changed", oid, avalon_prq.getKeyOid());
165 
166 
167             try { Thread.sleep(2000); } catch( InterruptedException i) {}
168             if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
169             assertEquals("message from avalon", 1, updateCounterBilbo);
170             if (1 != updateCounterBilbo2) log.severe("Did not expect " + updateCounterBilbo2 + " updates");
171             assertEquals("message from avalon #2", 1, updateCounterBilbo2);
172             updateCounterBilbo = 0;
173             updateCounterBilbo2 = 0;
174          }
175 
176          System.err.println("->testSubscribeTwice done, SUCCESS.");
177 
178          // ... and now test unSubscribe
179          {
180             System.err.println("->UnSubscribe from bilbo ...");
181             UnSubscribeKey usk = new UnSubscribeKey(glob, oid);
182             usk.setDomain(domain);
183             UnSubscribeQos usq = new UnSubscribeQos(glob);
184             UnSubscribeReturnQos[] usrq = bilboCon.unSubscribe(usk, usq);
185             assertEquals("", 1, usrq.length);
186 
187             System.err.println("->Publish to avalon ...");
188             PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
189             PublishQos avalon_pq = new PublishQos(glob);
190             MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
191             PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
192             assertEquals("oid changed", oid, avalon_prq.getKeyOid());
193 
194 
195             try { Thread.sleep(2000); } catch( InterruptedException i) {}
196             if (0 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
197             assertEquals("message from avalon", 0, updateCounterBilbo);
198             if (1 != updateCounterBilbo2) log.severe("Did not expect " + updateCounterBilbo2 + " updates");
199             assertEquals("message from avalon #2", 1, updateCounterBilbo2);
200             updateCounterBilbo = 0;
201             updateCounterBilbo2 = 0;
202          }
203 
204          System.err.println("->Trying to erase the message at the slave node ...");
205          EraseKey ek = new EraseKey(glob, oid);
206          ek.setDomain(domain);
207          EraseQos eq = new EraseQos(glob);
208          EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
209          assertEquals("Erase", 1, arr.length);
210       }
211       catch (XmlBlasterException e) {
212          e.printStackTrace();
213          fail("SubscribeToBilbo-Exception: " + e.toString());
214       }
215       finally {
216          if (bilboCon != null) {
217             bilboCon.disconnect(null);
218             bilboCon = null;
219          }   
220          if (bilboCon2 != null) {
221             bilboCon2.disconnect(null);
222             bilboCon2 = null;
223          }   
224          if (avalonCon != null) {
225             avalonCon.disconnect(null);
226             avalonCon = null;
227          }
228       }
229 
230       System.err.println("***SubscribeTest.testSubscribeTwice: testSubscribeTwice [SUCCESS]");
231    }
232 
233    /**
234     * We start all nodes as described in requirement
235     * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
236     * <p />
237     * 1. publish RUGBY messages to avalon (heron is the master)<br />
238     * 2. Subscribe those messages from bilbo<br />
239     * 3. Kill bilbo, restart bilbo and check if we still get them
240     */ 
241    public void testSubscribe() {
242       System.err.println("***SubscribeTest: Subscribe a message from a cluster slave ...");
243 
244       int num = 2;
245       I_XmlBlasterAccess[] bilboCons = new I_XmlBlasterAccess[num];
246 
247       try {
248          System.err.println("->Connect to avalon ...");
249          avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
250          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
251 
252          for (int ii=0; ii<num; ii++) {
253             System.err.println("->Connect to bilbo #" + ii + " ...");
254             final Global bilboGlobii = serverHelper.getBilboGlob().getClone(null);
255             bilboCons[ii] = serverHelper.connect(bilboGlobii, new I_Callback() {  // Login to xmlBlaster, register for updates
256                   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
257                      log.info(
258                               "Receiving update '" + updateKey.getOid() + "' state=" + updateQos.getState() + ", " + updateCounterBilbo + " ...");
259                      if (updateQos.isErased()) {
260                         log.info("Ignoring erase message");
261                         return "";
262                      }
263                      updateCounterBilbo++;
264                      log.info(
265                               "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
266                      assertEquals("Wrong message updated", oid, updateKey.getOid());
267                      return "";
268                   }
269                });
270 
271             System.err.println("->Publish to avalon #" + ii + " ...");
272             PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
273             PublishQos avalon_pq = new PublishQos(glob);
274             MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
275             PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
276             assertEquals("oid changed", oid, avalon_prq.getKeyOid());
277 
278             try { Thread.sleep(1000L); } catch( InterruptedException i) {}
279             
280             System.err.println("->Subscribe from bilbo #" + ii + ", the message from avalon should arrive ...");
281             SubscribeKey sk = new SubscribeKey(glob, oid);
282             sk.setDomain(domain);
283             SubscribeQos sq = new SubscribeQos(glob);
284             bilboCons[ii].subscribe(sk.toXml(), sq.toXml());
285 
286             waitOnUpdate(2000L, 1);
287             try { Thread.sleep(1000); } catch( InterruptedException i) {} // wait longer to check if too many arrive
288             if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
289             assertEquals("message from avalon", 1, updateCounterBilbo);
290             updateCounterBilbo = 0;
291 
292             System.err.println("->Trying to erase the message at the slave node ...");
293             EraseKey ek = new EraseKey(glob, oid);
294             ek.setDomain(domain);
295             EraseQos eq = new EraseQos(glob);
296             EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
297             assertEquals("Erase", 1, arr.length);
298 
299             // Wait on erase events
300             try { Thread.sleep(1000); } catch( InterruptedException i) {}
301             updateCounterBilbo = 0;
302             updateCounterBilbo2 = 0;
303 
304             // We stay logged in but kill over callback server ...
305             Client.shutdownCb(bilboCons[ii], Client.Shutdown.KEEP_LOGGED_IN);
306          }
307 
308          System.err.println("->testSubscribe done, SUCCESS.");
309       }
310       catch (XmlBlasterException e) {
311          e.printStackTrace();
312          fail("SubscribeToBilbo-Exception: " + e.toString());
313       }
314       finally {
315          for (int jj=0; jj<bilboCons.length; jj++) {
316             if (bilboCons[jj] != null) {
317                bilboCons[jj].disconnect(null);
318                bilboCons[jj] = null;
319             }
320          }
321          if (avalonCon != null) {
322             avalonCon.disconnect(null);
323             avalonCon = null;
324          }
325       }
326 
327       System.err.println("***SubscribeTest: testSubscribe [SUCCESS]");
328 
329    }
330 
331    private void waitOnUpdate(final long timeout, final int numWait) {
332       long pollingInterval = 50L;  // check every 0.05 seconds
333       if (timeout < 50)  pollingInterval = timeout / 10L;
334       long sum = 0L;
335       while (updateCounterBilbo < numWait) {
336          try {
337             Thread.sleep(pollingInterval);
338          }
339          catch( InterruptedException i)
340          {}
341          sum += pollingInterval;
342          if (sum > timeout) {
343             log.warning("Timeout of " + timeout + " occurred");
344             break;
345          }
346       }
347    }
348 }


syntax highlighted by Code2HTML, v. 0.9.1