00001 /*------------------------------------------------------------------------------ 00002 Name: QueuePropertyBase.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Holding callback queue properties 00006 Version: $Id: QueuePropertyBase.cpp 13445 2005-07-15 01:54:35Z ruff $ 00007 ------------------------------------------------------------------------------*/ 00008 00009 00017 #include <util/qos/storage/QueuePropertyBase.h> 00018 #include <util/lexical_cast.h> 00019 #include <util/Global.h> 00020 00021 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace storage { 00022 00023 using namespace std; 00024 using namespace org::xmlBlaster::util; 00025 using namespace org::xmlBlaster::util::qos::address; 00026 00027 const long DEFAULT_maxEntriesDefault = 1000L; 00028 const long DEFAULT_maxEntriesCacheDefault = 1000L; 00029 const long DEFAULT_bytesDefault = 10485760L; // 10 MB 00030 const long DEFAULT_bytesCacheDefault = 2097152L; // 2 MB 00032 const double DEFAULT_storeSwapLevelRatio = 0.70; 00034 const double DEFAULT_storeSwapBytesRatio = 0.25; 00036 const double DEFAULT_reloadSwapLevelRatio = 0.30; 00038 const double DEFAULT_reloadSwapBytesRatio = 0.25; 00039 const Timestamp DEFAULT_minExpires = 1000; 00040 const Timestamp DEFAULT_maxExpires = 0; 00041 const string DEFAULT_onOverflow = Constants::ONOVERFLOW_DEADMESSAGE; 00042 const string DEFAULT_onFailure = Constants::ONOVERFLOW_DEADMESSAGE; 00043 00044 // static variables 00045 string DEFAULT_type = "CACHE"; 00046 string DEFAULT_version = "1.0"; 00048 long DEFAULT_expires; 00049 00050 00055 void QueuePropertyBase::initialize(const string& propertyPrefix) 00056 { 00057 //if (log_.call()) log_.call(ME, string("::initialize with property prefix '") + propertyPrefix + "'"); 00058 propertyPrefix_ = propertyPrefix; 00059 string prefix = getPrefix(); 00060 //if (log_.trace()) log_.trace(ME, string("::initialize: got the prefix '") + prefix + "'"); 00061 00062 // Do we need this range settings? 00063 setMinExpires(global_.getProperty().getTimestampProperty("queue/expires.min", DEFAULT_minExpires)); 00064 setMaxExpires(global_.getProperty().getTimestampProperty("queue/expires.max", DEFAULT_maxExpires)); // Long.MAX_VALUE); 00065 //if (log_.trace()) log_.trace(ME, "::initialize: expires set"); 00066 if (nodeId_ != "") { 00067 setMinExpires(global_.getProperty().getTimestampProperty("queue/expires.min["+nodeId_+"]", getMinExpires())); 00068 setMaxExpires(global_.getProperty().getTimestampProperty("queue/expires.max["+nodeId_+"]", getMaxExpires())); // Long.MAX_VALUE); 00069 } 00070 //if (log_.trace()) log_.trace(ME, "::initialize: expires for the specific node set"); 00071 00072 // prefix is e.g. "queue/history/" or "persistence/topicStore/" 00073 setMaxEntries(global_.getProperty().getLongProperty(prefix+"maxEntries", DEFAULT_maxEntriesDefault)); 00074 //if (log_.trace()) log_.trace(ME, "::initialize: setMaxEntries -> " + lexical_cast<string>(getMaxEntries())); 00075 setMaxEntriesCache(global_.getProperty().getLongProperty(prefix+"maxEntriesCache", DEFAULT_maxEntriesCacheDefault)); 00076 //if (log_.trace()) log_.trace(ME, "::initialize: setMaxEntriesCache -> " + lexical_cast<string>(getMaxEntriesCache())); 00077 setMaxBytes(global_.getProperty().getLongProperty(prefix+"maxBytes", DEFAULT_bytesDefault)); 00078 //if (log_.trace()) log_.trace(ME, "::initialize: setMaxBytes -> " + lexical_cast<string>(getMaxBytes())); 00079 setMaxBytesCache(global_.getProperty().getLongProperty(prefix+"maxBytesCache", DEFAULT_bytesCacheDefault)); 00080 //if (log_.trace()) log_.trace(ME, "::initialize: setMaxBytesCache -> " + lexical_cast<string>(getMaxBytesCache())); 00081 00082 setStoreSwapLevel(global_.getProperty().getLongProperty(prefix+"storeSwapLevel", (long)(DEFAULT_storeSwapLevelRatio*maxBytesCache_))); 00083 setStoreSwapBytes(global_.getProperty().getLongProperty(prefix+"storeSwapBytes", (long)(DEFAULT_storeSwapBytesRatio*maxBytesCache_))); 00084 setReloadSwapLevel(global_.getProperty().getLongProperty(prefix+"reloadSwapLevel", (long)(DEFAULT_reloadSwapLevelRatio*maxBytesCache_))); 00085 setReloadSwapBytes(global_.getProperty().getLongProperty(prefix+"reloadSwapBytes", (long)(DEFAULT_reloadSwapBytesRatio*maxBytesCache_))); 00086 00087 //if (log_.trace()) log_.trace(ME, "::initialize: values for the swap control set"); 00088 00089 setExpires(global_.getProperty().getTimestampProperty(prefix+"expires", DEFAULT_maxExpires)); 00090 setOnOverflow(global_.getProperty().getStringProperty(prefix+"onOverflow", DEFAULT_onOverflow)); 00091 setOnFailure(global_.getProperty().getStringProperty(prefix+"onFailure", DEFAULT_onFailure)); 00092 setType(global_.getProperty().getStringProperty(prefix+"type", DEFAULT_type)); 00093 setVersion(global_.getProperty().getStringProperty(prefix+"version", DEFAULT_version)); 00094 00095 //if (log_.trace()) log_.trace(ME, "::initialize: going to set specific node properties"); 00096 00097 if (nodeId_ != "") { 00098 setMaxEntries(global_.getProperty().getLongProperty(prefix+"maxEntries["+nodeId_+"]", getMaxEntries())); 00099 setMaxEntriesCache(global_.getProperty().getLongProperty(prefix+"maxEntriesCache["+nodeId_+"]", getMaxEntriesCache())); 00100 setMaxBytes(global_.getProperty().getLongProperty(prefix+"maxBytes["+nodeId_+"]", getMaxBytes())); 00101 setMaxBytesCache(global_.getProperty().getLongProperty(prefix+"maxBytesCache["+nodeId_+"]", getMaxBytesCache())); 00102 setStoreSwapLevel(global_.getProperty().getLongProperty(prefix+"storeSwapLevel["+nodeId_+"]", getStoreSwapLevel())); 00103 setStoreSwapBytes(global_.getProperty().getLongProperty(prefix+"storeSwapBytes["+nodeId_+"]", getStoreSwapBytes())); 00104 setReloadSwapLevel(global_.getProperty().getLongProperty(prefix+"reloadSwapLevel["+nodeId_+"]", getReloadSwapLevel())); 00105 setReloadSwapBytes(global_.getProperty().getLongProperty(prefix+"reloadSwapBytes["+nodeId_+"]", getReloadSwapBytes())); 00106 setExpires(global_.getProperty().getTimestampProperty(prefix+"expires["+nodeId_+"]", getExpires())); 00107 setOnOverflow(global_.getProperty().getStringProperty(prefix+"onOverflow["+nodeId_+"]", getOnOverflow())); 00108 setOnFailure(global_.getProperty().getStringProperty(prefix+"onFailure["+nodeId_+"]", getOnFailure())); 00109 setType(global_.getProperty().getStringProperty(prefix+"type["+nodeId_+"]", getType())); 00110 setVersion(global_.getProperty().getStringProperty(prefix+"version["+nodeId_+"]", getVersion())); 00111 } 00112 if (log_.trace()) log_.trace(ME, string("::initialized to: ") + toXml()); 00113 } 00114 00115 /* 00116 void QueuePropertyBase::initialize() 00117 { 00118 // Do we need this range settings? 00119 setMinExpires(global_.getProperty().getTimestampProperty("queue/expires.min", DEFAULT_minExpires)); 00120 setMaxExpires(global_.getProperty().getTimestampProperty("queue/expires.max", DEFAULT_maxExpires)); // Long.MAX_VALUE); 00121 if (nodeId_ != "") { 00122 setMinExpires(global_.getProperty().getTimestampProperty(string("queue/expires.min[")+nodeId_+string("]"), getMinExpires())); 00123 setMaxExpires(global_.getProperty().getTimestampProperty(string("queue/expires.max[")+nodeId_+string("]"), getMaxExpires())); // Long.MAX_VALUE); 00124 } 00125 00126 // PluginInfo pluginInfo = new PluginInfo(glob, null, global_.getProperty().get("queue/defaultPlugin", DEFAULT_type)); 00127 // DEFAULT_type = pluginInfo.getType(); 00128 // DEFAULT_version = pluginInfo.getVersion(); 00129 } 00130 */ 00131 00132 00133 QueuePropertyBase::QueuePropertyBase(Global& global, const string& nodeId) 00134 : ME("QueuePropertyBase"), 00135 global_(global), 00136 log_(global.getLog("org.xmlBlaster.util.qos")), 00137 type_(DEFAULT_type), 00138 version_(DEFAULT_version), 00139 minExpires_(DEFAULT_minExpires), 00140 maxExpires_(DEFAULT_minExpires), 00141 relating_(Constants::RELATING_CALLBACK), 00142 expires_(DEFAULT_expires), 00143 maxEntries_(DEFAULT_maxEntriesDefault), 00144 maxBytes_(DEFAULT_bytesDefault), 00145 maxEntriesCache_(DEFAULT_maxEntriesCacheDefault), 00146 storeSwapLevel_(0), 00147 storeSwapBytes_(0), 00148 reloadSwapLevel_(0), 00149 reloadSwapBytes_(0), 00150 maxBytesCache_(DEFAULT_bytesCacheDefault), 00151 onOverflow_(Constants::ONOVERFLOW_DEADMESSAGE), 00152 onFailure_(Constants::ONOVERFLOW_DEADMESSAGE), 00153 addressArr_(), 00154 nodeId_(nodeId), 00155 propertyPrefix_(""), 00156 rootTagName_("queue") 00157 { 00158 } 00159 00160 QueuePropertyBase::QueuePropertyBase(const QueuePropertyBase& prop) 00161 : ME("QueuePropertyBase"), global_(prop.global_), log_(prop.log_) 00162 { 00163 copy(prop); 00164 } 00165 00166 QueuePropertyBase& 00167 QueuePropertyBase::operator =(const QueuePropertyBase& prop) 00168 { 00169 copy(prop); 00170 return *this; 00171 } 00172 00173 00174 QueuePropertyBase::~QueuePropertyBase() 00175 { 00176 addressArr_.clear(); 00177 // delete all entries of the address vector since they are pointers 00178 // owned by this object. 00179 // cleanupAddresses(); 00180 } 00181 00185 void QueuePropertyBase::setRelating(const string& relating) 00186 { 00187 if (Constants::RELATING_CALLBACK == relating) 00188 relating_ = Constants::RELATING_CALLBACK; 00189 else if (Constants::RELATING_SUBJECT == relating) 00190 relating_ = Constants::RELATING_SUBJECT; 00191 else if (Constants::RELATING_CLIENT == relating) 00192 relating_ = Constants::RELATING_CLIENT; 00193 else if (Constants::RELATING_HISTORY == relating) 00194 relating_ = Constants::RELATING_HISTORY; 00195 else if (Constants::RELATING_MSGUNITSTORE == relating) 00196 relating_ = Constants::RELATING_MSGUNITSTORE; 00197 else if (Constants::RELATING_TOPICSTORE == relating) 00198 relating_ = Constants::RELATING_TOPICSTORE; 00199 else { 00200 log_.warn(ME, string("Ignoring relating=") + relating); 00201 } 00202 } 00203 00208 string QueuePropertyBase::getRelating() const 00209 { 00210 return relating_; 00211 } 00212 00217 Timestamp QueuePropertyBase::getExpires() const 00218 { 00219 return expires_; 00220 } 00221 00226 void QueuePropertyBase::setExpires(Timestamp expires) 00227 { 00228 if (maxExpires_ <= 0) expires_ = expires; 00229 else if ( (expires>0) && (maxExpires_>0) && (expires>maxExpires_) ) 00230 expires_ = maxExpires_; 00231 else if ( (expires<=0) && (maxExpires_>0) ) 00232 expires_ = maxExpires_; 00233 00234 if ( (expires>0) && (expires<minExpires_) ) 00235 expires_ = minExpires_; 00236 } 00237 00238 00244 long QueuePropertyBase::getMaxEntries() const 00245 { 00246 return maxEntries_; 00247 } 00248 00254 void QueuePropertyBase::setMaxEntries(long maxEntries) 00255 { 00256 maxEntries_ = maxEntries; 00257 } 00258 00259 00265 string QueuePropertyBase::getType() const 00266 { 00267 return type_; 00268 } 00269 00275 void QueuePropertyBase::setType(const string& type) 00276 { 00277 type_ = type; 00278 } 00279 00285 string QueuePropertyBase::getVersion() const 00286 { 00287 return version_; 00288 } 00289 00295 void QueuePropertyBase::setVersion(const string& version) 00296 { 00297 version_ = version; 00298 } 00299 00305 long QueuePropertyBase::getMaxEntriesCache() const 00306 { 00307 return maxEntriesCache_; 00308 } 00309 00315 void QueuePropertyBase::setMaxEntriesCache(long maxEntriesCache) 00316 { 00317 maxEntriesCache_ = maxEntriesCache; 00318 } 00319 00320 00326 long QueuePropertyBase::getMaxBytes() const 00327 { 00328 return maxBytes_; 00329 } 00330 00336 void QueuePropertyBase::setMaxBytes(long maxBytes) 00337 { 00338 maxBytes_ = maxBytes; 00339 } 00340 00341 00347 long QueuePropertyBase::getMaxBytesCache() const 00348 { 00349 return maxBytesCache_; 00350 } 00351 00352 00358 long QueuePropertyBase::getStoreSwapLevel() const 00359 { 00360 return storeSwapLevel_; 00361 } 00362 00368 void QueuePropertyBase::setStoreSwapLevel(long storeSwapLevel) 00369 { 00370 storeSwapLevel_ = storeSwapLevel; 00371 } 00372 00378 long QueuePropertyBase::getStoreSwapBytes() const 00379 { 00380 return storeSwapBytes_; 00381 } 00382 00388 void QueuePropertyBase::setStoreSwapBytes(long storeSwapBytes) 00389 { 00390 storeSwapBytes_ = storeSwapBytes; 00391 } 00392 00398 long QueuePropertyBase::getReloadSwapLevel() const 00399 { 00400 return reloadSwapLevel_; 00401 } 00402 00408 void QueuePropertyBase::setReloadSwapLevel(long reloadSwapLevel) 00409 { 00410 reloadSwapLevel_ = reloadSwapLevel; 00411 } 00412 00418 long QueuePropertyBase::getReloadSwapBytes() const 00419 { 00420 return reloadSwapBytes_; 00421 } 00422 00428 void QueuePropertyBase::setReloadSwapBytes(long reloadSwapBytes) 00429 { 00430 reloadSwapBytes_ = reloadSwapBytes; 00431 } 00432 00438 void QueuePropertyBase::setMaxBytesCache(long maxBytesCache) 00439 { 00440 maxBytesCache_ = maxBytesCache; 00441 } 00442 00443 00449 void QueuePropertyBase::setOnOverflow(const string& onOverflow) 00450 { 00451 /* 00452 if (Constants.ONOVERFLOW_BLOCK.equalsIgnoreCase(onOverflow)) { 00453 this.onOverflow = Constants.ONOVERFLOW_BLOCK; 00454 } 00455 */ 00456 if (Constants::ONOVERFLOW_DEADMESSAGE == onOverflow) { 00457 onOverflow_ = Constants::ONOVERFLOW_DEADMESSAGE; 00458 } 00459 else if (Constants::ONOVERFLOW_DISCARDOLDEST == onOverflow) { 00460 onOverflow_ = Constants::ONOVERFLOW_DISCARDOLDEST; 00461 00462 onOverflow_ = Constants::ONOVERFLOW_DEADMESSAGE; // TODO !!! 00463 log_.error(ME, string("queue onOverflow='") + string(Constants::ONOVERFLOW_DISCARDOLDEST) + string("' is not implemented, switching to ") + onOverflow_ + string(" mode")); 00464 } 00465 else { 00466 onOverflow_ = Constants::ONOVERFLOW_DEADMESSAGE; 00467 log_.warn(ME, string("The queue onOverflow attribute is invalid '") + onOverflow + string("', setting to '") + onOverflow_ + string("'")); 00468 } 00469 } 00470 00475 string QueuePropertyBase::getOnOverflow() const 00476 { 00477 return onOverflow_; 00478 } 00479 00480 /* 00481 * The default mode, when queue is full the publisher blocks until 00482 * there is space again. 00483 public final boolean onOverflowBlock() { 00484 if (Constants.ONOVERFLOW_BLOCK.equalsIgnoreCase(getOnOverflow())) 00485 return true; 00486 return false; 00487 } 00488 */ 00489 00495 void QueuePropertyBase::setOnFailure(const string& onFailure) 00496 { 00497 if (Constants::ONOVERFLOW_DEADMESSAGE == onFailure) 00498 onFailure_ = Constants::ONOVERFLOW_DEADMESSAGE; 00499 else { 00500 log_.warn(ME, string("The queue onFailure attribute is invalid '") + onFailure + string("', setting to 'deadMessage'")); 00501 onFailure_ = Constants::ONOVERFLOW_DEADMESSAGE; 00502 } 00503 } 00504 00509 string QueuePropertyBase::getOnFailure() const 00510 { 00511 return onFailure_; 00512 } 00513 00517 bool QueuePropertyBase::onFailureDeadMessage() 00518 { 00519 if (Constants::ONOVERFLOW_DEADMESSAGE == getOnFailure()) 00520 return true; 00521 return false; 00522 } 00523 00528 AddressVector QueuePropertyBase::getAddresses() const 00529 { 00530 return addressArr_; 00531 } 00532 00539 string QueuePropertyBase::toXml(const string& extraOffset) const 00540 { 00541 string offset = Constants::OFFSET + extraOffset; 00542 string ret; 00543 ret += offset + string("<!-- QueuePropertyBase -->"); 00544 00545 ret += offset + string("<queue relating='") + getRelating(); 00546 if (DEFAULT_type != getType()) 00547 ret += string("' type='") + getType(); 00548 if (DEFAULT_version != getVersion()) 00549 ret += string("' version='") + getVersion(); 00550 if (DEFAULT_maxEntriesDefault != getMaxEntries()) 00551 ret += string("' maxEntries='") + lexical_cast<std::string>(getMaxEntries()); 00552 if (DEFAULT_maxEntriesCacheDefault != getMaxEntriesCache()) 00553 ret += string("' maxEntriesCache='") + lexical_cast<std::string>(getMaxEntriesCache()); 00554 if (DEFAULT_bytesDefault != getMaxBytes()) 00555 ret += string("' maxBytes='") + lexical_cast<std::string>(getMaxBytes()); 00556 if (DEFAULT_bytesCacheDefault != getMaxBytesCache()) 00557 ret += string("' maxBytesCache='") + lexical_cast<std::string>(getMaxBytesCache()); 00558 ret += string("' storeSwapLevel='") + lexical_cast<std::string>(getStoreSwapLevel()); 00559 ret += string("' storeSwapBytes='") + lexical_cast<std::string>(getStoreSwapBytes()); 00560 ret += string("' reloadSwapLevel='") + lexical_cast<std::string>(getReloadSwapLevel()); 00561 ret += string("' reloadSwapBytes='") + lexical_cast<std::string>(getReloadSwapBytes()); 00562 if (DEFAULT_expires != getExpires()) 00563 ret += string("' expires='") + lexical_cast<std::string>(getExpires()); 00564 if (DEFAULT_onOverflow != getOnOverflow()) 00565 ret += string("' onOverflow='") + getOnOverflow(); 00566 if (DEFAULT_onFailure != getOnFailure()) 00567 ret += string("' onFailure='") + getOnFailure(); 00568 00569 if (!addressArr_.empty()) { 00570 ret += string("'>"); 00571 AddressVector::const_iterator iter = addressArr_.begin(); 00572 while (iter != addressArr_.end()) { 00573 ret += (*iter)->toXml(extraOffset + Constants::INDENT); 00574 iter++; 00575 } 00576 ret += offset + string("</queue>"); 00577 00578 } 00579 else 00580 ret += string("'/>"); 00581 return ret; 00582 } 00583 00587 Global& QueuePropertyBase::getGlobal() 00588 { 00589 return global_; 00590 } 00591 00592 /* 00593 void QueuePropertyBase::cleanupAddresses() 00594 { 00595 AddressVector::iterator iter = addressArr_.begin(); 00596 while (iter != addressArr_.end()) { 00597 AddressBase* el = *iter; 00598 addressArr_.erase(iter); 00599 delete el; 00600 iter = addressArr_.begin(); 00601 } 00602 } 00603 */ 00604 00605 string QueuePropertyBase::getPropertyPrefix() const 00606 { 00607 return propertyPrefix_; 00608 } 00609 00610 void QueuePropertyBase::setpropertyPrefix(const string& prefix) 00611 { 00612 propertyPrefix_ = prefix; 00613 } 00614 00619 string QueuePropertyBase::getPrefix() 00620 { 00621 return (propertyPrefix_.length() > 0) ? 00622 getRootTagName()+"/"+propertyPrefix_+"/" : 00623 getRootTagName()+"/"; 00624 } 00625 00631 string QueuePropertyBase::getPropName(const string& token) 00632 { 00633 return "-" + getPrefix() + token; 00634 } 00635 00636 string QueuePropertyBase::getRootTagName() const 00637 { 00638 return rootTagName_; 00639 } 00640 00641 00642 }}}}} // namespaces 00643 00644