00001 /*------------------------------------------------------------------------------ 00002 Name: CacheQueuePlugin.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 ------------------------------------------------------------------------------*/ 00006 #include <util/queue/CacheQueuePlugin.h> 00007 #include <util/queue/QueueFactory.h> 00008 #include <util/XmlBlasterException.h> 00009 #include <util/Global.h> 00010 #ifdef XMLBLASTER_PERSISTENT_QUEUE // to compile on Windows 00011 # include <util/queue/SQLiteQueuePlugin.h> // temporary for usage -> remove again 00012 #endif 00013 00014 using namespace std; 00015 using namespace org::xmlBlaster::util; 00016 using namespace org::xmlBlaster::util::thread; 00017 using namespace org::xmlBlaster::util::qos::storage; 00018 00019 namespace org { namespace xmlBlaster { namespace util { namespace queue { 00020 00021 CacheQueuePlugin::CacheQueuePlugin(org::xmlBlaster::util::Global& global, const org::xmlBlaster::util::qos::storage::ClientQueueProperty& property) 00022 : ME("CacheQueuePlugin"), 00023 global_(global), 00024 log_(global.getLog("org.xmlBlaster.util.queue")), 00025 property_(property), 00026 transientQueueP_(0), 00027 persistentQueueP_(0), 00028 accessMutex_() 00029 { 00030 // TODO: type/version should be set from outside!!! 00031 00032 transientQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "RAM", "1.0"); 00033 00034 try { 00035 persistentQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "SQLite", "1.0"); 00036 00037 // Note: On startup we can only load the highest priority in a bulk, peekWithSamePriority() does not support to get all! 00038 reloadFromPersistentStore(); 00039 } 00040 catch (const XmlBlasterException &e) { 00041 log_.warn(ME, "No persistent queue is available, we continue RAM based. Reason: " + e.getMessage()); 00042 } 00043 log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]"); 00044 } 00045 00046 /* 00047 CacheQueuePlugin::CacheQueuePlugin(const CacheQueuePlugin& queue) 00048 : ME("CacheQueuePlugin"), 00049 global_(queue.global_), 00050 log_(queue.log_), 00051 property_(queue.property_), 00052 storage_(queue.storage_), 00053 accessMutex_() 00054 { 00055 numOfBytes_ = queue.numOfBytes_; 00056 } 00057 00058 CacheQueuePlugin& CacheQueuePlugin::operator =(const CacheQueuePlugin& queue) 00059 { 00060 Lock lock(queue.accessMutex_); 00061 property_ = queue.property_; 00062 storage_ = queue.storage_; 00063 numOfBytes_ = queue.numOfBytes_; 00064 return *this; 00065 00066 } 00067 */ 00068 00069 CacheQueuePlugin::~CacheQueuePlugin() 00070 { 00071 if (log_.call()) log_.call(ME, "destructor"); 00072 QueueFactory::getFactory().releasePlugin(transientQueueP_); 00073 if (persistentQueueP_) QueueFactory::getFactory().releasePlugin(persistentQueueP_); 00074 } 00075 00076 void CacheQueuePlugin::put(const MsgQueueEntry &entry) 00077 { 00078 if (log_.call()) log_.call(ME, "::put"); 00079 00080 Lock lock(accessMutex_); 00081 transientQueueP_->put(entry); 00082 if (persistentQueueP_) { 00083 if (entry.isPersistent()) { 00084 try { 00085 persistentQueueP_->put(entry); 00086 } 00087 catch (const XmlBlasterException &e) { 00088 log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage()); 00089 } 00090 } 00091 } 00092 } 00093 00094 long CacheQueuePlugin::reloadFromPersistentStore() const 00095 { 00096 if (persistentQueueP_ && transientQueueP_->getNumOfEntries() == 0 && persistentQueueP_->getNumOfEntries() > 0) { 00097 // On startup shuffle them to the transient queue (only the highest priority is accessible with our I_Queue API) 00098 const vector<EntryType> vec = persistentQueueP_->peekWithSamePriority(-1, -1); 00099 long count = 0; 00100 vector<EntryType>::const_iterator iter = vec.begin(); 00101 for (; iter != vec.end(); ++iter) { 00102 const EntryType &entryType = (*iter); 00103 transientQueueP_->put(*entryType); 00104 count++; 00105 } 00106 return count; 00107 } 00108 return 0; 00109 } 00110 00111 const vector<EntryType> CacheQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const 00112 { 00113 Lock lock(accessMutex_); 00114 vector<EntryType> vec = transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes); 00115 00116 if (vec.size() == 0) { 00117 long count = reloadFromPersistentStore(); 00118 if (count > 0) { 00119 return transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes); 00120 } 00121 } 00122 00123 return vec; 00124 } 00125 00126 00127 long CacheQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) 00128 { 00129 Lock lock(accessMutex_); 00130 long count = transientQueueP_->randomRemove(start, end); 00131 00132 if (persistentQueueP_) { 00133 vector<EntryType> persistents; 00134 vector<EntryType>::const_iterator iter = start; 00135 while (iter != end) { 00136 const EntryType &entryType = (*iter); 00137 if (entryType->isPersistent()) { 00138 persistents.push_back(entryType); 00139 } 00140 iter++; 00141 } 00142 try { 00143 persistentQueueP_->randomRemove(persistents.begin(), persistents.end()); 00144 } 00145 catch (const XmlBlasterException &e) { 00146 log_.warn(ME, "Ignoring problem to remove entry from persistent queue, we remove it from the transient queue only: " + e.getMessage()); 00147 } 00148 } 00149 return count; 00150 } 00151 00152 long CacheQueuePlugin::getNumOfEntries() const 00153 { 00154 return transientQueueP_->getNumOfEntries(); 00155 } 00156 00157 long CacheQueuePlugin::getMaxNumOfEntries() const 00158 { 00159 return transientQueueP_->getMaxNumOfEntries(); 00160 } 00161 00162 int64_t CacheQueuePlugin::getNumOfBytes() const 00163 { 00164 return transientQueueP_->getNumOfBytes(); 00165 } 00166 00167 int64_t CacheQueuePlugin::getMaxNumOfBytes() const 00168 { 00169 return transientQueueP_->getMaxNumOfBytes(); 00170 } 00171 00172 void CacheQueuePlugin::clear() 00173 { 00174 Lock lock(accessMutex_); 00175 transientQueueP_->clear(); 00176 if (persistentQueueP_) { 00177 try { 00178 persistentQueueP_->clear(); 00179 } 00180 catch (const XmlBlasterException &e) { 00181 log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage()); 00182 } 00183 } 00184 } 00185 00186 bool CacheQueuePlugin::empty() const 00187 { 00188 return transientQueueP_->empty(); 00189 } 00190 00191 void CacheQueuePlugin::destroy() 00192 { 00193 transientQueueP_->destroy(); 00194 if (persistentQueueP_) { 00195 try { 00196 persistentQueueP_->destroy(); 00197 } 00198 catch (const XmlBlasterException &e) { 00199 log_.warn(ME, "Ignoring problem to destroy the persistent queue: " + e.getMessage()); 00200 } 00201 } 00202 } 00203 00204 string CacheQueuePlugin::usage() 00205 { 00206 std::string text = string(""); 00207 text += string("\nThe CACHE queue plugin configuration:"); 00208 #ifdef XMLBLASTER_PERSISTENT_QUEUE // to compile on Windows 00209 text += SQLiteQueuePlugin::usage(); // TODO: depending on persistency 00210 #else 00211 text += ClientQueueProperty::usage(); 00212 #endif 00213 return text; 00214 } 00215 }}}} // namespace 00216 00217 00218