00001 /*------------------------------------------------------------------------------ 00002 Name: CorbaDriver.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: The client driver for the corba protocol 00006 ------------------------------------------------------------------------------*/ 00007 #include <client/protocol/corba/CorbaDriver.h> 00008 #include <util/ErrorCode.h> 00009 #include <util/XmlBlasterException.h> 00010 #include <util/Global.h> 00011 #include <util/lexical_cast.h> 00012 00013 namespace org { 00014 namespace xmlBlaster { 00015 namespace client { 00016 namespace protocol { 00017 namespace corba { 00018 00019 using namespace std; 00020 using namespace org::xmlBlaster::util; 00021 using namespace org::xmlBlaster::util::qos; 00022 using namespace org::xmlBlaster::util::thread; 00023 using namespace org::xmlBlaster::client::protocol; 00024 using namespace org::xmlBlaster::client::qos; 00025 using namespace org::xmlBlaster::client::key; 00026 00027 void CorbaDriver::freeResources(bool deleteConnection, bool deleteCallback) 00028 { 00029 if (deleteConnection) { 00030 delete connection_; 00031 connection_ = NULL; 00032 } 00033 if (deleteCallback) { 00034 delete defaultCallback_; 00035 defaultCallback_ = NULL; 00036 } 00037 } 00038 00039 #define _COMM_TRY try { 00040 00041 00042 #define _COMM_CATCH(methodName, deleteConnection, deleteCallback) \ 00043 } \ 00044 catch(serverIdl::XmlBlasterException &ex) { \ 00045 freeResources(deleteConnection, deleteCallback); \ 00046 throw convertFromCorbaException(ex); \ 00047 } \ 00048 catch(const CosNaming::NamingContext::CannotProceed &ex) { \ 00049 freeResources(deleteConnection, deleteCallback); \ 00050 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, \ 00051 "unknown node", ME + string(methodName), "en", \ 00052 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00053 "", "", to_string(ex)); \ 00054 } \ 00055 catch(const CosNaming::NamingContext::InvalidName &ex) { \ 00056 freeResources(deleteConnection, deleteCallback); \ 00057 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, \ 00058 "unknown node", ME + string(methodName), "en", \ 00059 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00060 "", "", to_string(ex)); \ 00061 } \ 00062 catch(const CosNaming::NamingContext::AlreadyBound &ex) { \ 00063 freeResources(deleteConnection, deleteCallback); \ 00064 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, \ 00065 "unknown node", ME + string(methodName), "en", \ 00066 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00067 "", "", to_string(ex)); \ 00068 } \ 00069 catch(const CosNaming::NamingContext::NotEmpty &ex) { \ 00070 freeResources(deleteConnection, deleteCallback); \ 00071 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, \ 00072 "unknown node", ME + string(methodName), "en", \ 00073 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00074 "", "", to_string(ex)); \ 00075 } \ 00076 catch(const CosNaming::NamingContext::NotFound &ex) { \ 00077 freeResources(deleteConnection, deleteCallback); \ 00078 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, \ 00079 "unknown node", ME + string(methodName), "en", \ 00080 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00081 "", "", to_string(ex)); \ 00082 } \ 00083 catch(const CORBA::Exception &ex) { \ 00084 freeResources(deleteConnection, deleteCallback); \ 00085 throw XmlBlasterException(COMMUNICATION_NOCONNECTION, \ 00086 "unknown node", ME + string(methodName), "en", \ 00087 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00088 "", "", to_string(ex)); \ 00089 } \ 00090 catch(const XmlBlasterException &ex) { \ 00091 freeResources(deleteConnection, deleteCallback); \ 00092 throw ex; \ 00093 } \ 00094 catch(const XmlBlasterException *ex) { \ 00095 freeResources(deleteConnection, deleteCallback); \ 00096 throw ex; \ 00097 } \ 00098 catch(const exception &ex) { \ 00099 freeResources(deleteConnection, deleteCallback); \ 00100 throw XmlBlasterException(INTERNAL_UNKNOWN, \ 00101 "unknown node", ME + string(methodName), "en", \ 00102 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00103 "", "", \ 00104 string("type='exception', msg='") + ex.what() + "'"); \ 00105 } \ 00106 catch(const string &ex) { \ 00107 freeResources(deleteConnection, deleteCallback); \ 00108 throw XmlBlasterException(INTERNAL_UNKNOWN, \ 00109 "unknown node", ME + string(methodName), "en", \ 00110 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00111 "", "", \ 00112 string("type='string', msg='") + ex + "'"); \ 00113 } \ 00114 catch(const char *ex) { \ 00115 freeResources(deleteConnection, deleteCallback); \ 00116 throw XmlBlasterException(INTERNAL_UNKNOWN, \ 00117 "unknown node", ME + string(methodName), "en", \ 00118 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00119 "", "", \ 00120 string("type='char*', msg='") + ex + "'"); \ 00121 } \ 00122 catch(int ex) { \ 00123 freeResources(deleteConnection, deleteCallback); \ 00124 throw XmlBlasterException(INTERNAL_UNKNOWN, \ 00125 "unknown node", ME + string(methodName), "en", \ 00126 global_.getVersion() + " " + global_.getBuildTimestamp(),\ 00127 "", "", \ 00128 string("type='int', msg='") + lexical_cast<std::string>(ex) + "'"); \ 00129 } \ 00130 catch (...) { \ 00131 freeResources(deleteConnection, deleteCallback); \ 00132 throw XmlBlasterException(INTERNAL_UNKNOWN, \ 00133 "unknown node", ME + string(methodName), "en", \ 00134 global_.getVersion() + " " + global_.getBuildTimestamp()); \ 00135 } 00136 00137 /* 00138 00139 static bool dummy; 00140 00141 CorbaDriver::CorbaDriver() 00142 : doRun_(dummy), 00143 isRunning_(dummy), 00144 mutex_(), 00145 count_(0), 00146 ME("CorbaDriver"), 00147 global_(Global::getInstance()), 00148 log_(global_.getLog("org.xmlBlaster.client.protocol.corba")), 00149 statusQosFactory_(global_), 00150 { 00151 connection_ = NULL; 00152 defaultCallback_ = NULL; 00153 _COMM_TRY 00154 connection_ = new CorbaConnection(global_, false); 00155 _COMM_CATCH("::Constructor", true, false) 00156 } 00157 */ 00158 00159 CorbaDriver::CorbaDriver(const CorbaDriver& corbaDriver) 00160 : mutex_(corbaDriver.mutex_), 00161 ME("CorbaDriver"), 00162 global_(corbaDriver.global_), 00163 log_(corbaDriver.log_), 00164 statusQosFactory_(corbaDriver.global_), 00165 orbIsThreadSafe_(ORB_IS_THREAD_SAFE) 00166 { 00167 // no instantiation of these since this should never be invoked (just to make it private) 00168 connection_ = NULL; 00169 defaultCallback_ = NULL; 00170 if (log_.call()) log_.call("CorbaDriver", string("Constructor orbIsThreadSafe_=") + lexical_cast<std::string>(orbIsThreadSafe_)); 00171 } 00172 00173 CorbaDriver& CorbaDriver::operator =(const CorbaDriver& /*corbaDriver*/) 00174 { 00175 return *this; 00176 } 00177 00178 00179 CorbaDriver::CorbaDriver(Global& global, Mutex& mutex, const string instanceName, CORBA::ORB_ptr orb) 00180 : mutex_(mutex), 00181 ME(string("CorbaDriver-") + instanceName), 00182 global_(global), 00183 log_(global.getLog("org.xmlBlaster.client.protocol.corba")), 00184 statusQosFactory_(global), 00185 orbIsThreadSafe_(ORB_IS_THREAD_SAFE) 00186 { 00187 connection_ = NULL; 00188 defaultCallback_ = NULL; 00189 00190 if (log_.call()) log_.call("CorbaDriver", string("getInstance for ") + instanceName + 00191 " orbIsThreadSafe_=" + lexical_cast<std::string>(orbIsThreadSafe_)); 00192 00193 _COMM_TRY 00194 connection_ = new CorbaConnection(global_, orb); 00195 _COMM_CATCH("::Constructor", true, false) 00196 } 00197 00198 CorbaDriver::~CorbaDriver() 00199 { 00200 if (log_.call()) log_.call(ME, "~CorbaDriver()"); 00201 try { 00202 // delete defaultCallback_; // Is a memory leak, but we need to track down the valgrind issue first 00203 delete connection_; 00204 } 00205 catch (...) { 00206 } 00207 } 00208 00209 void CorbaDriver::initialize(const string& name, I_Callback &client) 00210 { 00211 Lock lock(mutex_, orbIsThreadSafe_); 00212 _COMM_TRY 00213 // if (defaultCallback_ != NULL) delete defaultCallback_; 00214 defaultCallback_ = NULL; 00215 defaultCallback_ = new DefaultCallback(global_, name, &client, 0); 00216 // if (connection_ != NULL) delete connection_; 00217 // connection_ = NULL; 00218 if (log_.trace()) log_.trace(ME, "Before createCallbackServer"); 00219 connection_->createCallbackServer(defaultCallback_); 00220 if (log_.trace()) log_.trace(ME, "After createCallbackServer"); 00221 _COMM_CATCH("::initialize", true, true) 00222 } 00223 00224 string CorbaDriver::getCbProtocol() 00225 { 00226 return Constants::IOR; // "IOR"; 00227 } 00228 00229 string CorbaDriver::getCbAddress() 00230 { 00231 _COMM_TRY 00232 return connection_->getCbAddress(); 00233 _COMM_CATCH("::getCbAddress", false, false) 00234 } 00235 00236 bool CorbaDriver::shutdownCb() 00237 { 00238 _COMM_TRY 00239 return connection_->shutdownCb(); 00240 _COMM_CATCH("::shutdownCb", false, false) 00241 } 00242 00243 ConnectReturnQosRef CorbaDriver::connect(const ConnectQosRef& qos) 00244 { 00245 Lock lock(mutex_, orbIsThreadSafe_); 00246 _COMM_TRY 00247 return connection_->connect(qos); 00248 _COMM_CATCH("::connect", false, false) 00249 } 00250 00251 bool CorbaDriver::disconnect(const DisconnectQos& qos) 00252 { 00253 Lock lock(mutex_, orbIsThreadSafe_); 00254 _COMM_TRY 00255 return connection_->disconnect(qos.toXml()); 00256 _COMM_CATCH("::disconnect", false, false) 00257 } 00258 00259 string CorbaDriver::getProtocol() 00260 { 00261 return Constants::IOR; // "IOR"; 00262 } 00263 00264 /* 00265 string CorbaDriver::loginRaw() 00266 { 00267 _COMM_TRY 00268 connection_->loginRaw(); 00269 return getLoginName(); 00270 _COMM_CATCH("::loginRaw", false, false) 00271 } 00272 */ 00273 00274 bool CorbaDriver::shutdown() 00275 { 00276 Lock lock(mutex_, orbIsThreadSafe_); 00277 _COMM_TRY 00278 return connection_->shutdown(); 00279 _COMM_CATCH("::shutdown", false, false) 00280 } 00281 00282 string CorbaDriver::getLoginName() 00283 { 00284 _COMM_TRY 00285 return connection_->getLoginName(); 00286 _COMM_CATCH("::getLoginName", false, false) 00287 } 00288 00289 bool CorbaDriver::isLoggedIn() 00290 { 00291 _COMM_TRY 00292 return connection_->isLoggedIn(); 00293 _COMM_CATCH("::isLoggedIn", false, false) 00294 } 00295 00296 string CorbaDriver::ping(const string& qos) 00297 { 00298 Lock lock(mutex_, orbIsThreadSafe_); 00299 _COMM_TRY 00300 return connection_->ping(qos); 00301 _COMM_CATCH("::ping", false, false) 00302 } 00303 00304 SubscribeReturnQos CorbaDriver::subscribe(const SubscribeKey& key, const SubscribeQos& qos) 00305 { 00306 Lock lock(mutex_, orbIsThreadSafe_); 00307 _COMM_TRY 00308 string ret = connection_->subscribe(key.toXml(), qos.toXml()); 00309 return SubscribeReturnQos(global_, statusQosFactory_.readObject(ret)); 00310 _COMM_CATCH("::subscribe", false, false) 00311 } 00312 00313 vector<MessageUnit> CorbaDriver::get(const GetKey& key, const GetQos& qos) 00314 { 00315 Lock lock(mutex_, orbIsThreadSafe_); 00316 _COMM_TRY 00317 return connection_->get(key.toXml(), qos.toXml()); 00318 _COMM_CATCH("::get", false, false) 00319 } 00320 00321 vector<UnSubscribeReturnQos> 00322 CorbaDriver::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos) 00323 { 00324 Lock lock(mutex_, orbIsThreadSafe_); 00325 _COMM_TRY 00326 vector<std::string> tmp = connection_->unSubscribe(key.toXml(), qos.toXml()); 00327 vector<std::string>::const_iterator iter = tmp.begin(); 00328 vector<UnSubscribeReturnQos> ret; 00329 while (iter != tmp.end()) { 00330 ret.insert(ret.end(), UnSubscribeReturnQos(global_, statusQosFactory_.readObject(*iter))); 00331 iter++; 00332 } 00333 return ret; 00334 _COMM_CATCH("::unSubscribe", false, false) 00335 } 00336 00337 PublishReturnQos CorbaDriver::publish(const MessageUnit& msgUnit) 00338 { 00339 Lock lock(mutex_, orbIsThreadSafe_); 00340 _COMM_TRY 00341 if (log_.call()) log_.call(ME, "publish"); 00342 string ret = connection_->publish(msgUnit); 00343 if (log_.trace()) log_.trace(ME, "successfully published"); 00344 return PublishReturnQos(global_, statusQosFactory_.readObject(ret)); 00345 _COMM_CATCH("::publish", false, false) 00346 } 00347 00348 void CorbaDriver::publishOneway(const vector<MessageUnit> &msgUnitArr) 00349 { 00350 Lock lock(mutex_, orbIsThreadSafe_); 00351 _COMM_TRY 00352 connection_->publishOneway(msgUnitArr); 00353 _COMM_CATCH("::publishOneway", false, false) 00354 } 00355 00356 vector<PublishReturnQos> CorbaDriver::publishArr(const vector<MessageUnit> &msgUnitArr) 00357 { 00358 Lock lock(mutex_, orbIsThreadSafe_); 00359 _COMM_TRY 00360 vector<std::string> tmp = connection_->publishArr(msgUnitArr); 00361 vector<std::string>::const_iterator iter = tmp.begin(); 00362 vector<PublishReturnQos> ret; 00363 while (iter != tmp.end()) { 00364 ret.insert(ret.end(), PublishReturnQos(global_, statusQosFactory_.readObject(*iter)) ); 00365 iter++; 00366 } 00367 return ret; 00368 _COMM_CATCH("::publishArr", false, false) 00369 } 00370 00371 vector<EraseReturnQos> CorbaDriver::erase(const EraseKey& key, const EraseQos& qos) 00372 { 00373 _COMM_TRY 00374 Lock lock(mutex_, orbIsThreadSafe_); 00375 vector<std::string> tmp = connection_->erase(key.toXml(), qos.toXml()); 00376 vector<std::string>::const_iterator iter = tmp.begin(); 00377 vector<EraseReturnQos> ret; 00378 while (iter != tmp.end()) { 00379 ret.insert(ret.end(), EraseReturnQos(global_, statusQosFactory_.readObject(*iter)) ); 00380 iter++; 00381 } 00382 return ret; 00383 _COMM_CATCH("::erase", false, false) 00384 } 00385 00386 I_ProgressListener* CorbaDriver::registerProgressListener(I_ProgressListener *) { 00387 log_.warn("CorbaDriver", "registerProgressListener() is not implemented, we ignore the provided listener."); 00388 return 0; 00389 } 00390 00391 std::string CorbaDriver::usage() 00392 { 00393 return CorbaConnection::usage(); 00394 } 00395 00396 00397 // Exception conversion .... 00398 org::xmlBlaster::util::XmlBlasterException 00399 CorbaDriver::convertFromCorbaException(const serverIdl::XmlBlasterException& ex) 00400 { 00401 string tmp = ""; 00402 return org::xmlBlaster::util::XmlBlasterException(ex.errorCodeStr.in()==0?tmp:string(ex.errorCodeStr), 00403 ex.node.in()==0?tmp:string(ex.node), 00404 ex.location.in()==0?tmp:string(ex.location), 00405 ex.lang.in()==0?tmp:string(ex.lang), 00406 ex.message.in()==0?tmp:string(ex.message), 00407 ex.versionInfo.in()==0?tmp:string(ex.versionInfo), 00408 ex.timestampStr.in()==0?tmp:string(ex.timestampStr), 00409 ex.stackTrace.in()==0?tmp:string(ex.stackTrace), 00410 ex.embeddedMessage.in()==0?tmp:string(ex.embeddedMessage), 00411 ex.transactionInfo.in()==0?tmp:string(ex.transactionInfo)); 00412 } 00413 00414 serverIdl::XmlBlasterException 00415 CorbaDriver::convertToCorbaException(org::xmlBlaster::util::XmlBlasterException& ex) 00416 { 00417 return serverIdl::XmlBlasterException(ex.getErrorCodeStr().c_str(), 00418 ex.getNode().c_str(), 00419 ex.getLocation().c_str(), 00420 ex.getLang().c_str(), 00421 ex.getMessage().c_str(), 00422 ex.getVersionInfo().c_str(), 00423 ex.getTimestamp().c_str(), 00424 ex.getStackTraceStr().c_str(), 00425 ex.getEmbeddedMessage().c_str(), 00426 ex.getTransactionInfo().c_str(), ""); 00427 } 00428 00429 }}}}} // namespaces 00430