1 /*------------------------------------------------------------------------------
  2 Name:      TestSynchronousCache.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.util.logging.Logger;
  9 import org.xmlBlaster.util.Global;
 10 
 11 import org.xmlBlaster.client.qos.ConnectQos;
 12 import org.xmlBlaster.util.XmlBlasterException;
 13 import org.xmlBlaster.util.MsgUnit;
 14 import org.xmlBlaster.util.def.Constants;
 15 import org.xmlBlaster.client.key.PublishKey;
 16 import org.xmlBlaster.client.key.GetKey;
 17 import org.xmlBlaster.client.key.EraseKey;
 18 import org.xmlBlaster.client.qos.GetQos;
 19 import org.xmlBlaster.client.qos.GetReturnQos;
 20 import org.xmlBlaster.client.qos.PublishQos;
 21 import org.xmlBlaster.client.qos.PublishReturnQos;
 22 import org.xmlBlaster.client.qos.EraseQos;
 23 import org.xmlBlaster.client.qos.EraseReturnQos;
 24 import org.xmlBlaster.client.I_XmlBlasterAccess;
 25 import org.xmlBlaster.client.SynchronousCache;
 26 
 27 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 28 import org.xmlBlaster.test.Util;
 29 import org.xmlBlaster.test.MsgInterceptor;
 30 
 31 import junit.framework.*;
 32 
 33 
 34 /**
 35  * Here we test the client side synchronous cache for high performing getCached() invocations. 
 36  * <p>
 37  * </p>
 38  * <p>
 39  * Invoke examples:
 40  * </p>
 41  * <pre>
 42  *    java junit.textui.TestRunner org.xmlBlaster.test.client.TestSynchronousCache
 43  *
 44  *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestSynchronousCache
 45  * </pre>
 46  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/client.cache.html">The client.cache requirement</a>
 47  * @see org.xmlBlaster.client.SynchronousCache
 48  */
 49 public class TestSynchronousCache extends TestCase {
 50    private Global glob;
 51    private static Logger log = Logger.getLogger(TestSynchronousCache.class.getName());
 52 
 53    private I_XmlBlasterAccess con = null;
 54    private MsgInterceptor updateInterceptor;
 55 
 56    private EmbeddedXmlBlaster serverThread;
 57    private int serverPort = 34576;
 58    private boolean startEmbedded = true;
 59    private SynchronousCache synchronousCache;
 60    private String[] publishOidArr = new String[] { "oid-0", "oid-1", "xx-oid-2" };
 61    private String[] contentArr = new String[] { "content-oid-0", "content-oid-1", "content-oid-2" };
 62 
 63    /**
 64     * Constructs the TestSynchronousCache object.
 65     * <p />
 66     * @param testName  The name used in the test suite
 67     * @param loginName The name to login to the xmlBlaster
 68     */
 69    public TestSynchronousCache(Global glob, String testName) {
 70       super(testName);
 71       this.glob = glob;
 72 
 73    }
 74 
 75    /**
 76     * Sets up the fixture.
 77     * <p />
 78     * Creates a CORBA connection and does a login.<br />
 79     * - One connection for the sender client<br />
 80     */
 81    protected void setUp() {
 82       this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
 83       if (this.startEmbedded) {
 84          glob.init(Util.getOtherServerPorts(serverPort));
 85          String[] args = { };
 86          glob.init(args);
 87          serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
 88          log.info("XmlBlaster is ready for testing the client cache");
 89       }
 90 
 91       try {
 92          this.con = glob.getXmlBlasterAccess();
 93          this.synchronousCache = con.createSynchronousCache(100); // remember handle to check in this test
 94          ConnectQos connectQos = new ConnectQos(glob);
 95          this.updateInterceptor = new MsgInterceptor(glob,log, null);
 96          this.con.connect(connectQos, this.updateInterceptor);
 97       }
 98       catch (Exception e) {
 99           log.severe(e.toString());
100           e.printStackTrace();
101       }
102    }
103 
104    /**
105     * Tears down the fixture.
106     * <p />
107     * cleaning up .... logout
108     */
109    protected void tearDown() {
110       try { Thread.sleep(200L); } catch( InterruptedException i) {}   // Wait 200 milli seconds, until all updates are processed ...
111 
112       for (int i=0; i<publishOidArr.length; i++) {
113          // Erase if not all have been destroyed during test
114          sendErase(publishOidArr[i]);
115       }
116 
117       this.con.disconnect(null);
118       this.con = null;
119 
120       if (this.startEmbedded) {
121          try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
122          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
123          this.serverThread = null;
124       }
125 
126       // reset to default server port (necessary if other tests follow in the same JVM).
127       Util.resetPorts(glob);
128 
129       this.glob = null;
130      
131       this.updateInterceptor = null;
132       this.synchronousCache = null;
133    }
134 
135    public EraseReturnQos[] sendErase(String publishOid) {
136       log.info("Erasing topic '" + publishOid + "'");
137       try {
138          EraseQos eq = new EraseQos(glob);
139          // !!!! NOTE: if force destroy is true the erase event may not
140          // come through and the cache is not cleared !!! How to relove?
141          eq.setForceDestroy(false);
142          EraseKey ek = new EraseKey(glob, publishOid);
143          EraseReturnQos[] er = con.erase(ek, eq);
144          // Wait 200 milli seconds, until erase event is processed and cache is cleared ...
145          try { Thread.sleep(200L); } catch( InterruptedException i) {}
146          return er;
147       } catch(XmlBlasterException e) {
148          fail("Erase XmlBlasterException: " + e.getMessage());
149       }
150       return null;
151    }
152 
153    /**
154     * Publish an almost volatile message.
155     */
156    public PublishReturnQos publishMsg(String publishOid, String content) {
157       log.info("Sending a message '" + content + "'");
158       try {
159          // Publish a volatile message
160          PublishKey pk = new PublishKey(glob, publishOid, "text/xml", "1.0");
161          PublishQos pq = new PublishQos(glob);
162          MsgUnit msgUnit = new MsgUnit(pk, content, pq);
163          PublishReturnQos publishReturnQos = con.publish(msgUnit);
164          assertEquals("Retunred oid is invalid", publishOid, publishReturnQos.getKeyOid());
165          log.info("Sending of '" + content + "' done, returned oid=" + publishOid + " " + msgUnit.toXml());
166          return publishReturnQos;
167       } catch(XmlBlasterException e) {
168          log.severe("publish() XmlBlasterException: " + e.getMessage());
169          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
170       }
171       return null; // never reached
172    }
173 
174    /**
175     * THIS IS THE TEST
176     * <p>
177     * We publish some messages and try cached access.
178     * </p>
179     */
180    public void testCachedAccess() {
181       {
182          log.info("Entering testCachedAccess ...");
183          try {
184             publishMsg(publishOidArr[0], contentArr[0]);
185             publishMsg(publishOidArr[2], contentArr[2]);
186             try { Thread.sleep(200L); } catch( InterruptedException i) {}   // Wait 200 milli seconds, until all updates are processed ...
187 
188             GetKey gk = new GetKey(glob, publishOidArr[0]);
189             GetQos gq = new GetQos(glob);
190 
191             for (int i=0; i<10; i++) {
192                MsgUnit[] msgs = con.getCached(gk, gq);
193                assertEquals(this.synchronousCache.toXml(""), 1, msgs.length);
194                GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
195                assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
196                log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
197                               "' and status=" + grq.getState());
198             }
199 
200             sendErase(publishOidArr[0]);
201             assertEquals("", 0, this.synchronousCache.getNumQueriesCached());
202             sendErase(publishOidArr[2]);
203          }
204          catch (XmlBlasterException e) {
205             log.severe("testCachedAccess() failed: " + e.getMessage());
206             fail(e.getMessage());
207          }
208          assertEquals("Unexpected update arrived", 0, this.updateInterceptor.waitOnUpdate(1000L, 0));
209       }
210 
211       {
212          log.info("Entering testCachedAccess with updated MsgUnit ...");
213          try {
214             PublishReturnQos publishReturnQos = publishMsg(publishOidArr[0], contentArr[0]);
215 
216             GetKey gk = new GetKey(glob, publishOidArr[0]);
217             GetQos gq = new GetQos(glob);
218 
219             for (int i=0; i<5; i++) {
220                MsgUnit[] msgs = con.getCached(gk, gq);
221                GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
222                assertEquals(this.synchronousCache.toXml(""), 1, msgs.length);
223                assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
224                assertEquals("", publishReturnQos.getRcvTimestamp(), grq.getRcvTimestamp());
225                assertEquals("", contentArr[0], msgs[0].getContentStr());
226                log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
227                               "' and status=" + grq.getState() + " rcv=" + grq.getRcvTimestamp());
228             }
229 
230             // Now publish again an check if cache is updated
231             String contentNew = contentArr[0]+"-NEW";
232             publishReturnQos = publishMsg(publishOidArr[0], contentNew);
233             try { Thread.sleep(200L); } catch( InterruptedException i) {}   // Wait 200 milli seconds, until all updates are processed ...
234             for (int i=0; i<5; i++) {
235                MsgUnit[] msgs = con.getCached(gk, gq);
236                GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
237                assertEquals(this.synchronousCache.toXml(""), 1, msgs.length);
238                assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
239                assertEquals("", publishReturnQos.getRcvTimestamp().getTimestamp(), grq.getRcvTimestamp().getTimestamp());
240                assertEquals("", publishReturnQos.getKeyOid(), msgs[0].getKeyOid());
241                assertEquals("", contentNew, msgs[0].getContentStr());
242                log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
243                               "' and status=" + grq.getState() + " rcv=" + grq.getRcvTimestamp());
244             }
245 
246             sendErase(publishOidArr[0]);
247             assertEquals("", 0, this.synchronousCache.getNumQueriesCached());
248          }
249          catch (XmlBlasterException e) {
250             log.severe("testCachedAccess() failed: " + e.getMessage());
251             fail(e.getMessage());
252          }
253          assertEquals("Unexpected update arrived", 0, this.updateInterceptor.waitOnUpdate(1000L, 0));
254       }
255 
256       {
257          log.info("Entering testCachedAccess with XPATH ...");
258          try {
259             PublishReturnQos publishReturnQos0 = publishMsg(publishOidArr[0], contentArr[0]);
260             PublishReturnQos publishReturnQos1 = publishMsg(publishOidArr[1], contentArr[1]);
261             publishMsg(publishOidArr[2], contentArr[2]);
262             try { Thread.sleep(200L); } catch( InterruptedException i) {}   // Wait 200 milli seconds, until all updates are processed ...
263 
264             // This should match [0] and [1] msg:
265             GetKey gk = new GetKey(glob, "//key[starts-with(@oid,'oid-')]", Constants.XPATH);
266             GetQos gq = new GetQos(glob);
267 
268             for (int i=0; i<10; i++) {
269                MsgUnit[] msgs = con.getCached(gk, gq);
270                assertEquals("", 2, msgs.length);
271                GetReturnQos grq0 = new GetReturnQos(glob, msgs[0].getQos());
272                GetReturnQos grq1 = new GetReturnQos(glob, msgs[1].getQos());
273                assertEquals(this.synchronousCache.toXml(""), 2, msgs.length);
274                assertEquals(this.synchronousCache.toXml(""), 1, this.synchronousCache.getNumQueriesCached());
275                log.info(" publishReturnQos0.getRcvTimestamp()=" + publishReturnQos0.getRcvTimestamp() +
276                             " publishReturnQos1.getRcvTimestamp()=" + publishReturnQos1.getRcvTimestamp() +
277                             " grq0.getRcvTimestamp()=" + grq0.getRcvTimestamp() +
278                             " grq1.getRcvTimestamp()=" + grq1.getRcvTimestamp());
279                assertTrue("", publishReturnQos0.getRcvTimestamp().equals(grq0.getRcvTimestamp()) ||
280                               publishReturnQos0.getRcvTimestamp().equals(grq1.getRcvTimestamp()));
281                assertTrue("", publishReturnQos1.getRcvTimestamp().equals(grq0.getRcvTimestamp()) ||
282                               publishReturnQos1.getRcvTimestamp().equals(grq1.getRcvTimestamp()));
283                assertTrue("", !grq0.getRcvTimestamp().equals(grq1.getRcvTimestamp()));
284                assertEquals("", 2, msgs.length);
285                log.info("Accessed " + msgs.length + " xmlBlaster messages with content '" +
286                               new String(msgs[0].getContent()) +
287                               "' and '" + new String(msgs[1].getContent()) + "' and status=" + grq0.getState());
288             }
289 
290             log.info("Current cache:" + this.synchronousCache.toXml(""));
291             assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
292             /*EraseReturnQos[] arr0 =*/ sendErase(publishOidArr[0]);
293             assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
294             sendErase(publishOidArr[1]);
295             log.info("Current cache:" + this.synchronousCache.toXml(""));
296 
297             // The cache is not cleared automatically for XPATH, we do it manually
298             this.synchronousCache.removeEntryByQueryString(this.synchronousCache.getQueryString(gk));
299             log.info("Current cache:" + this.synchronousCache.toXml(""));
300             assertEquals("", 0, this.synchronousCache.getNumQueriesCached());
301             sendErase(publishOidArr[2]);
302             assertEquals("", 0, this.synchronousCache.getNumQueriesCached());
303          }
304          catch (XmlBlasterException e) {
305             log.severe("testCachedAccess() failed: " + e.getMessage());
306             fail(e.getMessage());
307          }
308          assertEquals("Unexpected update arrived", 0, this.updateInterceptor.waitOnUpdate(1000L, 0));
309       }
310       log.info("SUCCESS testCachedAccess");
311    }
312 
313    /**
314     * Method is used by TestRunner to load these tests
315     */
316    public static Test suite() {
317        TestSuite suite= new TestSuite();
318        suite.addTest(new TestSynchronousCache(new Global(), "testCachedAccess"));
319        return suite;
320    }
321 
322    /**
323     * Invoke: java org.xmlBlaster.test.client.TestSynchronousCache -startEmbedded false
324     */
325    public static void main(String args[]) {
326       TestSynchronousCache testSub = new TestSynchronousCache(new Global(args), "TestSynchronousCache");
327       testSub.setUp();
328       testSub.testCachedAccess();
329       testSub.tearDown();
330    }
331 }


syntax highlighted by Code2HTML, v. 0.9.1