util/qos/MsgQosData.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      MsgQosData.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 ------------------------------------------------------------------------------*/
00006 
00031 #include <util/qos/MsgQosData.h>
00032 #include <limits.h>
00033 #include <util/lexical_cast.h>
00034 #include <util/Global.h>
00035 
00036 using namespace std;
00037 
00038 using namespace org::xmlBlaster::util;
00039 using namespace org::xmlBlaster::util::qos;
00040 using namespace org::xmlBlaster::util::cluster;
00041 
00042 namespace org { namespace xmlBlaster { namespace util { namespace qos {
00043 
00044 void MsgQosData::init()
00045 {
00046    ME = "MsgQosData";
00047    subscriptionId_ = "";
00048    subscribable_.setValue(global_.getProperty(), "isSubscribable"); // true;
00049    redeliver_ = 0;
00050    queueIndex_ = -1;
00051    queueSize_ = -1;
00052    forceUpdate_.setValue(global_.getProperty(), "forceUpdate");
00053    forceDestroy_.setValue(global_.getProperty(), "forceDestroy");
00054    lifeTime_ = -1;
00055    remainingLifeStatic_ = -1;
00056    isExpired_ = false; // cache the expired state for performance reasons
00057    maxLifeTime_ = global_.getProperty().getLongProperty("message.maxLifeTime", -1);
00058    receiveTimestampHumanReadable_ = global_.getProperty().getBoolProperty("cb.receiveTimestampHumanReadable", false);
00059    topicProperty_ = NULL; 
00060 }
00061 
00062 void MsgQosData::copy(const MsgQosData& data)
00063 {
00064    QosData::copy(data);
00065 
00066    subscriptionId_ = data.subscriptionId_;
00067    subscribable_ = data.subscribable_;
00068    redeliver_ = data.redeliver_;
00069    queueIndex_ = data.queueIndex_;
00070    queueSize_ = data.queueSize_;
00071    forceUpdate_= data.forceUpdate_;
00072    forceDestroy_ = data.forceDestroy_;
00073    lifeTime_ = data.lifeTime_;
00074    remainingLifeStatic_ = data.remainingLifeStatic_;
00075    isExpired_ = data.isExpired_;
00076    maxLifeTime_ = data.maxLifeTime_;
00077    receiveTimestampHumanReadable_ = data.receiveTimestampHumanReadable_;
00078    topicProperty_ = NULL;
00079    if (data.topicProperty_)
00080       topicProperty_ = new TopicProperty(*data.topicProperty_);
00081 }
00082 
00083 
00084 MsgQosData::MsgQosData(Global& global, const string& serialData)
00085    : QosData(global, serialData),
00086      subscribable_(Prop<bool>(DEFAULT_isSubscribable)),
00087      forceUpdate_(Prop<bool>(DEFAULT_forceUpdate)),
00088      forceDestroy_(Prop<bool>(DEFAULT_forceDestroy)),
00089      destinationList_()
00090 {
00091    init();
00092 }
00093 
00094 
00095 MsgQosData::MsgQosData(const MsgQosData& data)
00096    : QosData(data),
00097      destinationList_(data.destinationList_)
00098 {
00099    copy(data);
00100 }
00101 
00102 MsgQosData& MsgQosData::operator=(const MsgQosData& data)
00103 {
00104    QosData::copy(data);
00105    destinationList_ = data.destinationList_;
00106    copy(data);
00107    return *this;
00108 }
00109 
00110 
00111 MsgQosData::~MsgQosData()
00112 {
00113    delete topicProperty_;
00114 }
00115 
00119 void MsgQosData::setSubscribable(const bool isSubscribable)
00120 {
00121    subscribable_ = isSubscribable;
00122 }
00123 
00124 bool MsgQosData::isSubscribable() const
00125 {
00126    return subscribable_.getValue();
00127 }
00128 
00129 bool MsgQosData::isPtp() const
00130 {
00131    return !destinationList_.empty();
00132 }
00133 
00134 
00135 void MsgQosData::setReadonly(bool readonly)
00136 {
00137    if (topicProperty_ == NULL)
00138      topicProperty_ = new TopicProperty(global_);
00139    topicProperty_->setReadonly(readonly);
00140 }
00141 
00142 bool MsgQosData::isReadonly() const
00143 {
00144    if (topicProperty_ == NULL) return false;
00145    return topicProperty_->isReadonly();
00146 }
00147 
00151 void MsgQosData::setVolatile(bool volatileFlag)
00152 {
00153    if (volatileFlag) {
00154       setLifeTime(0L);
00155       setForceDestroy(false);
00156       setRemainingLifeStatic(0L); // not needed as server does set it
00157    }
00158 }
00159 
00163 bool MsgQosData::isVolatile() const
00164 {
00165    return getLifeTime()==0L && isForceDestroy()==false;
00166 }
00167 
00172 void MsgQosData::setSubscriptionId(const string& subscriptionId)
00173 {
00174    subscriptionId_ = subscriptionId;
00175 }
00176 
00181 string MsgQosData::getSubscriptionId() const
00182 {
00183    return subscriptionId_;
00184 }
00185 
00190 void MsgQosData::setForceUpdate(bool forceUpdate)
00191 {
00192    forceUpdate_.setValue(forceUpdate, CREATED_BY_SETTER);
00193 }
00194 
00198 bool MsgQosData::isForceUpdate() const
00199 {
00200    return forceUpdate_.getValue();
00201 }
00202 
00207 void MsgQosData::setRedeliver(int redeliver)
00208 {
00209    redeliver_ = redeliver;
00210 }
00211 
00215 void MsgQosData::incrRedeliver()
00216 {
00217    redeliver_++;
00218 }
00219 
00224 int MsgQosData::getRedeliver() const
00225 {
00226    return redeliver_;
00227 }
00228 
00232 void MsgQosData::setQueueSize(long queueSize)
00233 {
00234    queueSize_ = queueSize;
00235 }
00236 
00240 long MsgQosData::getQueueSize() const
00241 {
00242    return queueSize_;
00243 }
00244 
00248 void MsgQosData::setQueueIndex(long queueIndex)
00249 {
00250    queueIndex_ = queueIndex;
00251 }
00252 
00256 long MsgQosData::getQueueIndex() const
00257 {
00258    return queueIndex_;
00259 }
00260 
00264 long MsgQosData::getLifeTime() const
00265 {
00266    return lifeTime_;
00267 }
00268 
00272 void MsgQosData::setLifeTime(long lifeTime)
00273 {
00274    lifeTime_ = lifeTime;
00275 }
00276 
00281 long MsgQosData::getRemainingLife() const
00282 {
00283    if (lifeTime_ > 0 && lifeTime_ < LONG_MAX && getRcvTimestamp() != 0) {
00284       Timestamp now = TimestampFactory::getInstance().getTimestamp();
00285       long ttl = (long)((getRcvTimestamp()-now)/1000000l) + getLifeTime();
00286       if (ttl < 0) return 0;
00287       return ttl;
00288    }
00289    return -1;
00290 }
00291 
00300 long MsgQosData::getRemainingLifeStatic() const
00301 {
00302    return remainingLifeStatic_;
00303 }
00304 
00305 void MsgQosData::setRemainingLifeStatic(long remainingLifeStatic)
00306 {
00307    remainingLifeStatic_ = remainingLifeStatic;
00308 }
00309 
00313 bool MsgQosData::isExpired() const
00314 {
00315    if (lifeTime_ == LONG_MAX || lifeTime_ <= 0) {
00316       return false; // lifes forever
00317    }
00318    if (isExpired_) { // cache
00319       return true;
00320    }
00321    isExpired_ = (getRemainingLife() <= 0);
00322    return isExpired_;
00323 }
00324 
00330 long MsgQosData::getMaxLifeTime() const
00331 {
00332    return maxLifeTime_;
00333 }
00334 
00343 vector<Destination> MsgQosData::getDestinations() const
00344 {
00345    return destinationList_;
00346 }
00347 
00351 void MsgQosData::addDestination(const Destination& destination)
00352 {
00353    destinationList_.insert(destinationList_.end(), destination);
00354 }
00355 
00356 string MsgQosData::toXml(const string& extraOffset) const
00357 {
00358    return toXml(false, extraOffset);
00359 }
00360 
00361 string MsgQosData::toXml(bool clearText, const string& extraOffset) const
00362 {
00363    string ret;
00364 
00365    string offset = Constants::OFFSET + extraOffset;
00366    string extraOffset1 = extraOffset + Constants::INDENT;
00367 
00368    // WARNING: This dump must be valid, as it could be used by the
00369    //          persistent store
00370    ret += offset + "<qos>";
00371 
00372    if (!getState().empty() || !getStateInfo().empty()) {
00373       ret += offset + " <state id='" + getState();
00374       if (!getStateInfo().empty())
00375          ret += "' info='" + getStateInfo();
00376       ret += "'/>";
00377    }
00378 
00379    if (subscribable_.isModified())
00380       ret += offset + " <subscribable>" + Global::getBoolAsString(subscribable_.getValue()) + "</subscribable>";
00381 
00382    vector<Destination>::const_iterator iter = destinationList_.begin();
00383    while (iter != destinationList_.end()) {
00384       ret += (*iter).toXml(extraOffset1);
00385       iter++;
00386    }
00387 
00388    ret += offset + " <sender>" + sender_->getAbsoluteName() + "</sender>";
00389 
00390    if (NORM_PRIORITY != priority_)
00391       ret += offset + " <priority>" + lexical_cast<std::string>(priority_) + "</priority>";
00392 
00393    if (!subscriptionId_.empty())
00394       ret += offset + " <subscribe id='" + subscriptionId_ + "'/>";
00395 
00396    if (getLifeTime() > 0) {
00397       ret += offset + " <expiration lifeTime='" + lexical_cast<std::string>(getLifeTime());
00398       bool sendRemainingLife = true; // make it configurable !!!
00399       if (sendRemainingLife) {
00400          if (getRemainingLife() > 0)
00401             ret += "' remainingLife='" + lexical_cast<std::string>(getRemainingLife());
00402          else if (getRemainingLifeStatic() > 0)
00403             ret += "' remainingLife='" + lexical_cast<std::string>(getRemainingLifeStatic());
00404       }
00405       ret +=  "'/>";
00406    }
00407 
00408    if (getRcvTimestamp() != 0)
00409       ret += TimestampFactory::toXml(getRcvTimestamp(), extraOffset1, false);
00410    if(getQueueSize() > 0)
00411       ret += offset + " <queue index='" + lexical_cast<std::string>(getQueueIndex()) + "' size='" + lexical_cast<std::string>(getQueueSize()) + "'/>";
00412    if (getRedeliver() > 0)
00413       ret += offset + " <redeliver>" + lexical_cast<std::string>(getRedeliver()) + "</redeliver>";
00414    if (isPersistent())
00415       ret += offset + " <persistent/>";
00416    if (forceUpdate_.isModified())
00417       ret += offset + " <forceUpdate>" + lexical_cast<string>(forceUpdate_.getValue()) + "</forceUpdate>";
00418    if (forceDestroy_.isModified())
00419       ret += offset + " <forceDestroy>" + lexical_cast<string>(forceDestroy_.getValue()) + "</forceDestroy>";
00420 
00421    if (topicProperty_ != NULL) {
00422       ret += topicProperty_->toXml(extraOffset1);
00423    }
00424 
00425    RouteVector::const_iterator routeIter = routeNodeList_.begin();
00426    ret += offset + " <route>";
00427    while (routeIter != routeNodeList_.end()) {
00428       ret += (*routeIter).toXml(extraOffset1);
00429       routeIter++;
00430    }
00431    ret += offset + " </route>";
00432    ret += dumpClientProperties(extraOffset + Constants::INDENT, clearText);
00433    ret += offset + "</qos>";
00434 
00435    if (ret.length() < 16) return "";
00436 
00437    return ret;
00438 }
00439 
00440 MsgQosData* MsgQosData::getClone() const
00441 {
00442    return new MsgQosData(*this);
00443 }
00444 
00445 void MsgQosData::setTopicProperty(const TopicProperty& prop)
00446 {
00447    if (topicProperty_ != NULL) {
00448      delete topicProperty_;
00449      topicProperty_ = NULL;
00450    }
00451    topicProperty_ = new TopicProperty(prop);
00452 }
00453 
00454 TopicProperty MsgQosData::getTopicProperty()
00455 {
00456    if (topicProperty_ != NULL) return *topicProperty_;
00457    topicProperty_ = new TopicProperty(global_);
00458    return *topicProperty_;
00459 }
00460 
00461 bool MsgQosData::hasTopicProperty() const
00462 {
00463    return (topicProperty_ != NULL);
00464 }
00465 
00466 
00467 bool MsgQosData::isForceDestroy() const
00468 {
00469    return forceDestroy_.getValue();
00470 }
00471 
00472 void MsgQosData::setForceDestroy(bool forceDestroy)
00473 {
00474    forceDestroy_.setValue(forceDestroy, CREATED_BY_SETTER);
00475 }
00476 
00477 }}}}
00478