1 /*------------------------------------------------------------------------------
  2 Name:      MsgQosData.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 
  7 /**
  8  * Data container handling of publish() and update() quality of services. 
  9  * <p />
 10  * QoS Informations sent from the client to the server via the publish() method and back via the update() method<br />
 11  * They are needed to control xmlBlaster and inform the client.
 12  * <p />
 13  * <p>
 14  * This data holder is accessible through 4 decorators, each of them allowing a specialized view on the data:
 15  * </p>
 16  * <ul>
 17  * <li>PublishQosServer Server side access</i>
 18  * <li>PublishQos Client side access</i>
 19  * <li>UpdateQosServer Server side access facade</i>
 20  * <li>UpdateQos Client side access facade</i>
 21  * </ul>
 22  * <p>
 23  * For the xml representation see MsgQosSaxFactory.
 24  * </p>
 25  * @see org.xmlBlaster.util.qos.MsgQosSaxFactory
 26  * @see org.xmlBlaster.test.classtest.qos.MsgQosFactoryTest
 27  * @author xmlBlaster@marcelruff.info
 28  * @author michele@laghi.eu
 29  */
 30 
 31 #include <util/qos/MsgQosData.h>
 32 #include <limits.h>
 33 #include <util/lexical_cast.h>
 34 #include <util/Global.h>
 35 
 36 using namespace std;
 37 
 38 using namespace org::xmlBlaster::util;
 39 using namespace org::xmlBlaster::util::qos;
 40 using namespace org::xmlBlaster::util::cluster;
 41 
 42 namespace org { namespace xmlBlaster { namespace util { namespace qos {
 43 
 44 void MsgQosData::init()
 45 {
 46    ME = "MsgQosData";
 47    subscriptionId_ = "";
 48    subscribable_.setValue(global_.getProperty(), "isSubscribable"); // true;
 49    redeliver_ = 0;
 50    queueIndex_ = -1;
 51    queueSize_ = -1;
 52    forceUpdate_.setValue(global_.getProperty(), "forceUpdate");
 53    forceDestroy_.setValue(global_.getProperty(), "forceDestroy");
 54    lifeTime_ = -1;
 55    remainingLifeStatic_ = -1;
 56    isExpired_ = false; // cache the expired state for performance reasons
 57    administrative_ = false;
 58    maxLifeTime_ = global_.getProperty().getLongProperty("message.maxLifeTime", -1);
 59    receiveTimestampHumanReadable_ = global_.getProperty().getBoolProperty("cb.receiveTimestampHumanReadable", false);
 60    topicProperty_ = NULL; 
 61 }
 62 
 63 void MsgQosData::copy(const MsgQosData& data)
 64 {
 65    QosData::copy(data);
 66 
 67    subscriptionId_ = data.subscriptionId_;
 68    subscribable_ = data.subscribable_;
 69    redeliver_ = data.redeliver_;
 70    queueIndex_ = data.queueIndex_;
 71    queueSize_ = data.queueSize_;
 72    forceUpdate_= data.forceUpdate_;
 73    forceDestroy_ = data.forceDestroy_;
 74    lifeTime_ = data.lifeTime_;
 75    remainingLifeStatic_ = data.remainingLifeStatic_;
 76    isExpired_ = data.isExpired_;
 77    administrative_ = data.administrative_;
 78    maxLifeTime_ = data.maxLifeTime_;
 79    receiveTimestampHumanReadable_ = data.receiveTimestampHumanReadable_;
 80    topicProperty_ = NULL;
 81    if (data.topicProperty_)
 82       topicProperty_ = new TopicProperty(*data.topicProperty_);
 83 }
 84 
 85 
 86 MsgQosData::MsgQosData(Global& global, const string& serialData)
 87    : QosData(global, serialData),
 88      subscribable_(Prop<bool>(DEFAULT_isSubscribable)),
 89      forceUpdate_(Prop<bool>(DEFAULT_forceUpdate)),
 90      forceDestroy_(Prop<bool>(DEFAULT_forceDestroy)),
 91      destinationList_()
 92 {
 93    init();
 94 }
 95 
 96 
 97 MsgQosData::MsgQosData(const MsgQosData& data)
 98    : QosData(data),
 99      destinationList_(data.destinationList_)
100 {
101    copy(data);
102 }
103 
104 MsgQosData& MsgQosData::operator=(const MsgQosData& data)
105 {
106    QosData::copy(data);
107    destinationList_ = data.destinationList_;
108    copy(data);
109    return *this;
110 }
111 
112 
113 MsgQosData::~MsgQosData()
114 {
115    delete topicProperty_;
116 }
117 
118 /**
119  * @param isSubscribable if false PtP messages are invisible for subscriptions
120  */
121 void MsgQosData::setSubscribable(const bool isSubscribable)
122 {
123    subscribable_ = isSubscribable;
124 }
125 
126 bool MsgQosData::isSubscribable() const
127 {
128    return subscribable_.getValue();
129 }
130 
131 bool MsgQosData::isPtp() const
132 {
133    return !destinationList_.empty();
134 }
135 
136 
137 void MsgQosData::setReadonly(bool readonly)
138 {
139    if (topicProperty_ == NULL)
140      topicProperty_ = new TopicProperty(global_);
141    topicProperty_->setReadonly(readonly);
142 }
143 
144 bool MsgQosData::isReadonly() const
145 {
146    if (topicProperty_ == NULL) return false;
147    return topicProperty_->isReadonly();
148 }
149 
150 /**
151  * @param volatile true/false
152  */
153 void MsgQosData::setVolatile(bool volatileFlag)
154 {
155    if (volatileFlag) {
156       setLifeTime(0L);
157       setForceDestroy(false);
158       setRemainingLifeStatic(0L); // not needed as server does set it
159    }
160 }
161 
162 /**
163  * @return true/false
164  */
165 bool MsgQosData::isVolatile() const
166 {
167    return getLifeTime()==0L && isForceDestroy()==false;
168 }
169 
170 void MsgQosData::setAdministrative(bool administrative)
171 {
172    administrative_ = administrative;
173 }
174 
175 bool MsgQosData::isAdministrative() const
176 {
177    return administrative_;
178 }
179 
180 /**
181  * If Pub/Sub style update: contains the subscribe ID which caused this update
182  * @param subscriptionId null if PtP message
183  */
184 void MsgQosData::setSubscriptionId(const string& subscriptionId)
185 {
186    subscriptionId_ = subscriptionId;
187 }
188 
189 /**
190  * If Pub/Sub style update: contains the subscribe ID which caused this update
191  * @return subscribeId or null if PtP message
192  */
193 string MsgQosData::getSubscriptionId() const
194 {
195    return subscriptionId_;
196 }
197 
198 /**
199  * Send message to subscriber even if the content is the same as the previous. 
200  * @param forceUpdate true update identical messages
201  */
202 void MsgQosData::setForceUpdate(bool forceUpdate)
203 {
204    forceUpdate_.setValue(forceUpdate, CREATED_BY_SETTER);
205 }
206 
207 /**
208  * @return true/false
209  */
210 bool MsgQosData::isForceUpdate() const
211 {
212    return forceUpdate_.getValue();
213 }
214 
215 /**
216  * Set > 0 if the message probably is redelivered (number of retries). 
217  * @param redeliver if == 0 The message is guaranteed to be delivered only once.
218  */
219 void MsgQosData::setRedeliver(int redeliver)
220 {
221    redeliver_ = redeliver;
222 }
223 
224 /**
225  * Increment the redeliver counter
226  */
227 void MsgQosData::incrRedeliver()
228 {
229    redeliver_++;
230 }
231 
232 /**
233  * Returns > 0 if the message probably is redelivered. 
234  * @return == 0 The message is guaranteed to be delivered only once.
235  */
236 int MsgQosData::getRedeliver() const
237 {
238    return redeliver_;
239 }
240 
241 /**
242  * @param queueSize The number of queued messages
243  */
244 void MsgQosData::setQueueSize(long queueSize)
245 {
246    queueSize_ = queueSize;
247 }
248 
249  /**
250  * @return The number of queued messages
251  */
252 long MsgQosData::getQueueSize() const
253 {
254    return queueSize_;
255 }
256 
257 /**
258  * @param queueIndex The index of the message in the queue
259  */
260 void MsgQosData::setQueueIndex(long queueIndex)
261 {
262    queueIndex_ = queueIndex;
263 }
264 
265 /**
266  * @return The index of the message in the queue
267  */
268 long MsgQosData::getQueueIndex() const
269 {
270    return queueIndex_;
271 }
272 
273 /**
274  * The life time of the message or -1L if forever
275  */
276 long MsgQosData::getLifeTime() const
277 {
278    return lifeTime_;
279 }
280 
281 /**
282  * The life time of the message or -1L if forever
283  */
284 void MsgQosData::setLifeTime(long lifeTime)
285 {
286    lifeTime_ = lifeTime;
287 }
288 
289 /**
290  * @return Milliseconds until message expiration (from now) or -1L if forever
291  *         if 0L the message is expired
292  */
293 long MsgQosData::getRemainingLife() const
294 {
295    if (lifeTime_ > 0 && lifeTime_ < LONG_MAX && getRcvTimestamp() != 0) {
296       Timestamp now = TimestampFactory::getInstance().getTimestamp();
297       long ttl = (long)((getRcvTimestamp()-now)/1000000l) + getLifeTime();
298       if (ttl < 0) return 0;
299       return ttl;
300    }
301    return -1;
302 }
303 
304 /**
305  * This is the value delivered in the QoS (as it was calculated by the server on sending)
306  * and is NOT dynamically recalculated.
307  * So trust this value only if your client clock is out of date (or not trusted) and
308  * if you know the message sending latency is not too big.
309  * @return Milliseconds until message expiration (from now) or -1L if forever
310  *         if 0L the message is expired
311  */
312 long MsgQosData::getRemainingLifeStatic() const
313 {
314    return remainingLifeStatic_;
315 }
316 
317 void MsgQosData::setRemainingLifeStatic(long remainingLifeStatic)
318 {
319    remainingLifeStatic_ = remainingLifeStatic;
320 }
321 
322 /**
323  * Calculates if we are expired
324  */
325 bool MsgQosData::isExpired() const
326 {
327    if (lifeTime_ == LONG_MAX || lifeTime_ <= 0) {
328       return false; // lifes forever
329    }
330    if (isExpired_) { // cache
331       return true;
332    }
333    isExpired_ = (getRemainingLife() <= 0);
334    return isExpired_;
335 }
336 
337 /**
338  * The server default for max. span of life,
339  * adjustable with property "message.maxLifeTime"
340  * @return max span of life for a message
341  */
342 long MsgQosData::getMaxLifeTime() const
343 {
344    return maxLifeTime_;
345 }
346 
347 /**
348  * Get all the destinations of this message.
349  * This should only be used with PTP style messaging<br />
350  * Check <code>if (isPtp()) ...</code> before calling this method
351  *
352  * @return a valid ArrayList containing 0 - n strings with destination names (loginName of clients)<br />
353  *         null if Publish/Subscribe style is used
354  */
355 vector<Destination> MsgQosData::getDestinations() const
356 {
357    return destinationList_;
358 }
359 
360 /**
361  * Add a destination. 
362  */
363 void MsgQosData::addDestination(const Destination& destination)
364 {
365    destinationList_.insert(destinationList_.end(), destination);
366 }
367 
368 string MsgQosData::toXml(const string& extraOffset) const
369 {
370    return toXml(false, extraOffset);
371 }
372 
373 string MsgQosData::toXml(bool clearText, const string& extraOffset) const
374 {
375    string ret;
376 
377    string offset = Constants::OFFSET + extraOffset;
378    string extraOffset1 = extraOffset + Constants::INDENT;
379 
380    // WARNING: This dump must be valid, as it could be used by the
381    //          persistent store
382    ret += offset + "<qos>";
383 
384    if (!getState().empty() || !getStateInfo().empty()) {
385       ret += offset + " <state id='" + getState();
386       if (!getStateInfo().empty())
387          ret += "' info='" + getStateInfo();
388       ret += "'/>";
389    }
390 
391    if (subscribable_.isModified())
392       ret += offset + " <subscribable>" + Global::getBoolAsString(subscribable_.getValue()) + "</subscribable>";
393 
394    vector<Destination>::const_iterator iter = destinationList_.begin();
395    while (iter != destinationList_.end()) {
396       ret += (*iter).toXml(extraOffset1);
397       iter++;
398    }
399 
400    ret += offset + " <sender>" + sender_->getAbsoluteName() + "</sender>";
401 
402    if (NORM_PRIORITY != priority_)
403       ret += offset + " <priority>" + lexical_cast<std::string>(priority_) + "</priority>";
404 
405    if (administrative_)
406       ret += offset + " <administrative>" + lexical_cast<std::string>(administrative_) + "</administrative>";
407 
408    if (!subscriptionId_.empty())
409       ret += offset + " <subscribe id='" + subscriptionId_ + "'/>";
410 
411    if (getLifeTime() > 0) {
412       ret += offset + " <expiration lifeTime='" + lexical_cast<std::string>(getLifeTime());
413       bool sendRemainingLife = true; // make it configurable !!!
414       if (sendRemainingLife) {
415          if (getRemainingLife() > 0)
416             ret += "' remainingLife='" + lexical_cast<std::string>(getRemainingLife());
417          else if (getRemainingLifeStatic() > 0)
418             ret += "' remainingLife='" + lexical_cast<std::string>(getRemainingLifeStatic());
419       }
420       ret +=  "'/>";
421    }
422 
423    if (getRcvTimestamp() != 0)
424       ret += TimestampFactory::toXml(getRcvTimestamp(), extraOffset1, false);
425    if(getQueueSize() > 0)
426       ret += offset + " <queue index='" + lexical_cast<std::string>(getQueueIndex()) + "' size='" + lexical_cast<std::string>(getQueueSize()) + "'/>";
427    if (getRedeliver() > 0)
428       ret += offset + " <redeliver>" + lexical_cast<std::string>(getRedeliver()) + "</redeliver>";
429    if (isPersistent())
430       ret += offset + " <persistent/>";
431    if (forceUpdate_.isModified())
432       ret += offset + " <forceUpdate>" + lexical_cast<string>(forceUpdate_.getValue()) + "</forceUpdate>";
433    if (forceDestroy_.isModified())
434       ret += offset + " <forceDestroy>" + lexical_cast<string>(forceDestroy_.getValue()) + "</forceDestroy>";
435 
436    if (topicProperty_ != NULL) {
437       ret += topicProperty_->toXml(extraOffset1);
438    }
439 
440    RouteVector::const_iterator routeIter = routeNodeList_.begin();
441    ret += offset + " <route>";
442    while (routeIter != routeNodeList_.end()) {
443       ret += (*routeIter).toXml(extraOffset1);
444       routeIter++;
445    }
446    ret += offset + " </route>";
447    ret += dumpClientProperties(extraOffset + Constants::INDENT, clearText);
448    ret += offset + "</qos>";
449 
450    if (ret.length() < 16) return "";
451 
452    return ret;
453 }
454 
455 MsgQosData* MsgQosData::getClone() const
456 {
457    return new MsgQosData(*this);
458 }
459 
460 void MsgQosData::setTopicProperty(const TopicProperty& prop)
461 {
462    if (topicProperty_ != NULL) {
463      delete topicProperty_;
464      topicProperty_ = NULL;
465    }
466    topicProperty_ = new TopicProperty(prop);
467 }
468 
469 TopicProperty MsgQosData::getTopicProperty()
470 {
471    if (topicProperty_ != NULL) return *topicProperty_;
472    topicProperty_ = new TopicProperty(global_);
473    return *topicProperty_;
474 }
475 
476 bool MsgQosData::hasTopicProperty() const
477 {
478    return (topicProperty_ != NULL);
479 }
480 
481 
482 bool MsgQosData::isForceDestroy() const
483 {
484    return forceDestroy_.getValue();
485 }
486 
487 void MsgQosData::setForceDestroy(bool forceDestroy)
488 {
489    forceDestroy_.setValue(forceDestroy, CREATED_BY_SETTER);
490 }
491 
492 }}}}


syntax highlighted by Code2HTML, v. 0.9.1