00001 /*---------------------------------------------------------------------------- 00002 Name: CorbaConnection.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Helper to connect to xmlBlaster: for now a simplified version 00006 without caching and without failsafe mode. 00007 Author: <Michele Laghi> michele.laghi@attglobal.net 00008 -----------------------------------------------------------------------------*/ 00009 /* 00010 #ifdef _WINDOWS 00011 #pragma warning(disable:4786) 00012 #endif 00013 */ 00014 #include <client/protocol/corba/CorbaConnection.h> 00015 #include <util/Constants.h> 00016 #include <sys/types.h> 00017 #ifdef _WINDOWS 00018 # include <winsock.h> 00019 #else 00020 # if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__hpux__) 00021 # include <netinet/in.h> 00022 # include <sys/types.h> /* Needed for __FreeBSD__ */ 00023 # endif 00024 # include <sys/socket.h> 00025 # include <netdb.h> 00026 # include <arpa/inet.h> // inet_addr() 00027 # include <unistd.h> // gethostname() 00028 #endif 00029 00030 #include <util/XmlBlasterException.h> 00031 #include <util/Global.h> 00032 #include <client/protocol/corba/CorbaDriver.h> 00033 00034 void closeSocket(int fd) { 00035 #ifdef _WINDOWS 00036 closesocket(fd); 00037 #else 00038 (void)close(fd); 00039 #endif 00040 } 00041 00042 namespace org { 00043 namespace xmlBlaster { 00044 namespace client { 00045 namespace protocol { 00046 namespace corba { 00047 00048 using namespace std; 00049 using namespace org::xmlBlaster::util; 00050 using namespace org::xmlBlaster::util::qos; 00051 using namespace org::xmlBlaster::util::key; 00052 00053 CorbaConnection::CorbaConnection(Global& global, CORBA::ORB_ptr orb) 00054 : orb_(0), 00055 poa_(0), 00056 /* loginQos_(), */ 00057 connectReturnQos_((ConnectReturnQos*)0), 00058 global_(global), 00059 log_(global.getLog("org.xmlBlaster.client.protocol.corba")), 00060 msgKeyFactory_(global), 00061 msgQosFactory_(global) 00062 { 00063 //global_.getProperty().loadPropertyFile(); 00064 log_.info(me(), "Initializing CORBA ORB"); 00065 if (log_.call()) log_.call(me(), "CorbaConnection constructor ..."); 00066 // if (numOfSessions_ == 0) { 00067 if (orb) orb_ = orb; 00068 else { 00069 int args = global_.getArgs(); 00070 const char * const* argc = global_.getArgc(); 00071 orb_ = CORBA::ORB_init(args, const_cast<char **>(argc)); //, "XmlBlaster-C++-Client"); 00072 } 00073 00074 // numOfSessions_++; 00075 nameServerControl_ = 0; 00076 numLogins_ = 0; 00077 xmlBlaster_ = 0; 00078 authServer_ = 0; // initAuthenticationService(); 00079 callback_ = 0; 00080 defaultCallback_ = 0; 00081 sessionId_ = ""; 00082 xmlBlasterIOR_ = ""; 00083 } 00084 00085 00086 CorbaConnection::~CorbaConnection() 00087 { 00088 if (log_.call()) log_.call(me(), "destructor"); 00089 00090 delete nameServerControl_; 00091 00092 if (log_.trace()) log_.trace(me(), "destructor: invoking shutdown"); 00093 shutdown(); 00094 00095 if (log_.trace()) log_.trace(me(), "destructor: invoking shutdownCb"); 00096 shutdownCb(); 00097 if (log_.trace()) log_.trace(me(), "destructor: deleting the defaultCallback"); 00098 delete defaultCallback_; 00099 00100 if (log_.trace()) log_.trace(me(), "destructor: releasing the orb"); 00101 if (!CORBA::is_nil(orb_)) CORBA::release(orb_); 00102 if (log_.trace()) log_.trace(me(), "destructor: releasing the poa"); 00103 if (!CORBA::is_nil(poa_)) CORBA::release(poa_); 00104 orb_ = 0; 00105 poa_ = 0; 00106 } 00107 00108 string CorbaConnection::getAddress() const 00109 { 00110 return xmlBlasterIOR_; 00111 } 00112 00113 string CorbaConnection::getCbAddress() const 00114 { 00115 return callbackIOR_; 00116 } 00117 00118 void CorbaConnection::initNamingService() 00119 { 00120 if (log_.call()) log_.call(me(), "initNamingService() ..."); 00121 if (orb_ == 0) log_.panic(me(), "orb==null, internal problem"); 00122 if (nameServerControl_ == 0) 00123 nameServerControl_ = new NameServerControl(orb_); 00124 } 00125 00126 00127 void CorbaConnection::initAuthenticationService() 00128 { 00129 if (log_.call()) log_.call(me(), "initAuthenticationService() ..."); 00130 if (!CORBA::is_nil(authServer_)) 00131 return; 00132 00133 // 1) check if argument -IOR at program startup is given 00134 string authServerIOR = /* -dispatch/connection/plugin/ior/iorString IOR string is directly given */ 00135 global_.getProperty().getStringProperty("dispatch/callback/plugin/ior/iorString",""); 00136 if (authServerIOR != "") { 00137 CORBA::Object_var 00138 obj = orb_->string_to_object(authServerIOR.c_str()); 00139 authServer_ = authenticateIdl::AuthServer::_narrow(obj.in()); 00140 log_.info(me(),"Accessing xmlBlaster using your given IOR string"); 00141 return; 00142 } 00143 if (log_.trace()) log_.trace(me(), "No -dispatch/connection/plugin/ior/iorString ..."); 00144 00145 string authServerIORFile = 00146 global_.getProperty().getStringProperty("dispatch/connection/plugin/ior/iorFile",""); 00147 // -dispatch/connection/plugin/ior/iorFile IOR string is given through a file 00148 if (authServerIORFile != "") { 00149 ifstream in(authServerIORFile.c_str()); 00150 if ((!in) /* && (log_.PANIC) */ ) 00151 log_.panic(me(), "Could not open the file"); 00152 in >> authServerIOR; 00153 in.close(); 00154 CORBA::Object_var 00155 obj = orb_->string_to_object(authServerIOR.c_str()); 00156 authServer_ = authenticateIdl::AuthServer::_narrow(obj.in()); 00157 string msg = "Accessing xmlBlaster using your given IOR file "; 00158 msg += authServerIORFile; 00159 log_.info(me(), msg); 00160 return; 00161 } 00162 if (log_.trace()) log_.trace(me(), "No -dispatch/connection/plugin/ior/iorFile ..."); 00163 00164 // 3) Using builtin http IOR download ... 00165 { 00166 char myHostName[126]; 00167 strcpy(myHostName, "localhost"); 00168 gethostname(myHostName, 125); 00169 string iorHost = global_.getProperty().getStringProperty("bootstrapHostname",myHostName); 00170 // Port may be a name from /etc/services: "xmlBlaster 3412/tcp" 00171 string iorPortStr = global_.getProperty().getStringProperty("bootstrapPort","3412"); // default bootstrapPort=3412 (xmlblaster) 00172 if (log_.trace()) log_.trace(me(), "Trying -bootstrapHostname=" + iorHost + " and -bootstrapPort=" + iorPortStr + " ..."); 00173 struct sockaddr_in xmlBlasterAddr; 00174 memset((char *)&xmlBlasterAddr, 0, sizeof(xmlBlasterAddr)); 00175 xmlBlasterAddr.sin_family=AF_INET; 00176 struct hostent *hostP = gethostbyname(iorHost.c_str()); 00177 struct servent *portP = getservbyname(iorPortStr.c_str(), "tcp"); 00178 string authServerIOR; 00179 authServerIOR.reserve(520); 00180 if (hostP != NULL) { 00181 xmlBlasterAddr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; //inet_addr("192.168.1.2"); 00182 if (portP != NULL) 00183 xmlBlasterAddr.sin_port = portP->s_port; 00184 else 00185 xmlBlasterAddr.sin_port = htons(global_.getProperty().getIntProperty("bootstrapPort",3412)); 00186 int s = socket(AF_INET, SOCK_STREAM, 0); 00187 if (s != -1) { 00188 if (::connect(s, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr)) != -1) { 00189 string req="GET /AuthenticationService.ior HTTP/1.0\r\n \n"; 00190 int numSent = send(s, req.c_str(), req.size(), 0); 00191 if (numSent < (int)req.size()) { 00192 log_.error(me(), "Problems sending request '" + req + "'"); 00193 } 00194 else { 00195 log_.trace(me(), "Sent IOR request '" + req + "'"); 00196 } 00197 int numRead; 00198 char buf[10]; 00199 while ((numRead = recv(s, buf, 10, 0)) > 0) { 00200 authServerIOR.append(buf, numRead); 00201 } 00202 if (log_.dump()) log_.dump(me(), "Received IOR data: '" + authServerIOR + "'"); 00203 size_t pos = authServerIOR.find("IOR:"); 00204 // if (pos > 0) 00205 if (pos != authServerIOR.npos) authServerIOR = authServerIOR.substr(pos); 00206 else { 00207 throw serverIdl::XmlBlasterException("communication.noConnection", 00208 "client", me().c_str(), "en", 00209 "can't access authentication Service", "", "", "", "", 00210 "", ""); 00211 } 00212 if (log_.trace()) log_.trace(me(), "Received IOR data: '" + authServerIOR + "'"); 00213 } 00214 else { 00215 log_.warn(me(), "Connecting to -bootstrapHostname=" + iorHost + " failed"); // errno 00216 } 00217 ::shutdown(s, 2); // SHUT_RDWR 00218 ::closeSocket(s); // Added because of handle leak reported by James Cazier 00219 } 00220 } 00221 if (!authServerIOR.empty()) { 00222 CORBA::Object_var obj = orb_->string_to_object(authServerIOR.c_str()); 00223 if (!CORBA::is_nil(obj.in())) { 00224 if (!CORBA::is_nil(authServer_)) { 00225 CORBA::release(authServer_); 00226 authServer_ = 0; 00227 } 00228 authServer_ = authenticateIdl::AuthServer::_narrow(obj.in()); 00229 string msg = "Accessing xmlBlaster using -bootstrapHostname "+iorHost; 00230 log_.info(me(), msg); 00231 return; 00232 } 00233 } 00234 } 00235 if (log_.trace()) log_.trace(me(), "No -bootstrapHostname and -bootstrapPort ..."); 00236 00237 00238 // 4) asking Name Service CORBA compliant 00239 bool useNameService=global_.getProperty().getBoolProperty("dispatch/connection/plugin/ior/useNameService",true); 00240 // -dispatch/connection/plugin/ior/useNameService default is to ask the naming service 00241 00242 string text = "Can't access xmlBlaster Authentication Service"; 00243 text += ", is the server running and ready?\n - try to specify "; 00244 text += "'-dispatch/connection/plugin/ior/iorFile <fileName>' if server is running on same host\n"; 00245 text += " - try to specify '-bootstrapHostname <hostName> -bootstrapPort 3412' to "; 00246 text += "locate xmlBlaster\n - or contact your "; 00247 text += "system administrator to start a naming service"; 00248 00249 if (useNameService) { 00250 try { 00251 if (!nameServerControl_) initNamingService(); 00252 00253 string contextId = global_.getProperty().getStringProperty("NameService.context.id", "xmlBlaster"); 00254 string contextKind = global_.getProperty().getStringProperty("NameService.context.kind", "MOM"); 00255 string clusterId = global_.getProperty().getStringProperty("NameService.node.id", global_.getStrippedId()); 00256 string clusterKind = global_.getProperty().getStringProperty("NameService.node.kind", "MOM"); 00257 00258 CORBA::Object_var obj = nameServerControl_->resolve(contextId, contextKind); 00259 CosNaming::NamingContext_var relativeContext_obj = CosNaming::NamingContext::_narrow(obj.in()); 00260 NameServerControl relativeContext(relativeContext_obj); 00261 log_.info(me(), "Retrieved NameService context " + contextId + "." + contextKind); 00262 00263 authenticateIdl::AuthServer_var authServerFirst; 00264 string tmpId = ""; // for logging only 00265 string tmpServerName = ""; // for logging only 00266 string firstServerName = ""; // for logging only 00267 int countServerFound = 0; // for logging only 00268 string serverNameList = ""; // for logging only 00269 try { 00270 authServer_ = authenticateIdl::AuthServer::_narrow(relativeContext.resolve(clusterId, clusterKind)); 00271 } 00272 catch (XmlBlasterException ex) { 00273 log_.info(me(), "Narrow AuthServer failed: " + ex.toString()); 00274 } 00275 00276 /*============================ 00277 TestGet -ORBInitRef NameService=`cat /tmp/ns.ior` -trace true -call true 00278 =============================*/ 00279 00280 if ( CORBA::is_nil(authServer_) ) { 00281 if (log_.trace()) log_.trace(me(), "Query NameServer to find a suitable xmlBlaster server for '" + 00282 NameServerControl::getString(contextId, contextKind)+"/"+NameServerControl::getString(clusterId, clusterKind) + 00283 "' failed, is nil"); 00284 CosNaming::BindingList_var bl; 00285 CosNaming::BindingIterator_var bi; 00286 CosNaming::NamingContext_var tmp = relativeContext.getNamingService(); 00287 tmp->list(0, bl, bi); 00288 00289 // process the remaining bindings if an iterator exists: 00290 if (CORBA::is_nil(authServer_) && !CORBA::is_nil(bi.in())) { 00291 int i = 0; 00292 CORBA::Boolean more; 00293 do { 00294 more = bi->next_n(1, bl); 00295 if (bl->length() != 1) { 00296 if (log_.trace()) log_.trace(me(), "NameService entry id is nil"); 00297 break; 00298 } 00299 CORBA::ULong index = 0; 00300 string id = lexical_cast<std::string>(bl[index].binding_name[0].id); 00301 string kind = lexical_cast<std::string>(bl[index].binding_name[0].kind); 00302 if (log_.trace()) log_.trace(me(), "id=" + id + " kind=" + kind); 00303 00304 tmpId = id; 00305 countServerFound++; 00306 tmpServerName = NameServerControl::getString(contextId, contextKind)+"/"+NameServerControl::getString(id, kind); 00307 if (i>0) serverNameList += ", "; 00308 i++; 00309 serverNameList += tmpServerName; 00310 00311 if (clusterId == id && clusterKind == kind) { 00312 try { 00313 if (log_.trace()) log_.trace(me(), "Trying to resolve NameService entry '"+NameServerControl::getString(id, kind)+"'"); 00314 authServer_ = authenticateIdl::AuthServer::_narrow(relativeContext.resolve(id, kind)); 00315 if (! CORBA::is_nil(authServer_)) 00316 break; // found a matching server 00317 else 00318 log_.warn(me(), "Connecting to NameService entry '"+tmpServerName+"' failed, is_nil"); 00319 } 00320 catch (const CORBA::Exception &exc) { 00321 log_.warn(me(), "Connecting to NameService entry '"+tmpServerName+"' failed: " + to_string(exc)); 00322 } 00323 } 00324 00325 if (CORBA::is_nil(authServerFirst.in())) { 00326 if (log_.trace()) log_.trace(me(), "Remember the first server"); 00327 try { 00328 firstServerName = tmpServerName; 00329 if (log_.trace()) log_.trace(me(), "Remember the first reachable xmlBlaster server from NameService entry '"+firstServerName+"'"); 00330 authServerFirst = authenticateIdl::AuthServer::_narrow(relativeContext.resolve(id, kind)); 00331 } 00332 catch (const CORBA::Exception &exc) { 00333 log_.warn(me(), "Connecting to NameService entry '"+tmpServerName+"' failed: " + to_string(exc)); 00334 } 00335 } 00336 } while ( more ); 00337 } 00338 bi->destroy(); // Clean up server side iteration resources 00339 } 00340 00341 if (CORBA::is_nil(authServer_)) { 00342 if (!CORBA::is_nil(authServerFirst.in())) { 00343 if (countServerFound > 1) { 00344 string str = string("Can't choose one of ") + lexical_cast<std::string>(countServerFound) + 00345 " avalailable server in CORBA NameService: " + serverNameList + 00346 ". Please choose one with e.g. -NameService.node.id " + tmpId; 00347 log_.warn(me(), str); 00348 throw XmlBlasterException("communication.noConnection", "client", me(), "en", str); 00349 } 00350 log_.info(me(), "Choosing only available server '" + firstServerName + "' in CORBA NameService"); 00351 this->authServer_ = authenticateIdl::AuthServer::_duplicate(authServerFirst.in()); 00352 return; 00353 } 00354 else { 00355 log_.trace(me(), "No usable xmlBlaster server found in NameService: " + serverNameList); 00356 throw XmlBlasterException("communication.noConnection", "client", me(), "en", text); 00357 } 00358 } 00359 00360 log_.info(me(), "Accessing xmlBlaster using CORBA naming service entry '" + 00361 NameServerControl::getString(contextId, contextKind) + 00362 "/" + NameServerControl::getString(clusterId, clusterKind)); 00363 00364 return; 00365 } 00366 catch(serverIdl::XmlBlasterException &e ) { 00367 log_.trace(me() + ".NoAuthService", text); 00368 throw CorbaDriver::convertFromCorbaException(e); 00369 } 00370 } // if (useNameService) 00371 00372 if (log_.trace()) log_.trace(me(), "No -dispatch/connection/plugin/ior/useNameService ..."); 00373 throw XmlBlasterException("communication.noConnection", "client", me(), "en", text); 00374 } // initAuthenticationService() 00375 00376 void CorbaConnection::createCallbackServer(POA_clientIdl::BlasterCallback *implObj) 00377 { 00378 if (implObj) { 00379 if (log_.trace()) log_.trace(me(), "Trying resolve_initial_references ..."); 00380 CORBA::Object_var obj = orb_->resolve_initial_references("RootPOA"); 00381 if (log_.trace()) log_.trace(me(), "Trying narrowing POA ..."); 00382 poa_ = PortableServer::POA::_narrow(obj.in()); 00383 PortableServer::POAManager_var poa_mgr = poa_->the_POAManager(); 00384 // _this() incarnates with the servant ... 00385 callback_ = implObj->_this(); 00386 if (log_.trace()) log_.trace(me(), "Trying object_to_string POA ..."); 00387 CORBA::String_var tmp = orb_->object_to_string(callback_); 00388 callbackIOR_ = tmp; 00389 if (log_.trace()) log_.trace(me(), "Trying activate POA ..."); 00390 poa_mgr->activate(); 00391 #if defined(XMLBLASTER_MICO) && defined(ORB_IS_THREAD_SAFE) 00392 // - multi threaded mico 2.3.11 sometimes blocked forever in work_pending() 00393 // - omniORB doesn't need perform_work() either but it doesn't harm 00394 #else 00395 // - TAO seems to need it (callback messages won't arrive without) 00396 if (log_.trace()) log_.trace(me(), "Trying orb.work_pending ..."); 00397 while (orb_->work_pending()) { 00398 if (log_.trace()) log_.trace(me(), "Entering perform_work ..."); 00399 orb_->perform_work(); 00400 } 00401 if (log_.trace()) log_.trace(me(), "Trying work_pending POA done ..."); 00402 #endif 00403 return; 00404 // add exception handling here !!!!! 00405 } 00406 return; 00407 } 00408 00409 ConnectReturnQosRef CorbaConnection::connect(const ConnectQosRef& connectQos) 00410 { 00411 if ( !CORBA::is_nil(xmlBlaster_)) { 00412 string msg = "You are already logged in, returning cached handle"; 00413 msg += " on xmlBlaster"; 00414 log_.warn(me(), msg); 00415 return connectReturnQos_; 00416 } 00417 00418 loginName_ = connectQos->getUserId(); 00419 if (log_.call()) log_.call(me(),"connect(" + loginName_ + ") ..."); 00420 try { 00421 if (CORBA::is_nil(authServer_)) initAuthenticationService(); 00422 ConnectQos help = *connectQos; // since it is a const 00423 string reqQos = help.toXml(); 00424 if (log_.trace()) log_.trace(me(), string("connect req: ") + reqQos); 00425 // If using wstring in xmlBlaster.idl: 00426 //CORBA::WString_var ws1 = CORBA::wstring_dup(toWstring(reqQos).c_str()); 00427 //CORBA::WString_var ws2 = authServer_->connect(ws1); 00428 //string retQos = toString(wstring(ws2)); 00429 // or 00430 string retQos = corbaWStringToString(authServer_->connect(toCorbaWString(reqQos))); 00431 //string retQos = authServer_->connect(reqQos.c_str()); 00432 if (log_.trace()) log_.trace(me(), string("connect ret: ") + retQos); 00433 ConnectQosFactory factory(global_); 00434 if (log_.dump()) log_.dump(me(), "connect: the connect return qos before parsing: " + retQos); 00435 connectReturnQos_ = factory.readObject(retQos); 00436 sessionId_ = connectReturnQos_->getSecretSessionId(); 00437 xmlBlasterIOR_ = connectReturnQos_->getServerRef().getAddress(); 00438 00439 CORBA::Object_var obj = orb_->string_to_object(xmlBlasterIOR_.c_str()); 00440 xmlBlaster_ = serverIdl::Server::_narrow(obj.in()); 00441 00442 numLogins_++; 00443 if (log_.trace()) log_.trace(me(),"Success, connect for "+loginName_); 00444 return connectReturnQos_; 00445 } 00446 catch(const XmlBlasterException &e) { 00447 string msg = "Connect failed for "; 00448 msg += loginName_; // + ", numLogins=" + numLogins_; 00449 if (log_.trace()) log_.trace(me(), msg); 00450 throw e; 00451 } 00452 } 00453 00454 bool CorbaConnection::shutdown() 00455 { 00456 bool ret = false; 00457 if (!CORBA::is_nil(xmlBlaster_)) { 00458 CORBA::release(xmlBlaster_); 00459 xmlBlaster_ = NULL; 00460 ret = true; 00461 } 00462 if (!CORBA::is_nil(authServer_)) { 00463 CORBA::release(authServer_); 00464 authServer_ = NULL; 00465 ret = true; 00466 } 00467 return ret; 00468 } 00469 00470 bool CorbaConnection::shutdownCb() 00471 { 00472 if (!CORBA::is_nil(callback_)) { 00473 CORBA::release(callback_); 00474 callback_ = NULL; 00475 return true; 00476 } 00477 return false; 00478 } 00479 00480 bool CorbaConnection::disconnect(const string& qos) 00481 { 00482 if (log_.call()) log_.call(me(), "disconnect() ..."); 00483 if (log_.dump()) log_.dump(me(), string("disconnect: the qos: ") + qos); 00484 00485 try { 00486 if (!CORBA::is_nil(authServer_)) { 00487 if (sessionId_=="") authServer_->logout(xmlBlaster_); 00488 else authServer_->disconnect(sessionId_.c_str(), toCorbaWString(qos)); 00489 } 00490 shutdown(); 00491 return true; 00492 } 00493 catch (...) { 00494 } 00495 shutdown(); 00496 return false; 00497 } 00498 00505 string 00506 CorbaConnection::subscribe(const string &xmlKey, const string &qos) 00507 { 00508 if (log_.call()) log_.call(me(), "subscribe() ..."); 00509 if (log_.dump()) { 00510 log_.dump(me(), string("subscribe: the key: ") + xmlKey); 00511 log_.dump(me(), string("subscribe: the qos: ") + qos); 00512 } 00513 if (CORBA::is_nil(xmlBlaster_)) { 00514 string txt = "no auth.Server, you must login first"; 00515 throw serverIdl::XmlBlasterException("communication.noConnection", 00516 "client", me().c_str(), "en", 00517 txt.c_str(), "", "", "", "", "", ""); 00518 } 00519 try { 00520 return corbaWStringToString(xmlBlaster_->subscribe(toCorbaWString(xmlKey), toCorbaWString(qos))); 00521 //CORBA::String_var ret = toString(xmlBlaster_->subscribe(xmlKey.c_str(), qos.c_str())); 00522 //return static_cast<const char *>(ret); 00523 } catch(serverIdl::XmlBlasterException &e) { 00524 throw e; 00525 } 00526 //return ""; 00527 } 00528 00529 00530 vector<std::string> CorbaConnection::unSubscribe(const string &xmlKey, 00531 const string &qos) 00532 { 00533 if (log_.call()) log_.call(me(), "unSubscribe() ..."); 00534 if (log_.dump()) { 00535 log_.dump(me(), string("unSubscribe: the key: ") + xmlKey); 00536 log_.dump(me(), string("unSubscribe: the qos: ") + qos); 00537 } 00538 00539 if (CORBA::is_nil(xmlBlaster_)) { 00540 string txt = "no auth.Server, you must login first"; 00541 throw serverIdl::XmlBlasterException("communication.noConnection", 00542 "client", me().c_str(), "en", 00543 txt.c_str(), "", "", "", "", "", ""); 00544 } 00545 00546 try { 00547 serverIdl::XmlTypeArr_var 00548 retArr = xmlBlaster_->unSubscribe(toCorbaWString(xmlKey), toCorbaWString(qos)); 00549 00550 vector<std::string> vecArr; 00551 for (CORBA::ULong ii=0; ii<retArr->length(); ii++) { 00552 vecArr.push_back(corbaWStringToString(retArr[ii].inout())); 00553 } 00554 return vecArr; 00555 } 00556 catch(serverIdl::XmlBlasterException e) { 00557 throw e; 00558 } 00559 } 00560 00569 string CorbaConnection::publish(const util::MessageUnit &msgUnitUtil) { 00570 if (log_.trace()) log_.trace(me(), "Publishing the STL way ..."); 00571 if (log_.dump()) { 00572 log_.dump(me(), string("publish: the msgUnit: ") + msgUnitUtil.toXml()); 00573 } 00574 00575 if (CORBA::is_nil(xmlBlaster_)) { 00576 string txt = "no auth.Server, you must login first"; 00577 throw serverIdl::XmlBlasterException("communication.noConnection", 00578 "client", me().c_str(), "en", 00579 txt.c_str(), "", "", "", "", "", ""); 00580 } 00581 00582 try { 00583 serverIdl::MessageUnit msgUnit; 00584 // serverIdl::MessageUnit_var msgUnit; 00585 copyToCorba(msgUnit, msgUnitUtil); 00586 return corbaWStringToString(xmlBlaster_->publish(msgUnit)); 00587 //CORBA::String_var ret = xmlBlaster_->publish(msgUnit); 00588 //return static_cast<char *>(ret); 00589 } 00590 catch(serverIdl::XmlBlasterException &e) { 00591 string msg = "XmlBlasterException: "; 00592 msg += e.message; 00593 if (log_.trace()) log_.trace(me(), msg); 00594 throw e; 00595 } 00596 // catch(CORBA::Exception &ex1) { 00597 // throw serverIdl::XmlBlasterException(me().c_str(),to_string(ex1)); 00598 // } 00599 } 00600 00604 string 00605 CorbaConnection::publish(const serverIdl::MessageUnit &msgUnit) 00606 { 00607 if (log_.trace()) log_.trace(me(), "Publishing ..."); 00608 00609 if (CORBA::is_nil(xmlBlaster_)) { 00610 string txt = "no auth.Server, you must login first"; 00611 throw serverIdl::XmlBlasterException("communication.noConnection", 00612 "client", me().c_str(), "en", 00613 txt.c_str(), "", "", "", "", "", ""); 00614 } 00615 00616 try { 00617 return corbaWStringToString(xmlBlaster_->publish(msgUnit)); 00618 //CORBA::String_var ret = xmlBlaster_->publish(msgUnit); 00619 //return static_cast<char *>(ret); 00620 } 00621 catch(serverIdl::XmlBlasterException &e) { 00622 string msg = "XmlBlasterException: "; 00623 msg += e.message; 00624 if (log_.trace()) log_.trace(me(), msg); 00625 throw e; 00626 } 00627 // catch(CORBA::Exception &ex1) { 00628 // throw serverIdl::XmlBlasterException(me().c_str(),to_string(ex1)); 00629 // } 00630 } 00631 00641 vector<std::string> 00642 CorbaConnection::publishArr(const vector<util::MessageUnit> &msgVec) 00643 { 00644 if (log_.call()) log_.call(me(), "publishArr() ..."); 00645 00646 if (CORBA::is_nil(xmlBlaster_)) { 00647 string txt = "no auth.Server, you must login first"; 00648 throw serverIdl::XmlBlasterException("communication.noConnection", 00649 "client", me().c_str(), "en", 00650 txt.c_str(), "", "", "", "", "", ""); 00651 } 00652 00653 try { 00654 serverIdl::MessageUnitArr_var msgUnitArr = new serverIdl::MessageUnitArr; 00655 copyToCorba(msgUnitArr, msgVec); 00656 serverIdl::XmlTypeArr_var retArr = xmlBlaster_->publishArr(msgUnitArr); 00657 vector<std::string> vecArr; 00658 for (CORBA::ULong ii=0; ii<retArr->length(); ii++) { 00659 vecArr.push_back(corbaWStringToString(retArr[ii].inout())); 00660 //vecArr.push_back(static_cast<char *>(retArr[ii].inout())); 00661 } 00662 return vecArr; 00663 } 00664 catch(serverIdl::XmlBlasterException &e) { 00665 if (log_.trace()) log_.trace(me(), "XmlBlasterException: " 00666 + string(e.message) ); 00667 throw e; 00668 } 00669 } 00670 00674 serverIdl::XmlTypeArr* 00675 CorbaConnection::publishArr(const serverIdl::MessageUnitArr& msgUnitArr) 00676 { 00677 if (log_.call()) log_.call(me(), "publishArr() ..."); 00678 00679 if (CORBA::is_nil(xmlBlaster_)) { 00680 string txt = "no auth.Server, you must login first"; 00681 throw serverIdl::XmlBlasterException("communication.noConnection", 00682 "client", me().c_str(), "en", 00683 txt.c_str(), "", "", "", "", "", ""); 00684 } 00685 00686 try { 00687 return xmlBlaster_->publishArr(msgUnitArr); 00688 } 00689 catch(serverIdl::XmlBlasterException &e) { 00690 if (log_.trace()) log_.trace(me(), "XmlBlasterException: " 00691 + string(e.message) ); 00692 throw e; 00693 } 00694 return 0; 00695 } 00696 00705 void 00706 CorbaConnection::publishOneway(const vector<util::MessageUnit>& msgVec) 00707 { 00708 if (log_.call()) log_.call(me(), "publishOneway() ..."); 00709 00710 if (CORBA::is_nil(xmlBlaster_)) { 00711 string txt = "no auth.Server, you must login first"; 00712 throw serverIdl::XmlBlasterException("communication.noConnection", 00713 "client", me().c_str(), "en", 00714 txt.c_str(), "", "", "", "", "", ""); 00715 } 00716 00717 try { 00718 serverIdl::MessageUnitArr_var msgUnitArr = new serverIdl::MessageUnitArr; 00719 copyToCorba(msgUnitArr, msgVec); 00720 xmlBlaster_->publishOneway(msgUnitArr); 00721 } 00722 catch (const exception& e) { 00723 log_.error(me(), string("Exception caught in publishOneway, it is not transferred to client: ") + e.what()); 00724 } 00725 catch(...) { 00726 log_.error(me(), "Exception caught in publishOneway, it is not transferred to client"); 00727 } 00728 } 00729 00730 /* 00731 * Please use the STL based variant 00732 * @param The MessageUnit array as a CORBA datatype 00733 * @deprecated Use the vector<util::MessageUnit> variant 00734 */ 00735 void 00736 CorbaConnection::publishOneway(const serverIdl::MessageUnitArr& msgUnitArr) 00737 { 00738 if (log_.call()) log_.call(me(), "publishOneway() ..."); 00739 00740 if (CORBA::is_nil(xmlBlaster_)) { 00741 string txt = "no auth.Server, you must login first"; 00742 throw serverIdl::XmlBlasterException("communication.noConnection", 00743 "client", me().c_str(), "en", 00744 txt.c_str(), "", "", "", "", "", ""); 00745 } 00746 00747 try { 00748 xmlBlaster_->publishOneway(msgUnitArr); 00749 } 00750 catch (const exception& e) { 00751 log_.error(me(), string("Exception caught in publishOneway, it is not transferred to client: ") + e.what()); 00752 } 00753 catch(...) { 00754 log_.error(me(), "Exception caught in publishOneway, it is not transferred to client"); 00755 } 00756 } 00757 00763 vector<std::string> 00764 CorbaConnection::erase(const string &xmlKey, const string &qos) 00765 { 00766 if (log_.call()) log_.call(me(), "erase() ..."); 00767 if (log_.dump()) { 00768 log_.dump(me(), string("erase: the key: ") + xmlKey); 00769 log_.dump(me(), string("erase: the qos: ") + qos); 00770 } 00771 00772 if (CORBA::is_nil(xmlBlaster_)) { 00773 string txt = "no auth.Server, you must login first"; 00774 throw serverIdl::XmlBlasterException("communication.noConnection", 00775 "client", me().c_str(), "en", 00776 txt.c_str(), "", "", "", "", "", ""); 00777 } 00778 00779 try { 00780 serverIdl::XmlTypeArr_var retArr = xmlBlaster_->erase(toCorbaWString(xmlKey), toCorbaWString(qos)); 00781 vector<std::string> vecArr; 00782 for (CORBA::ULong ii=0; ii<retArr->length(); ii++) { 00783 vecArr.push_back(corbaWStringToString(retArr[ii])); 00784 //vecArr.push_back(static_cast<const char *>(retArr[ii])); 00785 } 00786 return vecArr; 00787 } 00788 catch(serverIdl::XmlBlasterException e) { 00789 throw e; 00790 } 00791 } 00792 00793 00801 vector<util::MessageUnit> 00802 CorbaConnection::get(const string &xmlKey, const string &qos) 00803 { 00804 00805 serverIdl::MessageUnitArr_var units; 00806 if (log_.call()) log_.call(me(), "get() ..."); 00807 if (log_.dump()) { 00808 log_.dump(me(), string("get: the key: ") + xmlKey); 00809 log_.dump(me(), string("get: the qos: ") + qos); 00810 } 00811 00812 if (CORBA::is_nil(xmlBlaster_)) { 00813 string txt = "no auth.Server, you must login first"; 00814 throw serverIdl::XmlBlasterException("communication.noConnection", 00815 "client", me().c_str(), "en", 00816 txt.c_str(), "", "", "", "", "", ""); 00817 } 00818 00819 try { 00820 units = xmlBlaster_->get(toCorbaWString(xmlKey), toCorbaWString(qos)); 00821 /* 00822 string subId = xmlBlaster_->subscribe(xmlKey.c_str(), 00823 qos.c_str()); 00824 log_.info(me(),"New Entry in Cache created (subId="+subId+")"); 00825 */ 00826 vector<util::MessageUnit> msgVec; 00827 copyFromCorba(msgVec, units); 00828 return msgVec; 00829 } 00830 catch(serverIdl::XmlBlasterException &e) { 00831 throw e; 00832 } 00833 } 00834 00835 string 00836 CorbaConnection::ping(const string &qos) 00837 { 00838 if (log_.call()) log_.call(me(), "ping(" + qos + ") ..."); 00839 00840 if (CORBA::is_nil(xmlBlaster_)) { 00841 string txt = "no auth.Server, you must login first"; 00842 throw serverIdl::XmlBlasterException("communication.noConnection", 00843 "client", me().c_str(), "en", 00844 txt.c_str(), "", "", "", "", "", ""); 00845 } 00846 00847 try { 00848 CORBA::String_var ret = xmlBlaster_->ping(""); 00849 return static_cast<char *>(ret); 00850 } 00851 catch(serverIdl::XmlBlasterException &e) { 00852 throw e; 00853 } 00854 } 00855 00859 void 00860 CorbaConnection::copyToCorba(serverIdl::MessageUnit &dest, 00861 const util::MessageUnit &src) const 00862 { 00863 dest.xmlKey = toCorbaWString(src.getKey().toXml()); 00864 serverIdl::ContentType content(src.getContentLen(), 00865 src.getContentLen(), 00866 (CORBA::Octet*)src.getContent(), 00867 false); // our src does memory management itself 00868 dest.content = content; // dest.content and content point to same memory? memory leak? 00869 dest.qos = toCorbaWString(src.getQos().toXml()); 00870 } 00871 00872 00876 void 00877 CorbaConnection::copyToCorba(serverIdl::MessageUnitArr_var &units, 00878 const vector<util::MessageUnit> &msgVec) const 00879 { 00880 unsigned int len = msgVec.size(); 00881 units->length(len); 00882 for (CORBA::ULong ii=0; ii<len; ii++) { 00883 util::MessageUnit src = msgVec[ii]; 00884 serverIdl::MessageUnit dest; 00885 copyToCorba(dest, src); 00886 units[ii] = dest; 00887 } 00888 } 00889 00894 void 00895 CorbaConnection::copyFromCorba(vector<util::MessageUnit> &msgVec, 00896 serverIdl::MessageUnitArr_var &units) 00897 { 00898 unsigned int len = units->length(); 00899 msgVec.reserve(len); 00900 for (CORBA::ULong ii=0; ii<len; ii++) { 00901 const serverIdl::MessageUnit &msgUnit = static_cast<const serverIdl::MessageUnit>(units[ii]); 00902 unsigned long len = static_cast<unsigned long>(msgUnit.content.length()); 00903 const unsigned char * blob = static_cast<const unsigned char *>(&msgUnit.content[0]); 00904 if (log_.trace()) log_.trace(me(), "copyFromCorba() '" + string((const char *)blob) + "' len=" + lexical_cast<std::string>(len)); 00905 MsgKeyData key = msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey)); 00906 MsgQosData qos = msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos)); 00907 const util::MessageUnit msg(key, len, blob, qos); 00908 msgVec.push_back(msg); 00909 } 00910 } 00911 00927 std::string CorbaConnection::usage() 00928 { 00929 std::string text = string(""); 00930 //text += string("\n"); 00931 text += string("\nThe CORBA plugin configuration:"); 00932 text += string("\n -bootstrapHostname <host>"); 00933 text += string("\n The host where to find xmlBlaster [localhost]"); 00934 text += string("\n -bootstrapPort <port>"); 00935 text += string("\n The bootstrap port where xmlBlaster publishes its IOR [3412]"); 00936 text += string("\n -dispatch/connection/plugin/ior/iorString <IOR:00...>"); 00937 text += string("\n The IOR string of the xmlBlaster-authentication server."); 00938 text += string("\n -dispatch/connection/plugin/ior/iorFile <file>"); 00939 text += string("\n A file with the xmlBlaster-authentication server IOR."); 00940 text += string("\n -dispatch/connection/plugin/ior/useNameService <true/false>"); 00941 text += string("\n Try to access xmlBlaster through a naming service [true]"); 00942 text += string("\n"); 00943 return text; 00944 } 00945 00946 // CORBA::ORB_ptr CorbaConnection::orb_ = 0; 00947 // unsigned short CorbaConnection::numOfSessions_ = 0; 00948 // PortableServer::POA_ptr CorbaConnection::poa_ = 0; 00949 00950 }}}}} // end of namespace 00951 00952