00001 /*------------------------------------------------------------------------------ 00002 Name: Address.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Holding address string and protocol string 00006 Version: $Id: Address.cpp 13597 2005-07-28 10:11:52Z ruff $ 00007 ------------------------------------------------------------------------------*/ 00008 00023 #include <util/qos/address/Address.h> 00024 #include <util/lexical_cast.h> 00025 #include <util/Global.h> 00026 #include <util/StringTrim.h> 00027 00028 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace address { 00029 00030 using namespace std; 00031 00032 inline void Address::initialize() 00033 { 00034 setPort(global_.getProperty().getIntProperty("bootstrapPort", getPort())); 00035 00036 setType(global_.getProperty().getStringProperty("protocol", getType())); 00037 setType(global_.getProperty().getStringProperty("dispatch/connection/protocol", getType())); 00038 setCollectTime(global_.getProperty().getLongProperty("dispatch/connection/burstMode/collectTime", DEFAULT_collectTime)); 00039 setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/connection/burstMode/maxEntries", DEFAULT_burstModeMaxEntries)); 00040 setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/connection/burstMode/maxBytes", DEFAULT_burstModeMaxBytes)); 00041 setPingInterval(global_.getProperty().getLongProperty("dispatch/connection/pingInterval", defaultPingInterval_)); 00042 setRetries(global_.getProperty().getIntProperty("dispatch/connection/retries", defaultRetries_)); 00043 setDelay(global_.getProperty().getLongProperty("dispatch/connection/delay", defaultDelay_)); 00044 setOneway(global_.getProperty().getBoolProperty("dispatch/connection/oneway", DEFAULT_oneway)); 00045 setCompressType(global_.getProperty().getStringProperty("dispatch/connection/compress.type", DEFAULT_compressType)); 00046 setMinSize(global_.getProperty().getLongProperty("dispatch/connection/compress.minSize", DEFAULT_minSize)); 00047 setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/connection/ptpAllowed", DEFAULT_ptpAllowed)); 00048 setSecretSessionId(global_.getProperty().getStringProperty("dispatch/connection/sessionId", DEFAULT_sessionId)); 00049 setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/connection/DispatchPlugin/defaultPlugin", DEFAULT_dispatchPlugin)); 00050 if (nodeId_ != "") { 00051 setPort(global_.getProperty().getIntProperty("dispatch/connection/port["+nodeId_+"]", getPort())); 00052 00053 setType(global_.getProperty().getStringProperty("dispatch/connection/protocol["+nodeId_+"]", getType())); 00054 setCollectTime(global_.getProperty().getLongProperty("dispatch/connection/burstMode/collectTime["+nodeId_+"]", getCollectTime())); 00055 setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/connection/burstMode/maxEntries["+nodeId_+"]", getBurstModeMaxEntries())); 00056 setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/connection/burstMode/maxBytes["+nodeId_+"]", getBurstModeMaxBytes())); 00057 setPingInterval(global_.getProperty().getLongProperty("dispatch/connection/pingInterval["+nodeId_+"]", getPingInterval())); 00058 setRetries(global_.getProperty().getIntProperty("dispatch/connection/retries["+nodeId_+"]", getRetries())); 00059 setDelay(global_.getProperty().getLongProperty("dispatch/connection/delay["+nodeId_+"]", getDelay())); 00060 setOneway(global_.getProperty().getBoolProperty("dispatch/connection/oneway["+nodeId_+"]", oneway())); 00061 setCompressType(global_.getProperty().getStringProperty("dispatch/connection/compress.type["+nodeId_+"]", getCompressType())); 00062 setMinSize(global_.getProperty().getLongProperty("dispatch/connection/compress.minSize["+nodeId_+"]", getMinSize())); 00063 setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/connection/ptpAllowed["+nodeId_+"]", isPtpAllowed())); 00064 setSecretSessionId(global_.getProperty().getStringProperty("dispatch/connection/sessionId["+nodeId_+"]", getSecretSessionId())); 00065 setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/connection/DispatchPlugin/defaultPlugin["+nodeId_+"]", dispatchPlugin_)); 00066 } 00067 00068 // TODO: This is handled in ClientQueueProperty.java already -> 00069 // long maxEntries = global_.getProperty().getLongProperty("queue/connection/maxEntries", CbQueueProperty.DEFAULT_maxEntriesDefault); 00070 long maxEntries = global_.getProperty().getLongProperty("queue/connection/maxEntries", 10000l); 00071 setMaxEntries(maxEntries); 00072 if (nodeId_ != "") { 00073 setMaxEntries(global_.getProperty().getLongProperty("queue/connection/maxEntries["+nodeId_+"]", getMaxEntries())); 00074 } 00075 00076 // Resets cached rawAddress_ : 00077 string type = getType(); 00078 StringTrim::toLowerCase(type); 00079 // These properties are evaluated directly by our C SOCKET library: 00080 // -dispatch/connection/plugin/socket/port 00081 // -dispatch/connection/plugin/socket/hostname 00082 // -dispatch/connection/plugin/socket/localPort 00083 // -dispatch/connection/plugin/socket/localHostname 00084 hostname_ = global_.getProperty().getStringProperty("dispatch/connection/plugin/"+type+"/hostname", getHostname()); 00085 setPort(global_.getProperty().getIntProperty("dispatch/connection/plugin/"+type+"/port", getPort())); 00086 } 00087 00088 Address::Address(Global& global, const string& type, const string& nodeId) 00089 : AddressBase(global, "address") 00090 { 00091 defaultRetries_ = -1; // How often to retry if connection fails: defaults to -1 (retry forever), 0 switches failsafe mode off 00092 defaultDelay_ = 5000; // Delay between connection retries in milliseconds: defaults to 5000 (5 sec) 00093 defaultPingInterval_ = 10000; // Ping interval: pinging every given milliseconds, defaults to 10 seconds 00094 pingInterval_ = defaultPingInterval_; 00095 retries_ = defaultRetries_; 00096 delay_ = defaultDelay_; 00097 ME = "Address"; 00098 if (nodeId != "") nodeId_ = nodeId; 00099 initialize(); 00100 if (type != "") type_ = type; 00101 } 00102 00103 Address::Address(const AddressBase& addr) : AddressBase(addr) 00104 { 00105 } 00106 00107 Address& Address::operator =(const AddressBase& addr) 00108 { 00109 AddressBase::copy(addr); 00110 return *this; 00111 } 00112 00113 00114 void Address::setMaxEntries(long maxEntries) 00115 { 00116 maxEntries_ = maxEntries; 00117 } 00118 00119 long Address::getMaxEntries() const 00120 { 00121 return maxEntries_; 00122 } 00123 00125 string Address::getSettings() 00126 { 00127 string ret; 00128 ret = AddressBase::getSettings(); 00129 if (getDelay() > 0) 00130 ret += string(" delay=") + lexical_cast<std::string>(getDelay()) + 00131 string(" retries=") + lexical_cast<std::string>(getRetries()) + 00132 string(" maxEntries=") + lexical_cast<std::string>(getMaxEntries()) + 00133 string(" pingInterval=") + lexical_cast<std::string>(getPingInterval()); 00134 return ret; 00135 } 00136 00137 string Address::toString() 00138 { 00139 return getRawAddress(); 00140 } 00141 00145 string Address::usage() 00146 { 00147 string text = ""; 00148 text += string("Control failsafe connection to xmlBlaster server:\n"); 00149 // is in ClientQueueProperty.java: text += " -queue/connection/maxEntries The max. capacity of the client queue in number of messages [" + CbQueueProperty.DEFAULT_maxEntriesDefault + "].\n"; 00150 //text += " -queue/callback/onOverflow Error handling when queue is full, 'block | deadMessage' [" + CbQueueProperty.DEFAULT_onOverflow + "].\n"; 00151 //text += " -queue/callback/onFailure Error handling when connection failed (after all retries etc.) [" + CbQueueProperty.DEFAULT_onFailure + "].\n"; 00152 text += string(" -dispatch/connection/burstMode/collectTime [" + lexical_cast<std::string>(DEFAULT_collectTime) + "]\n"); 00153 text += string(" Number of milliseconds we shall collect publish messages.\n"); 00154 text += string(" This allows performance tuning, try set it to 200.\n"); 00155 text += string(" -dispatch/connection/burstMode/maxEntries [" + lexical_cast<std::string>(DEFAULT_burstModeMaxEntries) + "]\n"); 00156 text += string(" The maximum number of queue entries to send in a bulk.\n"); 00157 text += string(" -1L takes all entries of highest priority available in the ram queue in a bulk.\n"); 00158 text += string(" -dispatch/connection/burstMode/maxBytes [" + lexical_cast<std::string>(DEFAULT_burstModeMaxBytes) + "]\n"); 00159 text += string(" The maximum bulk size of invocations.\n"); 00160 text += string(" -1L takes all entries of highest priority available in the ram queue in a bulk.\n"); 00161 //text += " -oneway Shall the publish() messages be send oneway (no application level ACK) [" + Address.DEFAULT_oneway + "]\n"; 00162 text += string(" -dispatch/connection/pingInterval [" + lexical_cast<std::string>(defaultPingInterval_) + "]\n"); 00163 text += string(" Pinging every given milliseconds.\n"); 00164 text += string(" -dispatch/connection/retries [" + lexical_cast<std::string>(defaultRetries_) + "]\n"); 00165 text += string(" How often to retry if connection fails (-1 is forever).\n"); 00166 text += string(" -dispatch/connection/delay [" + lexical_cast<std::string>(defaultDelay_) + "]\n"); 00167 text += string(" Delay between connection retries in milliseconds.\n"); 00168 text += string(" A delay value > 0 switches fails save mode on, 0 switches it off.\n"); 00169 //text += " -DispatchPlugin/defaultPlugin Specify your specific dispatcher plugin [" + CallbackAddress.DEFAULT_dispatchPlugin + "]\n"; 00170 //text += " -compress.type With which format message be compressed on callback [" + Address.DEFAULT_compressType + "]\n"; 00171 //text += " -compress.minSize Messages bigger this size in bytes are compressed [" + Address.DEFAULT_minSize + "]\n"; 00172 return text; 00173 } 00174 00175 }}}}} // namespace 00176 00177 00178 #ifdef _XMLBLASTER_CLASSTEST 00179 00180 using namespace std; 00181 using namespace org::xmlBlaster::util::qos::address; 00182 00184 int main(int args, char* argv[]) 00185 { 00186 try { 00187 { 00188 Global& glob = Global::getInstance(); 00189 glob.initialize(args, argv); 00190 Log& log = glob.getLog("org.xmlBlaster.util.qos"); 00191 log.info("main", "This is a simple info"); 00192 Address a(glob); 00193 a.setType("SOCKET"); 00194 a.setAddress("127.0.0.1:7600"); 00195 a.setCollectTime(12345l); 00196 a.setPingInterval(54321l); 00197 a.setRetries(17); 00198 a.setDelay(7890l); 00199 a.setOneway(true); 00200 a.setSecretSessionId("0x4546hwi89"); 00201 cout << a.toXml() << endl; 00202 } 00203 { 00204 string nodeId = "heron"; 00205 int nmax = 8; 00206 const char** argc = new const char*[nmax]; 00207 argc[0] = "-sessionId"; 00208 argc[1] = "ERROR"; 00209 string help = string("-sessionId[") + nodeId + string("]"); 00210 argc[2] = string(help).c_str(); 00211 argc[3] = "OK"; 00212 argc[4] = "-pingInterval"; 00213 argc[5] = "8888"; 00214 help = string("-delay[") + nodeId + string("]"); 00215 argc[6] = help.c_str(); 00216 argc[7] = "8888"; 00217 00218 Global& glob = Global::getInstance(); 00219 glob.initialize(nmax, argc); 00220 Address a(glob, "RMI", nodeId); 00221 cout << a.toXml() << endl; 00222 } 00223 } 00224 catch(...) { 00225 cout << "unknown uncatched exception" << endl; 00226 } 00227 } 00228 00229 #endif