1 /*------------------------------------------------------------------------------
2 Name: CallbackAddress.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Holding callback address string and protocol string
6 ------------------------------------------------------------------------------*/
7
8 #include <util/qos/address/CallbackAddress.h>
9 #include <util/lexical_cast.h>
10 #include <util/Global.h>
11
12 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace address {
13
14 using namespace std;
15 using namespace org::xmlBlaster::util;
16
17
18 inline void CallbackAddress::initialize()
19 {
20 initHostname(global_.getCbHostname()); // don't use setHostname() as it would set isCardcodedHostname=true
21 setPort(global_.getProperty().getIntProperty("dispatch/callback/port", getPort()));
22 setType(global_.getProperty().getStringProperty("protocol", getType()));
23 setType(global_.getProperty().getStringProperty("dispatch/callback/protocol", getType()));
24 setCollectTime(global_.getProperty().getLongProperty("dispatch/callback/burstMode/collectTime", DEFAULT_collectTime)); // sync update()
25 setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/callback/burstMode/maxEntries", DEFAULT_burstModeMaxEntries));
26 setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/callback/burstMode/maxBytes", DEFAULT_burstModeMaxBytes));
27 setPingInterval(global_.getProperty().getLongProperty("dispatch/callback/pingInterval", defaultPingInterval_));
28 setRetries(global_.getProperty().getIntProperty("dispatch/callback/retries", defaultRetries_));
29 setDelay(global_.getProperty().getLongProperty("dispatch/callback/delay", defaultDelay_));
30 useForSubjectQueue(global_.getProperty().getBoolProperty("dispatch/callback/useForSubjectQueue", DEFAULT_useForSubjectQueue));
31 setOneway(global_.getProperty().getBoolProperty("dispatch/callback/oneway", DEFAULT_oneway));
32 setDispatcherActive(global_.getProperty().getBoolProperty("dispatch/callback/dispatcherActive", DEFAULT_dispatcherActive));
33 setCompressType(global_.getProperty().getStringProperty("dispatch/callback/compress.type", DEFAULT_compressType));
34 setMinSize(global_.getProperty().getLongProperty("dispatch/callback/compress.minSize", DEFAULT_minSize));
35 setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/callback/ptpAllowed", DEFAULT_ptpAllowed));
36 setSecretSessionId(global_.getProperty().getStringProperty("dispatch/callback/sessionId", DEFAULT_sessionId));
37 setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/callback/DispatchPlugin/defaultPlugin", DEFAULT_dispatchPlugin));
38 if (nodeId_ != "") {
39 setPort(global_.getProperty().getIntProperty("dispatch/callback/port["+nodeId_+"]", getPort()));
40 setType(global_.getProperty().getStringProperty("dispatch/callback/protocol["+nodeId_+"]", getType()));
41 setCollectTime(global_.getProperty().getLongProperty("dispatch/callback/burstMode/collectTime["+nodeId_+"]", collectTime_));
42 setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/callback/burstMode/maxEntries["+nodeId_+"]", getBurstModeMaxEntries()));
43 setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/callback/burstMode/maxBytes["+nodeId_+"]", getBurstModeMaxBytes()));
44 setPingInterval(global_.getProperty().getLongProperty("dispatch/callback/pingInterval["+nodeId_+"]", pingInterval_));
45 setRetries(global_.getProperty().getIntProperty("dispatch/callback/retries["+nodeId_+"]", retries_));
46 setDelay(global_.getProperty().getLongProperty("dispatch/callback/delay["+nodeId_+"]", delay_));
47 useForSubjectQueue(global_.getProperty().getBoolProperty("dispatch/callback/useForSubjectQueue["+nodeId_+"]", useForSubjectQueue_));
48 setOneway(global_.getProperty().getBoolProperty("dispatch/callback/oneway["+nodeId_+"]", oneway_));
49 setDispatcherActive(global_.getProperty().getBoolProperty("dispatch/callback/dispatcherActive["+nodeId_+"]", dispatcherActive_));
50 setCompressType(global_.getProperty().getStringProperty("dispatch/callback/compress.type["+nodeId_+"]", compressType_));
51 setMinSize(global_.getProperty().getLongProperty("dispatch/callback/compress.minSize["+nodeId_+"]", minSize_));
52 setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/callback/ptpAllowed["+nodeId_+"]", ptpAllowed_));
53 setSecretSessionId(global_.getProperty().getStringProperty("dispatch/callback/sessionId["+nodeId_+"]", sessionId_));
54 setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/callback/DispatchPlugin/defaultPlugin["+nodeId_+"]", dispatchPlugin_));
55 }
56 }
57
58
59
60 CallbackAddress::CallbackAddress(Global& global, const string& type, const string nodeId)
61 : AddressBase(global, "callback")
62 {
63 defaultRetries_ = 0;
64 defaultDelay_ = Constants::MINUTE_IN_MILLIS;
65 defaultPingInterval_ = Constants::MINUTE_IN_MILLIS;
66 ME = "CallbackAddress";
67 if (nodeId != "") nodeId_ = nodeId;
68 pingInterval_ = defaultPingInterval_;
69 retries_ = defaultRetries_;
70 delay_ = defaultDelay_;
71 initialize();
72 if (type != "") setType(type);
73 }
74
75 CallbackAddress::CallbackAddress(const AddressBase& addr) : AddressBase(addr)
76 {
77 }
78
79 CallbackAddress& CallbackAddress::operator =(const AddressBase& addr)
80 {
81 AddressBase::copy(addr);
82 return *this;
83 }
84
85 /**
86 * Shall this address be used for subject queue messages?
87 * @return false if address is for session queue only
88 */
89 bool CallbackAddress::useForSubjectQueue()
90 {
91 return useForSubjectQueue_;
92 }
93
94 /**
95 * Shall this address be used for subject queue messages?
96 * @param useForSubjectQueue false if address is for session queue only
97 */
98 void CallbackAddress::useForSubjectQueue(bool useForSubjectQueue)
99 {
100 useForSubjectQueue_ = useForSubjectQueue;
101 }
102
103 string CallbackAddress::toString()
104 {
105 return getRawAddress();
106 }
107
108 /**
109 * Get a usage string for the server side supported callback connection parameters
110 */
111 string CallbackAddress::usage()
112 {
113 string text;
114 text += string("Control xmlBlaster server side callback (if we install a local callback server):\n");
115 text += string(" -dispatch/callback/sessionId []\n");
116 text += string(" The session ID which is passed to our callback server update() method.\n");
117 text += string(" -dispatch/callback/burstMode/collectTime [") + lexical_cast<std::string>(DEFAULT_collectTime) + string("]\n");
118 text += string(" Number of milliseconds xmlBlaster shall collect callback messages.\n");
119 text += string(" The burst mode allows performance tuning, try set it to 200.\n");
120 text += string(" -dispatch/callback/burstMode/maxEntries [" + lexical_cast<std::string>(DEFAULT_burstModeMaxEntries) + "]\n");
121 text += string(" The maximum number of callback queue entries to send in a bulk.\n");
122 text += string(" -1L takes all entries of highest priority available in the callback ram queue in a bulk.\n");
123 text += string(" -dispatch/callback/burstMode/maxBytes [" + lexical_cast<std::string>(DEFAULT_burstModeMaxBytes) + "]\n");
124 text += string(" The maximum bulk size of callback messages.\n");
125 text += string(" -1L takes all entries of highest priority available in the callback ram queue in a bulk.\n");
126
127 text += string(" -dispatch/callback/oneway [") + lexical_cast<std::string>(DEFAULT_oneway) + string("]\n");
128 text += string(" Shall the update() messages be send oneway (no application level ACK).\n");
129
130 text += string(" -dispatch/callback/dispatcherActive [") + lexical_cast<string>(DEFAULT_dispatcherActive) + string("]\n");
131 text += string(" If false inhibit delivery of callback messages.\n");
132
133 text += string(" -dispatch/callback/pingInterval [") + lexical_cast<std::string>(defaultPingInterval_) + string("]\n");
134 text += string(" Pinging every given milliseconds.\n");
135 text += string(" -dispatch/callback/retries [") + lexical_cast<std::string>(defaultRetries_) + string("]\n");
136 text += string(" How often to retry if callback fails.\n");
137 text += string(" -1 forever, 0 no retry, > 0 number of retries.\n");
138 text += string(" -dispatch/callback/delay [") + lexical_cast<std::string>(defaultDelay_) + string("]\n");
139 text += string(" Delay between callback retries in millisecond.\n");
140 //text += string(" -dispatch/callback/compress.type With which format message be compressed on callback [") + DEFAULT_compressType + string("]\n");
141 //text += string(" -dispatch/callback/compress.minSize Messages bigger this size in bytes are compressed [") + lexical_cast<std::string>(DEFAULT_minSize) + string("]\n");
142
143 //text += string(" -cb.ptpAllowed PtP messages wanted? false prevents spamming [") + lexical_cast<std::string>(DEFAULT_ptpAllowed) + string("]\n");
144 //text += " -cb.DispatchPlugin/defaultPlugin Specify your specific dispatcher plugin [" + CallbackAddress.DEFAULT_dispatchPlugin + "]\n";
145 return text;
146 }
147
148 }}}}} // namespace
149
150 #ifdef _XMLBLASTER_CLASSTEST
151
152 using namespace std;
153 using namespace org::xmlBlaster::util::qos::address;
154
155 /** For testing: java org.xmlBlaster.authentication.plugins.simple.SecurityQos */
156 int main(int args, char* argv[])
157 {
158 try {
159 {
160 Global& glob = Global::getInstance();
161 glob.initialize(args, argv);
162
163 CallbackAddress a(glob);
164 a.setType("SOCKET");
165 a.setAddress("127.0.0.1:7600");
166 a.setCollectTime(12345L);
167 a.setPingInterval(54321L);
168 a.setRetries(17);
169 a.setDelay(7890L);
170 a.setOneway(true);
171 a.setSecretSessionId("0x4546hwi89");
172 cout << a.toXml() << endl;
173 }
174 {
175 string nodeId = "heron";
176
177 int nmax = 8;
178 const char** argc = new const char*[nmax];
179 string help = string("-dispatch/callback/sessionId[") +nodeId + string("]");
180 argc[0] = help.c_str();
181 argc[1] = "OK";
182 argc[2] = "dispatch/callback/sessionId";
183 argc[3] = "ERROR";
184 argc[4] = "-cb.pingInterval";
185 argc[5] = "8888";
186 help = string("-cb.delay[") +nodeId + string("]");
187 argc[6] = help.c_str();
188 argc[7] = "8888";
189
190 Global& glob = Global::getInstance();
191 glob.initialize(nmax, argc);
192 CallbackAddress a(glob, "RMI", nodeId);
193 cout << a.toXml() << endl;
194 }
195 }
196 catch(...) {
197 cout << "Exception in main method" << endl;
198 }
199 }
200
201 #endif
// syntax highlighted by Code2HTML, v. 0.9.1