00001 /*---------------------------------------------------------------------------- 00002 Name: SQLiteQueue.c 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: A persistent queue implementation based on the SQLite relational database 00006 Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h) 00007 and can easily be used outside of xmlBlaster. 00008 Further you need sqlite.h and the sqlite library (dll,so,sl) 00009 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 00010 Date: 04/2004 00011 Compile: Compiles at least on Windows, Linux, Solaris. Further porting should be simple. 00012 Needs pthread.h but not the pthread library (for exact times) 00013 00014 export LD_LIBRARY_PATH=/opt/sqlite-bin/lib 00015 gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLiteQueue SQLiteQueue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite 00016 (use optionally -ansi -pedantic -Wno-long-long 00017 (Intel C: icc -wd981 ...) 00018 00019 Compile inside xmlBlaster: 00020 build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c 00021 expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so 00022 00023 00024 Compile Test main() 00025 Replace /I\c\sqlite for your needs ( says where sqlite.h resides ): 00026 and \pialibs\sqlite.lib as well ( sqlite.lib is created from sqlite.def via lib /DEF:sqlite.def) 00027 cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /D_WINDOWS /I\c\sqlite /I..\.. SqliteQueue.c ..\helper.c /link \pialibs\sqlite.lib 00028 00029 00030 Table layout XB_ENTRIES: 00031 dataId bigint 00032 queueName text 00033 prio integer 00034 flag text 00035 durable char(1) 00036 byteSize bigint 00037 blob bytea 00038 PRIMARY KEY (dataId, queueName) 00039 00040 Todo: Tuning: 00041 - Add prio to PRIMARY KEY 00042 - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes 00043 00044 @see: http://www.sqlite.org/ 00045 @see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html 00046 @see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html 00047 Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c 00048 -----------------------------------------------------------------------------*/ 00049 #include <stdio.h> 00050 #include <string.h> 00051 #include <malloc.h> 00052 #if !defined(_WINDOWS) 00053 # include <unistd.h> /* unlink() */ 00054 # include <errno.h> /* unlink() */ 00055 #endif 00056 #include "util/queue/QueueInterface.h" 00057 #include "sqlite.h" 00058 00059 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception); 00060 static const QueueProperties *getProperties(I_Queue *queueP); 00061 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception); 00062 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception); 00063 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception); 00064 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception); 00065 static int32_t getNumOfEntries(I_Queue *queueP); 00066 static int32_t getMaxNumOfEntries(I_Queue *queueP); 00067 static int64_t getNumOfBytes(I_Queue *queueP); 00068 static int64_t getMaxNumOfBytes(I_Queue *queueP); 00069 static bool persistentQueueEmpty(I_Queue *queueP); 00070 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception); 00071 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception); 00072 static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception); 00073 static bool createTables(I_Queue *queueP, ExceptionStruct *exception); 00074 static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception); 00075 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception); 00076 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception); 00077 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception); 00078 static void freeQueueEntryData(QueueEntry *queueEntry); 00079 00085 typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP, 00086 const char **pazValue, const char **pazColName, ExceptionStruct *exception); 00087 static int32_t getResultRows(I_Queue *queueP, const char *methodName, 00088 sqlite_vm *pVm, 00089 ParseDataFp parseDataFp, void *userP, bool finalize, 00090 ExceptionStruct *exception); 00091 00092 /* Shortcut for: 00093 if (queueP->log) queueP->log(queueP, XMLBLASTER_LOG_TRACE, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created"); 00094 is 00095 LOG __FILE__, "Persistent queue is created"); 00096 */ 00097 #define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, 00098 00099 #define LEN512 512 /* ISO C90 forbids variable-size array: const int LEN512=512; */ 00100 #define LEN256 256 /* ISO C90 forbids variable-size array: const int LEN256=256; */ 00101 00102 #define DBNAME_MAX 128 00103 #define ID_MAX 256 00104 00108 typedef struct DbInfoStruct { 00109 QueueProperties prop; 00110 size_t numOfEntries; 00111 int64_t numOfBytes; 00112 sqlite *db; 00113 sqlite_vm *pVm_put; 00114 sqlite_vm *pVm_peekWithSamePriority; 00115 sqlite_vm *pVm_fillCache; 00116 } DbInfo; 00117 00121 typedef struct { 00122 QueueEntryArr **queueEntryArrPP; 00123 int32_t currEntries; 00124 int64_t currBytes; 00125 int32_t maxNumOfEntries; 00126 int64_t maxNumOfBytes; 00127 } TmpHelper; 00128 00129 static char int64Str_[INT64_STRLEN_MAX]; 00130 static char * const int64Str = int64Str_; /* to make the pointer address const */ 00131 00133 enum { 00134 XB_ENTRIES_DATA_ID = 0, 00135 XB_ENTRIES_QUEUE_NAME, 00136 XB_ENTRIES_PRIO, 00137 XB_ENTRIES_TYPE_NAME, 00138 XB_ENTRIES_PERSISTENT, 00139 XB_ENTRIES_SIZE_IN_BYTES, 00140 XB_ENTRIES_BLOB 00141 }; 00142 00143 00151 Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception) 00152 { 00153 bool stateOk = true; 00154 I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue)); 00155 if (queueP == 0) return queueP; 00156 queueP->isInitialized = false; 00157 queueP->initialize = persistentQueueInitialize; 00158 queueP->getProperties = getProperties; 00159 queueP->put = persistentQueuePut; 00160 queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority; 00161 queueP->randomRemove = persistentQueueRandomRemove; 00162 queueP->clear = persistentQueueClear; 00163 queueP->getNumOfEntries = getNumOfEntries; 00164 queueP->getMaxNumOfEntries = getMaxNumOfEntries; 00165 queueP->getNumOfBytes = getNumOfBytes; 00166 queueP->getMaxNumOfBytes = getMaxNumOfBytes; 00167 queueP->empty = persistentQueueEmpty; 00168 queueP->shutdown = persistentQueueShutdown; 00169 queueP->destroy = persistentQueueDestroy; 00170 queueP->privateObject = calloc(1, sizeof(DbInfo)); 00171 { 00172 DbInfo *dbInfo = (DbInfo *)queueP->privateObject; 00173 dbInfo->numOfEntries = -1; 00174 dbInfo->numOfBytes = -1; 00175 } 00176 stateOk = queueP->initialize(queueP, queueProperties, exception); 00177 if (stateOk) { 00178 LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created"); 00179 } 00180 else { 00181 ExceptionStruct ex; 00182 queueP->shutdown(&queueP, &ex); 00183 if (*ex.errorCode != 0) { 00184 embedException(exception, ex.errorCode, ex.message, exception); 00185 } 00186 queueP = 0; 00187 } 00188 return queueP; 00189 } 00190 00192 static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) { 00193 return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject); 00194 } 00195 00201 static const QueueProperties *getProperties(I_Queue *queueP) 00202 { 00203 ExceptionStruct exception; 00204 if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0; 00205 return &getDbInfo(queueP)->prop; 00206 } 00207 00210 static void freeQueue(I_Queue **queuePP) 00211 { 00212 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP; 00213 if (queueP == 0) { 00214 fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__); 00215 return; 00216 } 00217 00218 LOG __FILE__, "freeQueue() called"); 00219 00220 if (queueP->privateObject) { 00221 free(queueP->privateObject); 00222 queueP->privateObject = 0; 00223 } 00224 00225 free(queueP); 00226 *queuePP = 0; 00227 } 00228 00236 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception) 00237 { 00238 char *errMsg = 0; 00239 bool retOk; 00240 const int OPEN_RW = 0; 00241 sqlite *db; 00242 DbInfo *dbInfo; 00243 00244 if (checkArgs(queueP, "initialize", false, exception) == false ) return false; 00245 if (queueProperties == 0) { 00246 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00247 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00248 "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__); 00249 /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */ 00250 fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message); 00251 return false; 00252 } 00253 00254 queueP->log = queueProperties->logFp; 00255 queueP->logLevel = queueProperties->logLevel; 00256 queueP->userObject = queueProperties->userObject; 00257 00258 if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 || 00259 queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) { 00260 char dbName[QUEUE_DBNAME_MAX]; 00261 char queueName[QUEUE_ID_MAX]; 00262 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00263 if (queueProperties->dbName == 0) 00264 strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX); 00265 else 00266 strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX); 00267 if (queueProperties->queueName == 0) 00268 strncpy0(queueName, "NULL", QUEUE_ID_MAX); 00269 else 00270 strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX); 00271 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00272 "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s'," 00273 " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__, 00274 dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes); 00275 LOG __FILE__, "%s: %s", exception->errorCode, exception->message); 00276 return false; 00277 } 00278 00279 dbInfo = getDbInfo(queueP); 00280 memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties)); 00281 00282 /* Never trust a queue property you haven't overflowed yourself :-) */ 00283 dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0; 00284 dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0; 00285 dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0; 00286 00287 LOG __FILE__, "dbName = %s", dbInfo->prop.dbName); 00288 LOG __FILE__, "queueName = %s", dbInfo->prop.queueName); 00289 LOG __FILE__, "tablePrefix = %s", dbInfo->prop.tablePrefix); 00290 LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries); 00291 LOG __FILE__, "maxNumOfBytes = %ld",(long)dbInfo->prop.maxNumOfBytes); 00292 /*LOG __FILE__, "logFp = %d", (int)dbInfo->prop.logFp);*/ 00293 LOG __FILE__, "logLevel = %d", (int)dbInfo->prop.logLevel); 00294 /*LOG __FILE__, "userObject = %d", (void*)dbInfo->prop.userObject);*/ 00295 00296 db = sqlite_open(dbInfo->prop.dbName, OPEN_RW, &errMsg); 00297 dbInfo->db = db; 00298 00299 if (db==0) { 00300 queueP->isInitialized = false; 00301 if(queueP->log) { 00302 if (errMsg) { 00303 LOG __FILE__, "%s", errMsg); 00304 } 00305 else { 00306 LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName); 00307 } 00308 } 00309 else { 00310 if (errMsg) 00311 fprintf(stderr,"[%s] %s\n", __FILE__, errMsg); 00312 else 00313 fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName); 00314 } 00315 strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00316 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00317 "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg); 00318 if (errMsg != 0) sqlite_freemem(errMsg); 00319 return false; 00320 } 00321 00322 queueP->isInitialized = true; 00323 00324 retOk = createTables(queueP, exception); 00325 00326 fillCache(queueP, exception); 00327 00328 LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed"); 00329 return true; 00330 } 00331 00338 static bool createTables(I_Queue *queueP, ExceptionStruct *exception) 00339 { 00340 char queryString[LEN512]; 00341 bool retOk; 00342 const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix; 00343 00344 SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));", 00345 tablePrefix); 00346 retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception); 00347 00348 SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);", 00349 tablePrefix, tablePrefix); 00350 retOk = execSilent(queueP, queryString, "Creating PRIO index", exception); 00351 return retOk; 00352 } 00353 00362 static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception) 00363 { 00364 int rc = 0; 00365 char *errMsg = 0; 00366 bool retOk; 00367 DbInfo *dbInfo = getDbInfo(queueP); 00368 00369 rc = sqlite_exec(dbInfo->db, queryString, NULL, NULL, &errMsg); 00370 switch (rc) { 00371 case SQLITE_OK: 00372 LOG __FILE__, "SQL '%s' success", comment); 00373 retOk = true; 00374 break; 00375 default: 00376 if (errMsg && strstr(errMsg, "already exists")) { 00377 LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg); 00378 retOk = true; 00379 } 00380 else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) { 00381 LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg); 00382 retOk = true; 00383 } 00384 else { 00385 LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg); 00386 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00387 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00388 "[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg); 00389 retOk = false; 00390 } 00391 break; 00392 } 00393 if (errMsg != 0) sqlite_freemem(errMsg); 00394 return retOk; 00395 } 00396 00397 /* 00398 * This is the callback routine that the SQLite library 00399 * invokes for each row of a query result. 00400 static int callback(void *pArg, int nArg, char **azArg, char **azCol){ 00401 int i; 00402 struct callback_data *p = (struct callback_data*)pArg; 00403 int w = 5; 00404 if (p==0) {} // Suppress compiler warning 00405 if( azArg==0 ) return 0; 00406 for(i=0; i<nArg; i++){ 00407 int len = strlen(azCol[i]); 00408 if( len>w ) w = len; 00409 } 00410 printf("\n"); 00411 for(i=0; i<nArg; i++){ 00412 printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL"); 00413 } 00414 return 0; 00415 } 00416 */ 00417 00423 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception) 00424 { 00425 int rc = 0; 00426 bool stateOk = true; 00427 DbInfo *dbInfo; 00428 char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */ 00429 00430 if (checkArgs(queueP, "put", true, exception) == false ) return; 00431 if (queueEntry == 0) { 00432 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00433 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00434 "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__); 00435 return; 00436 } 00437 if (queueEntry->uniqueId == 0) { 00438 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00439 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00440 "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__); 00441 return; 00442 } 00443 if (*queueEntry->embeddedType == 0) { 00444 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00445 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00446 "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__); 00447 return; 00448 } 00449 strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN); 00450 00451 if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) { 00452 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00453 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00454 "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__); 00455 return; 00456 } 00457 00458 dbInfo = getDbInfo(queueP); 00459 00460 if ((int64_t)dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) { 00461 strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00462 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00463 "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries); 00464 return; 00465 } 00466 if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) { 00467 strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00468 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00469 "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes)); 00470 return; 00471 } 00472 00473 00474 if (dbInfo->pVm_put == 0) { /* Compile prepared query only once */ 00475 char queryString[LEN256]; /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/ 00476 SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix); 00477 stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception); 00478 } 00479 00480 if (stateOk) { /* set prepared statement tokens */ 00481 char intStr[INT64_STRLEN_MAX]; 00482 int index = 0; 00483 const int len = -1; /* Calculated by sqlite_bind */ 00484 rc = SQLITE_OK; 00485 00486 int64ToStr(intStr, queueEntry->uniqueId); 00487 /*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/ 00488 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true); 00489 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false); 00490 SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority); 00491 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true); 00492 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false); 00493 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false); 00494 SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen); 00495 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true); 00496 if (rc == SQLITE_OK) { 00497 /* As SQLite does only store strings we encode our blob to a string */ 00498 size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254; 00499 unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char)); 00500 int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data, 00501 (int)queueEntry->embeddedBlob.dataLen, out); 00502 rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true); 00503 free(out); 00504 } 00505 00506 if (rc != SQLITE_OK) { 00507 LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc)); 00508 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00509 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00510 "[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc)); 00511 stateOk = false; 00512 } 00513 } 00514 00515 if (stateOk) { /* start the query, process results */ 00516 int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception); 00517 stateOk = countRows >= 0; 00518 } 00519 00520 if (stateOk) { 00521 dbInfo->numOfEntries += 1; 00522 dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen); 00523 } 00524 00525 LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed"); 00526 } 00527 00528 00539 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, 00540 sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception) 00541 { 00542 int iRetry, numRetry=100; 00543 char *errMsg = 0; 00544 int rc = 0; 00545 const char *pzTail = 0; /* OUT: uncompiled tail of zSql */ 00546 bool stateOk = true; 00547 DbInfo *dbInfo = getDbInfo(queueP); 00548 00549 if (*ppVm == 0) { /* Compile prepared query */ 00550 for (iRetry = 0; iRetry < numRetry; iRetry++) { 00551 rc = sqlite_compile(dbInfo->db, queryString, &pzTail, ppVm, &errMsg); 00552 switch (rc) { 00553 case SQLITE_BUSY: 00554 if (iRetry == (numRetry-1)) { 00555 strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00556 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00557 "[%.100s:%d] SQL error #%d in %s(): %s %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg); 00558 } 00559 LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, (errMsg==0)?"":errMsg); 00560 if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; } 00561 sleepMillis(10); 00562 break; 00563 case SQLITE_OK: 00564 iRetry = numRetry; /* We're done */ 00565 LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString); 00566 if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; } 00567 break; 00568 default: 00569 LOG __FILE__, "SQL error #%d %s in %s(): %s: %s", rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg); 00570 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00571 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00572 "[%.100s:%d] SQL error #%d %s in %s(): %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg); 00573 iRetry = numRetry; /* We're done */ 00574 if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; } 00575 stateOk = false; 00576 break; 00577 } 00578 } 00579 } 00580 if (*ppVm == 0) stateOk = false; 00581 return stateOk; 00582 } 00583 00596 static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP, 00597 const char **pazValue, const char **pazColName, ExceptionStruct *exception) 00598 { 00599 bool doContinue = true; 00600 int numAssigned; 00601 bool stateOk = true; 00602 int decodeSize = 0; 00603 QueueEntry *queueEntry = 0; 00604 QueueEntryArr *queueEntryArr; 00605 TmpHelper *helper = (TmpHelper*)userP; 00606 QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP; 00607 00608 if (currIndex == 0) { 00609 helper->currEntries = 0; 00610 helper->currBytes = 0; 00611 } 00612 00613 if (*queueEntryArrPP == 0) { 00614 *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));; 00615 if (helper->maxNumOfEntries == 0) { 00616 doContinue = false; 00617 return doContinue; 00618 } 00619 } 00620 queueEntryArr = *queueEntryArrPP; 00621 00622 if (queueEntryArr->len == 0) { 00623 queueEntryArr->len = 10; 00624 queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry)); 00625 } 00626 else if (currIndex >= queueEntryArr->len) { 00627 queueEntryArr->len += 10; 00628 queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry)); 00629 } 00630 queueEntry = &queueEntryArr->queueEntryArr[currIndex]; 00631 memset(queueEntry, 0, sizeof(QueueEntry)); 00632 00633 stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]); 00634 if (!stateOk) { 00635 LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]); 00636 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00637 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00638 "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]); 00639 doContinue = false; 00640 return doContinue; 00641 } 00642 00643 LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex); 00644 /* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */ 00645 numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority); 00646 if (numAssigned != 1) { 00647 LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]); 00648 queueEntry->priority = 4; 00649 } 00650 strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN); 00651 queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false; 00652 { 00653 int64_t ival = 0; 00654 stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]); 00655 queueEntry->embeddedBlob.dataLen = (size_t)ival; 00656 } 00657 00658 /* TODO!!! in Java the length is the size in RAM and not strlen(data) */ 00659 /* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */ 00660 queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */ 00661 decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data); 00662 if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) { 00663 *(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0; 00664 LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d" 00665 " but expected decoded len=%d: '%s'", 00666 int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize, 00667 queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data); 00668 } 00669 00670 helper->currEntries += 1; 00671 helper->currBytes += queueEntry->embeddedBlob.dataLen; 00672 00673 /* Limit the number of entries */ 00674 if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) || 00675 (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) { 00676 /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */ 00677 doContinue = false; 00678 } 00679 00680 return doContinue; 00681 } 00682 00699 static int32_t getResultRows(I_Queue *queueP, const char *methodName, 00700 sqlite_vm *pVm, 00701 ParseDataFp parseDataFp, void *userP, 00702 bool finalize, 00703 ExceptionStruct *exception) 00704 { 00705 char *errMsg = 0; 00706 int32_t currIndex = 0; 00707 int numCol = 0; 00708 const char **pazValue = 0; 00709 const char **pazColName = 0; 00710 bool done = false; 00711 bool stateOk = true; 00712 int rc; 00713 while (!done) { 00714 rc = sqlite_step(pVm, &numCol, &pazValue, &pazColName); 00715 switch( rc ){ 00716 case SQLITE_DONE: 00717 done = true; 00718 break; 00719 case SQLITE_BUSY: 00720 LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName); 00721 sleepMillis(10); 00722 break; 00723 case SQLITE_ROW: 00724 { 00725 bool doContinue = true; 00726 if (parseDataFp) { 00727 /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */ 00728 doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception); 00729 stateOk = *exception->errorCode == 0; 00730 } 00731 else { 00732 /* 00733 printf("RESULT[%d]\n", iRow); 00734 for (iCol = 0; iCol < numCol; iCol++) { 00735 printf("%10.10s = %s\n", pazColName[iCol], pazValue[iCol]); 00736 } 00737 */ 00738 } 00739 currIndex++; 00740 if (!stateOk || !doContinue) done = true; 00741 } 00742 break; 00743 case SQLITE_ERROR: /* If exists already */ 00744 LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc); 00745 done = true; 00746 stateOk = false; 00747 break; 00748 case SQLITE_MISUSE: 00749 default: 00750 LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite_error_string(rc)); 00751 done = true; 00752 stateOk = false; 00753 break; 00754 } 00755 } 00756 LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex); 00757 00758 if (finalize) { 00759 sqlite_finalize(pVm, &errMsg); 00760 if (rc != SQLITE_OK && rc != SQLITE_DONE) { 00761 LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled. %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg); 00762 } 00763 if (errMsg != 0) sqlite_freemem(errMsg); 00764 } 00765 else { /* Reset prepared statement */ 00766 rc = sqlite_reset(pVm, &errMsg); 00767 if (rc == SQLITE_SCHEMA) { 00768 LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg); 00769 } 00770 if (errMsg != 0) sqlite_freemem(errMsg); 00771 } 00772 00773 return stateOk ? currIndex : (-1)*rc; 00774 } 00775 00779 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception) 00780 { 00781 int rc = 0; 00782 bool stateOk = true; 00783 DbInfo *dbInfo; 00784 QueueEntryArr *queueEntryArr = 0; 00785 00786 if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0; 00787 00788 LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes)); 00789 00790 dbInfo = getDbInfo(queueP); 00791 00792 if (dbInfo->pVm_peekWithSamePriority == 0) { /* Compile prepared query */ 00793 char queryString[LEN512]; 00794 /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/ 00795 SNPRINTF(queryString, LEN512, 00796 "SELECT * FROM %.20sENTRIES where queueName=?" 00797 " and prio=(select max(prio) from %.20sENTRIES where queueName=?)" 00798 " ORDER BY dataId ASC", 00799 dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix); 00800 stateOk = compilePreparedQuery(queueP, "peekWithSamePriority", 00801 &dbInfo->pVm_peekWithSamePriority , queryString, exception); 00802 } 00803 00804 if (stateOk) { /* set prepared statement tokens */ 00805 int index = 0; 00806 int len = -1; /* Calculated by sqlite_bind */ 00807 rc = SQLITE_OK; 00808 00809 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false); 00810 if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false); 00811 00812 switch (rc) { 00813 case SQLITE_OK: 00814 LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc); 00815 break; 00816 default: 00817 LOG __FILE__, "peekWithSamePriority() SQL error: %d %s", rc, sqlite_error_string(rc)); 00818 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00819 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00820 "[%.100s:%d] peekWithSamePriority() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite_error_string(rc)); 00821 stateOk = false; 00822 break; 00823 } 00824 } 00825 00826 if (stateOk) { /* start the query */ 00827 TmpHelper helper; 00828 int32_t currIndex = 0; 00829 helper.queueEntryArrPP = &queueEntryArr; 00830 helper.maxNumOfEntries = maxNumOfEntries; 00831 helper.maxNumOfBytes = maxNumOfBytes; 00832 currIndex = getResultRows(queueP, "peekWithSamePriority", 00833 dbInfo->pVm_peekWithSamePriority, parseQueueEntryArr, 00834 &helper, false, exception); 00835 stateOk = currIndex >= 0; 00836 if (!stateOk) { 00837 if (queueEntryArr) { 00838 free(queueEntryArr->queueEntryArr); 00839 queueEntryArr->len = 0; 00840 } 00841 } 00842 else { 00843 if (!queueEntryArr) 00844 queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr)); 00845 else if ((size_t)currIndex < queueEntryArr->len) { 00846 queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry)); 00847 queueEntryArr->len = currIndex; 00848 } 00849 } 00850 } 00851 00852 LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed"); 00853 return queueEntryArr; 00854 } 00855 00860 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception) 00861 { 00862 bool stateOk = true; 00863 int64_t numOfBytes = 0; 00864 int32_t countDeleted = 0; 00865 sqlite_vm *pVm = 0; 00866 DbInfo *dbInfo; 00867 if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 || 00868 queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0) 00869 return 0; 00870 00871 LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len); 00872 00873 dbInfo = getDbInfo(queueP); 00874 00875 { 00876 size_t i; 00877 const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6); 00878 char *queryString = (char *)calloc(qLen, sizeof(char)); 00879 /* DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */ 00880 SNPRINTF(queryString, qLen, 00881 "DELETE FROM %.20sENTRIES WHERE queueName='%s'" 00882 " AND dataId in ( ", 00883 dbInfo->prop.tablePrefix, dbInfo->prop.queueName); 00884 00885 for (i=0; i<queueEntryArr->len; i++) { 00886 strcat(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId)); 00887 if (i<(queueEntryArr->len-1)) strcat(queryString, ","); 00888 numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen); 00889 } 00890 strcat(queryString, " )"); 00891 stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception); 00892 free(queryString); 00893 } 00894 00895 00896 if (stateOk) { /* start the query */ 00897 int32_t currIndex = getResultRows(queueP, "randomRemove", 00898 pVm, 0, 0, true, exception); 00899 stateOk = currIndex >= 0; 00900 } 00901 00902 if (stateOk) { 00903 countDeleted = (int32_t)sqlite_last_statement_changes(dbInfo->db); 00904 if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) { 00905 fillCache(queueP, exception); /* calculate numOfBytes again */ 00906 } 00907 else { 00908 dbInfo->numOfEntries -= queueEntryArr->len; 00909 dbInfo->numOfBytes -= numOfBytes; 00910 } 00911 } 00912 00913 return countDeleted; 00914 } 00915 00919 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception) 00920 { 00921 bool stateOk = true; 00922 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP; 00923 if (checkArgs(queueP, "destroy", false, exception) == false ) return false; 00924 00925 shutdownInternal(queuePP, exception); 00926 00927 { 00928 DbInfo *dbInfo = getDbInfo(queueP); 00929 const char *dbName = dbInfo->prop.dbName; 00930 stateOk = unlink(dbName) == 0; /* Delete old db file */ 00931 if (!stateOk) { 00932 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00933 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00934 "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno); 00935 } 00936 } 00937 00938 freeQueue(queuePP); 00939 00940 return stateOk; 00941 } 00942 00946 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception) 00947 { 00948 int stateOk = true; 00949 char queryString[LEN256]; 00950 sqlite_vm *pVm = 0; 00951 DbInfo *dbInfo; 00952 if (checkArgs(queueP, "clear", true, exception) == false) return false; 00953 dbInfo = getDbInfo(queueP); 00954 00955 SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix); 00956 stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception); 00957 00958 if (stateOk) { 00959 int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, 0, true, exception); 00960 stateOk = currIndex >= 0; 00961 } 00962 00963 if (stateOk) { 00964 dbInfo->numOfEntries = 0; 00965 dbInfo->numOfBytes = 0; 00966 } 00967 00968 LOG __FILE__, "clear() done"); 00969 return stateOk; 00970 } 00971 00975 static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, void *userP, 00976 const char **pazValue, const char **pazColName, ExceptionStruct *exception) 00977 { 00978 int64_t ival = 0; 00979 bool stateOk; 00980 DbInfo *dbInfo = getDbInfo(queueP); 00981 00982 stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_DATA_ID]); 00983 if (!stateOk) { 00984 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00985 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00986 "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfEntries, ignoring entry.", __FILE__, __LINE__, pazColName[XB_ENTRIES_DATA_ID], pazValue[XB_ENTRIES_DATA_ID]); 00987 return false; 00988 } 00989 dbInfo->numOfEntries = (int32_t)ival; 00990 00991 stateOk = strToInt64(&dbInfo->numOfBytes, pazValue[1]); 00992 if (!stateOk) { 00993 strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); 00994 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 00995 "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfBytes, ignoring entry.", __FILE__, __LINE__, pazColName[1], pazValue[1]); 00996 if (currIndex) {} /* Just to avoid compiler warning about unused variable */ 00997 if (userP) {}; 00998 return false; 00999 } 01000 01001 return true; 01002 } 01003 01010 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception) 01011 { 01012 bool stateOk = true; 01013 DbInfo *dbInfo = 0; 01014 01015 char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */ 01016 01017 if (checkArgs(queueP, "fillCache", true, exception) == false ) return true; 01018 dbInfo = getDbInfo(queueP); 01019 01020 SNPRINTF(queryString, LEN512, 01021 "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'", 01022 dbInfo->prop.tablePrefix, dbInfo->prop.queueName); 01023 stateOk = compilePreparedQuery(queueP, "fillCache", 01024 &dbInfo->pVm_fillCache, queryString, exception); 01025 01026 if (stateOk) { /* start the query, calls parseCacheInfo() */ 01027 int32_t currIndex = getResultRows(queueP, "fillCache", 01028 dbInfo->pVm_fillCache, parseCacheInfo, 01029 0, false, exception); 01030 stateOk = currIndex > 0; 01031 } 01032 01033 LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes)); 01034 return stateOk; 01035 } 01036 01037 static bool persistentQueueEmpty(I_Queue *queueP) 01038 { 01039 return getNumOfEntries(queueP) <= 0; 01040 } 01041 01042 static int32_t getNumOfEntries(I_Queue *queueP) 01043 { 01044 DbInfo *dbInfo; 01045 bool stateOk = true; 01046 ExceptionStruct exception; 01047 if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1; 01048 dbInfo = getDbInfo(queueP); 01049 if (dbInfo->numOfEntries == -1) { 01050 stateOk = fillCache(queueP, &exception); 01051 } 01052 return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1; 01053 } 01054 01055 static int32_t getMaxNumOfEntries(I_Queue *queueP) 01056 { 01057 DbInfo *dbInfo; 01058 ExceptionStruct exception; 01059 if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1; 01060 dbInfo = getDbInfo(queueP); 01061 return dbInfo->prop.maxNumOfEntries; 01062 } 01063 01064 static int64_t getNumOfBytes(I_Queue *queueP) 01065 { 01066 DbInfo *dbInfo; 01067 ExceptionStruct exception; 01068 bool stateOk = true; 01069 if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1; 01070 dbInfo = getDbInfo(queueP); 01071 if (dbInfo->numOfBytes == -1) { 01072 stateOk = fillCache(queueP, &exception); 01073 } 01074 return (stateOk) ? dbInfo->numOfBytes : -1; 01075 } 01076 01077 static int64_t getMaxNumOfBytes(I_Queue *queueP) 01078 { 01079 DbInfo *dbInfo; 01080 ExceptionStruct exception; 01081 if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1; 01082 dbInfo = getDbInfo(queueP); 01083 return dbInfo->prop.maxNumOfBytes; 01084 } 01085 01090 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception) 01091 { 01092 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP; 01093 if (checkArgs(queueP, "shutdown", false, exception) == false ) return; 01094 shutdownInternal(queuePP, exception); 01095 freeQueue(queuePP); 01096 } 01097 01101 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception) 01102 { 01103 I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP; 01104 if (checkArgs(queueP, "shutdown", false, exception) == false ) return; 01105 { 01106 DbInfo *dbInfo = getDbInfo(queueP); 01107 queueP->isInitialized = false; 01108 if(dbInfo) { 01109 if (dbInfo->pVm_put) { 01110 char *errMsg = 0; 01111 /*int rc =*/ sqlite_finalize(dbInfo->pVm_put, &errMsg); 01112 if (errMsg != 0) sqlite_freemem(errMsg); 01113 dbInfo->pVm_put = 0; 01114 } 01115 if (dbInfo->pVm_peekWithSamePriority) { 01116 char *errMsg = 0; 01117 sqlite_finalize(dbInfo->pVm_peekWithSamePriority, &errMsg); 01118 if (errMsg != 0) sqlite_freemem(errMsg); 01119 dbInfo->pVm_peekWithSamePriority = 0; 01120 } 01121 if (dbInfo->pVm_fillCache) { 01122 char *errMsg = 0; 01123 sqlite_finalize(dbInfo->pVm_fillCache, &errMsg); 01124 if (errMsg != 0) sqlite_freemem(errMsg); 01125 dbInfo->pVm_fillCache = 0; 01126 } 01127 if (dbInfo->db) { 01128 sqlite_close(dbInfo->db); 01129 dbInfo->db = 0; 01130 } 01131 LOG __FILE__, "shutdown() done"); 01132 } 01133 } 01134 } 01135 01140 Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr) 01141 { 01142 if (queueEntryArr == (QueueEntryArr *)0) return; 01143 freeQueueEntryArrInternal(queueEntryArr); 01144 free(queueEntryArr); 01145 } 01146 01151 Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr) 01152 { 01153 size_t i; 01154 if (queueEntryArr == (QueueEntryArr *)0) return; 01155 for (i=0; i<queueEntryArr->len; i++) { 01156 freeQueueEntryData(&queueEntryArr->queueEntryArr[i]); 01157 } 01158 free(queueEntryArr->queueEntryArr); 01159 queueEntryArr->len = 0; 01160 } 01161 01165 static void freeQueueEntryData(QueueEntry *queueEntry) 01166 { 01167 if (queueEntry == (QueueEntry *)0) return; 01168 if (queueEntry->embeddedBlob.data != 0) { 01169 free((char *)queueEntry->embeddedBlob.data); 01170 queueEntry->embeddedBlob.data = 0; 01171 } 01172 queueEntry->embeddedBlob.dataLen = 0; 01173 } 01174 01179 Dll_Export void freeQueueEntry(QueueEntry *queueEntry) 01180 { 01181 if (queueEntry == (QueueEntry *)0) return; 01182 freeQueueEntryData(queueEntry); 01183 free(queueEntry); 01184 } 01185 01194 Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen) 01195 { 01196 if (queueEntry == (QueueEntry *)0) return 0; 01197 { 01198 char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen); 01199 const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen; 01200 const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen; 01201 char *xml = (char *)calloc(len, sizeof(char)); 01202 if (xml == 0) { 01203 free(contentStr); 01204 return 0; 01205 } 01206 if (maxContentDumpLen == 0) 01207 *contentStr = 0; 01208 else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 && 01209 (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5)) 01210 strcpy(contentStr+maxContentDumpLen, " ..."); 01211 01212 SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>" 01213 "\n <content size='%lu'><![CDATA[%s]]></content>" 01214 "\n <QueueEntry>", 01215 int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority, 01216 queueEntry->isPersistent?"true":"false", 01217 queueEntry->embeddedType, 01218 (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr); 01219 free(contentStr); 01220 return xml; 01221 } 01222 } 01223 01224 Dll_Export void freeEntryDump(char *entryDump) 01225 { 01226 if (entryDump) free(entryDump); 01227 } 01228 01238 static bool checkArgs(I_Queue *queueP, const char *methodName, 01239 bool checkIsConnected, ExceptionStruct *exception) 01240 { 01241 if (queueP == 0) { 01242 if (exception == 0) { 01243 printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n", 01244 __FILE__, __LINE__, methodName); 01245 } 01246 else { 01247 strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); 01248 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 01249 "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()", 01250 __FILE__, __LINE__, methodName); 01251 LOG __FILE__, "%s: %s", exception->errorCode, exception->message); 01252 } 01253 return false; 01254 } 01255 01256 if (exception == 0) { 01257 LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName); 01258 return false; 01259 } 01260 01261 if (checkIsConnected) { 01262 if (queueP->privateObject==0 || 01263 ((DbInfo *)(queueP->privateObject))->db==0 || 01264 !queueP->isInitialized) { 01265 strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN); 01266 SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, 01267 "[%.100s:%d] Not connected to database, %s() failed", 01268 __FILE__, __LINE__, methodName); 01269 LOG __FILE__, "%s: %s", exception->errorCode, exception->message); 01270 return false; 01271 } 01272 } 01273 01274 initializeExceptionStruct(exception); 01275 01276 LOG __FILE__, "%s() entering ...", methodName); 01277 01278 return true; 01279 } 01280 01281 /*=================== TESTCODE =======================*/ 01282 # ifdef QUEUE_MAIN 01283 /* 01284 NOTE: 01285 This code may be totally outdated, for current examples please use: 01286 xmlBlaster/testsuite/src/c/TestQueue.c 01287 */ 01288 #include <stdio.h> 01289 static void testRun(int argc, char **argv) { 01290 ExceptionStruct exception; 01291 QueueEntryArr *entries = 0; 01292 QueueProperties queueProperties; 01293 I_Queue *queueP = 0; 01294 01295 memset(&queueProperties, 0, sizeof(QueueProperties)); 01296 strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX); 01297 strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX); 01298 strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX); 01299 queueProperties.maxNumOfEntries = 10000000L; 01300 queueProperties.maxNumOfBytes = 1000000000LL; 01301 queueProperties.logFp = xmlBlasterDefaultLogging; 01302 queueProperties.logLevel = XMLBLASTER_LOG_TRACE; 01303 queueProperties.userObject = 0; 01304 01305 queueP = createQueue(&queueProperties, &exception); 01306 /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */ 01307 if (argc || argv) {} /* to avoid compiler warning */ 01308 01309 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false"); 01310 01311 { 01312 int64_t idArr[] = { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll }; 01313 int16_t prioArr[] = { 5 , 1 , 5 }; 01314 char *data[] = { "Hello" , " World" , "!!!" }; 01315 size_t i; 01316 for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) { 01317 QueueEntry queueEntry; 01318 memset(&queueEntry, 0, sizeof(QueueEntry)); 01319 queueEntry.priority = prioArr[i]; 01320 queueEntry.isPersistent = true; 01321 queueEntry.uniqueId = idArr[i]; 01322 strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN); 01323 queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0; 01324 queueEntry.embeddedBlob.data = data[i]; 01325 queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data); 01326 01327 queueP->put(queueP, &queueEntry, &exception); 01328 if (*exception.errorCode != 0) { 01329 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message); 01330 } 01331 } 01332 } 01333 01334 entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception); 01335 if (*exception.errorCode != 0) { 01336 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message); 01337 } 01338 if (entries != 0) { 01339 size_t i; 01340 printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len); 01341 for (i=0; i<entries->len; i++) { 01342 QueueEntry *queueEntry = &entries->queueEntryArr[i]; 01343 char *dump = queueEntryToXml(queueEntry, 200); 01344 printf("%s\n", dump); 01345 free(dump); 01346 } 01347 } 01348 01349 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false"); 01350 queueP->randomRemove(queueP, entries, &exception); 01351 if (*exception.errorCode != 0) { 01352 LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message); 01353 } 01354 01355 freeQueueEntryArr(entries); 01356 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false"); 01357 01358 queueP->clear(queueP, &exception); 01359 printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false"); 01360 01361 queueP->shutdown(&queueP, &exception); 01362 } 01363 01364 int main(int argc, char **argv) { 01365 int i; 01366 for (i=0; i<1; i++) { 01367 testRun(argc, argv); 01368 } 01369 return 0; 01370 } 01371 #endif /*QUEUE_MAIN*/ 01372 /*=================== TESTCODE =======================*/ 01373