1 /*------------------------------------------------------------------------------
2 Name: CorbaDriver.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: The client driver for the corba protocol
6 ------------------------------------------------------------------------------*/
7 #include <client/protocol/corba/CorbaDriver.h>
8 #include <util/ErrorCode.h>
9 #include <util/XmlBlasterException.h>
10 #include <util/Global.h>
11 #include <util/lexical_cast.h>
12
13 namespace org {
14 namespace xmlBlaster {
15 namespace client {
16 namespace protocol {
17 namespace corba {
18
19 using namespace std;
20 using namespace org::xmlBlaster::util;
21 using namespace org::xmlBlaster::util::qos;
22 using namespace org::xmlBlaster::util::thread;
23 using namespace org::xmlBlaster::client::protocol;
24 using namespace org::xmlBlaster::client::qos;
25 using namespace org::xmlBlaster::client::key;
26
27 void CorbaDriver::freeResources(bool deleteConnection, bool deleteCallback)
28 {
29 if (deleteConnection) {
30 delete connection_;
31 connection_ = NULL;
32 }
33 if (deleteCallback) {
34 delete defaultCallback_;
35 defaultCallback_ = NULL;
36 }
37 }
38
/*
 * _COMM_TRY / _COMM_CATCH(methodName, deleteConnection, deleteCallback)
 *
 * Used as a pair around the body of a CorbaDriver member function:
 * _COMM_TRY opens a try block, _COMM_CATCH closes it and appends the handler
 * chain below. Every handler first calls
 * freeResources(deleteConnection, deleteCallback) and then throws:
 *
 *  - serverIdl::XmlBlasterException is converted with convertFromCorbaException()
 *  - CosNaming / CORBA exceptions become an XmlBlasterException with
 *    COMMUNICATION_NOCONNECTION, carrying to_string(ex) as embedded message
 *  - XmlBlasterException (caught by reference or by pointer) is rethrown with
 *    a bare 'throw;' so that the ORIGINAL exception object propagates; the
 *    former 'throw ex;' made a copy of the static type and would have sliced
 *    any derived exception
 *  - std::exception, std::string, const char*, int and anything else become
 *    an XmlBlasterException with INTERNAL_UNKNOWN
 *
 * Since every handler throws, a function using the pair never falls off its
 * end after the catch chain. The macros expect ME, global_ and freeResources()
 * to be reachable at the expansion site.
 */
#define _COMM_TRY try {


#define _COMM_CATCH(methodName, deleteConnection, deleteCallback)             \
   }                                                                          \
   catch(serverIdl::XmlBlasterException &ex) {                                \
      freeResources(deleteConnection, deleteCallback);                        \
      throw convertFromCorbaException(ex);                                    \
   }                                                                          \
   catch(const CosNaming::NamingContext::CannotProceed &ex) {                 \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION,                   \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "", to_string(ex));                                \
   }                                                                          \
   catch(const CosNaming::NamingContext::InvalidName &ex) {                   \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION,                   \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "", to_string(ex));                                \
   }                                                                          \
   catch(const CosNaming::NamingContext::AlreadyBound &ex) {                  \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION,                   \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "", to_string(ex));                                \
   }                                                                          \
   catch(const CosNaming::NamingContext::NotEmpty &ex) {                      \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION,                   \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "", to_string(ex));                                \
   }                                                                          \
   catch(const CosNaming::NamingContext::NotFound &ex) {                      \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION,                   \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "", to_string(ex));                                \
   }                                                                          \
   catch(const CORBA::Exception &ex) {                                        \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION,                   \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "", to_string(ex));                                \
   }                                                                          \
   catch(const XmlBlasterException &) {                                       \
      freeResources(deleteConnection, deleteCallback);                        \
      throw;                                                                  \
   }                                                                          \
   catch(const XmlBlasterException *) {                                       \
      freeResources(deleteConnection, deleteCallback);                        \
      throw;                                                                  \
   }                                                                          \
   catch(const exception &ex) {                                               \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(INTERNAL_UNKNOWN,                             \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "",                                                \
                       string("type='exception', msg='") + ex.what() + "'");  \
   }                                                                          \
   catch(const string &ex) {                                                  \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(INTERNAL_UNKNOWN,                             \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "",                                                \
                       string("type='string', msg='") + ex + "'");            \
   }                                                                          \
   catch(const char *ex) {                                                    \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(INTERNAL_UNKNOWN,                             \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "",                                                \
                       string("type='char*', msg='") + ex + "'");             \
   }                                                                          \
   catch(int ex) {                                                            \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(INTERNAL_UNKNOWN,                             \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp(),\
                       "", "",                                                \
                       string("type='int', msg='") + lexical_cast<std::string>(ex) + "'"); \
   }                                                                          \
   catch (...) {                                                              \
      freeResources(deleteConnection, deleteCallback);                        \
      throw XmlBlasterException(INTERNAL_UNKNOWN,                             \
                       "unknown node", ME + string(methodName), "en",         \
                       global_.getVersion() + " " + global_.getBuildTimestamp()); \
   }
136
137 /*
138
139 static bool dummy;
140
141 CorbaDriver::CorbaDriver()
142 : doRun_(dummy),
143 isRunning_(dummy),
144 mutex_(),
145 count_(0),
146 ME("CorbaDriver"),
147 global_(Global::getInstance()),
148 log_(global_.getLog("org.xmlBlaster.client.protocol.corba")),
149 statusQosFactory_(global_),
150 {
151 connection_ = NULL;
152 defaultCallback_ = NULL;
153 _COMM_TRY
154 connection_ = new CorbaConnection(global_, false);
155 _COMM_CATCH("::Constructor", true, false)
156 }
157 */
158
159 CorbaDriver::CorbaDriver(const CorbaDriver& corbaDriver)
160 : mutex_(corbaDriver.mutex_),
161 ME("CorbaDriver"),
162 global_(corbaDriver.global_),
163 log_(corbaDriver.log_),
164 statusQosFactory_(corbaDriver.global_),
165 orbIsThreadSafe_(ORB_IS_THREAD_SAFE)
166 {
167 // no instantiation of these since this should never be invoked (just to make it private)
168 connection_ = NULL;
169 defaultCallback_ = NULL;
170 if (log_.call()) log_.call("CorbaDriver", string("Constructor orbIsThreadSafe_=") + lexical_cast<std::string>(orbIsThreadSafe_));
171 }
172
173 CorbaDriver& CorbaDriver::operator =(const CorbaDriver& /*corbaDriver*/)
174 {
175 return *this;
176 }
177
178
179 CorbaDriver::CorbaDriver(Global& global, Mutex& mutex, const string instanceName, CORBA::ORB_ptr orb)
180 : mutex_(mutex),
181 ME(string("CorbaDriver-") + instanceName),
182 global_(global),
183 log_(global.getLog("org.xmlBlaster.client.protocol.corba")),
184 statusQosFactory_(global),
185 orbIsThreadSafe_(ORB_IS_THREAD_SAFE)
186 {
187 connection_ = NULL;
188 defaultCallback_ = NULL;
189
190 if (log_.call()) log_.call("CorbaDriver", string("getInstance for ") + instanceName +
191 " orbIsThreadSafe_=" + lexical_cast<std::string>(orbIsThreadSafe_));
192
193 _COMM_TRY
194 connection_ = new CorbaConnection(global_, orb);
195 _COMM_CATCH("::Constructor", true, false)
196 }
197
198 CorbaDriver::~CorbaDriver()
199 {
200 if (log_.call()) log_.call(ME, "~CorbaDriver()");
201 try {
202 // delete defaultCallback_; // Is a memory leak, but we need to track down the valgrind issue first
203 delete connection_;
204 }
205 catch (...) {
206 }
207 }
208
209 void CorbaDriver::initialize(const string& name, I_Callback &client)
210 {
211 Lock lock(mutex_, orbIsThreadSafe_);
212 _COMM_TRY
213 // if (defaultCallback_ != NULL) delete defaultCallback_;
214 defaultCallback_ = NULL;
215 defaultCallback_ = new DefaultCallback(global_, name, &client, 0);
216 // if (connection_ != NULL) delete connection_;
217 // connection_ = NULL;
218 if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
219 connection_->createCallbackServer(defaultCallback_);
220 if (log_.trace()) log_.trace(ME, "After createCallbackServer");
221 _COMM_CATCH("::initialize", true, true)
222 }
223
224 string CorbaDriver::getCbProtocol()
225 {
226 return Constants::IOR; // "IOR";
227 }
228
229 string CorbaDriver::getCbAddress()
230 {
231 _COMM_TRY
232 return connection_->getCbAddress();
233 _COMM_CATCH("::getCbAddress", false, false)
234 }
235
236 bool CorbaDriver::shutdownCb()
237 {
238 _COMM_TRY
239 return connection_->shutdownCb();
240 _COMM_CATCH("::shutdownCb", false, false)
241 }
242
243 ConnectReturnQosRef CorbaDriver::connect(const ConnectQosRef& qos)
244 {
245 Lock lock(mutex_, orbIsThreadSafe_);
246 _COMM_TRY
247 return connection_->connect(qos);
248 _COMM_CATCH("::connect", false, false)
249 }
250
251 bool CorbaDriver::disconnect(const DisconnectQos& qos)
252 {
253 Lock lock(mutex_, orbIsThreadSafe_);
254 _COMM_TRY
255 return connection_->disconnect(qos.toXml());
256 _COMM_CATCH("::disconnect", false, false)
257 }
258
259 string CorbaDriver::getProtocol()
260 {
261 return Constants::IOR; // "IOR";
262 }
263
264 /*
265 string CorbaDriver::loginRaw()
266 {
267 _COMM_TRY
268 connection_->loginRaw();
269 return getLoginName();
270 _COMM_CATCH("::loginRaw", false, false)
271 }
272 */
273
274 bool CorbaDriver::shutdown()
275 {
276 Lock lock(mutex_, orbIsThreadSafe_);
277 _COMM_TRY
278 return connection_->shutdown();
279 _COMM_CATCH("::shutdown", false, false)
280 }
281
282 string CorbaDriver::getLoginName()
283 {
284 _COMM_TRY
285 return connection_->getLoginName();
286 _COMM_CATCH("::getLoginName", false, false)
287 }
288
289 bool CorbaDriver::isLoggedIn()
290 {
291 _COMM_TRY
292 return connection_->isLoggedIn();
293 _COMM_CATCH("::isLoggedIn", false, false)
294 }
295
296 string CorbaDriver::ping(const string& qos)
297 {
298 Lock lock(mutex_, orbIsThreadSafe_);
299 _COMM_TRY
300 return connection_->ping(qos);
301 _COMM_CATCH("::ping", false, false)
302 }
303
304 SubscribeReturnQos CorbaDriver::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
305 {
306 Lock lock(mutex_, orbIsThreadSafe_);
307 _COMM_TRY
308 string ret = connection_->subscribe(key.toXml(), qos.toXml());
309 return SubscribeReturnQos(global_, statusQosFactory_.readObject(ret));
310 _COMM_CATCH("::subscribe", false, false)
311 }
312
313 vector<MessageUnit> CorbaDriver::get(const GetKey& key, const GetQos& qos)
314 {
315 Lock lock(mutex_, orbIsThreadSafe_);
316 _COMM_TRY
317 return connection_->get(key.toXml(), qos.toXml());
318 _COMM_CATCH("::get", false, false)
319 }
320
321 vector<UnSubscribeReturnQos>
322 CorbaDriver::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
323 {
324 Lock lock(mutex_, orbIsThreadSafe_);
325 _COMM_TRY
326 vector<std::string> tmp = connection_->unSubscribe(key.toXml(), qos.toXml());
327 vector<std::string>::const_iterator iter = tmp.begin();
328 vector<UnSubscribeReturnQos> ret;
329 while (iter != tmp.end()) {
330 ret.insert(ret.end(), UnSubscribeReturnQos(global_, statusQosFactory_.readObject(*iter)));
331 iter++;
332 }
333 return ret;
334 _COMM_CATCH("::unSubscribe", false, false)
335 }
336
337 PublishReturnQos CorbaDriver::publish(const MessageUnit& msgUnit)
338 {
339 Lock lock(mutex_, orbIsThreadSafe_);
340 _COMM_TRY
341 if (log_.call()) log_.call(ME, "publish");
342 string ret = connection_->publish(msgUnit);
343 if (log_.trace()) log_.trace(ME, "successfully published");
344 return PublishReturnQos(global_, statusQosFactory_.readObject(ret));
345 _COMM_CATCH("::publish", false, false)
346 }
347
348 void CorbaDriver::publishOneway(const vector<MessageUnit> &msgUnitArr)
349 {
350 Lock lock(mutex_, orbIsThreadSafe_);
351 _COMM_TRY
352 connection_->publishOneway(msgUnitArr);
353 _COMM_CATCH("::publishOneway", false, false)
354 }
355
356 vector<PublishReturnQos> CorbaDriver::publishArr(const vector<MessageUnit> &msgUnitArr)
357 {
358 Lock lock(mutex_, orbIsThreadSafe_);
359 _COMM_TRY
360 vector<std::string> tmp = connection_->publishArr(msgUnitArr);
361 vector<std::string>::const_iterator iter = tmp.begin();
362 vector<PublishReturnQos> ret;
363 while (iter != tmp.end()) {
364 ret.insert(ret.end(), PublishReturnQos(global_, statusQosFactory_.readObject(*iter)) );
365 iter++;
366 }
367 return ret;
368 _COMM_CATCH("::publishArr", false, false)
369 }
370
371 vector<EraseReturnQos> CorbaDriver::erase(const EraseKey& key, const EraseQos& qos)
372 {
373 _COMM_TRY
374 Lock lock(mutex_, orbIsThreadSafe_);
375 vector<std::string> tmp = connection_->erase(key.toXml(), qos.toXml());
376 vector<std::string>::const_iterator iter = tmp.begin();
377 vector<EraseReturnQos> ret;
378 while (iter != tmp.end()) {
379 ret.insert(ret.end(), EraseReturnQos(global_, statusQosFactory_.readObject(*iter)) );
380 iter++;
381 }
382 return ret;
383 _COMM_CATCH("::erase", false, false)
384 }
385
386 I_ProgressListener* CorbaDriver::registerProgressListener(I_ProgressListener *) {
387 log_.warn("CorbaDriver", "registerProgressListener() is not implemented, we ignore the provided listener.");
388 return 0;
389 }
390
391 std::string CorbaDriver::usage()
392 {
393 return CorbaConnection::usage();
394 }
395
396
397 // Exception conversion ....
398 org::xmlBlaster::util::XmlBlasterException
399 CorbaDriver::convertFromCorbaException(const serverIdl::XmlBlasterException& ex)
400 {
401 string tmp = "";
402 return org::xmlBlaster::util::XmlBlasterException(ex.errorCodeStr.in()==0?tmp:string(ex.errorCodeStr),
403 ex.node.in()==0?tmp:string(ex.node),
404 ex.location.in()==0?tmp:string(ex.location),
405 ex.lang.in()==0?tmp:string(ex.lang),
406 ex.message.in()==0?tmp:string(ex.message),
407 ex.versionInfo.in()==0?tmp:string(ex.versionInfo),
408 ex.timestampStr.in()==0?tmp:string(ex.timestampStr),
409 ex.stackTrace.in()==0?tmp:string(ex.stackTrace),
410 ex.embeddedMessage.in()==0?tmp:string(ex.embeddedMessage),
411 ex.transactionInfo.in()==0?tmp:string(ex.transactionInfo));
412 }
413
414 serverIdl::XmlBlasterException
415 CorbaDriver::convertToCorbaException(org::xmlBlaster::util::XmlBlasterException& ex)
416 {
417 return serverIdl::XmlBlasterException(ex.getErrorCodeStr().c_str(),
418 ex.getNode().c_str(),
419 ex.getLocation().c_str(),
420 ex.getLang().c_str(),
421 ex.getMessage().c_str(),
422 ex.getVersionInfo().c_str(),
423 ex.getTimestamp().c_str(),
424 ex.getStackTraceStr().c_str(),
425 ex.getEmbeddedMessage().c_str(),
426 ex.getTransactionInfo().c_str(), "");
427 }
428
429 }}}}} // namespaces
syntax highlighted by Code2HTML, v. 0.9.1