util/qos/storage/QueuePropertyBase.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      QueuePropertyBase.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Holding callback queue properties
00006 Version:   $Id: QueuePropertyBase.cpp 13445 2005-07-15 01:54:35Z ruff $
00007 ------------------------------------------------------------------------------*/
00008 
00009 
00017 #include <util/qos/storage/QueuePropertyBase.h>
00018 #include <util/lexical_cast.h>
00019 #include <util/Global.h>
00020 
00021 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace storage {
00022 
00023 using namespace std;
00024 using namespace org::xmlBlaster::util;
00025 using namespace org::xmlBlaster::util::qos::address;
00026 
00027 const long DEFAULT_maxEntriesDefault = 1000L;
00028 const long DEFAULT_maxEntriesCacheDefault = 1000L;
00029 const long DEFAULT_bytesDefault = 10485760L; // 10 MB
00030 const long DEFAULT_bytesCacheDefault = 2097152L; // 2 MB
00032 const double DEFAULT_storeSwapLevelRatio = 0.70;
00034 const double DEFAULT_storeSwapBytesRatio = 0.25;
00036 const double DEFAULT_reloadSwapLevelRatio = 0.30;
00038 const double DEFAULT_reloadSwapBytesRatio = 0.25;
00039 const Timestamp DEFAULT_minExpires = 1000;
00040 const Timestamp DEFAULT_maxExpires = 0;
00041 const string DEFAULT_onOverflow = Constants::ONOVERFLOW_DEADMESSAGE;
00042 const string DEFAULT_onFailure = Constants::ONOVERFLOW_DEADMESSAGE;
00043 
00044 // static variables
00045 string DEFAULT_type = "CACHE";
00046 string DEFAULT_version = "1.0";
00048 long DEFAULT_expires;
00049 
00050 
00055 void QueuePropertyBase::initialize(const string& propertyPrefix)
00056 {
00057    //if (log_.call()) log_.call(ME, string("::initialize with property prefix '") + propertyPrefix + "'");
00058    propertyPrefix_ = propertyPrefix;
00059    string prefix = getPrefix();
00060    //if (log_.trace()) log_.trace(ME, string("::initialize: got the prefix '") + prefix + "'");
00061 
00062    // Do we need this range settings?
00063    setMinExpires(global_.getProperty().getTimestampProperty("queue/expires.min", DEFAULT_minExpires));
00064    setMaxExpires(global_.getProperty().getTimestampProperty("queue/expires.max", DEFAULT_maxExpires)); // Long.MAX_VALUE);
00065    //if (log_.trace()) log_.trace(ME, "::initialize: expires set");
00066    if (nodeId_ != "") {
00067       setMinExpires(global_.getProperty().getTimestampProperty("queue/expires.min["+nodeId_+"]", getMinExpires()));
00068       setMaxExpires(global_.getProperty().getTimestampProperty("queue/expires.max["+nodeId_+"]", getMaxExpires())); // Long.MAX_VALUE);
00069    }
00070    //if (log_.trace()) log_.trace(ME, "::initialize: expires for the specific node set");
00071 
00072    // prefix is e.g. "queue/history/" or "persistence/topicStore/"
00073    setMaxEntries(global_.getProperty().getLongProperty(prefix+"maxEntries", DEFAULT_maxEntriesDefault));
00074    //if (log_.trace()) log_.trace(ME, "::initialize: setMaxEntries -> " + lexical_cast<string>(getMaxEntries()));
00075    setMaxEntriesCache(global_.getProperty().getLongProperty(prefix+"maxEntriesCache", DEFAULT_maxEntriesCacheDefault));
00076    //if (log_.trace()) log_.trace(ME, "::initialize: setMaxEntriesCache -> " + lexical_cast<string>(getMaxEntriesCache()));
00077    setMaxBytes(global_.getProperty().getLongProperty(prefix+"maxBytes", DEFAULT_bytesDefault));
00078    //if (log_.trace()) log_.trace(ME, "::initialize: setMaxBytes -> " + lexical_cast<string>(getMaxBytes()));
00079    setMaxBytesCache(global_.getProperty().getLongProperty(prefix+"maxBytesCache", DEFAULT_bytesCacheDefault));
00080    //if (log_.trace()) log_.trace(ME, "::initialize: setMaxBytesCache -> " + lexical_cast<string>(getMaxBytesCache()));
00081 
00082    setStoreSwapLevel(global_.getProperty().getLongProperty(prefix+"storeSwapLevel", (long)(DEFAULT_storeSwapLevelRatio*maxBytesCache_)));
00083    setStoreSwapBytes(global_.getProperty().getLongProperty(prefix+"storeSwapBytes", (long)(DEFAULT_storeSwapBytesRatio*maxBytesCache_)));
00084    setReloadSwapLevel(global_.getProperty().getLongProperty(prefix+"reloadSwapLevel", (long)(DEFAULT_reloadSwapLevelRatio*maxBytesCache_)));
00085    setReloadSwapBytes(global_.getProperty().getLongProperty(prefix+"reloadSwapBytes", (long)(DEFAULT_reloadSwapBytesRatio*maxBytesCache_)));
00086 
00087    //if (log_.trace()) log_.trace(ME, "::initialize: values for the swap control set");
00088 
00089    setExpires(global_.getProperty().getTimestampProperty(prefix+"expires", DEFAULT_maxExpires));
00090    setOnOverflow(global_.getProperty().getStringProperty(prefix+"onOverflow", DEFAULT_onOverflow));
00091    setOnFailure(global_.getProperty().getStringProperty(prefix+"onFailure", DEFAULT_onFailure));
00092    setType(global_.getProperty().getStringProperty(prefix+"type", DEFAULT_type));
00093    setVersion(global_.getProperty().getStringProperty(prefix+"version", DEFAULT_version));
00094 
00095    //if (log_.trace()) log_.trace(ME, "::initialize: going to set specific node properties");
00096 
00097    if (nodeId_ != "") {
00098       setMaxEntries(global_.getProperty().getLongProperty(prefix+"maxEntries["+nodeId_+"]", getMaxEntries()));
00099       setMaxEntriesCache(global_.getProperty().getLongProperty(prefix+"maxEntriesCache["+nodeId_+"]", getMaxEntriesCache()));
00100       setMaxBytes(global_.getProperty().getLongProperty(prefix+"maxBytes["+nodeId_+"]", getMaxBytes()));
00101       setMaxBytesCache(global_.getProperty().getLongProperty(prefix+"maxBytesCache["+nodeId_+"]", getMaxBytesCache()));
00102       setStoreSwapLevel(global_.getProperty().getLongProperty(prefix+"storeSwapLevel["+nodeId_+"]", getStoreSwapLevel()));
00103       setStoreSwapBytes(global_.getProperty().getLongProperty(prefix+"storeSwapBytes["+nodeId_+"]", getStoreSwapBytes()));
00104       setReloadSwapLevel(global_.getProperty().getLongProperty(prefix+"reloadSwapLevel["+nodeId_+"]", getReloadSwapLevel()));
00105       setReloadSwapBytes(global_.getProperty().getLongProperty(prefix+"reloadSwapBytes["+nodeId_+"]", getReloadSwapBytes()));
00106       setExpires(global_.getProperty().getTimestampProperty(prefix+"expires["+nodeId_+"]", getExpires()));
00107       setOnOverflow(global_.getProperty().getStringProperty(prefix+"onOverflow["+nodeId_+"]", getOnOverflow()));
00108       setOnFailure(global_.getProperty().getStringProperty(prefix+"onFailure["+nodeId_+"]", getOnFailure()));
00109       setType(global_.getProperty().getStringProperty(prefix+"type["+nodeId_+"]", getType()));
00110       setVersion(global_.getProperty().getStringProperty(prefix+"version["+nodeId_+"]", getVersion()));
00111    }
00112    if (log_.trace()) log_.trace(ME, string("::initialized to: ") + toXml());
00113 }
00114 
00115 /*
00116    void QueuePropertyBase::initialize()
00117    {
00118       // Do we need this range settings?
00119       setMinExpires(global_.getProperty().getTimestampProperty("queue/expires.min", DEFAULT_minExpires));
00120       setMaxExpires(global_.getProperty().getTimestampProperty("queue/expires.max", DEFAULT_maxExpires)); // Long.MAX_VALUE);
00121       if (nodeId_ != "") {
00122          setMinExpires(global_.getProperty().getTimestampProperty(string("queue/expires.min[")+nodeId_+string("]"), getMinExpires()));
00123          setMaxExpires(global_.getProperty().getTimestampProperty(string("queue/expires.max[")+nodeId_+string("]"), getMaxExpires())); // Long.MAX_VALUE);
00124       }
00125 
00126 //         PluginInfo pluginInfo = new PluginInfo(glob, null, global_.getProperty().get("queue/defaultPlugin", DEFAULT_type));
00127 //         DEFAULT_type = pluginInfo.getType();
00128 //         DEFAULT_version = pluginInfo.getVersion();
00129    }
00130 */
00131 
00132 
00133    QueuePropertyBase::QueuePropertyBase(Global& global, const string& nodeId)
00134       : ME("QueuePropertyBase"),
00135         global_(global),
00136         log_(global.getLog("org.xmlBlaster.util.qos")),
00137         type_(DEFAULT_type),
00138         version_(DEFAULT_version),
00139         minExpires_(DEFAULT_minExpires),
00140         maxExpires_(DEFAULT_minExpires),
00141         relating_(Constants::RELATING_CALLBACK),
00142         expires_(DEFAULT_expires),
00143         maxEntries_(DEFAULT_maxEntriesDefault),
00144         maxBytes_(DEFAULT_bytesDefault),
00145         maxEntriesCache_(DEFAULT_maxEntriesCacheDefault),
00146         storeSwapLevel_(0),
00147         storeSwapBytes_(0),
00148         reloadSwapLevel_(0),
00149         reloadSwapBytes_(0),
00150         maxBytesCache_(DEFAULT_bytesCacheDefault),
00151         onOverflow_(Constants::ONOVERFLOW_DEADMESSAGE),
00152         onFailure_(Constants::ONOVERFLOW_DEADMESSAGE),
00153         addressArr_(),
00154         nodeId_(nodeId),
00155         propertyPrefix_(""),
00156         rootTagName_("queue")
00157    {
00158    }
00159 
00160    QueuePropertyBase::QueuePropertyBase(const QueuePropertyBase& prop)
00161       : ME("QueuePropertyBase"), global_(prop.global_), log_(prop.log_)
00162    {
00163       copy(prop);
00164    }
00165 
00166    QueuePropertyBase&
00167    QueuePropertyBase::operator =(const QueuePropertyBase& prop)
00168    {
00169       copy(prop);
00170       return *this;
00171    }
00172 
00173 
00174    QueuePropertyBase::~QueuePropertyBase()
00175    {
00176       addressArr_.clear();
00177       // delete all entries of the address vector since they are pointers
00178       // owned by this object.
00179 //      cleanupAddresses();
00180    }
00181 
00185    void QueuePropertyBase::setRelating(const string& relating)
00186    {
00187       if (Constants::RELATING_CALLBACK == relating)
00188          relating_ = Constants::RELATING_CALLBACK;
00189       else if (Constants::RELATING_SUBJECT == relating)
00190          relating_ = Constants::RELATING_SUBJECT;
00191       else if (Constants::RELATING_CLIENT == relating)
00192          relating_ = Constants::RELATING_CLIENT;
00193       else if (Constants::RELATING_HISTORY == relating)
00194          relating_ = Constants::RELATING_HISTORY;
00195       else if (Constants::RELATING_MSGUNITSTORE == relating)
00196          relating_ = Constants::RELATING_MSGUNITSTORE;
00197       else if (Constants::RELATING_TOPICSTORE == relating)
00198          relating_ = Constants::RELATING_TOPICSTORE;
00199       else {
00200          log_.warn(ME, string("Ignoring relating=") + relating);
00201       }
00202    }
00203 
00208    string QueuePropertyBase::getRelating() const
00209    {
00210       return relating_;
00211    }
00212 
00217    Timestamp QueuePropertyBase::getExpires() const
00218    {
00219       return expires_;
00220    }
00221 
00226    void QueuePropertyBase::setExpires(Timestamp expires)
00227    {
00228       if (maxExpires_ <= 0) expires_ = expires;
00229       else if ( (expires>0) && (maxExpires_>0) && (expires>maxExpires_) )
00230          expires_ = maxExpires_;
00231       else if ( (expires<=0) && (maxExpires_>0) )
00232          expires_ = maxExpires_;
00233 
00234       if ( (expires>0) && (expires<minExpires_) )
00235          expires_ = minExpires_;
00236    }
00237 
00238 
00244    long QueuePropertyBase::getMaxEntries() const
00245    {
00246       return maxEntries_;
00247    }
00248 
00254    void QueuePropertyBase::setMaxEntries(long maxEntries)
00255    {
00256       maxEntries_ = maxEntries;
00257    }
00258 
00259 
00265    string QueuePropertyBase::getType() const
00266    {
00267       return type_;
00268    }
00269 
00275    void QueuePropertyBase::setType(const string& type)
00276    {
00277       type_ = type;
00278    }
00279 
00285    string QueuePropertyBase::getVersion() const
00286    {
00287       return version_;
00288    }
00289 
00295    void QueuePropertyBase::setVersion(const string& version)
00296    {
00297       version_ = version;
00298    }
00299 
00305    long QueuePropertyBase::getMaxEntriesCache() const
00306    {
00307       return maxEntriesCache_;
00308    }
00309 
00315    void QueuePropertyBase::setMaxEntriesCache(long maxEntriesCache)
00316    {
00317       maxEntriesCache_ = maxEntriesCache;
00318    }
00319 
00320 
00326    long QueuePropertyBase::getMaxBytes() const
00327    {
00328       return maxBytes_;
00329    }
00330 
00336    void QueuePropertyBase::setMaxBytes(long maxBytes)
00337    {
00338       maxBytes_ = maxBytes;
00339    }
00340 
00341 
00347    long QueuePropertyBase::getMaxBytesCache() const
00348    {
00349       return maxBytesCache_;
00350    }
00351 
00352 
00358    long QueuePropertyBase::getStoreSwapLevel() const
00359    {
00360       return storeSwapLevel_;
00361    }
00362 
00368    void QueuePropertyBase::setStoreSwapLevel(long storeSwapLevel)
00369    {
00370       storeSwapLevel_ = storeSwapLevel;
00371    }
00372 
00378    long QueuePropertyBase::getStoreSwapBytes() const
00379    {
00380       return storeSwapBytes_;
00381    }
00382 
00388    void QueuePropertyBase::setStoreSwapBytes(long storeSwapBytes)
00389    {
00390       storeSwapBytes_ = storeSwapBytes;
00391    }
00392 
00398    long QueuePropertyBase::getReloadSwapLevel() const
00399    {
00400       return reloadSwapLevel_;
00401    }
00402 
00408    void QueuePropertyBase::setReloadSwapLevel(long reloadSwapLevel)
00409    {
00410       reloadSwapLevel_ = reloadSwapLevel;
00411    }
00412 
00418    long QueuePropertyBase::getReloadSwapBytes() const
00419    {
00420       return reloadSwapBytes_;
00421    }
00422 
00428    void QueuePropertyBase::setReloadSwapBytes(long reloadSwapBytes)
00429    {
00430       reloadSwapBytes_ = reloadSwapBytes;
00431    }
00432 
00438    void QueuePropertyBase::setMaxBytesCache(long maxBytesCache)
00439    {
00440       maxBytesCache_ = maxBytesCache;
00441    }
00442 
00443 
00449    void QueuePropertyBase::setOnOverflow(const string& onOverflow)
00450    {
00451       /*
00452       if (Constants.ONOVERFLOW_BLOCK.equalsIgnoreCase(onOverflow)) {
00453          this.onOverflow = Constants.ONOVERFLOW_BLOCK;
00454       }
00455       */
00456       if (Constants::ONOVERFLOW_DEADMESSAGE == onOverflow) {
00457          onOverflow_ = Constants::ONOVERFLOW_DEADMESSAGE;
00458       }
00459       else if (Constants::ONOVERFLOW_DISCARDOLDEST == onOverflow) {
00460          onOverflow_ = Constants::ONOVERFLOW_DISCARDOLDEST;
00461 
00462          onOverflow_ = Constants::ONOVERFLOW_DEADMESSAGE; // TODO !!!
00463          log_.error(ME, string("queue onOverflow='") + string(Constants::ONOVERFLOW_DISCARDOLDEST) + string("' is not implemented, switching to ") + onOverflow_ + string(" mode"));
00464       }
00465       else {
00466          onOverflow_ = Constants::ONOVERFLOW_DEADMESSAGE;
00467          log_.warn(ME, string("The queue onOverflow attribute is invalid '") + onOverflow + string("', setting to '") + onOverflow_ + string("'"));
00468       }
00469    }
00470 
00475    string QueuePropertyBase::getOnOverflow() const
00476    {
00477       return onOverflow_;
00478    }
00479 
00480    /*
00481     * The default mode, when queue is full the publisher blocks until
00482     * there is space again.
00483    public final boolean onOverflowBlock() {
00484       if (Constants.ONOVERFLOW_BLOCK.equalsIgnoreCase(getOnOverflow()))
00485          return true;
00486       return false;
00487    }
00488     */
00489 
00495    void QueuePropertyBase::setOnFailure(const string& onFailure)
00496    {
00497       if (Constants::ONOVERFLOW_DEADMESSAGE == onFailure)
00498          onFailure_ = Constants::ONOVERFLOW_DEADMESSAGE;
00499       else {
00500          log_.warn(ME, string("The queue onFailure attribute is invalid '") + onFailure + string("', setting to 'deadMessage'"));
00501          onFailure_ = Constants::ONOVERFLOW_DEADMESSAGE;
00502       }
00503    }
00504 
00509    string QueuePropertyBase::getOnFailure() const
00510    {
00511       return onFailure_;
00512    }
00513 
00517    bool QueuePropertyBase::onFailureDeadMessage()
00518    {
00519       if (Constants::ONOVERFLOW_DEADMESSAGE == getOnFailure())
00520          return true;
00521       return false;
00522    }
00523 
00528    AddressVector QueuePropertyBase::getAddresses() const
00529    {
00530       return addressArr_;
00531    }
00532 
00539    string QueuePropertyBase::toXml(const string& extraOffset) const
00540    {
00541       string offset = Constants::OFFSET + extraOffset;
00542       string ret;   
00543       ret += offset + string("<!-- QueuePropertyBase -->");
00544 
00545       ret += offset + string("<queue relating='") + getRelating();
00546       if (DEFAULT_type != getType())
00547          ret += string("' type='") + getType();
00548       if (DEFAULT_version != getVersion())
00549          ret += string("' version='") + getVersion();
00550       if (DEFAULT_maxEntriesDefault != getMaxEntries())
00551          ret += string("' maxEntries='") + lexical_cast<std::string>(getMaxEntries());
00552       if (DEFAULT_maxEntriesCacheDefault != getMaxEntriesCache())
00553          ret += string("' maxEntriesCache='") + lexical_cast<std::string>(getMaxEntriesCache());
00554       if (DEFAULT_bytesDefault != getMaxBytes())
00555          ret += string("' maxBytes='") + lexical_cast<std::string>(getMaxBytes());
00556       if (DEFAULT_bytesCacheDefault != getMaxBytesCache())
00557          ret += string("' maxBytesCache='") + lexical_cast<std::string>(getMaxBytesCache());
00558       ret += string("' storeSwapLevel='") + lexical_cast<std::string>(getStoreSwapLevel());
00559       ret += string("' storeSwapBytes='") + lexical_cast<std::string>(getStoreSwapBytes());
00560       ret += string("' reloadSwapLevel='") + lexical_cast<std::string>(getReloadSwapLevel());
00561       ret += string("' reloadSwapBytes='") + lexical_cast<std::string>(getReloadSwapBytes());
00562       if (DEFAULT_expires != getExpires())
00563          ret += string("' expires='") + lexical_cast<std::string>(getExpires());
00564       if (DEFAULT_onOverflow != getOnOverflow())
00565          ret += string("' onOverflow='") + getOnOverflow();
00566       if (DEFAULT_onFailure != getOnFailure())
00567          ret += string("' onFailure='") + getOnFailure();
00568 
00569       if (!addressArr_.empty()) {
00570          ret += string("'>");
00571          AddressVector::const_iterator iter = addressArr_.begin();
00572          while (iter != addressArr_.end()) {
00573             ret += (*iter)->toXml(extraOffset + Constants::INDENT);
00574             iter++;
00575          }
00576          ret += offset + string("</queue>");
00577 
00578       }
00579       else
00580          ret += string("'/>");
00581       return ret;
00582    }
00583 
00587    Global& QueuePropertyBase::getGlobal()
00588    {
00589       return global_;
00590    }
00591 
00592 /*
00593    void QueuePropertyBase::cleanupAddresses()
00594    {
00595       AddressVector::iterator iter = addressArr_.begin();
00596       while (iter != addressArr_.end()) {
00597         AddressBase* el = *iter;
00598         addressArr_.erase(iter);
00599         delete el;
00600         iter = addressArr_.begin();
00601       }
00602    }
00603 */
00604 
00605    string QueuePropertyBase::getPropertyPrefix() const
00606    {
00607       return propertyPrefix_;
00608    }
00609 
00610    void QueuePropertyBase::setpropertyPrefix(const string& prefix)
00611    {
00612       propertyPrefix_ = prefix;
00613    }
00614 
00619    string QueuePropertyBase::getPrefix()
00620    {
00621       return (propertyPrefix_.length() > 0) ?
00622                    getRootTagName()+"/"+propertyPrefix_+"/" :
00623                    getRootTagName()+"/";
00624    }
00625 
00631    string QueuePropertyBase::getPropName(const string& token)
00632    {
00633       return "-" + getPrefix() + token;
00634    }
00635 
00636    string QueuePropertyBase::getRootTagName() const
00637    {
00638       return rootTagName_;
00639    }
00640 
00641 
00642 }}}}} // namespaces
00643 
00644