1 /*------------------------------------------------------------------------------
2 Name: AddressBase.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Holding connect address and callback address string including protocol
6 Version: $Id: AddressBase.cpp 14417 2005-12-22 12:13:15Z ruff $
7 ------------------------------------------------------------------------------*/
8
9 /**
10 * Abstract helper class holding connect address and callback address string
11 * and protocol string.
12 * <p />
13 * See examples in the implementing classes
14 * @see Address
15 * @see CallbackAddress
16 */
17
18 #include <util/qos/address/AddressBase.h>
19 #include <util/lexical_cast.h>
20 #include <util/Global.h>
21
22 using namespace std;
23 using namespace org::xmlBlaster::util;
24
25 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace address {
26
27 const int DEFAULT_port = 3412;
28 const string DEFAULT_type = Global::getDefaultProtocol(); //"SOCKET";
29 const string DEFAULT_version = "1.0";
30 const long DEFAULT_collectTime = 0;
31 const int DEFAULT_burstModeMaxEntries= 1;
32 const long DEFAULT_burstModeMaxBytes = -1L;
33 const bool DEFAULT_oneway = false;
34 const bool DEFAULT_dispatcherActive = true;
35 const string DEFAULT_compressType = "";
36 const long DEFAULT_minSize = 0L;
37 const bool DEFAULT_ptpAllowed = true;
38 const string DEFAULT_sessionId = "unknown";
39 const bool DEFAULT_useForSubjectQueue = true;
40 string DEFAULT_dispatchPlugin = "";
41 string ATTRIBUTE_TAG = "attribute";
42
43
44 AddressBase::AddressBase(Global& global, const string& rootTag)
45 : ReferenceCounterBase(), global_(global), log_(global.getLog("org.xmlBlaster.util.qos")),
46 attributes_()
47 {
48
49 defaultPingInterval_ = 0L;
50 defaultRetries_ = 0;
51 defaultDelay_ = 0L;
52
53 // set the defaults here ...
54 ME = "AddressBase";
55 nodeId_ = "";
56 maxEntries_ = 0;
57 address_ = "";
58 hostname_ = "";
59 isHardcodedHostname_ = false;
60 type_ = DEFAULT_type;
61 port_ = DEFAULT_port;
62 version_ = DEFAULT_version;
63 collectTime_ = DEFAULT_collectTime;
64 burstModeMaxEntries_ = DEFAULT_burstModeMaxEntries;
65 burstModeMaxBytes_ = DEFAULT_burstModeMaxBytes;
66 pingInterval_ = defaultPingInterval_;
67 retries_ = defaultRetries_;
68 delay_ = defaultDelay_;
69 oneway_ = DEFAULT_oneway;
70 dispatcherActive_ = DEFAULT_dispatcherActive;
71 compressType_ = DEFAULT_compressType;
72 minSize_ = DEFAULT_minSize;
73 ptpAllowed_ = DEFAULT_ptpAllowed;
74 sessionId_ = DEFAULT_sessionId;
75 useForSubjectQueue_ = DEFAULT_useForSubjectQueue;
76 dispatchPlugin_ = DEFAULT_dispatchPlugin;
77 setRootTag(rootTag);
78 }
79
80 AddressBase::AddressBase(const AddressBase& addr)
81 : global_(addr.global_), log_(addr.log_),
82 attributes_()
83 {
84 copy(addr);
85 }
86
87 AddressBase& AddressBase::operator =(const AddressBase& addr)
88 {
89 copy(addr);
90 return *this;
91 }
92
93
94 AddressBase::~AddressBase()
95 {
96 }
97
98 void AddressBase::copy(const AddressBase& addr)
99 {
100 port_ = addr.port_;
101 ME = addr.ME;
102 rootTag_ = addr.rootTag_;
103 address_ = addr.address_;
104 hostname_ = addr.hostname_;
105 isHardcodedHostname_ = addr.isHardcodedHostname_;
106 type_ = addr.type_;
107 version_ = addr.version_;
108 collectTime_ = addr.collectTime_;
109 pingInterval_ = addr.pingInterval_;
110 retries_ = addr.retries_;
111 delay_ = addr.delay_;
112 oneway_ = addr.oneway_;
113 dispatcherActive_ = addr.dispatcherActive_;
114 compressType_ = addr.compressType_;
115 minSize_ = addr.minSize_;
116 ptpAllowed_ = addr.ptpAllowed_;
117 sessionId_ = addr.sessionId_;
118 useForSubjectQueue_ = addr.useForSubjectQueue_;
119 dispatchPlugin_ = addr.dispatchPlugin_;
120 nodeId_ = addr.nodeId_;
121 maxEntries_ = addr.maxEntries_;
122 defaultPingInterval_ = addr.defaultPingInterval_;
123 defaultRetries_ = addr.defaultRetries_;
124 defaultDelay_ = addr.defaultDelay_;
125 attributes_ = addr.attributes_;
126 }
127
128 /**
129 * A nice human readable name for this address (used for logging)
130 */
131 string AddressBase::getName()
132 {
133 return getHostname() + string(":") + lexical_cast<std::string>(getPort());
134 }
135
136 /**
137 * Check if supplied address would connect to the address of this instance
138 */
139 bool AddressBase::isSameAddress(AddressBase& other)
140 {
141 string oa = other.getRawAddress();
142 if ( (oa!="") && (oa == getRawAddress()) ) return true;
143 string oh = other.getHostname();
144 int op = other.getPort();
145 if ( (op>0) && (op==getPort()) && (oh!="") && (oh==getHostname()))
146 return true;
147 return false;
148 }
149
150 /**
151 * Show some important settings for logging
152 */
153 string AddressBase::getSettings() const
154 {
155 string ret = string("type=") + type_ + string(" oneway=") + lexical_cast<std::string>(oneway_) + string(" burstMode.collectTime=") + lexical_cast<std::string>(getCollectTime());
156 return ret;
157 }
158
159 /**
160 * @param type The protocol type, e.g. "IOR", "SOCKET", "XMLRPC"
161 */
162 void AddressBase::setType(const string& type)
163 {
164 type_ = type;
165 }
166
167 /**
168 * @param version The protocol version, e.g. "1.0"
169 */
170 void AddressBase::setVersion(const string& version)
171 {
172 version_ = version;
173 }
174
175 /**
176 * Updates the internal address as well.
177 * @param host An IP or DNS
178 */
179 void AddressBase::setHostname(const string& host)
180 {
181 initHostname(host);
182 isHardcodedHostname_ = true;
183 }
184
185 /**
186 * @return true if the bootstrapHostname is explicitly set by user with setHostname()
187 * false if it is determined automatically
188 */
189 bool AddressBase::isHardcodedHostname()
190 {
191 return isHardcodedHostname_;
192 }
193
194 /**
195 * Check if a bootstrapHostname is set already
196 */
197 bool AddressBase::hasHostname() {
198 return hostname_ != "";
199 }
200
201 /**
202 * @return The Hostname, IP or "" if not known
203 */
204 string AddressBase::getHostname() const
205 {
206 if (hostname_ == "") {
207 hostname_ = global_.getBootstrapHostname();
208 address_ = ""; // reset cache
209 }
210 return hostname_;
211 }
212
213 /**
214 * Set the bootstrapping port.
215 * Updates the internal address as well.
216 */
217 void AddressBase::setPort(int port)
218 {
219 port_ = port;
220 address_ = ""; // reset cache
221 }
222
223 int AddressBase::getPort() const
224 {
225 return port_;
226 }
227
228 /**
229 * Set the callback address, it should fit to the protocol-type.
230 *
231 * @param address The callback address, e.g. "socket://192.168.1.1:7607"
232 */
233 void AddressBase::setAddress(const string& address)
234 {
235 address_ = address;
236 }
237
238 // TODO: Protocol abstraction from plugin
239 string AddressBase::getRawAddress() const
240 {
241 if (address_ == "") {
242 string schema = (getType() == Constants::SOCKET) ? "socket" : "http";
243 address_ = schema + "://" + getHostname(); // socket://192.168.1.1:7607
244 if (getPort() > 0)
245 address_ += ":" + lexical_cast<std::string>(getPort());
246 }
247 return address_;
248 }
249
250 /**
251 * Returns the protocol type.
252 * @return e.g. "SOCKET" or "IOR" (never null).
253 */
254 string AddressBase::getType() const
255 {
256 return type_;
257 }
258
259 /**
260 * Returns the protocol version.
261 * @return e.g. "1.0" or null
262 */
263 string AddressBase::getVersion() const
264 {
265 return version_;
266 }
267
268 /**
269 * What to do if max retries is exhausted.
270 * <p />
271 * This mode is currently not configurable, we always destroy the login session.
272 * This is interpreted only server side if callback fails.
273 * @return Constants.ONEXHAUST_KILL_SESSION="killSession"
274 */
275 string AddressBase::getOnExhaust() const
276 {
277 return Constants::ONEXHAUST_KILL_SESSION; // in future possibly Constants.ONEXHAUST_KILL_CALLBACK
278 }
279
280 /**
281 * Kill login session if max callback retries is exhausted?
282 */
283 bool AddressBase::getOnExhaustKillSession() const
284 {
285 return getOnExhaust() == Constants::ONEXHAUST_KILL_SESSION;
286 }
287
288 /**
289 * BurstMode: The time span to collect messages before sending.
290 * @return The time to collect in milliseconds
291 */
292 long AddressBase::getCollectTime() const
293 {
294 return collectTime_;
295 }
296
297 /**
298 * BurstMode: The time to collect messages for sending in a bulk.
299 * @param The time to collect in milliseconds
300 */
301 void AddressBase::setCollectTime(long collectTime)
302 {
303 if (collectTime < 0) collectTime_ = 0;
304 else collectTime_ = collectTime;
305 }
306
307 int AddressBase::getBurstModeMaxEntries() const
308 {
309 return burstModeMaxEntries_;
310 }
311
312 void AddressBase::setBurstModeMaxEntries(int burstModeMaxEntries)
313 {
314 if (burstModeMaxEntries < -1) {
315 burstModeMaxEntries_ = 1;
316 }
317 else if (burstModeMaxEntries == 0) {
318 log_.warn(ME, string("<burstMode maxEntries='") + lexical_cast<std::string>(burstModeMaxEntries) + string("'> is not supported and may cause strange behavior"));
319 burstModeMaxEntries_ = burstModeMaxEntries;
320 }
321 else {
322 burstModeMaxEntries_ = burstModeMaxEntries;
323 }
324 }
325
326 long AddressBase::getBurstModeMaxBytes() const
327 {
328 return burstModeMaxBytes_;
329 }
330
331 void AddressBase::setBurstModeMaxBytes(long burstModeMaxBytes)
332 {
333 if (burstModeMaxBytes < -1) burstModeMaxBytes_ = -1;
334 else burstModeMaxBytes_ = burstModeMaxBytes;
335 }
336
337 /**
338 * How long to wait between pings to the callback server.
339 * @return The pause time between pings in millis
340 */
341 long AddressBase::getPingInterval() const
342 {
343 return pingInterval_;
344 }
345
346 /**
347 * How long to wait between pings to the callback server.
348 * @param pingInterval The pause time between pings in millis
349 */
350 void AddressBase::setPingInterval(long pingInterval)
351 {
352 if (pingInterval <= 0) pingInterval_ = 0;
353 else if (pingInterval < 10) {
354 log_.warn(ME, string("pingInterval=") + lexical_cast<std::string>(pingInterval) + string(" msec is too short, setting it to 10 millis"));
355 pingInterval_ = 10;
356 }
357 else pingInterval_ = pingInterval;
358 }
359
360 /**
361 * How often shall we retry callback attempt on callback failure
362 * @return -1 forever, 0 no retry, > 0 number of retries
363 */
364 int AddressBase::getRetries() const
365 {
366 return retries_;
367 }
368
369 /**
370 * How often shall we retry callback attempt on callback failure
371 * @param -1 forever, 0 no retry, > 0 number of retries
372 */
373 void AddressBase::setRetries(int retries)
374 {
375 if (retries < -1) retries_ = -1;
376 else retries_ = retries;
377 }
378
379 /**
380 * Delay between callback retries in milliseconds, defaults to one minute
381 * @return The delay in millisconds
382 */
383 long AddressBase::getDelay() const
384 {
385 return delay_;
386 }
387
388 /**
389 * Delay between callback retries in milliseconds, defaults to one minute
390 */
391 void AddressBase::setDelay(long delay)
392 {
393 if (delay < 0) delay_ = 0;
394 else delay_ = delay;
395 }
396
397 /**
398 * Shall the publish() or callback update() message be oneway.
399 * Is only with CORBA and our native SOCKET protocol supported
400 * @return true if you want to force oneway sending
401 */
402 bool AddressBase::oneway() const
403 {
404 return oneway_;
405 }
406
407 /**
408 * Shall the publish() or callback update() message be oneway.
409 * Is only with CORBA and our native SOCKET protocol supported
410 * @param oneway false is default
411 */
412 void AddressBase::setOneway(bool oneway)
413 {
414 oneway_ = oneway;
415 }
416
417 bool AddressBase::isDispatcherActive() const
418 {
419 return dispatcherActive_;
420 }
421
422 void AddressBase::setDispatcherActive(bool dispatcherActive)
423 {
424 dispatcherActive_ = dispatcherActive;
425 }
426
427 /**
428 * @param Set if we accept point to point messages
429 */
430 void AddressBase::setPtpAllowed(bool ptpAllowed)
431 {
432 ptpAllowed_ = ptpAllowed;
433 }
434
435 /**
436 * @return true if we may send PtP messages
437 */
438 bool AddressBase::isPtpAllowed()
439 {
440 return ptpAllowed_;
441 }
442
443 void AddressBase::setCompressType(const string& compressType)
444 {
445 compressType_ = compressType;
446
447 // TODO !!!
448 if (compressType != "")
449 log_.warn(ME, "Compression of messages is not yet supported");
450 }
451
452 /**
453 * The identifier sent to the callback client, the client can decide if he trusts this invocation
454 * @return never null
455 */
456 string AddressBase::getSecretSessionId() const
457 {
458 return sessionId_;
459 }
460
461 /** The identifier sent to the callback client, the client can decide if he trusts this invocation */
462 void AddressBase::setSecretSessionId(const string& sessionId)
463 {
464 sessionId_ = sessionId;
465 }
466
467 /**
468 * Get the compression method.
469 * @return "" No compression
470 */
471 string AddressBase::getCompressType() const
472 {
473 return compressType_;
474 }
475
476 /**
477 * Messages bigger this size in bytes are compressed.
478 * <br />
479 * Note: This value is only used if compressType is set to a supported value
480 * @return size in bytes
481 */
482 long AddressBase::getMinSize() const
483 {
484 return minSize_;
485 }
486
487 /**
488 * Messages bigger this size in bytes are compressed.
489 * <br />
490 * Note: This value is only evaluated if compressType is set to a supported value
491 * @return size in bytes
492 */
493 void AddressBase::setMinSize(long minSize)
494 {
495 minSize_ = minSize;
496 }
497
498 /**
499 * Specify your dispatcher plugin configuration.
500 * <p>
501 * Set to "undef" to switch off, or to e.g. "Priority,1.0" to access the PriorizedDispatchPlugin
502 * </p>
503 * <p>
504 * This overwrites the xmlBlaster.properties default setting e.g.:
505 * <pre>
506 * DispatchPlugin[Priority][1.0]=org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin
507 * DispatchPlugin[SlowMotion][1.0]=org.xmlBlaster.util.dispatch.plugins.motion.SlowMotion
508 * DispatchPlugin/defaultPlugin=Priority,1.0
509 * </pre>
510 * </p>
511 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">The dispatch.control.plugin requirement</a>
512 */
513 void AddressBase::setDispatchPlugin(const string& dispatchPlugin)
514 {
515 dispatchPlugin_ = dispatchPlugin;
516 }
517
518 /**
519 * @return "undef" or e.g. "Priority,1.0"
520 */
521 string AddressBase::getDispatchPlugin() const
522 {
523 return dispatchPlugin_;
524 }
525
526 void AddressBase::addAttribute(const ClientProperty& attribute)
527 {
528 attributes_.insert(ClientPropertyMap::value_type(attribute.getName(), attribute));
529 }
530
531 const AddressBase::ClientPropertyMap& AddressBase::getAttributes() const
532 {
533 return attributes_;
534 }
535
536 string AddressBase::dumpAttributes(const string& extraOffset, bool clearText) const
537 {
538 string ret = "";
539 QosData::ClientPropertyMap::const_iterator iter = attributes_.begin();
540 while (iter != attributes_.end()) {
541 const ClientProperty& cp = (*iter).second;
542 ret += cp.toXml(extraOffset, clearText, ATTRIBUTE_TAG);
543 iter++;
544 }
545 return ret;
546 }
547
548
549 /**
550 * Dump state of this object into a XML ASCII string.
551 * <br>
552 * Only none default values are dumped for performance reasons
553 * @param extraOffset indenting of tags for nice output
554 * @return The xml representation
555 */
556 string AddressBase::toXml(const string& extraOffset) const
557 {
558 //if (log_.call()) log_.call(ME, "::toXml");
559 string ret;
560 string offset = Constants::OFFSET + extraOffset;
561 string offset2 = offset + Constants::INDENT;
562
563 // <address type='SOCKET' bootstrapHostname='127.0.0.1' dispatchPlugin='undef'> ...
564 ret += offset + string("<") + rootTag_ + string(" type='") + getType() + string("'");
565 if ( (getVersion()!="") && (getVersion()!=DEFAULT_version))
566 ret += string(" version='") + getVersion() + string("'");
567 if (getHostname() != "")
568 ret += string(" bootstrapHostname='") + getHostname() + string("'");
569 if (DEFAULT_port != getPort())
570 ret += string(" bootstrapPort='") + lexical_cast<std::string>(getPort()) + string("'");
571 if (DEFAULT_sessionId != getSecretSessionId())
572 ret += string(" sessionId='") + getSecretSessionId() + string("'");
573 if (defaultPingInterval_ != getPingInterval())
574 ret += string(" pingInterval='") + lexical_cast<std::string>(getPingInterval()) + string("'");
575 if (defaultRetries_ != getRetries())
576 ret += string(" retries='") + lexical_cast<std::string>(getRetries()) + string("'");
577 if (defaultDelay_ != getDelay())
578 ret += string(" delay='") + lexical_cast<std::string>(getDelay()) + string("'");
579 if (DEFAULT_oneway != oneway()) {
580 ret += string(" oneway='") + lexical_cast<std::string>(oneway()) + string("'");
581 }
582 if (DEFAULT_dispatcherActive != isDispatcherActive()) {
583 ret += string(" dispatcherActive='") + lexical_cast<std::string>(isDispatcherActive()) + string("'");
584 }
585 if (DEFAULT_useForSubjectQueue != useForSubjectQueue_) {
586 ret += string(" useForSubjectQueue='") + lexical_cast<std::string>(useForSubjectQueue_) + string("'");
587 }
588 if (DEFAULT_dispatchPlugin != dispatchPlugin_)
589 ret += string(" dispatchPlugin='") + dispatchPlugin_ + string("'");
590 ret += string(">");
591 if (getRawAddress() != "")
592 ret += offset + string(" ") + getRawAddress();
593 if (getCollectTime() != DEFAULT_collectTime || getBurstModeMaxEntries() != DEFAULT_burstModeMaxEntries || getBurstModeMaxBytes() != DEFAULT_burstModeMaxBytes) {
594 ret += offset2 + string("<burstMode");
595 if (getCollectTime() != DEFAULT_collectTime)
596 ret += string(" collectTime='") + lexical_cast<std::string>(getCollectTime()) + string("'");
597 if (getBurstModeMaxEntries() != DEFAULT_burstModeMaxEntries)
598 ret += string(" maxEntries='") + lexical_cast<std::string>(getBurstModeMaxEntries()) + string("'");
599 if (getBurstModeMaxBytes() != DEFAULT_burstModeMaxBytes)
600 ret += string(" maxBytes='") + lexical_cast<std::string>(getBurstModeMaxBytes()) + string("'");
601 ret += string("/>");
602 }
603 if (getCompressType() != DEFAULT_compressType)
604 ret += offset2 + string("<compress type='") + getCompressType() + string("' minSize='") + lexical_cast<std::string>(getMinSize()) + string("'/>");
605 if (ptpAllowed_ != DEFAULT_ptpAllowed) {
606 ret += offset2 + string("<ptp>") + lexical_cast<std::string>(ptpAllowed_) + string("</ptp>");
607 }
608 ret += dumpAttributes(offset2);
609 ret += offset + string("</") + rootTag_ + string(">");
610 return ret;
611 }
612
613 }}}}} // namespaces
syntax highlighted by Code2HTML, v. 0.9.1