00001 /*------------------------------------------------------------------------------ 00002 Name: MsgQosData.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 ------------------------------------------------------------------------------*/ 00006 00031 #include <util/qos/MsgQosData.h> 00032 #include <limits.h> 00033 #include <util/lexical_cast.h> 00034 #include <util/Global.h> 00035 00036 using namespace std; 00037 00038 using namespace org::xmlBlaster::util; 00039 using namespace org::xmlBlaster::util::qos; 00040 using namespace org::xmlBlaster::util::cluster; 00041 00042 namespace org { namespace xmlBlaster { namespace util { namespace qos { 00043 00044 void MsgQosData::init() 00045 { 00046 ME = "MsgQosData"; 00047 subscriptionId_ = ""; 00048 subscribable_.setValue(global_.getProperty(), "isSubscribable"); // true; 00049 redeliver_ = 0; 00050 queueIndex_ = -1; 00051 queueSize_ = -1; 00052 forceUpdate_.setValue(global_.getProperty(), "forceUpdate"); 00053 forceDestroy_.setValue(global_.getProperty(), "forceDestroy"); 00054 lifeTime_ = -1; 00055 remainingLifeStatic_ = -1; 00056 isExpired_ = false; // cache the expired state for performance reasons 00057 maxLifeTime_ = global_.getProperty().getLongProperty("message.maxLifeTime", -1); 00058 receiveTimestampHumanReadable_ = global_.getProperty().getBoolProperty("cb.receiveTimestampHumanReadable", false); 00059 topicProperty_ = NULL; 00060 } 00061 00062 void MsgQosData::copy(const MsgQosData& data) 00063 { 00064 QosData::copy(data); 00065 00066 subscriptionId_ = data.subscriptionId_; 00067 subscribable_ = data.subscribable_; 00068 redeliver_ = data.redeliver_; 00069 queueIndex_ = data.queueIndex_; 00070 queueSize_ = data.queueSize_; 00071 forceUpdate_= data.forceUpdate_; 00072 forceDestroy_ = data.forceDestroy_; 00073 lifeTime_ = data.lifeTime_; 00074 remainingLifeStatic_ = data.remainingLifeStatic_; 00075 isExpired_ = data.isExpired_; 00076 maxLifeTime_ = data.maxLifeTime_; 00077 receiveTimestampHumanReadable_ = data.receiveTimestampHumanReadable_; 00078 topicProperty_ = NULL; 00079 if (data.topicProperty_) 00080 topicProperty_ = new TopicProperty(*data.topicProperty_); 00081 } 00082 00083 00084 MsgQosData::MsgQosData(Global& global, const string& serialData) 00085 : QosData(global, serialData), 00086 subscribable_(Prop<bool>(DEFAULT_isSubscribable)), 00087 forceUpdate_(Prop<bool>(DEFAULT_forceUpdate)), 00088 forceDestroy_(Prop<bool>(DEFAULT_forceDestroy)), 00089 destinationList_() 00090 { 00091 init(); 00092 } 00093 00094 00095 MsgQosData::MsgQosData(const MsgQosData& data) 00096 : QosData(data), 00097 destinationList_(data.destinationList_) 00098 { 00099 copy(data); 00100 } 00101 00102 MsgQosData& MsgQosData::operator=(const MsgQosData& data) 00103 { 00104 QosData::copy(data); 00105 destinationList_ = data.destinationList_; 00106 copy(data); 00107 return *this; 00108 } 00109 00110 00111 MsgQosData::~MsgQosData() 00112 { 00113 delete topicProperty_; 00114 } 00115 00119 void MsgQosData::setSubscribable(const bool isSubscribable) 00120 { 00121 subscribable_ = isSubscribable; 00122 } 00123 00124 bool MsgQosData::isSubscribable() const 00125 { 00126 return subscribable_.getValue(); 00127 } 00128 00129 bool MsgQosData::isPtp() const 00130 { 00131 return !destinationList_.empty(); 00132 } 00133 00134 00135 void MsgQosData::setReadonly(bool readonly) 00136 { 00137 if (topicProperty_ == NULL) 00138 topicProperty_ = new TopicProperty(global_); 00139 topicProperty_->setReadonly(readonly); 00140 } 00141 00142 bool MsgQosData::isReadonly() const 00143 { 00144 if (topicProperty_ == NULL) return false; 00145 return topicProperty_->isReadonly(); 00146 } 00147 00151 void MsgQosData::setVolatile(bool volatileFlag) 00152 { 00153 if (volatileFlag) { 00154 setLifeTime(0L); 00155 setForceDestroy(false); 00156 setRemainingLifeStatic(0L); // not needed as server does set it 00157 } 00158 } 00159 00163 bool MsgQosData::isVolatile() const 00164 { 00165 return getLifeTime()==0L && isForceDestroy()==false; 00166 } 00167 00172 void MsgQosData::setSubscriptionId(const string& subscriptionId) 00173 { 00174 subscriptionId_ = subscriptionId; 00175 } 00176 00181 string MsgQosData::getSubscriptionId() const 00182 { 00183 return subscriptionId_; 00184 } 00185 00190 void MsgQosData::setForceUpdate(bool forceUpdate) 00191 { 00192 forceUpdate_.setValue(forceUpdate, CREATED_BY_SETTER); 00193 } 00194 00198 bool MsgQosData::isForceUpdate() const 00199 { 00200 return forceUpdate_.getValue(); 00201 } 00202 00207 void MsgQosData::setRedeliver(int redeliver) 00208 { 00209 redeliver_ = redeliver; 00210 } 00211 00215 void MsgQosData::incrRedeliver() 00216 { 00217 redeliver_++; 00218 } 00219 00224 int MsgQosData::getRedeliver() const 00225 { 00226 return redeliver_; 00227 } 00228 00232 void MsgQosData::setQueueSize(long queueSize) 00233 { 00234 queueSize_ = queueSize; 00235 } 00236 00240 long MsgQosData::getQueueSize() const 00241 { 00242 return queueSize_; 00243 } 00244 00248 void MsgQosData::setQueueIndex(long queueIndex) 00249 { 00250 queueIndex_ = queueIndex; 00251 } 00252 00256 long MsgQosData::getQueueIndex() const 00257 { 00258 return queueIndex_; 00259 } 00260 00264 long MsgQosData::getLifeTime() const 00265 { 00266 return lifeTime_; 00267 } 00268 00272 void MsgQosData::setLifeTime(long lifeTime) 00273 { 00274 lifeTime_ = lifeTime; 00275 } 00276 00281 long MsgQosData::getRemainingLife() const 00282 { 00283 if (lifeTime_ > 0 && lifeTime_ < LONG_MAX && getRcvTimestamp() != 0) { 00284 Timestamp now = TimestampFactory::getInstance().getTimestamp(); 00285 long ttl = (long)((getRcvTimestamp()-now)/1000000l) + getLifeTime(); 00286 if (ttl < 0) return 0; 00287 return ttl; 00288 } 00289 return -1; 00290 } 00291 00300 long MsgQosData::getRemainingLifeStatic() const 00301 { 00302 return remainingLifeStatic_; 00303 } 00304 00305 void MsgQosData::setRemainingLifeStatic(long remainingLifeStatic) 00306 { 00307 remainingLifeStatic_ = remainingLifeStatic; 00308 } 00309 00313 bool MsgQosData::isExpired() const 00314 { 00315 if (lifeTime_ == LONG_MAX || lifeTime_ <= 0) { 00316 return false; // lifes forever 00317 } 00318 if (isExpired_) { // cache 00319 return true; 00320 } 00321 isExpired_ = (getRemainingLife() <= 0); 00322 return isExpired_; 00323 } 00324 00330 long MsgQosData::getMaxLifeTime() const 00331 { 00332 return maxLifeTime_; 00333 } 00334 00343 vector<Destination> MsgQosData::getDestinations() const 00344 { 00345 return destinationList_; 00346 } 00347 00351 void MsgQosData::addDestination(const Destination& destination) 00352 { 00353 destinationList_.insert(destinationList_.end(), destination); 00354 } 00355 00356 string MsgQosData::toXml(const string& extraOffset) const 00357 { 00358 return toXml(false, extraOffset); 00359 } 00360 00361 string MsgQosData::toXml(bool clearText, const string& extraOffset) const 00362 { 00363 string ret; 00364 00365 string offset = Constants::OFFSET + extraOffset; 00366 string extraOffset1 = extraOffset + Constants::INDENT; 00367 00368 // WARNING: This dump must be valid, as it could be used by the 00369 // persistent store 00370 ret += offset + "<qos>"; 00371 00372 if (!getState().empty() || !getStateInfo().empty()) { 00373 ret += offset + " <state id='" + getState(); 00374 if (!getStateInfo().empty()) 00375 ret += "' info='" + getStateInfo(); 00376 ret += "'/>"; 00377 } 00378 00379 if (subscribable_.isModified()) 00380 ret += offset + " <subscribable>" + Global::getBoolAsString(subscribable_.getValue()) + "</subscribable>"; 00381 00382 vector<Destination>::const_iterator iter = destinationList_.begin(); 00383 while (iter != destinationList_.end()) { 00384 ret += (*iter).toXml(extraOffset1); 00385 iter++; 00386 } 00387 00388 ret += offset + " <sender>" + sender_->getAbsoluteName() + "</sender>"; 00389 00390 if (NORM_PRIORITY != priority_) 00391 ret += offset + " <priority>" + lexical_cast<std::string>(priority_) + "</priority>"; 00392 00393 if (!subscriptionId_.empty()) 00394 ret += offset + " <subscribe id='" + subscriptionId_ + "'/>"; 00395 00396 if (getLifeTime() > 0) { 00397 ret += offset + " <expiration lifeTime='" + lexical_cast<std::string>(getLifeTime()); 00398 bool sendRemainingLife = true; // make it configurable !!! 00399 if (sendRemainingLife) { 00400 if (getRemainingLife() > 0) 00401 ret += "' remainingLife='" + lexical_cast<std::string>(getRemainingLife()); 00402 else if (getRemainingLifeStatic() > 0) 00403 ret += "' remainingLife='" + lexical_cast<std::string>(getRemainingLifeStatic()); 00404 } 00405 ret += "'/>"; 00406 } 00407 00408 if (getRcvTimestamp() != 0) 00409 ret += TimestampFactory::toXml(getRcvTimestamp(), extraOffset1, false); 00410 if(getQueueSize() > 0) 00411 ret += offset + " <queue index='" + lexical_cast<std::string>(getQueueIndex()) + "' size='" + lexical_cast<std::string>(getQueueSize()) + "'/>"; 00412 if (getRedeliver() > 0) 00413 ret += offset + " <redeliver>" + lexical_cast<std::string>(getRedeliver()) + "</redeliver>"; 00414 if (isPersistent()) 00415 ret += offset + " <persistent/>"; 00416 if (forceUpdate_.isModified()) 00417 ret += offset + " <forceUpdate>" + lexical_cast<string>(forceUpdate_.getValue()) + "</forceUpdate>"; 00418 if (forceDestroy_.isModified()) 00419 ret += offset + " <forceDestroy>" + lexical_cast<string>(forceDestroy_.getValue()) + "</forceDestroy>"; 00420 00421 if (topicProperty_ != NULL) { 00422 ret += topicProperty_->toXml(extraOffset1); 00423 } 00424 00425 RouteVector::const_iterator routeIter = routeNodeList_.begin(); 00426 ret += offset + " <route>"; 00427 while (routeIter != routeNodeList_.end()) { 00428 ret += (*routeIter).toXml(extraOffset1); 00429 routeIter++; 00430 } 00431 ret += offset + " </route>"; 00432 ret += dumpClientProperties(extraOffset + Constants::INDENT, clearText); 00433 ret += offset + "</qos>"; 00434 00435 if (ret.length() < 16) return ""; 00436 00437 return ret; 00438 } 00439 00440 MsgQosData* MsgQosData::getClone() const 00441 { 00442 return new MsgQosData(*this); 00443 } 00444 00445 void MsgQosData::setTopicProperty(const TopicProperty& prop) 00446 { 00447 if (topicProperty_ != NULL) { 00448 delete topicProperty_; 00449 topicProperty_ = NULL; 00450 } 00451 topicProperty_ = new TopicProperty(prop); 00452 } 00453 00454 TopicProperty MsgQosData::getTopicProperty() 00455 { 00456 if (topicProperty_ != NULL) return *topicProperty_; 00457 topicProperty_ = new TopicProperty(global_); 00458 return *topicProperty_; 00459 } 00460 00461 bool MsgQosData::hasTopicProperty() const 00462 { 00463 return (topicProperty_ != NULL); 00464 } 00465 00466 00467 bool MsgQosData::isForceDestroy() const 00468 { 00469 return forceDestroy_.getValue(); 00470 } 00471 00472 void MsgQosData::setForceDestroy(bool forceDestroy) 00473 { 00474 forceDestroy_.setValue(forceDestroy, CREATED_BY_SETTER); 00475 } 00476 00477 }}}} 00478