1 /*------------------------------------------------------------------------------
2 Name: TestSynchronousCache.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.client;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.util.Global;
10
11 import org.xmlBlaster.client.qos.ConnectQos;
12 import org.xmlBlaster.util.XmlBlasterException;
13 import org.xmlBlaster.util.MsgUnit;
14 import org.xmlBlaster.util.def.Constants;
15 import org.xmlBlaster.client.key.PublishKey;
16 import org.xmlBlaster.client.key.GetKey;
17 import org.xmlBlaster.client.key.EraseKey;
18 import org.xmlBlaster.client.qos.GetQos;
19 import org.xmlBlaster.client.qos.GetReturnQos;
20 import org.xmlBlaster.client.qos.PublishQos;
21 import org.xmlBlaster.client.qos.PublishReturnQos;
22 import org.xmlBlaster.client.qos.EraseQos;
23 import org.xmlBlaster.client.qos.EraseReturnQos;
24 import org.xmlBlaster.client.I_XmlBlasterAccess;
25 import org.xmlBlaster.client.SynchronousCache;
26
27 import org.xmlBlaster.util.EmbeddedXmlBlaster;
28 import org.xmlBlaster.test.Util;
29 import org.xmlBlaster.test.MsgInterceptor;
30
31 import junit.framework.*;
32
33
34 /**
35 * Here we test the client side synchronous cache for high performing getCached() invocations.
36 * <p>
37 * </p>
38 * <p>
39 * Invoke examples:
40 * </p>
41 * <pre>
42 * java junit.textui.TestRunner org.xmlBlaster.test.client.TestSynchronousCache
43 *
44 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestSynchronousCache
45 * </pre>
46 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/client.cache.html">The client.cache requirement</a>
47 * @see org.xmlBlaster.client.SynchronousCache
48 */
49 public class TestSynchronousCache extends TestCase {
50 private Global glob;
51 private static Logger log = Logger.getLogger(TestSynchronousCache.class.getName());
52
53 private I_XmlBlasterAccess con = null;
54 private MsgInterceptor updateInterceptor;
55
56 private EmbeddedXmlBlaster serverThread;
57 private int serverPort = 34576;
58 private boolean startEmbedded = true;
59 private SynchronousCache synchronousCache;
60 private String[] publishOidArr = new String[] { "oid-0", "oid-1", "xx-oid-2" };
61 private String[] contentArr = new String[] { "content-oid-0", "content-oid-1", "content-oid-2" };
62
63 /**
64 * Constructs the TestSynchronousCache object.
65 * <p />
66 * @param testName The name used in the test suite
67 * @param loginName The name to login to the xmlBlaster
68 */
69 public TestSynchronousCache(Global glob, String testName) {
70 super(testName);
71 this.glob = glob;
72
73 }
74
75 /**
76 * Sets up the fixture.
77 * <p />
78 * Creates a CORBA connection and does a login.<br />
79 * - One connection for the sender client<br />
80 */
81 protected void setUp() {
82 this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
83 if (this.startEmbedded) {
84 glob.init(Util.getOtherServerPorts(serverPort));
85 String[] args = { };
86 glob.init(args);
87 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
88 log.info("XmlBlaster is ready for testing the client cache");
89 }
90
91 try {
92 this.con = glob.getXmlBlasterAccess();
93 this.synchronousCache = con.createSynchronousCache(100); // remember handle to check in this test
94 ConnectQos connectQos = new ConnectQos(glob);
95 this.updateInterceptor = new MsgInterceptor(glob,log, null);
96 this.con.connect(connectQos, this.updateInterceptor);
97 }
98 catch (Exception e) {
99 log.severe(e.toString());
100 e.printStackTrace();
101 }
102 }
103
104 /**
105 * Tears down the fixture.
106 * <p />
107 * cleaning up .... logout
108 */
109 protected void tearDown() {
110 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait 200 milli seconds, until all updates are processed ...
111
112 for (int i=0; i<publishOidArr.length; i++) {
113 // Erase if not all have been destroyed during test
114 sendErase(publishOidArr[i]);
115 }
116
117 this.con.disconnect(null);
118 this.con = null;
119
120 if (this.startEmbedded) {
121 try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
122 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
123 this.serverThread = null;
124 }
125
126 // reset to default server port (necessary if other tests follow in the same JVM).
127 Util.resetPorts(glob);
128
129 this.glob = null;
130
131 this.updateInterceptor = null;
132 this.synchronousCache = null;
133 }
134
135 public EraseReturnQos[] sendErase(String publishOid) {
136 log.info("Erasing topic '" + publishOid + "'");
137 try {
138 EraseQos eq = new EraseQos(glob);
139 // !!!! NOTE: if force destroy is true the erase event may not
140 // come through and the cache is not cleared !!! How to relove?
141 eq.setForceDestroy(false);
142 EraseKey ek = new EraseKey(glob, publishOid);
143 EraseReturnQos[] er = con.erase(ek, eq);
144 // Wait 200 milli seconds, until erase event is processed and cache is cleared ...
145 try { Thread.sleep(200L); } catch( InterruptedException i) {}
146 return er;
147 } catch(XmlBlasterException e) {
148 fail("Erase XmlBlasterException: " + e.getMessage());
149 }
150 return null;
151 }
152
153 /**
154 * Publish an almost volatile message.
155 */
156 public PublishReturnQos publishMsg(String publishOid, String content) {
157 log.info("Sending a message '" + content + "'");
158 try {
159 // Publish a volatile message
160 PublishKey pk = new PublishKey(glob, publishOid, "text/xml", "1.0");
161 PublishQos pq = new PublishQos(glob);
162 MsgUnit msgUnit = new MsgUnit(pk, content, pq);
163 PublishReturnQos publishReturnQos = con.publish(msgUnit);
164 assertEquals("Retunred oid is invalid", publishOid, publishReturnQos.getKeyOid());
165 log.info("Sending of '" + content + "' done, returned oid=" + publishOid + " " + msgUnit.toXml());
166 return publishReturnQos;
167 } catch(XmlBlasterException e) {
168 log.severe("publish() XmlBlasterException: " + e.getMessage());
169 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
170 }
171 return null; // never reached
172 }
173
174 /**
175 * THIS IS THE TEST
176 * <p>
177 * We publish some messages and try cached access.
178 * </p>
179 */
180 public void testCachedAccess() {
181 {
182 log.info("Entering testCachedAccess ...");
183 try {
184 publishMsg(publishOidArr[0], contentArr[0]);
185 publishMsg(publishOidArr[2], contentArr[2]);
186 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait 200 milli seconds, until all updates are processed ...
187
188 GetKey gk = new GetKey(glob, publishOidArr[0]);
189 GetQos gq = new GetQos(glob);
190
191 for (int i=0; i<10; i++) {
192 MsgUnit[] msgs = con.getCached(gk, gq);
193 assertEquals(this.synchronousCache.toXml(""), 1, msgs.length);
194 GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
195 assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
196 log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
197 "' and status=" + grq.getState());
198 }
199
200 sendErase(publishOidArr[0]);
201 assertEquals("", 0, this.synchronousCache.getNumQueriesCached());
202 sendErase(publishOidArr[2]);
203 }
204 catch (XmlBlasterException e) {
205 log.severe("testCachedAccess() failed: " + e.getMessage());
206 fail(e.getMessage());
207 }
208 assertEquals("Unexpected update arrived", 0, this.updateInterceptor.waitOnUpdate(1000L, 0));
209 }
210
211 {
212 log.info("Entering testCachedAccess with updated MsgUnit ...");
213 try {
214 PublishReturnQos publishReturnQos = publishMsg(publishOidArr[0], contentArr[0]);
215
216 GetKey gk = new GetKey(glob, publishOidArr[0]);
217 GetQos gq = new GetQos(glob);
218
219 for (int i=0; i<5; i++) {
220 MsgUnit[] msgs = con.getCached(gk, gq);
221 GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
222 assertEquals(this.synchronousCache.toXml(""), 1, msgs.length);
223 assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
224 assertEquals("", publishReturnQos.getRcvTimestamp(), grq.getRcvTimestamp());
225 assertEquals("", contentArr[0], msgs[0].getContentStr());
226 log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
227 "' and status=" + grq.getState() + " rcv=" + grq.getRcvTimestamp());
228 }
229
230 // Now publish again an check if cache is updated
231 String contentNew = contentArr[0]+"-NEW";
232 publishReturnQos = publishMsg(publishOidArr[0], contentNew);
233 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait 200 milli seconds, until all updates are processed ...
234 for (int i=0; i<5; i++) {
235 MsgUnit[] msgs = con.getCached(gk, gq);
236 GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
237 assertEquals(this.synchronousCache.toXml(""), 1, msgs.length);
238 assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
239 assertEquals("", publishReturnQos.getRcvTimestamp().getTimestamp(), grq.getRcvTimestamp().getTimestamp());
240 assertEquals("", publishReturnQos.getKeyOid(), msgs[0].getKeyOid());
241 assertEquals("", contentNew, msgs[0].getContentStr());
242 log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
243 "' and status=" + grq.getState() + " rcv=" + grq.getRcvTimestamp());
244 }
245
246 sendErase(publishOidArr[0]);
247 assertEquals("", 0, this.synchronousCache.getNumQueriesCached());
248 }
249 catch (XmlBlasterException e) {
250 log.severe("testCachedAccess() failed: " + e.getMessage());
251 fail(e.getMessage());
252 }
253 assertEquals("Unexpected update arrived", 0, this.updateInterceptor.waitOnUpdate(1000L, 0));
254 }
255
256 {
257 log.info("Entering testCachedAccess with XPATH ...");
258 try {
259 PublishReturnQos publishReturnQos0 = publishMsg(publishOidArr[0], contentArr[0]);
260 PublishReturnQos publishReturnQos1 = publishMsg(publishOidArr[1], contentArr[1]);
261 publishMsg(publishOidArr[2], contentArr[2]);
262 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait 200 milli seconds, until all updates are processed ...
263
264 // This should match [0] and [1] msg:
265 GetKey gk = new GetKey(glob, "//key[starts-with(@oid,'oid-')]", Constants.XPATH);
266 GetQos gq = new GetQos(glob);
267
268 for (int i=0; i<10; i++) {
269 MsgUnit[] msgs = con.getCached(gk, gq);
270 assertEquals("", 2, msgs.length);
271 GetReturnQos grq0 = new GetReturnQos(glob, msgs[0].getQos());
272 GetReturnQos grq1 = new GetReturnQos(glob, msgs[1].getQos());
273 assertEquals(this.synchronousCache.toXml(""), 2, msgs.length);
274 assertEquals(this.synchronousCache.toXml(""), 1, this.synchronousCache.getNumQueriesCached());
275 log.info(" publishReturnQos0.getRcvTimestamp()=" + publishReturnQos0.getRcvTimestamp() +
276 " publishReturnQos1.getRcvTimestamp()=" + publishReturnQos1.getRcvTimestamp() +
277 " grq0.getRcvTimestamp()=" + grq0.getRcvTimestamp() +
278 " grq1.getRcvTimestamp()=" + grq1.getRcvTimestamp());
279 assertTrue("", publishReturnQos0.getRcvTimestamp().equals(grq0.getRcvTimestamp()) ||
280 publishReturnQos0.getRcvTimestamp().equals(grq1.getRcvTimestamp()));
281 assertTrue("", publishReturnQos1.getRcvTimestamp().equals(grq0.getRcvTimestamp()) ||
282 publishReturnQos1.getRcvTimestamp().equals(grq1.getRcvTimestamp()));
283 assertTrue("", !grq0.getRcvTimestamp().equals(grq1.getRcvTimestamp()));
284 assertEquals("", 2, msgs.length);
285 log.info("Accessed " + msgs.length + " xmlBlaster messages with content '" +
286 new String(msgs[0].getContent()) +
287 "' and '" + new String(msgs[1].getContent()) + "' and status=" + grq0.getState());
288 }
289
290 log.info("Current cache:" + this.synchronousCache.toXml(""));
291 assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
292 /*EraseReturnQos[] arr0 =*/ sendErase(publishOidArr[0]);
293 assertEquals("", 1, this.synchronousCache.getNumQueriesCached());
294 sendErase(publishOidArr[1]);
295 log.info("Current cache:" + this.synchronousCache.toXml(""));
296
297 // The cache is not cleared automatically for XPATH, we do it manually
298 this.synchronousCache.removeEntryByQueryString(this.synchronousCache.getQueryString(gk));
299 log.info("Current cache:" + this.synchronousCache.toXml(""));
300 assertEquals("", 0, this.synchronousCache.getNumQueriesCached());
301 sendErase(publishOidArr[2]);
302 assertEquals("", 0, this.synchronousCache.getNumQueriesCached());
303 }
304 catch (XmlBlasterException e) {
305 log.severe("testCachedAccess() failed: " + e.getMessage());
306 fail(e.getMessage());
307 }
308 assertEquals("Unexpected update arrived", 0, this.updateInterceptor.waitOnUpdate(1000L, 0));
309 }
310 log.info("SUCCESS testCachedAccess");
311 }
312
313 /**
314 * Method is used by TestRunner to load these tests
315 */
316 public static Test suite() {
317 TestSuite suite= new TestSuite();
318 suite.addTest(new TestSynchronousCache(new Global(), "testCachedAccess"));
319 return suite;
320 }
321
322 /**
323 * Invoke: java org.xmlBlaster.test.client.TestSynchronousCache -startEmbedded false
324 */
325 public static void main(String args[]) {
326 TestSynchronousCache testSub = new TestSynchronousCache(new Global(args), "TestSynchronousCache");
327 testSub.setUp();
328 testSub.testCachedAccess();
329 testSub.tearDown();
330 }
331 }
// syntax highlighted by Code2HTML, v. 0.9.1