1 /*------------------------------------------------------------------------------
  2 Name:      AddressBase.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Holding connect address and callback address string including protocol
  6 Version:   $Id: AddressBase.cpp 14417 2005-12-22 12:13:15Z ruff $
  7 ------------------------------------------------------------------------------*/
  8 
  9 /**
 10  * Abstract helper class holding connect address and callback address string
 11  * and protocol string.
 12  * <p />
 13  * See examples in the implementing classes
 14  * @see Address
 15  * @see CallbackAddress
 16  */
 17 
 18 #include <util/qos/address/AddressBase.h>
 19 #include <util/lexical_cast.h>
 20 #include <util/Global.h>
 21 
 22 using namespace std;
 23 using namespace org::xmlBlaster::util;
 24 
 25 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace address {
 26 
 27 const int    DEFAULT_port               = 3412;
 28 const string DEFAULT_type               = Global::getDefaultProtocol(); //"SOCKET";
 29 const string DEFAULT_version            = "1.0";
 30 const long   DEFAULT_collectTime        = 0;
 31 const int    DEFAULT_burstModeMaxEntries= 1;
 32 const long   DEFAULT_burstModeMaxBytes  = -1L;
 33 const bool   DEFAULT_oneway             = false;
 34 const bool   DEFAULT_dispatcherActive   = true;
 35 const string DEFAULT_compressType       = "";
 36 const long   DEFAULT_minSize            = 0L;
 37 const bool   DEFAULT_ptpAllowed         = true;
 38 const string DEFAULT_sessionId          = "unknown";
 39 const bool   DEFAULT_useForSubjectQueue = true;
 40 string DEFAULT_dispatchPlugin     = "";
 41 string ATTRIBUTE_TAG = "attribute";
 42 
 43 
 44 AddressBase::AddressBase(Global& global, const string& rootTag)
 45    : ReferenceCounterBase(), global_(global), log_(global.getLog("org.xmlBlaster.util.qos")),
 46       attributes_()
 47 {
 48 
 49    defaultPingInterval_ = 0L;
 50    defaultRetries_      = 0;
 51    defaultDelay_        = 0L;
 52 
 53    // set the defaults here ...
 54    ME                   = "AddressBase";
 55    nodeId_              = "";
 56    maxEntries_          = 0;
 57    address_             = "";
 58    hostname_            = "";
 59    isHardcodedHostname_ = false;
 60    type_                = DEFAULT_type;
 61    port_                = DEFAULT_port;
 62    version_             = DEFAULT_version;
 63    collectTime_         = DEFAULT_collectTime;
 64    burstModeMaxEntries_ = DEFAULT_burstModeMaxEntries;
 65    burstModeMaxBytes_   = DEFAULT_burstModeMaxBytes;
 66    pingInterval_        = defaultPingInterval_;
 67    retries_             = defaultRetries_;
 68    delay_               = defaultDelay_;
 69    oneway_              = DEFAULT_oneway;
 70    dispatcherActive_    = DEFAULT_dispatcherActive;
 71    compressType_        = DEFAULT_compressType;
 72    minSize_             = DEFAULT_minSize;
 73    ptpAllowed_          = DEFAULT_ptpAllowed;
 74    sessionId_           = DEFAULT_sessionId;
 75    useForSubjectQueue_  = DEFAULT_useForSubjectQueue;
 76    dispatchPlugin_      = DEFAULT_dispatchPlugin;
 77    setRootTag(rootTag);
 78 }
 79 
 80 AddressBase::AddressBase(const AddressBase& addr)
 81    : global_(addr.global_), log_(addr.log_),
 82       attributes_()
 83 {
 84    copy(addr);
 85 }
 86 
 87 AddressBase& AddressBase::operator =(const AddressBase& addr)
 88 {
 89    copy(addr);
 90    return *this;
 91 }
 92 
 93 
 94 AddressBase::~AddressBase()
 95 {
 96 }
 97 
 98 void AddressBase::copy(const AddressBase& addr)
 99 {
100    port_                = addr.port_;
101    ME                   = addr.ME;
102    rootTag_             = addr.rootTag_;
103    address_             = addr.address_;
104    hostname_            = addr.hostname_;
105    isHardcodedHostname_ = addr.isHardcodedHostname_;
106    type_                = addr.type_;
107    version_             = addr.version_;
108    collectTime_         = addr.collectTime_;
109    pingInterval_        = addr.pingInterval_;
110    retries_             = addr.retries_;
111    delay_               = addr.delay_;
112    oneway_              = addr.oneway_;
113    dispatcherActive_    = addr.dispatcherActive_;
114    compressType_        = addr.compressType_;
115    minSize_             = addr.minSize_;
116    ptpAllowed_          = addr.ptpAllowed_;
117    sessionId_           = addr.sessionId_;
118    useForSubjectQueue_  = addr.useForSubjectQueue_;
119    dispatchPlugin_      = addr.dispatchPlugin_;
120    nodeId_              = addr.nodeId_;
121    maxEntries_          = addr.maxEntries_;
122    defaultPingInterval_ = addr.defaultPingInterval_;
123    defaultRetries_      = addr.defaultRetries_;
124    defaultDelay_        = addr.defaultDelay_;
125    attributes_          = addr.attributes_;
126 }
127 
128 /**
129  * A nice human readable name for this address (used for logging)
130  */
131 string AddressBase::getName()
132 {
133    return getHostname() + string(":") + lexical_cast<std::string>(getPort());
134 }
135 
136 /**
137  * Check if supplied address would connect to the address of this instance
138  */
139 bool AddressBase::isSameAddress(AddressBase& other)
140 {
141    string oa = other.getRawAddress();
142    if ( (oa!="") && (oa == getRawAddress()) ) return true;
143    string oh = other.getHostname();
144    int op = other.getPort();
145    if ( (op>0) && (op==getPort()) && (oh!="") && (oh==getHostname()))
146       return true;
147    return false;
148 }
149 
150 /**
151  * Show some important settings for logging
152  */
153 string AddressBase::getSettings() const
154 {
155    string ret = string("type=") + type_ + string(" oneway=") + lexical_cast<std::string>(oneway_) + string(" burstMode.collectTime=") + lexical_cast<std::string>(getCollectTime());
156    return ret;
157 }
158 
159 /**
160  * @param type    The protocol type, e.g. "IOR", "SOCKET", "XMLRPC"
161  */
162 void AddressBase::setType(const string& type)
163 {
164    type_ = type;
165 }
166 
167 /**
168  * @param version   The protocol version, e.g. "1.0"
169  */
170 void AddressBase::setVersion(const string& version)
171 {
172    version_ = version;
173 }
174 
175 /**
176  * Updates the internal address as well. 
177  * @param host An IP or DNS
178  */
179 void AddressBase::setHostname(const string& host)
180 {
181    initHostname(host);
182    isHardcodedHostname_ = true;
183 }
184 
185 /**
186  * @return true if the bootstrapHostname is explicitly set by user with setHostname()
187  * false if it is determined automatically
188  */
189 bool AddressBase::isHardcodedHostname()
190 {
191    return isHardcodedHostname_;
192 }
193 
194 /**             
195  * Check if a bootstrapHostname is set already
196  */
197 bool AddressBase::hasHostname() {
198    return hostname_ != "";
199 }
200 
201 /**
202  * @return The Hostname, IP or "" if not known
203  */
204 string AddressBase::getHostname() const
205 {
206    if (hostname_ == "") {
207       hostname_ = global_.getBootstrapHostname();
208       address_  = ""; // reset cache
209    }
210    return hostname_;
211 }
212 
213 /**
214  * Set the bootstrapping port. 
215  * Updates the internal address as well. 
216  */
217 void AddressBase::setPort(int port)
218 {
219    port_    = port;
220    address_ = ""; // reset cache
221 }
222 
223 int AddressBase::getPort() const
224 {
225    return port_;
226 }
227 
228 /**
229  * Set the callback address, it should fit to the protocol-type.
230  *
231  * @param address The callback address, e.g. "socket://192.168.1.1:7607"
232  */
233 void AddressBase::setAddress(const string& address)
234 {
235    address_ = address;
236 }
237 
238 // TODO: Protocol abstraction from plugin
239 string AddressBase::getRawAddress() const
240 {
241    if (address_ == "") {
242       string schema = (getType() == Constants::SOCKET) ? "socket" : "http";
243       address_ = schema + "://" + getHostname();  // socket://192.168.1.1:7607
244       if (getPort() > 0)
245          address_ += ":" + lexical_cast<std::string>(getPort());
246    }
247    return address_;
248 }
249 
250 /**
251  * Returns the protocol type.
252  * @return e.g. "SOCKET" or "IOR" (never null).
253  */
254 string AddressBase::getType() const
255 {
256    return type_;
257 }
258 
259 /**
260  * Returns the protocol version.
261  * @return e.g. "1.0" or null
262  */
263 string AddressBase::getVersion() const
264 {
265    return version_;
266 }
267 
268 /**
269  * What to do if max retries is exhausted. 
270  * <p />
271  * This mode is currently not configurable, we always destroy the login session. 
272  * This is interpreted only server side if callback fails.
273  * @return Constants.ONEXHAUST_KILL_SESSION="killSession"
274  */
275 string AddressBase::getOnExhaust() const
276 {
277    return Constants::ONEXHAUST_KILL_SESSION; // in future possibly Constants.ONEXHAUST_KILL_CALLBACK
278 }
279 
280 /**
281  * Kill login session if max callback retries is exhausted?
282  */
283 bool AddressBase::getOnExhaustKillSession() const
284 {
285    return getOnExhaust() == Constants::ONEXHAUST_KILL_SESSION;
286 }
287 
288 /**
289  * BurstMode: The time span to collect messages before sending. 
290  * @return The time to collect in milliseconds
291  */
292 long AddressBase::getCollectTime() const
293 {
294    return collectTime_;
295 }
296 
297 /**
298  * BurstMode: The time to collect messages for sending in a bulk. 
299  * @param The time to collect in milliseconds
300  */
301 void AddressBase::setCollectTime(long collectTime)
302 {
303    if (collectTime < 0) collectTime_ = 0;
304    else collectTime_ = collectTime;
305 }
306 
307 int AddressBase::getBurstModeMaxEntries() const
308 {
309    return burstModeMaxEntries_;
310 }
311 
312 void AddressBase::setBurstModeMaxEntries(int burstModeMaxEntries)
313 {
314    if (burstModeMaxEntries < -1) {
315       burstModeMaxEntries_ = 1;
316    }
317    else if (burstModeMaxEntries == 0) {
318       log_.warn(ME, string("<burstMode maxEntries='") + lexical_cast<std::string>(burstModeMaxEntries) + string("'> is not supported and may cause strange behavior"));
319       burstModeMaxEntries_ = burstModeMaxEntries;
320    }
321    else {
322       burstModeMaxEntries_ = burstModeMaxEntries;
323    }
324 }
325 
326 long AddressBase::getBurstModeMaxBytes() const
327 {
328    return burstModeMaxBytes_;
329 }
330 
331 void AddressBase::setBurstModeMaxBytes(long burstModeMaxBytes)
332 {
333    if (burstModeMaxBytes < -1) burstModeMaxBytes_ = -1;
334    else burstModeMaxBytes_ = burstModeMaxBytes;
335 }
336 
337 /**
338  * How long to wait between pings to the callback server. 
339  * @return The pause time between pings in millis
340  */
341 long AddressBase::getPingInterval() const
342 {
343    return pingInterval_;
344 }
345 
346 /**
347  * How long to wait between pings to the callback server. 
348  * @param pingInterval The pause time between pings in millis
349  */
350 void AddressBase::setPingInterval(long pingInterval)
351 {
352    if (pingInterval <= 0) pingInterval_ = 0;
353    else if (pingInterval < 10) {
354       log_.warn(ME, string("pingInterval=") + lexical_cast<std::string>(pingInterval) + string(" msec is too short, setting it to 10 millis"));
355       pingInterval_ = 10;
356    }
357    else pingInterval_ = pingInterval;
358 }
359 
360 /**
361  * How often shall we retry callback attempt on callback failure
362  * @return -1 forever, 0 no retry, > 0 number of retries
363  */
364 int AddressBase::getRetries() const
365 {
366    return retries_;
367 }
368 
369 /**
370  * How often shall we retry callback attempt on callback failure
371  * @param -1 forever, 0 no retry, > 0 number of retries
372  */
373 void AddressBase::setRetries(int retries)
374 {
375    if (retries < -1) retries_ = -1;
376    else retries_ = retries;
377 }
378 
379 /**
380  * Delay between callback retries in milliseconds, defaults to one minute
381  * @return The delay in millisconds
382  */
383 long AddressBase::getDelay() const
384 {
385    return delay_;
386 }
387 
388 /**
389  * Delay between callback retries in milliseconds, defaults to one minute
390  */
391 void AddressBase::setDelay(long delay)
392 {
393    if (delay < 0) delay_ = 0;
394    else delay_ = delay;
395 }
396 
397 /**
398  * Shall the publish() or callback update() message be oneway. 
399  * Is only with CORBA and our native SOCKET protocol supported
400  * @return true if you want to force oneway sending
401  */
402 bool AddressBase::oneway() const
403 {
404    return oneway_;
405 }
406 
407 /**
408  * Shall the publish() or callback update() message be oneway. 
409  * Is only with CORBA and our native SOCKET protocol supported
410  * @param oneway false is default
411  */
412 void AddressBase::setOneway(bool oneway)
413 {
414    oneway_ = oneway;
415 }
416 
417 bool AddressBase::isDispatcherActive() const
418 {
419    return dispatcherActive_;
420 }
421 
422 void AddressBase::setDispatcherActive(bool dispatcherActive)
423 {
424    dispatcherActive_ = dispatcherActive;
425 }
426 
427 /**
428  * @param Set if we accept point to point messages
429  */
430 void AddressBase::setPtpAllowed(bool ptpAllowed)
431 {
432    ptpAllowed_ = ptpAllowed;
433 }
434 
435 /**
436  * @return true if we may send PtP messages
437  */
438 bool AddressBase::isPtpAllowed()
439 {
440    return ptpAllowed_;
441 }
442 
443 void AddressBase::setCompressType(const string& compressType)
444 {
445    compressType_ = compressType;
446 
447    // TODO !!!
448    if (compressType != "")
449       log_.warn(ME, "Compression of messages is not yet supported");
450 }
451 
452 /**
453  * The identifier sent to the callback client, the client can decide if he trusts this invocation
454  * @return never null
455  */
456 string AddressBase::getSecretSessionId() const
457 {
458    return sessionId_;
459 }
460 
461 /** The identifier sent to the callback client, the client can decide if he trusts this invocation */
462 void AddressBase::setSecretSessionId(const string& sessionId)
463 {
464    sessionId_ = sessionId;
465 }
466 
467 /**
468  * Get the compression method. 
469  * @return "" No compression
470  */
471 string AddressBase::getCompressType() const
472 {
473    return compressType_;
474 }
475 
476 /** 
477  * Messages bigger this size in bytes are compressed. 
478  * <br />
479  * Note: This value is only used if compressType is set to a supported value
480  * @return size in bytes
481  */
482 long AddressBase::getMinSize() const
483 {
484    return minSize_;
485 }
486 
487 /** 
488  * Messages bigger this size in bytes are compressed. 
489  * <br />
490  * Note: This value is only evaluated if compressType is set to a supported value
491  * @return size in bytes
492  */
493 void AddressBase::setMinSize(long minSize)
494 {
495    minSize_ = minSize;
496 }
497 
498 /**
499  * Specify your dispatcher plugin configuration. 
500  * <p>
501  * Set to "undef" to switch off, or to e.g. "Priority,1.0" to access the PriorizedDispatchPlugin
502  * </p>
503  * <p>
504  * This overwrites the xmlBlaster.properties default setting e.g.:
505  * <pre>
506  * DispatchPlugin[Priority][1.0]=org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin
507  * DispatchPlugin[SlowMotion][1.0]=org.xmlBlaster.util.dispatch.plugins.motion.SlowMotion
508  * DispatchPlugin/defaultPlugin=Priority,1.0
509  * </pre>
510  * </p>
511  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">The dispatch.control.plugin requirement</a>
512  */
513 void AddressBase::setDispatchPlugin(const string& dispatchPlugin)
514 {
515    dispatchPlugin_ = dispatchPlugin;
516 }
517 
518 /**
519  * @return "undef" or e.g. "Priority,1.0"
520  */
521 string AddressBase::getDispatchPlugin() const
522 {
523    return dispatchPlugin_;
524 }
525 
526 void AddressBase::addAttribute(const ClientProperty& attribute)
527 {
528    attributes_.insert(ClientPropertyMap::value_type(attribute.getName(), attribute));   
529 }
530 
531 const AddressBase::ClientPropertyMap& AddressBase::getAttributes() const
532 {
533    return attributes_;
534 }
535 
536 string AddressBase::dumpAttributes(const string& extraOffset, bool clearText) const
537 {
538    string ret = "";
539    QosData::ClientPropertyMap::const_iterator iter = attributes_.begin();
540    while (iter != attributes_.end()) {
541       const ClientProperty& cp = (*iter).second;
542       ret += cp.toXml(extraOffset, clearText, ATTRIBUTE_TAG);
543       iter++;
544    }
545    return ret;
546 }
547 
548 
549 /**
550  * Dump state of this object into a XML ASCII string.
551  * <br>
552  * Only none default values are dumped for performance reasons
553  * @param extraOffset indenting of tags for nice output
554  * @return The xml representation
555  */
556 string AddressBase::toXml(const string& extraOffset) const
557 {
558    //if (log_.call()) log_.call(ME, "::toXml");
559    string ret;
560    string offset = Constants::OFFSET + extraOffset;
561    string offset2 = offset + Constants::INDENT;
562 
563    // <address type='SOCKET' bootstrapHostname='127.0.0.1' dispatchPlugin='undef'> ...
564    ret += offset + string("<") + rootTag_  + string(" type='") + getType() + string("'");
565    if ( (getVersion()!="") && (getVersion()!=DEFAULT_version))
566       ret += string(" version='") + getVersion() + string("'");
567    if (getHostname() != "")
568       ret += string(" bootstrapHostname='") + getHostname() + string("'");
569    if (DEFAULT_port != getPort())
570        ret += string(" bootstrapPort='") + lexical_cast<std::string>(getPort()) + string("'");
571    if (DEFAULT_sessionId != getSecretSessionId())
572        ret += string(" sessionId='") + getSecretSessionId() + string("'");
573    if (defaultPingInterval_ != getPingInterval())
574        ret += string(" pingInterval='") + lexical_cast<std::string>(getPingInterval()) + string("'");
575    if (defaultRetries_ != getRetries())
576        ret += string(" retries='") + lexical_cast<std::string>(getRetries()) + string("'");
577    if (defaultDelay_ != getDelay())
578        ret += string(" delay='") + lexical_cast<std::string>(getDelay()) + string("'");
579    if (DEFAULT_oneway != oneway()) {
580        ret += string(" oneway='") + lexical_cast<std::string>(oneway()) + string("'");
581    }
582    if (DEFAULT_dispatcherActive != isDispatcherActive()) {
583        ret += string(" dispatcherActive='") + lexical_cast<std::string>(isDispatcherActive()) + string("'");
584    }
585    if (DEFAULT_useForSubjectQueue != useForSubjectQueue_) {
586        ret += string(" useForSubjectQueue='") + lexical_cast<std::string>(useForSubjectQueue_) + string("'");
587    }
588    if (DEFAULT_dispatchPlugin != dispatchPlugin_)
589        ret += string(" dispatchPlugin='") + dispatchPlugin_ + string("'");
590    ret += string(">");
591    if (getRawAddress() != "")
592       ret += offset + string("   ") + getRawAddress();
593    if (getCollectTime() != DEFAULT_collectTime || getBurstModeMaxEntries() != DEFAULT_burstModeMaxEntries || getBurstModeMaxBytes() != DEFAULT_burstModeMaxBytes) {
594       ret += offset2 + string("<burstMode");
595       if (getCollectTime() != DEFAULT_collectTime)
596          ret += string(" collectTime='") + lexical_cast<std::string>(getCollectTime()) + string("'");
597       if (getBurstModeMaxEntries() != DEFAULT_burstModeMaxEntries)
598          ret += string(" maxEntries='") + lexical_cast<std::string>(getBurstModeMaxEntries()) + string("'");
599       if (getBurstModeMaxBytes() != DEFAULT_burstModeMaxBytes)
600          ret += string(" maxBytes='") + lexical_cast<std::string>(getBurstModeMaxBytes()) + string("'");
601       ret += string("/>");
602    }
603    if (getCompressType() != DEFAULT_compressType)
604       ret += offset2 + string("<compress type='") + getCompressType() + string("' minSize='") + lexical_cast<std::string>(getMinSize()) + string("'/>");
605    if (ptpAllowed_ != DEFAULT_ptpAllowed) {
606       ret += offset2 + string("<ptp>") + lexical_cast<std::string>(ptpAllowed_) + string("</ptp>");
607    }
608    ret += dumpAttributes(offset2);
609    ret += offset + string("</") + rootTag_ + string(">");
610    return ret;
611 }
612 
613 }}}}} // namespaces


syntax highlighted by Code2HTML, v. 0.9.1