1 package org.xmlBlaster.test.classtest.queue;
  2 
  3 import java.sql.Connection;
  4 import java.util.List;
  5 import java.util.logging.Level;
  6 import java.util.logging.Logger;
  7 
  8 import junit.framework.Test;
  9 import junit.framework.TestCase;
 10 import junit.framework.TestSuite;
 11 
 12 import org.xmlBlaster.util.Global;
 13 import org.xmlBlaster.util.StopWatch;
 14 import org.xmlBlaster.util.XmlBlasterException;
 15 import org.xmlBlaster.util.def.Constants;
 16 import org.xmlBlaster.util.def.ErrorCode;
 17 import org.xmlBlaster.util.def.PriorityEnum;
 18 import org.xmlBlaster.util.plugin.PluginInfo;
 19 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
 20 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
 21 import org.xmlBlaster.util.queue.I_Entry;
 22 import org.xmlBlaster.util.queue.I_Queue;
 23 import org.xmlBlaster.util.queue.QueuePluginManager;
 24 import org.xmlBlaster.util.queue.StorageId;
 25 import org.xmlBlaster.util.queue.jdbc.JdbcConnectionPool;
 26 import org.xmlBlaster.util.queuemsg.DummyEntry;
 27 
 28 /**
 29  * Test JdbcQueuePlugin failover when persistent store disappears. 
 30  * <p>
 31  * Invoke: java org.xmlBlaster.test.classtest.queue.JdbcQueueTest
 32  * </p>
 33  * <p>
 34  * Test database with PostgreSQL:
 35  * </p>
 36  * <pre>
 37  * initdb /tmp/postgres
 38  * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres    (edit host access)
 39  * createdb test
 40  * postmaster -i -D /tmp/postgres
 41  * </pre>
 42  * @see org.xmlBlaster.util.queue.I_Queue
 43  * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
 44  */
 45 public class JdbcQueueTest extends TestCase {
 46    
 47    
 48    public class ConnectionConsumer extends Thread {
 49       private JdbcConnectionPool pool;
 50       private int count;
 51       
 52       public ConnectionConsumer(JdbcConnectionPool pool, int count) {
 53          this.pool = pool;
 54          this.count = count;
 55          start();
 56       }
 57       
 58       public void run() {
 59          boolean success = true;
 60          try {
 61             log.info("connectionConsumer " + this.count + " starting");
 62             Connection conn = this.pool.getConnection();
 63             log.info("connectionConsumer " + this.count + " got the connection " + conn);
 64             if (conn != null) 
 65                this.pool.releaseConnection(conn, success);
 66          }
 67          catch (XmlBlasterException ex) {
 68             log.info("connectionConsumer exception " + ex.getMessage());
 69             if (ex.getErrorCode().getErrorCode().equals(ErrorCode.RESOURCE_TOO_MANY_THREADS.getErrorCode())) {
 70                synchronized(JdbcQueueTest.class) {
 71                   exceptionCount++;
 72                }
 73             }
 74          }
 75       }
 76       
 77    }
 78    
 79    int exceptionCount = 0;
 80    
 81    private String ME = "JdbcQueueTest";
 82    protected Global glob;
 83    private static Logger log = Logger.getLogger(JdbcQueueTest.class.getName());
 84    private long sizeOfMsg  = 100L;
 85    private I_Queue queue   = null;
 86 
 87    public List<I_Entry> queueList = null;
 88 //   public static String[] PLUGIN_TYPES = { new String("JDBC"), new String("CACHE") };
 89    public static String[] PLUGIN_TYPES = { new String("JDBC") };
 90    public int count = 0;
 91    boolean suppressTest = false;
 92    boolean doExecute = true;
 93 
 94    /** Constructor for junit not possible since we need to run it 3 times
 95    public JdbcQueueTest(String name) {
 96       super(name);
 97       for (int i=0; i < NUM_IMPL; i++)
 98          initialize(new Global(), name, i);
 99    }
100    */
101 
102    public JdbcQueueTest(Global glob, String name, int currImpl, boolean doExecute) {
103       super(name);
104       this.doExecute = doExecute;
105       initialize(glob, name, currImpl);
106    }
107 
108    private void initialize(Global glob, String name, int currImpl) {
109       this.glob = Global.instance();
110 
111 
112       this.sizeOfMsg = glob.getProperty().get("sizes", 10L);
113       this.suppressTest = false;
114       this.count = currImpl;
115 
116       try {
117          String type = PLUGIN_TYPES[currImpl];
118          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
119          QueuePluginManager pluginManager = new QueuePluginManager(glob);
120          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
121          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
122          prop.put("tableNamePrefix", "TEST");
123          prop.put("entriesTableName", "_entries");
124 
125          CbQueueProperty cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
126          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SetupQueue");
127 
128          this.queue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
129          this.queue.shutdown(); // to allow to initialize again
130       }
131       catch (Exception ex) {
132          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
133       }
134    }
135 
136    protected void setUp() {
137 
138       try {
139          glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
140          ME = "JdbcQueueTest with class: " + PLUGIN_TYPES[this.count];
141       }
142       catch (Exception ex) {
143          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'" + ex.getMessage());
144       }
145 
146       // cleaning up the database from previous runs ...
147 
148       try {
149          // test initialize()
150 //         this.queue.destroy();
151          this.queue.shutdown();
152       }
153       catch (Exception ex) {
154          log.severe("could not propertly set up the database: " + ex.getMessage());
155          this.suppressTest = true;
156       }
157    }
158 
159    public void tearDown() {
160       try {
161          this.queue.clear();
162          this.queue.shutdown();
163       }
164       catch (Exception ex) {
165          log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp " + ex.getMessage());
166       }
167    }
168 
169    
170    public void testPutWithBreak() {
171       if (this.suppressTest) {
172          log.severe("JDBC test is not driven as no database was found");
173          return;
174       }
175       try {
176          if (this.doExecute) putWithBreak();
177          else {
178             log.warning("test desactivated since needs to be run manually");
179             log.warning("please invoke it as 'java org.xmlBlaster.test.classtest.queue.JdbcQueueTest'");
180          }
181       }
182       catch (XmlBlasterException ex) {
183          fail("Exception when testing PutWithBreak probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
184          ex.printStackTrace();
185       }
186    }
187 
188    public void putWithBreak() throws XmlBlasterException {
189       String me = ME + ".putWithBreak";
190       // set up the queues ....
191       QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
192       prop.setMaxEntries(10000);
193       StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "putWithBreak");
194       queue.initialize(queueId, prop);
195       queue.clear();
196 
197       int num = 30;
198       boolean success = false;
199       for (int i=0; i < num; i++) {
200          try {
201             log.info("put with break entry " + i + "/" + num + " please kill the DB manually to test reconnect");
202             DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);
203             queue.put(entry, false);
204             try {
205                Thread.sleep(5000L);
206             }
207             catch (Exception ex) {
208             }
209          }
210          catch (XmlBlasterException ex) {
211             if (log.isLoggable(Level.FINE))  log.fine(ex.getMessage());
212             if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {
213                log.info("the communication to the db has been lost");
214                success = true;
215                break;
216             }
217             else throw ex;
218          }
219       }
220       
221       assertTrue(me + ": Timed out when waiting to loose the connection to the DB", success);
222       success = false; // reset the flag
223       log.info("preparing to reconnect again ...");
224 
225       for (int i=0; i < num; i++) {
226          try {
227             log.info("put with break entry " + i + "/" + num + " please restart the the DB to test reconnect");
228             DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);
229             queue.put(entry, false);
230             log.info("the communication to the db has been reestablished");
231             success = true;
232             break;
233          }
234          catch (XmlBlasterException ex) {
235             if (log.isLoggable(Level.FINE))  log.fine(ex.getMessage());
236             if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {
237                try {
238                   Thread.sleep(5000L);
239                }
240                catch (Exception e) {
241                }
242             }
243             else throw ex;
244          }
245       }
246       assertTrue(me + ": Timed out when waiting to regain the connection to the DB", success);
247       log.info("successfully ended");
248    }
249 
250    public void testInitialEntries() {
251       if (this.suppressTest) {
252          log.severe("JDBC test is not driven as no database was found");
253          return;
254       }
255       try {
256          initialEntries();
257       }
258       catch (XmlBlasterException ex) {
259          fail("Exception when testing InitialEntries probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
260          ex.printStackTrace();
261       }
262    }
263 
264    public void initialEntries() throws XmlBlasterException {
265       // set up the queues ....
266       log.info("initialEntries test starts");
267       QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
268       cbProp.setMaxEntries(10000L);
269       cbProp.setMaxBytes(200000L);
270       StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "initialEntries");
271 
272       try {
273          String type = PLUGIN_TYPES[this.count];
274          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
275          QueuePluginManager pluginManager = new QueuePluginManager(glob);
276          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
277          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
278          prop.put("tableNamePrefix", "TEST");
279          prop.put("entriesTableName", "_entries");
280          I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
281          tmpQueue.clear();
282          // add some persistent entries and then shutdown ...
283          DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
284          tmpQueue.put(entry, false);
285          entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
286          tmpQueue.put(entry, false);
287          entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
288          tmpQueue.put(entry, false);
289          tmpQueue.shutdown(); // to allow to initialize again
290          I_Queue tmpQueue2 = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
291          long numOfEntries = tmpQueue2.getNumOfEntries();
292          assertEquals("Wrong number of entries in queue", 3L, numOfEntries);
293          List<I_Entry> lst = tmpQueue2.peek(-1, -1L);
294          assertEquals("Wrong number of entries retrieved from queue", 3, lst.size());
295          queue.shutdown();
296       }
297       catch (Exception ex) {
298          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
299          ex.printStackTrace();
300          assertTrue("exception occured when testing initialEntries", false);
301       }
302       log.info("initialEntries test successfully ended");
303    }
304 
305 
306 
307 
308    public void testMultiplePut() {
309       try {
310          multiplePut();
311       }
312       catch (XmlBlasterException ex) {
313          fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
314          ex.printStackTrace();
315       }
316    }
317 
318    public void multiplePut() throws XmlBlasterException {
319       // set up the queues ....
320       log.info("initialEntries test starts");
321       QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
322       cbProp.setMaxEntries(10000L);
323       cbProp.setMaxBytes(200000L);
324       StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "initialEntries");
325 
326       try {
327          String type = PLUGIN_TYPES[this.count];
328          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
329          QueuePluginManager pluginManager = new QueuePluginManager(glob);
330          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
331          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
332          prop.put("tableNamePrefix", "TEST");
333          prop.put("entriesTableName", "_entries");
334          I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
335          tmpQueue.clear();
336          // add some persistent entries and then shutdown ...
337          int nmax = 1;
338          int size = 100;
339 
340          for (int j=0; j < 4; j++) {
341             DummyEntry[] entries = new DummyEntry[nmax];
342             for (int i=0; i < nmax; i++) {
343                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), size, true);
344             }
345             long time1 = System.currentTimeMillis();
346             tmpQueue.put(entries, false);
347             long delta = System.currentTimeMillis() - time1;
348             log.info("multiple put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
349            
350             List<I_Entry> list = tmpQueue.peek(-1, -1L);
351             assertEquals("Wrong number of entries in queue", nmax, list.size());
352            
353             time1 = System.currentTimeMillis();
354             tmpQueue.removeRandom(entries);
355             delta = System.currentTimeMillis() - time1;
356             log.info("multiple remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
357             tmpQueue.clear();
358            
359             time1 = System.currentTimeMillis();
360             for (int i=0; i < nmax; i++) {
361                tmpQueue.put(entries[i], false);
362             }
363             delta = System.currentTimeMillis() - time1;
364             log.info("repeated single put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
365            
366             time1 = System.currentTimeMillis();
367             for (int i=0; i < nmax; i++) tmpQueue.removeRandom(entries[i]);
368             delta = System.currentTimeMillis() - time1;
369             log.info("repeated single remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
370             nmax *= 10;
371          }
372          tmpQueue.shutdown(); // to allow to initialize again
373       }
374       catch (Exception ex) {
375          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
376          ex.printStackTrace();
377          assertTrue("exception occured when testing initialEntries", false);
378       }
379       log.info("initialEntries test successfully ended");
380    }
381 
382 
383    public void testConnectionPool() {
384       try {
385          log.info(" starting ");
386          int numConn = 3;
387          int maxWaitingThreads = 10;
388 
389          Global ownGlobal = this.glob.getClone(null);
390 
391          QueuePluginManager pluginManager = new QueuePluginManager(ownGlobal);
392          PluginInfo pluginInfo = new PluginInfo(ownGlobal, pluginManager, "JDBC", "1.0");
393 
394          pluginInfo.getParameters().put("connectionBusyTimeout", "10000");
395          pluginInfo.getParameters().put("maxWaitingThreads", "" + maxWaitingThreads);
396          pluginInfo.getParameters().put("connectionPoolSize", "" + numConn);
397 
398          JdbcConnectionPool pool = new JdbcConnectionPool();
399          pool.initialize(ownGlobal, pluginInfo.getParameters());
400 
401          Connection[] conn = new Connection[numConn];         
402          for (int i=0; i < numConn; i++) {
403             log.info(" getting connection " + i);
404             conn[i] = pool.getConnection();
405             assertNotNull("The connection " + i + " shall not be null", conn[i]);
406          }
407          
408          log.info(" getting extra connection");
409          
410          Connection extraConn = null;
411          try {
412             extraConn = pool.getConnection();
413             assertTrue("An Exception should have occured here: ", false);
414          }
415          catch (Exception ex) {
416          }
417          // should wait 10 seconds and then return null
418          assertNull("the extra connection should be null", extraConn);
419          boolean success = true;
420          pool.releaseConnection(conn[0], success);
421          extraConn = pool.getConnection();
422          assertNotNull("the extra connection should not be null", extraConn);
423          //pool.releaseConnection(extraConn);
424 
425          this.exceptionCount = 0;         
426          int expectedEx = 4;
427          for (int i=0; i < maxWaitingThreads + expectedEx; i++) {
428             ConnectionConsumer cc = new ConnectionConsumer(pool, i);
429          }
430  
431          try {
432             Thread.sleep(15000L);
433          }
434          catch (InterruptedException ex) {
435          }
436  
437          assertEquals("Number of exceptions due to too many waiting threads is wrong", expectedEx, this.exceptionCount);
438          log.info(" successfully ended ");
439       }
440       catch (Exception ex) {
441          fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
442          ex.printStackTrace();
443       }
444    }
445 
446 
447    /**
448     * Method is used by TestRunner to load these tests
449     */
450    public static Test suite() {
451       TestSuite suite= new TestSuite();
452       Global glob = new Global();
453       for (int i=0; i < PLUGIN_TYPES.length; i++) {
454          suite.addTest(new JdbcQueueTest(glob, "testConnectionPool", i, true));
455          suite.addTest(new JdbcQueueTest(glob, "testMultiplePut", i, true));
456          suite.addTest(new JdbcQueueTest(glob, "testPutWithBreak", i, false));
457          suite.addTest(new JdbcQueueTest(glob, "testInitialEntries", i, true));
458       }
459       return suite;
460    }
461 
462    /**
463     * <pre>
464     *  java org.xmlBlaster.test.classtest.queue.JdbcQueueTest
465     * </pre>
466     */
467    public static void main(String args[]) {
468       Global glob = new Global(args);
469 
470       for (int i=0; i < PLUGIN_TYPES.length; i++) {
471          JdbcQueueTest testSub = new JdbcQueueTest(glob, "JdbcQueueTest", i, true);
472 
473          testSub.setUp();
474          testSub.testConnectionPool();
475          testSub.tearDown();
476 
477          testSub.setUp();
478          testSub.testMultiplePut();
479          testSub.tearDown();
480 
481          testSub.setUp();
482          testSub.testPutWithBreak();
483          testSub.tearDown();
484 
485          testSub.setUp();
486          testSub.testInitialEntries();
487          testSub.tearDown();
488       }
489    }
490 }


syntax highlighted by Code2HTML, v. 0.9.1