1 /*------------------------------------------------------------------------------
  2 Name:      CallbackAddress.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Holding callback address string and protocol string
  6 ------------------------------------------------------------------------------*/
  7 
  8 #include <util/qos/address/CallbackAddress.h>
  9 #include <util/lexical_cast.h>
 10 #include <util/Global.h>
 11 
 12 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace address {
 13 
 14 using namespace std;
 15 using namespace org::xmlBlaster::util;
 16 
 17 
 18 inline void CallbackAddress::initialize()
 19 {
 20    initHostname(global_.getCbHostname()); // don't use setHostname() as it would set isCardcodedHostname=true
 21    setPort(global_.getProperty().getIntProperty("dispatch/callback/port", getPort()));
 22    setType(global_.getProperty().getStringProperty("protocol", getType()));
 23    setType(global_.getProperty().getStringProperty("dispatch/callback/protocol", getType()));
 24    setCollectTime(global_.getProperty().getLongProperty("dispatch/callback/burstMode/collectTime", DEFAULT_collectTime)); // sync update()
 25    setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/callback/burstMode/maxEntries", DEFAULT_burstModeMaxEntries));
 26    setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/callback/burstMode/maxBytes", DEFAULT_burstModeMaxBytes));
 27    setPingInterval(global_.getProperty().getLongProperty("dispatch/callback/pingInterval", defaultPingInterval_));
 28    setRetries(global_.getProperty().getIntProperty("dispatch/callback/retries", defaultRetries_));
 29    setDelay(global_.getProperty().getLongProperty("dispatch/callback/delay", defaultDelay_));
 30    useForSubjectQueue(global_.getProperty().getBoolProperty("dispatch/callback/useForSubjectQueue", DEFAULT_useForSubjectQueue));
 31    setOneway(global_.getProperty().getBoolProperty("dispatch/callback/oneway", DEFAULT_oneway));
 32    setDispatcherActive(global_.getProperty().getBoolProperty("dispatch/callback/dispatcherActive", DEFAULT_dispatcherActive));
 33    setCompressType(global_.getProperty().getStringProperty("dispatch/callback/compress.type", DEFAULT_compressType));
 34    setMinSize(global_.getProperty().getLongProperty("dispatch/callback/compress.minSize", DEFAULT_minSize));
 35    setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/callback/ptpAllowed", DEFAULT_ptpAllowed));
 36    setSecretSessionId(global_.getProperty().getStringProperty("dispatch/callback/sessionId", DEFAULT_sessionId));
 37    setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/callback/DispatchPlugin/defaultPlugin", DEFAULT_dispatchPlugin));
 38    if (nodeId_ != "") {
 39       setPort(global_.getProperty().getIntProperty("dispatch/callback/port["+nodeId_+"]", getPort()));
 40       setType(global_.getProperty().getStringProperty("dispatch/callback/protocol["+nodeId_+"]", getType()));
 41       setCollectTime(global_.getProperty().getLongProperty("dispatch/callback/burstMode/collectTime["+nodeId_+"]", collectTime_));
 42       setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/callback/burstMode/maxEntries["+nodeId_+"]", getBurstModeMaxEntries()));
 43       setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/callback/burstMode/maxBytes["+nodeId_+"]", getBurstModeMaxBytes()));
 44       setPingInterval(global_.getProperty().getLongProperty("dispatch/callback/pingInterval["+nodeId_+"]", pingInterval_));
 45       setRetries(global_.getProperty().getIntProperty("dispatch/callback/retries["+nodeId_+"]", retries_));
 46       setDelay(global_.getProperty().getLongProperty("dispatch/callback/delay["+nodeId_+"]", delay_));
 47       useForSubjectQueue(global_.getProperty().getBoolProperty("dispatch/callback/useForSubjectQueue["+nodeId_+"]", useForSubjectQueue_));
 48       setOneway(global_.getProperty().getBoolProperty("dispatch/callback/oneway["+nodeId_+"]", oneway_));
 49       setDispatcherActive(global_.getProperty().getBoolProperty("dispatch/callback/dispatcherActive["+nodeId_+"]", dispatcherActive_));
 50       setCompressType(global_.getProperty().getStringProperty("dispatch/callback/compress.type["+nodeId_+"]", compressType_));
 51       setMinSize(global_.getProperty().getLongProperty("dispatch/callback/compress.minSize["+nodeId_+"]", minSize_));
 52       setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/callback/ptpAllowed["+nodeId_+"]", ptpAllowed_));
 53       setSecretSessionId(global_.getProperty().getStringProperty("dispatch/callback/sessionId["+nodeId_+"]", sessionId_));
 54       setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/callback/DispatchPlugin/defaultPlugin["+nodeId_+"]", dispatchPlugin_));
 55    }
 56 }
 57 
 58 
 59 
 60 CallbackAddress::CallbackAddress(Global& global, const string& type, const string nodeId)
 61    : AddressBase(global, "callback")
 62 {
 63    defaultRetries_      = 0;
 64    defaultDelay_        = Constants::MINUTE_IN_MILLIS;
 65    defaultPingInterval_ = Constants::MINUTE_IN_MILLIS;
 66    ME = "CallbackAddress";
 67    if (nodeId != "") nodeId_ = nodeId;
 68    pingInterval_ = defaultPingInterval_;
 69    retries_      = defaultRetries_;
 70    delay_        = defaultDelay_;
 71    initialize();
 72    if (type != "") setType(type);
 73 }
 74 
 75 CallbackAddress::CallbackAddress(const AddressBase& addr) : AddressBase(addr)
 76 {
 77 }
 78 
 79 CallbackAddress& CallbackAddress::operator =(const AddressBase& addr)
 80 {
 81    AddressBase::copy(addr);
 82    return *this;
 83 }
 84 
 85 /**
 86  * Shall this address be used for subject queue messages?
 87  * @return false if address is for session queue only
 88  */
 89 bool CallbackAddress::useForSubjectQueue()
 90 {
 91    return useForSubjectQueue_;
 92 }
 93 
 94 /**
 95  * Shall this address be used for subject queue messages?
 96  * @param useForSubjectQueue false if address is for session queue only
 97  */
 98 void CallbackAddress::useForSubjectQueue(bool useForSubjectQueue)
 99 {
100    useForSubjectQueue_ = useForSubjectQueue;
101 }
102 
103 string CallbackAddress::toString()
104 {
105    return getRawAddress();
106 }
107 
108 /**
109  * Get a usage string for the server side supported callback connection parameters
110  */
111 string CallbackAddress::usage()
112 {
113    string text;
114    text += string("Control xmlBlaster server side callback (if we install a local callback server):\n");
115    text += string("   -dispatch/callback/sessionId []\n");
116    text += string("                       The session ID which is passed to our callback server update() method.\n");
117    text += string("   -dispatch/callback/burstMode/collectTime [") + lexical_cast<std::string>(DEFAULT_collectTime) + string("]\n");
118    text += string("                       Number of milliseconds xmlBlaster shall collect callback messages.\n");
119    text += string("                       The burst mode allows performance tuning, try set it to 200.\n");
120    text += string("   -dispatch/callback/burstMode/maxEntries [" + lexical_cast<std::string>(DEFAULT_burstModeMaxEntries) + "]\n");
121    text += string("                       The maximum number of callback queue entries to send in a bulk.\n");
122    text += string("                       -1L takes all entries of highest priority available in the callback ram queue in a bulk.\n");
123    text += string("   -dispatch/callback/burstMode/maxBytes [" + lexical_cast<std::string>(DEFAULT_burstModeMaxBytes) + "]\n");
124    text += string("                       The maximum bulk size of callback messages.\n");
125    text += string("                       -1L takes all entries of highest priority available in the callback ram queue in a bulk.\n");
126 
127    text += string("   -dispatch/callback/oneway [") + lexical_cast<std::string>(DEFAULT_oneway) + string("]\n");
128    text += string("                       Shall the update() messages be send oneway (no application level ACK).\n");
129 
130    text += string("   -dispatch/callback/dispatcherActive [") + lexical_cast<string>(DEFAULT_dispatcherActive) + string("]\n");
131    text += string("                       If false inhibit delivery of callback messages.\n");
132 
133    text += string("   -dispatch/callback/pingInterval [") + lexical_cast<std::string>(defaultPingInterval_) + string("]\n");
134    text += string("                       Pinging every given milliseconds.\n");
135    text += string("   -dispatch/callback/retries [") + lexical_cast<std::string>(defaultRetries_) + string("]\n");
136    text += string("                       How often to retry if callback fails.\n");
137    text += string("                       -1 forever, 0 no retry, > 0 number of retries.\n");
138    text += string("   -dispatch/callback/delay [") + lexical_cast<std::string>(defaultDelay_) + string("]\n");
139    text += string("                       Delay between callback retries in millisecond.\n");
140    //text += string("   -dispatch/callback/compress.type   With which format message be compressed on callback [") + DEFAULT_compressType + string("]\n");
141    //text += string("   -dispatch/callback/compress.minSize Messages bigger this size in bytes are compressed [") + lexical_cast<std::string>(DEFAULT_minSize) + string("]\n");
142 
143    //text += string("   -cb.ptpAllowed      PtP messages wanted? false prevents spamming [") + lexical_cast<std::string>(DEFAULT_ptpAllowed) + string("]\n");
144    //text += "   -cb.DispatchPlugin/defaultPlugin  Specify your specific dispatcher plugin [" + CallbackAddress.DEFAULT_dispatchPlugin + "]\n";
145    return text;
146 }
147 
148 }}}}} // namespace
149 
150 #ifdef _XMLBLASTER_CLASSTEST
151 
152 using namespace std;
153 using namespace org::xmlBlaster::util::qos::address;
154 
155 /** For testing: java org.xmlBlaster.authentication.plugins.simple.SecurityQos */
156 int main(int args, char* argv[])
157 {
158    try {
159       {
160          Global& glob = Global::getInstance();
161          glob.initialize(args, argv);
162 
163          CallbackAddress a(glob);
164          a.setType("SOCKET");
165          a.setAddress("127.0.0.1:7600");
166          a.setCollectTime(12345L);
167          a.setPingInterval(54321L);
168          a.setRetries(17);
169          a.setDelay(7890L);
170          a.setOneway(true);
171          a.setSecretSessionId("0x4546hwi89");
172          cout << a.toXml() << endl;
173       }
174       {
175          string nodeId = "heron";
176 
177          int                nmax = 8;
178          const char** argc = new const char*[nmax];
179          string help = string("-dispatch/callback/sessionId[") +nodeId + string("]");
180          argc[0] = help.c_str();
181          argc[1] = "OK";
182          argc[2] = "dispatch/callback/sessionId";
183          argc[3] = "ERROR";
184          argc[4] = "-cb.pingInterval";
185          argc[5] = "8888";
186          help = string("-cb.delay[") +nodeId + string("]");
187          argc[6] = help.c_str();
188          argc[7] = "8888";
189 
190          Global& glob = Global::getInstance();
191          glob.initialize(nmax, argc);
192          CallbackAddress a(glob, "RMI", nodeId);
193          cout << a.toXml() << endl;
194       }
195    }
196    catch(...) {
197       cout << "Exception in main method" << endl;
198    }
199 }
200 
201 #endif


// syntax highlighted by Code2HTML, v. 0.9.1