util/qos/address/Address.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      Address.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Holding address string and protocol string
00006 Version:   $Id: Address.cpp 13597 2005-07-28 10:11:52Z ruff $
00007 ------------------------------------------------------------------------------*/
00008 
00023 #include <util/qos/address/Address.h>
00024 #include <util/lexical_cast.h>
00025 #include <util/Global.h>
00026 #include <util/StringTrim.h>
00027 
00028 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace address {
00029 
00030 using namespace std;
00031 
00032 inline void Address::initialize()
00033 {
00034    setPort(global_.getProperty().getIntProperty("bootstrapPort", getPort()));
00035 
00036    setType(global_.getProperty().getStringProperty("protocol", getType()));
00037    setType(global_.getProperty().getStringProperty("dispatch/connection/protocol", getType()));
00038    setCollectTime(global_.getProperty().getLongProperty("dispatch/connection/burstMode/collectTime", DEFAULT_collectTime));
00039    setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/connection/burstMode/maxEntries", DEFAULT_burstModeMaxEntries));
00040    setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/connection/burstMode/maxBytes", DEFAULT_burstModeMaxBytes));
00041    setPingInterval(global_.getProperty().getLongProperty("dispatch/connection/pingInterval", defaultPingInterval_));
00042    setRetries(global_.getProperty().getIntProperty("dispatch/connection/retries", defaultRetries_));
00043    setDelay(global_.getProperty().getLongProperty("dispatch/connection/delay", defaultDelay_));
00044    setOneway(global_.getProperty().getBoolProperty("dispatch/connection/oneway", DEFAULT_oneway));
00045    setCompressType(global_.getProperty().getStringProperty("dispatch/connection/compress.type", DEFAULT_compressType));
00046    setMinSize(global_.getProperty().getLongProperty("dispatch/connection/compress.minSize", DEFAULT_minSize));
00047    setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/connection/ptpAllowed", DEFAULT_ptpAllowed));
00048    setSecretSessionId(global_.getProperty().getStringProperty("dispatch/connection/sessionId", DEFAULT_sessionId));
00049    setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/connection/DispatchPlugin/defaultPlugin", DEFAULT_dispatchPlugin));
00050    if (nodeId_ != "") {
00051       setPort(global_.getProperty().getIntProperty("dispatch/connection/port["+nodeId_+"]", getPort()));
00052 
00053       setType(global_.getProperty().getStringProperty("dispatch/connection/protocol["+nodeId_+"]", getType()));
00054       setCollectTime(global_.getProperty().getLongProperty("dispatch/connection/burstMode/collectTime["+nodeId_+"]", getCollectTime()));
00055       setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/connection/burstMode/maxEntries["+nodeId_+"]", getBurstModeMaxEntries()));
00056       setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/connection/burstMode/maxBytes["+nodeId_+"]", getBurstModeMaxBytes()));
00057       setPingInterval(global_.getProperty().getLongProperty("dispatch/connection/pingInterval["+nodeId_+"]", getPingInterval()));
00058       setRetries(global_.getProperty().getIntProperty("dispatch/connection/retries["+nodeId_+"]", getRetries()));
00059       setDelay(global_.getProperty().getLongProperty("dispatch/connection/delay["+nodeId_+"]", getDelay()));
00060       setOneway(global_.getProperty().getBoolProperty("dispatch/connection/oneway["+nodeId_+"]", oneway()));
00061       setCompressType(global_.getProperty().getStringProperty("dispatch/connection/compress.type["+nodeId_+"]", getCompressType()));
00062       setMinSize(global_.getProperty().getLongProperty("dispatch/connection/compress.minSize["+nodeId_+"]", getMinSize()));
00063       setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/connection/ptpAllowed["+nodeId_+"]", isPtpAllowed()));
00064       setSecretSessionId(global_.getProperty().getStringProperty("dispatch/connection/sessionId["+nodeId_+"]", getSecretSessionId()));
00065       setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/connection/DispatchPlugin/defaultPlugin["+nodeId_+"]", dispatchPlugin_));
00066    }
00067 
00068    // TODO: This is handled in ClientQueueProperty.java already ->
00069    //      long maxEntries = global_.getProperty().getLongProperty("queue/connection/maxEntries", CbQueueProperty.DEFAULT_maxEntriesDefault);
00070    long maxEntries = global_.getProperty().getLongProperty("queue/connection/maxEntries", 10000l);
00071    setMaxEntries(maxEntries);
00072    if (nodeId_ != "") {
00073       setMaxEntries(global_.getProperty().getLongProperty("queue/connection/maxEntries["+nodeId_+"]", getMaxEntries()));
00074    }
00075 
00076    // Resets cached rawAddress_ :
00077    string type = getType();
00078    StringTrim::toLowerCase(type);
00079    // These properties are evaluated directly by our C SOCKET library:
00080    // -dispatch/connection/plugin/socket/port
00081    // -dispatch/connection/plugin/socket/hostname
00082    // -dispatch/connection/plugin/socket/localPort
00083    // -dispatch/connection/plugin/socket/localHostname
00084    hostname_ = global_.getProperty().getStringProperty("dispatch/connection/plugin/"+type+"/hostname", getHostname());
00085    setPort(global_.getProperty().getIntProperty("dispatch/connection/plugin/"+type+"/port", getPort()));
00086 }
00087 
00088 Address::Address(Global& global, const string& type, const string& nodeId)
00089  : AddressBase(global, "address")
00090 {
00091    defaultRetries_      = -1;    // How often to retry if connection fails: defaults to -1 (retry forever), 0 switches failsafe mode off
00092    defaultDelay_        = 5000;  // Delay between connection retries in milliseconds: defaults to 5000 (5 sec)
00093    defaultPingInterval_ = 10000; // Ping interval: pinging every given milliseconds, defaults to 10 seconds
00094    pingInterval_ = defaultPingInterval_;
00095    retries_      = defaultRetries_;
00096    delay_        = defaultDelay_;
00097    ME = "Address";
00098    if (nodeId != "") nodeId_ = nodeId;
00099    initialize();
00100    if (type != "")   type_ = type;
00101 }
00102 
00103 Address::Address(const AddressBase& addr) : AddressBase(addr)
00104 {
00105 }
00106 
00107 Address& Address::operator =(const AddressBase& addr)
00108 {
00109    AddressBase::copy(addr);
00110    return *this;
00111 }
00112 
00113 
00114 void Address::setMaxEntries(long maxEntries)
00115 {
00116    maxEntries_ = maxEntries;
00117 }
00118 
00119 long Address::getMaxEntries() const
00120 {
00121    return maxEntries_;
00122 }
00123 
00125 string Address::getSettings()
00126 {
00127    string ret;
00128    ret = AddressBase::getSettings();
00129    if (getDelay() > 0)
00130       ret += string(" delay=") + lexical_cast<std::string>(getDelay()) +
00131              string(" retries=") + lexical_cast<std::string>(getRetries()) +
00132              string(" maxEntries=") + lexical_cast<std::string>(getMaxEntries()) +
00133              string(" pingInterval=") + lexical_cast<std::string>(getPingInterval());
00134    return ret;
00135 }
00136 
00137 string Address::toString()
00138 {
00139    return getRawAddress();
00140 }
00141 
00145 string Address::usage()
00146 {
00147    string text = "";
00148    text += string("Control failsafe connection to xmlBlaster server:\n");
00149    // is in ClientQueueProperty.java: text += "   -queue/connection/maxEntries       The max. capacity of the client queue in number of messages [" + CbQueueProperty.DEFAULT_maxEntriesDefault + "].\n";
00150    //text += "   -queue/callback/onOverflow   Error handling when queue is full, 'block | deadMessage' [" + CbQueueProperty.DEFAULT_onOverflow + "].\n";
00151    //text += "   -queue/callback/onFailure    Error handling when connection failed (after all retries etc.) [" + CbQueueProperty.DEFAULT_onFailure + "].\n";
00152    text += string("   -dispatch/connection/burstMode/collectTime [" + lexical_cast<std::string>(DEFAULT_collectTime) + "]\n");
00153    text += string("                       Number of milliseconds we shall collect publish messages.\n");
00154    text += string("                       This allows performance tuning, try set it to 200.\n");
00155    text += string("   -dispatch/connection/burstMode/maxEntries [" + lexical_cast<std::string>(DEFAULT_burstModeMaxEntries) + "]\n");
00156    text += string("                       The maximum number of queue entries to send in a bulk.\n");
00157    text += string("                       -1L takes all entries of highest priority available in the ram queue in a bulk.\n");
00158    text += string("   -dispatch/connection/burstMode/maxBytes [" + lexical_cast<std::string>(DEFAULT_burstModeMaxBytes) + "]\n");
00159    text += string("                       The maximum bulk size of invocations.\n");
00160    text += string("                       -1L takes all entries of highest priority available in the ram queue in a bulk.\n");
00161  //text += "   -oneway             Shall the publish() messages be send oneway (no application level ACK) [" + Address.DEFAULT_oneway + "]\n";
00162    text += string("   -dispatch/connection/pingInterval [" + lexical_cast<std::string>(defaultPingInterval_) + "]\n");
00163    text += string("                       Pinging every given milliseconds.\n");
00164    text += string("   -dispatch/connection/retries [" + lexical_cast<std::string>(defaultRetries_) + "]\n");
00165    text += string("                       How often to retry if connection fails (-1 is forever).\n");
00166    text += string("   -dispatch/connection/delay [" + lexical_cast<std::string>(defaultDelay_) + "]\n");
00167    text += string("                       Delay between connection retries in milliseconds.\n");
00168    text += string("                       A delay value > 0 switches fails save mode on, 0 switches it off.\n");
00169  //text += "   -DispatchPlugin/defaultPlugin  Specify your specific dispatcher plugin [" + CallbackAddress.DEFAULT_dispatchPlugin + "]\n";
00170  //text += "   -compress.type      With which format message be compressed on callback [" + Address.DEFAULT_compressType + "]\n";
00171  //text += "   -compress.minSize   Messages bigger this size in bytes are compressed [" + Address.DEFAULT_minSize + "]\n";
00172    return text;
00173 }
00174 
00175 }}}}} // namespace
00176 
00177 
00178 #ifdef _XMLBLASTER_CLASSTEST
00179 
00180 using namespace std;
00181 using namespace org::xmlBlaster::util::qos::address;
00182 
00184 int main(int args, char* argv[])
00185 {
00186    try {
00187       {
00188          Global& glob = Global::getInstance();
00189          glob.initialize(args, argv);
00190          Log& log = glob.getLog("org.xmlBlaster.util.qos");
00191          log.info("main", "This is a simple info");
00192          Address a(glob);
00193          a.setType("SOCKET");
00194          a.setAddress("127.0.0.1:7600");
00195          a.setCollectTime(12345l);
00196          a.setPingInterval(54321l);
00197          a.setRetries(17);
00198          a.setDelay(7890l);
00199          a.setOneway(true);
00200          a.setSecretSessionId("0x4546hwi89");
00201          cout << a.toXml() << endl;
00202       }
00203       {
00204          string nodeId = "heron";
00205          int                nmax = 8;
00206          const char** argc = new const char*[nmax];
00207          argc[0] = "-sessionId";
00208          argc[1] = "ERROR";
00209          string help = string("-sessionId[") + nodeId + string("]");
00210          argc[2] = string(help).c_str();
00211          argc[3] = "OK";
00212          argc[4] = "-pingInterval";
00213          argc[5] = "8888";
00214          help = string("-delay[") + nodeId + string("]");
00215          argc[6] = help.c_str();
00216          argc[7] = "8888";
00217 
00218          Global& glob = Global::getInstance();
00219          glob.initialize(nmax, argc);
00220          Address a(glob, "RMI", nodeId);
00221          cout << a.toXml() << endl;
00222       }
00223    }
00224    catch(...) {
00225       cout << "unknown uncatched exception" << endl;
00226    }
00227 }
00228 
00229 #endif