util/queue/SQLiteQueue.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      SQLiteQueue.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   A persistent queue implementation based on the SQLite relational database
00006            Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
00007            and can easily be used outside of xmlBlaster.
00008            Further you need sqlite.h and the sqlite library (dll,so,sl)
00009 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00010 Date:      04/2004
00011 Compile:   Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
00012            Needs pthread.h but not the pthread library (for exact times)
00013 
00014             export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
00015             gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLiteQueue SQLiteQueue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite
00016             (use optionally  -ansi -pedantic -Wno-long-long
00017             (Intel C: icc -wd981 ...)
00018 
00019            Compile inside xmlBlaster:
00020             build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
00021            expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so
00022 
00023 
00024            Compile Test main()
00025             Replace /I\c\sqlite for your needs ( says where sqlite.h resides ):
00026                                 and \pialibs\sqlite.lib as well ( sqlite.lib is created from sqlite.def via lib /DEF:sqlite.def)
00027            cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /D_WINDOWS /I\c\sqlite /I..\.. SqliteQueue.c ..\helper.c /link \pialibs\sqlite.lib
00028 
00029 
00030 Table layout XB_ENTRIES:
00031            dataId bigint
00032            queueName text
00033            prio integer
00034            flag text
00035            durable char(1)
00036            byteSize bigint
00037            blob bytea
00038            PRIMARY KEY (dataId, queueName)
00039 
00040 Todo:      Tuning:
00041             - Add prio to PRIMARY KEY
00042             - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes
00043 
00044 @see:      http://www.sqlite.org/
00045 @see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
00046 @see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
00047 Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
00048 -----------------------------------------------------------------------------*/
00049 #include <stdio.h>
00050 #include <string.h>
00051 #include <malloc.h>
00052 #if !defined(_WINDOWS)
00053 # include <unistd.h>   /* unlink() */
00054 # include <errno.h>    /* unlink() */
00055 #endif
00056 #include "util/queue/QueueInterface.h"
00057 #include "sqlite.h"
00058 
00059 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
00060 static const QueueProperties *getProperties(I_Queue *queueP);
00061 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
00062 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
00063 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
00064 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
00065 static int32_t getNumOfEntries(I_Queue *queueP);
00066 static int32_t getMaxNumOfEntries(I_Queue *queueP);
00067 static int64_t getNumOfBytes(I_Queue *queueP);
00068 static int64_t getMaxNumOfBytes(I_Queue *queueP);
00069 static bool persistentQueueEmpty(I_Queue *queueP);
00070 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
00071 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);
00072 static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
00073 static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
00074 static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
00075 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception);
00076 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
00077 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
00078 static void freeQueueEntryData(QueueEntry *queueEntry);
00079 
00085 typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP,
00086                                const char **pazValue, const char **pazColName, ExceptionStruct *exception);
00087 static int32_t getResultRows(I_Queue *queueP, const char *methodName,
00088                              sqlite_vm *pVm, 
00089                              ParseDataFp parseDataFp, void *userP, bool finalize,
00090                              ExceptionStruct *exception);
00091 
00092 /* Shortcut for:
00093     if (queueP->log) queueP->log(queueP, XMLBLASTER_LOG_TRACE, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
00094    is
00095     LOG __FILE__, "Persistent queue is created");
00096 */
00097 #define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, 
00098 
00099 #define LEN512 512  /* ISO C90 forbids variable-size array: const int LEN512=512; */
00100 #define LEN256 256  /* ISO C90 forbids variable-size array: const int LEN256=256; */
00101 
00102 #define DBNAME_MAX 128
00103 #define ID_MAX 256
00104 
00108 typedef struct DbInfoStruct {
00109    QueueProperties prop;         
00110    size_t numOfEntries;          
00111    int64_t numOfBytes;           
00112    sqlite *db;                   
00113    sqlite_vm *pVm_put;           
00114    sqlite_vm *pVm_peekWithSamePriority;
00115    sqlite_vm *pVm_fillCache;
00116 } DbInfo;
00117 
00121 typedef struct {
00122    QueueEntryArr **queueEntryArrPP;
00123    int32_t currEntries;
00124    int64_t currBytes;
00125    int32_t maxNumOfEntries; 
00126    int64_t maxNumOfBytes;   
00127 } TmpHelper;
00128 
00129 static char int64Str_[INT64_STRLEN_MAX];
00130 static char * const int64Str = int64Str_;   /* to make the pointer address const */
00131 
00133 enum {
00134    XB_ENTRIES_DATA_ID = 0,
00135    XB_ENTRIES_QUEUE_NAME,
00136    XB_ENTRIES_PRIO,
00137    XB_ENTRIES_TYPE_NAME,
00138    XB_ENTRIES_PERSISTENT,
00139    XB_ENTRIES_SIZE_IN_BYTES,
00140    XB_ENTRIES_BLOB
00141 };
00142 
00143 
00151 Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception)
00152 {
00153    bool stateOk = true;
00154    I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));
00155    if (queueP == 0) return queueP;
00156    queueP->isInitialized = false;
00157    queueP->initialize = persistentQueueInitialize;
00158    queueP->getProperties = getProperties;
00159    queueP->put = persistentQueuePut;
00160    queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;
00161    queueP->randomRemove = persistentQueueRandomRemove;
00162    queueP->clear = persistentQueueClear;
00163    queueP->getNumOfEntries = getNumOfEntries;
00164    queueP->getMaxNumOfEntries = getMaxNumOfEntries;
00165    queueP->getNumOfBytes = getNumOfBytes;
00166    queueP->getMaxNumOfBytes = getMaxNumOfBytes;
00167    queueP->empty = persistentQueueEmpty;
00168    queueP->shutdown = persistentQueueShutdown;
00169    queueP->destroy = persistentQueueDestroy;
00170    queueP->privateObject = calloc(1, sizeof(DbInfo));
00171    {
00172       DbInfo *dbInfo = (DbInfo *)queueP->privateObject;
00173       dbInfo->numOfEntries = -1;
00174       dbInfo->numOfBytes = -1;
00175    }
00176    stateOk = queueP->initialize(queueP, queueProperties, exception);
00177    if (stateOk) {
00178       LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");
00179    }
00180    else {
00181       ExceptionStruct ex;
00182       queueP->shutdown(&queueP, &ex);
00183       if (*ex.errorCode != 0) {
00184          embedException(exception, ex.errorCode, ex.message, exception);
00185       }
00186       queueP = 0;
00187    }
00188    return queueP;
00189 }
00190 
00192 static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {
00193    return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject);
00194 }
00195 
00201 static const QueueProperties *getProperties(I_Queue *queueP)
00202 {
00203    ExceptionStruct exception;
00204    if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0;
00205    return &getDbInfo(queueP)->prop;
00206 }
00207 
00210 static void freeQueue(I_Queue **queuePP)
00211 {
00212    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
00213    if (queueP == 0) {
00214       fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);
00215       return;
00216    }
00217 
00218    LOG __FILE__, "freeQueue() called");
00219 
00220    if (queueP->privateObject) {
00221       free(queueP->privateObject);
00222       queueP->privateObject = 0;
00223    }
00224 
00225    free(queueP);
00226    *queuePP = 0;
00227 }
00228 
00236 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception)
00237 {
00238    char *errMsg = 0;
00239    bool retOk;
00240    const int OPEN_RW = 0;
00241    sqlite *db;
00242    DbInfo *dbInfo;
00243 
00244    if (checkArgs(queueP, "initialize", false, exception) == false ) return false;
00245    if (queueProperties == 0) {
00246       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00247       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00248                "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);
00249       /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */
00250       fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message);
00251       return false;
00252    }
00253 
00254    queueP->log = queueProperties->logFp;
00255    queueP->logLevel = queueProperties->logLevel;
00256    queueP->userObject = queueProperties->userObject;
00257 
00258    if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||
00259        queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) {
00260       char dbName[QUEUE_DBNAME_MAX];
00261       char queueName[QUEUE_ID_MAX];
00262       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00263       if (queueProperties->dbName == 0)
00264          strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);
00265       else
00266          strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);
00267       if (queueProperties->queueName == 0)
00268          strncpy0(queueName, "NULL", QUEUE_ID_MAX);
00269       else
00270          strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);
00271       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00272                "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"
00273                " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,
00274                dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);
00275       LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
00276       return false;
00277    }
00278 
00279    dbInfo = getDbInfo(queueP);
00280    memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));
00281 
00282    /* Never trust a queue property you haven't overflowed yourself :-) */
00283    dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;
00284    dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;
00285    dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;
00286 
00287    LOG __FILE__, "dbName          = %s", dbInfo->prop.dbName);
00288    LOG __FILE__, "queueName       = %s", dbInfo->prop.queueName);
00289    LOG __FILE__, "tablePrefix     = %s", dbInfo->prop.tablePrefix);
00290    LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries);
00291    LOG __FILE__, "maxNumOfBytes   = %ld",(long)dbInfo->prop.maxNumOfBytes);
00292    /*LOG __FILE__, "logFp           = %d", (int)dbInfo->prop.logFp);*/
00293    LOG __FILE__, "logLevel        = %d", (int)dbInfo->prop.logLevel);
00294    /*LOG __FILE__, "userObject      = %d", (void*)dbInfo->prop.userObject);*/
00295 
00296    db = sqlite_open(dbInfo->prop.dbName, OPEN_RW, &errMsg);
00297    dbInfo->db = db;
00298 
00299    if (db==0) {
00300       queueP->isInitialized = false;
00301       if(queueP->log) {
00302          if (errMsg) {
00303             LOG __FILE__, "%s", errMsg);
00304          }
00305          else {
00306             LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName);
00307          }
00308       }
00309       else {
00310         if (errMsg)
00311            fprintf(stderr,"[%s] %s\n", __FILE__, errMsg);
00312         else
00313            fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName);
00314       }
00315       strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00316       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00317                "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg);
00318       if (errMsg != 0) sqlite_freemem(errMsg);
00319       return false;
00320    }
00321 
00322    queueP->isInitialized = true;
00323 
00324    retOk = createTables(queueP, exception);
00325 
00326    fillCache(queueP, exception);
00327 
00328    LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");
00329    return true;
00330 }
00331 
00338 static bool createTables(I_Queue *queueP, ExceptionStruct *exception)
00339 {
00340    char queryString[LEN512];
00341    bool retOk;
00342    const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;
00343 
00344    SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
00345            tablePrefix);
00346    retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception);
00347 
00348    SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",
00349            tablePrefix, tablePrefix);
00350    retOk = execSilent(queueP, queryString, "Creating PRIO index", exception);
00351    return retOk;
00352 }
00353 
00362 static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception)
00363 {
00364    int rc = 0;
00365    char *errMsg = 0;
00366    bool retOk;
00367    DbInfo *dbInfo = getDbInfo(queueP);
00368 
00369    rc = sqlite_exec(dbInfo->db, queryString, NULL, NULL, &errMsg);
00370    switch (rc) {
00371       case SQLITE_OK:
00372          LOG __FILE__, "SQL '%s' success", comment);
00373          retOk = true;
00374          break;
00375       default:
00376          if (errMsg && strstr(errMsg, "already exists")) {
00377             LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
00378             retOk = true;
00379          }
00380          else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) {
00381             LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
00382             retOk = true;
00383          }
00384          else {
00385             LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
00386             strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00387             SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00388                      "[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
00389             retOk = false;
00390          }
00391          break;
00392    }
00393    if (errMsg != 0) sqlite_freemem(errMsg);
00394    return retOk;
00395 }
00396 
00397 /*
00398  * This is the callback routine that the SQLite library
00399  * invokes for each row of a query result.
00400 static int callback(void *pArg, int nArg, char **azArg, char **azCol){
00401    int i;
00402    struct callback_data *p = (struct callback_data*)pArg;
00403    int w = 5;
00404    if (p==0) {} // Suppress compiler warning
00405    if( azArg==0 ) return 0;
00406    for(i=0; i<nArg; i++){
00407       int len = strlen(azCol[i]);
00408       if( len>w ) w = len;
00409    }
00410    printf("\n");
00411    for(i=0; i<nArg; i++){
00412       printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL");
00413    }
00414   return 0;
00415 }
00416 */
00417 
00423 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception)
00424 {
00425    int rc = 0;
00426    bool stateOk = true;
00427    DbInfo *dbInfo;
00428    char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */
00429 
00430    if (checkArgs(queueP, "put", true, exception) == false ) return;
00431    if (queueEntry == 0) {
00432       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00433       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00434                "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);
00435       return;
00436    }
00437    if (queueEntry->uniqueId == 0) {
00438       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00439       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00440                "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);
00441       return;
00442    }
00443    if (*queueEntry->embeddedType == 0) {
00444       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00445       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00446                "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);
00447       return;
00448    }
00449    strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
00450 
00451    if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {
00452       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00453       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00454                "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);
00455       return;
00456    }
00457 
00458    dbInfo = getDbInfo(queueP);
00459 
00460    if ((int64_t)dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) {
00461       strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00462       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00463                "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);
00464       return;
00465    }
00466    if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {
00467       strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00468       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00469                "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));
00470       return;
00471    }
00472 
00473 
00474    if (dbInfo->pVm_put == 0) {  /* Compile prepared query only once */
00475       char queryString[LEN256];    /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/
00476       SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix);
00477       stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);
00478    }
00479 
00480    if (stateOk) { /* set prepared statement tokens */
00481       char intStr[INT64_STRLEN_MAX];
00482       int index = 0;
00483       const int len = -1; /* Calculated by sqlite_bind */
00484       rc = SQLITE_OK;
00485 
00486       int64ToStr(intStr, queueEntry->uniqueId);
00487       /*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/
00488       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
00489       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false);
00490       SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority);
00491       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
00492       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false);
00493       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false);
00494       SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen);
00495       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
00496       if (rc == SQLITE_OK) {
00497          /* As SQLite does only store strings we encode our blob to a string */
00498          size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254;
00499          unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char));
00500          int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data,
00501                               (int)queueEntry->embeddedBlob.dataLen, out);
00502          rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true);
00503          free(out);
00504       }
00505 
00506       if (rc != SQLITE_OK) {
00507          LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));
00508          strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00509          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00510                   "[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));
00511          stateOk = false;
00512       }
00513    }
00514 
00515    if (stateOk) { /* start the query, process results */
00516       int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception);
00517       stateOk = countRows >= 0;
00518    }
00519 
00520    if (stateOk) {
00521       dbInfo->numOfEntries += 1;
00522       dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);
00523    }
00524 
00525    LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");
00526 }
00527 
00528 
00539 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,
00540                     sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception)
00541 {
00542    int iRetry, numRetry=100;
00543    char *errMsg = 0;
00544    int rc = 0;
00545    const char *pzTail = 0;   /* OUT: uncompiled tail of zSql */
00546    bool stateOk = true;
00547    DbInfo *dbInfo = getDbInfo(queueP);
00548 
00549    if (*ppVm == 0) {  /* Compile prepared  query */
00550       for (iRetry = 0; iRetry < numRetry; iRetry++) {
00551          rc = sqlite_compile(dbInfo->db, queryString, &pzTail, ppVm, &errMsg);
00552          switch (rc) {
00553             case SQLITE_BUSY:
00554                if (iRetry == (numRetry-1)) {
00555                   strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00556                   SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00557                            "[%.100s:%d] SQL error #%d in %s(): %s %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
00558                }
00559                LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, (errMsg==0)?"":errMsg);
00560                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
00561                sleepMillis(10);
00562                break;
00563             case SQLITE_OK:
00564                iRetry = numRetry; /* We're done */
00565                LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);
00566                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
00567                break;
00568             default:
00569                LOG __FILE__, "SQL error #%d %s in %s(): %s: %s", rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
00570                strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00571                SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00572                         "[%.100s:%d] SQL error #%d %s in %s(): %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
00573                iRetry = numRetry; /* We're done */
00574                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
00575                stateOk = false;
00576                break;
00577          }
00578       }
00579    }
00580    if (*ppVm == 0) stateOk = false;
00581    return stateOk;
00582 }
00583 
00596 static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP,
00597                                const char **pazValue, const char **pazColName, ExceptionStruct *exception)
00598 {
00599    bool doContinue = true;
00600    int numAssigned;
00601    bool stateOk = true;
00602    int decodeSize = 0;
00603    QueueEntry *queueEntry = 0;
00604    QueueEntryArr *queueEntryArr;
00605    TmpHelper *helper = (TmpHelper*)userP;
00606    QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;
00607 
00608    if (currIndex == 0) {
00609       helper->currEntries = 0;
00610       helper->currBytes = 0;
00611    }
00612 
00613    if (*queueEntryArrPP == 0) {
00614       *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));;
00615       if (helper->maxNumOfEntries == 0) {
00616          doContinue = false;
00617          return doContinue;
00618       }
00619    }
00620    queueEntryArr = *queueEntryArrPP;
00621 
00622    if (queueEntryArr->len == 0) {
00623       queueEntryArr->len = 10;
00624       queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));
00625    }
00626    else if (currIndex >= queueEntryArr->len) {
00627       queueEntryArr->len += 10;
00628       queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));
00629    }
00630    queueEntry = &queueEntryArr->queueEntryArr[currIndex];
00631    memset(queueEntry, 0, sizeof(QueueEntry));
00632 
00633    stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]);
00634    if (!stateOk) {
00635       LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]);
00636       strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00637       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00638                "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]);
00639       doContinue = false;
00640       return doContinue;
00641    }
00642 
00643    LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);
00644    /* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */
00645    numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority);
00646    if (numAssigned != 1) {
00647       LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]);
00648       queueEntry->priority = 4;
00649    }
00650    strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
00651    queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false;
00652    {
00653       int64_t ival = 0;
00654       stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]);
00655       queueEntry->embeddedBlob.dataLen = (size_t)ival;
00656    }
00657 
00658    /* TODO!!! in Java the length is the size in RAM and not strlen(data) */
00659    /* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */
00660    queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */
00661    decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data);
00662    if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) {
00663       *(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0; 
00664       LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d"
00665                         " but expected decoded len=%d: '%s'",
00666                     int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize,
00667                     queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data);
00668    }
00669 
00670    helper->currEntries += 1;
00671    helper->currBytes += queueEntry->embeddedBlob.dataLen;
00672 
00673    /* Limit the number of entries */
00674    if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||
00675        (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {
00676       /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */
00677       doContinue = false;
00678    }
00679 
00680    return doContinue;
00681 }
00682 
00699 static int32_t getResultRows(I_Queue *queueP, const char *methodName,
00700                              sqlite_vm *pVm, 
00701                              ParseDataFp parseDataFp, void *userP,
00702                              bool finalize,
00703                              ExceptionStruct *exception)
00704 {
00705    char *errMsg = 0;
00706    int32_t currIndex = 0;
00707    int numCol = 0;
00708    const char **pazValue = 0;
00709    const char **pazColName = 0;
00710    bool done = false;
00711    bool stateOk = true;
00712    int rc;
00713    while (!done) {
00714       rc = sqlite_step(pVm, &numCol, &pazValue, &pazColName);
00715       switch( rc ){
00716          case SQLITE_DONE:
00717             done = true;
00718          break;
00719          case SQLITE_BUSY:
00720             LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);
00721             sleepMillis(10);
00722          break;
00723          case SQLITE_ROW:
00724          {
00725             bool doContinue = true;
00726             if (parseDataFp) {
00727                /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */
00728                doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception);
00729                stateOk = *exception->errorCode == 0;
00730             }
00731             else {
00732                /*
00733                printf("RESULT[%d]\n", iRow);
00734                for (iCol = 0; iCol < numCol; iCol++) {
00735                   printf("%10.10s = %s\n", pazColName[iCol], pazValue[iCol]);
00736                }
00737                */
00738             }
00739             currIndex++;
00740             if (!stateOk || !doContinue) done = true;
00741          }
00742          break;
00743          case SQLITE_ERROR:   /* If exists already */
00744             LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);
00745             done = true;
00746             stateOk = false;
00747          break;
00748          case SQLITE_MISUSE:
00749          default:
00750             LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite_error_string(rc));
00751             done = true;
00752             stateOk = false;
00753          break;
00754       }
00755    }
00756    LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);
00757 
00758    if (finalize) {
00759       sqlite_finalize(pVm, &errMsg);
00760       if (rc != SQLITE_OK && rc != SQLITE_DONE) {
00761          LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled. %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);
00762       }
00763       if (errMsg != 0) sqlite_freemem(errMsg);
00764    }
00765    else { /* Reset prepared statement */
00766       rc = sqlite_reset(pVm, &errMsg);
00767       if (rc == SQLITE_SCHEMA) {
00768          LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);
00769       }
00770       if (errMsg != 0) sqlite_freemem(errMsg);
00771    }
00772 
00773    return stateOk ? currIndex : (-1)*rc;
00774 }
00775 
00779 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception)
00780 {
00781    int rc = 0;
00782    bool stateOk = true;
00783    DbInfo *dbInfo;
00784    QueueEntryArr *queueEntryArr = 0;
00785 
00786    if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0;
00787 
00788    LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes));
00789 
00790    dbInfo = getDbInfo(queueP);
00791 
00792    if (dbInfo->pVm_peekWithSamePriority == 0) {  /* Compile prepared  query */
00793       char queryString[LEN512];
00794       /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/
00795       SNPRINTF(queryString, LEN512,
00796            "SELECT * FROM %.20sENTRIES where queueName=?"
00797            " and prio=(select max(prio) from %.20sENTRIES where queueName=?)"
00798            " ORDER BY dataId ASC",
00799            dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix);
00800       stateOk = compilePreparedQuery(queueP, "peekWithSamePriority",
00801                     &dbInfo->pVm_peekWithSamePriority , queryString, exception);
00802    }
00803 
00804    if (stateOk) { /* set prepared statement tokens */
00805       int index = 0;
00806       int len = -1; /* Calculated by sqlite_bind */
00807       rc = SQLITE_OK;
00808 
00809       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);
00810       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);
00811 
00812       switch (rc) {
00813          case SQLITE_OK:
00814             LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc);
00815             break;
00816          default:
00817             LOG __FILE__, "peekWithSamePriority() SQL error: %d %s", rc, sqlite_error_string(rc));
00818             strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00819             SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00820                      "[%.100s:%d] peekWithSamePriority() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite_error_string(rc));
00821             stateOk = false;
00822             break;
00823       }
00824    }
00825 
00826    if (stateOk) { /* start the query */
00827       TmpHelper helper;
00828       int32_t currIndex = 0;
00829       helper.queueEntryArrPP = &queueEntryArr;
00830       helper.maxNumOfEntries = maxNumOfEntries;
00831       helper.maxNumOfBytes = maxNumOfBytes;
00832       currIndex = getResultRows(queueP, "peekWithSamePriority",
00833                         dbInfo->pVm_peekWithSamePriority, parseQueueEntryArr,
00834                         &helper, false, exception);
00835       stateOk = currIndex >= 0;
00836       if (!stateOk) {
00837          if (queueEntryArr) {
00838             free(queueEntryArr->queueEntryArr);
00839             queueEntryArr->len = 0;
00840          }
00841       }
00842       else {
00843          if (!queueEntryArr)
00844             queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
00845          else if ((size_t)currIndex < queueEntryArr->len) {
00846             queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry));
00847             queueEntryArr->len = currIndex; 
00848          }
00849       }
00850    }
00851 
00852    LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed");
00853    return queueEntryArr;
00854 }
00855 
00860 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception)
00861 {
00862    bool stateOk = true;
00863    int64_t numOfBytes = 0;
00864    int32_t countDeleted = 0;
00865    sqlite_vm *pVm = 0;
00866    DbInfo *dbInfo;
00867    if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 ||
00868                  queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0)
00869       return 0;
00870 
00871    LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len);
00872 
00873    dbInfo = getDbInfo(queueP);
00874 
00875    {
00876       size_t i;
00877       const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6);
00878       char *queryString = (char *)calloc(qLen, sizeof(char));
00879       /*  DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */
00880       SNPRINTF(queryString, qLen, 
00881            "DELETE FROM %.20sENTRIES WHERE queueName='%s'"
00882            " AND dataId in ( ",
00883            dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
00884 
00885       for (i=0; i<queueEntryArr->len; i++) {
00886          strcat(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId));
00887          if (i<(queueEntryArr->len-1)) strcat(queryString, ",");
00888          numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen);
00889       }
00890       strcat(queryString, " )");
00891       stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception);
00892       free(queryString);
00893    }
00894 
00895 
00896    if (stateOk) { /* start the query */
00897       int32_t currIndex = getResultRows(queueP, "randomRemove",
00898                               pVm, 0, 0, true, exception);
00899       stateOk = currIndex >= 0;
00900    }
00901 
00902    if (stateOk) {
00903       countDeleted = (int32_t)sqlite_last_statement_changes(dbInfo->db);
00904       if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) {
00905          fillCache(queueP, exception); /* calculate numOfBytes again */
00906       }
00907       else {
00908          dbInfo->numOfEntries -= queueEntryArr->len;
00909          dbInfo->numOfBytes -= numOfBytes;
00910       }
00911    }
00912 
00913    return countDeleted;
00914 }
00915 
00919 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception)
00920 {
00921    bool stateOk = true;
00922    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
00923    if (checkArgs(queueP, "destroy", false, exception) == false ) return false;
00924 
00925    shutdownInternal(queuePP, exception);
00926 
00927    {
00928       DbInfo *dbInfo = getDbInfo(queueP);
00929       const char *dbName = dbInfo->prop.dbName;
00930       stateOk = unlink(dbName) == 0; /* Delete old db file */
00931       if (!stateOk) {
00932          strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00933          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00934                   "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno);
00935       }
00936    }
00937 
00938    freeQueue(queuePP);
00939    
00940    return stateOk;
00941 }
00942 
00946 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception)
00947 {
00948    int stateOk = true;
00949    char queryString[LEN256];
00950    sqlite_vm *pVm = 0;
00951    DbInfo *dbInfo;
00952    if (checkArgs(queueP, "clear", true, exception) == false) return false;
00953    dbInfo = getDbInfo(queueP);
00954 
00955    SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix);
00956    stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception);
00957 
00958    if (stateOk) {
00959       int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, 0, true, exception);
00960       stateOk = currIndex >= 0;
00961    }
00962 
00963    if (stateOk) {
00964       dbInfo->numOfEntries = 0;
00965       dbInfo->numOfBytes = 0;
00966    }
00967 
00968    LOG __FILE__, "clear() done");
00969    return stateOk;
00970 }
00971 
00975 static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, void *userP,
00976                            const char **pazValue, const char **pazColName, ExceptionStruct *exception)
00977 {
00978    int64_t ival = 0;
00979    bool stateOk;
00980    DbInfo *dbInfo = getDbInfo(queueP);
00981 
00982    stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_DATA_ID]);
00983    if (!stateOk) {
00984       strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00985       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00986                "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfEntries, ignoring entry.", __FILE__, __LINE__, pazColName[XB_ENTRIES_DATA_ID], pazValue[XB_ENTRIES_DATA_ID]);
00987       return false;
00988    }
00989    dbInfo->numOfEntries = (int32_t)ival;
00990 
00991    stateOk = strToInt64(&dbInfo->numOfBytes, pazValue[1]);
00992    if (!stateOk) {
00993       strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00994       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00995                "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfBytes, ignoring entry.", __FILE__, __LINE__, pazColName[1], pazValue[1]);
00996       if (currIndex) {} /* Just to avoid compiler warning about unused variable */
00997       if (userP) {};
00998       return false;
00999    }
01000 
01001    return true;
01002 }
01003 
01010 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception)
01011 {
01012    bool stateOk = true;
01013    DbInfo *dbInfo = 0;
01014 
01015    char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */
01016 
01017    if (checkArgs(queueP, "fillCache", true, exception) == false ) return true;
01018    dbInfo = getDbInfo(queueP);
01019 
01020    SNPRINTF(queryString, LEN512, 
01021             "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
01022             dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
01023    stateOk = compilePreparedQuery(queueP, "fillCache",
01024                   &dbInfo->pVm_fillCache, queryString, exception);
01025 
01026    if (stateOk) { /* start the query, calls parseCacheInfo() */
01027       int32_t currIndex = getResultRows(queueP, "fillCache",
01028                               dbInfo->pVm_fillCache, parseCacheInfo,
01029                               0, false, exception);
01030       stateOk = currIndex > 0;
01031    }
01032 
01033    LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes));
01034    return stateOk;
01035 }
01036 
01037 static bool persistentQueueEmpty(I_Queue *queueP)
01038 {
01039    return getNumOfEntries(queueP) <= 0;
01040 }
01041 
01042 static int32_t getNumOfEntries(I_Queue *queueP)
01043 {
01044    DbInfo *dbInfo;
01045    bool stateOk = true;
01046    ExceptionStruct exception;
01047    if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1;
01048    dbInfo = getDbInfo(queueP);
01049    if (dbInfo->numOfEntries == -1) {
01050       stateOk = fillCache(queueP, &exception);
01051    }
01052    return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;
01053 }
01054 
01055 static int32_t getMaxNumOfEntries(I_Queue *queueP)
01056 {
01057    DbInfo *dbInfo;
01058    ExceptionStruct exception;
01059    if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1;
01060    dbInfo = getDbInfo(queueP);
01061    return dbInfo->prop.maxNumOfEntries;
01062 }
01063 
01064 static int64_t getNumOfBytes(I_Queue *queueP)
01065 {
01066    DbInfo *dbInfo;
01067    ExceptionStruct exception;
01068    bool stateOk = true;
01069    if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1;
01070    dbInfo = getDbInfo(queueP);
01071    if (dbInfo->numOfBytes == -1) {
01072       stateOk = fillCache(queueP, &exception);
01073    }
01074    return (stateOk) ? dbInfo->numOfBytes : -1;
01075 }
01076 
01077 static int64_t getMaxNumOfBytes(I_Queue *queueP)
01078 {
01079    DbInfo *dbInfo;
01080    ExceptionStruct exception;
01081    if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1;
01082    dbInfo = getDbInfo(queueP);
01083    return dbInfo->prop.maxNumOfBytes;
01084 }
01085 
01090 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception)
01091 {
01092    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
01093    if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
01094    shutdownInternal(queuePP, exception);
01095    freeQueue(queuePP);
01096 }
01097 
01101 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception)
01102 {
01103    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
01104    if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
01105    {
01106       DbInfo *dbInfo = getDbInfo(queueP);
01107       queueP->isInitialized = false;
01108       if(dbInfo) {
01109          if (dbInfo->pVm_put) {
01110             char *errMsg = 0;
01111             /*int rc =*/ sqlite_finalize(dbInfo->pVm_put, &errMsg);
01112             if (errMsg != 0) sqlite_freemem(errMsg);
01113             dbInfo->pVm_put = 0;
01114          }
01115          if (dbInfo->pVm_peekWithSamePriority) {
01116             char *errMsg = 0;
01117             sqlite_finalize(dbInfo->pVm_peekWithSamePriority, &errMsg);
01118             if (errMsg != 0) sqlite_freemem(errMsg);
01119             dbInfo->pVm_peekWithSamePriority = 0;
01120          }
01121          if (dbInfo->pVm_fillCache) {
01122             char *errMsg = 0;
01123             sqlite_finalize(dbInfo->pVm_fillCache, &errMsg);
01124             if (errMsg != 0) sqlite_freemem(errMsg);
01125             dbInfo->pVm_fillCache = 0;
01126          }
01127          if (dbInfo->db) {
01128             sqlite_close(dbInfo->db);
01129             dbInfo->db = 0;
01130          }
01131          LOG __FILE__, "shutdown() done");
01132       }
01133    }
01134 }
01135 
01140 Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr)
01141 {
01142    if (queueEntryArr == (QueueEntryArr *)0) return;
01143    freeQueueEntryArrInternal(queueEntryArr);
01144    free(queueEntryArr);
01145 }
01146 
01151 Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr)
01152 {
01153    size_t i;
01154    if (queueEntryArr == (QueueEntryArr *)0) return;
01155    for (i=0; i<queueEntryArr->len; i++) {
01156       freeQueueEntryData(&queueEntryArr->queueEntryArr[i]);
01157    }
01158    free(queueEntryArr->queueEntryArr);
01159    queueEntryArr->len = 0;
01160 }
01161 
01165 static void freeQueueEntryData(QueueEntry *queueEntry)
01166 {
01167    if (queueEntry == (QueueEntry *)0) return;
01168    if (queueEntry->embeddedBlob.data != 0) {
01169       free((char *)queueEntry->embeddedBlob.data);
01170       queueEntry->embeddedBlob.data = 0;
01171    }
01172    queueEntry->embeddedBlob.dataLen = 0;
01173 }
01174 
01179 Dll_Export void freeQueueEntry(QueueEntry *queueEntry)
01180 {
01181    if (queueEntry == (QueueEntry *)0) return;
01182    freeQueueEntryData(queueEntry);
01183    free(queueEntry);
01184 }
01185 
01194 Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen)
01195 {
01196    if (queueEntry == (QueueEntry *)0) return 0;
01197    {
01198    char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen);
01199    const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen;
01200    const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen;
01201    char *xml = (char *)calloc(len, sizeof(char));
01202    if (xml == 0) {
01203       free(contentStr);
01204       return 0;
01205    }
01206    if (maxContentDumpLen == 0)
01207       *contentStr = 0;
01208    else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 &&
01209             (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5))
01210       strcpy(contentStr+maxContentDumpLen, " ...");
01211 
01212    SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>"
01213                       "\n  <content size='%lu'><![CDATA[%s]]></content>"
01214                       "\n <QueueEntry>",
01215                         int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority,
01216                         queueEntry->isPersistent?"true":"false",
01217                         queueEntry->embeddedType,
01218                         (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr);
01219    free(contentStr);
01220    return xml;
01221    }
01222 }
01223 
01224 Dll_Export void freeEntryDump(char *entryDump)
01225 {
01226    if (entryDump) free(entryDump);
01227 }
01228 
01238 static bool checkArgs(I_Queue *queueP, const char *methodName,
01239                       bool checkIsConnected, ExceptionStruct *exception)
01240 {
01241    if (queueP == 0) {
01242       if (exception == 0) {
01243          printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n",
01244                   __FILE__, __LINE__, methodName);
01245       }
01246       else {
01247          strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
01248          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
01249                   "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()",
01250                    __FILE__, __LINE__, methodName);
01251          LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
01252       }
01253       return false;
01254    }
01255 
01256    if (exception == 0) {
01257       LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName);
01258       return false;
01259    }
01260 
01261    if (checkIsConnected) {
01262       if (queueP->privateObject==0 ||
01263           ((DbInfo *)(queueP->privateObject))->db==0 ||
01264           !queueP->isInitialized) {
01265          strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
01266          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
01267                   "[%.100s:%d] Not connected to database, %s() failed",
01268                    __FILE__, __LINE__, methodName);
01269          LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
01270          return false;
01271       }
01272    }
01273 
01274    initializeExceptionStruct(exception);
01275 
01276    LOG __FILE__, "%s() entering ...", methodName);
01277 
01278    return true;
01279 }
01280 
01281 /*=================== TESTCODE =======================*/
01282 # ifdef QUEUE_MAIN
01283 /*
01284  NOTE:
01285     This code may be totally outdated, for current examples please use:
01286     xmlBlaster/testsuite/src/c/TestQueue.c
01287 */
01288 #include <stdio.h>
01289 static void testRun(int argc, char **argv) {
01290    ExceptionStruct exception;
01291    QueueEntryArr *entries = 0;
01292    QueueProperties queueProperties;
01293    I_Queue *queueP = 0;
01294 
01295    memset(&queueProperties, 0, sizeof(QueueProperties));
01296    strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
01297    strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
01298    strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
01299    queueProperties.maxNumOfEntries = 10000000L;
01300    queueProperties.maxNumOfBytes = 1000000000LL;
01301    queueProperties.logFp = xmlBlasterDefaultLogging;
01302    queueProperties.logLevel = XMLBLASTER_LOG_TRACE;
01303    queueProperties.userObject = 0;
01304 
01305    queueP = createQueue(&queueProperties, &exception);
01306    /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */
01307    if (argc || argv) {} /* to avoid compiler warning */
01308 
01309    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
01310 
01311    {
01312       int64_t idArr[] =   { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll };
01313       int16_t prioArr[] = { 5                    , 1                    , 5 };
01314       char *data[] =      { "Hello"              , " World"             , "!!!" };
01315       size_t i;
01316       for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) {
01317          QueueEntry queueEntry;
01318          memset(&queueEntry, 0, sizeof(QueueEntry));
01319          queueEntry.priority = prioArr[i];
01320          queueEntry.isPersistent = true;
01321          queueEntry.uniqueId = idArr[i];
01322          strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
01323          queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
01324          queueEntry.embeddedBlob.data = data[i];
01325          queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);
01326 
01327          queueP->put(queueP, &queueEntry, &exception);
01328          if (*exception.errorCode != 0) {
01329             LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
01330          }
01331       }
01332    }
01333    
01334    entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception);
01335    if (*exception.errorCode != 0) {
01336       LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
01337    }
01338    if (entries != 0) {
01339       size_t i;
01340       printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len);
01341       for (i=0; i<entries->len; i++) {
01342          QueueEntry *queueEntry = &entries->queueEntryArr[i];
01343          char *dump = queueEntryToXml(queueEntry, 200);
01344          printf("%s\n", dump);
01345          free(dump);
01346       }
01347    }
01348 
01349    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
01350    queueP->randomRemove(queueP, entries, &exception);
01351    if (*exception.errorCode != 0) {
01352       LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
01353    }
01354 
01355    freeQueueEntryArr(entries);
01356    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
01357    
01358    queueP->clear(queueP, &exception);
01359    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
01360 
01361    queueP->shutdown(&queueP, &exception);
01362 }
01363 
01364 int main(int argc, char **argv) {
01365    int i;
01366    for (i=0; i<1; i++) {
01367       testRun(argc, argv);
01368    }
01369    return 0;
01370 }
01371 #endif /*QUEUE_MAIN*/
01372 /*=================== TESTCODE =======================*/
01373