00001 /*----------------------------------------------------------------------------- 00002 Name: TestQueue.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Testing the Timeout Features 00006 -----------------------------------------------------------------------------*/ 00007 #include "TestSuite.h" 00008 #include <vector> 00009 #include <iostream> 00010 /*#include "tut.h"*/ 00011 #include <util/queue/QueueFactory.h> 00012 #include <util/queue/I_Queue.h> 00013 #include <util/queue/PublishQueueEntry.h> 00014 #include <util/queue/ConnectQueueEntry.h> 00015 #include <util/queue/SubscribeQueueEntry.h> 00016 #include <util/queue/UnSubscribeQueueEntry.h> 00017 00018 namespace org { namespace xmlBlaster { namespace test { 00019 00020 using namespace std; 00021 using namespace org::xmlBlaster::util; 00022 using namespace org::xmlBlaster::util::qos; 00023 using namespace org::xmlBlaster::util::qos::storage; 00024 using namespace org::xmlBlaster::util::queue; 00025 using namespace org::xmlBlaster::client; 00026 using namespace org::xmlBlaster::client::qos; 00027 using namespace org::xmlBlaster::client::key; 00028 00037 class TestQueue 00038 { 00039 00040 private: 00041 string ME; 00042 Global& global_; 00043 I_Log& log_; 00044 I_Queue* queue_; 00045 00046 public: 00048 std::vector<string> types; 00049 00050 public: 00051 TestQueue(Global& global, string name) : ME(name), global_(global), log_(global.getLog("test")) 00052 { 00053 queue_ = NULL; 00054 types.push_back("RAM"); 00055 types.push_back("SQLite"); 00056 types.push_back("CACHE"); 00057 } 00058 00059 virtual ~TestQueue() { } 00060 00061 void destroyQueue() { 00062 ClientQueueProperty prop(global_, ""); 00063 I_Queue *queue = &QueueFactory::getFactory().getPlugin(global_, prop); 00064 queue->destroy(); 00065 QueueFactory::getFactory().releasePlugin(queue); 00066 } 00067 00068 void testPublishCompare() 00069 { 00070 string me = ME + "::testPublishCompare"; 00071 log_.info(me, ""); 00072 log_.info(me, "comparison test between PublishQueueEntry objects."); 00073 00074 PublishKey pubKey(global_); 00075 PublishQos pubQos(global_); 00076 MessageUnit msgUnit(pubKey, string("comparison test"), pubQos); 00077 PublishQueueEntry entry1(global_, msgUnit); 00078 PublishQueueEntry entry2(global_, msgUnit); 00079 PublishQueueEntry entry3(global_, msgUnit, 2); 00080 PublishQueueEntry entry4(global_, msgUnit, 3); 00081 PublishQueueEntry entry5(global_, msgUnit, 1); 00082 00083 assertEquals(log_, me, true, entry2 < entry1, "1. PublishQos compare 2 with 1"); 00084 assertEquals(log_, me, true, entry3 < entry4, "2. PublishQos compare 3 with 4"); 00085 assertEquals(log_, me, true, entry5 < entry4, "3. PublishQos compare 5 with 4"); 00086 00087 log_.info(me, "test ended successfully"); 00088 } 00089 00090 00091 void testConnectCompare() 00092 { 00093 string me = ME + "::testConnectCompare"; 00094 log_.info(me, ""); 00095 log_.info(me, "comparison test between ConnectQueueEntry objects."); 00096 00097 ConnectQos *connectQos = new ConnectQos(global_); 00098 ConnectQueueEntry entry1(global_, connectQos); 00099 ConnectQueueEntry entry2(global_, connectQos); 00100 ConnectQueueEntry entry3(global_, connectQos, 2); 00101 ConnectQueueEntry entry4(global_, connectQos, 3); 00102 ConnectQueueEntry entry5(global_, connectQos, 1); 00103 00104 assertEquals(log_, me, true, entry2 < entry1, "1. PublishQos compare 2 with 1"); 00105 assertEquals(log_, me, true, entry3 < entry4, "2. PublishQos compare 3 with 4"); 00106 assertEquals(log_, me, true, entry5 < entry4, "3. PublishQos compare 5 with 4"); 00107 00108 log_.info(me, "test ended successfully"); 00109 } 00110 00111 void testMixedCompare() 00112 { 00113 string me = ME + "::testMixedCompare"; 00114 log_.info(me, ""); 00115 log_.info(me, "comparison test between PublishQueueEntry and ConnectQueueEntry objects."); 00116 00117 PublishKey pubKey(global_); 00118 PublishQos pubQos(global_); 00119 MessageUnit msgUnit(pubKey, string("comparison test"), pubQos); 00120 ConnectQos *connectQos = new ConnectQos(global_); 00121 00122 PublishQueueEntry entry1(global_, msgUnit, 2); 00123 ConnectQueueEntry entry2(global_, connectQos, 3); 00124 PublishQueueEntry entry3(global_, msgUnit, 1); 00125 00126 ConnectQueueEntry entry4(global_, connectQos, 2); 00127 PublishQueueEntry entry5(global_, msgUnit, 3); 00128 ConnectQueueEntry entry6(global_, connectQos, 1); 00129 00130 assertEquals(log_, me, true, entry1 < entry2, "1. Mixed compare 1 with 2"); 00131 assertEquals(log_, me, true, entry3 < entry2, "2. Mixed compare 3 with 2"); 00132 00133 assertEquals(log_, me, true, entry4 < entry5, "3. Mixed compare 4 with 5"); 00134 assertEquals(log_, me, true, entry6 < entry5, "4. Mixed compare 6 with 5"); 00135 00136 log_.info(me, "test completed successfully"); 00137 } 00138 00139 00140 void testWithOnePublishEntry() 00141 { 00142 string me = ME + "::testWithOnePublishEntry"; 00143 log_.info(me, ""); 00144 log_.info(me, "this test creates a queue. The following checks are done:"); 00145 ClientQueueProperty prop(global_, ""); 00146 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop); 00147 assertEquals(log_, me, true, queue_->empty(), "The queue must be empty after creation"); 00148 assertEquals(log_, me, 0, queue_->getNumOfEntries(), "The queue must be empty after creation"); 00149 PublishQos qos(global_); 00150 PublishKey key(global_); 00151 const string contentStr = "BlaBla"; 00152 MessageUnit messageUnit(key, contentStr, qos); 00153 PublishQueueEntry entry(global_, messageUnit, messageUnit.getQos().getPriority()); 00154 std::cout << "Putting " << entry.getUniqueId() << std::endl; 00155 00156 queue_->put(entry); 00157 assertEquals(log_, me, false, queue_->empty(), " 2. the queue must contain entries after invoking put one time"); 00158 assertEquals(log_, me, 1, queue_->getNumOfEntries(), " 2b. the queue must contain one entry after invoking put one time"); 00159 00160 vector<EntryType> ret = queue_->peekWithSamePriority(); 00161 assertEquals(log_, me, (size_t)1, ret.size(), " 3. the number of entries peeked after one put must be 1"); 00162 { 00163 const MsgQueueEntry &e = *ret[0]; 00164 std::cout << "Peeking " << e.getUniqueId() << std::endl; 00165 assertEquals(log_, me, entry.getUniqueId(), e.getUniqueId(), " 3. the uniqueId must be same"); 00166 assertEquals(log_, me, entry.getPriority(), e.getPriority(), " 3. the priority must be same"); 00167 } 00168 long numDel = queue_->randomRemove(ret.begin(), ret.end()); 00169 assertEquals(log_, me, (long)1, numDel, " 4. randomRemove must return 1 entry deleted"); 00170 assertEquals(log_, me, true, queue_->empty(), " 5. after removing all entries (it was only 1 entry) the queue must be empty"); 00171 log_.info(me, "ends here. Test was successful."); 00172 } 00173 00174 00175 void testWithOneConnectEntry() 00176 { 00177 string me = ME + "::testWithOneEntry"; 00178 log_.info(me, ""); 00179 log_.info(me, "this test creates a queue. The following checks are done:"); 00180 ClientQueueProperty prop(global_, ""); 00181 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop); 00182 assertEquals(log_, me, true, queue_->empty(), " 1. the queue must be empty after creation"); 00183 ConnectQos *connQos = new ConnectQos(global_); 00184 ConnectQueueEntry entry(global_, connQos); 00185 queue_->put(entry); 00186 assertEquals(log_, me, false, queue_->empty(), " 2. the queue must contain entries after invoking put one time"); 00187 vector<EntryType> ret = queue_->peekWithSamePriority(); 00188 assertEquals(log_, me, (size_t)1, ret.size(), " 3. the number of entries peeked after one put must be 1"); 00189 assertEquals(log_, me, (long)1, queue_->randomRemove(ret.begin(), ret.end()), " 4. randomRemove must return 1 entry deleted"); 00190 assertEquals(log_, me, true, queue_->empty(), " 5. after removing all entries (it was only 1 entry) the queue must be empty"); 00191 log_.info(me, "ends here. Test was successful."); 00192 } 00193 00194 00195 void testOrder() 00196 { 00197 string me = ME + "::testOrder"; 00198 log_.info(me, ""); 00199 log_.info(me, "this test checks the order in which entries are returned from the queue"); 00200 ClientQueueProperty prop(global_, ""); 00201 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop); 00202 ConnectQos *connQos = new ConnectQos(global_); 00203 00204 ConnectQueueEntry e1(global_, ConnectQosRef(new ConnectQos(global_)), 1); 00205 e1.getConnectQos()->addClientProperty("X", 7); 00206 queue_->put(e1); 00207 00208 ConnectQueueEntry e2(global_, ConnectQosRef(new ConnectQos(global_)), 5); // NORM_PRIORITY 00209 e2.getConnectQos()->addClientProperty("X", 4); 00210 queue_->put(e2); 00211 00212 ConnectQueueEntry e3(global_, ConnectQosRef(new ConnectQos(global_)), 7); 00213 e3.getConnectQos()->addClientProperty("X", 1); 00214 queue_->put(e3); 00215 00216 ConnectQueueEntry e4(global_, ConnectQosRef(new ConnectQos(global_)), 7); 00217 e4.getConnectQos()->addClientProperty("X", 2); 00218 queue_->put(e4); 00219 00220 ConnectQueueEntry e5(global_, ConnectQosRef(new ConnectQos(global_)), 1); // MIN1_PRIORITY 00221 e5.getConnectQos()->addClientProperty("X", 8); 00222 queue_->put(e5); 00223 00224 ConnectQueueEntry e6(global_, ConnectQosRef(new ConnectQos(global_)), 5); 00225 e6.getConnectQos()->addClientProperty("X", 5); 00226 queue_->put(e6); 00227 00228 ConnectQueueEntry e7(global_, ConnectQosRef(new ConnectQos(global_)), 5); 00229 e7.getConnectQos()->addClientProperty("X", 6); 00230 queue_->put(e7); 00231 00232 ConnectQueueEntry e8(global_, ConnectQosRef(new ConnectQos(global_)), 7); 00233 e8.getConnectQos()->addClientProperty("X", 3); 00234 queue_->put(e8); 00235 00236 ConnectQueueEntry e9(global_, connQos, 1); 00237 e9.getConnectQos()->addClientProperty("X", 9); // MAX_PRIORITY 00238 queue_->put(e9); 00239 00240 vector<EntryType> ret = queue_->peekWithSamePriority(); 00241 // should be 3 entries with priority 7 00242 assertEquals(log_, me, (size_t)3, ret.size(), "1. number of priority 7 msg peeked must be correct."); 00243 00244 const MsgQueueEntry &entry = *ret[0]; 00245 // TODO: 00246 // [cc] \xmlBlaster\testsuite\src\c++\TestQueue.cpp(245) : warning C4541: 00247 // 'dynamic_cast' used on polymorphic type 'org::xmlBlaster::util::queue::MsgQueueEntry' with /GR-; 00248 // unpredictable behavior may result 00249 //cout << "Trying dynamic cast" << endl; // On _WINDOWS: /GR to enable C++ RTTI didn't help (see build.xml) 00250 const ConnectQueueEntry *connectQueueEntry = dynamic_cast<const ConnectQueueEntry*>(&entry); 00251 assertEquals(log_, me, 1, connectQueueEntry->getConnectQos()->getClientProperty("X", -1), "2. checking the first entry."); 00252 assertEquals(log_, me, 2, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "3. checking the second entry."); 00253 assertEquals(log_, me, 3, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "4. checking the third entry."); 00254 00255 assertEquals(log_, me, false, queue_->empty(), "5. there should still be entries in the queue."); 00256 queue_->randomRemove(ret.begin(), ret.end()); 00257 ret = queue_->peekWithSamePriority(); 00258 assertEquals(log_, me, (size_t)3, ret.size(), "6. number of priority 7 msg peeked must be correct."); 00259 assertEquals(log_, me, 4, dynamic_cast<const ConnectQueueEntry*>(&(*ret[0]))->getConnectQos()->getClientProperty("X", -1), "7. checking the first entry."); 00260 assertEquals(log_, me, 5, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "8. checking the second entry."); 00261 assertEquals(log_, me, 6, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "9. checking the third entry."); 00262 00263 queue_->randomRemove(ret.begin(), ret.end()); 00264 ret = queue_->peekWithSamePriority(); 00265 assertEquals(log_, me, (size_t)3, ret.size(), "10. number of priority 7 msg peeked must be correct."); 00266 assertEquals(log_, me, 7, dynamic_cast<const ConnectQueueEntry*>(&(*ret[0]))->getConnectQos()->getClientProperty("X", -1), "11. checking the first entry."); 00267 assertEquals(log_, me, 8, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "12. checking the second entry."); 00268 assertEquals(log_, me, 9, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "13. checking the third entry."); 00269 queue_->randomRemove(ret.begin(), ret.end()); 00270 assertEquals(log_, me, true, queue_->empty(), "14. the queue should be empty now."); 00271 log_.info(me, "test ended successfully"); 00272 } 00273 00274 00275 void testMaxNumOfEntries() 00276 { 00277 string me = ME + "::testMaxNumOfEntries"; 00278 log_.info(me, ""); 00279 log_.info(me, "this test checks that an excess of entries really throws an exception"); 00280 ClientQueueProperty prop(global_, ""); 00281 prop.setMaxEntries(10); 00282 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop); 00283 ConnectQosRef connQos = new ConnectQos(global_); 00284 connQos->setPersistent(false); 00285 int i=0; 00286 try { 00287 for (i=0; i < 10; i++) { 00288 if (i == 5) connQos->setPersistent(true); 00289 ConnectQueueEntry entry(global_, connQos); 00290 queue_->put(entry); 00291 } 00292 log_.info(me, "1. putting entries inside the queue: OK"); 00293 } 00294 catch (const XmlBlasterException &/*ex*/) { 00295 log_.error(me, "1. putting entries inside the queue: FAILED could not put inside the queue the entry nr. " + lexical_cast<string>(i)); 00296 assert(0); 00297 } 00298 try { 00299 ConnectQueueEntry entry(global_, connQos); 00300 queue_->put(entry); 00301 log_.error(me, "2. putting entries inside the queue: FAILED should have thrown an exception"); 00302 assert(0); 00303 } 00304 catch (const XmlBlasterException &ex) { 00305 assertEquals(log_, me, ex.getErrorCodeStr(), string("resource.overflow.queue.entries"), "3. checking that exceeding number of entries throws the correct exception."); 00306 queue_->clear(); 00307 } 00308 log_.info(me, "test ended successfully"); 00309 } 00310 00311 00312 void testMaxNumOfBytes() 00313 { 00314 string me = ME + "::testMaxNumOfBytes"; 00315 log_.info(me, ""); 00316 log_.info(me, "this test checks that an excess of size in bytes really throws an exception"); 00317 ClientQueueProperty prop(global_, ""); 00318 ConnectQos *connQos = new ConnectQos(global_); 00319 ConnectQueueEntry entry(global_, connQos); 00320 size_t maxBytes = 10 * entry.getSizeInBytes(); 00321 prop.setMaxBytes(maxBytes); 00322 queue_ = &QueueFactory::getFactory().getPlugin(global_, prop); 00323 00324 assertEquals(log_, me, maxBytes, (int)queue_->getMaxNumOfBytes(), "Setting maxNumOfBytes"); 00325 00326 int i=0; 00327 try { 00328 for (i=0; i < 10; i++) { 00329 ConnectQueueEntry ent(global_, connQos); 00330 log_.trace(me, "Putting entry " + lexical_cast<string>(i) + " to queue, size=" + lexical_cast<string>(ent.getSizeInBytes())); 00331 queue_->put(ent); 00332 } 00333 log_.info(me, "1. putting entries inside the queue: OK"); 00334 } 00335 catch (const XmlBlasterException &/*ex*/) { 00336 log_.error(me, "1. putting entries inside the queue: FAILED could not put inside the queue the entry no. " + lexical_cast<string>(i) + 00337 /*", entryBytes=" + lexical_cast<string>(entry->getNumOfBytes()) +*/ 00338 ", numOfEntries=" + lexical_cast<string>(queue_->getNumOfEntries()) + 00339 ", numOfBytes=" + lexical_cast<string>(queue_->getNumOfBytes()) + 00340 " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes())); 00341 assert(0); 00342 } 00343 try { 00344 ConnectQueueEntry ent(global_, connQos); 00345 queue_->put(ent); 00346 log_.error(me, string("2. putting entries inside the queue: FAILED should have thrown an exception currQueueByte=") + 00347 lexical_cast<string>(queue_->getNumOfBytes()) + 00348 " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes())); 00349 assert(0); 00350 } 00351 catch (const XmlBlasterException &ex) { 00352 assertEquals(log_, me, ex.getErrorCodeStr(), string("resource.overflow.queue.bytes"), 00353 string("3. checking that exceeding number of entries throws the correct exception. numOfBytes=") + 00354 lexical_cast<string>(queue_->getNumOfBytes()) + 00355 " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes())); 00356 } 00357 log_.info(me, "test ended successfully"); 00358 } 00359 00360 void setUp() 00361 { 00362 destroyQueue(); // Destroy old queue 00363 } 00364 00365 void tearDown() { 00366 if (queue_) { 00367 QueueFactory::getFactory().releasePlugin(queue_); 00368 queue_ = NULL; 00369 } 00370 } 00371 }; 00372 00373 }}} // namespace 00374 00375 00376 using namespace org::xmlBlaster::test; 00377 00379 int main(int args, char *argc[]) 00380 { 00381 org::xmlBlaster::util::Object_Lifetime_Manager::init(); 00382 00383 try { 00384 Global& glob = Global::getInstance(); 00385 glob.initialize(args, argc); 00386 00387 TestQueue testObj = TestQueue(glob, "TestQueue"); 00388 00389 for (std::vector<string>::size_type i=0; i < testObj.types.size(); i++) { 00390 glob.getProperty().setProperty("queue/connection/type", testObj.types[i], true); 00391 std::cout << "Testing queue type '" << glob.getProperty().get("queue/connection/type", string("eRRoR")) << "'" << std::endl; 00392 00393 testObj.setUp(); 00394 testObj.testPublishCompare(); 00395 testObj.tearDown(); 00396 00397 testObj.setUp(); 00398 testObj.testConnectCompare(); 00399 testObj.setUp(); 00400 testObj.tearDown(); 00401 00402 testObj.setUp(); 00403 testObj.testMixedCompare(); 00404 testObj.tearDown(); 00405 00406 testObj.setUp(); 00407 testObj.testWithOnePublishEntry(); 00408 testObj.tearDown(); 00409 00410 testObj.setUp(); 00411 testObj.testWithOneConnectEntry(); 00412 testObj.tearDown(); 00413 00414 testObj.setUp(); 00415 testObj.testOrder(); 00416 testObj.tearDown(); 00417 00418 testObj.setUp(); 00419 testObj.testMaxNumOfEntries(); 00420 testObj.tearDown(); 00421 00422 testObj.setUp(); 00423 testObj.testMaxNumOfBytes(); 00424 testObj.tearDown(); 00425 } 00426 } 00427 catch (const XmlBlasterException &e) { 00428 std::cerr << "TestQueue FAILED: " << e.getMessage() << std::endl; 00429 assert(0); 00430 00431 } 00432 00433 org::xmlBlaster::util::Object_Lifetime_Manager::fini(); 00434 return 0; 00435 } 00436 00437