00001 /*------------------------------------------------------------------------------ 00002 Name: MsgQueueEntry.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 ------------------------------------------------------------------------------*/ 00006 00007 #include <util/queue/MsgQueueEntry.h> 00008 #include <util/dispatch/I_ConnectionsHandler.h> 00009 #include <util/Global.h> 00010 #include <util/lexical_cast.h> 00011 #include <util/qos/ConnectQos.h> 00012 #include <client/qos/PublishReturnQos.h> 00013 #include <cstddef> //<stddef.h> 00014 #include <util/msgUtil.h> // from xmlBlaster C library 00015 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug); 00016 00025 namespace org { namespace xmlBlaster { namespace util { namespace queue { 00026 00027 using namespace std; 00028 using namespace org::xmlBlaster::util; 00029 using namespace org::xmlBlaster::util::key; 00030 using namespace org::xmlBlaster::util::qos; 00031 using namespace org::xmlBlaster::util::dispatch; 00032 using namespace org::xmlBlaster::client::qos; 00033 00034 MsgQueueEntry::MsgQueueEntry(Global& global, const MessageUnit& msgUnit, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId) 00035 : ReferenceCounterBase(), 00036 ME("MsgQueueEntry"), 00037 global_(global), 00038 log_(global.getLog("org.xmlBlaster.util.queue")), 00039 connectQos_((ConnectQos*)0), 00040 connectReturnQos_((ConnectReturnQos*)0) 00041 { 00042 publishReturnQos_ = NULL; 00043 msgUnit_ = new MessageUnit(msgUnit); 00044 statusQosData_ = NULL; 00045 uniqueId_ = uniqueId; //TimestampFactory::getInstance().getTimestamp(); 00046 embeddedType_ = embeddedType; 00047 priority_ = priority; // should be normal priority 00048 persistent_ = persistent; // currently no persistents supported 00049 logId_ = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_); 00050 memset(&blobHolder_, 0, sizeof(BlobHolder)); 00051 } 00052 00053 MsgQueueEntry::MsgQueueEntry(Global& global, const ConnectQosRef& connectQos, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId) 00054 : ReferenceCounterBase(), ME("MsgQueueEntry"), global_(global), log_(global.getLog("org.xmlBlaster.util.queue")), 00055 connectQos_(*connectQos), // OK to take a reference only???!!! Should we clone it so that RAM queue behaves same as persistent queue? 00056 connectReturnQos_((ConnectReturnQos*)0) 00057 { 00058 msgUnit_ = NULL; 00059 publishReturnQos_ = NULL; 00060 statusQosData_ = NULL; 00061 uniqueId_ = uniqueId; 00062 embeddedType_ = embeddedType; 00063 priority_ = priority; // should be maximum priority 00064 persistent_ = persistent; // currently no persistents supported 00065 logId_ = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_); 00066 memset(&blobHolder_, 0, sizeof(BlobHolder)); 00067 } 00068 00069 00070 MsgQueueEntry::MsgQueueEntry(Global& global, const QueryKeyData& queryKeyData, const QueryQosData& queryQosData, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId) 00071 : ReferenceCounterBase(), ME("MsgQueueEntry"), global_(global), log_(global.getLog("org.xmlBlaster.util.queue")), 00072 connectQos_((ConnectQos*)0), 00073 connectReturnQos_((ConnectReturnQos*)0) 00074 { 00075 // The MessageUnit takes a copy of the passed queryKeyData and queryQosData: 00076 msgUnit_ = new MessageUnit(queryKeyData, string(""), queryQosData); 00077 publishReturnQos_ = NULL; 00078 statusQosData_ = NULL; 00079 uniqueId_ = uniqueId; 00080 embeddedType_ = embeddedType; 00081 priority_ = priority; // should be maximum priority 00082 persistent_ = persistent; // currently no persistents supported 00083 logId_ = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_); 00084 memset(&blobHolder_, 0, sizeof(BlobHolder)); 00085 } 00086 00087 void MsgQueueEntry::copy(const MsgQueueEntry& entry) 00088 { 00089 connectQos_ = new ConnectQos(*entry.connectQos_); 00090 00091 if (msgUnit_ != NULL) { 00092 delete msgUnit_; 00093 msgUnit_ = NULL; 00094 } 00095 if (entry.msgUnit_ != NULL) msgUnit_ = new org::xmlBlaster::util::MessageUnit(*entry.msgUnit_); 00096 00097 connectReturnQos_ = new ConnectReturnQos(*entry.connectReturnQos_); 00098 00099 if (publishReturnQos_ != NULL) { 00100 delete publishReturnQos_; 00101 publishReturnQos_ = NULL; 00102 } 00103 if (entry.publishReturnQos_ != NULL) 00104 publishReturnQos_ = new org::xmlBlaster::client::qos::PublishReturnQos(*entry.publishReturnQos_); 00105 00106 if (statusQosData_ != NULL) { 00107 delete statusQosData_; 00108 statusQosData_ = NULL; 00109 } 00110 if (entry.statusQosData_ != NULL) 00111 statusQosData_ = new org::xmlBlaster::util::qos::StatusQosData(*entry.statusQosData_); 00112 00113 uniqueId_ = entry.uniqueId_; 00114 embeddedType_ = entry.embeddedType_; 00115 priority_ = entry.priority_; 00116 persistent_ = entry.persistent_; 00117 logId_ = logId_; 00118 } 00119 00120 00121 MsgQueueEntry::~MsgQueueEntry() 00122 { 00123 delete msgUnit_; 00124 delete publishReturnQos_; 00125 delete statusQosData_; 00126 00127 ::BlobHolder blob; 00128 blob.data = blobHolder_.data; 00129 blob.dataLen = blobHolder_.dataLen; 00130 ::freeBlobHolderContent(&blob); 00131 memset(&blobHolder_, 0, sizeof(BlobHolder)); 00132 } 00133 00134 MsgQueueEntry::MsgQueueEntry(const MsgQueueEntry& entry) 00135 : ReferenceCounterBase(entry), ME(entry.ME), global_(entry.global_), log_(entry.log_), 00136 connectQos_((ConnectQos*)0), 00137 connectReturnQos_((ConnectReturnQos*)0) 00138 { 00139 memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache 00140 msgUnit_ = NULL; 00141 publishReturnQos_ = NULL; 00142 statusQosData_ = NULL; 00143 copy(entry); 00144 } 00145 00146 00147 MsgQueueEntry& MsgQueueEntry::operator =(const MsgQueueEntry& entry) 00148 { 00149 ReferenceCounterBase::operator =(entry); 00150 memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache 00151 copy(entry); 00152 return *this; 00153 } 00154 00155 size_t MsgQueueEntry::getSizeInBytes() const 00156 { 00157 if (msgUnit_) return msgUnit_->getSizeInBytes(); 00158 return 0; 00159 } 00160 00161 int MsgQueueEntry::getPriority() const 00162 { 00163 return priority_; 00164 } 00165 00166 bool MsgQueueEntry::isPersistent() const 00167 { 00168 return persistent_; 00169 } 00170 00171 void MsgQueueEntry::setSender(org::xmlBlaster::util::SessionNameRef sender) 00172 { 00173 if (msgUnit_) { 00174 msgUnit_->getQos().setSender(sender); 00175 } 00176 //connectQos_ 00177 //statusQosData_ 00178 } 00179 00180 Timestamp MsgQueueEntry::getUniqueId() const 00181 { 00182 return uniqueId_; 00183 } 00184 00185 string MsgQueueEntry::getLogId() const 00186 { 00187 return logId_; 00188 } 00189 00190 string MsgQueueEntry::getEmbeddedType() const 00191 { 00192 return embeddedType_; 00193 } 00194 00195 bool MsgQueueEntry::isConnect() const { 00196 return false; 00197 } 00198 00199 bool MsgQueueEntry::isPublish() const { 00200 return false; // set to true by derived class PublishQueueEntry 00201 } 00202 00203 bool MsgQueueEntry::isSubscribe() const { 00204 return false; 00205 } 00206 00207 bool MsgQueueEntry::isUnSubscribe() const { 00208 return false; 00209 } 00210 00211 bool MsgQueueEntry::isErase() const { 00212 return false; 00213 } 00214 00215 const void* MsgQueueEntry::getEmbeddedObject() const 00216 { 00217 if (log_.call()) log_.call(ME, string("getEmbeddedObject(") + embeddedType_ + ") ..."); 00218 if (msgUnit_ == 0) { 00219 log_.error(ME, "getEmbeddedObject() with msgUnit == NULL"); 00220 return 0; 00221 } 00222 //if (embeddedType_ != (org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::SUBSCRIBE)) // "MSG_RAW|subscribe" 00223 // throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "getEmbeddedObject()", string("We only support embeddedType '") + org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::SUBSCRIBE + "'"); 00224 00225 if (blobHolder_.data != 0) // Cached 00226 return &blobHolder_; 00227 00228 if (log_.dump()) log_.dump(ME+".getEmbeddedObject("+ embeddedType_ +")", string("C++ msgUnit=")+msgUnit_->toXml()); 00229 00230 // dump MsgQueueEntry->msgUnit_ with SOCKET protocol into C ::MsgUnit 00231 ::MsgUnit mu; 00232 memset(&mu, 0, sizeof(::MsgUnit)); 00233 string keyXml = msgUnit_->getKey().toXml(); // We need the temporary string, as using .c_str() directly would lead to released memory of temporary string 00234 mu.key = keyXml.c_str(); 00235 mu.contentLen = msgUnit_->getContentLen(); 00236 mu.content = (char *)msgUnit_->getContent(); 00237 string qosXml = msgUnit_->getQos().toXml(); 00238 mu.qos = qosXml.c_str(); 00239 mu.responseQos = (char*)0; 00240 00241 if (log_.dump()) { 00242 char *p = ::messageUnitToXmlLimited(&mu, 100); 00243 log_.dump(ME+".getEmbeddedObject()", string("C msgUnit:") + p); 00244 ::xmlBlasterFree(p); 00245 } 00246 00247 // Serialize the message identical to the SOCKET protocol serialization 00248 // We use the functionality from our xmlBlaster C library 00249 ::BlobHolder blob = ::encodeMsgUnit(&mu, 0); 00250 00251 blobHolder_.data = blob.data; 00252 blobHolder_.dataLen = blob.dataLen; 00253 00254 if (log_.dump()) { 00255 char *p = ::blobDump(&blob); 00256 log_.dump(ME+".getEmbeddedObject()", string("Putting entry into queue:") + p); 00257 ::freeBlobDump(p); 00258 } 00259 00260 return &blobHolder_; 00261 //return queryKeyData_; // actually not used now otherwise we would need to return also the qos 00262 } 00263 00264 string MsgQueueEntry::toXml(const string& /*indent*/) const 00265 { 00266 return "<notImplemented/>\n"; 00267 } 00268 00269 const MsgQueueEntry& MsgQueueEntry::send(I_ConnectionsHandler&) const 00270 { 00271 log_.error(ME, "send not implemented"); 00272 return *this; 00273 } 00274 00275 MessageUnit& MsgQueueEntry::getMsgUnit() const 00276 { 00277 return *msgUnit_; 00278 } 00279 00280 }}}} // namespace 00281