1 /*------------------------------------------------------------------------------
2 Name: SQLiteQueuePlugin.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Author: xmlBlaster@marcelruff.info
6 ------------------------------------------------------------------------------*/
7 #include <util/queue/SQLiteQueuePlugin.h>
8 #include <util/XmlBlasterException.h>
9 #include <util/Global.h>
10 #include <stdarg.h> // va_start for logging
11 #include <stdio.h> // vsnprintf for g++ 2.9x only
12 #include <string.h> // memset
13 #include <util/lexical_cast.h>
14 #include <util/MessageUnit.h>
15 #include <util/queue/ConnectQueueEntry.h>
16 #include <util/queue/SubscribeQueueEntry.h>
17 #include <util/queue/UnSubscribeQueueEntry.h>
18 #include <util/queue/PublishQueueEntry.h>
19 #include <socket/xmlBlasterSocket.h> // C xmlBlaster client library: for msgUnit serialize
20 #include <util/queue/QueueInterface.h> // The C implementation interface
21
22 using namespace std;
23 using namespace org::xmlBlaster::util;
24 using namespace org::xmlBlaster::util::thread;
25 using namespace org::xmlBlaster::util::qos::storage;
26 using namespace org::xmlBlaster::util::key;
27 using namespace org::xmlBlaster::util::qos;
28 using namespace org::xmlBlaster::client::qos;
29 using namespace org::xmlBlaster::client::key;
30
31 //static ::XmlBlasterLogging loggingFp = ::xmlBlasterDefaultLogging;
32 static void myLogger(void *logUserP,
33 XMLBLASTER_LOG_LEVEL currLevel,
34 XMLBLASTER_LOG_LEVEL level,
35 const char *location, const char *fmt, ...);
36
37 namespace org { namespace xmlBlaster { namespace util { namespace queue {
38
39 SQLiteQueuePlugin::SQLiteQueuePlugin(Global& global, const ClientQueueProperty& property)
40 : ME("SQLiteQueuePlugin"),
41 global_(global),
42 log_(global.getLog("org.xmlBlaster.util.queue")),
43 property_(property),
44 queueP_(0),
45 connectQosFactory_(global_),
46 statusQosFactory_(global_),
47 msgKeyFactory_(global_),
48 msgQosFactory_(global_),
49 accessMutex_()
50 {
51 if (log_.call()) log_.call(ME, "Constructor queue [" + getType() + "][" + getVersion() + "] ...");
52 /*
53 TODO: Pass basic configuration from plugin key/values similar to (see xmlBlaster.properties)
54 QueuePlugin[SQLite][1.0]=SQLiteQueuePlugin,
55 url=/${user.home}${file.separator}tmp${file.separator}$_{xmlBlaster_uniqueId}.db,\
56 user=sqlite,\
57 password=,\
58 connectionPoolSize=1,\
59 connectionBusyTimeout=90000,\
60 maxWaitingThreads=300,\
61 tableNamePrefix=XB_,\
62 entriesTableName=ENTRIES,\
63 dbAdmin=true
64 */
65 const std::string classRelating = "QueuePlugin["+getType()+"]["+getVersion()+"]"; // "QueuePlugin[SQLite][1.0]"
66 const std::string instanceRelating = property.getPropertyPrefix(); // == "connection"
67
68 // Should it be "queue/connection/tableNamePrefix" or "queue/QueuePlugin[SQLite][1.0]/tableNamePrefix"
69 // The first allows different instances with changing "connection" to e.g. "tailback" etc.
70 if (global_.getProperty().propertyExists(classRelating, true)) {
71 log_.warn(ME, "Your setting of property '" + classRelating + "' is not supported");
72 }
73
74 std::string defaultPath = ""; // for example: "/home/joe/tmp/" or "C:\Documents and Settings\marcel\tmp"
75 if (global_.getProperty().get("user.home", "") != "")
76 defaultPath = global_.getProperty().get("user.home", "") +
77 global_.getProperty().get("file.separator", "");
78 //+ "tmp" + // We currently can't create missing directories, TODO!!!
79 //global_.getProperty().get("file.separator", "");
80
81 const std::string url = global_.getProperty().get("queue/"+instanceRelating+"/url", defaultPath+"xmlBlasterClientCpp.db"); // "queue/connection/url"
82 const std::string queueName = global_.getProperty().get("queue/"+instanceRelating+"/queueName", instanceRelating + "_" + global_.getStrippedImmutableId()); // "connection_clientJoe2"
83 const std::string tableNamePrefix = global_.getProperty().get("queue/"+instanceRelating+"/tableNamePrefix", "XB_");// "queue/connection/tableNamePrefix"
84
85 ::ExceptionStruct exception;
86 ::QueueProperties queueProperties;
87 memset(&queueProperties, 0, sizeof(QueueProperties));
88
89 strncpy0(queueProperties.dbName, url.c_str(), QUEUE_DBNAME_MAX);
90 strncpy0(queueProperties.queueName, queueName.c_str(), QUEUE_ID_MAX);
91 strncpy0(queueProperties.tablePrefix, tableNamePrefix.c_str(), QUEUE_PREFIX_MAX);
92 queueProperties.maxNumOfEntries = (int32_t)property.getMaxEntries();
93 queueProperties.maxNumOfBytes = property.getMaxBytes();
94 queueProperties.logFp = myLogger;
95 queueProperties.logLevel = (log_.call() || log_.trace()) ? XMLBLASTER_LOG_TRACE : XMLBLASTER_LOG_INFO;
96 queueProperties.userObject = &log_;
97
98 queueP_ = createQueue(&queueProperties, &exception); // &log_ Used in myLogger(), see above
99 if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
100
101 log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "], queue/"+instanceRelating+"/url='" +
102 queueProperties.dbName + "', queue/"+instanceRelating+"/queueName='" + queueProperties.queueName +
103 "', queue/"+instanceRelating+"/maxEntries=" + lexical_cast<string>(queueProperties.maxNumOfEntries));
104 }
105
106 /*
107 SQLiteQueuePlugin::SQLiteQueuePlugin(const SQLiteQueuePlugin& queue)
108 : ME("SQLiteQueuePlugin"),
109 global_(queue.global_),
110 log_(queue.log_),
111 property_(queue.property_),
112 queueP_(queue.queueP_),
113 accessMutex_()
114 {
115 }
116
117 SQLiteQueuePlugin& SQLiteQueuePlugin::operator =(const SQLiteQueuePlugin& queue)
118 {
119 Lock lock(queue.accessMutex_);
120 property_ = queue.property_;
121 queueP_ = queue.queueP_;
122 return *this;
123
124 }
125 */
126
127 SQLiteQueuePlugin::~SQLiteQueuePlugin()
128 {
129 if (log_.call()) log_.call(ME, "destructor");
130 if (queueP_) {
131 Lock lock(accessMutex_);
132 ::ExceptionStruct exception;
133 queueP_->shutdown(&queueP_, &exception); // NULLs the queueP_
134 if (*exception.errorCode != 0) {
135 const int ERRORSTR_LEN = 1024;
136 char errorString[ERRORSTR_LEN];
137 log_.warn(ME, string("Ignoring problem during shutdown: ") + getExceptionStr(errorString, ERRORSTR_LEN, &exception));
138 }
139 }
140 }
141
142 void SQLiteQueuePlugin::put(const MsgQueueEntry &entry)
143 {
144 if (log_.call()) log_.call(ME, "::put");
145 if (log_.dump()) log_.dump(ME+".put()", string("The msg entry is: ") + entry.toXml());
146
147 Lock lock(accessMutex_);
148 if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, put() failed");
149
150 ::ExceptionStruct exception;
151 ::QueueEntry queueEntry;
152
153 // Copy C++ to C struct ...
154
155 queueEntry.priority = entry.getPriority();
156 queueEntry.isPersistent = entry.isPersistent();
157 queueEntry.uniqueId = entry.getUniqueId();
158 queueEntry.sizeInBytes = entry.getSizeInBytes();
159 strncpy0(queueEntry.embeddedType, entry.getEmbeddedType().c_str(), QUEUE_ENTRY_EMBEDDEDTYPE_LEN); // "MSG_RAW|publish"
160 queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
161
162 // dump MsgQueueEntry with SOCKET protocol into C ::MsgUnit ...
163
164 const BlobHolder *blob = (const BlobHolder *)entry.getEmbeddedObject();
165 if (blob == 0) throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME, "put() failed, the entry " + entry.getLogId() + " returned NULL for embeddedObject");
166 queueEntry.embeddedBlob.data = blob->data;
167 queueEntry.embeddedBlob.dataLen = blob->dataLen;
168
169 if (log_.dump()) {
170 char *dumpP = blobDump(&queueEntry.embeddedBlob);
171 log_.dump(ME+".put()", string("Put blob to queue:") + dumpP);
172 ::xmlBlasterFree(dumpP);
173 }
174
175 // Push into C persistent queue ...
176
177 queueP_->put(queueP_, &queueEntry, &exception);
178
179 if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
180 }
181
182 const vector<EntryType> SQLiteQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
183 {
184 Lock lock(accessMutex_);
185 if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, peekWithSamePriority() failed");
186 vector<EntryType> ret;
187 if (queueP_->empty(queueP_)) return ret;
188 if (log_.call()) log_.call(ME, "peekWithSamePriority maxNumOfEntries=" + lexical_cast<std::string>(maxNumOfEntries) + " maxNumOfBytes=" + lexical_cast<std::string>(maxNumOfBytes));
189
190 ::ExceptionStruct exception;
191 ::QueueEntryArr *entriesC = queueP_->peekWithSamePriority(queueP_, (int32_t)maxNumOfEntries, maxNumOfBytes, &exception);
192 if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
193
194 // Now we need to copy the C results into C++ classes ...
195
196 for (size_t ii=0; ii<entriesC->len; ii++) {
197 ::QueueEntry &queueEntryC = entriesC->queueEntryArr[ii];
198 string type, methodName;
199 parseEmbeddedType(queueEntryC.embeddedType, type, methodName);
200
201 if (type != Constants::ENTRY_TYPE_MSG_RAW) {
202 string embedded = queueEntryC.embeddedType;
203 freeQueueEntryArr(entriesC);
204 throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::peekWithSamePriority", string("The queue entry embeddedType '") + embedded + "' type='" + type + "' is not supported");
205 }
206
207 if (log_.dump()) {
208 char *dumpP = blobDump(&queueEntryC.embeddedBlob);
209 log_.dump(ME+".peekWithSamePriority()", string("Retrieved blob from queue:") + dumpP);
210 ::xmlBlasterFree(dumpP);
211 }
212
213 ::MsgUnitArr *msgUnitArrC = ::parseMsgUnitArr(queueEntryC.embeddedBlob.dataLen, queueEntryC.embeddedBlob.data);
214
215 for (size_t j=0; msgUnitArrC!=0 && j<msgUnitArrC->len; j++) { // TODO: Collect a PUBLISH_ARR !!! (currently we transform it to single publish()es)
216 ::MsgUnit &msgUnit = msgUnitArrC->msgUnitArr[j];
217 if (log_.dump()) {
218 char *dumpP = ::messageUnitToXmlLimited(&msgUnit, 128);
219 log_.dump(ME+".peekWithSamePriority()", string("Retrieved and parsed C message from queue:") + dumpP);
220 ::xmlBlasterFree(dumpP);
221 }
222 if (methodName == MethodName::PUBLISH) {
223 MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnit.key));
224 MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnit.qos));
225 MessageUnit messageUnit(msgKeyData, msgUnit.contentLen, (const unsigned char*)msgUnit.content, msgQosData);
226 PublishQueueEntry *pq = new PublishQueueEntry(global_, messageUnit,
227 queueEntryC.priority, queueEntryC.uniqueId);
228 if (log_.trace()) log_.trace(ME, "Got PublishQueueEntry from queue");
229 ret.insert(ret.end(), EntryType(*pq));
230 if (log_.trace()) log_.trace(ME, "PublishQueueEntry is reference countet");
231 }
232 else if (methodName == MethodName::CONNECT) {
233 ConnectQosRef connectQos = connectQosFactory_.readObject(string(msgUnit.qos));
234 ConnectQueueEntry *pq = new ConnectQueueEntry(global_, connectQos,
235 queueEntryC.priority, queueEntryC.uniqueId);
236 if (log_.trace()) log_.trace(ME, "Got ConnectQueueEntry from queue");
237 ret.insert(ret.end(), EntryType(*pq));
238 if (log_.trace()) log_.trace(ME, "ConnectQueueEntry is reference countet");
239 }
240 /* TODO: queryKeyFactory and queryQosFactory!
241 else if (methodName == MethodName::SUBSCRIBE) {
242 QueryKeyData queryKeyData = queryKeyFactory_.readObject(string(msgUnit.key));
243 SubscribeKey subscribeKey(global_, queryKeyData);
244 QueryQosData queryQosData = queryQosFactory_.readObject(string(msgUnit.qos));
245 SubscribeQos subscribeQos(global_, queryQosData);
246 SubscribeQueueEntry *pq = new SubscribeQueueEntry(global_, subscribeKey, subscribeQos,
247 queueEntryC.priority, queueEntryC.uniqueId);
248 if (log_.trace()) log_.trace(ME, "Got SubscribeQueueEntry from queue");
249 ret.insert(ret.end(), EntryType(*pq));
250 if (log_.trace()) log_.trace(ME, "SubscribeQueueEntry is reference countet");
251 }
252 */
253 else { // TODO: How to handle: throw exception or remove the buggy entry?
254 log_.error(ME + "::peekWithSamePriority", string("The queue entry embeddedType '") + queueEntryC.embeddedType + "' methodName='" + methodName + "' is not supported, we ignore it.");
255 }
256 }
257
258 freeMsgUnitArr(msgUnitArrC);
259 }
260
261 freeQueueEntryArr(entriesC);
262 return ret;
263 }
264
265 void SQLiteQueuePlugin::parseEmbeddedType(const string& embeddedType, string &type, string &methodName)
266 {
267 string::size_type pos = embeddedType.find("|");
268 if (pos == string::npos) {
269 type = embeddedType;
270 methodName = "";
271 return;
272 }
273 type = embeddedType.substr(0, pos);
274 if (pos < embeddedType.size())
275 methodName = embeddedType.substr(pos+1);
276 // No trim(): we assume no white spaces
277 }
278
279 long SQLiteQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end)
280 {
281 Lock lock(accessMutex_);
282 if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, randomRemove() failed");
283 if (start == end || queueP_->empty(queueP_)) return 0;
284
285 ::QueueEntryArr queueEntryArr;
286 memset(&queueEntryArr, 0, sizeof(QueueEntryArr));
287 {
288 vector<EntryType>::const_iterator iter = start;
289 while (iter != end) {
290 iter++;
291 queueEntryArr.len++;
292 }
293 }
294 if (queueEntryArr.len < 1) return 0;
295
296 queueEntryArr.queueEntryArr = (QueueEntry *)calloc(queueEntryArr.len, sizeof(QueueEntry));
297
298 vector<EntryType>::const_iterator iter = start;
299 for (int currIndex=0; iter != end; ++iter, currIndex++) {
300 const EntryType &entryType = (*iter);
301 const MsgQueueEntry &entry = *entryType;
302 ::QueueEntry &queueEntry = queueEntryArr.queueEntryArr[currIndex];
303
304 // Copy C++ to C struct ...
305
306 queueEntry.priority = entry.getPriority();
307 queueEntry.isPersistent = entry.isPersistent();
308 queueEntry.uniqueId = entry.getUniqueId();
309 queueEntry.sizeInBytes = entry.getSizeInBytes();
310 strncpy0(queueEntry.embeddedType, entry.getEmbeddedType().c_str(), QUEUE_ENTRY_EMBEDDEDTYPE_LEN); // "MSG_RAW|publish" "MSG_RAW|connect"
311 queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
312 /*
313 const BlobHolder *blob = (const BlobHolder *)entry.getEmbeddedObject();
314 if (blob == 0) throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME, "put() failed, the entry " + entry.getLogId() + " returned NULL for embeddedObject");
315 queueEntry.embeddedBlob.data = blob->data;
316 queueEntry.embeddedBlob.dataLen = blob->dataLen;
317 */
318 if (log_.dump()) {
319 char *dumpP = ::queueEntryToXml(&queueEntry, 200);
320 log_.dump(ME+".put()", string("Put blob to queue:") + dumpP);
321 xmlBlasterFree(dumpP);
322 }
323 }
324
325 ::ExceptionStruct exception;
326
327 int32_t numRemoved = queueP_->randomRemove(queueP_, &queueEntryArr, &exception);
328
329 freeQueueEntryArrInternal(&queueEntryArr);
330
331 if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
332 return (long)numRemoved;
333 }
334
335 long SQLiteQueuePlugin::getNumOfEntries() const
336 {
337 if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfEntries() failed");
338 return queueP_->getNumOfEntries(queueP_);
339 }
340
341 long SQLiteQueuePlugin::getMaxNumOfEntries() const
342 {
343 if (queueP_ == 0) return property_.getMaxEntries(); // throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfEntries() failed");
344 return queueP_->getMaxNumOfEntries(queueP_);
345 }
346
347 int64_t SQLiteQueuePlugin::getNumOfBytes() const
348 {
349 if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfBytes() failed");
350 return queueP_->getNumOfBytes(queueP_);
351 }
352
353 int64_t SQLiteQueuePlugin::getMaxNumOfBytes() const
354 {
355 if (queueP_ == 0) return property_.getMaxBytes(); // throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getMaxNumOfBytes() failed");
356 return queueP_->getMaxNumOfBytes(queueP_);
357 }
358
359 void SQLiteQueuePlugin::clear()
360 {
361 Lock lock(accessMutex_);
362 if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, clear() failed");
363 ::ExceptionStruct exception;
364 queueP_->clear(queueP_, &exception);
365 }
366
367
368 bool SQLiteQueuePlugin::empty() const
369 {
370 if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, empty() failed");
371 return queueP_->empty(queueP_);
372 }
373
374 void SQLiteQueuePlugin::destroy()
375 {
376 if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, destroy() failed");
377 ::ExceptionStruct exception;
378 queueP_->destroy(&queueP_, &exception);
379 if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
380 }
381
382 // Exception conversion ....
383 org::xmlBlaster::util::XmlBlasterException SQLiteQueuePlugin::convertFromQueueException(const ::ExceptionStruct *ex) const
384 {
385 return org::xmlBlaster::util::XmlBlasterException(
386 (*ex->errorCode=='\0')?string("internal.unknown"):string(ex->errorCode),
387 string(""),
388 ME,
389 "en",
390 string(ex->message),
391 global_.getVersion() + " " + global_.getBuildTimestamp());
392 }
393
394 string SQLiteQueuePlugin::usage()
395 {
396 std::string text = string("");
397 text += string("\nThe SQLite persistent queue plugin configuration:");
398 text += string("\n -queue/connection/url [xmlBlasterClientCpp.db]");
399 text += string("\n The database file name (incl. path), defaults to the current directory.");
400 text += string("\n -queue/connection/tableNamePrefix [XB_]");
401 text += string("\n The prefix for all tables in the database.");
402 text += ClientQueueProperty::usage();
403 return text;
404 }
405
406 }}}} // namespace
407
408
409 /**
410 * Customized logging output is handled by this method.
411 * We redirect logging output from the C implementation to our C++ logging plugin.
412 * <p>
413 * Please compile with <code>XMLBLASTER_PERSISTENT_QUEUE_SQLITE3</code> defined.
414 * </p>
415 * @param queueP
416 * @param currLevel The actual log level of the client
417 * @param level The level of this log entry
418 * @param location A string describing the code place
419 * @param fmt The formatting string
420 * @param ... Other variables to log, corresponds to 'fmt'
421 * @see xmlBlaster/src/c/msgUtil.c: xmlBlasterDefaultLogging() is the default
422 * implementation
423 */
424 static void myLogger(void *logUserP,
425 XMLBLASTER_LOG_LEVEL currLevel,
426 XMLBLASTER_LOG_LEVEL level,
427 const char *location, const char *fmt, ...)
428 {
429 /* Guess we need no more than 200 bytes. */
430 int n, size = 200;
431 char *p = 0;
432 va_list ap;
433 ::I_Queue *queueP = (::I_Queue *)logUserP;
434
435 //org::xmlBlaster::util::queue::SQLiteQueuePlugin *pluginP =
436 // (org::xmlBlaster::util::queue::SQLiteQueuePlugin *)queueP->userObject;
437 //org::xmlBlaster::util::I_Log& log = pluginP->getLog();
438
439 if (queueP->userObject == 0) {
440 std::cout << "myLogger not initialized" << std::endl;
441 return;
442 }
443 org::xmlBlaster::util::I_Log& log = *((org::xmlBlaster::util::I_Log*)queueP->userObject);
444
445 if (level > currLevel) { /* XMLBLASTER_LOG_ERROR, XMLBLASTER_LOG_WARN, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_TRACE */
446 return;
447 }
448 if ((p = (char *)malloc (size)) == NULL)
449 return;
450
451 for (;;) {
452 /* Try to print in the allocated space. */
453 va_start(ap, fmt);
454 n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */
455 va_end(ap);
456 /* If that worked, print the string to console. */
457 if (n > -1 && n < size) {
458 if (level == XMLBLASTER_LOG_INFO)
459 log.info(location, p);
460 else if (level == XMLBLASTER_LOG_WARN)
461 log.warn(location, p);
462 else if (level == XMLBLASTER_LOG_ERROR)
463 log.error(location, p);
464 else
465 log.trace(location, p);
466 free(p);
467 return;
468 }
469 /* Else try again with more space. */
470 if (n > -1) /* glibc 2.1 */
471 size = n+1; /* precisely what is needed */
472 else /* glibc 2.0 */
473 size *= 2; /* twice the old size */
474 if ((p = (char *)realloc (p, size)) == NULL) {
475 return;
476 }
477 }
478 }
479
// syntax highlighted by Code2HTML, v. 0.9.1