util/qos/MsgQosFactory.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      MsgQosFactory.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 ------------------------------------------------------------------------------*/
00006 #include <util/qos/MsgQosFactory.h>
00007 #include <util/MethodName.h>
00008 #include <util/Global.h>
00009 #include <util/StringTrim.h>
00010 #include <util/lexical_cast.h>
00011 
00012 
00013 using namespace org::xmlBlaster::util;
00014 using namespace org::xmlBlaster::util::parser;
00015 using namespace org::xmlBlaster::util::cluster;
00016 using namespace org::xmlBlaster::util::qos::storage;
00017 using namespace std;
00018 
00019 namespace org { namespace xmlBlaster { namespace util { namespace qos {
00020 
00021 MsgQosFactory::MsgQosFactory(Global& global)
00022    : XmlHandlerBase(global), 
00023      ME("MsgQosFactory"),
00024      msgQosDataP_(0), 
00025      destination_(global), 
00026      routeInfo_(global),
00027      queuePropertyFactory_(global),
00028      clientProperty_(0)
00029 {
00030    ME                 = string("MsgQosFactory");
00031    LIFE_TIME          = string("lifeTime");
00032    FORCE_DESTROY      = string("forceDestroy");
00033    REMAINING_LIFE     = string("remainingLife");
00034    READ_ONLY          = string("readOnly");
00035    DESTROY_DELAY      = string("destroyDelay");
00036    CREATE_DOM_ENTRY   = string("createDomEntry");
00037    NANOS              = string("nanos");
00038    ID                 = string("id");
00039    STRATUM            = string("stratum");
00040    TIMESTAMP          = string("timestamp");
00041    DIRTY_READ         = string("dirtyRead");
00042    INDEX              = string("index");
00043    SIZE               = string("size");
00044    inState_           = false;
00045    inSubscribe_       = false;
00046    inRedeliver_       = false;
00047    inQueue_           = false;
00048    inPersistence_     = false;
00049    inDestination_     = false;
00050    inSender_          = false;
00051    inPriority_        = false;
00052    inClientProperty_  = false;
00053    inExpiration_      = false;
00054    inRcvTimestamp_    = false;
00055    inIsVolatile_      = false;
00056    inIsPersistent_    = false;
00057    inReadonly_        = false;
00058    inRoute_           = false;
00059    sendRemainingLife_ = true;
00060    inQos_             = false;
00061 }
00062 
00063 MsgQosFactory::~MsgQosFactory() 
00064 {
00065    if (clientProperty_ != 0) {
00066       delete(clientProperty_);
00067    }
00068    if (msgQosDataP_ != 0) {
00069       delete(msgQosDataP_);
00070    }
00071 }                
00072 
00073 MsgQosData MsgQosFactory::readObject(const string& xmlQos)
00074 {
00075    delete msgQosDataP_;
00076    msgQosDataP_ = new MsgQosData(global_);
00077    routeInfo_ = RouteInfo(global_);
00078    //queuePropertyFactory_ = QueuePropertyFactory(global_);
00079    delete clientProperty_;
00080    clientProperty_ = 0;
00081 
00082    if (xmlQos.empty()) init("<qos/>");
00083    else init(xmlQos);
00084    return *msgQosDataP_;
00085 }
00086 
00087 void MsgQosFactory::startElement(const string &name, const AttributeMap& attrs)
00088 {
00089    bool      tmpBool;
00090    string    tmpString;
00091    long      tmpLong;
00092    Timestamp tmpTimestamp;
00093    if (name.compare("qos") == 0) {
00094      inQos_ = true;
00095      return;
00096    }
00097    if (name.compare("persistence") == 0 || inPersistence_) {
00098       if (!inQos_) return;
00099       inPersistence_ = true;
00100       queuePropertyFactory_.startElement(name, attrs);
00101       return;
00102    }
00103    if (name.compare("state") == 0) {
00104       if (!inQos_) return;
00105       inState_ = true;
00106       AttributeMap::const_iterator iter = attrs.begin();
00107       string tmpName = (*iter).first;
00108       string tmpValue = (*iter).second;
00109       while (iter != attrs.end()) {
00110          if (tmpName.compare("id") == 0) {
00111             msgQosDataP_->setState(tmpValue);
00112          }
00113          else if (tmpName.compare("info") == 0) {
00114             msgQosDataP_->setStateInfo(tmpValue);
00115          }
00116          iter++;
00117       }
00118       return;
00119    }
00120    if (name.compare("destination") == 0) {
00121       if (!inQos_) return;
00122       inDestination_ = true;
00123       destination_ = Destination(global_);
00124       AttributeMap::const_iterator iter = attrs.begin();
00125       while (iter != attrs.end()) {
00126          string tmpName = (*iter).first;
00127          string tmpValue = (*iter).second;
00128          if (tmpName.compare("queryType") == 0) {
00129             string queryType = tmpValue;
00130             if (queryType == "EXACT")      destination_.setQueryType(queryType);
00131             else if (queryType == "XPATH") destination_.setQueryType(queryType);
00132             else log_.error(ME, string("Sorry, destination queryType='") + queryType + string("' is not supported"));
00133          }
00134          else if( tmpName.compare("forceQueuing") == 0) {
00135             destination_.forceQueuing(XmlHandlerBase::getBoolValue(tmpValue));
00136          }
00137          iter++;
00138       }
00139 
00140       return;
00141    }
00142    if (name.compare("sender") == 0) {
00143       if (!inQos_) return;
00144       inSender_ = true;
00145       return;
00146    }
00147    if (name.compare("priority") == 0) {
00148       if (!inQos_) return;
00149       inPriority_ = true;
00150       return;
00151    }
00152    if (name.compare("expiration") == 0) {
00153       if (!inQos_) return;
00154       inExpiration_ = true;
00155 //         int len = attrs.getLength();
00156       if (getLongAttr(attrs, LIFE_TIME, tmpLong)) msgQosDataP_->setLifeTime(tmpLong);
00157       else {
00158          log_.warn(ME, string("QoS <expiration> misses lifeTime attribute, setting default of ") + lexical_cast<std::string>(msgQosDataP_->getMaxLifeTime()));
00159          msgQosDataP_->setLifeTime(msgQosDataP_->getMaxLifeTime());
00160       }
00161       if (getBoolAttr(attrs, FORCE_DESTROY, tmpBool)) msgQosDataP_->setForceDestroy(tmpBool);
00162       if (getLongAttr(attrs, REMAINING_LIFE, tmpLong)) msgQosDataP_->setRemainingLifeStatic(tmpLong);
00163       return;
00164    }
00165    if (name.compare("rcvTimestamp") == 0) {
00166       if (!inQos_) return;
00167       if (getTimestampAttr(attrs, NANOS, tmpTimestamp)) msgQosDataP_->setRcvTimestamp(tmpTimestamp);
00168       inRcvTimestamp_ = true;
00169       return;
00170    }
00171    if (name.compare("redeliver") == 0) {
00172       if (!inQos_) return;
00173       inRedeliver_ = true;
00174       return;
00175    }
00176    if (name.compare("route") == 0) {
00177       if (!inQos_) return;
00178       inRoute_ = true;
00179       return;
00180    }
00181    if (name.compare("node") == 0) {
00182       if (!inRoute_) {
00183          log_.error(ME, "Ignoring <node>, it is not inside <route>");
00184          return;
00185       }
00186       if (attrs.size() > 0) {
00187          if (!getStringAttr(attrs, ID, tmpString)) {
00188             log_.error(ME, "QoS <route><node> misses id attribute, ignoring node");
00189             return;
00190          }
00191          NodeId nodeId(global_, tmpString); // where tmpString is the id
00192          int stratum = 0;
00193          if (!getIntAttr(attrs, STRATUM, stratum)) {
00194             log_.warn(ME, "QoS <route><node> misses stratum attribute, setting to 0: ");
00195             //Thread.currentThread().dumpStack();
00196          }
00197          Timestamp timestamp = 0;
00198          if (!getTimestampAttr(attrs, TIMESTAMP, timestamp)) {
00199             log_.warn(ME, "QoS <route><node> misses receive timestamp attribute, setting to 0");
00200          }
00201     //      bool dirtyRead = org::xmlBlaster::util::cluster::DEFAULT_dirtyRead;
00202          if (log_.trace()) log_.trace(ME, "Found node tag");
00203          routeInfo_ = RouteInfo(nodeId, stratum, timestamp);
00204          if (getBoolAttr(attrs, DIRTY_READ, tmpBool)) routeInfo_.setDirtyRead(tmpBool);
00205       }
00206       return;
00207    }
00208    if (name.compare(MethodName::SUBSCRIBE) == 0) {
00209       if (!inQos_) return;
00210       inSubscribe_ = true;
00211       AttributeMap::const_iterator iter = attrs.begin();
00212       while (iter != attrs.end()) {
00213          if ( ((*iter).first).compare("id") == 0) {
00214             msgQosDataP_->setSubscriptionId( (*iter).second );
00215          }
00216          iter++;
00217       }
00218       return;
00219    }
00220 
00221    if (name.compare("persistent") == 0) {
00222       if (!inQos_) return;
00223       msgQosDataP_->setPersistent(true);
00224       return;
00225    }
00226 
00227    if (name.compare("forceUpdate") == 0) {
00228       if (!inQos_) return;
00229       msgQosDataP_->setForceUpdate(true);
00230       return;
00231    }
00232 
00233    if (name.compare("readonly") == 0) {
00234       if (!inQos_) return;
00235       msgQosDataP_->setReadonly(true);
00236       log_.error(ME, "<qos><readonly/></qos> is deprecated, please use readonly as topic attribute <qos><topic readonly='true'></qos>");
00237       return;
00238    }
00239    
00240    if (name.compare("clientProperty") == 0) {
00241       if (!inQos_) return;
00242       inClientProperty_ = true;
00243       character_.erase();
00244       string nameAttr;
00245 
00246       AttributeMap::const_iterator iter = attrs.find("name");
00247       if (iter != attrs.end()) nameAttr = (*iter).second;
00248 
00249       string encoding;
00250       iter = attrs.find("encoding");
00251       if (iter != attrs.end()) encoding = (*iter).second;
00252 
00253       string type;
00254       iter = attrs.find("type");
00255       if (iter != attrs.end()) type = (*iter).second;
00256 
00257       string charset;
00258       iter = attrs.find("charset");
00259       if (iter != attrs.end()) charset = (*iter).second;
00260 
00261       clientProperty_ = new ClientProperty(true, nameAttr, type, encoding, charset);
00262    }
00263       
00264 }
00265 
00266 
00267 void MsgQosFactory::characters(const string &ch) 
00268 {
00269    XmlHandlerBase::characters(ch);
00270    if (inQueue_ || inPersistence_) queuePropertyFactory_.characters(ch);
00271 }
00272 
00273 
00274 void MsgQosFactory::endElement(const string &name) 
00275 {
00276    if (name.compare("qos") == 0) {
00277       inQos_ = false;
00278       return;
00279    }
00280    //log_.error(ME, "endElement: name=" + name + " character_=" + character_);
00281    if (inQueue_ || inPersistence_) {
00282       queuePropertyFactory_.endElement(name);
00283       if(name.compare("queue") == 0) {
00284          inQueue_ = false;
00285          character_.erase();
00286          QueuePropertyBase tmp = queuePropertyFactory_.getQueueProperty();
00287          string relating = tmp.getRelating();
00288          TopicProperty tmpProp = msgQosDataP_->getTopicProperty();
00289          if (relating == Constants::RELATING_HISTORY) {
00290             tmpProp.setHistoryQueueProperty(tmp);
00291             msgQosDataP_->setTopicProperty(tmpProp);
00292          }
00293          else {
00294             log_.error(ME, string("Ignoring unknown <queue relating='") + relating + "'/> configuration");
00295          }
00296          return;
00297       }
00298 
00299       if(name.compare("persistence") == 0) { // topic: RELATING_MSGUNITSTORE
00300          inPersistence_ = false;
00301          character_.erase();
00302          QueuePropertyBase tmp = queuePropertyFactory_.getQueueProperty();
00303          TopicProperty tmpProp = msgQosDataP_->getTopicProperty();
00304          tmpProp.setMsgUnitStoreProperty(tmp);
00305          msgQosDataP_->setTopicProperty(tmpProp);
00306          return;
00307       }
00308    }
00309 
00310    if (name.compare("state") == 0) {
00311       inState_ = false;
00312       character_.erase();
00313       return;
00314    }
00315 
00316    if( name.compare("destination") == 0) {
00317       inDestination_ = false;
00318       StringTrim::trim(character_); // The address or XPath query string
00319       if (!character_.empty()) {
00320          destination_.getDestination()->setAbsoluteName(character_); // set address or XPath query string if it is before the forceQueuing tag
00321          character_.erase();
00322       }
00323       msgQosDataP_->addDestination(destination_);
00324       return;
00325    }
00326 
00327    if(name.compare("sender") == 0) {
00328       inSender_ = false;
00329       StringTrim::trim(character_);
00330       msgQosDataP_->getSender()->setAbsoluteName(character_);
00331       // if (log.trace()) log.trace(ME, "Found message sender login name = " + msgQosData.getSender());
00332       character_.erase();
00333       return;
00334    }
00335 
00336    if(name.compare("priority") == 0) {
00337       inPriority_ = false;
00338       msgQosDataP_->setPriority(str2Priority(character_));
00339       character_.erase();
00340       return;
00341    }
00342 
00343    if(name.compare("expiration") == 0) {
00344       inExpiration_ = false;
00345       character_.erase();
00346       return;
00347    }
00348 
00349    if(name.compare("rcvTimestamp") == 0) {
00350       inRcvTimestamp_ = false;
00351       character_.erase();
00352       return;
00353    }
00354 
00355    if(name.compare("forceUpdate") == 0) {
00356       inIsVolatile_ = false;
00357       msgQosDataP_->setForceUpdate(StringTrim::isTrueTrim(character_));
00358       character_.erase();
00359       return;
00360    }
00361 
00362    if (name.compare(MethodName::SUBSCRIBE) == 0) {
00363       inSubscribe_ = false;
00364       character_.erase();
00365       return;
00366    }
00367 
00368    if(name.compare("persistent") == 0) {
00369       inIsPersistent_ = false;
00370       msgQosDataP_->setPersistent(StringTrim::isTrueTrim(character_));
00371       character_.erase();
00372       return;
00373    }
00374 
00375    if(name.compare("readonly") == 0) {
00376       inReadonly_ = false;
00377       msgQosDataP_->setReadonly(StringTrim::isTrueTrim(character_));
00378       character_.erase();
00379       return;
00380    }
00381 
00382    if(name.compare("redeliver") == 0) {
00383       inRedeliver_ = false;
00384       StringTrim::trim(character_);
00385       msgQosDataP_->setRedeliver(atoi(character_.c_str()));
00386       character_.erase();
00387       return;
00388    }
00389 
00390    if (name.compare("node") == 0) {
00391       msgQosDataP_->addRouteInfo(routeInfo_);
00392       character_.erase();
00393       return;
00394    }
00395 
00396    if (name.compare("route") == 0) {
00397       inRoute_ = false;
00398       character_.erase();
00399       return;
00400    }
00401 
00402    if (name.compare("clientProperty") == 0) {
00403       inClientProperty_ = false;
00404       clientProperty_->setValueRaw(character_);
00405       msgQosDataP_->addClientProperty(*clientProperty_);
00406       delete clientProperty_;
00407       clientProperty_ = 0;
00408       character_.erase();
00409    }
00410 
00411    character_.erase(); // reset data from unknown tags
00412 }
00413 
00414 
00416 void MsgQosFactory::sendRemainingLife(bool sendRemainingLife) 
00417 { 
00418    sendRemainingLife_ = sendRemainingLife; 
00419 }
00420 
00421 bool MsgQosFactory::sendRemainingLife() 
00422 { 
00423    return sendRemainingLife_; 
00424 }
00425 
00426 }}}}
00427 
00428 
00429 #ifdef _XMLBLASTER_CLASSTEST
00430 
00431 using namespace std;
00432 using namespace org::xmlBlaster::util::qos;
00433 
00434 int main(int args, char* argv[])
00435 {
00436     try
00437     {
00438        Global& glob = Global::getInstance();
00439        glob.initialize(args, argv);
00440 
00441        MsgQosData    data1(glob);
00442        MsgQosFactory factory(glob);
00443        string        qos   = data1.toXml();
00444        MsgQosData    data2 = factory.readObject(qos);
00445 
00446        cout << "data before parsing: " << data1.toXml() << endl;
00447        cout << "data after parsing : " << data2.toXml() << endl;
00448     }
00449     catch(...)  {
00450        cout << "exception occured\n";
00451        return 1;
00452     }
00453    return 0;
00454 }
00455 
00456 #endif
00457 
00458 /*
00459  <qos>
00460     <state id='OK' info='Keep on running'/> <!-- Only for updates and PtP -->
00461     <sender>Tim</sender>
00462     <priority>5</priority>
00463     <subscribe id='__subId:1'/>     <!-- Only for updates, id='__subId:PtP' for point to point messages -->
00464     <rcvTimestamp nanos='1007764305862000002'> <!-- UTC time when message was created in xmlBlaster server with a publish() call, in nanoseconds since 1970 -->
00465           2001-12-07 23:31:45.862000002   <!-- The nanos from above but human readable -->
00466     </rcvTimestamp>
00467     <expiration lifeTime='129595811' forceDestroy='false'/> <!-- Only for persistence layer -->
00468     <queue index='0' of='1'/> <!-- If queued messages are flushed on login -->
00469     <persistent/>
00470     <redeliver>4</redeliver>             <!-- Only for updates -->
00471     <route>
00472        <node id='heron'/>
00473     </route>
00474     <topic readonly='false' destroyDelay='60000' createDomEntry='true'>
00475        <queue relating='topic' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='deadMessage'/>
00476        <queue relating='history' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='exception'/>
00477     </topic>
00478  </qos>
00479 
00480 
00481 
00482 
00483  <qos>
00484     <destination queryType='EXACT' forceQueuing='true'>
00485        Tim
00486     </destination>
00487     <destination queryType='EXACT'>
00488        /node/heron/client/Ben
00489     </destination>
00490     <destination queryType='XPATH'>   <!-- Not supported yet -->
00491        //[GROUP='Manager']
00492     </destination>
00493     <destination queryType='XPATH'>   <!-- Not supported yet -->
00494        //ROLE/[@id='Developer']
00495     </destination>
00496     <sender>
00497        Gesa
00498     </sender>
00499     <priority>7</priority>
00500     <route>
00501        <node id='bilbo' stratum='2' timestamp='34460239640' dirtyRead='true'/>
00502     </route>
00503  </qos>
00504 
00505 */
00506 
00507 
00508