00001 /*------------------------------------------------------------------------------ 00002 Name: CallbackAddress.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Holding callback address string and protocol string 00006 ------------------------------------------------------------------------------*/ 00007 00008 #include <util/qos/address/CallbackAddress.h> 00009 #include <util/lexical_cast.h> 00010 #include <util/Global.h> 00011 00012 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace address { 00013 00014 using namespace std; 00015 using namespace org::xmlBlaster::util; 00016 00017 00018 inline void CallbackAddress::initialize() 00019 { 00020 initHostname(global_.getCbHostname()); // don't use setHostname() as it would set isCardcodedHostname=true 00021 setPort(global_.getProperty().getIntProperty("dispatch/callback/port", getPort())); 00022 setType(global_.getProperty().getStringProperty("protocol", getType())); 00023 setType(global_.getProperty().getStringProperty("dispatch/callback/protocol", getType())); 00024 setCollectTime(global_.getProperty().getLongProperty("dispatch/callback/burstMode/collectTime", DEFAULT_collectTime)); // sync update() 00025 setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/callback/burstMode/maxEntries", DEFAULT_burstModeMaxEntries)); 00026 setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/callback/burstMode/maxBytes", DEFAULT_burstModeMaxBytes)); 00027 setPingInterval(global_.getProperty().getLongProperty("dispatch/callback/pingInterval", defaultPingInterval_)); 00028 setRetries(global_.getProperty().getIntProperty("dispatch/callback/retries", defaultRetries_)); 00029 setDelay(global_.getProperty().getLongProperty("dispatch/callback/delay", defaultDelay_)); 00030 useForSubjectQueue(global_.getProperty().getBoolProperty("dispatch/callback/useForSubjectQueue", DEFAULT_useForSubjectQueue)); 00031 setOneway(global_.getProperty().getBoolProperty("dispatch/callback/oneway", DEFAULT_oneway)); 00032 setDispatcherActive(global_.getProperty().getBoolProperty("dispatch/callback/dispatcherActive", DEFAULT_dispatcherActive)); 00033 setCompressType(global_.getProperty().getStringProperty("dispatch/callback/compress.type", DEFAULT_compressType)); 00034 setMinSize(global_.getProperty().getLongProperty("dispatch/callback/compress.minSize", DEFAULT_minSize)); 00035 setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/callback/ptpAllowed", DEFAULT_ptpAllowed)); 00036 setSecretSessionId(global_.getProperty().getStringProperty("dispatch/callback/sessionId", DEFAULT_sessionId)); 00037 setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/callback/DispatchPlugin/defaultPlugin", DEFAULT_dispatchPlugin)); 00038 if (nodeId_ != "") { 00039 setPort(global_.getProperty().getIntProperty("dispatch/callback/port["+nodeId_+"]", getPort())); 00040 setType(global_.getProperty().getStringProperty("dispatch/callback/protocol["+nodeId_+"]", getType())); 00041 setCollectTime(global_.getProperty().getLongProperty("dispatch/callback/burstMode/collectTime["+nodeId_+"]", collectTime_)); 00042 setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/callback/burstMode/maxEntries["+nodeId_+"]", getBurstModeMaxEntries())); 00043 setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/callback/burstMode/maxBytes["+nodeId_+"]", getBurstModeMaxBytes())); 00044 setPingInterval(global_.getProperty().getLongProperty("dispatch/callback/pingInterval["+nodeId_+"]", pingInterval_)); 00045 setRetries(global_.getProperty().getIntProperty("dispatch/callback/retries["+nodeId_+"]", retries_)); 00046 setDelay(global_.getProperty().getLongProperty("dispatch/callback/delay["+nodeId_+"]", delay_)); 00047 useForSubjectQueue(global_.getProperty().getBoolProperty("dispatch/callback/useForSubjectQueue["+nodeId_+"]", useForSubjectQueue_)); 00048 setOneway(global_.getProperty().getBoolProperty("dispatch/callback/oneway["+nodeId_+"]", oneway_)); 00049 setDispatcherActive(global_.getProperty().getBoolProperty("dispatch/callback/dispatcherActive["+nodeId_+"]", dispatcherActive_)); 00050 setCompressType(global_.getProperty().getStringProperty("dispatch/callback/compress.type["+nodeId_+"]", compressType_)); 00051 setMinSize(global_.getProperty().getLongProperty("dispatch/callback/compress.minSize["+nodeId_+"]", minSize_)); 00052 setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/callback/ptpAllowed["+nodeId_+"]", ptpAllowed_)); 00053 setSecretSessionId(global_.getProperty().getStringProperty("dispatch/callback/sessionId["+nodeId_+"]", sessionId_)); 00054 setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/callback/DispatchPlugin/defaultPlugin["+nodeId_+"]", dispatchPlugin_)); 00055 } 00056 } 00057 00058 00059 00060 CallbackAddress::CallbackAddress(Global& global, const string& type, const string nodeId) 00061 : AddressBase(global, "callback") 00062 { 00063 defaultRetries_ = 0; 00064 defaultDelay_ = Constants::MINUTE_IN_MILLIS; 00065 defaultPingInterval_ = Constants::MINUTE_IN_MILLIS; 00066 ME = "CallbackAddress"; 00067 if (nodeId != "") nodeId_ = nodeId; 00068 pingInterval_ = defaultPingInterval_; 00069 retries_ = defaultRetries_; 00070 delay_ = defaultDelay_; 00071 initialize(); 00072 if (type != "") setType(type); 00073 } 00074 00075 CallbackAddress::CallbackAddress(const AddressBase& addr) : AddressBase(addr) 00076 { 00077 } 00078 00079 CallbackAddress& CallbackAddress::operator =(const AddressBase& addr) 00080 { 00081 AddressBase::copy(addr); 00082 return *this; 00083 } 00084 00089 bool CallbackAddress::useForSubjectQueue() 00090 { 00091 return useForSubjectQueue_; 00092 } 00093 00098 void CallbackAddress::useForSubjectQueue(bool useForSubjectQueue) 00099 { 00100 useForSubjectQueue_ = useForSubjectQueue; 00101 } 00102 00103 string CallbackAddress::toString() 00104 { 00105 return getRawAddress(); 00106 } 00107 00111 string CallbackAddress::usage() 00112 { 00113 string text; 00114 text += string("Control xmlBlaster server side callback (if we install a local callback server):\n"); 00115 text += string(" -dispatch/callback/sessionId []\n"); 00116 text += string(" The session ID which is passed to our callback server update() method.\n"); 00117 text += string(" -dispatch/callback/burstMode/collectTime [") + lexical_cast<std::string>(DEFAULT_collectTime) + string("]\n"); 00118 text += string(" Number of milliseconds xmlBlaster shall collect callback messages.\n"); 00119 text += string(" The burst mode allows performance tuning, try set it to 200.\n"); 00120 text += string(" -dispatch/callback/burstMode/maxEntries [" + lexical_cast<std::string>(DEFAULT_burstModeMaxEntries) + "]\n"); 00121 text += string(" The maximum number of callback queue entries to send in a bulk.\n"); 00122 text += string(" -1L takes all entries of highest priority available in the callback ram queue in a bulk.\n"); 00123 text += string(" -dispatch/callback/burstMode/maxBytes [" + lexical_cast<std::string>(DEFAULT_burstModeMaxBytes) + "]\n"); 00124 text += string(" The maximum bulk size of callback messages.\n"); 00125 text += string(" -1L takes all entries of highest priority available in the callback ram queue in a bulk.\n"); 00126 00127 text += string(" -dispatch/callback/oneway [") + lexical_cast<std::string>(DEFAULT_oneway) + string("]\n"); 00128 text += string(" Shall the update() messages be send oneway (no application level ACK).\n"); 00129 00130 text += string(" -dispatch/callback/dispatcherActive [") + lexical_cast<string>(DEFAULT_dispatcherActive) + string("]\n"); 00131 text += string(" If false inhibit delivery of callback messages.\n"); 00132 00133 text += string(" -dispatch/callback/pingInterval [") + lexical_cast<std::string>(defaultPingInterval_) + string("]\n"); 00134 text += string(" Pinging every given milliseconds.\n"); 00135 text += string(" -dispatch/callback/retries [") + lexical_cast<std::string>(defaultRetries_) + string("]\n"); 00136 text += string(" How often to retry if callback fails.\n"); 00137 text += string(" -1 forever, 0 no retry, > 0 number of retries.\n"); 00138 text += string(" -dispatch/callback/delay [") + lexical_cast<std::string>(defaultDelay_) + string("]\n"); 00139 text += string(" Delay between callback retries in millisecond.\n"); 00140 //text += string(" -dispatch/callback/compress.type With which format message be compressed on callback [") + DEFAULT_compressType + string("]\n"); 00141 //text += string(" -dispatch/callback/compress.minSize Messages bigger this size in bytes are compressed [") + lexical_cast<std::string>(DEFAULT_minSize) + string("]\n"); 00142 00143 //text += string(" -cb.ptpAllowed PtP messages wanted? false prevents spamming [") + lexical_cast<std::string>(DEFAULT_ptpAllowed) + string("]\n"); 00144 //text += " -cb.DispatchPlugin/defaultPlugin Specify your specific dispatcher plugin [" + CallbackAddress.DEFAULT_dispatchPlugin + "]\n"; 00145 return text; 00146 } 00147 00148 }}}}} // namespace 00149 00150 #ifdef _XMLBLASTER_CLASSTEST 00151 00152 using namespace std; 00153 using namespace org::xmlBlaster::util::qos::address; 00154 00156 int main(int args, char* argv[]) 00157 { 00158 try { 00159 { 00160 Global& glob = Global::getInstance(); 00161 glob.initialize(args, argv); 00162 00163 CallbackAddress a(glob); 00164 a.setType("SOCKET"); 00165 a.setAddress("127.0.0.1:7600"); 00166 a.setCollectTime(12345L); 00167 a.setPingInterval(54321L); 00168 a.setRetries(17); 00169 a.setDelay(7890L); 00170 a.setOneway(true); 00171 a.setSecretSessionId("0x4546hwi89"); 00172 cout << a.toXml() << endl; 00173 } 00174 { 00175 string nodeId = "heron"; 00176 00177 int nmax = 8; 00178 const char** argc = new const char*[nmax]; 00179 string help = string("-dispatch/callback/sessionId[") +nodeId + string("]"); 00180 argc[0] = help.c_str(); 00181 argc[1] = "OK"; 00182 argc[2] = "dispatch/callback/sessionId"; 00183 argc[3] = "ERROR"; 00184 argc[4] = "-cb.pingInterval"; 00185 argc[5] = "8888"; 00186 help = string("-cb.delay[") +nodeId + string("]"); 00187 argc[6] = help.c_str(); 00188 argc[7] = "8888"; 00189 00190 Global& glob = Global::getInstance(); 00191 glob.initialize(nmax, argc); 00192 CallbackAddress a(glob, "RMI", nodeId); 00193 cout << a.toXml() << endl; 00194 } 00195 } 00196 catch(...) { 00197 cout << "Exception in main method" << endl; 00198 } 00199 } 00200 00201 #endif