1 package org.xmlBlaster.test.classtest.queue;
   2 
   3 import java.util.ArrayList;
   4 import java.util.List;
   5 import java.util.logging.Level;
   6 import java.util.logging.Logger;
   7 
   8 import junit.framework.Test;
   9 import junit.framework.TestCase;
  10 import junit.framework.TestSuite;
  11 
  12 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
  13 import org.xmlBlaster.util.Global;
  14 import org.xmlBlaster.util.MsgUnit;
  15 import org.xmlBlaster.util.XmlBlasterException;
  16 import org.xmlBlaster.util.def.Constants;
  17 import org.xmlBlaster.util.def.ErrorCode;
  18 import org.xmlBlaster.util.def.PriorityEnum;
  19 import org.xmlBlaster.util.plugin.PluginInfo;
  20 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
  21 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
  22 import org.xmlBlaster.util.queue.BlockingQueueWrapper;
  23 import org.xmlBlaster.util.queue.I_Entry;
  24 import org.xmlBlaster.util.queue.I_Queue;
  25 import org.xmlBlaster.util.queue.I_QueueEntry;
  26 import org.xmlBlaster.util.queue.I_Storage;
  27 import org.xmlBlaster.util.queue.I_StorageSizeListener;
  28 import org.xmlBlaster.util.queue.QueuePluginManager;
  29 import org.xmlBlaster.util.queue.StorageId;
  30 import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;
  31 import org.xmlBlaster.util.queuemsg.DummyEntry;
  32 
  33 /**
 * Tests the I_Queue implementations (the RAM, JDBC and CACHE queue plugins).
  35  * <p>
  36  * The sorting order is priority,timestamp:
  37  * </p>
  38  * <pre>
  39  *   ->    5,100 - 5,98 - 5,50 - 9,3000 - 9,2500   ->
  40  * </pre>
  41  * <p>
  42  * As 9 is highest priority it is the first to be taken out.<br />
 * As we need to maintain the chronological sequence, and the id is a
 * timestamp (more or less the nanoseconds elapsed since 1970),
 * the id 2500 (being older) takes precedence over the id 3000.
  46  * </p>
  47  * <p>
  48  * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.I_QueueTest
  49  * </p>
  50  * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry)
  51  * @see org.xmlBlaster.util.queue.I_Queue
  52  * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin
  53  * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
  54  */
  55 public class I_QueueTest extends TestCase {
  56 
  57 
  58    class QueueSizeListener  implements I_StorageSizeListener {
  59 
  60       private long lastNumEntries = 0L, 
  61                    lastNumBytes = 0L,
  62                    lastIncrementEntries = 0L,
  63                    lastIncrementBytes = 0L;
  64       private int count = 0;
  65       
  66       public long getLastIncrementEntries() {
  67          return this.lastIncrementEntries;
  68       }
  69       
  70       public long getLastIncrementBytes() {
  71          return this.lastIncrementBytes;
  72       }
  73       
  74       public int getCount() {
  75          return this.count;
  76       }
  77       
  78       public void clear() {
  79          this.lastNumEntries = 0L; 
  80          this.lastNumBytes = 0L;
  81          this.lastIncrementEntries = 0L;
  82          this.lastIncrementBytes = 0L;
  83          this.count = 0;
  84       }
  85       
  86       public void changed(I_Storage storage, long numEntries, long numBytes, boolean isShutdown) {
  87          this.lastIncrementEntries = numEntries - this.lastNumEntries;
  88          this.lastIncrementBytes = numBytes - this.lastNumBytes;
  89          this.lastNumEntries = numEntries;
  90          this.lastNumBytes = numBytes;
  91          this.count++;
  92       }
  93 
  94    }
  95    
  96 
  97    private String ME = "I_QueueTest";
  98    protected Global glob;
  99    private static Logger log = Logger.getLogger(I_QueueTest.class.getName());
 100 
 101    private I_Queue queue;
 102    private QueueSizeListener queueSizeListener = new QueueSizeListener();
 103    
 104    static String[] PLUGIN_TYPES = {
 105                    new String("RAM"),
 106                    new String("JDBC"),
 107                    new String("CACHE")
 108                  };
 109 
 110 /*
 111    static I_Queue[] IMPL = {
 112                    new org.xmlBlaster.util.queue.ram.RamQueuePlugin(),
 113                    new org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin(),
 114                    new org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin()
 115                  };
 116 */
 117 
 118    public class QueuePutter extends Thread {
 119       
 120       private I_Queue queue;
 121       private long delay;
 122       private int numOfEntries;
 123       private boolean ignoreInterceptor;
 124       
 125       public QueuePutter(I_Queue queue, long delay, int numOfEntries, boolean ignoreInterceptor) {
 126          this.queue = queue;
 127          this.delay = delay;
 128          this.numOfEntries = numOfEntries;
 129          this.ignoreInterceptor = ignoreInterceptor;
 130       }
 131       
 132       public void run() {
 133          try {
 134             for (int i=0; i < this.numOfEntries; i++) {
 135                Thread.sleep(this.delay);
 136                DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
 137                this.queue.put(queueEntry, this.ignoreInterceptor);
 138             }
 139          }
 140          catch (Exception ex) {
 141             ex.printStackTrace();
 142          }
 143       }
 144       
 145    }
 146    
 147    
 148    public I_QueueTest(String name, int currImpl, Global glob) {
 149       super(name);
 150 //      this.queue = IMPL[currImpl];
 151       //this.ME = "I_QueueTest[" + this.queue.getClass().getName() + "]";
 152 
 153       if (glob == null) this.glob = Global.instance();
 154       else this.glob = glob;
 155 
 156 
 157       try {
 158          String type = PLUGIN_TYPES[currImpl];
 159          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
 160          QueuePluginManager pluginManager = new QueuePluginManager(glob);
 161 
 162          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
 163          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
 164          prop.put("tableNamePrefix", "TEST");
 165          prop.put("entriesTableName", "_entries");
 166          this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
 167 
 168          pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
 169          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SomeQueueId");
 170          this.queue = pluginManager.getPlugin(pluginInfo, queueId, new CbQueueProperty(this.glob, Constants.RELATING_CALLBACK, this.glob.getStrippedId()));
 171          this.queue.shutdown(); // to allow to initialize again
 172       }
 173       catch (Exception ex) {
 174          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
 175       }
 176    }
 177 
 178    protected void setUp() {
 179       // cleaning up the database from previous runs ...
 180       QueuePropertyBase prop = null;
 181       try {
 182          prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 183          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SomeQueueId");
 184          queue.initialize(queueId, prop);
 185          queue.clear();
 186          queue.shutdown();
 187       }
 188       catch (Exception ex) {
 189          log.severe("could not propertly set up the database: " + ex.getMessage());
 190       }
 191    }
 192 
   /**
    * Runs the configuration checks of config(I_Queue) against the queue
    * which was created by the constructor.
    */
   public void testConfig() {
      config(this.queue);
   }
 202 
 203    /**
 204     * Tests initialize(), getProperties(), setProperties() and capacity()
 205     * @param queue !!!Is not initialized in this case!!!!
 206     */
 207    private void config(I_Queue queue) {
 208       ME = "I_QueueTest.config(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
 209       System.out.println("***" + ME);
 210 
 211       QueuePropertyBase prop1 = null;
 212       QueuePropertyBase prop = null;
 213       try {
 214          // test initialize()
 215          prop1 = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 216          int max = 12;
 217          prop1.setMaxEntries(max);
 218          prop1.setMaxEntriesCache(max);
 219          assertEquals(ME+": Wrong capacity", max, prop1.getMaxEntries());
 220          assertEquals(ME+": Wrong cache capacity", max, prop1.getMaxEntriesCache());
 221          //PluginInfo pluginInfo = new PluginInfo(glob, null, "");
 222          //queue.init(glob, pluginInfo);     // Init from pluginloader is first
 223          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SomeQueueId");
 224          queue.initialize(queueId, prop1);
 225          queue.clear(); // this is needed since the tearDown has cleaned the queue with previous cfg (other StorageId)
 226          assertEquals(ME+": Wrong queue ID", queueId, queue.getStorageId());
 227 
 228          try {
 229             prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, "/node/test");
 230             prop.setMaxEntries(99);
 231             prop.setMaxEntriesCache(99);
 232             queue.setProperties(prop);
 233          }
 234          catch(XmlBlasterException e) {
 235             fail("Changing properties failed");
 236          }
 237 
 238       }
 239       catch(XmlBlasterException e) {
 240          fail(ME + ": Exception thrown: " + e.getMessage());
 241       }
 242 
 243       long len = prop.getMaxEntries();
 244       assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), queue.getMaxNumOfEntries());
 245       assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), ((QueuePropertyBase)queue.getProperties()).getMaxEntries());
 246       assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
 247 
 248       try {
 249          for (int ii=0; ii<len; ii++) {
 250             queue.put(new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), false);
 251          }
 252          assertEquals(ME+": Wrong total size", len, queue.getNumOfEntries());
 253 
 254          try {
 255             DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
 256             queue.put(queueEntry, false);
 257             queue.put(queueEntry, false);
 258             fail("Did expect an exception on overflow");
 259          }
 260          catch(XmlBlasterException e) {
 261             log.info("SUCCESS the exception is OK: " + e.getMessage());
 262          }
 263 
 264          log.info("toXml() test:" + queue.toXml(""));
 265          log.info("usage() test:" + queue.usage());
 266 
 267          assertEquals(ME+": should not be shutdown", false, queue.isShutdown());
 268          queue.shutdown();
 269          assertEquals(ME+": should be shutdown", true, queue.isShutdown());
 270 
 271          log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
 272          System.out.println("***" + ME + " [SUCCESS]");
 273          queue.shutdown();
 274          queue = null;
 275       }
 276       catch(XmlBlasterException e) {
 277          fail(ME + ": Exception thrown: " + e.getMessage());
 278       }
 279    }
 280 
 281 //------------------------------------
 282    public void testSize1() {
 283       String queueType = "unknown";
 284       try {
 285          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 286          int max = 1;
 287          prop.setMaxEntries(max);
 288          prop.setMaxEntriesCache(max);
 289          queueType = this.queue.toString();
 290          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/size1");
 291          this.queue.initialize(queueId, prop);
 292          queue.clear();
 293          assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries());
 294          assertEquals(ME, 1L, queue.getMaxNumOfEntries());
 295          size1(this.queue);
 296       }
 297       catch (XmlBlasterException ex) {
 298          fail("Exception when testing Size1 probably due to failed initialization of the queue of type " + queueType);
 299       }
 300    }
 301 
 302    /**
 303     * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear()
 304     */
 305    private void size1(I_Queue queue) {
 306       this.queue = queue;
 307       ME = "I_QueueTest.size1(" + queue.getStorageId() + ")[" + this.queue.getClass().getName() + "]";
 308       System.out.println("***" + ME);
 309       int maxEntries = (int)queue.getMaxNumOfEntries();
 310       try {
 311          //========== Test 1: put(I_QueueEntry[])
 312          int numLoop = 10;
 313          List<I_Entry> list = new ArrayList<I_Entry>();
 314 
 315          //========== Test 2: put(I_QueueEntry)
 316          this.queue.removeStorageSizeListener(null);
 317          this.queue.addStorageSizeListener(this.queueSizeListener);
 318          this.queueSizeListener.clear();
 319 
 320          for (int ii=0; ii<numLoop; ii++) {
 321             DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
 322             try {
 323                queue.put(queueEntry, false);
 324                assertEquals("number of entries incremented on last invocation", 1, this.queueSizeListener.getLastIncrementEntries());
 325                assertEquals("number of bytes incremented on last invocation", queueEntry.getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
 326                
 327                if (ii > maxEntries) { // queue allows on overload
 328                   fail("Didn't expect more than " + maxEntries + " entries" + queue.toXml(""));
 329                }
 330                else
 331                   list.add(queueEntry);
 332             }
 333             catch (XmlBlasterException e) {
 334                if (ii <= maxEntries) {
 335                   fail("Didn't expect exception" + e.getMessage());
 336                }
 337             }
 338          }
 339          assertEquals("number of invocations for queue size listener is wrong", maxEntries+1, this.queueSizeListener.getCount());
 340 
 341          // The queues allow temporary oversize (one extra put())
 342          assertEquals(ME+": Wrong total size " + queue.toXml(""), maxEntries+1, queue.getNumOfEntries());
 343          this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue);
 344          log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
 345 
 346          List<I_Entry> entryList = null;
 347          try {
 348             entryList = queue.peekLowest(1, -1L, null, false);
 349             assertEquals("PEEK #1 failed"+queue.toXml(""), 1, entryList.size());
 350             log.info("curr entries="+queue.getNumOfEntries());
 351          }
 352          catch (XmlBlasterException e) {
 353             if (e.getErrorCode()!=ErrorCode.INTERNAL_NOTIMPLEMENTED) throw e;
 354          }
 355 
 356          
 357          //this.queue.removeStorageSizeListener(null);
 358          //this.queue.addStorageSizeListener(this.queueSizeListener);
 359          //this.queueSizeListener.clear();
 360 
 361          entryList = queue.takeLowest(1, -1L, null, false);
 362          long singleSize = ((I_QueueEntry)entryList.get(0)).getSizeInBytes(); 
 363          assertEquals("TAKE #1 failed"+queue.toXml(""), 1, entryList.size());
 364          log.info("curr entries="+queue.getNumOfEntries());
 365          assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
 366          assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes());
 367 
 368          entryList = queue.takeLowest(1, -1L, null, false);
 369          assertEquals("TAKE #2 failed"+queue.toXml(""), 1, entryList.size());
 370          assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
 371          assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes());
 372 
 373          queue.clear();
 374          assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries());
 375 
 376          System.out.println("***" + ME + " [SUCCESS]");
 377          queue.shutdown();
 378          queue = null;
 379 
 380       }
 381       catch(XmlBlasterException e) {
 382          fail(ME + ": Exception thrown: " + e.getMessage());
 383       }
 384       log.info("SUCCESS");
 385    }
 386 
 387 
 388 //------------------------------------
 389    public void testPutMsg() {
 390       String queueType = "unknown";
 391       try {
 392          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 393          queueType = this.queue.toString();
 394          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/putMsg");
 395          this.queue.initialize(queueId, prop);
 396          queue.clear();
 397          assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries());
 398          putMsg(this.queue);
 399       }
 400       catch (XmlBlasterException ex) {
 401          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
 402       }
 403    }
 404 
 405 
 406    /**
 407     * @see checkSizeAndEntries(String, I_QueueEntry[], I_Queue)
 408     */
 409    private void checkSizeAndEntries(String txt, List<I_Entry> queueEntries, I_Queue queue) {
 410       checkSizeAndEntries(txt, (I_QueueEntry[])queueEntries.toArray(new I_QueueEntry[queueEntries.size()]), queue);
 411    }
 412 
 413 
 414    /**
 415     * Helper method to do a generic size check (size and number of entries)
 416     */
 417    private void checkSizeAndEntries(String txt, I_QueueEntry[] queueEntries, I_Queue queue) {
 418       long sizeOfTransients = 0L;
 419       long numOfPersistents = 0;
 420       long numOfTransients = 0;
 421       long sizeOfPersistents = 0L;
 422 
 423       for (int i=0; i < queueEntries.length; i++) {
 424          I_QueueEntry entry = queueEntries[i];
 425          if (entry.isPersistent()) {
 426             sizeOfPersistents += entry.getSizeInBytes();
 427             numOfPersistents++;
 428          }
 429          else {
 430             sizeOfTransients += entry.getSizeInBytes();
 431             numOfTransients++;
 432          }
 433       }
 434 
 435       long queueNumOfPersistents = queue.getNumOfPersistentEntries();
 436       long queueNumOfTransients = queue.getNumOfEntries() - queueNumOfPersistents;
 437       long queueSizeOfPersistents = queue.getNumOfPersistentBytes();
 438       long queueSizeOfTransients = queue.getNumOfBytes() - queueSizeOfPersistents;
 439 
 440       txt += " NumPersistents=" + queueNumOfPersistents + " NumOfTransients=" + queueNumOfTransients; 
 441       txt += " SizeOfPersistents=" + queueSizeOfPersistents + " SizeOfTransients=" + queueSizeOfTransients;
 442 
 443       assertEquals(ME + ": " + txt + " wrong number of persistents   ", numOfPersistents, queueNumOfPersistents);
 444       assertEquals(ME + ": " + txt + " wrong number of transients ", numOfTransients, queueNumOfTransients);
 445       assertEquals(ME + ": " + txt + " wrong size of persistents     ", sizeOfPersistents, queueSizeOfPersistents);
 446       assertEquals(ME + ": " + txt + " wrong size of transients   ", sizeOfTransients, queueSizeOfTransients);
 447    }
 448 
 449 
 450 
   /**
    * Tests put(I_QueueEntry[]), put(I_QueueEntry) and clear().
    * <p>
    * First ten arrays of three entries are put, checking the storage size
    * listener and the size bookkeeping after every put. Then ten single entries
    * are put and finally the queue is cleared and shut down.
    * </p>
    * <p>
    * NOTE(review): the listener is registered on this.queue whereas the entries
    * are put into the 'queue' parameter; the visible caller testPutMsg() passes
    * this.queue, so both refer to the same object there.
    * </p>
    * @param queue the initialized and empty queue to test
    */
   private void putMsg(I_Queue queue) {
      ME = "I_QueueTest.putMsg(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
      System.out.println("***" + ME);
      try {
         //========== Test 1: put(I_QueueEntry[])
         int numLoop = 10;
         // All entries put so far, used as expectation by checkSizeAndEntries()
         List<I_Entry> list = new ArrayList<I_Entry>();

         // Start with a freshly registered listener with zeroed counters
         this.queue.removeStorageSizeListener(null);
         this.queue.addStorageSizeListener(this.queueSizeListener);
         this.queueSizeListener.clear();

         for (int ii=0; ii<numLoop; ii++) {
            DummyEntry[] queueEntries = {
                         new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
                         new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
                         new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)};

            queue.put(queueEntries, false);

            // One array put is expected to arrive as one notification covering all three entries
            assertEquals("number of entries incremented on last invocation", 3, this.queueSizeListener.getLastIncrementEntries());
            assertEquals("number of bytes incremented on last invocation", 3*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
            for (int i=0; i < 3; i++) list.add(queueEntries[i]);

            this.checkSizeAndEntries(" put(I_QueueEntry[]) ", list, queue);
            assertEquals(ME+": Wrong size", (ii+1)*queueEntries.length, queue.getNumOfEntries());
         }
         // Exactly one listener invocation per array put
         assertEquals("number of invocations for queue size listener is wrong", numLoop, this.queueSizeListener.getCount());

         int total = numLoop*3;
         assertEquals(ME+": Wrong total size", total, queue.getNumOfEntries());
         log.info("#1 Success, filled " + queue.getNumOfEntries() + " messages into queue");


         //========== Test 2: put(I_QueueEntry)
         for (int ii=0; ii<numLoop; ii++) {
            DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
            list.add(queueEntry);
            queue.put(queueEntry, false);
         }
         assertEquals(ME+": Wrong total size", numLoop+total, queue.getNumOfEntries());
         this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue);
         log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");

         queue.clear();
         assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries());

         System.out.println("***" + ME + " [SUCCESS]");
         queue.shutdown();
         // Only resets the local parameter, the field this.queue is not affected
         queue = null;

      }
      catch(XmlBlasterException e) {
         fail(ME + ": Exception thrown: " + e.getMessage());
      }
   }
 510 
 511 
 512 // ------------------------------------
 513    public void testPeekMsg() {
 514 
 515       String queueType = "unknown";
 516       try {
 517          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 518          queueType = this.queue.toString();
 519          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/peekMsg");
 520          this.queue.initialize(queueId, prop);
 521          queue.clear();
 522          assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
 523          peekMsg(this.queue);
 524       }
 525       catch (XmlBlasterException ex) {
 526          log.severe("Exception when testing peekMsg probably due to failed initialization of the queue " + queueType);
 527       }
 528 
 529    }
 530 
 531 
 532 // ------------------------------------
 533    public void testPeekMsgBlocking() {
 534 
 535       String queueType = "unknown";
 536       try {
 537          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 538          queueType = this.queue.toString();
 539          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/peekMsg");
 540          this.queue.initialize(queueId, prop);
 541          queue.clear();
 542          assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
 543          
 544          // fill the queue:
 545          DummyEntry[] queueEntries = {
 546                new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 547                new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 548                new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)
 549                               };
 550          queue.put(queueEntries, false);
 551          BlockingQueueWrapper wrapper = new BlockingQueueWrapper(200L);
 552          wrapper.init(queue);
 553          int numOfEntries = 2;
 554          List<I_Entry> ret = wrapper.blockingPeek(numOfEntries, 1000L);
 555          assertEquals("Wrong number of entries found", 2, ret.size());
 556          queue.removeNum(2);
 557          numOfEntries = 2;
 558          ret = wrapper.blockingPeek(numOfEntries, 1000L);
 559          assertEquals("Wrong number of entries found", 1, ret.size());
 560          queue.clear();
 561          ret = wrapper.blockingPeek(numOfEntries, 1000L);
 562          assertEquals("Wrong number of entries found", 0, ret.size());
 563          
 564          // and now making asynchronous putting with events
 565          numOfEntries = 3;
 566          long delay = 500L;
 567          boolean inhibitEvents = false;
 568          QueuePutter putter = new QueuePutter(this.queue, delay, numOfEntries, inhibitEvents);
 569          putter.start();
 570          long t0 = System.currentTimeMillis();
 571          ret = wrapper.blockingPeek(numOfEntries, 10000L);
 572          assertEquals("Wrong number of entries when blocking with events", numOfEntries, ret.size());
 573          long delta = System.currentTimeMillis() - t0;
 574          log.info("The blocking request with events took '" + delta + "' milliseconds");
 575          assertTrue("The method was blocking too long (did probably not wake up correctly", delta < 7000L);
 576          queue.clear();
 577          // and now making asynchronous putting without events (polling should detect it)
 578          numOfEntries = 3;
 579          delay = 500L;
 580          inhibitEvents = true;
 581          putter = new QueuePutter(this.queue, delay, numOfEntries, inhibitEvents);
 582          putter.start();
 583          t0 = System.currentTimeMillis();
 584          ret = wrapper.blockingPeek(numOfEntries, 10000L);
 585          assertEquals("Wrong number of entries when blocking with events", numOfEntries, ret.size());
 586          delta = System.currentTimeMillis() - t0;
 587          log.info("The blocking request without events took '" + delta + "' milliseconds");
 588          assertTrue("The method was blocking too long (did probably not wake up correctly", delta < 7000L);
 589          queue.clear();
 590       }
 591       catch (XmlBlasterException ex) {
 592          log.severe("Exception when testing peekMsg probably due to failed initialization of the queue " + queueType);
 593       }
 594 
 595    }
 596 
 597 
 598    /**
 599     * Tests peek() and peek(int num) and remove()
 600     * For a discussion of the sorting order see Javadoc of this class
 601     */
 602    private void peekMsg(I_Queue queue) {
 603       ME = "I_QueueTest.peekMsg(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
 604       System.out.println("***" + ME);
 605       try {
 606          //========== Test 1: peek()
 607          {
 608             DummyEntry[] queueEntries = {
 609                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 610                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 611                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)
 612                                         };
 613             queue.put(queueEntries, false);
 614             for (int ii=0; ii<10; ii++) {
 615                I_QueueEntry result = queue.peek();
 616                assertTrue("Missing entry", result != null);
 617                assertEquals(ME+": Wrong result", queueEntries[0].getUniqueId(), result.getUniqueId());
 618             }
 619             queue.remove(); // Remove one
 620             for (int ii=0; ii<10; ii++) {
 621                I_QueueEntry result = queue.peek();
 622                assertTrue("Missing entry", result != null);
 623                assertEquals(ME+": Wrong result", queueEntries[1].getUniqueId(), result.getUniqueId());
 624             }
 625             queue.remove(); // Remove one
 626             for (int ii=0; ii<10; ii++) {
 627                I_QueueEntry result = queue.peek();
 628                assertTrue("Missing entry", result != null);
 629                assertEquals(ME+": Wrong result", queueEntries[2].getUniqueId(), result.getUniqueId());
 630             }
 631             queue.remove(); // Remove one
 632             for (int ii=0; ii<10; ii++) {
 633                I_QueueEntry result = queue.peek();
 634                assertTrue("Unexpected entry", result == null);
 635             }
 636             log.info("#1 Success, peek()");
 637          }
 638 
 639 
 640          //========== Test 2: peek(num)
 641          {
 642             DummyEntry[] queueEntries = {
 643                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 644                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 645                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 646                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 647                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 648                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 649                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 650                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 651                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 652                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 653                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 654                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true)
 655                                         };
 656             queue.put(queueEntries, false);
 657 
 658             for (int ii=-1; ii<100; ii++) {
 659                List<I_Entry> results = queue.peek(ii, -1L); // does no remove
 660                assertTrue("Missing entry", results != null);
 661                int expected = ii;
 662                if (ii == -1 || ii >= queueEntries.length)
 663                   expected = queueEntries.length;
 664                assertEquals(ME+": Wrong number of entries returned ii=" + ii, expected, results.size());
 665             }
 666 
 667             queue.clear();
 668             log.info("#2 Success, peek(int)");
 669          }
 670 
 671          //========== Test 3: peekSamePriority(-1)
 672          {
 673             DummyEntry[] queueEntries = {
 674                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 675                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 676                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 677                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 678                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 679                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 680                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 681                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 682                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 683                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 684                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 685                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true)
 686                                         };
 687             queue.put(queueEntries, false);
 688 
 689             int[] prios = { 9, 7, 5 };
 690             for (int j=0; j<prios.length; j++) {
 691                for (int ii=0; ii<10; ii++) {
 692                   List<I_Entry> results = queue.peekSamePriority(-1, -1L); // does no remove
 693                   assertTrue("Expected results", results != null);
 694                   assertEquals(ME+": Wrong number of 9 priorities", 4, results.size());
 695                   for (int k=0; k<results.size(); ++k)
 696                      assertEquals(ME+": Wrong priority returned", prios[j], ((I_QueueEntry)results.get(k)).getPriority());
 697                }
 698                for (int ii=0; ii<4; ii++) {
 699                   int num = queue.remove();
 700                   assertEquals(ME+": Expected remove", 1, num);
 701                }
 702             }
 703 
 704             assertEquals(ME+": Expected empty queue", 0, queue.getNumOfEntries());
 705 
 706             log.info("#3 Success, peekSamePriority()");
 707          }
 708 
 709          //========== Test 4: peekWithPriority(-1,7,9)
 710          {
 711             DummyEntry[] queueEntries = {
 712                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 713                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 714                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 715                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 716                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 717                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 718                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 719                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 720                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 721                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 722                          new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true),
 723                          new DummyEntry(glob, PriorityEnum.MIN_PRIORITY, queue.getStorageId(), true),
 724                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 725                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true)
 726                                         };
 727             queue.put(queueEntries, false);
 728 
 729             for (int ii=0; ii<10; ii++) {
 730                List<I_Entry> results = queue.peekWithPriority(-1, -1L, 7, 9); // does no remove
 731                assertTrue("Expected results", results != null);
 732                assertEquals(ME+": Wrong number of 9 priorities", 8, results.size());
 733                for (int k=0; k<results.size(); ++k) {
 734                   assertEquals(ME+": Wrong priority returned", (k<4)?9L:7L, ((I_QueueEntry)results.get(k)).getPriority());
 735                }
 736             }
 737             queue.clear();
 738             assertEquals(ME+": Expected empty queue", 0, queue.getNumOfEntries());
 739 
 740             log.info("#4 Success, peekWithPriority()");
 741          }
 742 
 743 
 744          //========== Test 5: peek(100, 60)
 745          {
 746             DummyEntry[] queueEntries = {
 747                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 80, true),
 748                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), 80, true),
 749                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), 80, true),
 750                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), 80, true),
 751                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), 80, true),
 752                                         };
 753             queue.put(queueEntries, false);
 754 
 755             try {
 756                List<I_Entry> results = queue.peek(100, 60); // does no remove
 757                assertNotNull(ME+": the result should not be null");
 758                assertEquals(ME+": Expected one entry on peek(100,60)", 1, results.size());
 759             }
 760             catch (XmlBlasterException e) {
 761                e.printStackTrace();
 762                assertTrue("An exception should not occur here " + e.getMessage(), false);
 763             }
 764             
 765             queue.clear();
 766             assertEquals(ME+": Expected empty queue", 0, queue.getNumOfEntries());
 767 
 768             log.info("#5 Success, peek(100, 60)");
 769          }
 770 
 771          System.out.println("***" + ME + " [SUCCESS]");
 772          queue.shutdown();
 773       }
 774       catch(XmlBlasterException e) {
 775          e.printStackTrace();
 776          fail(ME + ": Exception thrown: " + e.getMessage());
 777       }
 778    }
 779 
 780 
 781 //-----------------------------------------
 782    public void testRemoveWithPriority() {
 783       String queueType = "unknown";
 784       try {
 785          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 786          queueType = this.queue.toString();
 787          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/removeWithPriority");
 788          this.queue.initialize(queueId, prop);
 789          queue.clear();
 790          assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
 791          removeWithPriority(this.queue);
 792       }
 793       catch (XmlBlasterException ex) {
 794          log.severe("Exception when testing removeWithpriority probably due to failed initialization of the queue " + queueType);
 795       }
 796    }
 797 
 798 
 799    /**
 800     * Test removeWithPriority(long[])
 801     */
 802    private void removeWithPriority(I_Queue queue) {
 803       ME = "I_QueueTest.removeWithPriority(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
 804       System.out.println("***" + ME);
 805       try {
 806          //========== Test 1: remove prio 7 and 9
 807          {
 808            DummyEntry[] queueEntries = {
 809                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 810                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 811                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 812                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 813                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 814                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 815                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 816                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 817                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 818                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 819                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 820                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 821                          new DummyEntry(glob, PriorityEnum.MIN_PRIORITY, queue.getStorageId(), true)
 822                                         };
 823             this.queue.removeStorageSizeListener(null);
 824             this.queue.addStorageSizeListener(this.queueSizeListener);
 825             this.queueSizeListener.clear();
 826                                         
 827             queue.put(queueEntries, false);
 828 
 829             long numRemoved = queue.removeWithPriority(-1, -1L, 7, 9);
 830 
 831             assertEquals("number of invocations", 2, this.queueSizeListener.getCount());
 832             assertEquals("number of entries incremented on last invocation", -8, this.queueSizeListener.getLastIncrementEntries());
 833             assertEquals("number of bytes incremented on last invocation", -8*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
 834                                         
 835             assertEquals(ME+": Wrong number removed", 8, numRemoved);
 836             assertEquals(ME+": Wrong size", queueEntries.length-8, queue.getNumOfEntries());
 837 
 838             numRemoved = queue.removeWithPriority(-1, -1L, 27, 99);
 839             long sizeInBytes = (queueEntries.length - 8) * queueEntries[0].getSizeInBytes();
 840             assertEquals(ME+": Wrong size in bytes ", sizeInBytes, queue.getNumOfBytes());
 841             assertEquals(ME+": Wrong number removed", 0, numRemoved);
 842             assertEquals(ME+": Wrong number of entries ", queueEntries.length-8, queue.getNumOfEntries());
 843 
 844             queue.clear();
 845 
 846             log.info("#1 Success, fill and remove");
 847          }
 848 
 849          //========== Test 2: remove prio 7 and 9 with num limit
 850          {
 851             DummyEntry[] queueEntries = {
 852                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 853                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 854                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 855                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 856                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 857                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 858                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 859                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 860                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 861                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 862                          new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
 863                          new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
 864                          new DummyEntry(glob, PriorityEnum.MIN_PRIORITY, queue.getStorageId(), true)
 865                                         };
 866             this.queue.removeStorageSizeListener(null);
 867             this.queue.addStorageSizeListener(this.queueSizeListener);
 868             this.queueSizeListener.clear();
 869                                         
 870             queue.put(queueEntries, false);
 871 
 872             long numRemoved = queue.removeWithPriority(2, -1L, 7, 9);
 873 
 874             assertEquals(ME+": Wrong number removed", 2, numRemoved);
 875             assertEquals(ME+": Wrong size", queueEntries.length-2, queue.getNumOfEntries());
 876             assertEquals("number of invocations", 2, this.queueSizeListener.getCount());
 877             assertEquals("number of entries incremented on last invocation", -2, this.queueSizeListener.getLastIncrementEntries());
 878             assertEquals("number of bytes incremented on last invocation", -2*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
 879                                         
 880             log.info("#2 Success, fill and remove");
 881          }
 882          queue.shutdown();
 883       }
 884       catch(XmlBlasterException e) {
 885          fail(ME + ": Exception thrown: " + e.getMessage());
 886       }
 887    }
 888 
 889 //------------------------------------
 890    public void testRemoveRandom() {
 891 
 892       String queueType = "unknown";
 893       try {
 894          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 895          queueType = this.queue.toString();
 896          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/removeRandom");
 897          this.queue.initialize(queueId, prop);
 898          queue.clear();
 899          assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
 900          removeRandom(this.queue);
 901       }
 902       catch (XmlBlasterException ex) {
 903          log.severe("Exception when testing removeRandom probably due to failed initialization of the queue " + queueType);
 904       }
 905 
 906    }
 907 
 908 
 909    /**
 910     * Test removeRandom(long[])
 911     */
 912    private void removeRandom(I_Queue queue) {
 913       ME = "I_QueueTest.removeRandom(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "][" + queue.getClass().getName() + "]";
 914       System.out.println("***" + ME);
 915       try {
 916          //========== Test 1: remove 1 from 1
 917          {
 918             this.queue.removeStorageSizeListener(null);
 919             this.queue.addStorageSizeListener(this.queueSizeListener);
 920             this.queueSizeListener.clear();
 921 
 922             //MsgUnit msgUnit = new MsgUnit("<key/>", "bla".getBytes(), "<qos/>");
 923             DummyEntry[] queueEntries = { new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true) };
 924             queue.put(queueEntries, false);
 925 
 926             I_QueueEntry[] testEntryArr = { queueEntries[0] };
 927             long numRemoved = 0L;
 928             boolean[] tmpArr = queue.removeRandom(testEntryArr);
 929             for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
 930 
 931             assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
 932             assertEquals("number of bytes incremented on last invocation", -queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
 933 
 934             assertEquals(ME+": Wrong number removed", 1, numRemoved);
 935             assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
 936             log.info("#1 Success, fill and random remove");
 937          }
 938 
 939          //========== Test 2: removeRandom 2 from 3
 940          {
 941             DummyEntry[] queueEntries = {
 942                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 943                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 944                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)
 945                                         };
 946             queue.put(queueEntries, false);
 947 
 948             I_QueueEntry[] testEntryArr = { queueEntries[0], 
 949                                             queueEntries[2]
 950                                           };
 951             long numRemoved = 0L;
 952             boolean[] tmpArr = queue.removeRandom(testEntryArr);
 953             for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
 954 
 955             assertEquals("number of entries incremented on last invocation", -2, this.queueSizeListener.getLastIncrementEntries());
 956             assertEquals("number of bytes incremented on last invocation", -2*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
 957 
 958             assertEquals(ME+": Wrong number removed", 2, numRemoved);
 959             assertEquals(ME+": Wrong size", 1, queue.getNumOfEntries());
 960             I_QueueEntry result = queue.peek();
 961             assertEquals(ME+": Wrong timestamp", queueEntries[1].getUniqueId(), result.getUniqueId());
 962             queue.clear();
 963             log.info("#2 Success, fill and random remove");
 964          }
 965 
 966          //========== Test 3: removeRandom 5 from 3
 967          {
 968             DummyEntry[] queueEntries = {
 969                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 970                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 971                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)
 972                                         };
 973             queue.put(queueEntries, false);
 974 
 975             I_QueueEntry[] dataIdArr = {
 976                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 977                          queueEntries[0],
 978                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 979                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 980                          queueEntries[2],
 981                                         };
 982             long numRemoved = 0L;
 983             boolean[] tmpArr = queue.removeRandom(dataIdArr);
 984             for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
 985 
 986             assertEquals("number of entries incremented on last invocation", -2, this.queueSizeListener.getLastIncrementEntries());
 987             assertEquals("number of bytes incremented on last invocation", -2*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
 988 
 989             assertEquals(ME+": Wrong number removed", 2, numRemoved);
 990             assertEquals(ME+": Wrong size", 1, queue.getNumOfEntries());
 991 
 992             I_QueueEntry entry = queue.peek();
 993             assertTrue("Missing entry", (I_QueueEntry)null != entry);
 994             assertEquals(ME+": Wrong entry removed", queueEntries[1].getUniqueId(), entry.getUniqueId());
 995 
 996             queue.clear();
 997             log.info("#3 Success, fill and random remove");
 998          }
 999 
1000          //========== Test 4: removeRandom 0 from 0
1001          {
1002             DummyEntry[] queueEntries = new DummyEntry[0];
1003             queue.put(queueEntries, false);
1004 
1005             I_QueueEntry[] dataIdArr = new I_QueueEntry[0];
1006 
1007             long numRemoved = 0L;
1008             boolean[] tmpArr = queue.removeRandom(dataIdArr);
1009             for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
1010 
1011             assertEquals(ME+": Wrong number removed", 0, numRemoved);
1012             assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
1013             queue.clear();
1014             log.info("#4 Success, fill and random remove");
1015          }
1016 
1017          //========== Test 5: removeRandom null from null
1018          {
1019             queue.put((DummyEntry[])null, false);
1020 
1021 //            long numRemoved = queue.removeRandom((I_QueueEntry[])null);
1022             long numRemoved = 0L;
1023             boolean[] tmpArr = queue.removeRandom((I_QueueEntry[])null);
1024             for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
1025 
1026             assertEquals(ME+": Wrong number removed", 0, numRemoved);
1027             assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
1028             queue.clear();
1029             log.info("#5 Success, fill and random remove");
1030          }
1031 
1032          queue.shutdown();
1033          System.out.println("***" + ME + " [SUCCESS]");
1034       }
1035       catch(XmlBlasterException e) {
1036          fail(ME + ": Exception thrown: " + e.getMessage());
1037       }
1038    }
1039 
1040 
1041 
1042 //------------------------------------
1043    public void testTakeLowest() {
1044       String queueType = "unknown";
1045       try {
1046          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1047          queueType = this.queue.toString();
1048          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/takeLowest");
1049          this.queue.initialize(queueId, prop);
1050          queue.clear();
1051          assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
1052          takeLowest(this.queue);
1053       }
1054       catch (XmlBlasterException ex) {
1055          log.severe("Exception when testing removeRandomLong probably due to failed initialization of the queue " + queueType);
1056       }
1057 
1058    }
1059 
1060 
1061 
1062    /**
1063     * returns the number of entries left in the queue after this processing operation 
1064     * @param queue the queue to use for this test
1065     * @param numEntries the number of entries to pass to the takeLowest operation
1066     * @param numBytes the num of bytes to pass to the takeLowest operation
1067     * @param leaveOne the flag to pass to the takeLowest operation
1068     * @param origEntries the array of the original entries put into the queue
1069     * @param entriesLeft number of entries left in the queue before this operation
1070     * @param currentEntries the number of entries which should have been processed by this operation
1071     */
1072    private final int assertCheckForTakeLowest(I_Queue queue, int numEntries, long numBytes, 
1073       I_QueueEntry refEntry, boolean leaveOne, I_QueueEntry[] origEntries, int entriesLeft, 
1074       int currentEntries, long size) throws XmlBlasterException {
1075       String me = ME + "/" + numEntries + "/" + numBytes + "/" + leaveOne + "/" + entriesLeft + "/" + currentEntries;
1076       if (log.isLoggable(Level.FINE)) log.fine("");
1077       assertEquals(me+": Wrong size of entry ", size, origEntries[0].getSizeInBytes());
1078       assertEquals(me+": Wrong amount of entries in queue before takeLowest invocation ", entriesLeft, queue.getNumOfEntries());
1079       assertEquals(me+": Wrong size of entries in queue before takeLowest invocation ", size*entriesLeft, queue.getNumOfBytes());
1080       assertEquals(me+": Wrong amount of persistent entries in queue before takeLowest invocation ", entriesLeft, queue.getNumOfPersistentEntries());
1081       assertEquals(me+": Wrong size of persistent entries in queue before takeLowest invocation ", size*entriesLeft, queue.getNumOfPersistentBytes());
1082 
1083       List<I_Entry> list = null;
1084       try {
1085          list = queue.peekLowest(numEntries, numBytes, refEntry, leaveOne);  // gives back all minus one
1086          assertEquals(me+": Wrong number of entries in peekLowest return ", currentEntries, list.size());
1087          assertEquals(me+": Wrong number of entries in queue after peekLowest invocation ", entriesLeft, queue.getNumOfEntries());
1088          assertEquals(me+": Wrong number of bytes in queue after peekLowest invocation ", size*(entriesLeft), queue.getNumOfBytes());
1089          assertEquals(me+": Wrong number of persistent bytes in queue after takeLowest invocation ", size*(entriesLeft), queue.getNumOfPersistentBytes());
1090       }
1091       catch (XmlBlasterException e) {
1092          if (e.getErrorCode()!=ErrorCode.INTERNAL_NOTIMPLEMENTED) throw e;
1093       }
1094 
1095       list = queue.takeLowest(numEntries, numBytes, refEntry, leaveOne);  // gives back all minus one
1096 
1097       assertEquals("number of entries incremented on last invocation", -currentEntries, this.queueSizeListener.getLastIncrementEntries());
1098       assertEquals("number of bytes incremented on last invocation", -currentEntries*origEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
1099 
1100       assertEquals(me+": Wrong number of entries in takeLowest return ", currentEntries, list.size());
1101       assertEquals(me+": Wrong number of entries in queue after takeLowest invocation ", entriesLeft-currentEntries, queue.getNumOfEntries());
1102       assertEquals(me+": Wrong number of bytes in queue after takeLowest invocation ", size*(entriesLeft-currentEntries), queue.getNumOfBytes());
1103       assertEquals(me+": Wrong number of persistent bytes in queue after takeLowest invocation ", size*(entriesLeft-currentEntries), queue.getNumOfPersistentBytes());
1104 
1105       for (int i=entriesLeft-currentEntries; i < entriesLeft; i++) {
1106          int j = entriesLeft - 1 - i;
1107          long ref = ((I_QueueEntry)list.get(j)).getUniqueId();
1108          assertEquals(me+": Wrong sequence in takeLowest", origEntries[i].getUniqueId(), ref);
1109       }
1110       return entriesLeft - currentEntries;
1111    }
1112 
1113 
1114    /**
1115     * Test takeLowest(I_Queue)
1116     */
1117    private void takeLowest(I_Queue queue) {
1118 
1119       if (queue instanceof CacheQueueInterceptorPlugin) return;
1120 
1121       ME = "I_QueueTest.takeLowest(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1122       System.out.println("***" + ME);
1123       try {
1124          //========== Test 1: takeLowest without restrictions
1125          {
1126             log.fine("takeLowest test 1");
1127             int imax = 50;
1128             long size = 0L;
1129             long msgSize = 100L; // every msg is 100 bytes long
1130             int entriesLeft = imax;
1131 
1132 
1133             this.queue.removeStorageSizeListener(null);
1134             this.queue.addStorageSizeListener(this.queueSizeListener);
1135             this.queueSizeListener.clear();
1136 
1137             DummyEntry[] entries = new DummyEntry[imax];
1138             for (int i=0; i < imax; i++) {
1139                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), msgSize, true);
1140                size += entries[i].getSizeInBytes();
1141                queue.put(entries[i], false);
1142             }
1143 
1144             assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1145             assertEquals(ME+": Wrong expected size in bytes of entries", msgSize*imax, size);
1146             assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1147 
1148             entriesLeft = assertCheckForTakeLowest(queue,  0, -1L, null, true, entries, entriesLeft, 0, msgSize);
1149             entriesLeft = assertCheckForTakeLowest(queue,  1, -1L, null, true, entries, entriesLeft, 1, msgSize);
1150             entriesLeft = assertCheckForTakeLowest(queue,  2, -1L, null, true, entries, entriesLeft, 2, msgSize);
1151             entriesLeft = assertCheckForTakeLowest(queue, -1, 0L, null, true, entries, entriesLeft, 0, msgSize);
1152             entriesLeft = assertCheckForTakeLowest(queue,  0, 0L, null, true, entries, entriesLeft, 0, msgSize);
1153             entriesLeft = assertCheckForTakeLowest(queue,  1, 0L, null, true, entries, entriesLeft, 1, msgSize);
1154             entriesLeft = assertCheckForTakeLowest(queue,  2, 0L, null, true, entries, entriesLeft, 2, msgSize);
1155             entriesLeft = assertCheckForTakeLowest(queue, -1, 50L, null, true, entries, entriesLeft, 1, msgSize);
1156             entriesLeft = assertCheckForTakeLowest(queue,  0, 50L, null, true, entries, entriesLeft, 1, msgSize);
1157             entriesLeft = assertCheckForTakeLowest(queue,  1, 50L, null, true, entries, entriesLeft, 1, msgSize);
1158             entriesLeft = assertCheckForTakeLowest(queue,  2, 50L, null, true, entries, entriesLeft, 2, msgSize);
1159             entriesLeft = assertCheckForTakeLowest(queue, -1, 100L, null, true, entries, entriesLeft, 1, msgSize);
1160             entriesLeft = assertCheckForTakeLowest(queue,  0, 100L, null, true, entries, entriesLeft, 1, msgSize);
1161             entriesLeft = assertCheckForTakeLowest(queue,  1, 100L, null, true, entries, entriesLeft, 1, msgSize);
1162             entriesLeft = assertCheckForTakeLowest(queue,  2, 100L, null, true, entries, entriesLeft, 2, msgSize);
1163             entriesLeft = assertCheckForTakeLowest(queue, -1, 150L, null, true, entries, entriesLeft, 2, msgSize);
1164             entriesLeft = assertCheckForTakeLowest(queue,  0, 150L, null, true, entries, entriesLeft, 2, msgSize);
1165             entriesLeft = assertCheckForTakeLowest(queue,  1, 150L, null, true, entries, entriesLeft, 2, msgSize);
1166             entriesLeft = assertCheckForTakeLowest(queue,  2, 150L, null, true, entries, entriesLeft, 2, msgSize);
1167             entriesLeft = assertCheckForTakeLowest(queue, -1, 200L, null, true, entries, entriesLeft, 2, msgSize);
1168             entriesLeft = assertCheckForTakeLowest(queue,  0, 200L, null, true, entries, entriesLeft, 2, msgSize);
1169             entriesLeft = assertCheckForTakeLowest(queue,  1, 200L, null, true, entries, entriesLeft, 2, msgSize);
1170             entriesLeft = assertCheckForTakeLowest(queue,  2, 200L, null, true, entries, entriesLeft, 2, msgSize);
1171             entriesLeft = assertCheckForTakeLowest(queue, -1, -1L, null, true, entries, entriesLeft, entriesLeft-1, msgSize);
1172             entriesLeft = assertCheckForTakeLowest(queue, -1, -1L, null, false, entries, entriesLeft, 1, msgSize);
1173             assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1174             queue.clear();
1175          }
1176 
1177          //========== Test 2: takeLowest which should return an empty array
1178          {
1179             log.fine("takeLowest test 2");
1180             int imax = 20;
1181             long size = 0L;
1182 
1183             DummyEntry[] entries = new DummyEntry[imax];
1184             for (int i=0; i < imax; i++) {
1185                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1186                size += entries[i].getSizeInBytes();
1187                queue.put(entries[i], false);
1188             }
1189 
1190             DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1191 
1192             assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1193             assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1194 
1195             // should return an empty array since the timestamp is  the last
1196             List<I_Entry> list = queue.takeLowest(-1, -1, queueEntry, true);
1197 
1198             assertEquals(ME+": Wrong size in takeLowest return ", 0, list.size());
1199             queue.clear();
1200             assertEquals(ME+": Wrong size in takeLowest after cleaning ", 0, queue.getNumOfEntries());
1201          }
1202 
1203 
1204          //========== Test 3: takeLowest should return 13 entries
1205          {
1206             log.fine("takeLowest test 3");
1207             int imax = 20;
1208             long size = 0L;
1209 
1210             DummyEntry[] entries = new DummyEntry[imax];
1211             for (int i=0; i < imax; i++) {
1212                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1213                size += entries[i].getSizeInBytes();
1214                queue.put(entries[i], false);
1215             }
1216             DummyEntry queueEntry = entries[6];
1217 
1218             assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1219             assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1220 
1221             // with entries[6] as limit entry, 13 (= imax-6-1) of the 20 entries are expected back
1222             List<I_Entry> list = queue.takeLowest(-1, -1, queueEntry, true);
1223 
1224             assertEquals(ME+": Wrong size in takeLowest return ", list.size(), imax-6-1);
1225             queue.clear();
1226             assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1227          }
1228 
1229 
1230          //========== Test 4: takeLowest without restrictions
1231          {
1232             log.fine("takeLowest test 4 (with entry null)");
1233             int imax = 20;
1234             long size = 0L;
1235 
1236             DummyEntry[] entries = new DummyEntry[imax];
1237             for (int i=0; i < imax; i++) {
1238                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1239                size += entries[i].getSizeInBytes();
1240                queue.put(entries[i], false);
1241             }
1242 
1243             assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1244             assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1245 
1246             List<I_Entry> list = queue.takeLowest(-1, -1, null, true);
1247 
1248             assertEquals(ME+": Wrong size in takeLowest return ", list.size(), entries.length-1);
1249             for (int i=1; i < imax; i++) {
1250                int j = imax - 1 - i;
1251                long ref = ((I_QueueEntry)list.get(j)).getUniqueId();
1252                assertEquals(ME+": Wrong unique ID", entries[i].getUniqueId(), ref);
1253             }
1254             queue.clear();
1255             assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1256          }
1257 
1258 
1259       }
1260       catch(XmlBlasterException e) {
1261          fail(ME + ": Exception thrown: " + e.getMessage());
1262       }
1263    }
1264 
1265 
1266    public void testWrongOrder() {
1267       String queueType = "unknown";
1268       try {
1269          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1270          queueType = this.queue.toString();
1271          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/takeLowest");
1272          this.queue.initialize(queueId, prop);
1273          queue.clear();
1274          assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
1275          wrongOrder(this.queue);
1276       }
1277       catch (XmlBlasterException ex) {
1278          log.severe("Exception when testing removeRandomLong probably due to failed initialization of the queue " + queueType);
1279       }
1280 
1281    }
1282 
1283    /**
1284     * Test wrongOrder(I_Queue)
1285     */
1286    private void wrongOrder(I_Queue queue) {
1287       ME = "I_QueueTest.wrongOrder(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1288       System.out.println("***" + ME);
1289       try {
1290          //========== Test 1: checks if entries are returned in the correct
1291          // order even if they are inserted in the wrong order
1292          {
1293             log.fine("wrongOrder test 1");
1294             int imax = 5;
1295             long size = 0L;
1296 
1297             DummyEntry[] entries = new DummyEntry[imax];
1298             for (int i=0; i < imax; i++) {
1299                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1300                size += entries[i].getSizeInBytes();
1301             }
1302 
1303             DummyEntry[] putEntries = new DummyEntry[imax];
1304             putEntries[0] = entries[3];
1305             putEntries[1] = entries[4];
1306             putEntries[2] = entries[2];
1307             putEntries[3] = entries[0];
1308             putEntries[4] = entries[1];
1309 
1310             queue.put(putEntries, false);
1311 
1312             assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1313             assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1314 
1315             List<I_Entry> listPeekSamePrio = queue.peekSamePriority(-1, -1L);
1316             List<I_Entry> listPeekWithPrio = queue.peekWithPriority(-1, -1L, 0, 10);
1317             List<I_Entry> listPeek = queue.peek(-1, -1L);
1318 
1319             //they all should give the same result ...
1320             for (int i=0; i<imax; i++) {
1321                long id = entries[i].getUniqueId();
1322                long idPeekSamePrio = ((I_QueueEntry)listPeekSamePrio.get(i)).getUniqueId();
1323                long idPeekWithPrio = ((I_QueueEntry)listPeekWithPrio.get(i)).getUniqueId();
1324                long idPeek = ((I_QueueEntry)listPeek.get(i)).getUniqueId();
1325                assertEquals(ME+": Wrong entry for peekSamePrio ", id, idPeekSamePrio);
1326                assertEquals(ME+": Wrong entry for peekWithPrio ", id, idPeekWithPrio);
1327                assertEquals(ME+": Wrong entry for peek ", id, idPeek);
1328             }
1329             queue.clear();
1330             assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1331          }
1332 
1333       }
1334       catch(XmlBlasterException e) {
1335          fail(ME + ": Exception thrown: " + e.getMessage());
1336       }
1337    }
1338 
1339 
1340    public void testPutEntriesTwice() {
1341       String queueType = "unknown";
1342       try {
1343          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1344          queueType = this.queue.toString();
1345          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/putEntriesTwice");
1346          this.queue.initialize(queueId, prop);
1347          queue.clear();
1348          assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1349          putEntriesTwice(this.queue);
1350       }
1351       catch (XmlBlasterException ex) {
1352          log.severe("Exception when testing putEntriesTwice probably due to failed initialization of the queue " + queueType);
1353       }
1354    }
1355 
1356 
1357    /**
1358     * Test wrongOrder(I_Queue)
1359     */
1360    private void putEntriesTwice(I_Queue queue) {
1361       ME = "I_QueueTest.putEntriesTwice(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1362       System.out.println("***" + ME);
1363       try {
1364          //========== Test 1: checks if entries are returned in the correct
1365          // order even if they are inserted in the wrong order
1366          {
1367             log.fine("putEntriesTwice test 1");
1368             int imax = 5;
1369             long size = 0L;
1370 
1371             DummyEntry[] entries = new DummyEntry[imax];
1372             for (int i=0; i < imax; i++) {
1373                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1374                size += entries[i].getSizeInBytes();
1375             }
1376 
1377             queue.put(entries, false);
1378             queue.put(entries, false);
1379 
1380             assertEquals(ME+": Wrong number of entries after putting same entries twice ", imax, queue.getNumOfEntries());
1381             queue.removeRandom(entries);
1382 
1383             assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1384          }
1385       }
1386       catch(XmlBlasterException e) {
1387          fail(ME + ": Exception thrown: " + e.getMessage());
1388       }
1389    }
1390 
1391 
1392 
1393    public void testPeekWithLimitEntry() {
1394       String queueType = "unknown";
1395       try {
1396          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1397          queueType = this.queue.toString();
1398          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/peekWithLimitEntry");
1399          this.queue.initialize(queueId, prop);
1400          queue.clear();
1401          assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1402          peekWithLimitEntry(this.queue);
1403       }
1404       catch (XmlBlasterException ex) {
1405          log.severe("Exception when testing peekWithLimitEntry probably due to failed initialization of the queue " + queueType);
1406       }
1407    }
1408 
1409 
1410    /**
1411     * Test testPeekWithLimitEntry(I_Queue)
1412     */
1413    private void peekWithLimitEntry(I_Queue queue) {
1414       ME = "I_QueueTest.peekWithLimitEntry(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1415       System.out.println("***" + ME);
1416       try {
1417          //========== Test 1: normal case where limitEntry is contained in the queue
1418          {
1419             log.fine("peekWithLimitEntry test 1");
1420             int imax = 5;
1421 
1422             DummyEntry[] entries = new DummyEntry[imax];
1423             entries[0] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1424             entries[3] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1425             entries[1] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1426             entries[4] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1427             entries[2] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1428 
1429             queue.put(entries, false);
1430             assertEquals(ME+": Wrong number of entries after putting same entries ", imax, queue.getNumOfEntries());
1431 
1432             List<I_Entry> list = queue.peekWithLimitEntry(entries[3]);
1433             assertEquals(ME+": Wrong number of peeked entries (with limit) ", 3, list.size());
1434             for (int i=0; i < list.size(); i++) {
1435                assertEquals(ME + ": Wrong order in peeked entries (with limit): ", entries[i].getUniqueId(), ((I_QueueEntry)list.get(i)).getUniqueId());
1436             }
1437 
1438             queue.removeRandom(entries);
1439             assertEquals(ME+": Wrong size in peekWithLimitEntry after cleaning ", queue.getNumOfEntries(), 0);
1440          }
1441 
1442          //========== Test 2: normal case where limitEntry is NOT contained in the queue (should not return anything)
1443          {
1444             log.fine("peekWithLimitEntry test 2");
1445             int imax = 5;
1446 
1447             DummyEntry[] entries = new DummyEntry[imax];
1448             entries[0] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1449             entries[3] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1450             entries[1] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1451             entries[4] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1452             entries[2] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1453 
1454             DummyEntry limitEntry = new DummyEntry(glob, PriorityEnum.HIGH8_PRIORITY, queue.getStorageId(), true);
1455 
1456             queue.put(entries, false);
1457             assertEquals(ME+": Wrong number of entries after putting same entries ", imax, queue.getNumOfEntries());
1458 
1459             List<I_Entry> list = queue.peekWithLimitEntry(limitEntry);
1460             assertEquals(ME+": Wrong number of peeked entries (with limit) ", 0, list.size());
1461             queue.removeRandom(entries);
1462             assertEquals(ME+": Wrong size in peekWithLimitEntry after cleaning ", queue.getNumOfEntries(), 0);
1463          }
1464 
1465          //========== Test 3: normal case where limitEntry is NOT contained in the queue
1466          {
1467             log.fine("peekWithLimitEntry test 3");
1468             int imax = 5;
1469 
1470             DummyEntry[] entries = new DummyEntry[imax];
1471             entries[0] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1472             entries[3] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1473             entries[1] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1474             entries[4] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1475             entries[2] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1476 
1477             DummyEntry limitEntry = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1478 
1479             queue.put(entries, false);
1480             assertEquals(ME+": Wrong number of entries after putting same entries ", imax, queue.getNumOfEntries());
1481 
1482             List<I_Entry> list = queue.peekWithLimitEntry(limitEntry);
1483             assertEquals(ME+": Wrong number of peeked entries (with limit) ", imax, list.size());
1484             for (int i=0; i < list.size(); i++) {
1485                assertEquals(ME + ": Wrong order in peeked entries (with limit): ", entries[i].getUniqueId(), ((I_QueueEntry)list.get(i)).getUniqueId());
1486             }
1487 
1488             queue.removeRandom(entries);
1489             assertEquals(ME+": Wrong size in peekWithLimitEntry after cleaning ", queue.getNumOfEntries(), 0);
1490          }
1491 
1492          //========== Test 4: normal case where limitEntry is NOT contained in the queue
1493          {
1494             log.fine("peekWithLimitEntry test 4");
1495             int imax = 5;
1496 
1497             DummyEntry[] entries = new DummyEntry[imax];
1498             entries[0] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1499             entries[3] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1500             entries[1] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1501             entries[4] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1502             entries[2] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1503 
1504             queue.put(entries, false);
1505             assertEquals(ME+": Wrong number of entries after putting same entries ", imax, queue.getNumOfEntries());
1506 
1507             List<I_Entry> list = queue.peekWithLimitEntry(null);
1508             assertEquals(ME+": Wrong number of peeked entries (with limit) ", 0, list.size());
1509 
1510             queue.removeRandom(entries);
1511             assertEquals(ME+": Wrong size in peekWithLimitEntry after cleaning ", queue.getNumOfEntries(), 0);
1512          }
1513       }
1514       catch(XmlBlasterException e) {
1515          fail(ME + ": Exception thrown: " + e.getMessage());
1516       }
1517    }
1518 
1519 
1520 
1521    public void testSizesCheck() {
1522       String queueType = "unknown";
1523       try {
1524          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1525          queueType = this.queue.toString();
1526          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/testSizes");
1527          this.queue.initialize(queueId, prop);
1528          queue.clear();
1529          assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1530          sizesCheck(this.queue);
1531       }
1532       catch (XmlBlasterException ex) {
1533          log.severe("Exception when testing sizesCheck probably due to failed initialization of the queue " + queueType);
1534       }
1535    }
1536 
1537 
1538    /**
1539     * Test sizesCheck(I_Queue)
1540     */
1541    private void sizesCheck(I_Queue queue) {
1542       ME = "I_QueueTest.sizesCheck(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1543       System.out.println("***" + ME);
1544       try {
1545          //========== Test 1: normal case where limitEntry is contained in the queue
1546          {
1547             log.fine("sizesCheck test 1");
1548             int imax = 20;
1549 
1550             DummyEntry[] entries = new DummyEntry[imax];
1551             List<I_Entry> list = new ArrayList<I_Entry>();
1552 
1553             for (int i=0; i < imax; i++) {
1554                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1555                list.add(entries[i]);
1556             }
1557 
1558             queue.put(entries, false);
1559             this.checkSizeAndEntries("sizesCheck test 1: ", list, queue);
1560 
1561             if (queue instanceof CacheQueueInterceptorPlugin) return;
1562             log.info("size of list before: " + list.size());
1563             queue.takeLowest(2, 100L, null, true);
1564             list.remove(list.size()-1);
1565             list.remove(list.size()-1);
1566             log.info("size of list after: " + list.size());
1567 
1568             this.checkSizeAndEntries("sizesCheck test 1 (after takeLowest): ", list, queue);
1569 
1570             queue.removeRandom(entries);
1571             list.removeAll(list);
1572             this.checkSizeAndEntries("sizesCheck test 1 (after removing): ", list, queue);
1573 
1574 
1575          }
1576       }
1577       catch(XmlBlasterException e) {
1578          fail(ME + ": Exception thrown: " + e.getMessage());
1579       }
1580    }
1581 
1582    public void testBigEntries() {
1583       String queueType = "unknown";
1584       try {
1585          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1586          queueType = this.queue.toString();
1587          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/testSizes");
1588          this.queue.initialize(queueId, prop);
1589          queue.clear();
1590          assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1591          bigEntries(this.queue);
1592       }
1593       catch (XmlBlasterException ex) {
1594          log.severe("Exception when testing sizesCheck probably due to failed initialization of the queue " + queueType);
1595       }
1596    }
1597 
1598    /*
1599    public void testPublishMsgBigEntry() {
1600       String queueType = "unknown";
1601       try {
1602          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1603          queueType = this.queue.toString();
1604          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/testSizes");
1605          this.queue.initialize(queueId, prop);
1606          queue.clear();
1607          assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1608          publishMsgBigEntry(this.queue);
1609       }
1610       catch (XmlBlasterException ex) {
1611          log.severe("Exception when testing sizesCheck probably due to failed initialization of the queue " + queueType);
1612       }
1613    }
1614    */
1615    
1616    /**
1617     * Test bigEngtries(I_Queue)
1618     * It tests the insertion and removal of entries which contain a large blob (2.1MB)
1619     */
1620    private void publishMsgBigEntry(I_Queue queue) {
1621       ME = "I_QueueTest.publishMsgBigEntry(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1622       System.out.println("***" + ME);
1623       try {
1624          {
1625             log.fine("start test");
1626             int msgSize = 1000000;
1627             
1628             StorageId storageId = new StorageId(glob, "mystore", "test");
1629             byte[] content = new byte[msgSize];
1630             MsgUnit msgUnit = new MsgUnit(this.glob, "<key oid='aaa'/>", content, "<qos/>");
1631             MsgQueuePublishEntry pubEntry = new MsgQueuePublishEntry(this.glob, msgUnit, storageId);
1632             queue.put(pubEntry, false);
1633             I_Entry entry = queue.peek();
1634             queue.removeRandom(entry);
1635          }
1636       }
1637       catch(XmlBlasterException e) {
1638          fail(ME + ": Exception thrown: " + e.getMessage());
1639       }
1640    }
1641 
1642    /**
1643     * Test bigEngtries(I_Queue)
1644     * It tests the insertion and removal of entries which contain a large blob (2.1MB)
1645     */
1646    private void bigEntries(I_Queue queue) {
1647       ME = "I_QueueTest.bigEntries(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1648       System.out.println("***" + ME);
1649       try {
1650          {
1651             log.fine("start test");
1652             int imax = 3; 
1653             long msgSize = 202010L;
1654             DummyEntry[] entries = new DummyEntry[imax];
1655             List<I_Entry> list = new ArrayList<I_Entry>();
1656 
1657             for (int i=0; i < imax; i++) {
1658                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), msgSize, true);
1659                list.add(entries[i]);
1660             }
1661 
1662             queue.put(entries, false);
1663             this.checkSizeAndEntries("sizesCheck test 1: ", list, queue);
1664             List<I_Entry> entriesArray = queue.peek(imax, -1L);
1665             assertEquals("wrong number of big entries retrieved", imax, entriesArray.size());
1666             queue.removeRandom(entries);
1667             list.removeAll(list);
1668             this.checkSizeAndEntries("sizesCheck test 1 (after removing): ", list, queue);
1669 
1670 
1671          }
1672       }
1673       catch(XmlBlasterException e) {
1674          fail(ME + ": Exception thrown: " + e.getMessage());
1675       }
1676    }
1677 
1678 // ---------------------------------------------------------------------
1679 
1680    public void testOverflow() {
1681       String queueType = "unknown";
1682       try {
1683          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1684          prop.setMaxEntries(1L);
1685          prop.setMaxEntriesCache(1L);
1686          
1687          queueType = this.queue.toString();
1688          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/testOverflow");
1689          this.queue.initialize(queueId, prop);
1690          queue.clear();
1691          assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1692          overflow(this.queue);
1693       }
1694       catch (XmlBlasterException ex) {
1695          log.severe("Exception when testing overflowCheck probably due to failed initialization of the queue " + queueType);
1696       }
1697    }
1698 
1699 
1700    /**
1701     * Test overflow(I_Queue)
1702     * It tests if the overflow mechanism works OK
1703     */
1704    private void overflow(I_Queue queue) {
1705       ME = "I_QueueTest.overflow(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1706       System.out.println("***" + ME);
1707       try {
1708          log.fine("start test");
1709          int imax = 4; 
1710          long msgSize = 100L;
1711          boolean isPersistent = true;
1712          DummyEntry[] entries = new DummyEntry[imax];
1713 
1714          for (int i=0; i < imax; i++) {
1715             entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), msgSize, isPersistent);
1716          }
1717 
1718          queue.put(entries[0], false); // <-- OK
1719          queue.put(entries[1], false); // <-- OK
1720          try {
1721             queue.put(entries[2], false); // <-- overflow
1722             assertTrue("here we expect an overflow exception", false);
1723          }
1724          catch (XmlBlasterException ex) {
1725             log.info("overflow: an exception here is OK since it was expected due to overflow of the queue");
1726          }
1727          try {
1728             queue.put(entries[3], false); // <-- overflow
1729             assertTrue("here we expect an overflow exception", false);
1730          }
1731          catch (XmlBlasterException ex) {
1732             log.info("overflow: an exception here is OK since it was expected due to overflow of the queue");
1733          }
1734    
1735          List<I_Entry> ret = queue.peek(4, -1L);
1736          assertEquals("the number of entries in the queue", 2, ret.size());
1737          for (int i=0; i < 2; i++) {
1738             assertEquals(ME + ".overflow: entry '" + i + "' in sequence is wrong", entries[i].getUniqueId(), ((I_QueueEntry)ret.get(i)).getUniqueId());
1739          }
1740       }
1741       catch(XmlBlasterException e) {
1742          fail(ME + ": Exception thrown: " + e.getMessage());
1743       }
1744    }
1745 
1746 // ---------------------------------------------------------------------
1747 
1748    public void tearDown() {
1749       try {
1750          this.queue.clear();
1751          this.queue.shutdown();
1752       }
1753       catch (Exception ex) {
1754          log.severe("error when tearing down " + ex.getMessage());
1755       }
1756    }
1757 
1758 
1759    /**
1760     * Method is used by TestRunner to load these tests
1761     */
1762    public static Test suite()
1763    {
1764       TestSuite suite= new TestSuite();
1765       Global glob = new Global();
1766       for (int i=0; i<PLUGIN_TYPES.length; i++) {
1767          suite.addTest(new I_QueueTest("testConfig", i, glob));
1768          suite.addTest(new I_QueueTest("testSize1", i, glob));
1769          suite.addTest(new I_QueueTest("testPutMsg", i, glob));
1770          suite.addTest(new I_QueueTest("testPeekMsg", i, glob));
1771          suite.addTest(new I_QueueTest("testRemoveRandom", i, glob));
1772          suite.addTest(new I_QueueTest("testRemoveWithPriority", i, glob));
1773          suite.addTest(new I_QueueTest("testTakeLowest", i, glob));
1774          suite.addTest(new I_QueueTest("testPutEntriesTwice", i, glob));
1775          suite.addTest(new I_QueueTest("testPeekWithLimitEntry", i, glob));
1776          suite.addTest(new I_QueueTest("testSizesCheck", i, glob));
1777          suite.addTest(new I_QueueTest("testBigEntries", i, glob));
1778          suite.addTest(new I_QueueTest("testOverflow", i, glob));
1779       }
1780       return suite;
1781    }
1782 
1783    /**
1784     * <pre>
1785     *  java org.xmlBlaster.test.classtest.queue.I_QueueTest
1786     * </pre>
1787     */
1788    public static void main(String args[]) {
1789 
1790       Global glob = new Global(args);
1791 
1792       for (int i=0; i < PLUGIN_TYPES.length; i++) {
1793          I_QueueTest testSub = new I_QueueTest("I_QueueTest", i, glob);
1794 
1795          long startTime = System.currentTimeMillis();
1796 
1797          testSub.setUp();
1798          testSub.testSizesCheck();
1799          testSub.tearDown();
1800 
1801          /*
1802          testSub.setUp();
1803          testSub.testPublishMsgBigEntry();
1804          testSub.tearDown();
1805          */
1806          
1807          testSub.setUp();
1808          testSub.testPeekMsgBlocking();
1809          testSub.tearDown();
1810 
1811          testSub.setUp();
1812          testSub.testConfig();
1813          testSub.tearDown();
1814 
1815          testSub.setUp();
1816          testSub.testSize1();
1817          testSub.tearDown();
1818 
1819          testSub.setUp();
1820          testSub.testPutMsg();
1821          testSub.tearDown();
1822 
1823          testSub.setUp();
1824          testSub.testPeekMsg();
1825          testSub.tearDown();
1826 
1827          testSub.setUp();
1828          testSub.testRemoveRandom();
1829          testSub.tearDown();
1830 
1831          testSub.setUp();
1832          testSub.testRemoveWithPriority();
1833          testSub.tearDown();
1834 
1835          testSub.setUp();
1836          testSub.testTakeLowest();
1837          testSub.tearDown();
1838 
1839          testSub.setUp();
1840          testSub.testPutEntriesTwice();
1841          testSub.tearDown();
1842 
1843          testSub.setUp();
1844          testSub.testPeekWithLimitEntry();
1845          testSub.tearDown();
1846 
1847          testSub.setUp();
1848          testSub.testBigEntries();
1849          testSub.tearDown();
1850 
1851          testSub.setUp();
1852          testSub.testOverflow();
1853          testSub.tearDown();
1854 
1855          long usedTime = System.currentTimeMillis() - startTime;
1856          I_QueueTest.log.info("time used for tests: " + usedTime/1000 + " seconds");
1857       }
1858    }
1859 }


syntax highlighted by Code2HTML, v. 0.9.1