1 /*----------------------------------------------------------------------------
2 Name: SQLiteQueue.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: A persistent queue implementation based on the SQLite relational database
6 Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
7 and can easily be used outside of xmlBlaster.
8 Further you need sqlite.h and the sqlite library (dll,so,sl)
9 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
10 Date: 04/2004
11 Compile: Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
12 Needs pthread.h but not the pthread library (for exact times)
13
14 export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
15 gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLiteQueue SQLiteQueue.c ../helper.c ../Timeout.c ../Timestampc.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite -lpthread
16 (use optionally -ansi -pedantic -Wno-long-long
17 (Intel C: icc -wd981 ...)
18
19 Compile inside xmlBlaster:
20 build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
21 expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so
22
23
24 Compile Test main()
25 Replace /I\c\sqlite for your needs ( says where sqlite.h resides ):
26 and \pialibs\sqlite.lib as well ( sqlite.lib is created from sqlite.def via lib /DEF:sqlite.def)
27 cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /D_WINDOWS /I\c\sqlite /I..\.. SqliteQueue.c ..\helper.c /link \pialibs\sqlite.lib
28
29
30 Table layout XB_ENTRIES:
31 dataId bigint
32 queueName text
33 prio integer
34 flag text
35 durable char(1)
36 byteSize bigint
37 blob bytea
38 PRIMARY KEY (dataId, queueName)
39
40 Todo: Tuning:
41 - Add prio to PRIMARY KEY
42 - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes
43
44 @see: http://www.sqlite.org/
45 @see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
46 @see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
47 Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
48 -----------------------------------------------------------------------------*/
49 #if defined(__IPhoneOS__)
50 #include <stdio.h> /* to avoid empty file compiler warning */
51 #endif
52 #if !defined(__IPhoneOS__)
53 #include <stdio.h>
54 #include <string.h>
55
56 #include <malloc.h>
57 #if !defined(_WINDOWS)
58 # include <unistd.h> /* unlink() */
59 # include <errno.h> /* unlink() */
60 #endif
61 #include "util/queue/QueueInterface.h"
62 #include "sqlite.h"
63
64 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
65 static const QueueProperties *getProperties(I_Queue *queueP);
66 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
67 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
68 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
69 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
70 static int32_t getNumOfEntries(I_Queue *queueP);
71 static int32_t getMaxNumOfEntries(I_Queue *queueP);
72 static int64_t getNumOfBytes(I_Queue *queueP);
73 static int64_t getMaxNumOfBytes(I_Queue *queueP);
74 static bool persistentQueueEmpty(I_Queue *queueP);
75 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
76 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);
77 static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
78 static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
79 static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
80 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception);
81 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
82 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
83 static void freeQueueEntryData(QueueEntry *queueEntry);
84
85 /**
86 * Called for each SQL result row and does the specific result parsing depending on the query.
87 * @param userP Pointer on a data struct which contains the parsed data
88 * @return true->to continue, false->to break execution or on error exception->errorCode is not null
89 */
90 typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP,
91 const char **pazValue, const char **pazColName, ExceptionStruct *exception);
92 static int32_t getResultRows(I_Queue *queueP, const char *methodName,
93 sqlite_vm *pVm,
94 ParseDataFp parseDataFp, void *userP, bool finalize,
95 ExceptionStruct *exception);
96
97 /* Shortcut for:
98 if (queueP->log) queueP->log(queueP, XMLBLASTER_LOG_TRACE, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
99 is
100 LOG __FILE__, "Persistent queue is created");
101 */
102 #define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE,
103
104 #define LEN512 512 /* ISO C90 forbids variable-size array: const int LEN512=512; */
105 #define LEN256 256 /* ISO C90 forbids variable-size array: const int LEN256=256; */
106
107 #define DBNAME_MAX 128
108 #define ID_MAX 256
109 /**
110 * Holds Prepared statements for better performance.
111 * @see http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
112 */
113 typedef struct DbInfoStruct {
114 QueueProperties prop; /** Meta information */
115 size_t numOfEntries; /** Cache for current number of entries */
116 int64_t numOfBytes; /** Cache for current number of bytes */
117 sqlite *db; /** Database handle for SQLite */
118 sqlite_vm *pVm_put; /** SQLite virtual machine to hold a prepared query */
119 sqlite_vm *pVm_peekWithSamePriority;
120 sqlite_vm *pVm_fillCache;
121 } DbInfo;
122
123 /**
124 * Used temporary for peekWithSamePriority().
125 */
126 typedef struct {
127 QueueEntryArr **queueEntryArrPP;
128 int32_t currEntries;
129 int64_t currBytes;
130 int32_t maxNumOfEntries; /** The max wanted number of entries for this peek() */
131 int64_t maxNumOfBytes; /** The max wanted bytes during peek() */
132 } TmpHelper;
133
134 static char int64Str_[INT64_STRLEN_MAX];
135 static char * const int64Str = int64Str_; /* to make the pointer address const */
136
137 /** Column index into XB_ENTRIES table */
138 enum {
139 XB_ENTRIES_DATA_ID = 0,
140 XB_ENTRIES_QUEUE_NAME,
141 XB_ENTRIES_PRIO,
142 XB_ENTRIES_TYPE_NAME,
143 XB_ENTRIES_PERSISTENT,
144 XB_ENTRIES_SIZE_IN_BYTES,
145 XB_ENTRIES_BLOB
146 };
147
148
149 /**
150 * Create a new persistent queue instance.
151 * <br />
152 * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done
153 * usually by calling shutdown().
154 * @throws exception
155 */
156 Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception)
157 {
158 bool stateOk = true;
159 I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));
160 if (queueP == 0) return queueP;
161 queueP->isInitialized = false;
162 queueP->initialize = persistentQueueInitialize;
163 queueP->getProperties = getProperties;
164 queueP->put = persistentQueuePut;
165 queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;
166 queueP->randomRemove = persistentQueueRandomRemove;
167 queueP->clear = persistentQueueClear;
168 queueP->getNumOfEntries = getNumOfEntries;
169 queueP->getMaxNumOfEntries = getMaxNumOfEntries;
170 queueP->getNumOfBytes = getNumOfBytes;
171 queueP->getMaxNumOfBytes = getMaxNumOfBytes;
172 queueP->empty = persistentQueueEmpty;
173 queueP->shutdown = persistentQueueShutdown;
174 queueP->destroy = persistentQueueDestroy;
175 queueP->privateObject = calloc(1, sizeof(DbInfo));
176 {
177 DbInfo *dbInfo = (DbInfo *)queueP->privateObject;
178 dbInfo->numOfEntries = -1;
179 dbInfo->numOfBytes = -1;
180 }
181 stateOk = queueP->initialize(queueP, queueProperties, exception);
182 if (stateOk) {
183 LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");
184 }
185 else {
186 ExceptionStruct ex;
187 queueP->shutdown(&queueP, &ex);
188 if (*ex.errorCode != 0) {
189 embedException(exception, ex.errorCode, ex.message, exception);
190 }
191 queueP = 0;
192 }
193 return queueP;
194 }
195
196 /** Access the DB handle, queueP pointer is not checked */
197 static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {
198 return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject);
199 }
200
201 /**
202 * Access the queue configuration.
203 * @param queueP The this pointer
204 * @return Read only access, 0 on error
205 */
206 static const QueueProperties *getProperties(I_Queue *queueP)
207 {
208 ExceptionStruct exception;
209 if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0;
210 return &getDbInfo(queueP)->prop;
211 }
212
213 /**
214 */
215 static void freeQueue(I_Queue **queuePP)
216 {
217 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
218 if (queueP == 0) {
219 fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);
220 return;
221 }
222
223 LOG __FILE__, "freeQueue() called");
224
225 if (queueP->privateObject) {
226 free(queueP->privateObject);
227 queueP->privateObject = 0;
228 }
229
230 free(queueP);
231 *queuePP = 0;
232 }
233
234 /**
235 * Called internally by createQueue().
236 * @param queueP The this pointer
237 * @param queueProperties The configuration
238 * @param exception Can contain error information (out parameter)
239 * @return true on success
240 */
241 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception)
242 {
243 char *errMsg = 0;
244 bool retOk;
245 const int OPEN_RW = 0;
246 sqlite *db;
247 DbInfo *dbInfo;
248
249 if (checkArgs(queueP, "initialize", false, exception) == false ) return false;
250 if (queueProperties == 0) {
251 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
252 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
253 "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);
254 /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */
255 fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message);
256 return false;
257 }
258
259 queueP->log = queueProperties->logFp;
260 queueP->logLevel = queueProperties->logLevel;
261 queueP->userObject = queueProperties->userObject;
262
263 if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||
264 queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) {
265 char dbName[QUEUE_DBNAME_MAX];
266 char queueName[QUEUE_ID_MAX];
267 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
268 if (queueProperties->dbName == 0)
269 strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);
270 else
271 strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);
272 if (queueProperties->queueName == 0)
273 strncpy0(queueName, "NULL", QUEUE_ID_MAX);
274 else
275 strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);
276 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
277 "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"
278 " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,
279 dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);
280 LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
281 return false;
282 }
283
284 dbInfo = getDbInfo(queueP);
285 memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));
286
287 /* Never trust a queue property you haven't overflowed yourself :-) */
288 dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;
289 dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;
290 dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;
291
292 LOG __FILE__, "dbName = %s", dbInfo->prop.dbName);
293 LOG __FILE__, "queueName = %s", dbInfo->prop.queueName);
294 LOG __FILE__, "tablePrefix = %s", dbInfo->prop.tablePrefix);
295 LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries);
296 LOG __FILE__, "maxNumOfBytes = %ld",(long)dbInfo->prop.maxNumOfBytes);
297 /*LOG __FILE__, "logFp = %d", (int)dbInfo->prop.logFp);*/
298 LOG __FILE__, "logLevel = %d", (int)dbInfo->prop.logLevel);
299 /*LOG __FILE__, "userObject = %d", (void*)dbInfo->prop.userObject);*/
300
301 db = sqlite_open(dbInfo->prop.dbName, OPEN_RW, &errMsg);
302 dbInfo->db = db;
303
304 if (db==0) {
305 queueP->isInitialized = false;
306 if(queueP->log) {
307 if (errMsg) {
308 LOG __FILE__, "%s", errMsg);
309 }
310 else {
311 LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName);
312 }
313 }
314 else {
315 if (errMsg)
316 fprintf(stderr,"[%s] %s\n", __FILE__, errMsg);
317 else
318 fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName);
319 }
320 strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
321 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
322 "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg);
323 if (errMsg != 0) sqlite_freemem(errMsg);
324 return false;
325 }
326
327 queueP->isInitialized = true;
328
329 retOk = createTables(queueP, exception);
330
331 fillCache(queueP, exception);
332
333 LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");
334 return true;
335 }
336
337 /**
338 * Create the necessary DB table if not already existing.
339 * @param queueP
340 * @param exception Can contain error information (out parameter)
341 * @return true on success
342 */
343 static bool createTables(I_Queue *queueP, ExceptionStruct *exception)
344 {
345 char queryString[LEN512];
346 bool retOk;
347 const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;
348
349 SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
350 tablePrefix);
351 retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception);
352
353 SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",
354 tablePrefix, tablePrefix);
355 retOk = execSilent(queueP, queryString, "Creating PRIO index", exception);
356 return retOk;
357 }
358
359 /**
360 * Invoke SQL query.
361 * @param queueP Is not checked, must not be 0
362 * @param queryString The SQL to execute
363 * @param comment For logging or exception text
364 * @param exception Can contain error information (out parameter)
365 * @return true on success
366 */
367 static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception)
368 {
369 int rc = 0;
370 char *errMsg = 0;
371 bool retOk;
372 DbInfo *dbInfo = getDbInfo(queueP);
373
374 rc = sqlite_exec(dbInfo->db, queryString, NULL, NULL, &errMsg);
375 switch (rc) {
376 case SQLITE_OK:
377 LOG __FILE__, "SQL '%s' success", comment);
378 retOk = true;
379 break;
380 default:
381 if (errMsg && strstr(errMsg, "already exists")) {
382 LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
383 retOk = true;
384 }
385 else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) {
386 LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
387 retOk = true;
388 }
389 else {
390 LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
391 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
392 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
393 "[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
394 retOk = false;
395 }
396 break;
397 }
398 if (errMsg != 0) sqlite_freemem(errMsg);
399 return retOk;
400 }
401
402 /*
403 * This is the callback routine that the SQLite library
404 * invokes for each row of a query result.
405 static int callback(void *pArg, int nArg, char **azArg, char **azCol){
406 int i;
407 struct callback_data *p = (struct callback_data*)pArg;
408 int w = 5;
409 if (p==0) {} // Suppress compiler warning
410 if( azArg==0 ) return 0;
411 for(i=0; i<nArg; i++){
412 int len = strlen(azCol[i]);
413 if( len>w ) w = len;
414 }
415 printf("\n");
416 for(i=0; i<nArg; i++){
417 printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL");
418 }
419 return 0;
420 }
421 */
422
423 /**
424 * @param queueP The queue instance
425 * @param queueEntry The entry
426 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
427 */
428 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception)
429 {
430 int rc = 0;
431 bool stateOk = true;
432 DbInfo *dbInfo;
433 char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */
434
435 if (checkArgs(queueP, "put", true, exception) == false ) return;
436 if (queueEntry == 0) {
437 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
438 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
439 "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);
440 return;
441 }
442 if (queueEntry->uniqueId == 0) {
443 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
444 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
445 "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);
446 return;
447 }
448 if (*queueEntry->embeddedType == 0) {
449 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
450 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
451 "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);
452 return;
453 }
454 strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
455
456 if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {
457 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
458 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
459 "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);
460 return;
461 }
462
463 dbInfo = getDbInfo(queueP);
464
465 if ((int64_t)dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) {
466 strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);
467 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
468 "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);
469 return;
470 }
471 if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {
472 strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);
473 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
474 "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));
475 return;
476 }
477
478
479 if (dbInfo->pVm_put == 0) { /* Compile prepared query only once */
480 char queryString[LEN256]; /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/
481 SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix);
482 stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);
483 }
484
485 if (stateOk) { /* set prepared statement tokens */
486 char intStr[INT64_STRLEN_MAX];
487 int index = 0;
488 const int len = -1; /* Calculated by sqlite_bind */
489 rc = SQLITE_OK;
490
491 int64ToStr(intStr, queueEntry->uniqueId);
492 /*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/
493 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
494 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false);
495 SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority);
496 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
497 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false);
498 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false);
499 SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen);
500 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
501 if (rc == SQLITE_OK) {
502 /* As SQLite does only store strings we encode our blob to a string */
503 size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254;
504 unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char));
505 int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data,
506 (int)queueEntry->embeddedBlob.dataLen, out);
507 rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true);
508 free(out);
509 }
510
511 if (rc != SQLITE_OK) {
512 LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));
513 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
514 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
515 "[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));
516 stateOk = false;
517 }
518 }
519
520 if (stateOk) { /* start the query, process results */
521 int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception);
522 stateOk = countRows >= 0;
523 }
524
525 if (stateOk) {
526 dbInfo->numOfEntries += 1;
527 dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);
528 }
529
530 LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");
531 }
532
533
534 /**
535 * Compile a prepared query.
536 * No parameters are checked, they must be valid
537 * @param queueP The queue instance to use
538 * @param methodName A nice string for logging
539 * @param ppVm The virtual machine will be initialized if still 0
540 * @param queryString
541 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
542 * @return false on error and exception->errorCode is not null
543 */
544 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,
545 sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception)
546 {
547 int iRetry, numRetry=100;
548 char *errMsg = 0;
549 int rc = 0;
550 const char *pzTail = 0; /* OUT: uncompiled tail of zSql */
551 bool stateOk = true;
552 DbInfo *dbInfo = getDbInfo(queueP);
553
554 if (*ppVm == 0) { /* Compile prepared query */
555 for (iRetry = 0; iRetry < numRetry; iRetry++) {
556 rc = sqlite_compile(dbInfo->db, queryString, &pzTail, ppVm, &errMsg);
557 switch (rc) {
558 case SQLITE_BUSY:
559 if (iRetry == (numRetry-1)) {
560 strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);
561 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
562 "[%.100s:%d] SQL error #%d in %s(): %s %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
563 }
564 LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, (errMsg==0)?"":errMsg);
565 if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
566 sleepMillis(10);
567 break;
568 case SQLITE_OK:
569 iRetry = numRetry; /* We're done */
570 LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);
571 if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
572 break;
573 default:
574 LOG __FILE__, "SQL error #%d %s in %s(): %s: %s", rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
575 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
576 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
577 "[%.100s:%d] SQL error #%d %s in %s(): %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
578 iRetry = numRetry; /* We're done */
579 if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
580 stateOk = false;
581 break;
582 }
583 }
584 }
585 if (*ppVm == 0) stateOk = false;
586 return stateOk;
587 }
588
589 /**
590 * For each SQL result row parse it into a QueueEntry.
591 * No parameters are checked, they must be valid
592 * Implements a ParseDataFp (function pointer)
593 * @param queueP The 'this' pointer
594 * @param currIndex
595 * @param userP
596 * @param pazValue
597 * @param pazColName
598 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
599 * @return false on error and exception->errorCode is not null
600 */
601 static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP,
602 const char **pazValue, const char **pazColName, ExceptionStruct *exception)
603 {
604 bool doContinue = true;
605 int numAssigned;
606 bool stateOk = true;
607 int decodeSize = 0;
608 QueueEntry *queueEntry = 0;
609 QueueEntryArr *queueEntryArr;
610 TmpHelper *helper = (TmpHelper*)userP;
611 QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;
612
613 if (currIndex == 0) {
614 helper->currEntries = 0;
615 helper->currBytes = 0;
616 }
617
618 if (*queueEntryArrPP == 0) {
619 *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));;
620 if (helper->maxNumOfEntries == 0) {
621 doContinue = false;
622 return doContinue;
623 }
624 }
625 queueEntryArr = *queueEntryArrPP;
626
627 if (queueEntryArr->len == 0) {
628 queueEntryArr->len = 10;
629 queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));
630 }
631 else if (currIndex >= queueEntryArr->len) {
632 queueEntryArr->len += 10;
633 queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));
634 }
635 queueEntry = &queueEntryArr->queueEntryArr[currIndex];
636 memset(queueEntry, 0, sizeof(QueueEntry));
637
638 stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]);
639 if (!stateOk) {
640 LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]);
641 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
642 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
643 "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]);
644 doContinue = false;
645 return doContinue;
646 }
647
648 LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);
649 /* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */
650 numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority);
651 if (numAssigned != 1) {
652 LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]);
653 queueEntry->priority = 4;
654 }
655 strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
656 queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false;
657 {
658 int64_t ival = 0;
659 stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]);
660 queueEntry->embeddedBlob.dataLen = (size_t)ival;
661 }
662
663 /* TODO!!! in Java the length is the size in RAM and not strlen(data) */
664 /* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */
665 queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */
666 decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data);
667 if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) {
668 *(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0;
669 LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d"
670 " but expected decoded len=%d: '%s'",
671 int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize,
672 queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data);
673 }
674
675 helper->currEntries += 1;
676 helper->currBytes += queueEntry->embeddedBlob.dataLen;
677
678 /* Limit the number of entries */
679 if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||
680 (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {
681 /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */
682 doContinue = false;
683 }
684
685 return doContinue;
686 }
687
688 /**
689 * Execute the query and get the query result.
690 * No parameters are checked, they must be valid
691 * @param queueP The this pointer
692 * @param methodName The method called
693 * @param pVm sqlite virtual machine
694 * @param parseDataFp The function which is called for each SQL result row
695 * or 0 if no function shall be called
696 * @param userP The pointer which is passed to parseDataFp
697 * @param finalize true to call sqlite_finalize which deletes the virtual machine,
698 * false to call sqlite_reset to reuse the prepared query
699 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
700 * @return < 0 on error and exception->errorCode is not null
701 * otherwise the number of successfully parsed rows is returned
702 * @todo For INSERT and DELETE return the number of touched entries !!!
703 */
704 static int32_t getResultRows(I_Queue *queueP, const char *methodName,
705 sqlite_vm *pVm,
706 ParseDataFp parseDataFp, void *userP,
707 bool finalize,
708 ExceptionStruct *exception)
709 {
710 char *errMsg = 0;
711 int32_t currIndex = 0;
712 int numCol = 0;
713 const char **pazValue = 0;
714 const char **pazColName = 0;
715 bool done = false;
716 bool stateOk = true;
717 int rc;
718 while (!done) {
719 rc = sqlite_step(pVm, &numCol, &pazValue, &pazColName);
720 switch( rc ){
721 case SQLITE_DONE:
722 done = true;
723 break;
724 case SQLITE_BUSY:
725 LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);
726 sleepMillis(10);
727 break;
728 case SQLITE_ROW:
729 {
730 bool doContinue = true;
731 if (parseDataFp) {
732 /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */
733 doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception);
734 stateOk = *exception->errorCode == 0;
735 }
736 else {
737 /*
738 printf("RESULT[%d]\n", iRow);
739 for (iCol = 0; iCol < numCol; iCol++) {
740 printf("%10.10s = %s\n", pazColName[iCol], pazValue[iCol]);
741 }
742 */
743 }
744 currIndex++;
745 if (!stateOk || !doContinue) done = true;
746 }
747 break;
748 case SQLITE_ERROR: /* If exists already */
749 LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);
750 done = true;
751 stateOk = false;
752 break;
753 case SQLITE_MISUSE:
754 default:
755 LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite_error_string(rc));
756 done = true;
757 stateOk = false;
758 break;
759 }
760 }
761 LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);
762
763 if (finalize) {
764 sqlite_finalize(pVm, &errMsg);
765 if (rc != SQLITE_OK && rc != SQLITE_DONE) {
766 LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled. %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);
767 }
768 if (errMsg != 0) sqlite_freemem(errMsg);
769 }
770 else { /* Reset prepared statement */
771 rc = sqlite_reset(pVm, &errMsg);
772 if (rc == SQLITE_SCHEMA) {
773 LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);
774 }
775 if (errMsg != 0) sqlite_freemem(errMsg);
776 }
777
778 return stateOk ? currIndex : (-1)*rc;
779 }
780
781 /**
782 * Access queue entries without removing them.
783 */
784 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception)
785 {
786 int rc = 0;
787 bool stateOk = true;
788 DbInfo *dbInfo;
789 QueueEntryArr *queueEntryArr = 0;
790
791 if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0;
792
793 LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes));
794
795 dbInfo = getDbInfo(queueP);
796
797 if (dbInfo->pVm_peekWithSamePriority == 0) { /* Compile prepared query */
798 char queryString[LEN512];
799 /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/
800 SNPRINTF(queryString, LEN512,
801 "SELECT * FROM %.20sENTRIES where queueName=?"
802 " and prio=(select max(prio) from %.20sENTRIES where queueName=?)"
803 " ORDER BY dataId ASC",
804 dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix);
805 stateOk = compilePreparedQuery(queueP, "peekWithSamePriority",
806 &dbInfo->pVm_peekWithSamePriority , queryString, exception);
807 }
808
809 if (stateOk) { /* set prepared statement tokens */
810 int index = 0;
811 int len = -1; /* Calculated by sqlite_bind */
812 rc = SQLITE_OK;
813
814 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);
815 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);
816
817 switch (rc) {
818 case SQLITE_OK:
819 LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc);
820 break;
821 default:
822 LOG __FILE__, "peekWithSamePriority() SQL error: %d %s", rc, sqlite_error_string(rc));
823 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
824 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
825 "[%.100s:%d] peekWithSamePriority() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite_error_string(rc));
826 stateOk = false;
827 break;
828 }
829 }
830
831 if (stateOk) { /* start the query */
832 TmpHelper helper;
833 int32_t currIndex = 0;
834 helper.queueEntryArrPP = &queueEntryArr;
835 helper.maxNumOfEntries = maxNumOfEntries;
836 helper.maxNumOfBytes = maxNumOfBytes;
837 currIndex = getResultRows(queueP, "peekWithSamePriority",
838 dbInfo->pVm_peekWithSamePriority, parseQueueEntryArr,
839 &helper, false, exception);
840 stateOk = currIndex >= 0;
841 if (!stateOk) {
842 if (queueEntryArr) {
843 free(queueEntryArr->queueEntryArr);
844 queueEntryArr->len = 0;
845 }
846 }
847 else {
848 if (!queueEntryArr)
849 queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
850 else if ((size_t)currIndex < queueEntryArr->len) {
851 queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry));
852 queueEntryArr->len = currIndex;
853 }
854 }
855 }
856
857 LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed");
858 return queueEntryArr;
859 }
860
861 /**
862 * Removes the given entries from persistence.
863 * @return The number of removed entries
864 */
865 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception)
866 {
867 bool stateOk = true;
868 int64_t numOfBytes = 0;
869 int32_t countDeleted = 0;
870 sqlite_vm *pVm = 0;
871 DbInfo *dbInfo;
872 if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 ||
873 queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0)
874 return 0;
875
876 LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len);
877
878 dbInfo = getDbInfo(queueP);
879
880 {
881 size_t i;
882 const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6);
883 char *queryString = (char *)calloc(qLen, sizeof(char));
884 /* DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */
885 SNPRINTF(queryString, qLen,
886 "DELETE FROM %.20sENTRIES WHERE queueName='%s'"
887 " AND dataId in ( ",
888 dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
889
890 for (i=0; i<queueEntryArr->len; i++) {
891 strcat(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId));
892 if (i<(queueEntryArr->len-1)) strcat(queryString, ",");
893 numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen);
894 }
895 strcat(queryString, " )");
896 stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception);
897 free(queryString);
898 }
899
900
901 if (stateOk) { /* start the query */
902 int32_t currIndex = getResultRows(queueP, "randomRemove",
903 pVm, 0, 0, true, exception);
904 stateOk = currIndex >= 0;
905 }
906
907 if (stateOk) {
908 countDeleted = (int32_t)sqlite_last_statement_changes(dbInfo->db);
909 if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) {
910 fillCache(queueP, exception); /* calculate numOfBytes again */
911 }
912 else {
913 dbInfo->numOfEntries -= queueEntryArr->len;
914 dbInfo->numOfBytes -= numOfBytes;
915 }
916 }
917
918 return countDeleted;
919 }
920
921 /**
922 * Destroy all entries in queue and releases all resources in memory and on HD.
923 */
924 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception)
925 {
926 bool stateOk = true;
927 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
928 if (checkArgs(queueP, "destroy", false, exception) == false ) return false;
929
930 shutdownInternal(queuePP, exception);
931
932 {
933 DbInfo *dbInfo = getDbInfo(queueP);
934 const char *dbName = dbInfo->prop.dbName;
935 stateOk = unlink(dbName) == 0; /* Delete old db file */
936 if (!stateOk) {
937 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
938 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
939 "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno);
940 }
941 }
942
943 freeQueue(queuePP);
944
945 return stateOk;
946 }
947
948 /**
949 * Destroy all entries in queue.
950 */
951 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception)
952 {
953 bool stateOk = true;
954 char queryString[LEN256];
955 sqlite_vm *pVm = 0;
956 DbInfo *dbInfo;
957 if (checkArgs(queueP, "clear", true, exception) == false) return false;
958 dbInfo = getDbInfo(queueP);
959
960 SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix);
961 stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception);
962
963 if (stateOk) {
964 int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, 0, true, exception);
965 stateOk = currIndex >= 0;
966 }
967
968 if (stateOk) {
969 dbInfo->numOfEntries = 0;
970 dbInfo->numOfBytes = 0;
971 }
972
973 LOG __FILE__, "clear() done");
974 return stateOk;
975 }
976
977 /**
978 * Parse response of "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
979 */
980 static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, void *userP,
981 const char **pazValue, const char **pazColName, ExceptionStruct *exception)
982 {
983 int64_t ival = 0;
984 bool stateOk;
985 DbInfo *dbInfo = getDbInfo(queueP);
986
987 stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_DATA_ID]);
988 if (!stateOk) {
989 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
990 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
991 "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfEntries, ignoring entry.", __FILE__, __LINE__, pazColName[XB_ENTRIES_DATA_ID], pazValue[XB_ENTRIES_DATA_ID]);
992 return false;
993 }
994 dbInfo->numOfEntries = (int32_t)ival;
995
996 stateOk = strToInt64(&dbInfo->numOfBytes, pazValue[1]);
997 if (!stateOk) {
998 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
999 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1000 "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfBytes, ignoring entry.", __FILE__, __LINE__, pazColName[1], pazValue[1]);
1001 if (currIndex) {} /* Just to avoid compiler warning about unused variable */
1002 if (userP) {};
1003 return false;
1004 }
1005
1006 return true;
1007 }
1008
1009 /**
1010 * Reload cached information from database.
1011 * @param queueP The this pointer
1012 * @param exception Returns error
1013 * @return false on error
1014 */
1015 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception)
1016 {
1017 bool stateOk = true;
1018 DbInfo *dbInfo = 0;
1019
1020 char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */
1021
1022 if (checkArgs(queueP, "fillCache", true, exception) == false ) return true;
1023 dbInfo = getDbInfo(queueP);
1024
1025 SNPRINTF(queryString, LEN512,
1026 "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
1027 dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
1028 stateOk = compilePreparedQuery(queueP, "fillCache",
1029 &dbInfo->pVm_fillCache, queryString, exception);
1030
1031 if (stateOk) { /* start the query, calls parseCacheInfo() */
1032 int32_t currIndex = getResultRows(queueP, "fillCache",
1033 dbInfo->pVm_fillCache, parseCacheInfo,
1034 0, false, exception);
1035 stateOk = currIndex > 0;
1036 }
1037
1038 LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes));
1039 return stateOk;
1040 }
1041
1042 static bool persistentQueueEmpty(I_Queue *queueP)
1043 {
1044 return getNumOfEntries(queueP) <= 0;
1045 }
1046
1047 static int32_t getNumOfEntries(I_Queue *queueP)
1048 {
1049 DbInfo *dbInfo;
1050 bool stateOk = true;
1051 ExceptionStruct exception;
1052 if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1;
1053 dbInfo = getDbInfo(queueP);
1054 if (dbInfo->numOfEntries == -1) {
1055 stateOk = fillCache(queueP, &exception);
1056 }
1057 return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;
1058 }
1059
1060 static int32_t getMaxNumOfEntries(I_Queue *queueP)
1061 {
1062 DbInfo *dbInfo;
1063 ExceptionStruct exception;
1064 if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1;
1065 dbInfo = getDbInfo(queueP);
1066 return dbInfo->prop.maxNumOfEntries;
1067 }
1068
1069 static int64_t getNumOfBytes(I_Queue *queueP)
1070 {
1071 DbInfo *dbInfo;
1072 ExceptionStruct exception;
1073 bool stateOk = true;
1074 if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1;
1075 dbInfo = getDbInfo(queueP);
1076 if (dbInfo->numOfBytes == -1) {
1077 stateOk = fillCache(queueP, &exception);
1078 }
1079 return (stateOk) ? dbInfo->numOfBytes : -1;
1080 }
1081
1082 static int64_t getMaxNumOfBytes(I_Queue *queueP)
1083 {
1084 DbInfo *dbInfo;
1085 ExceptionStruct exception;
1086 if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1;
1087 dbInfo = getDbInfo(queueP);
1088 return dbInfo->prop.maxNumOfBytes;
1089 }
1090
1091 /**
1092 * Shutdown without destroying any entry.
1093 * Clears all open DB resources.
1094 */
1095 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception)
1096 {
1097 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
1098 if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
1099 shutdownInternal(queuePP, exception);
1100 freeQueue(queuePP);
1101 }
1102
1103 /**
1104 * Shutdown used internally without calling freeQueue().
1105 */
1106 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception)
1107 {
1108 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
1109 if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
1110 {
1111 DbInfo *dbInfo = getDbInfo(queueP);
1112 queueP->isInitialized = false;
1113 if(dbInfo) {
1114 if (dbInfo->pVm_put) {
1115 char *errMsg = 0;
1116 /*int rc =*/ sqlite_finalize(dbInfo->pVm_put, &errMsg);
1117 if (errMsg != 0) sqlite_freemem(errMsg);
1118 dbInfo->pVm_put = 0;
1119 }
1120 if (dbInfo->pVm_peekWithSamePriority) {
1121 char *errMsg = 0;
1122 sqlite_finalize(dbInfo->pVm_peekWithSamePriority, &errMsg);
1123 if (errMsg != 0) sqlite_freemem(errMsg);
1124 dbInfo->pVm_peekWithSamePriority = 0;
1125 }
1126 if (dbInfo->pVm_fillCache) {
1127 char *errMsg = 0;
1128 sqlite_finalize(dbInfo->pVm_fillCache, &errMsg);
1129 if (errMsg != 0) sqlite_freemem(errMsg);
1130 dbInfo->pVm_fillCache = 0;
1131 }
1132 if (dbInfo->db) {
1133 sqlite_close(dbInfo->db);
1134 dbInfo->db = 0;
1135 }
1136 LOG __FILE__, "shutdown() done");
1137 }
1138 }
1139 }
1140
1141 /**
1142 * Frees everything inside QueueEntryArr and the struct QueueEntryArr itself
1143 * @param queueEntryArr The struct to free, passing NULL is OK
1144 */
1145 Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr)
1146 {
1147 if (queueEntryArr == (QueueEntryArr *)0) return;
1148 freeQueueEntryArrInternal(queueEntryArr);
1149 free(queueEntryArr);
1150 }
1151
1152 /**
1153 * Frees everything inside QueueEntryArr but NOT the struct QueueEntryArr itself
1154 * @param queueEntryArr The struct internals to free, passing NULL is OK
1155 */
1156 Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr)
1157 {
1158 size_t i;
1159 if (queueEntryArr == (QueueEntryArr *)0) return;
1160 for (i=0; i<queueEntryArr->len; i++) {
1161 freeQueueEntryData(&queueEntryArr->queueEntryArr[i]);
1162 }
1163 free(queueEntryArr->queueEntryArr);
1164 queueEntryArr->len = 0;
1165 }
1166
1167 /**
1168 * Does not free the queueEntry itself
1169 */
1170 static void freeQueueEntryData(QueueEntry *queueEntry)
1171 {
1172 if (queueEntry == (QueueEntry *)0) return;
1173 if (queueEntry->embeddedBlob.data != 0) {
1174 free((char *)queueEntry->embeddedBlob.data);
1175 queueEntry->embeddedBlob.data = 0;
1176 }
1177 queueEntry->embeddedBlob.dataLen = 0;
1178 }
1179
1180 /**
1181 * Frees the internal blob and the queueEntry itself.
1182 * @param queueEntry Its memory is freed, it is not usable anymore after this call
1183 */
1184 Dll_Export void freeQueueEntry(QueueEntry *queueEntry)
1185 {
1186 if (queueEntry == (QueueEntry *)0) return;
1187 freeQueueEntryData(queueEntry);
1188 free(queueEntry);
1189 }
1190
1191 /**
1192 * NOTE: You need to free the returned pointer with xmlBlasterFree() (which calls free())!
1193 *
1194 * @param queueEntry The data to put to the queue
1195 * @param maxContentDumpLen for -1 get the complete content, else limit the
1196 * content to the given number of bytes
1197 * @return A ASCII XML formatted entry or NULL if out of memory
1198 */
1199 Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen)
1200 {
1201 if (queueEntry == (QueueEntry *)0) return 0;
1202 {
1203 char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen);
1204 const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen;
1205 const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen;
1206 char *xml = (char *)calloc(len, sizeof(char));
1207 if (xml == 0) {
1208 free(contentStr);
1209 return 0;
1210 }
1211 if (maxContentDumpLen == 0)
1212 *contentStr = 0;
1213 else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 &&
1214 (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5))
1215 strcpy(contentStr+maxContentDumpLen, " ...");
1216
1217 SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>"
1218 "\n <content size='%lu'><![CDATA[%s]]></content>"
1219 "\n <QueueEntry>",
1220 int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority,
1221 queueEntry->isPersistent?"true":"false",
1222 queueEntry->embeddedType,
1223 (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr);
1224 free(contentStr);
1225 return xml;
1226 }
1227 }
1228
1229 Dll_Export void freeEntryDump(char *entryDump)
1230 {
1231 if (entryDump) free(entryDump);
1232 }
1233
1234 /**
1235 * Checks the given arguments to be valid.
1236 * @param queueP The queue instance
1237 * @param methodName For logging
1238 * @param checkIsConnected If true does check the connection state as well
1239 * @param exception Transporting errors
1240 * @return false if the parameters are not usable,
1241 * in this case 'exception' is filled with detail informations
1242 */
1243 static bool checkArgs(I_Queue *queueP, const char *methodName,
1244 bool checkIsConnected, ExceptionStruct *exception)
1245 {
1246 if (queueP == 0) {
1247 if (exception == 0) {
1248 printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n",
1249 __FILE__, __LINE__, methodName);
1250 }
1251 else {
1252 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
1253 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1254 "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()",
1255 __FILE__, __LINE__, methodName);
1256 LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
1257 }
1258 return false;
1259 }
1260
1261 if (exception == 0) {
1262 LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName);
1263 return false;
1264 }
1265
1266 if (checkIsConnected) {
1267 if (queueP->privateObject==0 ||
1268 ((DbInfo *)(queueP->privateObject))->db==0 ||
1269 !queueP->isInitialized) {
1270 strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
1271 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1272 "[%.100s:%d] Not connected to database, %s() failed",
1273 __FILE__, __LINE__, methodName);
1274 LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
1275 return false;
1276 }
1277 }
1278
1279 initializeExceptionStruct(exception);
1280
1281 LOG __FILE__, "%s() entering ...", methodName);
1282
1283 return true;
1284 }
1285
1286 /*=================== TESTCODE =======================*/
1287 # ifdef QUEUE_MAIN
1288 /*
1289 NOTE:
1290 This code may be totally outdated, for current examples please use:
1291 xmlBlaster/testsuite/src/c/TestQueue.c
1292 */
1293 #include <stdio.h>
1294 static void testRun(int argc, char **argv) {
1295 ExceptionStruct exception;
1296 QueueEntryArr *entries = 0;
1297 QueueProperties queueProperties;
1298 I_Queue *queueP = 0;
1299
1300 memset(&queueProperties, 0, sizeof(QueueProperties));
1301 strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
1302 strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
1303 strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
1304 queueProperties.maxNumOfEntries = 10000000L;
1305 queueProperties.maxNumOfBytes = 1000000000LL;
1306 queueProperties.logFp = xmlBlasterDefaultLogging;
1307 queueProperties.logLevel = XMLBLASTER_LOG_TRACE;
1308 queueProperties.userObject = 0;
1309
1310 queueP = createQueue(&queueProperties, &exception);
1311 /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */
1312 if (argc || argv) {} /* to avoid compiler warning */
1313
1314 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1315
1316 {
1317 int64_t idArr[] = { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll };
1318 int16_t prioArr[] = { 5 , 1 , 5 };
1319 char *data[] = { "Hello" , " World" , "!!!" };
1320 size_t i;
1321 for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) {
1322 QueueEntry queueEntry;
1323 memset(&queueEntry, 0, sizeof(QueueEntry));
1324 queueEntry.priority = prioArr[i];
1325 queueEntry.isPersistent = true;
1326 queueEntry.uniqueId = idArr[i];
1327 strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
1328 queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
1329 queueEntry.embeddedBlob.data = data[i];
1330 queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);
1331
1332 queueP->put(queueP, &queueEntry, &exception);
1333 if (*exception.errorCode != 0) {
1334 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1335 }
1336 }
1337 }
1338
1339 entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception);
1340 if (*exception.errorCode != 0) {
1341 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1342 }
1343 if (entries != 0) {
1344 size_t i;
1345 printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len);
1346 for (i=0; i<entries->len; i++) {
1347 QueueEntry *queueEntry = &entries->queueEntryArr[i];
1348 char *dump = queueEntryToXml(queueEntry, 200);
1349 printf("%s\n", dump);
1350 free(dump);
1351 }
1352 }
1353
1354 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1355 queueP->randomRemove(queueP, entries, &exception);
1356 if (*exception.errorCode != 0) {
1357 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1358 }
1359
1360 freeQueueEntryArr(entries);
1361 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1362
1363 queueP->clear(queueP, &exception);
1364 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1365
1366 queueP->shutdown(&queueP, &exception);
1367 }
1368
1369 int main(int argc, char **argv) {
1370 int i;
1371 for (i=0; i<1; i++) {
1372 testRun(argc, argv);
1373 }
1374 return 0;
1375 }
1376 #endif /*QUEUE_MAIN*/
1377 #endif /* (__IPhoneOS__) */
1378 /*=================== TESTCODE =======================*/
syntax highlighted by Code2HTML, v. 0.9.1