1 /*------------------------------------------------------------------------------
2 Name: TestAdminGet.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.admin;
7
8 import java.util.logging.Logger;
9 import java.util.logging.Level;
10 import org.xmlBlaster.util.Global;
11 import org.xmlBlaster.util.SessionName;
12 import org.xmlBlaster.util.XmlBlasterException;
13 import org.xmlBlaster.util.def.Constants;
14 import org.xmlBlaster.util.property.PropString;
15 import org.xmlBlaster.util.qos.QuerySpecQos;
16 import org.xmlBlaster.util.MsgUnit;
17 import org.xmlBlaster.client.I_Callback;
18 import org.xmlBlaster.client.I_XmlBlasterAccess;
19 import org.xmlBlaster.client.key.GetKey;
20 import org.xmlBlaster.client.key.PublishKey;
21 import org.xmlBlaster.client.key.SubscribeKey;
22 import org.xmlBlaster.client.key.UnSubscribeKey;
23 import org.xmlBlaster.client.key.UpdateKey;
24 import org.xmlBlaster.client.qos.*;
25
26 import org.xmlBlaster.test.MsgInterceptor;
27
28 import junit.framework.*;
29
30
31 /**
32 * Tests the activation/deactivation of the DispatchManager.
33 * <br />
34 * If the DispatchManager is disactivated, asynchronous dispatch should not
35 * be possible.
36 * <p>
37 * Invoke examples:<br />
38 * <pre>
39 * java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestAdminGet
40 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestAdminGet
41 * </pre>
42 */
43 public class TestAdminGet extends TestCase implements I_Callback
44 {
45
46 public class PublisherThread extends Thread {
47 private Global global;
48 private long delay;
49 private MsgUnit[] msgUnits;
50 private Exception ex;
51
52 public PublisherThread(Global global, long timeToWaitBeforePublishing, MsgUnit[] msgUnits) {
53 this.global = global;
54 this.delay = timeToWaitBeforePublishing;
55 this.msgUnits = msgUnits;
56 start();
57 }
58
59 public boolean hasException() {
60 return (this.ex != null);
61 }
62
63 public void run() {
64 try {
65 if (this.delay > 0L) sleep(this.delay);
66 this.global.getXmlBlasterAccess().publishArr(this.msgUnits);
67 }
68 catch (Exception ex) {
69 ex.printStackTrace();
70 this.ex = ex;
71 }
72 }
73 }
74
75 private static String ME = "TestAdminGet";
76
77 private Global glob;
78 private static Logger log = Logger.getLogger(TestAdminGet.class.getName());
79
80 private MsgInterceptor updateInterceptor;
81 private String senderName;
82
83 private final String contentMime = "text/plain";
84
85 private String sessionName = "dispatchTester/1";
86
87 public TestAdminGet(String testName) {
88 this(null, testName);
89 }
90
91 public TestAdminGet(Global glob, String testName) {
92 super(testName);
93 this.senderName = testName;
94 }
95
96 /**
97 * Sets up the fixture.
98 * <p />
99 * Connect to xmlBlaster and login
100 */
101 protected void setUp() {
102 this.glob = (this.glob == null) ? Global.instance() : this.glob;
103
104 this.updateInterceptor = new MsgInterceptor(this.glob, log, null);
105
106 try {
107 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess(); // Find orb
108
109 String passwd = "secret";
110 ConnectQos connectQos = new ConnectQos(this.glob, senderName, passwd); // == "<qos>...</qos>";
111 connectQos.setSessionName(new SessionName(this.glob, this.sessionName));
112 con.connect(connectQos, this); // Login to xmlBlaster, register for updates
113 }
114 catch (XmlBlasterException e) {
115 log.warning("setUp() - login failed: " + e.getMessage());
116 fail("setUp() - login fail: " + e.getMessage());
117 }
118 catch (Exception e) {
119 log.severe("setUp() - login failed: " + e.toString());
120 e.printStackTrace();
121 fail("setUp() - login fail: " + e.toString());
122 }
123 }
124
125 /**
126 * Tears down the fixture.
127 * <p />
128 * cleaning up .... erase() the previous message OID and logout
129 */
130 protected void tearDown() {
131 log.info("Entering tearDown(), test is finished");
132 String xmlKey = "<key oid='' queryType='XPATH'>\n" +
133 " //TestAdminGet-AGENT" +
134 "</key>";
135
136 String qos = "<qos><forceDestroy>true</forceDestroy></qos>";
137 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
138 try {
139 EraseReturnQos[] arr = con.erase(xmlKey, qos);
140
141 PropString defaultPlugin = new PropString("CACHE,1.0");
142 String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
143 log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
144 }
145 catch(XmlBlasterException e) {
146 log.severe("XmlBlasterException: " + e.getMessage());
147 }
148 finally {
149 con.disconnect(null);
150 // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
151 this.glob = null;
152 con = null;
153 Global.instance().shutdown();
154 }
155 }
156
157 /**
158 * TEST: Subscribe to a specific oid
159 */
160 private void doSubscribe(String oid) {
161 try {
162 SubscribeKey key = new SubscribeKey(this.glob, oid);
163
164 SubscribeQos qos = new SubscribeQos(this.glob); // "<qos><persistent>true</persistent></qos>";
165 qos.setWantNotify(false); // to avoig getting erased messages
166
167 SubscribeReturnQos subscriptionId = this.glob.getXmlBlasterAccess().subscribe(key, qos, this.updateInterceptor);
168
169 log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
170 assertTrue("returned null subscriptionId", subscriptionId != null);
171 } catch(XmlBlasterException e) {
172 log.warning("XmlBlasterException: " + e.getMessage());
173 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
174 }
175 }
176
177 private void doUnSubscribe(String oid) {
178 try {
179 UnSubscribeKey key = new UnSubscribeKey(this.glob, oid);
180
181 UnSubscribeQos qos = new UnSubscribeQos(this.glob);
182 this.glob.getXmlBlasterAccess().unSubscribe(key, qos);
183 }
184 catch(XmlBlasterException e) {
185 log.warning("XmlBlasterException: " + e.getMessage());
186 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
187 }
188 }
189
190 /**
191 * TEST: Construct a message and publish it.
192 * If the counter is negative, the content of the message will be an empty string.
193 * <p />
194 */
195 public void doPublish(int counter, String oid) throws XmlBlasterException {
196 log.info("Publishing a message " + oid + " ...");
197 String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'><test></test></key>";
198 String content = "" + counter;
199 PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
200 MsgUnit msgUnit = null;
201 if (counter > -1) msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
202 else msgUnit = new MsgUnit(xmlKey, "", qosWrapper.toXml());
203
204 this.glob.getXmlBlasterAccess().publish(msgUnit);
205 log.info("Success: Publishing of " + oid + " done");
206 }
207
208 /**
209 * Tests the activation flag setting and getting, i.e. disactivating/activating of the
210 * dispatcher.
211 */
212 public void testActivationFlag() {
213 try {
214 String oid = "TestActivationFlag";
215 log.info("Going to publish 3 times on message '" + oid + "' (first time before subscribing)");
216 doPublish(1, oid);
217 doSubscribe(oid);
218 doPublish(2, oid);
219 doPublish(3, oid);
220 assertEquals("wrong number of updates received", 3, this.updateInterceptor.waitOnUpdate(500L));
221 this.updateInterceptor.clear();
222
223 String getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=one,two,three";
224 MsgUnit[] msg = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), new GetQos(this.glob));
225 assertEquals("wrong number of messages returned", 1, msg.length);
226 for (int i=0; i < msg.length; i++) {
227 log.info("testActivationFlag: dispatcherActive: (" + i + ") : '" + msg[i].getContentStr() + "'");
228 assertEquals("wrong return value", "true", msg[i].getContentStr());
229 }
230
231 getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=false";
232 doPublish(-1, getOid);
233
234 getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive";
235 msg = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), new GetQos(this.glob));
236 assertEquals("wrong number of messages returned", 1, msg.length);
237 for (int i=0; i < msg.length; i++) {
238 log.info("testActivationFlag: dispatcherActive (result): (" + i + ") : '" + msg[i].getContentStr() + "'");
239 assertEquals("wrong return value", "false", msg[i].getContentStr());
240 }
241
242 doPublish(4, oid);
243 doPublish(5, oid);
244 int numArrived = this.updateInterceptor.waitOnUpdate(2000L);
245 assertEquals("wrong number of messages arrived", 0, numArrived);
246
247 getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=true";
248 doPublish(-1, getOid);
249
250 getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive";
251 msg = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), new GetQos(this.glob));
252 assertEquals("wrong number of messages returned", 1, msg.length);
253 for (int i=0; i < msg.length; i++) {
254 log.info("testActivationFlag: dispatcherActive (result): (" + i + ") : '" + msg[i].getContentStr() + "'");
255 assertEquals("wrong return value", "true", msg[i].getContentStr());
256 }
257
258 numArrived = this.updateInterceptor.waitOnUpdate(2000L);
259 assertEquals("wrong number of messages arrived", 2, numArrived);
260
261 }
262 catch (XmlBlasterException ex) {
263 ex.printStackTrace();
264 assertTrue("exception should not occur here", false);
265 }
266 }
267
268 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
269 String contentStr = new String(content);
270 String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
271 log.info("Receiving update of a message oid=" + updateKey.getOid() +
272 " priority=" + updateQos.getPriority() +
273 " state=" + updateQos.getState() +
274 " content=" + cont);
275 log.info("further log for receiving update of a message cbSessionId=" + cbSessionId +
276 updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());
277 log.severe("update: should never be invoked (msgInterceptors take care of it since they are passed on subscriptions)");
278 return "OK";
279 }
280
281
282 /**
283 * Testing the getting of queue entries without removing them from the queue.
284 * TEST: <br />
285 */
286 public void testGetQueueEntries() {
287 try {
288 String oid = "TestGetQueueEntries";
289 doSubscribe(oid);
290
291 String getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=false";
292 doPublish(-1, getOid);
293
294 doPublish(1, oid);
295 doPublish(2, oid);
296 doPublish(3, oid);
297 log.info("Going to publish 3 times on message '" + oid + "'");
298 // should not receive anything yet since the dispatcher is not active anymore
299 assertEquals("wrong number of updates received", 0, this.updateInterceptor.waitOnUpdate(500L));
300 this.updateInterceptor.clear();
301
302 // query with a given GetQos ...
303 GetQos getQos = new GetQos(this.glob);
304 // HistoryQos historyQos = new HistoryQos(this.glob);
305 // historyQos.setNumEntries(3);
306 // getQos.setHistoryQos(historyQos);
307 QuerySpecQos querySpecQos = new QuerySpecQos(this.glob, "QueueQuery", "1.0", "maxEntries=3&maxSize=-1&consumable=false&waitingDelay=0");
308 getQos.addQuerySpec(querySpecQos);
309
310 getOid = "__cmd:client/" + this.sessionName + "/?cbQueueEntries";
311 MsgUnit[] mu = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), getQos);
312 assertEquals("wrong number of retreived entries", 3, mu.length);
313
314 }
315 catch (XmlBlasterException ex) {
316 ex.printStackTrace();
317 assertTrue("exception should not occur here", false);
318 }
319 }
320
321 private void doActivateDispatch(boolean doDispatch) throws XmlBlasterException {
322 // inhibit delivery of subscribed messages ...
323 String getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=" + doDispatch;
324 doPublish(-1, getOid);
325 // query with a given GetQos ...
326 getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive";
327 MsgUnit[] msg = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), new GetQos(this.glob));
328 assertEquals("wrong number of messages returned", 1, msg.length);
329 assertEquals("wrong return value", "" + doDispatch, msg[0].getContentStr());
330 }
331
332
333 /**
334 * Testing the getting of queue entries. Note that before this method is called, the queue must be empty
335 * TEST: <br />
336 */
337 private void adminGet(String oid, boolean consumable, long waitingDelay, int maxEntries, int initialEntries, int endEntries, int entriesExpected) {
338 try {
339 int sizePerMsg = 0;
340 doActivateDispatch(false);
341 assertEquals("wrong prerequisite: entries have arrived before starting the test: probably coming from an inconsistency in the previous test", 0, this.updateInterceptor.count());
342 this.updateInterceptor.clear();
343 for (int i=0; i < initialEntries; i++) doPublish(i, oid);
344 log.info("In the callback queue there should now be '" + initialEntries + "' entries");
345 int ret = this.updateInterceptor.waitOnUpdate(200L);
346 assertEquals("no update should arrive here ", 0, ret);
347
348 // prepare the messages to be published
349 // wait a third of the total waiting time before publishing in a separate thread (while we wait for updates)
350
351 int extraEntries = endEntries - initialEntries;
352 MsgUnit[] msgs = new MsgUnit[extraEntries];
353 for (int i=0; i < extraEntries; i++) {
354 String content = "extraMsg" + i;
355 msgs[i] = new MsgUnit(new PublishKey(this.glob, oid), content, new PublishQos(this.glob));
356 }
357 long delay = waitingDelay / 3 + 10L;
358 PublisherThread pubThread = new PublisherThread(this.glob, delay, msgs);
359
360 // query with a given GetQos ...
361 GetQos getQos = new GetQos(this.glob);
362 QuerySpecQos querySpecQos = new QuerySpecQos(this.glob, "QueueQuery", "1.0", "maxEntries=" + maxEntries + "&maxSize=-1&consumable=" + consumable + "&waitingDelay=" + waitingDelay);
363 getQos.addQuerySpec(querySpecQos);
364 String getOid = "__cmd:client/" + this.sessionName + "/?cbQueueEntries";
365 MsgUnit[] mu = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), getQos);
366 assertEquals("an exception occured when it should not", false, pubThread.hasException());
367 assertEquals("wrong number of retreived entries", entriesExpected, mu.length);
368
369 assertEquals("messages should not arrive here", 0, this.updateInterceptor.count());
370 doActivateDispatch(true);
371 if (consumable) {
372 int rest = endEntries-mu.length;
373 int arrived = 0;
374 if (rest < 1) {
375 arrived = this.updateInterceptor.waitOnUpdate(500L, 1);
376 }
377 else arrived = this.updateInterceptor.waitOnUpdate(500L, rest);
378 assertEquals("wrong number of messages arrived (some should have been consumed by the get", rest, arrived);
379 }
380 else {
381 int arrived = this.updateInterceptor.waitOnUpdate(200L, endEntries);
382 assertEquals("all published messages should arrive here", endEntries, arrived);
383 }
384 this.updateInterceptor.clear();
385 }
386 catch (XmlBlasterException ex) {
387 ex.printStackTrace();
388 assertTrue("exception should not occur here", false);
389 }
390 }
391
392 public void testGetNonConsumableNoWaiting() {
393 String oid = "NonConsumableNoWaiting";
394 doSubscribe(oid);
395 boolean consumable = false;
396 long waiting = 0L; // no waiting
397 int maxEntries = 3;
398 adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
399 adminGet(oid, consumable, waiting, maxEntries, 0, 4, 0);
400 adminGet(oid, consumable, waiting, maxEntries, 2, 2, 2);
401 adminGet(oid, consumable, waiting, maxEntries, 0, 2, 0);
402 doUnSubscribe(oid);
403 }
404
405 public void testGetConsumableNoWaiting() {
406 String oid = "ConsumableNoWaiting";
407 doSubscribe(oid);
408 boolean consumable = true;
409 long waiting = 0L; // no waiting
410 int maxEntries = 3;
411 adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
412 adminGet(oid, consumable, waiting, maxEntries, 0, 4, 0);
413 adminGet(oid, consumable, waiting, maxEntries, 2, 2, 2);
414 adminGet(oid, consumable, waiting, maxEntries, 0, 2, 0);
415 doUnSubscribe(oid);
416 }
417
418 public void testGetNonConsumableDoWaiting() {
419 String oid = "NonConsumableDoWaiting";
420 doSubscribe(oid);
421 boolean consumable = false;
422 long waiting = 200L; // no waiting
423 int maxEntries = 3;
424 adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
425 adminGet(oid, consumable, waiting, maxEntries, 0, 4, maxEntries);
426 adminGet(oid, consumable, waiting, maxEntries, 2, 2, 2);
427 adminGet(oid, consumable, waiting, maxEntries, 0, 2, 2);
428 adminGet(oid, consumable, waiting, maxEntries, 1, maxEntries, maxEntries);
429 waiting = -1L;
430 adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
431 adminGet(oid, consumable, waiting, maxEntries, 0, 4, maxEntries);
432 adminGet(oid, consumable, waiting, maxEntries, 1, maxEntries, maxEntries);
433 doUnSubscribe(oid);
434 }
435
436 public void testGetConsumableDoWaiting() {
437 String oid = "ConsumableDoWaiting";
438 doSubscribe(oid);
439 boolean consumable = false;
440 long waiting = 200L; // no waiting
441 int maxEntries = 3;
442 adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
443 adminGet(oid, consumable, waiting, maxEntries, 0, 4, maxEntries);
444 adminGet(oid, consumable, waiting, maxEntries, 2, 2, 2);
445 adminGet(oid, consumable, waiting, maxEntries, 0, 2, 2);
446 adminGet(oid, consumable, waiting, maxEntries, 1, maxEntries, maxEntries);
447 waiting = -1L;
448 adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
449 adminGet(oid, consumable, waiting, maxEntries, 0, 4, maxEntries);
450 adminGet(oid, consumable, waiting, maxEntries, 1, maxEntries, maxEntries);
451 doUnSubscribe(oid);
452 }
453
454
455 /**
456 * Invoke: java org.xmlBlaster.test.client.TestAdminGet
457 * <p />
458 * @deprecated Use the TestRunner from the testsuite to run it:<p />
459 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestAdminGet</pre>
460 */
461 public static void main(String args[])
462 {
463 Global glob = new Global();
464 if (glob.init(args) != 0) {
465 System.out.println(ME + ": Init failed");
466 System.exit(1);
467 }
468
469 TestAdminGet testSub = new TestAdminGet(glob, "TestAdminGet/1");
470
471 testSub.setUp();
472 testSub.testActivationFlag();
473 testSub.tearDown();
474
475 testSub.setUp();
476 testSub.testGetQueueEntries();
477 testSub.tearDown();
478
479 testSub.setUp();
480 testSub.testGetNonConsumableNoWaiting();
481 testSub.tearDown();
482
483 testSub.setUp();
484 testSub.testGetNonConsumableDoWaiting();
485 testSub.tearDown();
486
487 testSub.setUp();
488 testSub.testGetConsumableNoWaiting();
489 testSub.tearDown();
490
491 testSub.setUp();
492 testSub.testGetConsumableDoWaiting();
493 testSub.tearDown();
494
495 }
496 }
syntax highlighted by Code2HTML, v. 0.9.1