1 /*------------------------------------------------------------------------------
2 Name: MsgQueueEntry.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 #include <util/queue/MsgQueueEntry.h>
8 #include <util/dispatch/I_ConnectionsHandler.h>
9 #include <util/Global.h>
10 #include <util/lexical_cast.h>
11 #include <util/qos/ConnectQos.h>
12 #include <client/qos/PublishReturnQos.h>
13 #include <cstddef> //<stddef.h>
14 #include <util/msgUtil.h> // from xmlBlaster C library
15 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug);
16 #include <cstring> // memset()
17
/**
 * Class embedding messages or information to be stored on the client queues.
 * Note that all content is copied when passed to the constructors.
 * This way the queue entry is the owner of the content (and therefore will
 * delete it when its destructor is called).
 *
 * @author <a href='mailto:michele@laghi.eu'>Michele Laghi</a>
 */
26 namespace org { namespace xmlBlaster { namespace util { namespace queue {
27
28 using namespace std;
29 using namespace org::xmlBlaster::util;
30 using namespace org::xmlBlaster::util::key;
31 using namespace org::xmlBlaster::util::qos;
32 using namespace org::xmlBlaster::util::dispatch;
33 using namespace org::xmlBlaster::client::qos;
34
35 MsgQueueEntry::MsgQueueEntry(Global& global, const MessageUnit& msgUnit, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
36 : ReferenceCounterBase(),
37 ME("MsgQueueEntry"),
38 global_(global),
39 log_(global.getLog("org.xmlBlaster.util.queue")),
40 connectQos_((ConnectQos*)0),
41 connectReturnQos_((ConnectReturnQos*)0)
42 {
43 publishReturnQos_ = NULL;
44 msgUnit_ = new MessageUnit(msgUnit);
45 statusQosData_ = NULL;
46 uniqueId_ = uniqueId; //TimestampFactory::getInstance().getTimestamp();
47 embeddedType_ = embeddedType;
48 priority_ = priority; // should be normal priority
49 persistent_ = persistent; // currently no persistents supported
50 logId_ = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
51 memset(&blobHolder_, 0, sizeof(BlobHolder));
52 }
53
54 MsgQueueEntry::MsgQueueEntry(Global& global, const ConnectQosRef& connectQos, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
55 : ReferenceCounterBase(), ME("MsgQueueEntry"), global_(global), log_(global.getLog("org.xmlBlaster.util.queue")),
56 connectQos_(*connectQos), // OK to take a reference only???!!! Should we clone it so that RAM queue behaves same as persistent queue?
57 connectReturnQos_((ConnectReturnQos*)0)
58 {
59 msgUnit_ = NULL;
60 publishReturnQos_ = NULL;
61 statusQosData_ = NULL;
62 uniqueId_ = uniqueId;
63 embeddedType_ = embeddedType;
64 priority_ = priority; // should be maximum priority
65 persistent_ = persistent; // currently no persistents supported
66 logId_ = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
67 memset(&blobHolder_, 0, sizeof(BlobHolder));
68 }
69
70
71 MsgQueueEntry::MsgQueueEntry(Global& global, const QueryKeyData& queryKeyData, const QueryQosData& queryQosData, const string& embeddedType, int priority, bool persistent, Timestamp uniqueId)
72 : ReferenceCounterBase(), ME("MsgQueueEntry"), global_(global), log_(global.getLog("org.xmlBlaster.util.queue")),
73 connectQos_((ConnectQos*)0),
74 connectReturnQos_((ConnectReturnQos*)0)
75 {
76 // The MessageUnit takes a copy of the passed queryKeyData and queryQosData:
77 msgUnit_ = new MessageUnit(queryKeyData, string(""), queryQosData);
78 publishReturnQos_ = NULL;
79 statusQosData_ = NULL;
80 uniqueId_ = uniqueId;
81 embeddedType_ = embeddedType;
82 priority_ = priority; // should be maximum priority
83 persistent_ = persistent; // currently no persistents supported
84 logId_ = embeddedType_ + string(":") + lexical_cast<std::string>(uniqueId_);
85 memset(&blobHolder_, 0, sizeof(BlobHolder));
86 }
87
88 void MsgQueueEntry::copy(const MsgQueueEntry& entry)
89 {
90 connectQos_ = new ConnectQos(*entry.connectQos_);
91
92 if (msgUnit_ != NULL) {
93 delete msgUnit_;
94 msgUnit_ = NULL;
95 }
96 if (entry.msgUnit_ != NULL) msgUnit_ = new org::xmlBlaster::util::MessageUnit(*entry.msgUnit_);
97
98 connectReturnQos_ = new ConnectReturnQos(*entry.connectReturnQos_);
99
100 if (publishReturnQos_ != NULL) {
101 delete publishReturnQos_;
102 publishReturnQos_ = NULL;
103 }
104 if (entry.publishReturnQos_ != NULL)
105 publishReturnQos_ = new org::xmlBlaster::client::qos::PublishReturnQos(*entry.publishReturnQos_);
106
107 if (statusQosData_ != NULL) {
108 delete statusQosData_;
109 statusQosData_ = NULL;
110 }
111 if (entry.statusQosData_ != NULL)
112 statusQosData_ = new org::xmlBlaster::util::qos::StatusQosData(*entry.statusQosData_);
113
114 uniqueId_ = entry.uniqueId_;
115 embeddedType_ = entry.embeddedType_;
116 priority_ = entry.priority_;
117 persistent_ = entry.persistent_;
118 logId_ = logId_;
119 }
120
121
122 MsgQueueEntry::~MsgQueueEntry()
123 {
124 delete msgUnit_;
125 delete publishReturnQos_;
126 delete statusQosData_;
127
128 ::BlobHolder blob;
129 blob.data = blobHolder_.data;
130 blob.dataLen = blobHolder_.dataLen;
131 ::freeBlobHolderContent(&blob);
132 memset(&blobHolder_, 0, sizeof(BlobHolder));
133 }
134
135 MsgQueueEntry::MsgQueueEntry(const MsgQueueEntry& entry)
136 : ReferenceCounterBase(entry), ME(entry.ME), global_(entry.global_), log_(entry.log_),
137 connectQos_((ConnectQos*)0),
138 connectReturnQos_((ConnectReturnQos*)0)
139 {
140 memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
141 msgUnit_ = NULL;
142 publishReturnQos_ = NULL;
143 statusQosData_ = NULL;
144 copy(entry);
145 }
146
147
148 MsgQueueEntry& MsgQueueEntry::operator =(const MsgQueueEntry& entry)
149 {
150 ReferenceCounterBase::operator =(entry);
151 memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
152 copy(entry);
153 return *this;
154 }
155
156 size_t MsgQueueEntry::getSizeInBytes() const
157 {
158 if (msgUnit_) return msgUnit_->getSizeInBytes();
159 return 0;
160 }
161
162 int MsgQueueEntry::getPriority() const
163 {
164 return priority_;
165 }
166
167 bool MsgQueueEntry::isPersistent() const
168 {
169 return persistent_;
170 }
171
172 void MsgQueueEntry::setSender(org::xmlBlaster::util::SessionNameRef sender)
173 {
174 if (msgUnit_) {
175 msgUnit_->getQos().setSender(sender);
176 }
177 //connectQos_
178 //statusQosData_
179 }
180
181 Timestamp MsgQueueEntry::getUniqueId() const
182 {
183 return uniqueId_;
184 }
185
186 string MsgQueueEntry::getLogId() const
187 {
188 return logId_;
189 }
190
191 string MsgQueueEntry::getEmbeddedType() const
192 {
193 return embeddedType_;
194 }
195
196 string MsgQueueEntry::getMethodName() const
197 {
198 string::size_type pos = embeddedType_.find("|");
199 if (pos == string::npos) {
200 return org::xmlBlaster::util::MethodName::UNKNOWN;
201 }
202 if (pos < embeddedType_.size()) {
203 return embeddedType_.substr(pos+1);
204 }
205 return org::xmlBlaster::util::MethodName::UNKNOWN;
206 }
207
208
209 bool MsgQueueEntry::isConnect() const {
210 return false;
211 }
212
213 bool MsgQueueEntry::isPublish() const {
214 return false; // set to true by derived class PublishQueueEntry
215 }
216
217 bool MsgQueueEntry::isSubscribe() const {
218 return false;
219 }
220
221 bool MsgQueueEntry::isUnSubscribe() const {
222 return false;
223 }
224
225 bool MsgQueueEntry::isErase() const {
226 return false;
227 }
228
229 const void* MsgQueueEntry::getEmbeddedObject() const
230 {
231 if (log_.call()) log_.call(ME, string("getEmbeddedObject(") + embeddedType_ + ") ...");
232 if (msgUnit_ == 0) {
233 log_.error(ME, "getEmbeddedObject() with msgUnit == NULL");
234 return 0;
235 }
236 //if (embeddedType_ != (org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::SUBSCRIBE)) // "MSG_RAW|subscribe"
237 // throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "getEmbeddedObject()", string("We only support embeddedType '") + org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::SUBSCRIBE + "'");
238
239 if (blobHolder_.data != 0) // Cached
240 return &blobHolder_;
241
242 if (log_.dump()) log_.dump(ME+".getEmbeddedObject("+ embeddedType_ +")", string("C++ msgUnit=")+msgUnit_->toXml());
243
244 // dump MsgQueueEntry->msgUnit_ with SOCKET protocol into C ::MsgUnit
245 ::MsgUnit mu;
246 memset(&mu, 0, sizeof(::MsgUnit));
247 string keyXml = msgUnit_->getKey().toXml(); // We need the temporary string, as using .c_str() directly would lead to released memory of temporary string
248 mu.key = keyXml.c_str();
249 mu.contentLen = msgUnit_->getContentLen();
250 mu.content = (char *)msgUnit_->getContent();
251 string qosXml = msgUnit_->getQos().toXml();
252 mu.qos = qosXml.c_str();
253 mu.responseQos = (char*)0;
254
255 if (log_.dump()) {
256 char *p = ::messageUnitToXmlLimited(&mu, 100);
257 log_.dump(ME+".getEmbeddedObject()", string("C msgUnit:") + p);
258 ::xmlBlasterFree(p);
259 }
260
261 // Serialize the message identical to the SOCKET protocol serialization
262 // We use the functionality from our xmlBlaster C library
263 ::BlobHolder blob = ::encodeMsgUnit(&mu, 0);
264
265 blobHolder_.data = blob.data;
266 blobHolder_.dataLen = blob.dataLen;
267
268 if (log_.dump()) {
269 char *p = ::blobDump(&blob);
270 log_.dump(ME+".getEmbeddedObject()", string("Putting entry into queue:") + p);
271 ::freeBlobDump(p);
272 }
273
274 return &blobHolder_;
275 //return queryKeyData_; // actually not used now otherwise we would need to return also the qos
276 }
277
278 string MsgQueueEntry::toXml(const string& /*indent*/) const
279 {
280 return "<notImplemented/>\n";
281 }
282
283 const MsgQueueEntry& MsgQueueEntry::send(I_ConnectionsHandler&) const
284 {
285 log_.error(ME, "send not implemented");
286 return *this;
287 }
288
289 MessageUnit& MsgQueueEntry::getMsgUnit() const
290 {
291 return *msgUnit_;
292 }
293
294 }}}} // namespace
syntax highlighted by Code2HTML, v. 0.9.1