1 /*------------------------------------------------------------------------------
  2 Name:      CacheQueuePlugin.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 #include <util/queue/CacheQueuePlugin.h>

  7 #include <util/queue/QueueFactory.h>

  8 #include <util/XmlBlasterException.h>

  9 #include <util/Global.h>

 10 #if defined (XMLBLASTER_PERSISTENT_QUEUE) || defined (XMLBLASTER_PERSISTENT_QUEUE_SQLITE3) // to compile on Windows

 11 #  include <util/queue/SQLiteQueuePlugin.h> // temporary for usage -> remove again

 12 #endif

 13 
 14 using namespace std;
 15 using namespace org::xmlBlaster::util;
 16 using namespace org::xmlBlaster::util::thread;
 17 using namespace org::xmlBlaster::util::qos::storage;
 18 
 19 namespace org { namespace xmlBlaster { namespace util { namespace queue {
 20 
 21 CacheQueuePlugin::CacheQueuePlugin(org::xmlBlaster::util::Global& global, const org::xmlBlaster::util::qos::storage::ClientQueueProperty& property)
 22    : ME("CacheQueuePlugin"), 
 23      global_(global), 
 24      log_(global.getLog("org.xmlBlaster.util.queue")), 
 25      property_(property), 
 26      transientQueueP_(0), 
 27      persistentQueueP_(0), 
 28      accessMutex_()
 29 {
 30    // TODO: type/version should be set from outside!!!

 31 
 32    transientQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "RAM", "1.0");
 33 
 34    try {
 35       persistentQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "SQLite", "1.0");
 36 
 37       // Note: On startup we can only load the highest priority in a bulk, peekWithSamePriority() does not support to get all!

 38       reloadFromPersistentStore();
 39    }
 40    catch (const XmlBlasterException &e) {
 41       log_.warn(ME, "No persistent queue is available, we continue RAM based. Reason: " + e.getMessage());
 42    }
 43    log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]");
 44 }
 45 
 46 /*
 47 CacheQueuePlugin::CacheQueuePlugin(const CacheQueuePlugin& queue)
 48    : ME("CacheQueuePlugin"), 
 49      global_(queue.global_), 
 50      log_(queue.log_), 
 51      property_(queue.property_), 
 52      storage_(queue.storage_), 
 53      accessMutex_()
 54 {
 55    numOfBytes_ = queue.numOfBytes_;
 56 }
 57 
 58 CacheQueuePlugin& CacheQueuePlugin::operator =(const CacheQueuePlugin& queue)
 59 {
 60    Lock lock(queue.accessMutex_);
 61    property_   = queue.property_;
 62    storage_    = queue.storage_;
 63    numOfBytes_ = queue.numOfBytes_;
 64    return *this;
 65 
 66 }
 67 */
 68 
 69 CacheQueuePlugin::~CacheQueuePlugin()
 70 {
 71    if (log_.call()) log_.call(ME, "destructor");
 72    QueueFactory::getFactory().releasePlugin(transientQueueP_);
 73    if (persistentQueueP_) QueueFactory::getFactory().releasePlugin(persistentQueueP_);
 74 } 
 75 
 76 void CacheQueuePlugin::put(const MsgQueueEntry &entry)
 77 {
 78    if (log_.call()) log_.call(ME, "::put");
 79 
 80    Lock lock(accessMutex_);
 81    transientQueueP_->put(entry);
 82    if (persistentQueueP_) {
 83       if (entry.isPersistent()) {
 84          try {
 85            persistentQueueP_->put(entry);
 86          }
 87          catch (const XmlBlasterException &e) {
 88             log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage());
 89          }
 90       }
 91    }
 92 }
 93 
 94 long CacheQueuePlugin::reloadFromPersistentStore() const
 95 {
 96    if (persistentQueueP_ && transientQueueP_->getNumOfEntries() == 0 && persistentQueueP_->getNumOfEntries() > 0) {
 97       // On startup shuffle them to the transient queue (only the highest priority is accessible with our I_Queue API)

 98       const vector<EntryType> vec = persistentQueueP_->peekWithSamePriority(-1, -1);
 99       long count = 0;
100       vector<EntryType>::const_iterator iter = vec.begin();
101       for (; iter != vec.end(); ++iter) {
102          const EntryType &entryType = (*iter);
103          transientQueueP_->put(*entryType);
104          count++;
105       }
106       return count;
107    }
108    return 0;
109 }
110 
111 const vector<EntryType> CacheQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
112 {
113    Lock lock(accessMutex_);
114    vector<EntryType> vec = transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes);
115 
116    if (vec.size() == 0) {
117       long count = reloadFromPersistentStore();
118       if (count > 0) {
119          return transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes);
120       }
121    }
122 
123    return vec;
124 }
125 
126 
127 long CacheQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) 
128 {
129    Lock lock(accessMutex_);
130    long count = transientQueueP_->randomRemove(start, end);
131 
132    if (persistentQueueP_) {
133       vector<EntryType> persistents;
134       vector<EntryType>::const_iterator iter = start;
135       while (iter != end) {
136          const EntryType &entryType = (*iter);
137          if (entryType->isPersistent()) {
138             persistents.push_back(entryType);
139          }
140          iter++;
141       }
142       try {
143          persistentQueueP_->randomRemove(persistents.begin(), persistents.end());
144       }
145       catch (const XmlBlasterException &e) {
146          log_.warn(ME, "Ignoring problem to remove entry from persistent queue, we remove it from the transient queue only: " + e.getMessage());
147       }
148    }
149    return count;
150 }
151 
152 long CacheQueuePlugin::getNumOfEntries() const
153 {
154    return transientQueueP_->getNumOfEntries();
155 }
156 
157 long CacheQueuePlugin::getMaxNumOfEntries() const
158 {
159    return transientQueueP_->getMaxNumOfEntries();
160 }
161 
162 int64_t CacheQueuePlugin::getNumOfBytes() const
163 {
164    return transientQueueP_->getNumOfBytes();
165 }
166 
167 int64_t CacheQueuePlugin::getMaxNumOfBytes() const
168 {
169    return transientQueueP_->getMaxNumOfBytes();
170 }
171 
172 void CacheQueuePlugin::clear()
173 {
174    Lock lock(accessMutex_);
175    transientQueueP_->clear();
176    if (persistentQueueP_) {
177       try {
178          persistentQueueP_->clear();
179       }
180       catch (const XmlBlasterException &e) {
181          log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage());
182       }
183    }
184 }
185 
186 bool CacheQueuePlugin::empty() const
187 {
188    return transientQueueP_->empty();
189 }
190 
191 void CacheQueuePlugin::destroy()
192 {
193    transientQueueP_->destroy();
194    if (persistentQueueP_) {
195       try {
196          persistentQueueP_->destroy();
197       }
198       catch (const XmlBlasterException &e) {
199          log_.warn(ME, "Ignoring problem to destroy the persistent queue: " + e.getMessage());
200       }
201    }
202 }
203 
204 string CacheQueuePlugin::usage()
205 {
206    std::string text = string("");
207    text += string("\nThe CACHE queue plugin configuration:");
208 #if defined (XMLBLASTER_PERSISTENT_QUEUE) || defined (XMLBLASTER_PERSISTENT_QUEUE_SQLITE3) // to compile on Windows

209    text += SQLiteQueuePlugin::usage();   // TODO: depending on persistency

210 #else

211    text += ClientQueueProperty::usage();
212 #endif

213    return text;
214 }
215 }}}} // namespace

216 
217 
218 


syntax highlighted by Code2HTML, v. 0.9.1