1 /*------------------------------------------------------------------------------
2 Name: MsgQosData.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 /**
8 * Data container handling of publish() and update() quality of services.
9 * <p />
 * QoS information sent from the client to the server via the publish() method and back via the update() method.<br />
11 * They are needed to control xmlBlaster and inform the client.
12 * <p />
13 * <p>
14 * This data holder is accessible through 4 decorators, each of them allowing a specialized view on the data:
15 * </p>
16 * <ul>
 * <li>PublishQosServer Server side access</li>
 * <li>PublishQos Client side access</li>
 * <li>UpdateQosServer Server side access facade</li>
 * <li>UpdateQos Client side access facade</li>
21 * </ul>
22 * <p>
23 * For the xml representation see MsgQosSaxFactory.
24 * </p>
25 * @see org.xmlBlaster.util.qos.MsgQosSaxFactory
26 * @see org.xmlBlaster.test.classtest.qos.MsgQosFactoryTest
27 * @author xmlBlaster@marcelruff.info
28 * @author michele@laghi.eu
29 */
30
31 #include <util/qos/MsgQosData.h>
32 #include <limits.h>
33 #include <util/lexical_cast.h>
34 #include <util/Global.h>
35
36 using namespace std;
37
38 using namespace org::xmlBlaster::util;
39 using namespace org::xmlBlaster::util::qos;
40 using namespace org::xmlBlaster::util::cluster;
41
42 namespace org { namespace xmlBlaster { namespace util { namespace qos {
43
44 void MsgQosData::init()
45 {
46 ME = "MsgQosData";
47 subscriptionId_ = "";
48 subscribable_.setValue(global_.getProperty(), "isSubscribable"); // true;
49 redeliver_ = 0;
50 queueIndex_ = -1;
51 queueSize_ = -1;
52 forceUpdate_.setValue(global_.getProperty(), "forceUpdate");
53 forceDestroy_.setValue(global_.getProperty(), "forceDestroy");
54 lifeTime_ = -1;
55 remainingLifeStatic_ = -1;
56 isExpired_ = false; // cache the expired state for performance reasons
57 administrative_ = false;
58 maxLifeTime_ = global_.getProperty().getLongProperty("message.maxLifeTime", -1);
59 receiveTimestampHumanReadable_ = global_.getProperty().getBoolProperty("cb.receiveTimestampHumanReadable", false);
60 topicProperty_ = NULL;
61 }
62
63 void MsgQosData::copy(const MsgQosData& data)
64 {
65 QosData::copy(data);
66
67 subscriptionId_ = data.subscriptionId_;
68 subscribable_ = data.subscribable_;
69 redeliver_ = data.redeliver_;
70 queueIndex_ = data.queueIndex_;
71 queueSize_ = data.queueSize_;
72 forceUpdate_= data.forceUpdate_;
73 forceDestroy_ = data.forceDestroy_;
74 lifeTime_ = data.lifeTime_;
75 remainingLifeStatic_ = data.remainingLifeStatic_;
76 isExpired_ = data.isExpired_;
77 administrative_ = data.administrative_;
78 maxLifeTime_ = data.maxLifeTime_;
79 receiveTimestampHumanReadable_ = data.receiveTimestampHumanReadable_;
80 topicProperty_ = NULL;
81 if (data.topicProperty_)
82 topicProperty_ = new TopicProperty(*data.topicProperty_);
83 }
84
85
86 MsgQosData::MsgQosData(Global& global, const string& serialData)
87 : QosData(global, serialData),
88 subscribable_(Prop<bool>(DEFAULT_isSubscribable)),
89 forceUpdate_(Prop<bool>(DEFAULT_forceUpdate)),
90 forceDestroy_(Prop<bool>(DEFAULT_forceDestroy)),
91 destinationList_()
92 {
93 init();
94 }
95
96
97 MsgQosData::MsgQosData(const MsgQosData& data)
98 : QosData(data),
99 destinationList_(data.destinationList_)
100 {
101 copy(data);
102 }
103
104 MsgQosData& MsgQosData::operator=(const MsgQosData& data)
105 {
106 QosData::copy(data);
107 destinationList_ = data.destinationList_;
108 copy(data);
109 return *this;
110 }
111
112
113 MsgQosData::~MsgQosData()
114 {
115 delete topicProperty_;
116 }
117
118 /**
119 * @param isSubscribable if false PtP messages are invisible for subscriptions
120 */
121 void MsgQosData::setSubscribable(const bool isSubscribable)
122 {
123 subscribable_ = isSubscribable;
124 }
125
126 bool MsgQosData::isSubscribable() const
127 {
128 return subscribable_.getValue();
129 }
130
131 bool MsgQosData::isPtp() const
132 {
133 return !destinationList_.empty();
134 }
135
136
137 void MsgQosData::setReadonly(bool readonly)
138 {
139 if (topicProperty_ == NULL)
140 topicProperty_ = new TopicProperty(global_);
141 topicProperty_->setReadonly(readonly);
142 }
143
144 bool MsgQosData::isReadonly() const
145 {
146 if (topicProperty_ == NULL) return false;
147 return topicProperty_->isReadonly();
148 }
149
150 /**
151 * @param volatile true/false
152 */
153 void MsgQosData::setVolatile(bool volatileFlag)
154 {
155 if (volatileFlag) {
156 setLifeTime(0L);
157 setForceDestroy(false);
158 setRemainingLifeStatic(0L); // not needed as server does set it
159 }
160 }
161
162 /**
163 * @return true/false
164 */
165 bool MsgQosData::isVolatile() const
166 {
167 return getLifeTime()==0L && isForceDestroy()==false;
168 }
169
170 void MsgQosData::setAdministrative(bool administrative)
171 {
172 administrative_ = administrative;
173 }
174
175 bool MsgQosData::isAdministrative() const
176 {
177 return administrative_;
178 }
179
180 /**
181 * If Pub/Sub style update: contains the subscribe ID which caused this update
182 * @param subscriptionId null if PtP message
183 */
184 void MsgQosData::setSubscriptionId(const string& subscriptionId)
185 {
186 subscriptionId_ = subscriptionId;
187 }
188
189 /**
190 * If Pub/Sub style update: contains the subscribe ID which caused this update
191 * @return subscribeId or null if PtP message
192 */
193 string MsgQosData::getSubscriptionId() const
194 {
195 return subscriptionId_;
196 }
197
198 /**
199 * Send message to subscriber even if the content is the same as the previous.
200 * @param forceUpdate true update identical messages
201 */
202 void MsgQosData::setForceUpdate(bool forceUpdate)
203 {
204 forceUpdate_.setValue(forceUpdate, CREATED_BY_SETTER);
205 }
206
207 /**
208 * @return true/false
209 */
210 bool MsgQosData::isForceUpdate() const
211 {
212 return forceUpdate_.getValue();
213 }
214
215 /**
216 * Set > 0 if the message probably is redelivered (number of retries).
217 * @param redeliver if == 0 The message is guaranteed to be delivered only once.
218 */
219 void MsgQosData::setRedeliver(int redeliver)
220 {
221 redeliver_ = redeliver;
222 }
223
224 /**
225 * Increment the redeliver counter
226 */
227 void MsgQosData::incrRedeliver()
228 {
229 redeliver_++;
230 }
231
232 /**
233 * Returns > 0 if the message probably is redelivered.
234 * @return == 0 The message is guaranteed to be delivered only once.
235 */
236 int MsgQosData::getRedeliver() const
237 {
238 return redeliver_;
239 }
240
241 /**
242 * @param queueSize The number of queued messages
243 */
244 void MsgQosData::setQueueSize(long queueSize)
245 {
246 queueSize_ = queueSize;
247 }
248
249 /**
250 * @return The number of queued messages
251 */
252 long MsgQosData::getQueueSize() const
253 {
254 return queueSize_;
255 }
256
257 /**
258 * @param queueIndex The index of the message in the queue
259 */
260 void MsgQosData::setQueueIndex(long queueIndex)
261 {
262 queueIndex_ = queueIndex;
263 }
264
265 /**
266 * @return The index of the message in the queue
267 */
268 long MsgQosData::getQueueIndex() const
269 {
270 return queueIndex_;
271 }
272
273 /**
274 * The life time of the message or -1L if forever
275 */
276 long MsgQosData::getLifeTime() const
277 {
278 return lifeTime_;
279 }
280
281 /**
282 * The life time of the message or -1L if forever
283 */
284 void MsgQosData::setLifeTime(long lifeTime)
285 {
286 lifeTime_ = lifeTime;
287 }
288
289 /**
290 * @return Milliseconds until message expiration (from now) or -1L if forever
291 * if 0L the message is expired
292 */
293 long MsgQosData::getRemainingLife() const
294 {
295 if (lifeTime_ > 0 && lifeTime_ < LONG_MAX && getRcvTimestamp() != 0) {
296 Timestamp now = TimestampFactory::getInstance().getTimestamp();
297 long ttl = (long)((getRcvTimestamp()-now)/1000000l) + getLifeTime();
298 if (ttl < 0) return 0;
299 return ttl;
300 }
301 return -1;
302 }
303
304 /**
305 * This is the value delivered in the QoS (as it was calculated by the server on sending)
306 * and is NOT dynamically recalculated.
307 * So trust this value only if your client clock is out of date (or not trusted) and
308 * if you know the message sending latency is not too big.
309 * @return Milliseconds until message expiration (from now) or -1L if forever
310 * if 0L the message is expired
311 */
312 long MsgQosData::getRemainingLifeStatic() const
313 {
314 return remainingLifeStatic_;
315 }
316
317 void MsgQosData::setRemainingLifeStatic(long remainingLifeStatic)
318 {
319 remainingLifeStatic_ = remainingLifeStatic;
320 }
321
322 /**
323 * Calculates if we are expired
324 */
325 bool MsgQosData::isExpired() const
326 {
327 if (lifeTime_ == LONG_MAX || lifeTime_ <= 0) {
328 return false; // lifes forever
329 }
330 if (isExpired_) { // cache
331 return true;
332 }
333 isExpired_ = (getRemainingLife() <= 0);
334 return isExpired_;
335 }
336
337 /**
338 * The server default for max. span of life,
339 * adjustable with property "message.maxLifeTime"
340 * @return max span of life for a message
341 */
342 long MsgQosData::getMaxLifeTime() const
343 {
344 return maxLifeTime_;
345 }
346
347 /**
348 * Get all the destinations of this message.
349 * This should only be used with PTP style messaging<br />
350 * Check <code>if (isPtp()) ...</code> before calling this method
351 *
352 * @return a valid ArrayList containing 0 - n strings with destination names (loginName of clients)<br />
353 * null if Publish/Subscribe style is used
354 */
355 vector<Destination> MsgQosData::getDestinations() const
356 {
357 return destinationList_;
358 }
359
360 /**
361 * Add a destination.
362 */
363 void MsgQosData::addDestination(const Destination& destination)
364 {
365 destinationList_.insert(destinationList_.end(), destination);
366 }
367
368 string MsgQosData::toXml(const string& extraOffset) const
369 {
370 return toXml(false, extraOffset);
371 }
372
373 string MsgQosData::toXml(bool clearText, const string& extraOffset) const
374 {
375 string ret;
376
377 string offset = Constants::OFFSET + extraOffset;
378 string extraOffset1 = extraOffset + Constants::INDENT;
379
380 // WARNING: This dump must be valid, as it could be used by the
381 // persistent store
382 ret += offset + "<qos>";
383
384 if (!getState().empty() || !getStateInfo().empty()) {
385 ret += offset + " <state id='" + getState();
386 if (!getStateInfo().empty())
387 ret += "' info='" + getStateInfo();
388 ret += "'/>";
389 }
390
391 if (subscribable_.isModified())
392 ret += offset + " <subscribable>" + Global::getBoolAsString(subscribable_.getValue()) + "</subscribable>";
393
394 vector<Destination>::const_iterator iter = destinationList_.begin();
395 while (iter != destinationList_.end()) {
396 ret += (*iter).toXml(extraOffset1);
397 iter++;
398 }
399
400 ret += offset + " <sender>" + sender_->getAbsoluteName() + "</sender>";
401
402 if (NORM_PRIORITY != priority_)
403 ret += offset + " <priority>" + lexical_cast<std::string>(priority_) + "</priority>";
404
405 if (administrative_)
406 ret += offset + " <administrative>" + lexical_cast<std::string>(administrative_) + "</administrative>";
407
408 if (!subscriptionId_.empty())
409 ret += offset + " <subscribe id='" + subscriptionId_ + "'/>";
410
411 if (getLifeTime() > 0) {
412 ret += offset + " <expiration lifeTime='" + lexical_cast<std::string>(getLifeTime());
413 bool sendRemainingLife = true; // make it configurable !!!
414 if (sendRemainingLife) {
415 if (getRemainingLife() > 0)
416 ret += "' remainingLife='" + lexical_cast<std::string>(getRemainingLife());
417 else if (getRemainingLifeStatic() > 0)
418 ret += "' remainingLife='" + lexical_cast<std::string>(getRemainingLifeStatic());
419 }
420 ret += "'/>";
421 }
422
423 if (getRcvTimestamp() != 0)
424 ret += TimestampFactory::toXml(getRcvTimestamp(), extraOffset1, false);
425 if(getQueueSize() > 0)
426 ret += offset + " <queue index='" + lexical_cast<std::string>(getQueueIndex()) + "' size='" + lexical_cast<std::string>(getQueueSize()) + "'/>";
427 if (getRedeliver() > 0)
428 ret += offset + " <redeliver>" + lexical_cast<std::string>(getRedeliver()) + "</redeliver>";
429 if (isPersistent())
430 ret += offset + " <persistent/>";
431 if (forceUpdate_.isModified())
432 ret += offset + " <forceUpdate>" + lexical_cast<string>(forceUpdate_.getValue()) + "</forceUpdate>";
433 if (forceDestroy_.isModified())
434 ret += offset + " <forceDestroy>" + lexical_cast<string>(forceDestroy_.getValue()) + "</forceDestroy>";
435
436 if (topicProperty_ != NULL) {
437 ret += topicProperty_->toXml(extraOffset1);
438 }
439
440 RouteVector::const_iterator routeIter = routeNodeList_.begin();
441 ret += offset + " <route>";
442 while (routeIter != routeNodeList_.end()) {
443 ret += (*routeIter).toXml(extraOffset1);
444 routeIter++;
445 }
446 ret += offset + " </route>";
447 ret += dumpClientProperties(extraOffset + Constants::INDENT, clearText);
448 ret += offset + "</qos>";
449
450 if (ret.length() < 16) return "";
451
452 return ret;
453 }
454
455 MsgQosData* MsgQosData::getClone() const
456 {
457 return new MsgQosData(*this);
458 }
459
460 void MsgQosData::setTopicProperty(const TopicProperty& prop)
461 {
462 if (topicProperty_ != NULL) {
463 delete topicProperty_;
464 topicProperty_ = NULL;
465 }
466 topicProperty_ = new TopicProperty(prop);
467 }
468
469 TopicProperty MsgQosData::getTopicProperty()
470 {
471 if (topicProperty_ != NULL) return *topicProperty_;
472 topicProperty_ = new TopicProperty(global_);
473 return *topicProperty_;
474 }
475
476 bool MsgQosData::hasTopicProperty() const
477 {
478 return (topicProperty_ != NULL);
479 }
480
481
482 bool MsgQosData::isForceDestroy() const
483 {
484 return forceDestroy_.getValue();
485 }
486
487 void MsgQosData::setForceDestroy(bool forceDestroy)
488 {
489 forceDestroy_.setValue(forceDestroy, CREATED_BY_SETTER);
490 }
491
492 }}}}
syntax highlighted by Code2HTML, v. 0.9.1