1 package org.xmlBlaster.test.classtest.queue;
2
3 import java.sql.Connection;
4 import java.util.List;
5 import java.util.logging.Level;
6 import java.util.logging.Logger;
7
8 import junit.framework.Test;
9 import junit.framework.TestCase;
10 import junit.framework.TestSuite;
11
12 import org.xmlBlaster.util.Global;
13 import org.xmlBlaster.util.StopWatch;
14 import org.xmlBlaster.util.XmlBlasterException;
15 import org.xmlBlaster.util.def.Constants;
16 import org.xmlBlaster.util.def.ErrorCode;
17 import org.xmlBlaster.util.def.PriorityEnum;
18 import org.xmlBlaster.util.plugin.PluginInfo;
19 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
20 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
21 import org.xmlBlaster.util.queue.I_Entry;
22 import org.xmlBlaster.util.queue.I_Queue;
23 import org.xmlBlaster.util.queue.QueuePluginManager;
24 import org.xmlBlaster.util.queue.StorageId;
25 import org.xmlBlaster.util.queue.jdbc.JdbcConnectionPool;
26 import org.xmlBlaster.util.queuemsg.DummyEntry;
27
28 /**
29 * Test JdbcQueuePlugin failover when persistent store disappears.
30 * <p>
31 * Invoke: java org.xmlBlaster.test.classtest.queue.JdbcQueueTest
32 * </p>
33 * <p>
34 * Test database with PostgreSQL:
35 * </p>
36 * <pre>
37 * initdb /tmp/postgres
38 * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres (edit host access)
39 * createdb test
40 * postmaster -i -D /tmp/postgres
41 * </pre>
42 * @see org.xmlBlaster.util.queue.I_Queue
43 * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
44 */
45 public class JdbcQueueTest extends TestCase {
46
47
48 public class ConnectionConsumer extends Thread {
49 private JdbcConnectionPool pool;
50 private int count;
51
52 public ConnectionConsumer(JdbcConnectionPool pool, int count) {
53 this.pool = pool;
54 this.count = count;
55 start();
56 }
57
58 public void run() {
59 boolean success = true;
60 try {
61 log.info("connectionConsumer " + this.count + " starting");
62 Connection conn = this.pool.getConnection();
63 log.info("connectionConsumer " + this.count + " got the connection " + conn);
64 if (conn != null)
65 this.pool.releaseConnection(conn, success);
66 }
67 catch (XmlBlasterException ex) {
68 log.info("connectionConsumer exception " + ex.getMessage());
69 if (ex.getErrorCode().getErrorCode().equals(ErrorCode.RESOURCE_TOO_MANY_THREADS.getErrorCode())) {
70 synchronized(JdbcQueueTest.class) {
71 exceptionCount++;
72 }
73 }
74 }
75 }
76
77 }
78
79 int exceptionCount = 0;
80
81 private String ME = "JdbcQueueTest";
82 protected Global glob;
83 private static Logger log = Logger.getLogger(JdbcQueueTest.class.getName());
84 private long sizeOfMsg = 100L;
85 private I_Queue queue = null;
86
87 public List<I_Entry> queueList = null;
88 // public static String[] PLUGIN_TYPES = { new String("JDBC"), new String("CACHE") };
89 public static String[] PLUGIN_TYPES = { new String("JDBC") };
90 public int count = 0;
91 boolean suppressTest = false;
92 boolean doExecute = true;
93
94 /** Constructor for junit not possible since we need to run it 3 times
95 public JdbcQueueTest(String name) {
96 super(name);
97 for (int i=0; i < NUM_IMPL; i++)
98 initialize(new Global(), name, i);
99 }
100 */
101
102 public JdbcQueueTest(Global glob, String name, int currImpl, boolean doExecute) {
103 super(name);
104 this.doExecute = doExecute;
105 initialize(glob, name, currImpl);
106 }
107
108 private void initialize(Global glob, String name, int currImpl) {
109 this.glob = Global.instance();
110
111
112 this.sizeOfMsg = glob.getProperty().get("sizes", 10L);
113 this.suppressTest = false;
114 this.count = currImpl;
115
116 try {
117 String type = PLUGIN_TYPES[currImpl];
118 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
119 QueuePluginManager pluginManager = new QueuePluginManager(glob);
120 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
121 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
122 prop.put("tableNamePrefix", "TEST");
123 prop.put("entriesTableName", "_entries");
124
125 CbQueueProperty cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
126 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SetupQueue");
127
128 this.queue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
129 this.queue.shutdown(); // to allow to initialize again
130 }
131 catch (Exception ex) {
132 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
133 }
134 }
135
136 protected void setUp() {
137
138 try {
139 glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
140 ME = "JdbcQueueTest with class: " + PLUGIN_TYPES[this.count];
141 }
142 catch (Exception ex) {
143 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'" + ex.getMessage());
144 }
145
146 // cleaning up the database from previous runs ...
147
148 try {
149 // test initialize()
150 // this.queue.destroy();
151 this.queue.shutdown();
152 }
153 catch (Exception ex) {
154 log.severe("could not propertly set up the database: " + ex.getMessage());
155 this.suppressTest = true;
156 }
157 }
158
159 public void tearDown() {
160 try {
161 this.queue.clear();
162 this.queue.shutdown();
163 }
164 catch (Exception ex) {
165 log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp " + ex.getMessage());
166 }
167 }
168
169
170 public void testPutWithBreak() {
171 if (this.suppressTest) {
172 log.severe("JDBC test is not driven as no database was found");
173 return;
174 }
175 try {
176 if (this.doExecute) putWithBreak();
177 else {
178 log.warning("test desactivated since needs to be run manually");
179 log.warning("please invoke it as 'java org.xmlBlaster.test.classtest.queue.JdbcQueueTest'");
180 }
181 }
182 catch (XmlBlasterException ex) {
183 fail("Exception when testing PutWithBreak probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
184 ex.printStackTrace();
185 }
186 }
187
188 public void putWithBreak() throws XmlBlasterException {
189 String me = ME + ".putWithBreak";
190 // set up the queues ....
191 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
192 prop.setMaxEntries(10000);
193 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "putWithBreak");
194 queue.initialize(queueId, prop);
195 queue.clear();
196
197 int num = 30;
198 boolean success = false;
199 for (int i=0; i < num; i++) {
200 try {
201 log.info("put with break entry " + i + "/" + num + " please kill the DB manually to test reconnect");
202 DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);
203 queue.put(entry, false);
204 try {
205 Thread.sleep(5000L);
206 }
207 catch (Exception ex) {
208 }
209 }
210 catch (XmlBlasterException ex) {
211 if (log.isLoggable(Level.FINE)) log.fine(ex.getMessage());
212 if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {
213 log.info("the communication to the db has been lost");
214 success = true;
215 break;
216 }
217 else throw ex;
218 }
219 }
220
221 assertTrue(me + ": Timed out when waiting to loose the connection to the DB", success);
222 success = false; // reset the flag
223 log.info("preparing to reconnect again ...");
224
225 for (int i=0; i < num; i++) {
226 try {
227 log.info("put with break entry " + i + "/" + num + " please restart the the DB to test reconnect");
228 DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);
229 queue.put(entry, false);
230 log.info("the communication to the db has been reestablished");
231 success = true;
232 break;
233 }
234 catch (XmlBlasterException ex) {
235 if (log.isLoggable(Level.FINE)) log.fine(ex.getMessage());
236 if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {
237 try {
238 Thread.sleep(5000L);
239 }
240 catch (Exception e) {
241 }
242 }
243 else throw ex;
244 }
245 }
246 assertTrue(me + ": Timed out when waiting to regain the connection to the DB", success);
247 log.info("successfully ended");
248 }
249
250 public void testInitialEntries() {
251 if (this.suppressTest) {
252 log.severe("JDBC test is not driven as no database was found");
253 return;
254 }
255 try {
256 initialEntries();
257 }
258 catch (XmlBlasterException ex) {
259 fail("Exception when testing InitialEntries probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
260 ex.printStackTrace();
261 }
262 }
263
264 public void initialEntries() throws XmlBlasterException {
265 // set up the queues ....
266 log.info("initialEntries test starts");
267 QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
268 cbProp.setMaxEntries(10000L);
269 cbProp.setMaxBytes(200000L);
270 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "initialEntries");
271
272 try {
273 String type = PLUGIN_TYPES[this.count];
274 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
275 QueuePluginManager pluginManager = new QueuePluginManager(glob);
276 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
277 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
278 prop.put("tableNamePrefix", "TEST");
279 prop.put("entriesTableName", "_entries");
280 I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
281 tmpQueue.clear();
282 // add some persistent entries and then shutdown ...
283 DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
284 tmpQueue.put(entry, false);
285 entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
286 tmpQueue.put(entry, false);
287 entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
288 tmpQueue.put(entry, false);
289 tmpQueue.shutdown(); // to allow to initialize again
290 I_Queue tmpQueue2 = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
291 long numOfEntries = tmpQueue2.getNumOfEntries();
292 assertEquals("Wrong number of entries in queue", 3L, numOfEntries);
293 List<I_Entry> lst = tmpQueue2.peek(-1, -1L);
294 assertEquals("Wrong number of entries retrieved from queue", 3, lst.size());
295 queue.shutdown();
296 }
297 catch (Exception ex) {
298 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
299 ex.printStackTrace();
300 assertTrue("exception occured when testing initialEntries", false);
301 }
302 log.info("initialEntries test successfully ended");
303 }
304
305
306
307
308 public void testMultiplePut() {
309 try {
310 multiplePut();
311 }
312 catch (XmlBlasterException ex) {
313 fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
314 ex.printStackTrace();
315 }
316 }
317
318 public void multiplePut() throws XmlBlasterException {
319 // set up the queues ....
320 log.info("initialEntries test starts");
321 QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
322 cbProp.setMaxEntries(10000L);
323 cbProp.setMaxBytes(200000L);
324 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "initialEntries");
325
326 try {
327 String type = PLUGIN_TYPES[this.count];
328 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
329 QueuePluginManager pluginManager = new QueuePluginManager(glob);
330 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
331 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
332 prop.put("tableNamePrefix", "TEST");
333 prop.put("entriesTableName", "_entries");
334 I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
335 tmpQueue.clear();
336 // add some persistent entries and then shutdown ...
337 int nmax = 1;
338 int size = 100;
339
340 for (int j=0; j < 4; j++) {
341 DummyEntry[] entries = new DummyEntry[nmax];
342 for (int i=0; i < nmax; i++) {
343 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), size, true);
344 }
345 long time1 = System.currentTimeMillis();
346 tmpQueue.put(entries, false);
347 long delta = System.currentTimeMillis() - time1;
348 log.info("multiple put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
349
350 List<I_Entry> list = tmpQueue.peek(-1, -1L);
351 assertEquals("Wrong number of entries in queue", nmax, list.size());
352
353 time1 = System.currentTimeMillis();
354 tmpQueue.removeRandom(entries);
355 delta = System.currentTimeMillis() - time1;
356 log.info("multiple remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
357 tmpQueue.clear();
358
359 time1 = System.currentTimeMillis();
360 for (int i=0; i < nmax; i++) {
361 tmpQueue.put(entries[i], false);
362 }
363 delta = System.currentTimeMillis() - time1;
364 log.info("repeated single put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
365
366 time1 = System.currentTimeMillis();
367 for (int i=0; i < nmax; i++) tmpQueue.removeRandom(entries[i]);
368 delta = System.currentTimeMillis() - time1;
369 log.info("repeated single remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
370 nmax *= 10;
371 }
372 tmpQueue.shutdown(); // to allow to initialize again
373 }
374 catch (Exception ex) {
375 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
376 ex.printStackTrace();
377 assertTrue("exception occured when testing initialEntries", false);
378 }
379 log.info("initialEntries test successfully ended");
380 }
381
382
383 public void testConnectionPool() {
384 try {
385 log.info(" starting ");
386 int numConn = 3;
387 int maxWaitingThreads = 10;
388
389 Global ownGlobal = this.glob.getClone(null);
390
391 QueuePluginManager pluginManager = new QueuePluginManager(ownGlobal);
392 PluginInfo pluginInfo = new PluginInfo(ownGlobal, pluginManager, "JDBC", "1.0");
393
394 pluginInfo.getParameters().put("connectionBusyTimeout", "10000");
395 pluginInfo.getParameters().put("maxWaitingThreads", "" + maxWaitingThreads);
396 pluginInfo.getParameters().put("connectionPoolSize", "" + numConn);
397
398 JdbcConnectionPool pool = new JdbcConnectionPool();
399 pool.initialize(ownGlobal, pluginInfo.getParameters());
400
401 Connection[] conn = new Connection[numConn];
402 for (int i=0; i < numConn; i++) {
403 log.info(" getting connection " + i);
404 conn[i] = pool.getConnection();
405 assertNotNull("The connection " + i + " shall not be null", conn[i]);
406 }
407
408 log.info(" getting extra connection");
409
410 Connection extraConn = null;
411 try {
412 extraConn = pool.getConnection();
413 assertTrue("An Exception should have occured here: ", false);
414 }
415 catch (Exception ex) {
416 }
417 // should wait 10 seconds and then return null
418 assertNull("the extra connection should be null", extraConn);
419 boolean success = true;
420 pool.releaseConnection(conn[0], success);
421 extraConn = pool.getConnection();
422 assertNotNull("the extra connection should not be null", extraConn);
423 //pool.releaseConnection(extraConn);
424
425 this.exceptionCount = 0;
426 int expectedEx = 4;
427 for (int i=0; i < maxWaitingThreads + expectedEx; i++) {
428 ConnectionConsumer cc = new ConnectionConsumer(pool, i);
429 }
430
431 try {
432 Thread.sleep(15000L);
433 }
434 catch (InterruptedException ex) {
435 }
436
437 assertEquals("Number of exceptions due to too many waiting threads is wrong", expectedEx, this.exceptionCount);
438 log.info(" successfully ended ");
439 }
440 catch (Exception ex) {
441 fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
442 ex.printStackTrace();
443 }
444 }
445
446
447 /**
448 * Method is used by TestRunner to load these tests
449 */
450 public static Test suite() {
451 TestSuite suite= new TestSuite();
452 Global glob = new Global();
453 for (int i=0; i < PLUGIN_TYPES.length; i++) {
454 suite.addTest(new JdbcQueueTest(glob, "testConnectionPool", i, true));
455 suite.addTest(new JdbcQueueTest(glob, "testMultiplePut", i, true));
456 suite.addTest(new JdbcQueueTest(glob, "testPutWithBreak", i, false));
457 suite.addTest(new JdbcQueueTest(glob, "testInitialEntries", i, true));
458 }
459 return suite;
460 }
461
462 /**
463 * <pre>
464 * java org.xmlBlaster.test.classtest.queue.JdbcQueueTest
465 * </pre>
466 */
467 public static void main(String args[]) {
468 Global glob = new Global(args);
469
470 for (int i=0; i < PLUGIN_TYPES.length; i++) {
471 JdbcQueueTest testSub = new JdbcQueueTest(glob, "JdbcQueueTest", i, true);
472
473 testSub.setUp();
474 testSub.testConnectionPool();
475 testSub.tearDown();
476
477 testSub.setUp();
478 testSub.testMultiplePut();
479 testSub.tearDown();
480
481 testSub.setUp();
482 testSub.testPutWithBreak();
483 testSub.tearDown();
484
485 testSub.setUp();
486 testSub.testInitialEntries();
487 testSub.tearDown();
488 }
489 }
490 }
// syntax highlighted by Code2HTML, v. 0.9.1