00001 /*------------------------------------------------------------------------------ 00002 Name: MsgQosSaxFactory.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 ------------------------------------------------------------------------------*/ 00006 #include <util/qos/MsgQosFactory.h> 00007 #include <util/MethodName.h> 00008 #include <util/Global.h> 00009 #include <util/StringTrim.h> 00010 #include <util/lexical_cast.h> 00011 00012 00013 using namespace org::xmlBlaster::util; 00014 using namespace org::xmlBlaster::util::parser; 00015 using namespace org::xmlBlaster::util::cluster; 00016 using namespace org::xmlBlaster::util::qos::storage; 00017 using namespace std; 00018 00019 namespace org { namespace xmlBlaster { namespace util { namespace qos { 00020 00021 MsgQosFactory::MsgQosFactory(Global& global) 00022 : XmlHandlerBase(global), 00023 ME("MsgQosFactory"), 00024 msgQosDataP_(0), 00025 destination_(global), 00026 routeInfo_(global), 00027 queuePropertyFactory_(global), 00028 clientProperty_(0) 00029 { 00030 ME = string("MsgQosFactory"); 00031 LIFE_TIME = string("lifeTime"); 00032 FORCE_DESTROY = string("forceDestroy"); 00033 REMAINING_LIFE = string("remainingLife"); 00034 READ_ONLY = string("readOnly"); 00035 DESTROY_DELAY = string("destroyDelay"); 00036 CREATE_DOM_ENTRY = string("createDomEntry"); 00037 NANOS = string("nanos"); 00038 ID = string("id"); 00039 STRATUM = string("stratum"); 00040 TIMESTAMP = string("timestamp"); 00041 DIRTY_READ = string("dirtyRead"); 00042 INDEX = string("index"); 00043 SIZE = string("size"); 00044 inState_ = false; 00045 inSubscribe_ = false; 00046 inRedeliver_ = false; 00047 inQueue_ = false; 00048 inPersistence_ = false; 00049 inDestination_ = false; 00050 inSender_ = false; 00051 inPriority_ = false; 00052 inClientProperty_ = false; 00053 inExpiration_ = false; 00054 inRcvTimestamp_ = false; 00055 inIsVolatile_ = false; 00056 inIsPersistent_ = false; 00057 inReadonly_ = false; 00058 inRoute_ = false; 00059 sendRemainingLife_ = true; 00060 inQos_ = false; 00061 } 00062 00063 MsgQosFactory::~MsgQosFactory() 00064 { 00065 if (clientProperty_ != 0) { 00066 delete(clientProperty_); 00067 } 00068 if (msgQosDataP_ != 0) { 00069 delete(msgQosDataP_); 00070 } 00071 } 00072 00073 MsgQosData MsgQosFactory::readObject(const string& xmlQos) 00074 { 00075 delete msgQosDataP_; 00076 msgQosDataP_ = new MsgQosData(global_); 00077 routeInfo_ = RouteInfo(global_); 00078 //queuePropertyFactory_ = QueuePropertyFactory(global_); 00079 delete clientProperty_; 00080 clientProperty_ = 0; 00081 00082 if (xmlQos.empty()) init("<qos/>"); 00083 else init(xmlQos); 00084 return *msgQosDataP_; 00085 } 00086 00087 void MsgQosFactory::startElement(const string &name, const AttributeMap& attrs) 00088 { 00089 bool tmpBool; 00090 string tmpString; 00091 long tmpLong; 00092 Timestamp tmpTimestamp; 00093 if (name.compare("qos") == 0) { 00094 inQos_ = true; 00095 return; 00096 } 00097 if (name.compare("persistence") == 0 || inPersistence_) { 00098 if (!inQos_) return; 00099 inPersistence_ = true; 00100 queuePropertyFactory_.startElement(name, attrs); 00101 return; 00102 } 00103 if (name.compare("state") == 0) { 00104 if (!inQos_) return; 00105 inState_ = true; 00106 AttributeMap::const_iterator iter = attrs.begin(); 00107 string tmpName = (*iter).first; 00108 string tmpValue = (*iter).second; 00109 while (iter != attrs.end()) { 00110 if (tmpName.compare("id") == 0) { 00111 msgQosDataP_->setState(tmpValue); 00112 } 00113 else if (tmpName.compare("info") == 0) { 00114 msgQosDataP_->setStateInfo(tmpValue); 00115 } 00116 iter++; 00117 } 00118 return; 00119 } 00120 if (name.compare("destination") == 0) { 00121 if (!inQos_) return; 00122 inDestination_ = true; 00123 destination_ = Destination(global_); 00124 AttributeMap::const_iterator iter = attrs.begin(); 00125 while (iter != attrs.end()) { 00126 string tmpName = (*iter).first; 00127 string tmpValue = (*iter).second; 00128 if (tmpName.compare("queryType") == 0) { 00129 string queryType = tmpValue; 00130 if (queryType == "EXACT") destination_.setQueryType(queryType); 00131 else if (queryType == "XPATH") destination_.setQueryType(queryType); 00132 else log_.error(ME, string("Sorry, destination queryType='") + queryType + string("' is not supported")); 00133 } 00134 else if( tmpName.compare("forceQueuing") == 0) { 00135 destination_.forceQueuing(XmlHandlerBase::getBoolValue(tmpValue)); 00136 } 00137 iter++; 00138 } 00139 00140 return; 00141 } 00142 if (name.compare("sender") == 0) { 00143 if (!inQos_) return; 00144 inSender_ = true; 00145 return; 00146 } 00147 if (name.compare("priority") == 0) { 00148 if (!inQos_) return; 00149 inPriority_ = true; 00150 return; 00151 } 00152 if (name.compare("expiration") == 0) { 00153 if (!inQos_) return; 00154 inExpiration_ = true; 00155 // int len = attrs.getLength(); 00156 if (getLongAttr(attrs, LIFE_TIME, tmpLong)) msgQosDataP_->setLifeTime(tmpLong); 00157 else { 00158 log_.warn(ME, string("QoS <expiration> misses lifeTime attribute, setting default of ") + lexical_cast<std::string>(msgQosDataP_->getMaxLifeTime())); 00159 msgQosDataP_->setLifeTime(msgQosDataP_->getMaxLifeTime()); 00160 } 00161 if (getBoolAttr(attrs, FORCE_DESTROY, tmpBool)) msgQosDataP_->setForceDestroy(tmpBool); 00162 if (getLongAttr(attrs, REMAINING_LIFE, tmpLong)) msgQosDataP_->setRemainingLifeStatic(tmpLong); 00163 return; 00164 } 00165 if (name.compare("rcvTimestamp") == 0) { 00166 if (!inQos_) return; 00167 if (getTimestampAttr(attrs, NANOS, tmpTimestamp)) msgQosDataP_->setRcvTimestamp(tmpTimestamp); 00168 inRcvTimestamp_ = true; 00169 return; 00170 } 00171 if (name.compare("redeliver") == 0) { 00172 if (!inQos_) return; 00173 inRedeliver_ = true; 00174 return; 00175 } 00176 if (name.compare("route") == 0) { 00177 if (!inQos_) return; 00178 inRoute_ = true; 00179 return; 00180 } 00181 if (name.compare("node") == 0) { 00182 if (!inRoute_) { 00183 log_.error(ME, "Ignoring <node>, it is not inside <route>"); 00184 return; 00185 } 00186 if (attrs.size() > 0) { 00187 if (!getStringAttr(attrs, ID, tmpString)) { 00188 log_.error(ME, "QoS <route><node> misses id attribute, ignoring node"); 00189 return; 00190 } 00191 NodeId nodeId(global_, tmpString); // where tmpString is the id 00192 int stratum = 0; 00193 if (!getIntAttr(attrs, STRATUM, stratum)) { 00194 log_.warn(ME, "QoS <route><node> misses stratum attribute, setting to 0: "); 00195 //Thread.currentThread().dumpStack(); 00196 } 00197 Timestamp timestamp = 0; 00198 if (!getTimestampAttr(attrs, TIMESTAMP, timestamp)) { 00199 log_.warn(ME, "QoS <route><node> misses receive timestamp attribute, setting to 0"); 00200 } 00201 // bool dirtyRead = org::xmlBlaster::util::cluster::DEFAULT_dirtyRead; 00202 if (log_.trace()) log_.trace(ME, "Found node tag"); 00203 routeInfo_ = RouteInfo(nodeId, stratum, timestamp); 00204 if (getBoolAttr(attrs, DIRTY_READ, tmpBool)) routeInfo_.setDirtyRead(tmpBool); 00205 } 00206 return; 00207 } 00208 if (name.compare(MethodName::SUBSCRIBE) == 0) { 00209 if (!inQos_) return; 00210 inSubscribe_ = true; 00211 AttributeMap::const_iterator iter = attrs.begin(); 00212 while (iter != attrs.end()) { 00213 if ( ((*iter).first).compare("id") == 0) { 00214 msgQosDataP_->setSubscriptionId( (*iter).second ); 00215 } 00216 iter++; 00217 } 00218 return; 00219 } 00220 00221 if (name.compare("persistent") == 0) { 00222 if (!inQos_) return; 00223 msgQosDataP_->setPersistent(true); 00224 return; 00225 } 00226 00227 if (name.compare("forceUpdate") == 0) { 00228 if (!inQos_) return; 00229 msgQosDataP_->setForceUpdate(true); 00230 return; 00231 } 00232 00233 if (name.compare("readonly") == 0) { 00234 if (!inQos_) return; 00235 msgQosDataP_->setReadonly(true); 00236 log_.error(ME, "<qos><readonly/></qos> is deprecated, please use readonly as topic attribute <qos><topic readonly='true'></qos>"); 00237 return; 00238 } 00239 00240 if (name.compare("clientProperty") == 0) { 00241 if (!inQos_) return; 00242 inClientProperty_ = true; 00243 character_.erase(); 00244 string nameAttr; 00245 00246 AttributeMap::const_iterator iter = attrs.find("name"); 00247 if (iter != attrs.end()) nameAttr = (*iter).second; 00248 00249 string encoding; 00250 iter = attrs.find("encoding"); 00251 if (iter != attrs.end()) encoding = (*iter).second; 00252 00253 string type; 00254 iter = attrs.find("type"); 00255 if (iter != attrs.end()) type = (*iter).second; 00256 00257 string charset; 00258 iter = attrs.find("charset"); 00259 if (iter != attrs.end()) charset = (*iter).second; 00260 00261 clientProperty_ = new ClientProperty(true, nameAttr, type, encoding, charset); 00262 } 00263 00264 } 00265 00266 00267 void MsgQosFactory::characters(const string &ch) 00268 { 00269 XmlHandlerBase::characters(ch); 00270 if (inQueue_ || inPersistence_) queuePropertyFactory_.characters(ch); 00271 } 00272 00273 00274 void MsgQosFactory::endElement(const string &name) 00275 { 00276 if (name.compare("qos") == 0) { 00277 inQos_ = false; 00278 return; 00279 } 00280 //log_.error(ME, "endElement: name=" + name + " character_=" + character_); 00281 if (inQueue_ || inPersistence_) { 00282 queuePropertyFactory_.endElement(name); 00283 if(name.compare("queue") == 0) { 00284 inQueue_ = false; 00285 character_.erase(); 00286 QueuePropertyBase tmp = queuePropertyFactory_.getQueueProperty(); 00287 string relating = tmp.getRelating(); 00288 TopicProperty tmpProp = msgQosDataP_->getTopicProperty(); 00289 if (relating == Constants::RELATING_HISTORY) { 00290 tmpProp.setHistoryQueueProperty(tmp); 00291 msgQosDataP_->setTopicProperty(tmpProp); 00292 } 00293 else { 00294 log_.error(ME, string("Ignoring unknown <queue relating='") + relating + "'/> configuration"); 00295 } 00296 return; 00297 } 00298 00299 if(name.compare("persistence") == 0) { // topic: RELATING_MSGUNITSTORE 00300 inPersistence_ = false; 00301 character_.erase(); 00302 QueuePropertyBase tmp = queuePropertyFactory_.getQueueProperty(); 00303 TopicProperty tmpProp = msgQosDataP_->getTopicProperty(); 00304 tmpProp.setMsgUnitStoreProperty(tmp); 00305 msgQosDataP_->setTopicProperty(tmpProp); 00306 return; 00307 } 00308 } 00309 00310 if (name.compare("state") == 0) { 00311 inState_ = false; 00312 character_.erase(); 00313 return; 00314 } 00315 00316 if( name.compare("destination") == 0) { 00317 inDestination_ = false; 00318 StringTrim::trim(character_); // The address or XPath query string 00319 if (!character_.empty()) { 00320 destination_.getDestination()->setAbsoluteName(character_); // set address or XPath query string if it is before the forceQueuing tag 00321 character_.erase(); 00322 } 00323 msgQosDataP_->addDestination(destination_); 00324 return; 00325 } 00326 00327 if(name.compare("sender") == 0) { 00328 inSender_ = false; 00329 StringTrim::trim(character_); 00330 msgQosDataP_->getSender()->setAbsoluteName(character_); 00331 // if (log.trace()) log.trace(ME, "Found message sender login name = " + msgQosData.getSender()); 00332 character_.erase(); 00333 return; 00334 } 00335 00336 if(name.compare("priority") == 0) { 00337 inPriority_ = false; 00338 msgQosDataP_->setPriority(str2Priority(character_)); 00339 character_.erase(); 00340 return; 00341 } 00342 00343 if(name.compare("expiration") == 0) { 00344 inExpiration_ = false; 00345 character_.erase(); 00346 return; 00347 } 00348 00349 if(name.compare("rcvTimestamp") == 0) { 00350 inRcvTimestamp_ = false; 00351 character_.erase(); 00352 return; 00353 } 00354 00355 if(name.compare("forceUpdate") == 0) { 00356 inIsVolatile_ = false; 00357 msgQosDataP_->setForceUpdate(StringTrim::isTrueTrim(character_)); 00358 character_.erase(); 00359 return; 00360 } 00361 00362 if (name.compare(MethodName::SUBSCRIBE) == 0) { 00363 inSubscribe_ = false; 00364 character_.erase(); 00365 return; 00366 } 00367 00368 if(name.compare("persistent") == 0) { 00369 inIsPersistent_ = false; 00370 msgQosDataP_->setPersistent(StringTrim::isTrueTrim(character_)); 00371 character_.erase(); 00372 return; 00373 } 00374 00375 if(name.compare("readonly") == 0) { 00376 inReadonly_ = false; 00377 msgQosDataP_->setReadonly(StringTrim::isTrueTrim(character_)); 00378 character_.erase(); 00379 return; 00380 } 00381 00382 if(name.compare("redeliver") == 0) { 00383 inRedeliver_ = false; 00384 StringTrim::trim(character_); 00385 msgQosDataP_->setRedeliver(atoi(character_.c_str())); 00386 character_.erase(); 00387 return; 00388 } 00389 00390 if (name.compare("node") == 0) { 00391 msgQosDataP_->addRouteInfo(routeInfo_); 00392 character_.erase(); 00393 return; 00394 } 00395 00396 if (name.compare("route") == 0) { 00397 inRoute_ = false; 00398 character_.erase(); 00399 return; 00400 } 00401 00402 if (name.compare("clientProperty") == 0) { 00403 inClientProperty_ = false; 00404 clientProperty_->setValueRaw(character_); 00405 msgQosDataP_->addClientProperty(*clientProperty_); 00406 delete clientProperty_; 00407 clientProperty_ = 0; 00408 character_.erase(); 00409 } 00410 00411 character_.erase(); // reset data from unknown tags 00412 } 00413 00414 00416 void MsgQosFactory::sendRemainingLife(bool sendRemainingLife) 00417 { 00418 sendRemainingLife_ = sendRemainingLife; 00419 } 00420 00421 bool MsgQosFactory::sendRemainingLife() 00422 { 00423 return sendRemainingLife_; 00424 } 00425 00426 }}}} 00427 00428 00429 #ifdef _XMLBLASTER_CLASSTEST 00430 00431 using namespace std; 00432 using namespace org::xmlBlaster::util::qos; 00433 00434 int main(int args, char* argv[]) 00435 { 00436 try 00437 { 00438 Global& glob = Global::getInstance(); 00439 glob.initialize(args, argv); 00440 00441 MsgQosData data1(glob); 00442 MsgQosFactory factory(glob); 00443 string qos = data1.toXml(); 00444 MsgQosData data2 = factory.readObject(qos); 00445 00446 cout << "data before parsing: " << data1.toXml() << endl; 00447 cout << "data after parsing : " << data2.toXml() << endl; 00448 } 00449 catch(...) { 00450 cout << "exception occured\n"; 00451 return 1; 00452 } 00453 return 0; 00454 } 00455 00456 #endif 00457 00458 /* 00459 <qos> 00460 <state id='OK' info='Keep on running"/> <!-- Only for updates and PtP --> 00461 <sender>Tim</sender> 00462 <priority>5</priority> 00463 <subscribe id='__subId:1'/> <!-- Only for updates, id='__subId:PtP' for point to point messages --> 00464 <rcvTimestamp nanos='1007764305862000002'> <!-- UTC time when message was created in xmlBlaster server with a publish() call, in nanoseconds since 1970 --> 00465 2001-12-07 23:31:45.862000002 <!-- The nanos from above but human readable --> 00466 </rcvTimestamp> 00467 <expiration lifeTime='129595811' forceDestroy='false'/> <!-- Only for persistence layer --> 00468 <queue index='0' of='1'/> <!-- If queued messages are flushed on login --> 00469 <persistent/> 00470 <redeliver>4</redeliver> <!-- Only for updates --> 00471 <route> 00472 <node id='heron'/> 00473 </route> 00474 <topic readonly='false' destroyDelay='60000' createDomEntry='true'> 00475 <queue relating='topic' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='deadMessage'/> 00476 <queue relating='history' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='exception'/> 00477 </topic> 00478 </qos> 00479 00480 00481 00482 00483 <qos> 00484 <destination queryType='EXACT' forceQueuing='true'> 00485 Tim 00486 </destination> 00487 <destination queryType='EXACT'> 00488 /node/heron/client/Ben 00489 </destination> 00490 <destination queryType='XPATH'> <!-- Not supported yet --> 00491 //[GROUP='Manager'] 00492 </destination> 00493 <destination queryType='XPATH'> <!-- Not supported yet --> 00494 //ROLE/[@id='Developer'] 00495 </destination> 00496 <sender> 00497 Gesa 00498 </sender> 00499 <priority>7</priority> 00500 <route> 00501 <node id='bilbo' stratum='2' timestamp='34460239640' dirtyRead='true'/> 00502 </route> 00503 </qos> 00504 00505 */ 00506 00507 00508