1 /*-----------------------------------------------------------------------------
2 Name: TestQueue.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Testing the queue entry and queue functionality
6 -----------------------------------------------------------------------------*/
7 #include "TestSuite.h"
8 #include <vector>
9 #include <iostream>
10 /*#include "tut.h"*/
11 #include <util/queue/QueueFactory.h>
12 #include <util/queue/I_Queue.h>
13 #include <util/queue/PublishQueueEntry.h>
14 #include <util/queue/ConnectQueueEntry.h>
15 #include <util/queue/SubscribeQueueEntry.h>
16 #include <util/queue/UnSubscribeQueueEntry.h>
17
18 namespace org { namespace xmlBlaster { namespace test {
19
20 using namespace std;
21 using namespace org::xmlBlaster::util;
22 using namespace org::xmlBlaster::util::qos;
23 using namespace org::xmlBlaster::util::qos::storage;
24 using namespace org::xmlBlaster::util::queue;
25 using namespace org::xmlBlaster::client;
26 using namespace org::xmlBlaster::client::qos;
27 using namespace org::xmlBlaster::client::key;
28
29 /**
30 * Tests the queue entry and queue functionality.
31 * The following is tested here:
32 * - PublishQueueEntry comparison operators
33 * - ConnectQueueEntry comparison operators
34 * - Intermixed comparisons (between PublishQueueEntry and ConnectQueueEntry).
35 * - Queue access and overflow
36 */
37 class TestQueue
38 {
39
40 private:
41 string ME;
42 Global& global_;
43 I_Log& log_;
44 I_Queue* queue_;
45
46 public:
47 /** The values for "-queue/connection/type"; */
48 std::vector<string> types;
49
50 public:
51 TestQueue(Global& global, string name) : ME(name), global_(global), log_(global.getLog("test"))
52 {
53 queue_ = NULL;
54 types.push_back("RAM");
55 types.push_back("SQLite");
56 types.push_back("CACHE");
57 }
58
59 virtual ~TestQueue() { }
60
61 void destroyQueue() {
62 ClientQueueProperty prop(global_, "");
63 I_Queue *queue = &QueueFactory::getFactory().getPlugin(global_, prop);
64 queue->destroy();
65 QueueFactory::getFactory().releasePlugin(queue);
66 }
67
68 void testPublishCompare()
69 {
70 string me = ME + "::testPublishCompare";
71 log_.info(me, "");
72 log_.info(me, "comparison test between PublishQueueEntry objects.");
73
74 PublishKey pubKey(global_);
75 PublishQos pubQos(global_);
76 MessageUnit msgUnit(pubKey, string("comparison test"), pubQos);
77 PublishQueueEntry entry1(global_, msgUnit);
78 PublishQueueEntry entry2(global_, msgUnit);
79 PublishQueueEntry entry3(global_, msgUnit, 2);
80 PublishQueueEntry entry4(global_, msgUnit, 3);
81 PublishQueueEntry entry5(global_, msgUnit, 1);
82
83 assertEquals(log_, me, true, entry2 < entry1, "1. PublishQos compare 2 with 1");
84 assertEquals(log_, me, true, entry3 < entry4, "2. PublishQos compare 3 with 4");
85 assertEquals(log_, me, true, entry5 < entry4, "3. PublishQos compare 5 with 4");
86
87 log_.info(me, "test ended successfully");
88 }
89
90
91 void testConnectCompare()
92 {
93 string me = ME + "::testConnectCompare";
94 log_.info(me, "");
95 log_.info(me, "comparison test between ConnectQueueEntry objects.");
96
97 ConnectQos *connectQos = new ConnectQos(global_);
98 ConnectQueueEntry entry1(global_, connectQos);
99 ConnectQueueEntry entry2(global_, connectQos);
100 ConnectQueueEntry entry3(global_, connectQos, 2);
101 ConnectQueueEntry entry4(global_, connectQos, 3);
102 ConnectQueueEntry entry5(global_, connectQos, 1);
103
104 assertEquals(log_, me, true, entry2 < entry1, "1. PublishQos compare 2 with 1");
105 assertEquals(log_, me, true, entry3 < entry4, "2. PublishQos compare 3 with 4");
106 assertEquals(log_, me, true, entry5 < entry4, "3. PublishQos compare 5 with 4");
107
108 log_.info(me, "test ended successfully");
109 }
110
111 void testMixedCompare()
112 {
113 string me = ME + "::testMixedCompare";
114 log_.info(me, "");
115 log_.info(me, "comparison test between PublishQueueEntry and ConnectQueueEntry objects.");
116
117 PublishKey pubKey(global_);
118 PublishQos pubQos(global_);
119 MessageUnit msgUnit(pubKey, string("comparison test"), pubQos);
120 ConnectQos *connectQos = new ConnectQos(global_);
121
122 PublishQueueEntry entry1(global_, msgUnit, 2);
123 ConnectQueueEntry entry2(global_, connectQos, 3);
124 PublishQueueEntry entry3(global_, msgUnit, 1);
125
126 ConnectQueueEntry entry4(global_, connectQos, 2);
127 PublishQueueEntry entry5(global_, msgUnit, 3);
128 ConnectQueueEntry entry6(global_, connectQos, 1);
129
130 assertEquals(log_, me, true, entry1 < entry2, "1. Mixed compare 1 with 2");
131 assertEquals(log_, me, true, entry3 < entry2, "2. Mixed compare 3 with 2");
132
133 assertEquals(log_, me, true, entry4 < entry5, "3. Mixed compare 4 with 5");
134 assertEquals(log_, me, true, entry6 < entry5, "4. Mixed compare 6 with 5");
135
136 log_.info(me, "test completed successfully");
137 }
138
139
140 void testWithOnePublishEntry()
141 {
142 string me = ME + "::testWithOnePublishEntry";
143 log_.info(me, "");
144 log_.info(me, "this test creates a queue. The following checks are done:");
145 ClientQueueProperty prop(global_, "");
146 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
147 assertEquals(log_, me, true, queue_->empty(), "The queue must be empty after creation");
148 assertEquals(log_, me, 0, queue_->getNumOfEntries(), "The queue must be empty after creation");
149 PublishQos qos(global_);
150 PublishKey key(global_);
151 const string contentStr = "BlaBla";
152 MessageUnit messageUnit(key, contentStr, qos);
153 PublishQueueEntry entry(global_, messageUnit, messageUnit.getQos().getPriority());
154 std::cout << "Putting " << entry.getUniqueId() << std::endl;
155
156 queue_->put(entry);
157 assertEquals(log_, me, false, queue_->empty(), " 2. the queue must contain entries after invoking put one time");
158 assertEquals(log_, me, 1, queue_->getNumOfEntries(), " 2b. the queue must contain one entry after invoking put one time");
159
160 vector<EntryType> ret = queue_->peekWithSamePriority();
161 assertEquals(log_, me, (size_t)1, ret.size(), " 3. the number of entries peeked after one put must be 1");
162 {
163 const MsgQueueEntry &e = *ret[0];
164 std::cout << "Peeking " << e.getUniqueId() << std::endl;
165 assertEquals(log_, me, entry.getUniqueId(), e.getUniqueId(), " 3. the uniqueId must be same");
166 assertEquals(log_, me, entry.getPriority(), e.getPriority(), " 3. the priority must be same");
167 }
168 long numDel = queue_->randomRemove(ret.begin(), ret.end());
169 assertEquals(log_, me, (long)1, numDel, " 4. randomRemove must return 1 entry deleted");
170 assertEquals(log_, me, true, queue_->empty(), " 5. after removing all entries (it was only 1 entry) the queue must be empty");
171 log_.info(me, "ends here. Test was successful.");
172 }
173
174
175 void testWithOneConnectEntry()
176 {
177 string me = ME + "::testWithOneEntry";
178 log_.info(me, "");
179 log_.info(me, "this test creates a queue. The following checks are done:");
180 ClientQueueProperty prop(global_, "");
181 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
182 assertEquals(log_, me, true, queue_->empty(), " 1. the queue must be empty after creation");
183 ConnectQos *connQos = new ConnectQos(global_);
184 ConnectQueueEntry entry(global_, connQos);
185 queue_->put(entry);
186 assertEquals(log_, me, false, queue_->empty(), " 2. the queue must contain entries after invoking put one time");
187 vector<EntryType> ret = queue_->peekWithSamePriority();
188 assertEquals(log_, me, (size_t)1, ret.size(), " 3. the number of entries peeked after one put must be 1");
189 assertEquals(log_, me, (long)1, queue_->randomRemove(ret.begin(), ret.end()), " 4. randomRemove must return 1 entry deleted");
190 assertEquals(log_, me, true, queue_->empty(), " 5. after removing all entries (it was only 1 entry) the queue must be empty");
191 log_.info(me, "ends here. Test was successful.");
192 }
193
194
195 void testOrder()
196 {
197 string me = ME + "::testOrder";
198 log_.info(me, "");
199 log_.info(me, "this test checks the order in which entries are returned from the queue");
200 ClientQueueProperty prop(global_, "");
201 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
202 ConnectQos *connQos = new ConnectQos(global_);
203
204 ConnectQueueEntry e1(global_, ConnectQosRef(new ConnectQos(global_)), 1);
205 e1.getConnectQos()->addClientProperty("X", 7);
206 queue_->put(e1);
207
208 ConnectQueueEntry e2(global_, ConnectQosRef(new ConnectQos(global_)), 5); // NORM_PRIORITY
209 e2.getConnectQos()->addClientProperty("X", 4);
210 queue_->put(e2);
211
212 ConnectQueueEntry e3(global_, ConnectQosRef(new ConnectQos(global_)), 7);
213 e3.getConnectQos()->addClientProperty("X", 1);
214 queue_->put(e3);
215
216 ConnectQueueEntry e4(global_, ConnectQosRef(new ConnectQos(global_)), 7);
217 e4.getConnectQos()->addClientProperty("X", 2);
218 queue_->put(e4);
219
220 ConnectQueueEntry e5(global_, ConnectQosRef(new ConnectQos(global_)), 1); // MIN1_PRIORITY
221 e5.getConnectQos()->addClientProperty("X", 8);
222 queue_->put(e5);
223
224 ConnectQueueEntry e6(global_, ConnectQosRef(new ConnectQos(global_)), 5);
225 e6.getConnectQos()->addClientProperty("X", 5);
226 queue_->put(e6);
227
228 ConnectQueueEntry e7(global_, ConnectQosRef(new ConnectQos(global_)), 5);
229 e7.getConnectQos()->addClientProperty("X", 6);
230 queue_->put(e7);
231
232 ConnectQueueEntry e8(global_, ConnectQosRef(new ConnectQos(global_)), 7);
233 e8.getConnectQos()->addClientProperty("X", 3);
234 queue_->put(e8);
235
236 ConnectQueueEntry e9(global_, connQos, 1);
237 e9.getConnectQos()->addClientProperty("X", 9); // MAX_PRIORITY
238 queue_->put(e9);
239
240 vector<EntryType> ret = queue_->peekWithSamePriority();
241 // should be 3 entries with priority 7
242 assertEquals(log_, me, (size_t)3, ret.size(), "1. number of priority 7 msg peeked must be correct.");
243
244 const MsgQueueEntry &entry = *ret[0];
245 // TODO:
246 // [cc] \xmlBlaster\testsuite\src\c++\TestQueue.cpp(245) : warning C4541:
247 // 'dynamic_cast' used on polymorphic type 'org::xmlBlaster::util::queue::MsgQueueEntry' with /GR-;
248 // unpredictable behavior may result
249 //cout << "Trying dynamic cast" << endl; // On _WINDOWS: /GR to enable C++ RTTI didn't help (see build.xml)
250 const ConnectQueueEntry *connectQueueEntry = dynamic_cast<const ConnectQueueEntry*>(&entry);
251 assertEquals(log_, me, 1, connectQueueEntry->getConnectQos()->getClientProperty("X", -1), "2. checking the first entry.");
252 assertEquals(log_, me, 2, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "3. checking the second entry.");
253 assertEquals(log_, me, 3, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "4. checking the third entry.");
254
255 assertEquals(log_, me, false, queue_->empty(), "5. there should still be entries in the queue.");
256 queue_->randomRemove(ret.begin(), ret.end());
257 ret = queue_->peekWithSamePriority();
258 assertEquals(log_, me, (size_t)3, ret.size(), "6. number of priority 7 msg peeked must be correct.");
259 assertEquals(log_, me, 4, dynamic_cast<const ConnectQueueEntry*>(&(*ret[0]))->getConnectQos()->getClientProperty("X", -1), "7. checking the first entry.");
260 assertEquals(log_, me, 5, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "8. checking the second entry.");
261 assertEquals(log_, me, 6, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "9. checking the third entry.");
262
263 queue_->randomRemove(ret.begin(), ret.end());
264 ret = queue_->peekWithSamePriority();
265 assertEquals(log_, me, (size_t)3, ret.size(), "10. number of priority 7 msg peeked must be correct.");
266 assertEquals(log_, me, 7, dynamic_cast<const ConnectQueueEntry*>(&(*ret[0]))->getConnectQos()->getClientProperty("X", -1), "11. checking the first entry.");
267 assertEquals(log_, me, 8, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "12. checking the second entry.");
268 assertEquals(log_, me, 9, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "13. checking the third entry.");
269 queue_->randomRemove(ret.begin(), ret.end());
270 assertEquals(log_, me, true, queue_->empty(), "14. the queue should be empty now.");
271 log_.info(me, "test ended successfully");
272 }
273
274
275 void testMaxNumOfEntries()
276 {
277 string me = ME + "::testMaxNumOfEntries";
278 log_.info(me, "");
279 log_.info(me, "this test checks that an excess of entries really throws an exception");
280 ClientQueueProperty prop(global_, "");
281 prop.setMaxEntries(10);
282 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
283 ConnectQosRef connQos = new ConnectQos(global_);
284 connQos->setPersistent(false);
285 int i=0;
286 try {
287 for (i=0; i < 10; i++) {
288 if (i == 5) connQos->setPersistent(true);
289 ConnectQueueEntry entry(global_, connQos);
290 queue_->put(entry);
291 }
292 log_.info(me, "1. putting entries inside the queue: OK");
293 }
294 catch (const XmlBlasterException &/*ex*/) {
295 log_.error(me, "1. putting entries inside the queue: FAILED could not put inside the queue the entry nr. " + lexical_cast<string>(i));
296 assert(0);
297 }
298 try {
299 ConnectQueueEntry entry(global_, connQos);
300 queue_->put(entry);
301 log_.error(me, "2. putting entries inside the queue: FAILED should have thrown an exception");
302 assert(0);
303 }
304 catch (const XmlBlasterException &ex) {
305 assertEquals(log_, me, ex.getErrorCodeStr(), string("resource.overflow.queue.entries"), "3. checking that exceeding number of entries throws the correct exception.");
306 queue_->clear();
307 }
308 log_.info(me, "test ended successfully");
309 }
310
311
312 void testMaxNumOfBytes()
313 {
314 string me = ME + "::testMaxNumOfBytes";
315 log_.info(me, "");
316 log_.info(me, "this test checks that an excess of size in bytes really throws an exception");
317 ClientQueueProperty prop(global_, "");
318 ConnectQos *connQos = new ConnectQos(global_);
319 ConnectQueueEntry entry(global_, connQos);
320 size_t maxBytes = 10 * entry.getSizeInBytes();
321 prop.setMaxBytes(maxBytes);
322 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
323
324 assertEquals(log_, me, maxBytes, (int)queue_->getMaxNumOfBytes(), "Setting maxNumOfBytes");
325
326 int i=0;
327 try {
328 for (i=0; i < 10; i++) {
329 ConnectQueueEntry ent(global_, connQos);
330 log_.trace(me, "Putting entry " + lexical_cast<string>(i) + " to queue, size=" + lexical_cast<string>(ent.getSizeInBytes()));
331 queue_->put(ent);
332 }
333 log_.info(me, "1. putting entries inside the queue: OK");
334 }
335 catch (const XmlBlasterException &/*ex*/) {
336 log_.error(me, "1. putting entries inside the queue: FAILED could not put inside the queue the entry no. " + lexical_cast<string>(i) +
337 /*", entryBytes=" + lexical_cast<string>(entry->getNumOfBytes()) +*/
338 ", numOfEntries=" + lexical_cast<string>(queue_->getNumOfEntries()) +
339 ", numOfBytes=" + lexical_cast<string>(queue_->getNumOfBytes()) +
340 " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
341 assert(0);
342 }
343 try {
344 ConnectQueueEntry ent(global_, connQos);
345 queue_->put(ent);
346 log_.error(me, string("2. putting entries inside the queue: FAILED should have thrown an exception currQueueByte=") +
347 lexical_cast<string>(queue_->getNumOfBytes()) +
348 " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
349 assert(0);
350 }
351 catch (const XmlBlasterException &ex) {
352 assertEquals(log_, me, ex.getErrorCodeStr(), string("resource.overflow.queue.bytes"),
353 string("3. checking that exceeding number of entries throws the correct exception. numOfBytes=") +
354 lexical_cast<string>(queue_->getNumOfBytes()) +
355 " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
356 }
357 log_.info(me, "test ended successfully");
358 }
359
360 void setUp()
361 {
362 destroyQueue(); // Destroy old queue
363 }
364
365 void tearDown() {
366 if (queue_) {
367 QueueFactory::getFactory().releasePlugin(queue_);
368 queue_ = NULL;
369 }
370 }
371 };
372
373 }}} // namespace
374
375
376 using namespace org::xmlBlaster::test;
377
378 /** Compile: build -DexeName=TestQueue cpp-test-single */
379 int main(int args, char *argc[])
380 {
381 org::xmlBlaster::util::Object_Lifetime_Manager::init();
382
383 try {
384 Global& glob = Global::getInstance();
385 glob.initialize(args, argc);
386
387 TestQueue testObj = TestQueue(glob, "TestQueue");
388
389 for (std::vector<string>::size_type i=0; i < testObj.types.size(); i++) {
390 glob.getProperty().setProperty("queue/connection/type", testObj.types[i], true);
391 std::cout << "Testing queue type '" << glob.getProperty().get("queue/connection/type", string("eRRoR")) << "'" << std::endl;
392
393 testObj.setUp();
394 testObj.testPublishCompare();
395 testObj.tearDown();
396
397 testObj.setUp();
398 testObj.testConnectCompare();
399 testObj.setUp();
400 testObj.tearDown();
401
402 testObj.setUp();
403 testObj.testMixedCompare();
404 testObj.tearDown();
405
406 testObj.setUp();
407 testObj.testWithOnePublishEntry();
408 testObj.tearDown();
409
410 testObj.setUp();
411 testObj.testWithOneConnectEntry();
412 testObj.tearDown();
413
414 testObj.setUp();
415 testObj.testOrder();
416 testObj.tearDown();
417
418 testObj.setUp();
419 testObj.testMaxNumOfEntries();
420 testObj.tearDown();
421
422 testObj.setUp();
423 testObj.testMaxNumOfBytes();
424 testObj.tearDown();
425 }
426 }
427 catch (const XmlBlasterException &e) {
428 std::cerr << "TestQueue FAILED: " << e.getMessage() << std::endl;
429 assert(0);
430
431 }
432
433 org::xmlBlaster::util::Object_Lifetime_Manager::fini();
434 return 0;
435 }
// syntax highlighted by Code2HTML, v. 0.9.1