1 /*------------------------------------------------------------------------------
2 Name: CacheQueuePlugin.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 #include <util/queue/CacheQueuePlugin.h>
7 #include <util/queue/QueueFactory.h>
8 #include <util/XmlBlasterException.h>
9 #include <util/Global.h>
10 #if defined (XMLBLASTER_PERSISTENT_QUEUE) || defined (XMLBLASTER_PERSISTENT_QUEUE_SQLITE3) // to compile on Windows
11 # include <util/queue/SQLiteQueuePlugin.h> // temporary for usage -> remove again
12 #endif
13
14 using namespace std;
15 using namespace org::xmlBlaster::util;
16 using namespace org::xmlBlaster::util::thread;
17 using namespace org::xmlBlaster::util::qos::storage;
18
19 namespace org { namespace xmlBlaster { namespace util { namespace queue {
20
21 CacheQueuePlugin::CacheQueuePlugin(org::xmlBlaster::util::Global& global, const org::xmlBlaster::util::qos::storage::ClientQueueProperty& property)
22 : ME("CacheQueuePlugin"),
23 global_(global),
24 log_(global.getLog("org.xmlBlaster.util.queue")),
25 property_(property),
26 transientQueueP_(0),
27 persistentQueueP_(0),
28 accessMutex_()
29 {
30 // TODO: type/version should be set from outside!!!
31
32 transientQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "RAM", "1.0");
33
34 try {
35 persistentQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "SQLite", "1.0");
36
37 // Note: On startup we can only load the highest priority in a bulk, peekWithSamePriority() does not support to get all!
38 reloadFromPersistentStore();
39 }
40 catch (const XmlBlasterException &e) {
41 log_.warn(ME, "No persistent queue is available, we continue RAM based. Reason: " + e.getMessage());
42 }
43 log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]");
44 }
45
46 /*
47 CacheQueuePlugin::CacheQueuePlugin(const CacheQueuePlugin& queue)
48 : ME("CacheQueuePlugin"),
49 global_(queue.global_),
50 log_(queue.log_),
51 property_(queue.property_),
52 storage_(queue.storage_),
53 accessMutex_()
54 {
55 numOfBytes_ = queue.numOfBytes_;
56 }
57
58 CacheQueuePlugin& CacheQueuePlugin::operator =(const CacheQueuePlugin& queue)
59 {
60 Lock lock(queue.accessMutex_);
61 property_ = queue.property_;
62 storage_ = queue.storage_;
63 numOfBytes_ = queue.numOfBytes_;
64 return *this;
65
66 }
67 */
68
69 CacheQueuePlugin::~CacheQueuePlugin()
70 {
71 if (log_.call()) log_.call(ME, "destructor");
72 QueueFactory::getFactory().releasePlugin(transientQueueP_);
73 if (persistentQueueP_) QueueFactory::getFactory().releasePlugin(persistentQueueP_);
74 }
75
76 void CacheQueuePlugin::put(const MsgQueueEntry &entry)
77 {
78 if (log_.call()) log_.call(ME, "::put");
79
80 Lock lock(accessMutex_);
81 transientQueueP_->put(entry);
82 if (persistentQueueP_) {
83 if (entry.isPersistent()) {
84 try {
85 persistentQueueP_->put(entry);
86 }
87 catch (const XmlBlasterException &e) {
88 log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage());
89 }
90 }
91 }
92 }
93
94 long CacheQueuePlugin::reloadFromPersistentStore() const
95 {
96 if (persistentQueueP_ && transientQueueP_->getNumOfEntries() == 0 && persistentQueueP_->getNumOfEntries() > 0) {
97 // On startup shuffle them to the transient queue (only the highest priority is accessible with our I_Queue API)
98 const vector<EntryType> vec = persistentQueueP_->peekWithSamePriority(-1, -1);
99 long count = 0;
100 vector<EntryType>::const_iterator iter = vec.begin();
101 for (; iter != vec.end(); ++iter) {
102 const EntryType &entryType = (*iter);
103 transientQueueP_->put(*entryType);
104 count++;
105 }
106 return count;
107 }
108 return 0;
109 }
110
111 const vector<EntryType> CacheQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
112 {
113 Lock lock(accessMutex_);
114 vector<EntryType> vec = transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes);
115
116 if (vec.size() == 0) {
117 long count = reloadFromPersistentStore();
118 if (count > 0) {
119 return transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes);
120 }
121 }
122
123 return vec;
124 }
125
126
127 long CacheQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end)
128 {
129 Lock lock(accessMutex_);
130 long count = transientQueueP_->randomRemove(start, end);
131
132 if (persistentQueueP_) {
133 vector<EntryType> persistents;
134 vector<EntryType>::const_iterator iter = start;
135 while (iter != end) {
136 const EntryType &entryType = (*iter);
137 if (entryType->isPersistent()) {
138 persistents.push_back(entryType);
139 }
140 iter++;
141 }
142 try {
143 persistentQueueP_->randomRemove(persistents.begin(), persistents.end());
144 }
145 catch (const XmlBlasterException &e) {
146 log_.warn(ME, "Ignoring problem to remove entry from persistent queue, we remove it from the transient queue only: " + e.getMessage());
147 }
148 }
149 return count;
150 }
151
152 long CacheQueuePlugin::getNumOfEntries() const
153 {
154 return transientQueueP_->getNumOfEntries();
155 }
156
157 long CacheQueuePlugin::getMaxNumOfEntries() const
158 {
159 return transientQueueP_->getMaxNumOfEntries();
160 }
161
162 int64_t CacheQueuePlugin::getNumOfBytes() const
163 {
164 return transientQueueP_->getNumOfBytes();
165 }
166
167 int64_t CacheQueuePlugin::getMaxNumOfBytes() const
168 {
169 return transientQueueP_->getMaxNumOfBytes();
170 }
171
172 void CacheQueuePlugin::clear()
173 {
174 Lock lock(accessMutex_);
175 transientQueueP_->clear();
176 if (persistentQueueP_) {
177 try {
178 persistentQueueP_->clear();
179 }
180 catch (const XmlBlasterException &e) {
181 log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage());
182 }
183 }
184 }
185
186 bool CacheQueuePlugin::empty() const
187 {
188 return transientQueueP_->empty();
189 }
190
191 void CacheQueuePlugin::destroy()
192 {
193 transientQueueP_->destroy();
194 if (persistentQueueP_) {
195 try {
196 persistentQueueP_->destroy();
197 }
198 catch (const XmlBlasterException &e) {
199 log_.warn(ME, "Ignoring problem to destroy the persistent queue: " + e.getMessage());
200 }
201 }
202 }
203
204 string CacheQueuePlugin::usage()
205 {
206 std::string text = string("");
207 text += string("\nThe CACHE queue plugin configuration:");
208 #if defined (XMLBLASTER_PERSISTENT_QUEUE) || defined (XMLBLASTER_PERSISTENT_QUEUE_SQLITE3) // to compile on Windows
209 text += SQLiteQueuePlugin::usage(); // TODO: depending on persistency
210 #else
211 text += ClientQueueProperty::usage();
212 #endif
213 return text;
214 }
215 }}}} // namespace
216
217
218
syntax highlighted by Code2HTML, v. 0.9.1