1 /*------------------------------------------------------------------------------
2 Name: MsgQosSaxFactory.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 #include <util/qos/MsgQosFactory.h>
7 #include <util/MethodName.h>
8 #include <util/Global.h>
9 #include <util/StringTrim.h>
10 #include <util/lexical_cast.h>
11
12
13 using namespace org::xmlBlaster::util;
14 using namespace org::xmlBlaster::util::parser;
15 using namespace org::xmlBlaster::util::cluster;
16 using namespace org::xmlBlaster::util::qos::storage;
17 using namespace std;
18
19 namespace org { namespace xmlBlaster { namespace util { namespace qos {
20
21 MsgQosFactory::MsgQosFactory(Global& global)
22 : XmlHandlerBase(global),
23 ME("MsgQosFactory"),
24 msgQosDataP_(0),
25 destination_(global),
26 routeInfo_(global),
27 queuePropertyFactory_(global),
28 clientProperty_(0)
29 {
30 ME = string("MsgQosFactory");
31 LIFE_TIME = string("lifeTime");
32 FORCE_DESTROY = string("forceDestroy");
33 REMAINING_LIFE = string("remainingLife");
34 READ_ONLY = string("readOnly");
35 DESTROY_DELAY = string("destroyDelay");
36 CREATE_DOM_ENTRY = string("createDomEntry");
37 NANOS = string("nanos");
38 ID = string("id");
39 STRATUM = string("stratum");
40 TIMESTAMP = string("timestamp");
41 DIRTY_READ = string("dirtyRead");
42 INDEX = string("index");
43 SIZE = string("size");
44 inState_ = false;
45 inSubscribe_ = false;
46 inRedeliver_ = false;
47 inQueue_ = false;
48 inPersistence_ = false;
49 inDestination_ = false;
50 inSender_ = false;
51 inPriority_ = false;
52 inAdministrative_ = false;
53 inClientProperty_ = false;
54 inExpiration_ = false;
55 inRcvTimestamp_ = false;
56 inIsVolatile_ = false;
57 inIsPersistent_ = false;
58 inReadonly_ = false;
59 inRoute_ = false;
60 sendRemainingLife_ = true;
61 inQos_ = false;
62 }
63
64 MsgQosFactory::~MsgQosFactory()
65 {
66 if (clientProperty_ != 0) {
67 delete(clientProperty_);
68 }
69 if (msgQosDataP_ != 0) {
70 delete(msgQosDataP_);
71 }
72 }
73
74 MsgQosData MsgQosFactory::readObject(const string& xmlQos)
75 {
76 delete msgQosDataP_;
77 msgQosDataP_ = new MsgQosData(global_);
78 routeInfo_ = RouteInfo(global_);
79 //queuePropertyFactory_ = QueuePropertyFactory(global_);
80 delete clientProperty_;
81 clientProperty_ = 0;
82
83 if (xmlQos.empty()) init("<qos/>");
84 else init(xmlQos);
85 return *msgQosDataP_;
86 }
87
88 void MsgQosFactory::startElement(const string &name, const AttributeMap& attrs)
89 {
90 bool tmpBool;
91 string tmpString;
92 long tmpLong;
93 Timestamp tmpTimestamp;
94 if (name.compare("qos") == 0) {
95 inQos_ = true;
96 return;
97 }
98 if (name.compare("persistence") == 0 || inPersistence_) {
99 if (!inQos_) return;
100 inPersistence_ = true;
101 queuePropertyFactory_.startElement(name, attrs);
102 return;
103 }
104 if (name.compare("state") == 0) {
105 if (!inQos_) return;
106 inState_ = true;
107 AttributeMap::const_iterator iter = attrs.begin();
108 string tmpName = (*iter).first;
109 string tmpValue = (*iter).second;
110 while (iter != attrs.end()) {
111 if (tmpName.compare("id") == 0) {
112 msgQosDataP_->setState(tmpValue);
113 }
114 else if (tmpName.compare("info") == 0) {
115 msgQosDataP_->setStateInfo(tmpValue);
116 }
117 iter++;
118 }
119 return;
120 }
121 if (name.compare("destination") == 0) {
122 if (!inQos_) return;
123 inDestination_ = true;
124 destination_ = Destination(global_);
125 AttributeMap::const_iterator iter = attrs.begin();
126 while (iter != attrs.end()) {
127 string tmpName = (*iter).first;
128 string tmpValue = (*iter).second;
129 if (tmpName.compare("queryType") == 0) {
130 string queryType = tmpValue;
131 if (queryType == "EXACT") destination_.setQueryType(queryType);
132 else if (queryType == "XPATH") destination_.setQueryType(queryType);
133 else log_.error(ME, string("Sorry, destination queryType='") + queryType + string("' is not supported"));
134 }
135 else if( tmpName.compare("forceQueuing") == 0) {
136 destination_.forceQueuing(XmlHandlerBase::getBoolValue(tmpValue));
137 }
138 iter++;
139 }
140
141 return;
142 }
143 if (name.compare("sender") == 0) {
144 if (!inQos_) return;
145 inSender_ = true;
146 return;
147 }
148 if (name.compare("priority") == 0) {
149 if (!inQos_) return;
150 inPriority_ = true;
151 return;
152 }
153 if (name.compare("administrative") == 0) {
154 if (!inQos_) return;
155 inAdministrative_ = true;
156 return;
157 }
158 if (name.compare("expiration") == 0) {
159 if (!inQos_) return;
160 inExpiration_ = true;
161 // int len = attrs.getLength();
162 if (getLongAttr(attrs, LIFE_TIME, tmpLong)) msgQosDataP_->setLifeTime(tmpLong);
163 else {
164 log_.warn(ME, string("QoS <expiration> misses lifeTime attribute, setting default of ") + lexical_cast<std::string>(msgQosDataP_->getMaxLifeTime()));
165 msgQosDataP_->setLifeTime(msgQosDataP_->getMaxLifeTime());
166 }
167 if (getBoolAttr(attrs, FORCE_DESTROY, tmpBool)) msgQosDataP_->setForceDestroy(tmpBool);
168 if (getLongAttr(attrs, REMAINING_LIFE, tmpLong)) msgQosDataP_->setRemainingLifeStatic(tmpLong);
169 return;
170 }
171 if (name.compare("rcvTimestamp") == 0) {
172 if (!inQos_) return;
173 if (getTimestampAttr(attrs, NANOS, tmpTimestamp)) msgQosDataP_->setRcvTimestamp(tmpTimestamp);
174 inRcvTimestamp_ = true;
175 return;
176 }
177 if (name.compare("redeliver") == 0) {
178 if (!inQos_) return;
179 inRedeliver_ = true;
180 return;
181 }
182 if (name.compare("route") == 0) {
183 if (!inQos_) return;
184 inRoute_ = true;
185 return;
186 }
187 if (name.compare("node") == 0) {
188 if (!inRoute_) {
189 log_.error(ME, "Ignoring <node>, it is not inside <route>");
190 return;
191 }
192 if (attrs.size() > 0) {
193 if (!getStringAttr(attrs, ID, tmpString)) {
194 log_.error(ME, "QoS <route><node> misses id attribute, ignoring node");
195 return;
196 }
197 NodeId nodeId(global_, tmpString); // where tmpString is the id
198 int stratum = 0;
199 if (!getIntAttr(attrs, STRATUM, stratum)) {
200 log_.warn(ME, "QoS <route><node> misses stratum attribute, setting to 0: ");
201 //Thread.currentThread().dumpStack();
202 }
203 Timestamp timestamp = 0;
204 if (!getTimestampAttr(attrs, TIMESTAMP, timestamp)) {
205 log_.warn(ME, "QoS <route><node> misses receive timestamp attribute, setting to 0");
206 }
207 // bool dirtyRead = org::xmlBlaster::util::cluster::DEFAULT_dirtyRead;
208 if (log_.trace()) log_.trace(ME, "Found node tag");
209 routeInfo_ = RouteInfo(nodeId, stratum, timestamp);
210 if (getBoolAttr(attrs, DIRTY_READ, tmpBool)) routeInfo_.setDirtyRead(tmpBool);
211 }
212 return;
213 }
214 if (name.compare(MethodName::SUBSCRIBE) == 0) {
215 if (!inQos_) return;
216 inSubscribe_ = true;
217 AttributeMap::const_iterator iter = attrs.begin();
218 while (iter != attrs.end()) {
219 if ( ((*iter).first).compare("id") == 0) {
220 msgQosDataP_->setSubscriptionId( (*iter).second );
221 }
222 iter++;
223 }
224 return;
225 }
226
227 if (name.compare("persistent") == 0) {
228 if (!inQos_) return;
229 msgQosDataP_->setPersistent(true);
230 return;
231 }
232
233 if (name.compare("forceUpdate") == 0) {
234 if (!inQos_) return;
235 msgQosDataP_->setForceUpdate(true);
236 return;
237 }
238
239 if (name.compare("readonly") == 0) {
240 if (!inQos_) return;
241 msgQosDataP_->setReadonly(true);
242 log_.error(ME, "<qos><readonly/></qos> is deprecated, please use readonly as topic attribute <qos><topic readonly='true'></qos>");
243 return;
244 }
245
246 if (name.compare("clientProperty") == 0) {
247 if (!inQos_) return;
248 inClientProperty_ = true;
249 character_.erase();
250 string nameAttr;
251
252 AttributeMap::const_iterator iter = attrs.find("name");
253 if (iter != attrs.end()) nameAttr = (*iter).second;
254
255 string encoding;
256 iter = attrs.find("encoding");
257 if (iter != attrs.end()) encoding = (*iter).second;
258
259 string type;
260 iter = attrs.find("type");
261 if (iter != attrs.end()) type = (*iter).second;
262
263 string charset;
264 iter = attrs.find("charset");
265 if (iter != attrs.end()) charset = (*iter).second;
266
267 clientProperty_ = new ClientProperty(true, nameAttr, type, encoding, charset);
268 }
269
270 }
271
272
273 void MsgQosFactory::characters(const string &ch)
274 {
275 XmlHandlerBase::characters(ch);
276 if (inQueue_ || inPersistence_) queuePropertyFactory_.characters(ch);
277 }
278
279
280 void MsgQosFactory::endElement(const string &name)
281 {
282 if (name.compare("qos") == 0) {
283 inQos_ = false;
284 return;
285 }
286 //log_.error(ME, "endElement: name=" + name + " character_=" + character_);
287 if (inQueue_ || inPersistence_) {
288 queuePropertyFactory_.endElement(name);
289 if(name.compare("queue") == 0) {
290 inQueue_ = false;
291 character_.erase();
292 QueuePropertyBase tmp = queuePropertyFactory_.getQueueProperty();
293 string relating = tmp.getRelating();
294 TopicProperty tmpProp = msgQosDataP_->getTopicProperty();
295 if (relating == Constants::RELATING_HISTORY) {
296 tmpProp.setHistoryQueueProperty(tmp);
297 msgQosDataP_->setTopicProperty(tmpProp);
298 }
299 else {
300 log_.error(ME, string("Ignoring unknown <queue relating='") + relating + "'/> configuration");
301 }
302 return;
303 }
304
305 if(name.compare("persistence") == 0) { // topic: RELATING_MSGUNITSTORE
306 inPersistence_ = false;
307 character_.erase();
308 QueuePropertyBase tmp = queuePropertyFactory_.getQueueProperty();
309 TopicProperty tmpProp = msgQosDataP_->getTopicProperty();
310 tmpProp.setMsgUnitStoreProperty(tmp);
311 msgQosDataP_->setTopicProperty(tmpProp);
312 return;
313 }
314 }
315
316 if (name.compare("state") == 0) {
317 inState_ = false;
318 character_.erase();
319 return;
320 }
321
322 if( name.compare("destination") == 0) {
323 inDestination_ = false;
324 StringTrim::trim(character_); // The address or XPath query string
325 if (!character_.empty()) {
326 destination_.getDestination()->setAbsoluteName(character_); // set address or XPath query string if it is before the forceQueuing tag
327 character_.erase();
328 }
329 msgQosDataP_->addDestination(destination_);
330 return;
331 }
332
333 if(name.compare("sender") == 0) {
334 inSender_ = false;
335 StringTrim::trim(character_);
336 msgQosDataP_->getSender()->setAbsoluteName(character_);
337 // if (log.trace()) log.trace(ME, "Found message sender login name = " + msgQosData.getSender());
338 character_.erase();
339 return;
340 }
341
342 if(name.compare("priority") == 0) {
343 inPriority_ = false;
344 msgQosDataP_->setPriority(str2Priority(character_));
345 character_.erase();
346 return;
347 }
348
349 if(name.compare("administrative") == 0) {
350 inAdministrative_ = false;
351 msgQosDataP_->setAdministrative(XmlHandlerBase::getBoolValue(character_));
352 character_.erase();
353 return;
354 }
355
356 if(name.compare("expiration") == 0) {
357 inExpiration_ = false;
358 character_.erase();
359 return;
360 }
361
362 if(name.compare("rcvTimestamp") == 0) {
363 inRcvTimestamp_ = false;
364 character_.erase();
365 return;
366 }
367
368 if(name.compare("forceUpdate") == 0) {
369 inIsVolatile_ = false;
370 msgQosDataP_->setForceUpdate(StringTrim::isTrueTrim(character_));
371 character_.erase();
372 return;
373 }
374
375 if (name.compare(MethodName::SUBSCRIBE) == 0) {
376 inSubscribe_ = false;
377 character_.erase();
378 return;
379 }
380
381 if(name.compare("persistent") == 0) {
382 inIsPersistent_ = false;
383 msgQosDataP_->setPersistent(StringTrim::isTrueTrim(character_));
384 character_.erase();
385 return;
386 }
387
388 if(name.compare("readonly") == 0) {
389 inReadonly_ = false;
390 msgQosDataP_->setReadonly(StringTrim::isTrueTrim(character_));
391 character_.erase();
392 return;
393 }
394
395 if(name.compare("redeliver") == 0) {
396 inRedeliver_ = false;
397 StringTrim::trim(character_);
398 msgQosDataP_->setRedeliver(atoi(character_.c_str()));
399 character_.erase();
400 return;
401 }
402
403 if (name.compare("node") == 0) {
404 msgQosDataP_->addRouteInfo(routeInfo_);
405 character_.erase();
406 return;
407 }
408
409 if (name.compare("route") == 0) {
410 inRoute_ = false;
411 character_.erase();
412 return;
413 }
414
415 if (name.compare("clientProperty") == 0) {
416 inClientProperty_ = false;
417 clientProperty_->setValueRaw(character_);
418 msgQosDataP_->addClientProperty(*clientProperty_);
419 delete clientProperty_;
420 clientProperty_ = 0;
421 character_.erase();
422 }
423
424 character_.erase(); // reset data from unknown tags
425 }
426
427
428 /** Configure if remaingLife is sent in Qos (redesign approach to work with all QoS attributes */
429 void MsgQosFactory::sendRemainingLife(bool sendRemainingLife)
430 {
431 sendRemainingLife_ = sendRemainingLife;
432 }
433
434 bool MsgQosFactory::sendRemainingLife()
435 {
436 return sendRemainingLife_;
437 }
438
439 }}}}
440
441
442 #ifdef _XMLBLASTER_CLASSTEST
443
444 using namespace std;
445 using namespace org::xmlBlaster::util::qos;
446
447 int main(int args, char* argv[])
448 {
449 try
450 {
451 Global& glob = Global::getInstance();
452 glob.initialize(args, argv);
453
454 MsgQosData data1(glob);
455 MsgQosFactory factory(glob);
456 string qos = data1.toXml();
457 MsgQosData data2 = factory.readObject(qos);
458
459 cout << "data before parsing: " << data1.toXml() << endl;
460 cout << "data after parsing : " << data2.toXml() << endl;
461 }
462 catch(...) {
463 cout << "exception occured\n";
464 return 1;
465 }
466 return 0;
467 }
468
469 #endif
470
471 /*
472 <qos>
473 <state id='OK' info='Keep on running"/> <!-- Only for updates and PtP -->
474 <sender>Tim</sender>
475 <priority>5</priority>
476 <administrative>false</administrative>
477 <subscribe id='__subId:1'/> <!-- Only for updates, id='__subId:PtP' for point to point messages -->
478 <rcvTimestamp nanos='1007764305862000002'> <!-- UTC time when message was created in xmlBlaster server with a publish() call, in nanoseconds since 1970 -->
479 2001-12-07 23:31:45.862000002 <!-- The nanos from above but human readable -->
480 </rcvTimestamp>
481 <expiration lifeTime='129595811' forceDestroy='false'/> <!-- Only for persistence layer -->
482 <queue index='0' of='1'/> <!-- If queued messages are flushed on login -->
483 <persistent/>
484 <redeliver>4</redeliver> <!-- Only for updates -->
485 <route>
486 <node id='heron'/>
487 </route>
488 <topic readonly='false' destroyDelay='60000' createDomEntry='true'>
489 <queue relating='topic' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='deadMessage'/>
490 <queue relating='history' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='exception'/>
491 </topic>
492 </qos>
493
494
495
496
497 <qos>
498 <destination queryType='EXACT' forceQueuing='true'>
499 Tim
500 </destination>
501 <destination queryType='EXACT'>
502 /node/heron/client/Ben
503 </destination>
504 <destination queryType='XPATH'> <!-- Not supported yet -->
505 //[GROUP='Manager']
506 </destination>
507 <destination queryType='XPATH'> <!-- Not supported yet -->
508 //ROLE/[@id='Developer']
509 </destination>
510 <sender>
511 Gesa
512 </sender>
513 <priority>7</priority>
514 <route>
515 <node id='bilbo' stratum='2' timestamp='34460239640' dirtyRead='true'/>
516 </route>
517 </qos>
518
519 */
syntax highlighted by Code2HTML, v. 0.9.1