1 /*--UNFINISHED SEE TODOS--------------------------------------------------------------------------
2 Name: SQLite3Queue.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: A persistent queue implementation based on the SQLite relational database
6 Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
7 and can easily be used outside of xmlBlaster.
8 Further you need sqlite.h and the sqlite library (dll,so,sl)
9 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 's brother
10 Date: 04/2004
11 Compile: Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
12 Needs pthread.h but not the pthread library (for exact times)
13
14 export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
             gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLite3Queue SQLite3Queue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite3
             (optionally use -ansi -pedantic -Wno-long-long)
17 (Intel C: icc -wd981 ...)
18
19 Compile inside xmlBlaster:
20 build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
21 expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so
22
23 Testcompile on Windows
24
25 create sqlite3.lib from sqlite3.def via:
26 lib /DEF:sqlite3.def
27
28 ( /I\c\sqlite3 says where sqlite3.h resides ):
29 cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /DSQLITE3=1 /D_WINDOWS /I\c\sqlite3 /I..\.. Sqlite3Queue.c ..\helper.c /link \pialibs\sqlite3.lib
30
31 Table layout XB_ENTRIES:
32 dataId bigint
33 queueName text
34 prio integer
35 flag text
36 durable char(1)
37 byteSize bigint
38 blob bytea
39 PRIMARY KEY (dataId, queueName)
40
41 Todo: Tuning:
42 - Add prio to PRIMARY KEY
43 - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes
44
45 @see: http://www.sqlite.org/
46 @see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
47 @see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
48 @see: http://www.sqlite.org/threadsafe.html sqlite3 default is thread-safe (serialized)
49 Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
50 -----------------------------------------------------------------------------*/
51 #include <stdio.h>
52 #include <string.h>
53 #include <malloc.h>
54 #if !defined(_WINDOWS)
55 # include <unistd.h> /* unlink() */
56 # include <errno.h> /* unlink() */
57 #endif
58 #include "util/queue/QueueInterface.h"
59
60 /*#ifdef QUEUE_MAIN
61 # ifdef Dll_Export
62 # undef Dll_Export
63 # endif
64 # define Dll_Export
65 #endif*/
66
67 # include "sqlite3.h"
/** Hands a buffer that was allocated by the SQLite library back to sqlite3_free(). */
static void xb_sqlite_free(char *pdata)
{
   sqlite3_free(pdata);
}
69
70 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
71 static const QueueProperties *getProperties(I_Queue *queueP);
72 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
73 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
74 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
75 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
76 static int32_t getNumOfEntries(I_Queue *queueP);
77 static int32_t getMaxNumOfEntries(I_Queue *queueP);
78 static int64_t getNumOfBytes(I_Queue *queueP);
79 static int64_t getMaxNumOfBytes(I_Queue *queueP);
80 static bool persistentQueueEmpty(I_Queue *queueP);
81 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
82 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);
83 static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
84 static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
85 static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
86 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite3_stmt **ppVm, const char *queryString, ExceptionStruct *exception);
87 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
88 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
89 static void freeQueueEntryData(QueueEntry *queueEntry);
90
/* URL that is appended to log and exception texts so that a reader can look up
   the meaning of a numeric SQLite result code manually. */
static const char *errLink = "http://www.sqlite.org/c3ref/c_abort.html";
93
94 /* The tmp_hlp_struct; is needed because a forward declaration of an anonymous struct is not possible. */
95 struct TmpHelper;
96 typedef struct TmpHelper TmpHelper;
97 typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, TmpHelper *helper, sqlite3_stmt *pVm, ExceptionStruct *exception);
98 /**
99 * Used temporary to shorten arglists.
100 */
101 struct TmpHelper{
102 QueueEntryArr **queueEntryArrPP;
103 int32_t currEntries;
104 int64_t currBytes;
105 int32_t maxNumOfEntries; /** The max wanted number of entries for this peek() */
106 int64_t maxNumOfBytes; /** The max wanted bytes during peek() */
107 ParseDataFp parseDataFp;
108 };
109
static int32_t getResultRows(I_Queue *queueP, const char *methodName, sqlite3_stmt *pVm, TmpHelper *helper, bool finalize, ExceptionStruct *exception);
/* Logging shortcut. The macro expands to the start of a call, so
      LOG __FILE__, "Persistent queue is created");
   is the same as
      if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
   A variable named queueP must be in scope. As the expansion begins with a bare
   'if', wrap the statement in braces when it is used inside an if/else.
*/
#define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE,
117
/* Buffer sizes for SQL statement strings. They are macros because ISO C90
   does not accept 'const int' as an array dimension. */
#define LEN512 512  /* ISO C90 forbids variable-size array: const int LEN512=512; */
#define LEN256 256  /* ISO C90 forbids variable-size array: const int LEN256=256; */

/* NOTE(review): the code visible here uses QUEUE_DBNAME_MAX and QUEUE_ID_MAX instead
   of the two limits below -- confirm whether they are still needed. */
#define DBNAME_MAX 128
#define ID_MAX 256
123
124
/**
 * Private state of one queue instance, stored in I_Queue.privateObject.
 * Holds Prepared statements for better performance.
 * @see http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
 */
typedef struct DbInfoStruct {
   QueueProperties prop;         /** Meta information, a copy of the properties passed to initialize() */
   size_t numOfEntries;          /** Cache for current number of entries, createQueue() presets it to -1 */
   int64_t numOfBytes;           /** Cache for current number of bytes, createQueue() presets it to -1 */
   sqlite3 *db;                  /** Database handle for SQLite */
   sqlite3_stmt *pVm_put;        /** SQLite virtual machine to hold a prepared query, compiled on the first put() */
   sqlite3_stmt *pVm_peekWithSamePriority; /** Prepared SELECT, compiled on the first peekWithSamePriority() */
   sqlite3_stmt *pVm_fillCache;  /** Presumably the prepared query of fillCache() -- not visible here, confirm */
} DbInfo;
138
/* Scratch buffer that is handed to int64ToStr() when a 64 bit number is formatted for log or exception texts.
   NOTE(review): it is one file-wide static buffer used by all functions -- looks unsafe
   if several threads format at the same time, confirm. */
static char int64Str_[INT64_STRLEN_MAX];
static char * const int64Str = int64Str_;   /* to make the pointer address const */
141
/** Column index into XB_ENTRIES table, in the order of the CREATE TABLE statement in createTables() */
enum {
   XB_ENTRIES_DATA_ID = 0,    /* dataId */
   XB_ENTRIES_QUEUE_NAME,     /* queueName */
   XB_ENTRIES_PRIO,           /* prio */
   XB_ENTRIES_TYPE_NAME,      /* flag, read into QueueEntry.embeddedType */
   XB_ENTRIES_PERSISTENT,     /* durable, 'T' or 'F' */
   XB_ENTRIES_SIZE_IN_BYTES,  /* byteSize */
   XB_ENTRIES_BLOB            /* blob */
};
152
153
154 /**
155 * Create a new persistent queue instance.
156 * <br />
157 * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done
158 * usually by calling shutdown().
159 * @throws exception
160 */
161 Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception)
162 {
163 bool stateOk = true;
164 I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));
165 if (queueP == 0) return queueP;
166 queueP->isInitialized = false;
167 queueP->initialize = persistentQueueInitialize;
168 queueP->getProperties = getProperties;
169 queueP->put = persistentQueuePut;
170 queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;
171 queueP->randomRemove = persistentQueueRandomRemove;
172 queueP->clear = persistentQueueClear;
173 queueP->getNumOfEntries = getNumOfEntries;
174 queueP->getMaxNumOfEntries = getMaxNumOfEntries;
175 queueP->getNumOfBytes = getNumOfBytes;
176 queueP->getMaxNumOfBytes = getMaxNumOfBytes;
177 queueP->empty = persistentQueueEmpty;
178 queueP->shutdown = persistentQueueShutdown;
179 queueP->destroy = persistentQueueDestroy;
180 queueP->privateObject = calloc(1, sizeof(DbInfo));
181 {
182 DbInfo *dbInfo = (DbInfo *)queueP->privateObject;
183 dbInfo->numOfEntries = -1;
184 dbInfo->numOfBytes = -1;
185 }
186 stateOk = queueP->initialize(queueP, queueProperties, exception);
187 if (stateOk) {
188 LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");
189 }
190 else {
191 ExceptionStruct ex;
192 queueP->shutdown(&queueP, &ex);
193 if (*ex.errorCode != 0) {
194 embedException(exception, ex.errorCode, ex.message, exception);
195 }
196 queueP = 0;
197 }
198 return queueP;
199 }
200
201 /** Access the DB handle, queueP pointer is not checked */
202 static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {
203 return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject);
204 }
205
206 /**
207 * Access the queue configuration.
208 * @param queueP The this pointer
209 * @return Read only access, 0 on error
210 */
211 static const QueueProperties *getProperties(I_Queue *queueP)
212 {
213 ExceptionStruct exception;
214 if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0;
215 return &getDbInfo(queueP)->prop;
216 }
217
218 /**
219 */
220 static void freeQueue(I_Queue **queuePP)
221 {
222 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
223 if (queueP == 0) {
224 fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);
225 return;
226 }
227
228 LOG __FILE__, "freeQueue() called");
229
230 if (queueP->privateObject) {
231 free(queueP->privateObject);
232 queueP->privateObject = 0;
233 }
234
235 free(queueP);
236 *queuePP = 0;
237 }
238
239 /**
240 * Called internally by createQueue().
241 * @param queueP The this pointer
242 * @param queueProperties The configuration
243 * @param exception Can contain error information (out parameter)
244 * @return true on success
245 */
246 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception)
247 {
248 char *errMsg = 0;
249 bool retOk;
250 sqlite3 *db = 0;
251 DbInfo *dbInfo = 0;
252
253 if (checkArgs(queueP, "initialize", false, exception) == false ) return false;
254 if (queueProperties == 0) {
255 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
256 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
257 "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);
258 /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */
259 fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message);
260 return false;
261 }
262
263 queueP->log = queueProperties->logFp;
264 queueP->logLevel = queueProperties->logLevel;
265 queueP->userObject = queueProperties->userObject;
266
267 if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||
268 queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) {
269 char dbName[QUEUE_DBNAME_MAX];
270 char queueName[QUEUE_ID_MAX];
271 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
272 if (queueProperties->dbName == 0)
273 strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);
274 else
275 strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);
276 if (queueProperties->queueName == 0)
277 strncpy0(queueName, "NULL", QUEUE_ID_MAX);
278 else
279 strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);
280 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
281 "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"
282 " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,
283 dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);
284 LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
285 return false;
286 }
287
288 dbInfo = getDbInfo(queueP);
289 memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));
290
291 /* Never trust a queue property you haven't overflowed yourself :-) */
292 dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;
293 dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;
294 dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;
295
296 LOG __FILE__, "dbName = %s", dbInfo->prop.dbName);
297 LOG __FILE__, "queueName = %s", dbInfo->prop.queueName);
298 LOG __FILE__, "tablePrefix = %s", dbInfo->prop.tablePrefix);
299 LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries);
300 LOG __FILE__, "maxNumOfBytes = %ld",(long)dbInfo->prop.maxNumOfBytes);
301 /*LOG __FILE__, "logFp = %d", (int)dbInfo->prop.logFp);*/
302 LOG __FILE__, "logLevel = %d", (int)dbInfo->prop.logLevel);
303 /*LOG __FILE__, "userObject = %d", (void*)dbInfo->prop.userObject);*/
304
305 if (sqlite3_open_v2(dbInfo->prop.dbName, &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX, 0) != SQLITE_OK || db == 0) {
306 queueP->isInitialized = false;
307 if(queueP->log) {
308 if (errMsg) {
309 LOG __FILE__, "%s", errMsg);
310 }
311 else {
312 LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName);
313 }
314 }
315 else {
316 if (errMsg)
317 fprintf(stderr,"[%s] %s\n", __FILE__, errMsg);
318 else
319 fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName);
320 }
321 strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
322 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
323 "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg);
324 if (errMsg != 0) xb_sqlite_free(errMsg);
325 return false;
326 }
327
328 dbInfo->db = db;
329 queueP->isInitialized = true;
330
331 retOk = createTables(queueP, exception);
332
333 fillCache(queueP, exception);
334
335 LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");
336 return true;
337 }
338
339 /**
340 * Create the necessary DB table if not already existing.
341 * @param queueP
342 * @param exception Can contain error information (out parameter)
343 * @return true on success
344 */
345 static bool createTables(I_Queue *queueP, ExceptionStruct *exception)
346 {
347 char queryString[LEN512];
348 bool retOk;
349 const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;
350
351 SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
352 tablePrefix);
353 retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception);
354
355 SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",
356 tablePrefix, tablePrefix);
357 retOk = execSilent(queueP, queryString, "Creating PRIO index", exception);
358 return retOk;
359 }
360
361 /**
362 * Invoke SQL query.
363 * @param queueP Is not checked, must not be 0
364 * @param queryString The SQL to execute
365 * @param comment For logging or exception text
366 * @param exception Can contain error information (out parameter)
367 * @return true on success
368 */
369 static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception)
370 {
371 int rc = 0;
372 char *errMsg = 0;
373 bool retOk;
374 DbInfo *dbInfo = getDbInfo(queueP);
375
376 rc = sqlite3_exec(dbInfo->db, queryString, NULL, NULL, &errMsg);
377 switch (rc) {
378 case SQLITE_OK:
379 LOG __FILE__, "SQL '%s' success", comment);
380 retOk = true;
381 break;
382 default:
383 if (errMsg && strstr(errMsg, "already exists")) {
384 LOG __FILE__, "OK, '%s' [%d]: %s", comment, rc, (errMsg==0)?"":errMsg);
385 retOk = true;
386 }
387 else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) {
388 LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, (errMsg==0)?"":errMsg);
389 retOk = true;
390 }
391 else {
392 LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, (errMsg==0)?"":errMsg);
393 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
394 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
395 "[%.100s:%d] SQL error '%s' [%d]: %s", __FILE__, __LINE__, comment, rc, (errMsg==0)?"":errMsg);
396 retOk = false;
397 }
398 break;
399 }
400 if (errMsg != 0) xb_sqlite_free(errMsg);
401 return retOk;
402 }
403
404 /**
405 * @param queueP The queue instance
406 * @param queueEntry The entry
407 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
408 */
409 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception)
410 {
411 int rc = 0;
412 bool stateOk = true;
413 DbInfo *dbInfo;
414 char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */
415
416 if (checkArgs(queueP, "put", true, exception) == false ) return;
417 if (queueEntry == 0) {
418 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
419 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
420 "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);
421 return;
422 }
423 if (queueEntry->uniqueId == 0) {
424 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
425 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
426 "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);
427 return;
428 }
429 if (*queueEntry->embeddedType == 0) {
430 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
431 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
432 "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);
433 return;
434 }
435 strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
436
437 if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {
438 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
439 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
440 "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);
441 return;
442 }
443
444 dbInfo = getDbInfo(queueP);
445
446 if (dbInfo->numOfEntries >= (size_t)dbInfo->prop.maxNumOfEntries) {
447 strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);
448 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
449 "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);
450 return;
451 }
452 if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {
453 strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);
454 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
455 "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));
456 return;
457 }
458
459
460 if (dbInfo->pVm_put == 0) { /* Compile prepared query only once */
461 char queryString[LEN256]; /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/
462 SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?);", dbInfo->prop.tablePrefix);
463 stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);
464 }
465
466 if (stateOk) { /* set prepared statement tokens */
467 int index = 0;
468 rc = SQLITE_OK;
469 if(rc == SQLITE_OK) rc = sqlite3_bind_int64(dbInfo->pVm_put , ++index, queueEntry->uniqueId);
470 if(rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, strlen(dbInfo->prop.queueName), SQLITE_STATIC);
471 if(rc == SQLITE_OK) rc = sqlite3_bind_int64(dbInfo->pVm_put, ++index, queueEntry->priority);
472 if(rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, embeddedType, strlen(embeddedType), SQLITE_STATIC);
473 if(rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", 1, SQLITE_STATIC);
474 if(rc == SQLITE_OK) rc = sqlite3_bind_int64(dbInfo->pVm_put, ++index, queueEntry->embeddedBlob.dataLen);
475 if(rc == SQLITE_OK) rc = sqlite3_bind_blob(dbInfo->pVm_put, ++index, queueEntry->embeddedBlob.data, (int)queueEntry->embeddedBlob.dataLen, SQLITE_STATIC);
476
477 if (rc != SQLITE_OK) {
478 switch(rc) {
479 case SQLITE_RANGE:
480 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
481 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d index out of range", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc );
482 LOG __FILE__, "put(%s) SQL error: %d index out of range", int64ToStr(int64Str, queueEntry->uniqueId), rc); break;
483 case SQLITE_NOMEM:
484 LOG __FILE__, "put(%s) SQL error: %d out of memory", int64ToStr(int64Str, queueEntry->uniqueId), rc);
485 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d out of memory", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc );
486 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); break;
487 case SQLITE_MISUSE:
488 LOG __FILE__, "put(%s) SQL error: %d misuse: virtual machine not valid", int64ToStr(int64Str, queueEntry->uniqueId), rc);
489 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d misuse: virtual machine not valid", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc );
490 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); break;
491 default:
492 LOG __FILE__, "put(%s) SQL error: %d undefined error", int64ToStr(int64Str, queueEntry->uniqueId), rc);
493 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d undefined error", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc );
494 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); break;
495
496 }
497 stateOk = false;
498 }
499 }
500
501 if (stateOk) { /* start the query, process results */
502 int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, false, exception);
503 stateOk = countRows >= 0;
504 }
505
506 if (stateOk) {
507 dbInfo->numOfEntries += 1;
508 dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);
509 }
510
511 LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");
512 }
513
514
515 /**
516 * Compile a prepared query.
517 * No parameters are checked, they must be valid
518 * @param queueP The queue instance to use
519 * @param methodName A nice string for logging
520 * @param ppVm The virtual machine will be initialized if still 0
521 * @param queryString
522 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
523 * @return false on error and exception->errorCode is not null
524 */
525 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,
526 sqlite3_stmt **ppVm, const char *queryString, ExceptionStruct *exception)
527 {
528 int iRetry, numRetry=100;
529 int rc = 0;
530 const char *pzTail = 0; /* OUT: uncompiled tail of zSql */
531 bool stateOk = true;
532 DbInfo *dbInfo = getDbInfo(queueP);
533
534 if (*ppVm == 0) { /* Compile prepared query */
535 for (iRetry = 0; iRetry < numRetry; iRetry++) {
536 rc = sqlite3_prepare_v2(dbInfo->db, queryString, strlen(queryString), ppVm, &pzTail);
537 switch (rc) {
538 case SQLITE_BUSY:
539 if (iRetry == (numRetry-1)) {
540 strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);
541 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
542 "[%.100s:%d] SQL error #%d resource busy in %s()", __FILE__, __LINE__, rc, methodName);
543 }
544 LOG __FILE__, "%s() Sleeping as other thread holds DB", methodName );
545 sleepMillis(10);
546 break;
547 case SQLITE_OK:
548 iRetry = numRetry; /* We're done */
549 LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);
550 break;
551 default:
552 LOG __FILE__, "SQL error #%d in %s(). See %s for details.", rc, methodName, errLink);
553 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
554 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
555 "[%.100s:%d] SQL error #%d in %s(). See %s for details.", __FILE__, __LINE__, rc, methodName, errLink);
556 iRetry = numRetry; /* We're done */
557 stateOk = false;
558 break;
559 }
560 }
561 }
562 if (*ppVm == 0) stateOk = false;
563 return stateOk;
564 }
565
566 /**
567 * For each SQL result row parse it into a QueueEntry.
568 * No parameters are checked, they must be valid
569 * Implements a ParseDataFp (function pointer)
570 * @param queueP The 'this' pointer
571 * @param currIndex
572 * @param TmpHelper
573 * @param sqlite3 statement
574 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
575 * @return false on error and exception->errorCode is not null
576 */
577 static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, TmpHelper *helper,
578 sqlite3_stmt *pVm, ExceptionStruct *exception)
579 {
580 bool doContinue = true;
581 int numAssigned;
582 bool stateOk = true;
583 QueueEntry *queueEntry = 0;
584 QueueEntryArr *queueEntryArr;
585 QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;
586
587 if (currIndex == 0) {
588 helper->currEntries = 0;
589 helper->currBytes = 0;
590 }
591
592 if (*queueEntryArrPP == 0) {
593 *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
594 if (helper->maxNumOfEntries == 0) {
595 doContinue = false;
596 return doContinue;
597 }
598 }
599 queueEntryArr = *queueEntryArrPP;
600
601 if (queueEntryArr->len == 0) {
602 queueEntryArr->len = 10;
603 queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));
604 }
605 else if (currIndex >= queueEntryArr->len) {
606 queueEntryArr->len += 10;
607 queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));
608 }
609 queueEntry = &queueEntryArr->queueEntryArr[currIndex];
610 memset(queueEntry, 0, sizeof(QueueEntry));
611
612 queueEntry->uniqueId = sqlite3_column_int64(pVm, XB_ENTRIES_DATA_ID);
613 stateOk = queueEntry->uniqueId == 0 ? false : true;
614 if (!stateOk) {
615 LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse sqlite3_column_int64(pVm, 0) '%.20s' to uniqueId, ignoring entry.", sqlite3_column_text(pVm, XB_ENTRIES_DATA_ID));
616 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
617 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
618 "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse qlite3_column_int64(pVm, 0) '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, (char*)sqlite3_column_text(pVm, XB_ENTRIES_DATA_ID), sqlite3_column_name(pVm, XB_ENTRIES_DATA_ID));
619 doContinue = false;
620 return doContinue;
621 }
622
623 LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);
624 numAssigned = sscanf((const char*)sqlite3_column_text(pVm, XB_ENTRIES_PRIO), "%hd", &queueEntry->priority);
625 if (numAssigned != 1) {
626 LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse sqlite3_column_int64(pVm, XB_ENTRIES_PRIO) '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), sqlite3_column_text(pVm, XB_ENTRIES_PRIO));
627 queueEntry->priority = 4;
628 }
629 strncpy0(queueEntry->embeddedType, (const char*)sqlite3_column_text(pVm, XB_ENTRIES_TYPE_NAME), QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
630
631 queueEntry->isPersistent = *sqlite3_column_text(pVm, XB_ENTRIES_PERSISTENT) == 'T' ? true : false;
632
633 queueEntry->embeddedBlob.dataLen = (size_t)sqlite3_column_int64(pVm, XB_ENTRIES_SIZE_IN_BYTES);
634
635 /* sqlite3_column_bytes() can be used to get the length */
636 queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen);
637 memcpy(queueEntry->embeddedBlob.data, (char *)sqlite3_column_blob(pVm, XB_ENTRIES_BLOB), queueEntry->embeddedBlob.dataLen);
638
639 helper->currEntries += 1;
640 helper->currBytes += queueEntry->embeddedBlob.dataLen;
641
642 /* Limit the number of entries */
643 if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||
644 (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {
645 /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */
646 doContinue = false;
647 }
648
649 return doContinue;
650 }
651
652 /**
653 * Execute the query and get the query result.
654 * No parameters are checked, they must be valid
655 * @param queueP The this pointer
656 * @param methodName The method called
657 * @param pVm sqlite virtual machine
658 * @param helper for smaller arglist
659 * @param finalize true to call sqlite_finalize which deletes the virtual machine,
660 * false to call sqlite_reset to reuse the prepared query
661 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
662 * @return < 0 on error and exception->errorCode is not null
663 * otherwise the number of successfully parsed rows is returned
664 * @todo For INSERT and DELETE return the number of touched entries !!!
665 */
666 static int32_t getResultRows(I_Queue *queueP, const char *methodName, sqlite3_stmt *pVm, TmpHelper *helper, bool finalize, ExceptionStruct *exception)
667 {
668 int32_t currIndex = 0;
669 bool done = false;
670 bool stateOk = true;
671 int rc;
672
673 while (!done) {
674
675 rc = sqlite3_step(pVm);
676 switch(rc){
677 case SQLITE_DONE:
678 done = true;
679 break;
680 case SQLITE_BUSY:
681 LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);
682 sleepMillis(10);
683 break;
684 case SQLITE_ROW:
685 {
686 bool doContinue = true;
687 if(helper != 0) {
688 doContinue = helper->parseDataFp(queueP, currIndex, helper, pVm, exception);
689
690 stateOk = *exception->errorCode == 0;
691 }
692 currIndex++;
693 if(!stateOk || !doContinue) done = true;
694 }
695 break;
696 case SQLITE_ERROR:
697 LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);
698 done = true;
699 stateOk = false;
700 break;
701 case SQLITE_SCHEMA:
702 LOG __FILE__, "%s() Sql execution problem [sqlCode=%d], inconsistent schema", methodName, rc);
703 /* no break */
704 case SQLITE_MISUSE:
705 default:
706 LOG __FILE__, "%s() SQL execution problem [sqlCode=%d]. See %s for details", methodName, rc, errLink);
707 done = true;
708 stateOk = false;
709 break;
710 }
711
712 }
713 LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);
714
715 if (finalize) {
716 sqlite3_finalize(pVm);
717 if (rc != SQLITE_OK && rc != SQLITE_DONE) {
718 /* LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled.", rc, sqlite_errmsg( )); */
719 LOG __FILE__, "WARN: getResultRows() sqlCode=%d is not handled. See %s for details", rc, errLink );
720 }
721 }
722 else { /* Reset prepared statement */
723 rc = sqlite3_reset(pVm);
724 if (rc == SQLITE_SCHEMA) {
725 /* LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled", rc, sqlite_error_string(rc) ); */
726 LOG __FILE__, "WARN: getResultRows() sqlCode=%d is not handled. See %s for details", rc, errLink );
727 }
728 }
729
730 return stateOk ? currIndex : (-1)*rc;
731 }
732
733 /**
734 * Access queue entries without removing them.
735 * @param queueP the this pointer
736 * @param maxNumOfEntries
737 * @param maxNumOfBytes
738 * @param Exception struct
739 * @return queueEntryArr
740 */
741 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception)
742 {
743 int rc = 0;
744 bool stateOk = true;
745 DbInfo *dbInfo;
746 QueueEntryArr *queueEntryArr = 0;
747
748 if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0;
749
750 LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes));
751
752 dbInfo = getDbInfo(queueP);
753
754 if (dbInfo->pVm_peekWithSamePriority == 0) { /* Compile prepared query */
755 char queryString[LEN512];
756 /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/
757 SNPRINTF(queryString, LEN512,
758 "SELECT * FROM %.20sENTRIES where queueName=?"
759 " and prio=(select max(prio) from %.20sENTRIES where queueName=?)"
760 " ORDER BY dataId ASC",
761 dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix);
762 stateOk = compilePreparedQuery(queueP, "peekWithSamePriority",
763 &dbInfo->pVm_peekWithSamePriority , queryString, exception);
764 }
765
766 if (stateOk) { /* set prepared statement tokens */
767 int index = 0;
768
769 rc = SQLITE_OK;
770
771 if (rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, strlen(dbInfo->prop.queueName), SQLITE_STATIC);
772 if (rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, strlen(dbInfo->prop.queueName), SQLITE_STATIC);
773
774 switch (rc) {
775 case SQLITE_OK:
776 LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc);
777 break;
778
779 case SQLITE_RANGE:
780 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
781 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d index out of range", __FILE__, __LINE__, rc );
782 LOG __FILE__, "peekWithSamePriority() SQL error: %d index out of range", rc);
783 stateOk = false;
784 break;
785 case SQLITE_NOMEM:
786 LOG __FILE__, "peekWithSamePriority() SQL error: %d out of memory", rc);
787 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d out of memory", __FILE__, __LINE__, rc );
788 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
789 stateOk = false;
790 break;
791 case SQLITE_MISUSE:
792 LOG __FILE__, "peekWithSamePriority() SQL error: %d misuse: virtual machine not valid", rc);
793 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d misuse: virtual machine not valid", __FILE__, __LINE__, rc );
794 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
795 stateOk = false;
796 break;
797 default:
798 LOG __FILE__, "peekWithSamePriority() SQL error: %d undefined error", rc);
799 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d undefined error", __FILE__, __LINE__, rc );
800 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
801 stateOk = false;
802 break;
803 }
804 }
805
806 if (stateOk) { /* start the query */
807 TmpHelper helper;
808 int32_t currIndex = 0;
809 helper.queueEntryArrPP = &queueEntryArr;
810 helper.maxNumOfEntries = maxNumOfEntries;
811 helper.maxNumOfBytes = maxNumOfBytes;
812 helper.parseDataFp = parseQueueEntryArr;
813 currIndex = getResultRows(queueP, "peekWithSamePriority", dbInfo->pVm_peekWithSamePriority, &helper, false, exception);
814 stateOk = currIndex >= 0;
815 if (!stateOk) {
816 if (queueEntryArr) {
817 free(queueEntryArr->queueEntryArr);
818 queueEntryArr->len = 0;
819 }
820 }
821 else {
822 if (!queueEntryArr)
823 queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
824 else if ((size_t)currIndex < queueEntryArr->len) {
825 queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry));
826 queueEntryArr->len = currIndex;
827 }
828 }
829 }
830
831 LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed");
832 return queueEntryArr;
833 }
834
835 /**
836 * Removes the given entries from persistence.
837 * @return The number of removed entries
838 */
839 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception)
840 {
841 bool stateOk = true;
842 int64_t numOfBytes = 0;
843 int32_t countDeleted = 0;
844 sqlite3_stmt *pVm = 0;
845 DbInfo *dbInfo;
846 if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 ||
847 queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0)
848 return 0;
849
850 LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len);
851
852 dbInfo = getDbInfo(queueP);
853
854 {
855 size_t i;
856 const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6);
857 char *queryString = (char *)calloc(qLen, sizeof(char));
858 /* DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */
859 SNPRINTF(queryString, qLen,
860 "DELETE FROM %.20sENTRIES WHERE queueName='%s'"
861 " AND dataId in ( ",
862 dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
863
864 for (i=0; i<queueEntryArr->len; i++) {
865 strncat0(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId), INT64_STRLEN_MAX+1);
866 if (i<(queueEntryArr->len-1)) strncat0(queryString, ",", 2);
867 numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen);
868 }
869 strncat0(queryString, " )", 3);
870 stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception);
871 free(queryString);
872 }
873
874
875 if (stateOk) { /* start the query */
876 int32_t currIndex = getResultRows(queueP, "randomRemove", pVm, 0, true, exception);
877 stateOk = currIndex >= 0;
878 }
879
880 if (stateOk) {
881 countDeleted = (int32_t)sqlite3_changes(dbInfo->db); /* This function returns the number of database rows that were changed (or inserted or deleted) by the most recently completed
882 INSERT, UPDATE, or DELETE statement.
883 Only changes that are directly specified by the INSERT, UPDATE, or DELETE statement are counted.
884 Auxiliary changes caused by triggers are not counted.
885 Use the sqlite3_total_changes() function to find the total number of changes including changes caused by triggers.*/
886 if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) {
887 fillCache(queueP, exception); /* calculate numOfBytes again */
888 }
889 else {
890 dbInfo->numOfEntries -= queueEntryArr->len;
891 dbInfo->numOfBytes -= numOfBytes;
892 }
893 }
894
895 return countDeleted;
896 }
897
898 /**
899 * Destroy all entries in queue and releases all resources in memory and on HD.
900 */
901 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception)
902 {
903 bool stateOk = true;
904 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
905 if (checkArgs(queueP, "destroy", false, exception) == false ) return false;
906 shutdownInternal(queuePP, exception);
907
908 {
909 DbInfo *dbInfo = getDbInfo(queueP);
910 const char *dbName = dbInfo->prop.dbName;
911 stateOk = unlink(dbName) == 0; /* Delete old db file */
912 if (!stateOk) {
913 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
914 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
915 "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno);
916 }
917 }
918
919 freeQueue(queuePP);
920
921 return stateOk;
922 }
923
924 /**
925 * Destroy all entries in queue.
926 */
927 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception)
928 {
929 int stateOk = true;
930 char queryString[LEN256];
931 sqlite3_stmt *pVm = 0;
932 DbInfo *dbInfo;
933 if (checkArgs(queueP, "clear", true, exception) == false) return false;
934 dbInfo = getDbInfo(queueP);
935
936 SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix);
937 stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception);
938
939 if (stateOk) {
940 int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, true, exception);
941 stateOk = currIndex >= 0;
942 }
943
944 if (stateOk) {
945 dbInfo->numOfEntries = 0;
946 dbInfo->numOfBytes = 0;
947 }
948
949 LOG __FILE__, "clear() done");
950 return stateOk;
951 }
952
953 /**
954 * Parse response of "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
955 */
956 static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, TmpHelper* helper, sqlite3_stmt *pVm, ExceptionStruct *exception)
957 {
958 int64_t ival = 0;
959 DbInfo *dbInfo = getDbInfo(queueP);
960 ival = sqlite3_column_int64(pVm, 0);
961 dbInfo->numOfEntries = (int32_t)ival;
962 dbInfo->numOfBytes = sqlite3_column_int64(pVm, 1);
963 return true;
964 }
965
966 /**
967 * Reload cached information from database.
968 * @param queueP The this pointer
969 * @param exception Returns error
970 * @return false on error
971 */
972 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception)
973 {
974 bool stateOk = true;
975 DbInfo *dbInfo = 0;
976
977 char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */
978
979 if (checkArgs(queueP, "fillCache", true, exception) == false ) return true;
980 dbInfo = getDbInfo(queueP);
981
982 SNPRINTF(queryString, LEN512,
983 "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
984 dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
985 stateOk = compilePreparedQuery(queueP, "fillCache",
986 &dbInfo->pVm_fillCache, queryString, exception);
987
988 if (stateOk) { /* start the query, calls parseCacheInfo() */
989 TmpHelper helper;
990 int32_t currIndex;
991 helper.parseDataFp = parseCacheInfo;
992 currIndex = getResultRows (queueP, "fillCache", dbInfo->pVm_fillCache, &helper, false, exception);
993 stateOk = currIndex > 0;
994 }
995
996 LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes));
997 return stateOk;
998 }
999
1000 static bool persistentQueueEmpty(I_Queue *queueP)
1001 {
1002 return getNumOfEntries(queueP) <= 0;
1003 }
1004
1005 static int32_t getNumOfEntries(I_Queue *queueP)
1006 {
1007 DbInfo *dbInfo;
1008 bool stateOk = true;
1009 ExceptionStruct exception;
1010 if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1;
1011 dbInfo = getDbInfo(queueP);
1012 if (dbInfo->numOfEntries == -1) {
1013 stateOk = fillCache(queueP, &exception);
1014 }
1015 return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;
1016 }
1017
1018 static int32_t getMaxNumOfEntries(I_Queue *queueP)
1019 {
1020 DbInfo *dbInfo;
1021 ExceptionStruct exception;
1022 if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1;
1023 dbInfo = getDbInfo(queueP);
1024 return dbInfo->prop.maxNumOfEntries;
1025 }
1026
1027 static int64_t getNumOfBytes(I_Queue *queueP)
1028 {
1029 DbInfo *dbInfo;
1030 ExceptionStruct exception;
1031 bool stateOk = true;
1032 if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1;
1033 dbInfo = getDbInfo(queueP);
1034 if (dbInfo->numOfBytes == -1) {
1035 stateOk = fillCache(queueP, &exception);
1036 }
1037 return (stateOk) ? dbInfo->numOfBytes : -1;
1038 }
1039
1040 static int64_t getMaxNumOfBytes(I_Queue *queueP)
1041 {
1042 DbInfo *dbInfo;
1043 ExceptionStruct exception;
1044 if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1;
1045 dbInfo = getDbInfo(queueP);
1046 return dbInfo->prop.maxNumOfBytes;
1047 }
1048
1049 /**
1050 * Shutdown without destroying any entry.
1051 * Clears all open DB resources.
1052 */
1053 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception)
1054 {
1055 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
1056 if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
1057 shutdownInternal(queuePP, exception);
1058 freeQueue(queuePP);
1059 }
1060
1061 /**
1062 * Shutdown used internally without calling freeQueue().
1063 */
1064 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception)
1065 {
1066 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
1067 if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
1068 {
1069 DbInfo *dbInfo = getDbInfo(queueP);
1070 queueP->isInitialized = false;
1071 if(dbInfo) {
1072 if (dbInfo->pVm_put) {
1073 sqlite3_finalize(dbInfo->pVm_put);
1074 dbInfo->pVm_put = 0;
1075 }
1076 if (dbInfo->pVm_peekWithSamePriority) {
1077 sqlite3_finalize(dbInfo->pVm_peekWithSamePriority);
1078 dbInfo->pVm_peekWithSamePriority = 0;
1079 }
1080 if (dbInfo->pVm_fillCache) {
1081 sqlite3_finalize(dbInfo->pVm_fillCache);
1082 dbInfo->pVm_fillCache = 0;
1083 }
1084 if (dbInfo->db) {
1085 sqlite3_close(dbInfo->db);
1086 dbInfo->db = 0;
1087 }
1088 LOG __FILE__, "shutdown() done");
1089 }
1090 }
1091 }
1092
1093 /**
1094 * Frees everything inside QueueEntryArr and the struct QueueEntryArr itself
1095 * @param queueEntryArr The struct to free, passing NULL is OK
1096 */
1097 Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr)
1098 {
1099 if (queueEntryArr == (QueueEntryArr *)0) return;
1100 freeQueueEntryArrInternal(queueEntryArr);
1101 free(queueEntryArr);
1102 }
1103
1104 /**
1105 * Frees everything inside QueueEntryArr but NOT the struct QueueEntryArr itself
1106 * @param queueEntryArr The struct internals to free, passing NULL is OK
1107 */
1108 Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr)
1109 {
1110 size_t i;
1111 if (queueEntryArr == (QueueEntryArr *)0) return;
1112 for (i=0; i<queueEntryArr->len; i++) {
1113 freeQueueEntryData(&queueEntryArr->queueEntryArr[i]);
1114 }
1115 free(queueEntryArr->queueEntryArr);
1116 queueEntryArr->len = 0;
1117 }
1118
1119 /**
1120 * Does not free the queueEntry itself
1121 */
1122 static void freeQueueEntryData(QueueEntry *queueEntry)
1123 {
1124 if (queueEntry == (QueueEntry *)0) return;
1125 if (queueEntry->embeddedBlob.data != 0) {
1126 free((char *)queueEntry->embeddedBlob.data);
1127 queueEntry->embeddedBlob.data = 0;
1128 }
1129 queueEntry->embeddedBlob.dataLen = 0;
1130 }
1131
1132 /**
1133 * Frees the internal blob and the queueEntry itself.
1134 * @param queueEntry Its memory is freed, it is not usable anymore after this call
1135 */
1136 Dll_Export void freeQueueEntry(QueueEntry *queueEntry)
1137 {
1138 if (queueEntry == (QueueEntry *)0) return;
1139 freeQueueEntryData(queueEntry);
1140 free(queueEntry);
1141 }
1142
1143 /**
1144 * NOTE: You need to free the returned pointer with xmlBlasterFree() (which calls free())!
1145 *
1146 * @param queueEntry The data to put to the queue
1147 * @param maxContentDumpLen for -1 get the complete content, else limit the
1148 * content to the given number of bytes
1149 * @return A ASCII XML formatted entry or NULL if out of memory
1150 */
1151 Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen)
1152 {
1153 if (queueEntry == (QueueEntry *)0) return 0;
1154 {
1155 char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen);
1156 const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen;
1157 const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen;
1158 char *xml = (char *)calloc(len, sizeof(char));
1159 if (xml == 0) {
1160 free(contentStr);
1161 return 0;
1162 }
1163 if (maxContentDumpLen == 0)
1164 *contentStr = 0;
1165 else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 &&
1166 (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5))
1167 strcpy(contentStr+maxContentDumpLen, " ...");
1168
1169 SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>"
1170 "\n <content size='%lu'><![CDATA[%s]]></content>"
1171 "\n <QueueEntry>",
1172 int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority,
1173 queueEntry->isPersistent?"true":"false",
1174 queueEntry->embeddedType,
1175 (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr);
1176 free(contentStr);
1177 return xml;
1178 }
1179 }
1180
1181 Dll_Export void freeEntryDump(char *entryDump)
1182 {
1183 if (entryDump) free(entryDump);
1184 }
1185
1186 /**
1187 * Checks the given arguments to be valid.
1188 * @param queueP The queue instance
1189 * @param methodName For logging
1190 * @param checkIsConnected If true does check the connection state as well
1191 * @param exception Transporting errors
1192 * @return false if the parameters are not usable,
1193 * in this case 'exception' is filled with detail informations
1194 */
1195 static bool checkArgs(I_Queue *queueP, const char *methodName,
1196 bool checkIsConnected, ExceptionStruct *exception)
1197 {
1198 if (queueP == 0) {
1199 if (exception == 0) {
1200 printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n",
1201 __FILE__, __LINE__, methodName);
1202 }
1203 else {
1204 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
1205 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1206 "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()",
1207 __FILE__, __LINE__, methodName);
1208 LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
1209 }
1210 return false;
1211 }
1212
1213 if (exception == 0) {
1214 LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName);
1215 return false;
1216 }
1217
1218 if (checkIsConnected) {
1219 if (queueP->privateObject==0 ||
1220 ((DbInfo *)(queueP->privateObject))->db==0 ||
1221 !queueP->isInitialized) {
1222 strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
1223 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1224 "[%.100s:%d] Not connected to database, %s() failed",
1225 __FILE__, __LINE__, methodName);
1226 LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
1227 return false;
1228 }
1229 }
1230
1231 initializeExceptionStruct(exception);
1232
1233 LOG __FILE__, "%s() entering ...", methodName);
1234
1235 return true;
1236 }
1237
1238 /*=================== TESTCODE =======================*/
1239 # ifdef QUEUE_MAIN
1240 #include <stdio.h>
1241 static void testRun(int argc, char **argv) {
1242 ExceptionStruct exception;
1243 QueueEntryArr *entries = 0;
1244 QueueProperties queueProperties;
1245 I_Queue *queueP = 0;
1246
1247 memset(&queueProperties, 0, sizeof(QueueProperties));
1248 strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
1249 strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
1250 strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
1251 queueProperties.maxNumOfEntries = 10000000L;
1252 queueProperties.maxNumOfBytes = 1000000000LL;
1253 queueProperties.logFp = xmlBlasterDefaultLogging;
1254 queueProperties.logLevel = XMLBLASTER_LOG_TRACE;
1255 queueProperties.userObject = 0;
1256
1257 queueP = createQueue(&queueProperties, &exception);
1258 /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */
1259 if (argc || argv) {} /* to avoid compiler warning */
1260
1261 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1262
1263 {
1264 int64_t idArr[] = { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll };
1265 int16_t prioArr[] = { 5 , 1 , 5 };
1266 char *data[] = { "Hello" , " World" , "!!!" };
1267 size_t i;
1268 for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) {
1269 QueueEntry queueEntry;
1270 memset(&queueEntry, 0, sizeof(QueueEntry));
1271 queueEntry.priority = prioArr[i];
1272 queueEntry.isPersistent = true;
1273 queueEntry.uniqueId = idArr[i];
1274 strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
1275 queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
1276 queueEntry.embeddedBlob.data = data[i];
1277 queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);
1278
1279 queueP->put(queueP, &queueEntry, &exception);
1280 if (*exception.errorCode != 0) {
1281 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1282 }
1283 }
1284 }
1285
1286 entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception);
1287 if (*exception.errorCode != 0) {
1288 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1289 }
1290 if (entries != 0) {
1291 size_t i;
1292 printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len);
1293 for (i=0; i<entries->len; i++) {
1294 QueueEntry *queueEntry = &entries->queueEntryArr[i];
1295 char *dump = queueEntryToXml(queueEntry, 200);
1296 printf("%s\n", dump);
1297 free(dump);
1298 }
1299 }
1300
1301 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1302 queueP->randomRemove(queueP, entries, &exception);
1303 if (*exception.errorCode != 0) {
1304 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1305 }
1306
1307 freeQueueEntryArr(entries);
1308 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1309
1310 queueP->clear(queueP, &exception);
1311 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1312
1313 queueP->shutdown(&queueP, &exception);
1314 }
1315
/**
 * Entry point of the test program (compiled with -DQUEUE_MAIN only),
 * executes testRun() once.
 */
int main(int argc, char **argv) {
   int run = 0;
   while (run < 1) {
      testRun(argc, argv);
      run++;
   }
   return 0;
}
1323 #endif /*QUEUE_MAIN*/
1324 /*=================== TESTCODE =======================*/
syntax highlighted by Code2HTML, v. 0.9.1