00001 /*------------------------------------------------------------------------------
00002 Name:      SQLiteQueuePlugin.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Author:    xmlBlaster@marcelruff.info
00006 ------------------------------------------------------------------------------*/
00007 #include <util/queue/SQLiteQueuePlugin.h>
00008 #include <util/XmlBlasterException.h>
00009 #include <util/Global.h>
00010 #include <stdarg.h>           // va_start for logging
00011 #include <stdio.h>            // vsnprintf for g++ 2.9x only
00012 #include <util/lexical_cast.h>
00013 #include <util/MessageUnit.h>
00014 #include <util/queue/ConnectQueueEntry.h>
00015 #include <util/queue/SubscribeQueueEntry.h>
00016 #include <util/queue/UnSubscribeQueueEntry.h>
00017 #include <util/queue/PublishQueueEntry.h>
00018 #include <socket/xmlBlasterSocket.h> // C xmlBlaster client library: for msgUnit serialize
00019 #include <util/queue/QueueInterface.h> // The C implementation interface
00021 using namespace std;
00022 using namespace org::xmlBlaster::util;
00023 using namespace org::xmlBlaster::util::thread;
00024 using namespace org::xmlBlaster::util::qos::storage;
00025 using namespace org::xmlBlaster::util::key;
00026 using namespace org::xmlBlaster::util::qos;
00027 using namespace org::xmlBlaster::client::qos;
00028 using namespace org::xmlBlaster::client::key;
00030 //static ::XmlBlasterLogging loggingFp = ::xmlBlasterDefaultLogging;
00031 static void myLogger(void *logUserP, 
00032                      XMLBLASTER_LOG_LEVEL currLevel,
00033                      XMLBLASTER_LOG_LEVEL level,
00034                      const char *location, const char *fmt, ...);
00036 namespace org { namespace xmlBlaster { namespace util { namespace queue {
00038 SQLiteQueuePlugin::SQLiteQueuePlugin(Global& global, const ClientQueueProperty& property)
00039    : ME("SQLiteQueuePlugin"), 
00040      global_(global), 
00041      log_(global.getLog("org.xmlBlaster.util.queue")), 
00042      property_(property), 
00043      queueP_(0), 
00044      connectQosFactory_(global_),
00045      statusQosFactory_(global_),
00046      msgKeyFactory_(global_),
00047      msgQosFactory_(global_),
00048      accessMutex_()
00049 {
00050    if (log_.call()) log_.call(ME, "Constructor queue [" + getType() + "][" + getVersion() + "] ...");
00051    /*
00052     TODO: Pass basic configuration from plugin key/values similar to (see xmlBlaster.properties)
00053      QueuePlugin[SQLite][1.0]=SQLiteQueuePlugin,
00054          url=/${user.home}${file.separator}tmp${file.separator}$_{xmlBlaster_uniqueId}.db,\
00055          user=sqlite,\
00056          password=,\
00057          connectionPoolSize=1,\
00058          connectionBusyTimeout=90000,\
00059          maxWaitingThreads=300,\
00060          tableNamePrefix=XB_,\
00061          entriesTableName=ENTRIES,\
00062          dbAdmin=true
00063    */
00064    const std::string classRelating = "QueuePlugin["+getType()+"]["+getVersion()+"]"; // "QueuePlugin[SQLite][1.0]"
00065    const std::string instanceRelating = property.getPropertyPrefix();                // == "connection"
00067    // Should it be "queue/connection/tableNamePrefix" or "queue/QueuePlugin[SQLite][1.0]/tableNamePrefix"
00068    // The first allows different instances with changing "connection" to e.g. "tailback" etc.
00069    if (global_.getProperty().propertyExists(classRelating, true)) {
00070       log_.warn(ME, "Your setting of property '" + classRelating + "' is not supported");
00071    }
00073    std::string defaultPath = ""; // for example: "/home/joe/tmp/" or "C:\Documents and Settings\marcel\tmp"
00074    if (global_.getProperty().get("user.home", "") != "")
00075       defaultPath = global_.getProperty().get("user.home", "") +
00076                     global_.getProperty().get("file.separator", "");
00077                     //+ "tmp" +                                     // We currently can't create missing directories, TODO!!!
00078                     //global_.getProperty().get("file.separator", "");
00080    const std::string url = global_.getProperty().get("queue/"+instanceRelating+"/url", defaultPath+"xmlBlasterClientCpp.db");  // "queue/connection/url"
00081    const std::string queueName = global_.getProperty().get("queue/"+instanceRelating+"/queueName", instanceRelating + "_" + global_.getStrippedImmutableId()); // "connection_clientJoe2"
00082    const std::string tableNamePrefix = global_.getProperty().get("queue/"+instanceRelating+"/tableNamePrefix", "XB_");// "queue/connection/tableNamePrefix"
00084    ::ExceptionStruct exception;
00085    ::QueueProperties queueProperties;
00086    memset(&queueProperties, 0, sizeof(QueueProperties));
00088    strncpy0(queueProperties.dbName, url.c_str(), QUEUE_DBNAME_MAX);
00089    strncpy0(queueProperties.queueName, queueName.c_str(), QUEUE_ID_MAX);
00090    strncpy0(queueProperties.tablePrefix, tableNamePrefix.c_str(), QUEUE_PREFIX_MAX);
00091    queueProperties.maxNumOfEntries = (int32_t)property.getMaxEntries();
00092    queueProperties.maxNumOfBytes = property.getMaxBytes();
00093    queueProperties.logFp = myLogger;
00094    queueProperties.logLevel = (log_.call() || log_.trace()) ? XMLBLASTER_LOG_TRACE : XMLBLASTER_LOG_INFO;
00095    queueProperties.userObject = &log_;
00097    queueP_ = createQueue(&queueProperties, &exception); // &log_ Used in myLogger(), see above
00098    if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
00100    log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "], queue/"+instanceRelating+"/url='" +
00101                  queueProperties.dbName + "', queue/"+instanceRelating+"/queueName='" + queueProperties.queueName +
00102                  "', queue/"+instanceRelating+"/maxEntries=" + lexical_cast<string>(queueProperties.maxNumOfEntries));
00103 }
00105 /*
00106 SQLiteQueuePlugin::SQLiteQueuePlugin(const SQLiteQueuePlugin& queue)
00107    : ME("SQLiteQueuePlugin"), 
00108      global_(queue.global_), 
00109      log_(queue.log_), 
00110      property_(queue.property_), 
00111      queueP_(queue.queueP_), 
00112      accessMutex_()
00113 {
00114 }
00116 SQLiteQueuePlugin& SQLiteQueuePlugin::operator =(const SQLiteQueuePlugin& queue)
00117 {
00118    Lock lock(queue.accessMutex_);
00119    property_   = queue.property_;
00120    queueP_    = queue.queueP_;
00121    return *this;
00123 }
00124 */
00126 SQLiteQueuePlugin::~SQLiteQueuePlugin()
00127 {
00128    if (log_.call()) log_.call(ME, "destructor");
00129    if (queueP_) {
00130       Lock lock(accessMutex_);
00131       ::ExceptionStruct exception;
00132       queueP_->shutdown(&queueP_, &exception); // NULLs the queueP_
00133       if (*exception.errorCode != 0) {
00134          const int ERRORSTR_LEN = 1024;
00135          char errorString[ERRORSTR_LEN];
00136          log_.warn(ME, string("Ignoring problem during shutdown: ") + getExceptionStr(errorString, ERRORSTR_LEN, &exception));
00137       }
00138    }
00139 } 
00141 void SQLiteQueuePlugin::put(const MsgQueueEntry &entry)
00142 {
00143    if (log_.call()) log_.call(ME, "::put");
00144    if (log_.dump()) log_.dump(ME+".put()", string("The msg entry is: ")  + entry.toXml());
00146    Lock lock(accessMutex_);
00147    if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, put() failed");
00149    ::ExceptionStruct exception;
00150    ::QueueEntry queueEntry;
00152    // Copy C++ to C struct ...
00154    queueEntry.priority = entry.getPriority();
00155    queueEntry.isPersistent = entry.isPersistent();
00156    queueEntry.uniqueId = entry.getUniqueId();
00157    queueEntry.sizeInBytes = entry.getSizeInBytes();
00158    strncpy0(queueEntry.embeddedType, entry.getEmbeddedType().c_str(), QUEUE_ENTRY_EMBEDDEDTYPE_LEN);  // "MSG_RAW|publish"
00159    queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
00161    // dump MsgQueueEntry with SOCKET protocol into C ::MsgUnit ...
00163    const BlobHolder *blob = (const BlobHolder *)entry.getEmbeddedObject();
00164    if (blob == 0) throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME, "put() failed, the entry " + entry.getLogId() + " returned NULL for embeddedObject");
00165    queueEntry.embeddedBlob.data = blob->data;
00166    queueEntry.embeddedBlob.dataLen = blob->dataLen;
00168    if (log_.dump()) {
00169       char *dumpP = blobDump(&queueEntry.embeddedBlob);
00170       log_.dump(ME+".put()", string("Put blob to queue:") + dumpP);
00171       ::xmlBlasterFree(dumpP);
00172    }
00174    // Push into C persistent queue ...
00176    queueP_->put(queueP_, &queueEntry, &exception);
00178    if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
00179 }
00181 const vector<EntryType> SQLiteQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
00182 {
00183    Lock lock(accessMutex_);
00184    if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, peekWithSamePriority() failed");
00185    vector<EntryType> ret;
00186    if (queueP_->empty(queueP_)) return ret;
00187    if (log_.call()) log_.call(ME, "peekWithSamePriority maxNumOfEntries=" + lexical_cast<std::string>(maxNumOfEntries) + " maxNumOfBytes=" + lexical_cast<std::string>(maxNumOfBytes));
00189    ::ExceptionStruct exception;
00190    ::QueueEntryArr *entriesC = queueP_->peekWithSamePriority(queueP_, (int32_t)maxNumOfEntries, maxNumOfBytes, &exception);
00191    if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
00193    // Now we need to copy the C results into C++ classes ...
00195    for (size_t ii=0; ii<entriesC->len; ii++) {
00196       ::QueueEntry &queueEntryC = entriesC->queueEntryArr[ii];
00197       string type, methodName;
00198       parseEmbeddedType(queueEntryC.embeddedType, type, methodName);
00200       if (type != Constants::ENTRY_TYPE_MSG_RAW) {
00201          string embedded = queueEntryC.embeddedType;
00202          freeQueueEntryArr(entriesC);
00203          throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::peekWithSamePriority", string("The queue entry embeddedType '") + embedded + "' type='" + type + "' is not supported");
00204       }
00206       if (log_.dump()) {
00207          char *dumpP = blobDump(&queueEntryC.embeddedBlob);
00208          log_.dump(ME+".peekWithSamePriority()", string("Retrieved blob from queue:") + dumpP);
00209          ::xmlBlasterFree(dumpP);
00210       }
00212       ::MsgUnitArr *msgUnitArrC = ::parseMsgUnitArr(queueEntryC.embeddedBlob.dataLen, queueEntryC.embeddedBlob.data);
00214       for (size_t j=0; msgUnitArrC!=0 && j<msgUnitArrC->len; j++) { // TODO: Collect a PUBLISH_ARR !!! (currently we transform it to single publish()es)
00215          ::MsgUnit &msgUnit = msgUnitArrC->msgUnitArr[j];
00216          if (log_.dump()) {
00217             char *dumpP = ::messageUnitToXmlLimited(&msgUnit, 128);
00218             log_.dump(ME+".peekWithSamePriority()", string("Retrieved and parsed C message from queue:") + dumpP);
00219             ::xmlBlasterFree(dumpP);
00220          }
00221          if (methodName == MethodName::PUBLISH) {
00222             MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnit.key));
00223             MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnit.qos));
00224             MessageUnit messageUnit(msgKeyData, msgUnit.contentLen, (const unsigned char*)msgUnit.content, msgQosData);
00225             PublishQueueEntry *pq = new PublishQueueEntry(global_, messageUnit,
00226                                            queueEntryC.priority, queueEntryC.uniqueId);
00227             if (log_.trace()) log_.trace(ME, "Got PublishQueueEntry from queue");
00228             ret.insert(ret.end(), EntryType(*pq));
00229             if (log_.trace()) log_.trace(ME, "PublishQueueEntry is reference countet");
00230          }
00231          else if (methodName == MethodName::CONNECT) {
00232             ConnectQosRef connectQos = connectQosFactory_.readObject(string(msgUnit.qos));
00233             ConnectQueueEntry *pq = new ConnectQueueEntry(global_, connectQos,
00234                                            queueEntryC.priority, queueEntryC.uniqueId);
00235             if (log_.trace()) log_.trace(ME, "Got ConnectQueueEntry from queue");
00236             ret.insert(ret.end(), EntryType(*pq));
00237             if (log_.trace()) log_.trace(ME, "ConnectQueueEntry is reference countet");
00238          }
00239          /* TODO: queryKeyFactory and queryQosFactory!
00240          else if (methodName == MethodName::SUBSCRIBE) {
00241             QueryKeyData queryKeyData = queryKeyFactory_.readObject(string(msgUnit.key));
00242             SubscribeKey subscribeKey(global_, queryKeyData);
00243             QueryQosData queryQosData = queryQosFactory_.readObject(string(msgUnit.qos));
00244             SubscribeQos subscribeQos(global_, queryQosData);
00245             SubscribeQueueEntry *pq = new SubscribeQueueEntry(global_, subscribeKey, subscribeQos,
00246                                            queueEntryC.priority, queueEntryC.uniqueId);
00247             if (log_.trace()) log_.trace(ME, "Got SubscribeQueueEntry from queue");
00248             ret.insert(ret.end(), EntryType(*pq));
00249             if (log_.trace()) log_.trace(ME, "SubscribeQueueEntry is reference countet");
00250          }
00251          */
00252          else {  // TODO: How to handle: throw exception or remove the buggy entry?
00253             log_.error(ME + "::peekWithSamePriority", string("The queue entry embeddedType '") + queueEntryC.embeddedType + "' methodName='" + methodName + "' is not supported, we ignore it.");
00254          }
00255       }
00257       freeMsgUnitArr(msgUnitArrC);
00258    }
00260    freeQueueEntryArr(entriesC);
00261    return ret;
00262 }
00264 void SQLiteQueuePlugin::parseEmbeddedType(const string& embeddedType, string &type, string &methodName)
00265 {
00266    string::size_type pos = embeddedType.find("|");
00267    if (pos == string::npos) {
00268       type = embeddedType;
00269       methodName = "";
00270       return;
00271    }
00272    type = embeddedType.substr(0, pos);
00273    if (pos < embeddedType.size())
00274       methodName = embeddedType.substr(pos+1);
00275    // No trim(): we assume no white spaces
00276 }
00278 long SQLiteQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) 
00279 {
00280    Lock lock(accessMutex_);
00281    if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, randomRemove() failed");
00282    if (start == end || queueP_->empty(queueP_)) return 0;
00284    ::QueueEntryArr queueEntryArr;
00285    memset(&queueEntryArr, 0, sizeof(QueueEntryArr));
00286    {
00287       vector<EntryType>::const_iterator iter = start;
00288       while (iter != end) {
00289          iter++;
00290          queueEntryArr.len++;
00291       }
00292    }
00293    if (queueEntryArr.len < 1) return 0;
00295    queueEntryArr.queueEntryArr = (QueueEntry *)calloc(queueEntryArr.len, sizeof(QueueEntry));
00297    vector<EntryType>::const_iterator iter = start;
00298    for (int currIndex=0; iter != end; ++iter, currIndex++) {
00299       const EntryType &entryType = (*iter);
00300       const MsgQueueEntry &entry = *entryType;
00301       ::QueueEntry &queueEntry = queueEntryArr.queueEntryArr[currIndex];
00303       // Copy C++ to C struct ...
00305       queueEntry.priority = entry.getPriority();
00306       queueEntry.isPersistent = entry.isPersistent();
00307       queueEntry.uniqueId = entry.getUniqueId();
00308       queueEntry.sizeInBytes = entry.getSizeInBytes();
00309       strncpy0(queueEntry.embeddedType, entry.getEmbeddedType().c_str(), QUEUE_ENTRY_EMBEDDEDTYPE_LEN);  // "MSG_RAW|publish" "MSG_RAW|connect"
00310       queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
00311       /*
00312       const BlobHolder *blob = (const BlobHolder *)entry.getEmbeddedObject();
00313       if (blob == 0) throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME, "put() failed, the entry " + entry.getLogId() + " returned NULL for embeddedObject");
00314       queueEntry.embeddedBlob.data = blob->data;
00315       queueEntry.embeddedBlob.dataLen = blob->dataLen;
00316       */
00317       if (log_.dump()) {
00318          char *dumpP = ::queueEntryToXml(&queueEntry, 200);
00319          log_.dump(ME+".put()", string("Put blob to queue:") + dumpP);
00320          xmlBlasterFree(dumpP);
00321       }
00322    }
00324    ::ExceptionStruct exception;
00326    int32_t numRemoved = queueP_->randomRemove(queueP_, &queueEntryArr, &exception);
00328    freeQueueEntryArrInternal(&queueEntryArr);
00330    if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
00331    return (long)numRemoved;
00332 }
00334 long SQLiteQueuePlugin::getNumOfEntries() const
00335 {
00336    if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfEntries() failed");
00337    return queueP_->getNumOfEntries(queueP_);
00338 }
00340 long SQLiteQueuePlugin::getMaxNumOfEntries() const
00341 {
00342    if (queueP_ == 0) return property_.getMaxEntries(); // throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfEntries() failed");
00343    return queueP_->getMaxNumOfEntries(queueP_);
00344 }
00346 int64_t SQLiteQueuePlugin::getNumOfBytes() const
00347 {
00348    if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfBytes() failed");
00349    return queueP_->getNumOfBytes(queueP_);
00350 }
00352 int64_t SQLiteQueuePlugin::getMaxNumOfBytes() const
00353 {  
00354    if (queueP_ == 0) return property_.getMaxBytes(); // throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getMaxNumOfBytes() failed");
00355    return queueP_->getMaxNumOfBytes(queueP_);
00356 }
00358 void SQLiteQueuePlugin::clear()
00359 {
00360    Lock lock(accessMutex_);
00361    if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, clear() failed");
00362    ::ExceptionStruct exception;
00363    queueP_->clear(queueP_, &exception);
00364 }
00367 bool SQLiteQueuePlugin::empty() const
00368 {
00369    if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, empty() failed");
00370    return queueP_->empty(queueP_);
00371 }
00373 void SQLiteQueuePlugin::destroy()
00374 {
00375    if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, destroy() failed");
00376    ::ExceptionStruct exception;
00377    queueP_->destroy(&queueP_, &exception);
00378    if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
00379 }
00381 // Exception conversion ....
00382 org::xmlBlaster::util::XmlBlasterException SQLiteQueuePlugin::convertFromQueueException(const ::ExceptionStruct *ex) const
00383 {
00384    return org::xmlBlaster::util::XmlBlasterException(
00385             (*ex->errorCode=='\0')?string("internal.unknown"):string(ex->errorCode),
00386             string(""),
00387             ME,
00388             "en",
00389             string(ex->message),
00390             global_.getVersion() + " " + global_.getBuildTimestamp());
00391 }
00393 string SQLiteQueuePlugin::usage()
00394 {
00395    std::string text = string("");
00396    text += string("\nThe SQLite persistent queue plugin configuration:");
00397    text += string("\n   -queue/connection/url [xmlBlasterClientCpp.db]");
00398    text += string("\n                       The database file name (incl. path), defaults to the current directory.");
00399    text += string("\n   -queue/connection/tableNamePrefix [XB_]");
00400    text += string("\n                       The prefix for all tables in the database.");
00401    text += ClientQueueProperty::usage();
00402    return text;
00403 }
00405 }}}} // namespace
00423 static void myLogger(void *logUserP, 
00424                      XMLBLASTER_LOG_LEVEL currLevel,
00425                      XMLBLASTER_LOG_LEVEL level,
00426                      const char *location, const char *fmt, ...)
00427 {
00428    /* Guess we need no more than 200 bytes. */
00429    int n, size = 200;
00430    char *p = 0;
00431    va_list ap;
00432    ::I_Queue *queueP = (::I_Queue *)logUserP;
00434    //org::xmlBlaster::util::queue::SQLiteQueuePlugin *pluginP =
00435    //      (org::xmlBlaster::util::queue::SQLiteQueuePlugin *)queueP->userObject;
00436    //org::xmlBlaster::util::I_Log& log = pluginP->getLog();
00438    if (queueP->userObject == 0) {
00439       std::cout << "myLogger not initialized" << std::endl;
00440       return;
00441    }
00442    org::xmlBlaster::util::I_Log& log = *((org::xmlBlaster::util::I_Log*)queueP->userObject);
00445       return;
00446    }
00447    if ((p = (char *)malloc (size)) == NULL)
00448       return;
00450    for (;;) {
00451       /* Try to print in the allocated space. */
00452       va_start(ap, fmt);
00453       n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */
00454       va_end(ap);
00455       /* If that worked, print the string to console. */
00456       if (n > -1 && n < size) {
00457          if (level == XMLBLASTER_LOG_INFO)
00458             log.info(location, p);
00459          else if (level == XMLBLASTER_LOG_WARN)
00460             log.warn(location, p);
00461          else if (level == XMLBLASTER_LOG_ERROR)
00462             log.error(location, p);
00463          else
00464             log.trace(location, p);
00465          free(p);
00466          return;
00467       }
00468       /* Else try again with more space. */
00469       if (n > -1)    /* glibc 2.1 */
00470          size = n+1; /* precisely what is needed */
00471       else           /* glibc 2.0 */
00472          size *= 2;  /* twice the old size */
00473       if ((p = (char *)realloc (p, size)) == NULL) {
00474          return;
00475       }
00476    }
00477 }