1 /*------------------------------------------------------------------------------
  2 Name:      MsgQueueEntry.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 
  7 #include <util/queue/MsgQueueEntry.h>
  8 #include <util/dispatch/I_ConnectionsHandler.h>
  9 #include <util/Global.h>
 10 #include <util/lexical_cast.h>
 11 #include <util/qos/ConnectQos.h>
 12 #include <client/qos/PublishReturnQos.h>
 13 #include <cstddef> //<stddef.h>
 14 #include <util/msgUtil.h> // from xmlBlaster C library
 15 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug);
 16 #include <cstring> // memset()
 17 
 18 /**
 19  * Class embedding messages or information to be stored on the client queues
 20  * Note that all content is copied when passed to the constructors.
 21  * This way this queue entry is the owner of the content (and therefore will
 22  * delete it when its destructor is called).
 23  *
 24  * @author <a href='mailto:michele@laghi.eu'>Michele Laghi</a>
 25  */
 26 namespace org { namespace xmlBlaster { namespace util { namespace queue {
 27 
 28 using namespace std;
 29 using namespace org::xmlBlaster::util;
 30 using namespace org::xmlBlaster::util::key;
 31 using namespace org::xmlBlaster::util::qos;
 32 using namespace org::xmlBlaster::util::dispatch;
 33 using namespace org::xmlBlaster::client::qos;
 34 
 35 MsgQueueEntry::MsgQueueEntry(Global& global, const MessageUnit& msgUnit, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
 36    : ReferenceCounterBase(), 
 37      ME("MsgQueueEntry"), 
 38      global_(global), 
 39      log_(global.getLog("org.xmlBlaster.util.queue")),
 40      connectQos_((ConnectQos*)0),
 41      connectReturnQos_((ConnectReturnQos*)0)
 42 {
 43    publishReturnQos_ = NULL;
 44    msgUnit_          = new MessageUnit(msgUnit);
 45    statusQosData_    = NULL;
 46    uniqueId_         = uniqueId; //TimestampFactory::getInstance().getTimestamp();
 47    embeddedType_     = embeddedType;
 48    priority_         = priority; // should be normal priority
 49    persistent_       = persistent; // currently no persistents supported
 50    logId_            = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
 51    memset(&blobHolder_, 0, sizeof(BlobHolder));
 52 }
 53 
 54 MsgQueueEntry::MsgQueueEntry(Global& global, const ConnectQosRef& connectQos, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
 55    : ReferenceCounterBase(), ME("MsgQueueEntry"), global_(global), log_(global.getLog("org.xmlBlaster.util.queue")),
 56      connectQos_(*connectQos),  // OK to take a reference only???!!! Should we clone it so that RAM queue behaves same as persistent queue?
 57      connectReturnQos_((ConnectReturnQos*)0)
 58 {
 59    msgUnit_          = NULL;
 60    publishReturnQos_ = NULL;
 61    statusQosData_    = NULL;
 62    uniqueId_         = uniqueId;
 63    embeddedType_     = embeddedType;
 64    priority_         = priority; // should be maximum priority
 65    persistent_       = persistent; // currently no persistents supported
 66    logId_            = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
 67    memset(&blobHolder_, 0, sizeof(BlobHolder));
 68 }
 69 
 70 
 71 MsgQueueEntry::MsgQueueEntry(Global& global, const QueryKeyData& queryKeyData, const QueryQosData& queryQosData, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
 72    : ReferenceCounterBase(), ME("MsgQueueEntry"), global_(global), log_(global.getLog("org.xmlBlaster.util.queue")),
 73      connectQos_((ConnectQos*)0),
 74      connectReturnQos_((ConnectReturnQos*)0)
 75 {
 76    // The MessageUnit takes a copy of the passed queryKeyData and queryQosData:
 77    msgUnit_          = new MessageUnit(queryKeyData, string(""), queryQosData);
 78    publishReturnQos_ = NULL;
 79    statusQosData_    = NULL;
 80    uniqueId_         = uniqueId;
 81    embeddedType_     = embeddedType;
 82    priority_         = priority; // should be maximum priority
 83    persistent_          = persistent; // currently no persistents supported
 84    logId_            = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
 85    memset(&blobHolder_, 0, sizeof(BlobHolder));
 86 }
 87 
 88 void MsgQueueEntry::copy(const MsgQueueEntry& entry)
 89 {
 90    connectQos_ = new ConnectQos(*entry.connectQos_);
 91 
 92    if (msgUnit_ != NULL) {
 93       delete msgUnit_;
 94       msgUnit_ = NULL;
 95    }
 96    if (entry.msgUnit_ != NULL) msgUnit_ = new org::xmlBlaster::util::MessageUnit(*entry.msgUnit_);
 97 
 98    connectReturnQos_ = new ConnectReturnQos(*entry.connectReturnQos_);
 99 
100    if (publishReturnQos_ != NULL) {
101       delete publishReturnQos_;
102       publishReturnQos_ = NULL; 
103    }
104    if (entry.publishReturnQos_ != NULL) 
105       publishReturnQos_ = new org::xmlBlaster::client::qos::PublishReturnQos(*entry.publishReturnQos_);
106 
107    if (statusQosData_ != NULL) {
108       delete statusQosData_;
109       statusQosData_ = NULL; 
110    }
111    if (entry.statusQosData_ != NULL) 
112       statusQosData_ = new org::xmlBlaster::util::qos::StatusQosData(*entry.statusQosData_);
113 
114    uniqueId_     = entry.uniqueId_;
115    embeddedType_ = entry.embeddedType_;
116    priority_     = entry.priority_;
117    persistent_      = entry.persistent_;
118    logId_        = logId_;
119 }
120 
121 
122 MsgQueueEntry::~MsgQueueEntry()
123 {
124    delete msgUnit_;
125    delete publishReturnQos_;
126    delete statusQosData_;
127 
128    ::BlobHolder blob;
129    blob.data    = blobHolder_.data;
130    blob.dataLen = blobHolder_.dataLen;
131    ::freeBlobHolderContent(&blob);
132    memset(&blobHolder_, 0, sizeof(BlobHolder));
133 }
134 
135 MsgQueueEntry::MsgQueueEntry(const MsgQueueEntry& entry)
136    : ReferenceCounterBase(entry), ME(entry.ME), global_(entry.global_), log_(entry.log_),
137      connectQos_((ConnectQos*)0),
138      connectReturnQos_((ConnectReturnQos*)0)
139 {
140    memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
141    msgUnit_          = NULL;
142    publishReturnQos_ = NULL;
143    statusQosData_    = NULL;
144    copy(entry);
145 }
146 
147 
148 MsgQueueEntry& MsgQueueEntry::operator =(const MsgQueueEntry& entry)
149 {
150    ReferenceCounterBase::operator =(entry);
151    memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
152    copy(entry);
153    return *this;
154 }
155 
156 size_t MsgQueueEntry::getSizeInBytes() const
157 {
158    if (msgUnit_) return msgUnit_->getSizeInBytes();
159    return 0;
160 }
161 
162 int MsgQueueEntry::getPriority() const
163 {
164    return priority_;
165 }
166 
167 bool MsgQueueEntry::isPersistent() const
168 {
169    return persistent_;
170 }
171 
172 void MsgQueueEntry::setSender(org::xmlBlaster::util::SessionNameRef sender)
173 {
174    if (msgUnit_) {
175       msgUnit_->getQos().setSender(sender);
176    }
177    //connectQos_
178    //statusQosData_
179 }
180 
181 Timestamp MsgQueueEntry::getUniqueId() const
182 {
183    return uniqueId_;
184 }
185 
186 string MsgQueueEntry::getLogId() const
187 {
188    return logId_;
189 }
190 
191 string MsgQueueEntry::getEmbeddedType() const
192 {
193    return embeddedType_;
194 }
195 
196 string MsgQueueEntry::getMethodName() const
197 {
198    string::size_type pos = embeddedType_.find("|");
199    if (pos == string::npos) {
200       return org::xmlBlaster::util::MethodName::UNKNOWN;
201    }
202    if (pos < embeddedType_.size()) {
203       return embeddedType_.substr(pos+1);
204    }
205    return org::xmlBlaster::util::MethodName::UNKNOWN;
206 }
207 
208 
209 bool MsgQueueEntry::isConnect() const {
210         return false;
211 }
212 
213 bool MsgQueueEntry::isPublish() const {
214         return false; // set to true by derived class PublishQueueEntry
215 }
216 
217 bool MsgQueueEntry::isSubscribe() const {
218         return false;
219 }
220 
221 bool MsgQueueEntry::isUnSubscribe() const {
222         return false;
223 }
224 
225 bool MsgQueueEntry::isErase() const {
226         return false;
227 }
228 
229 const void* MsgQueueEntry::getEmbeddedObject() const
230 {
231    if (log_.call()) log_.call(ME, string("getEmbeddedObject(") + embeddedType_ + ") ...");
232    if (msgUnit_ == 0) {
233       log_.error(ME, "getEmbeddedObject() with msgUnit == NULL");
234       return 0;
235    }
236    //if (embeddedType_ != (org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::SUBSCRIBE)) // "MSG_RAW|subscribe"
237    //   throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "getEmbeddedObject()", string("We only support embeddedType '") + org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::SUBSCRIBE + "'");
238 
239    if (blobHolder_.data != 0) // Cached
240       return &blobHolder_;
241 
242    if (log_.dump()) log_.dump(ME+".getEmbeddedObject("+ embeddedType_ +")", string("C++ msgUnit=")+msgUnit_->toXml());
243 
244    // dump MsgQueueEntry->msgUnit_ with SOCKET protocol into C ::MsgUnit
245    ::MsgUnit mu;
246    memset(&mu, 0, sizeof(::MsgUnit));
247    string keyXml = msgUnit_->getKey().toXml(); // We need the temporary string, as using .c_str() directly would lead to released memory of temporary string
248    mu.key = keyXml.c_str();
249    mu.contentLen = msgUnit_->getContentLen();
250    mu.content = (char *)msgUnit_->getContent();
251    string qosXml = msgUnit_->getQos().toXml();
252    mu.qos = qosXml.c_str();
253    mu.responseQos = (char*)0;
254 
255    if (log_.dump()) {
256       char *p = ::messageUnitToXmlLimited(&mu, 100);
257       log_.dump(ME+".getEmbeddedObject()", string("C msgUnit:") + p);
258       ::xmlBlasterFree(p);
259    }
260 
261    // Serialize the message identical to the SOCKET protocol serialization
262    // We use the functionality from our xmlBlaster C library
263    ::BlobHolder blob = ::encodeMsgUnit(&mu, 0);
264 
265    blobHolder_.data = blob.data;
266    blobHolder_.dataLen = blob.dataLen;
267 
268    if (log_.dump()) {
269       char *p = ::blobDump(&blob);
270       log_.dump(ME+".getEmbeddedObject()", string("Putting entry into queue:") + p);
271       ::freeBlobDump(p);
272    }
273 
274    return &blobHolder_;
275    //return queryKeyData_; // actually not used now otherwise we would need to return also the qos
276 }
277 
278 string MsgQueueEntry::toXml(const string& /*indent*/) const
279 {
280    return "<notImplemented/>\n";
281 }
282 
283 const MsgQueueEntry& MsgQueueEntry::send(I_ConnectionsHandler&) const
284 {
285    log_.error(ME, "send not implemented");
286    return *this;
287 }
288 
289 MessageUnit& MsgQueueEntry::getMsgUnit() const 
290 {
291    return *msgUnit_;
292 }
293 
294 }}}} // namespace


syntax highlighted by Code2HTML, v. 0.9.1