util/queue/MsgQueueEntry.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      MsgQueueEntry.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 ------------------------------------------------------------------------------*/
00006 
00007 #include <util/queue/MsgQueueEntry.h>
00008 #include <util/dispatch/I_ConnectionsHandler.h>
00009 #include <util/Global.h>
00010 #include <util/lexical_cast.h>
00011 #include <util/qos/ConnectQos.h>
00012 #include <client/qos/PublishReturnQos.h>
00013 #include <cstddef> //<stddef.h>
00014 #include <util/msgUtil.h> // from xmlBlaster C library
00015 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug);
00016 
00025 namespace org { namespace xmlBlaster { namespace util { namespace queue {
00026 
00027 using namespace std;
00028 using namespace org::xmlBlaster::util;
00029 using namespace org::xmlBlaster::util::key;
00030 using namespace org::xmlBlaster::util::qos;
00031 using namespace org::xmlBlaster::util::dispatch;
00032 using namespace org::xmlBlaster::client::qos;
00033 
00034 MsgQueueEntry::MsgQueueEntry(Global& global, const MessageUnit& msgUnit, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
00035    : ReferenceCounterBase(), 
00036      ME("MsgQueueEntry"), 
00037      global_(global), 
00038      log_(global.getLog("org.xmlBlaster.util.queue")),
00039      connectQos_((ConnectQos*)0),
00040      connectReturnQos_((ConnectReturnQos*)0)
00041 {
00042    publishReturnQos_ = NULL;
00043    msgUnit_          = new MessageUnit(msgUnit);
00044    statusQosData_    = NULL;
00045    uniqueId_         = uniqueId; //TimestampFactory::getInstance().getTimestamp();
00046    embeddedType_     = embeddedType;
00047    priority_         = priority; // should be normal priority
00048    persistent_       = persistent; // currently no persistents supported
00049    logId_            = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
00050    memset(&blobHolder_, 0, sizeof(BlobHolder));
00051 }
00052 
00053 MsgQueueEntry::MsgQueueEntry(Global& global, const ConnectQosRef& connectQos, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
00054    : ReferenceCounterBase(), ME("MsgQueueEntry"), global_(global), log_(global.getLog("org.xmlBlaster.util.queue")),
00055      connectQos_(*connectQos),  // OK to take a reference only???!!! Should we clone it so that RAM queue behaves same as persistent queue?
00056      connectReturnQos_((ConnectReturnQos*)0)
00057 {
00058    msgUnit_          = NULL;
00059    publishReturnQos_ = NULL;
00060    statusQosData_    = NULL;
00061    uniqueId_         = uniqueId;
00062    embeddedType_     = embeddedType;
00063    priority_         = priority; // should be maximum priority
00064    persistent_       = persistent; // currently no persistents supported
00065    logId_            = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
00066    memset(&blobHolder_, 0, sizeof(BlobHolder));
00067 }
00068 
00069 
00070 MsgQueueEntry::MsgQueueEntry(Global& global, const QueryKeyData& queryKeyData, const QueryQosData& queryQosData, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
00071    : ReferenceCounterBase(), ME("MsgQueueEntry"), global_(global), log_(global.getLog("org.xmlBlaster.util.queue")),
00072      connectQos_((ConnectQos*)0),
00073      connectReturnQos_((ConnectReturnQos*)0)
00074 {
00075    // The MessageUnit takes a copy of the passed queryKeyData and queryQosData:
00076    msgUnit_          = new MessageUnit(queryKeyData, string(""), queryQosData);
00077    publishReturnQos_ = NULL;
00078    statusQosData_    = NULL;
00079    uniqueId_         = uniqueId;
00080    embeddedType_     = embeddedType;
00081    priority_         = priority; // should be maximum priority
00082    persistent_          = persistent; // currently no persistents supported
00083    logId_            = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
00084    memset(&blobHolder_, 0, sizeof(BlobHolder));
00085 }
00086 
00087 void MsgQueueEntry::copy(const MsgQueueEntry& entry)
00088 {
00089    connectQos_ = new ConnectQos(*entry.connectQos_);
00090 
00091    if (msgUnit_ != NULL) {
00092       delete msgUnit_;
00093       msgUnit_ = NULL;
00094    }
00095    if (entry.msgUnit_ != NULL) msgUnit_ = new org::xmlBlaster::util::MessageUnit(*entry.msgUnit_);
00096 
00097    connectReturnQos_ = new ConnectReturnQos(*entry.connectReturnQos_);
00098 
00099    if (publishReturnQos_ != NULL) {
00100       delete publishReturnQos_;
00101       publishReturnQos_ = NULL; 
00102    }
00103    if (entry.publishReturnQos_ != NULL) 
00104       publishReturnQos_ = new org::xmlBlaster::client::qos::PublishReturnQos(*entry.publishReturnQos_);
00105 
00106    if (statusQosData_ != NULL) {
00107       delete statusQosData_;
00108       statusQosData_ = NULL; 
00109    }
00110    if (entry.statusQosData_ != NULL) 
00111       statusQosData_ = new org::xmlBlaster::util::qos::StatusQosData(*entry.statusQosData_);
00112 
00113    uniqueId_     = entry.uniqueId_;
00114    embeddedType_ = entry.embeddedType_;
00115    priority_     = entry.priority_;
00116    persistent_      = entry.persistent_;
00117    logId_        = logId_;
00118 }
00119 
00120 
00121 MsgQueueEntry::~MsgQueueEntry()
00122 {
00123    delete msgUnit_;
00124    delete publishReturnQos_;
00125    delete statusQosData_;
00126 
00127    ::BlobHolder blob;
00128    blob.data    = blobHolder_.data;
00129    blob.dataLen = blobHolder_.dataLen;
00130    ::freeBlobHolderContent(&blob);
00131    memset(&blobHolder_, 0, sizeof(BlobHolder));
00132 }
00133 
00134 MsgQueueEntry::MsgQueueEntry(const MsgQueueEntry& entry)
00135    : ReferenceCounterBase(entry), ME(entry.ME), global_(entry.global_), log_(entry.log_),
00136      connectQos_((ConnectQos*)0),
00137      connectReturnQos_((ConnectReturnQos*)0)
00138 {
00139    memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
00140    msgUnit_          = NULL;
00141    publishReturnQos_ = NULL;
00142    statusQosData_    = NULL;
00143    copy(entry);
00144 }
00145 
00146 
00147 MsgQueueEntry& MsgQueueEntry::operator =(const MsgQueueEntry& entry)
00148 {
00149    ReferenceCounterBase::operator =(entry);
00150    memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
00151    copy(entry);
00152    return *this;
00153 }
00154 
00155 size_t MsgQueueEntry::getSizeInBytes() const
00156 {
00157    if (msgUnit_) return msgUnit_->getSizeInBytes();
00158    return 0;
00159 }
00160 
00161 int MsgQueueEntry::getPriority() const
00162 {
00163    return priority_;
00164 }
00165 
00166 bool MsgQueueEntry::isPersistent() const
00167 {
00168    return persistent_;
00169 }
00170 
00171 void MsgQueueEntry::setSender(org::xmlBlaster::util::SessionNameRef sender)
00172 {
00173    if (msgUnit_) {
00174       msgUnit_->getQos().setSender(sender);
00175    }
00176    //connectQos_
00177    //statusQosData_
00178 }
00179 
00180 Timestamp MsgQueueEntry::getUniqueId() const
00181 {
00182    return uniqueId_;
00183 }
00184 
00185 string MsgQueueEntry::getLogId() const
00186 {
00187    return logId_;
00188 }
00189 
00190 string MsgQueueEntry::getEmbeddedType() const
00191 {
00192    return embeddedType_;
00193 }
00194 
00195 bool MsgQueueEntry::isConnect() const {
00196    return false;
00197 }
00198 
00199 bool MsgQueueEntry::isPublish() const {
00200    return false; // set to true by derived class PublishQueueEntry
00201 }
00202 
00203 bool MsgQueueEntry::isSubscribe() const {
00204    return false;
00205 }
00206 
00207 bool MsgQueueEntry::isUnSubscribe() const {
00208    return false;
00209 }
00210 
00211 bool MsgQueueEntry::isErase() const {
00212    return false;
00213 }
00214 
00215 const void* MsgQueueEntry::getEmbeddedObject() const
00216 {
00217    if (log_.call()) log_.call(ME, string("getEmbeddedObject(") + embeddedType_ + ") ...");
00218    if (msgUnit_ == 0) {
00219       log_.error(ME, "getEmbeddedObject() with msgUnit == NULL");
00220       return 0;
00221    }
00222    //if (embeddedType_ != (org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::SUBSCRIBE)) // "MSG_RAW|subscribe"
00223    //   throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "getEmbeddedObject()", string("We only support embeddedType '") + org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::SUBSCRIBE + "'");
00224 
00225    if (blobHolder_.data != 0) // Cached
00226       return &blobHolder_;
00227 
00228    if (log_.dump()) log_.dump(ME+".getEmbeddedObject("+ embeddedType_ +")", string("C++ msgUnit=")+msgUnit_->toXml());
00229 
00230    // dump MsgQueueEntry->msgUnit_ with SOCKET protocol into C ::MsgUnit
00231    ::MsgUnit mu;
00232    memset(&mu, 0, sizeof(::MsgUnit));
00233    string keyXml = msgUnit_->getKey().toXml(); // We need the temporary string, as using .c_str() directly would lead to released memory of temporary string
00234    mu.key = keyXml.c_str();
00235    mu.contentLen = msgUnit_->getContentLen();
00236    mu.content = (char *)msgUnit_->getContent();
00237    string qosXml = msgUnit_->getQos().toXml();
00238    mu.qos = qosXml.c_str();
00239    mu.responseQos = (char*)0;
00240 
00241    if (log_.dump()) {
00242       char *p = ::messageUnitToXmlLimited(&mu, 100);
00243       log_.dump(ME+".getEmbeddedObject()", string("C msgUnit:") + p);
00244       ::xmlBlasterFree(p);
00245    }
00246 
00247    // Serialize the message identical to the SOCKET protocol serialization
00248    // We use the functionality from our xmlBlaster C library
00249    ::BlobHolder blob = ::encodeMsgUnit(&mu, 0);
00250 
00251    blobHolder_.data = blob.data;
00252    blobHolder_.dataLen = blob.dataLen;
00253 
00254    if (log_.dump()) {
00255       char *p = ::blobDump(&blob);
00256       log_.dump(ME+".getEmbeddedObject()", string("Putting entry into queue:") + p);
00257       ::freeBlobDump(p);
00258    }
00259 
00260    return &blobHolder_;
00261    //return queryKeyData_; // actually not used now otherwise we would need to return also the qos
00262 }
00263 
00264 string MsgQueueEntry::toXml(const string& /*indent*/) const
00265 {
00266    return "<notImplemented/>\n";
00267 }
00268 
00269 const MsgQueueEntry& MsgQueueEntry::send(I_ConnectionsHandler&) const
00270 {
00271    log_.error(ME, "send not implemented");
00272    return *this;
00273 }
00274 
00275 MessageUnit& MsgQueueEntry::getMsgUnit() const 
00276 {
00277    return *msgUnit_;
00278 }
00279 
00280 }}}} // namespace
00281