1 /*------------------------------------------------------------------------------
  2 Name:      MsgQosSaxFactory.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 #include <util/qos/MsgQosFactory.h>
  7 #include <util/MethodName.h>
  8 #include <util/Global.h>
  9 #include <util/StringTrim.h>
 10 #include <util/lexical_cast.h>
 11 
 12 
 13 using namespace org::xmlBlaster::util;
 14 using namespace org::xmlBlaster::util::parser;
 15 using namespace org::xmlBlaster::util::cluster;
 16 using namespace org::xmlBlaster::util::qos::storage;
 17 using namespace std;
 18 
 19 namespace org { namespace xmlBlaster { namespace util { namespace qos {
 20 
 21 MsgQosFactory::MsgQosFactory(Global& global)
 22    : XmlHandlerBase(global), 
 23      ME("MsgQosFactory"),
 24      msgQosDataP_(0), 
 25      destination_(global), 
 26      routeInfo_(global),
 27      queuePropertyFactory_(global),
 28      clientProperty_(0)
 29 {
 30    ME                 = string("MsgQosFactory");
 31    LIFE_TIME          = string("lifeTime");
 32    FORCE_DESTROY      = string("forceDestroy");
 33    REMAINING_LIFE     = string("remainingLife");
 34    READ_ONLY          = string("readOnly");
 35    DESTROY_DELAY      = string("destroyDelay");
 36    CREATE_DOM_ENTRY   = string("createDomEntry");
 37    NANOS              = string("nanos");
 38    ID                 = string("id");
 39    STRATUM            = string("stratum");
 40    TIMESTAMP          = string("timestamp");
 41    DIRTY_READ         = string("dirtyRead");
 42    INDEX              = string("index");
 43    SIZE               = string("size");
 44    inState_           = false;
 45    inSubscribe_       = false;
 46    inRedeliver_       = false;
 47    inQueue_           = false;
 48    inPersistence_     = false;
 49    inDestination_     = false;
 50    inSender_          = false;
 51    inPriority_        = false;
 52    inAdministrative_  = false;
 53    inClientProperty_  = false;
 54    inExpiration_      = false;
 55    inRcvTimestamp_    = false;
 56    inIsVolatile_      = false;
 57    inIsPersistent_    = false;
 58    inReadonly_        = false;
 59    inRoute_           = false;
 60    sendRemainingLife_ = true;
 61    inQos_             = false;
 62 }
 63 
 64 MsgQosFactory::~MsgQosFactory() 
 65 {
 66    if (clientProperty_ != 0) {
 67       delete(clientProperty_);
 68    }
 69    if (msgQosDataP_ != 0) {
 70       delete(msgQosDataP_);
 71    }
 72 }                
 73 
 74 MsgQosData MsgQosFactory::readObject(const string& xmlQos)
 75 {
 76    delete msgQosDataP_;
 77    msgQosDataP_ = new MsgQosData(global_);
 78    routeInfo_ = RouteInfo(global_);
 79    //queuePropertyFactory_ = QueuePropertyFactory(global_);
 80    delete clientProperty_;
 81    clientProperty_ = 0;
 82 
 83    if (xmlQos.empty()) init("<qos/>");
 84    else init(xmlQos);
 85    return *msgQosDataP_;
 86 }
 87 
 88 void MsgQosFactory::startElement(const string &name, const AttributeMap& attrs)
 89 {
 90    bool      tmpBool;
 91    string    tmpString;
 92    long      tmpLong;
 93    Timestamp tmpTimestamp;
 94    if (name.compare("qos") == 0) {
 95      inQos_ = true;
 96      return;
 97    }
 98    if (name.compare("persistence") == 0 || inPersistence_) {
 99       if (!inQos_) return;
100       inPersistence_ = true;
101       queuePropertyFactory_.startElement(name, attrs);
102       return;
103    }
104    if (name.compare("state") == 0) {
105       if (!inQos_) return;
106       inState_ = true;
107       AttributeMap::const_iterator iter = attrs.begin();
108       string tmpName = (*iter).first;
109       string tmpValue = (*iter).second;
110       while (iter != attrs.end()) {
111          if (tmpName.compare("id") == 0) {
112             msgQosDataP_->setState(tmpValue);
113          }
114          else if (tmpName.compare("info") == 0) {
115             msgQosDataP_->setStateInfo(tmpValue);
116          }
117          iter++;
118       }
119       return;
120    }
121    if (name.compare("destination") == 0) {
122       if (!inQos_) return;
123       inDestination_ = true;
124       destination_ = Destination(global_);
125       AttributeMap::const_iterator iter = attrs.begin();
126       while (iter != attrs.end()) {
127          string tmpName = (*iter).first;
128          string tmpValue = (*iter).second;
129          if (tmpName.compare("queryType") == 0) {
130             string queryType = tmpValue;
131             if (queryType == "EXACT")      destination_.setQueryType(queryType);
132             else if (queryType == "XPATH") destination_.setQueryType(queryType);
133             else log_.error(ME, string("Sorry, destination queryType='") + queryType + string("' is not supported"));
134          }
135          else if( tmpName.compare("forceQueuing") == 0) {
136             destination_.forceQueuing(XmlHandlerBase::getBoolValue(tmpValue));
137          }
138          iter++;
139       }
140 
141       return;
142    }
143    if (name.compare("sender") == 0) {
144       if (!inQos_) return;
145       inSender_ = true;
146       return;
147    }
148    if (name.compare("priority") == 0) {
149       if (!inQos_) return;
150       inPriority_ = true;
151       return;
152    }
153    if (name.compare("administrative") == 0) {
154       if (!inQos_) return;
155       inAdministrative_ = true;
156       return;
157    }
158    if (name.compare("expiration") == 0) {
159       if (!inQos_) return;
160       inExpiration_ = true;
161 //         int len = attrs.getLength();
162       if (getLongAttr(attrs, LIFE_TIME, tmpLong)) msgQosDataP_->setLifeTime(tmpLong);
163       else {
164          log_.warn(ME, string("QoS <expiration> misses lifeTime attribute, setting default of ") + lexical_cast<std::string>(msgQosDataP_->getMaxLifeTime()));
165          msgQosDataP_->setLifeTime(msgQosDataP_->getMaxLifeTime());
166       }
167       if (getBoolAttr(attrs, FORCE_DESTROY, tmpBool)) msgQosDataP_->setForceDestroy(tmpBool);
168       if (getLongAttr(attrs, REMAINING_LIFE, tmpLong)) msgQosDataP_->setRemainingLifeStatic(tmpLong);
169       return;
170    }
171    if (name.compare("rcvTimestamp") == 0) {
172       if (!inQos_) return;
173       if (getTimestampAttr(attrs, NANOS, tmpTimestamp)) msgQosDataP_->setRcvTimestamp(tmpTimestamp);
174       inRcvTimestamp_ = true;
175       return;
176    }
177    if (name.compare("redeliver") == 0) {
178       if (!inQos_) return;
179       inRedeliver_ = true;
180       return;
181    }
182    if (name.compare("route") == 0) {
183       if (!inQos_) return;
184       inRoute_ = true;
185       return;
186    }
187    if (name.compare("node") == 0) {
188       if (!inRoute_) {
189          log_.error(ME, "Ignoring <node>, it is not inside <route>");
190          return;
191       }
192       if (attrs.size() > 0) {
193          if (!getStringAttr(attrs, ID, tmpString)) {
194             log_.error(ME, "QoS <route><node> misses id attribute, ignoring node");
195             return;
196          }
197          NodeId nodeId(global_, tmpString); // where tmpString is the id
198          int stratum = 0;
199          if (!getIntAttr(attrs, STRATUM, stratum)) {
200             log_.warn(ME, "QoS <route><node> misses stratum attribute, setting to 0: ");
201             //Thread.currentThread().dumpStack();
202          }
203          Timestamp timestamp = 0;
204          if (!getTimestampAttr(attrs, TIMESTAMP, timestamp)) {
205             log_.warn(ME, "QoS <route><node> misses receive timestamp attribute, setting to 0");
206          }
207     //      bool dirtyRead = org::xmlBlaster::util::cluster::DEFAULT_dirtyRead;
208          if (log_.trace()) log_.trace(ME, "Found node tag");
209          routeInfo_ = RouteInfo(nodeId, stratum, timestamp);
210          if (getBoolAttr(attrs, DIRTY_READ, tmpBool)) routeInfo_.setDirtyRead(tmpBool);
211       }
212       return;
213    }
214    if (name.compare(MethodName::SUBSCRIBE) == 0) {
215       if (!inQos_) return;
216       inSubscribe_ = true;
217       AttributeMap::const_iterator iter = attrs.begin();
218       while (iter != attrs.end()) {
219          if ( ((*iter).first).compare("id") == 0) {
220             msgQosDataP_->setSubscriptionId( (*iter).second );
221          }
222          iter++;
223       }
224       return;
225    }
226 
227    if (name.compare("persistent") == 0) {
228       if (!inQos_) return;
229       msgQosDataP_->setPersistent(true);
230       return;
231    }
232 
233    if (name.compare("forceUpdate") == 0) {
234       if (!inQos_) return;
235       msgQosDataP_->setForceUpdate(true);
236       return;
237    }
238 
239    if (name.compare("readonly") == 0) {
240       if (!inQos_) return;
241       msgQosDataP_->setReadonly(true);
242       log_.error(ME, "<qos><readonly/></qos> is deprecated, please use readonly as topic attribute <qos><topic readonly='true'></qos>");
243       return;
244    }
245    
246    if (name.compare("clientProperty") == 0) {
247       if (!inQos_) return;
248       inClientProperty_ = true;
249       character_.erase();
250       string nameAttr;
251 
252       AttributeMap::const_iterator iter = attrs.find("name");
253       if (iter != attrs.end()) nameAttr = (*iter).second;
254 
255       string encoding;
256       iter = attrs.find("encoding");
257       if (iter != attrs.end()) encoding = (*iter).second;
258 
259       string type;
260       iter = attrs.find("type");
261       if (iter != attrs.end()) type = (*iter).second;
262 
263       string charset;
264       iter = attrs.find("charset");
265       if (iter != attrs.end()) charset = (*iter).second;
266 
267       clientProperty_ = new ClientProperty(true, nameAttr, type, encoding, charset);
268    }
269       
270 }
271 
272 
273 void MsgQosFactory::characters(const string &ch) 
274 {
275    XmlHandlerBase::characters(ch);
276    if (inQueue_ || inPersistence_) queuePropertyFactory_.characters(ch);
277 }
278 
279 
280 void MsgQosFactory::endElement(const string &name) 
281 {
282    if (name.compare("qos") == 0) {
283       inQos_ = false;
284       return;
285    }
286    //log_.error(ME, "endElement: name=" + name + " character_=" + character_);
287    if (inQueue_ || inPersistence_) {
288       queuePropertyFactory_.endElement(name);
289       if(name.compare("queue") == 0) {
290          inQueue_ = false;
291          character_.erase();
292          QueuePropertyBase tmp = queuePropertyFactory_.getQueueProperty();
293          string relating = tmp.getRelating();
294          TopicProperty tmpProp = msgQosDataP_->getTopicProperty();
295          if (relating == Constants::RELATING_HISTORY) {
296             tmpProp.setHistoryQueueProperty(tmp);
297             msgQosDataP_->setTopicProperty(tmpProp);
298          }
299          else {
300             log_.error(ME, string("Ignoring unknown <queue relating='") + relating + "'/> configuration");
301          }
302          return;
303       }
304 
305       if(name.compare("persistence") == 0) { // topic: RELATING_MSGUNITSTORE
306          inPersistence_ = false;
307          character_.erase();
308          QueuePropertyBase tmp = queuePropertyFactory_.getQueueProperty();
309          TopicProperty tmpProp = msgQosDataP_->getTopicProperty();
310          tmpProp.setMsgUnitStoreProperty(tmp);
311          msgQosDataP_->setTopicProperty(tmpProp);
312          return;
313       }
314    }
315 
316    if (name.compare("state") == 0) {
317       inState_ = false;
318       character_.erase();
319       return;
320    }
321 
322    if( name.compare("destination") == 0) {
323       inDestination_ = false;
324       StringTrim::trim(character_); // The address or XPath query string
325       if (!character_.empty()) {
326          destination_.getDestination()->setAbsoluteName(character_); // set address or XPath query string if it is before the forceQueuing tag
327          character_.erase();
328       }
329       msgQosDataP_->addDestination(destination_);
330       return;
331    }
332 
333    if(name.compare("sender") == 0) {
334       inSender_ = false;
335       StringTrim::trim(character_);
336       msgQosDataP_->getSender()->setAbsoluteName(character_);
337       // if (log.trace()) log.trace(ME, "Found message sender login name = " + msgQosData.getSender());
338       character_.erase();
339       return;
340    }
341 
342    if(name.compare("priority") == 0) {
343       inPriority_ = false;
344       msgQosDataP_->setPriority(str2Priority(character_));
345       character_.erase();
346       return;
347    }
348 
349    if(name.compare("administrative") == 0) {
350       inAdministrative_ = false;
351       msgQosDataP_->setAdministrative(XmlHandlerBase::getBoolValue(character_));
352       character_.erase();
353       return;
354    }
355 
356    if(name.compare("expiration") == 0) {
357       inExpiration_ = false;
358       character_.erase();
359       return;
360    }
361 
362    if(name.compare("rcvTimestamp") == 0) {
363       inRcvTimestamp_ = false;
364       character_.erase();
365       return;
366    }
367 
368    if(name.compare("forceUpdate") == 0) {
369       inIsVolatile_ = false;
370       msgQosDataP_->setForceUpdate(StringTrim::isTrueTrim(character_));
371       character_.erase();
372       return;
373    }
374 
375    if (name.compare(MethodName::SUBSCRIBE) == 0) {
376       inSubscribe_ = false;
377       character_.erase();
378       return;
379    }
380 
381    if(name.compare("persistent") == 0) {
382       inIsPersistent_ = false;
383       msgQosDataP_->setPersistent(StringTrim::isTrueTrim(character_));
384       character_.erase();
385       return;
386    }
387 
388    if(name.compare("readonly") == 0) {
389       inReadonly_ = false;
390       msgQosDataP_->setReadonly(StringTrim::isTrueTrim(character_));
391       character_.erase();
392       return;
393    }
394 
395    if(name.compare("redeliver") == 0) {
396       inRedeliver_ = false;
397       StringTrim::trim(character_);
398       msgQosDataP_->setRedeliver(atoi(character_.c_str()));
399       character_.erase();
400       return;
401    }
402 
403    if (name.compare("node") == 0) {
404       msgQosDataP_->addRouteInfo(routeInfo_);
405       character_.erase();
406       return;
407    }
408 
409    if (name.compare("route") == 0) {
410       inRoute_ = false;
411       character_.erase();
412       return;
413    }
414 
415    if (name.compare("clientProperty") == 0) {
416       inClientProperty_ = false;
417       clientProperty_->setValueRaw(character_);
418       msgQosDataP_->addClientProperty(*clientProperty_);
419       delete clientProperty_;
420       clientProperty_ = 0;
421       character_.erase();
422    }
423 
424    character_.erase(); // reset data from unknown tags
425 }
426 
427 
428 /** Configure if remaingLife is sent in Qos (redesign approach to work with all QoS attributes */
429 void MsgQosFactory::sendRemainingLife(bool sendRemainingLife) 
430 { 
431    sendRemainingLife_ = sendRemainingLife; 
432 }
433 
434 bool MsgQosFactory::sendRemainingLife() 
435 { 
436    return sendRemainingLife_; 
437 }
438 
439 }}}}
440 
441 
442 #ifdef _XMLBLASTER_CLASSTEST
443 
444 using namespace std;
445 using namespace org::xmlBlaster::util::qos;
446 
447 int main(int args, char* argv[])
448 {
449     try
450     {
451        Global& glob = Global::getInstance();
452        glob.initialize(args, argv);
453 
454        MsgQosData    data1(glob);
455        MsgQosFactory factory(glob);
456        string        qos   = data1.toXml();
457        MsgQosData    data2 = factory.readObject(qos);
458 
459        cout << "data before parsing: " << data1.toXml() << endl;
460        cout << "data after parsing : " << data2.toXml() << endl;
461     }
462     catch(...)  {
463        cout << "exception occured\n";
464        return 1;
465     }
466    return 0;
467 }
468 
469 #endif
470 
471 /*
472  <qos>
473     <state id='OK' info='Keep on running"/> <!-- Only for updates and PtP -->
474     <sender>Tim</sender>
475     <priority>5</priority>
476     <administrative>false</administrative>
477     <subscribe id='__subId:1'/>     <!-- Only for updates, id='__subId:PtP' for point to point messages -->
478     <rcvTimestamp nanos='1007764305862000002'> <!-- UTC time when message was created in xmlBlaster server with a publish() call, in nanoseconds since 1970 -->
479           2001-12-07 23:31:45.862000002   <!-- The nanos from above but human readable -->
480     </rcvTimestamp>
481     <expiration lifeTime='129595811' forceDestroy='false'/> <!-- Only for persistence layer -->
482     <queue index='0' of='1'/> <!-- If queued messages are flushed on login -->
483     <persistent/>
484     <redeliver>4</redeliver>             <!-- Only for updates -->
485     <route>
486        <node id='heron'/>
487     </route>
488     <topic readonly='false' destroyDelay='60000' createDomEntry='true'>
489        <queue relating='topic' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='deadMessage'/>
490        <queue relating='history' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='exception'/>
491     </topic>
492  </qos>
493 
494 
495 
496 
497  <qos>
498     <destination queryType='EXACT' forceQueuing='true'>
499        Tim
500     </destination>
501     <destination queryType='EXACT'>
502        /node/heron/client/Ben
503     </destination>
504     <destination queryType='XPATH'>   <!-- Not supported yet -->
505        //[GROUP='Manager']
506     </destination>
507     <destination queryType='XPATH'>   <!-- Not supported yet -->
508        //ROLE/[@id='Developer']
509     </destination>
510     <sender>
511        Gesa
512     </sender>
513     <priority>7</priority>
514     <route>
515        <node id='bilbo' stratum='2' timestamp='34460239640' dirtyRead='true'/>
516     </route>
517  </qos>
518 
519 */


syntax highlighted by Code2HTML, v. 0.9.1