1 /*----------------------------------------------------------------------------
2 Name: CorbaConnection.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Helper to connect to xmlBlaster: for now a simplified version
6 without caching and without failsafe mode.
7 Author: <Michele Laghi> michele.laghi@attglobal.net
8 -----------------------------------------------------------------------------*/
9 /*
10 #ifdef _WINDOWS
11 #pragma warning(disable:4786)
12 #endif
13 */
14 #include <client/protocol/corba/CorbaConnection.h>
15 #include <util/Constants.h>
16 #include <sys/types.h>
17 #ifdef _WINDOWS
18 # include <winsock.h>
19 #else
20 # if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__hpux__)
21 # include <netinet/in.h>
22 # include <sys/types.h> /* Needed for __FreeBSD__ */
23 # endif
24 # include <sys/socket.h>
25 # include <netdb.h>
26 # include <arpa/inet.h> // inet_addr()
27 # include <unistd.h> // gethostname()
28 #endif
29
30 #include <util/XmlBlasterException.h>
31 #include <util/Global.h>
32 #include <client/protocol/corba/CorbaDriver.h>
33
/**
 * Closes a socket descriptor with the call matching the platform:
 * closesocket() if _WINDOWS is defined, close() otherwise.
 * The result of the underlying call is ignored.
 * @param fd The socket descriptor to close
 */
void closeSocket(int fd) {
#if defined(_WINDOWS)
   closesocket(fd);
#else
   static_cast<void>(close(fd));
#endif
}
41
42 namespace org {
43 namespace xmlBlaster {
44 namespace client {
45 namespace protocol {
46 namespace corba {
47
48 using namespace std;
49 using namespace org::xmlBlaster::util;
50 using namespace org::xmlBlaster::util::qos;
51 using namespace org::xmlBlaster::util::key;
52
53 CorbaConnection::CorbaConnection(Global& global, CORBA::ORB_ptr orb)
54 : orb_(0),
55 poa_(0),
56 /* loginQos_(), */
57 connectReturnQos_((ConnectReturnQos*)0),
58 global_(global),
59 log_(global.getLog("org.xmlBlaster.client.protocol.corba")),
60 msgKeyFactory_(global),
61 msgQosFactory_(global)
62 {
63 //global_.getProperty().loadPropertyFile();
64 log_.info(me(), "Initializing CORBA ORB");
65 if (log_.call()) log_.call(me(), "CorbaConnection constructor ...");
66 // if (numOfSessions_ == 0) {
67 if (orb) orb_ = orb;
68 else {
69 int args = global_.getArgs();
70 const char * const* argc = global_.getArgc();
71 orb_ = CORBA::ORB_init(args, const_cast<char **>(argc)); //, "XmlBlaster-C++-Client");
72 }
73
74 // numOfSessions_++;
75 nameServerControl_ = 0;
76 numLogins_ = 0;
77 xmlBlaster_ = 0;
78 authServer_ = 0; // initAuthenticationService();
79 callback_ = 0;
80 defaultCallback_ = 0;
81 sessionId_ = "";
82 xmlBlasterIOR_ = "";
83 }
84
85
86 CorbaConnection::~CorbaConnection()
87 {
88 if (log_.call()) log_.call(me(), "destructor");
89
90 delete nameServerControl_;
91
92 if (log_.trace()) log_.trace(me(), "destructor: invoking shutdown");
93 shutdown();
94
95 if (log_.trace()) log_.trace(me(), "destructor: invoking shutdownCb");
96 shutdownCb();
97 if (log_.trace()) log_.trace(me(), "destructor: deleting the defaultCallback");
98 delete defaultCallback_;
99
100 if (log_.trace()) log_.trace(me(), "destructor: releasing the orb");
101 if (!CORBA::is_nil(orb_)) CORBA::release(orb_);
102 if (log_.trace()) log_.trace(me(), "destructor: releasing the poa");
103 if (!CORBA::is_nil(poa_)) CORBA::release(poa_);
104 orb_ = 0;
105 poa_ = 0;
106 }
107
108 string CorbaConnection::getAddress() const
109 {
110 return xmlBlasterIOR_;
111 }
112
113 string CorbaConnection::getCbAddress() const
114 {
115 return callbackIOR_;
116 }
117
118 void CorbaConnection::initNamingService()
119 {
120 if (log_.call()) log_.call(me(), "initNamingService() ...");
121 if (orb_ == 0) log_.panic(me(), "orb==null, internal problem");
122 if (nameServerControl_ == 0)
123 nameServerControl_ = new NameServerControl(orb_);
124 }
125
126
127 void CorbaConnection::initAuthenticationService()
128 {
129 if (log_.call()) log_.call(me(), "initAuthenticationService() ...");
130 if (!CORBA::is_nil(authServer_))
131 return;
132
133 // 1) check if argument -IOR at program startup is given
134 string authServerIOR = /* -dispatch/connection/plugin/ior/iorString IOR string is directly given */
135 global_.getProperty().getStringProperty("dispatch/callback/plugin/ior/iorString","");
136 if (authServerIOR != "") {
137 CORBA::Object_var
138 obj = orb_->string_to_object(authServerIOR.c_str());
139 authServer_ = authenticateIdl::AuthServer::_narrow(obj.in());
140 log_.info(me(),"Accessing xmlBlaster using your given IOR string");
141 return;
142 }
143 if (log_.trace()) log_.trace(me(), "No -dispatch/connection/plugin/ior/iorString ...");
144
145 string authServerIORFile =
146 global_.getProperty().getStringProperty("dispatch/connection/plugin/ior/iorFile","");
147 // -dispatch/connection/plugin/ior/iorFile IOR string is given through a file
148 if (authServerIORFile != "") {
149 ifstream in(authServerIORFile.c_str());
150 if ((!in) /* && (log_.PANIC) */ )
151 log_.panic(me(), "Could not open the file");
152 in >> authServerIOR;
153 in.close();
154 CORBA::Object_var
155 obj = orb_->string_to_object(authServerIOR.c_str());
156 authServer_ = authenticateIdl::AuthServer::_narrow(obj.in());
157 string msg = "Accessing xmlBlaster using your given IOR file ";
158 msg += authServerIORFile;
159 log_.info(me(), msg);
160 return;
161 }
162 if (log_.trace()) log_.trace(me(), "No -dispatch/connection/plugin/ior/iorFile ...");
163
164 // 3) Using builtin http IOR download ...
165 {
166 char myHostName[126];
167 strcpy(myHostName, "localhost");
168 gethostname(myHostName, 125);
169 string iorHost = global_.getProperty().getStringProperty("bootstrapHostname",myHostName);
170 // Port may be a name from /etc/services: "xmlBlaster 3412/tcp"
171 string iorPortStr = global_.getProperty().getStringProperty("bootstrapPort","3412"); // default bootstrapPort=3412 (xmlblaster)
172 if (log_.trace()) log_.trace(me(), "Trying -bootstrapHostname=" + iorHost + " and -bootstrapPort=" + iorPortStr + " ...");
173 struct sockaddr_in xmlBlasterAddr;
174 memset((char *)&xmlBlasterAddr, 0, sizeof(xmlBlasterAddr));
175 xmlBlasterAddr.sin_family=AF_INET;
176 struct hostent *hostP = gethostbyname(iorHost.c_str());
177 struct servent *portP = getservbyname(iorPortStr.c_str(), "tcp");
178 string authServerIOR;
179 authServerIOR.reserve(520);
180 if (hostP != NULL) {
181 xmlBlasterAddr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; //inet_addr("192.168.1.2");
182 if (portP != NULL)
183 xmlBlasterAddr.sin_port = portP->s_port;
184 else
185 xmlBlasterAddr.sin_port = htons(global_.getProperty().getIntProperty("bootstrapPort",3412));
186 int s = socket(AF_INET, SOCK_STREAM, 0);
187 if (s != -1) {
188 if (::connect(s, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr)) != -1) {
189 string req="GET /AuthenticationService.ior HTTP/1.0\r\n \n";
190 int numSent = send(s, req.c_str(), req.size(), 0);
191 if (numSent < (int)req.size()) {
192 log_.error(me(), "Problems sending request '" + req + "'");
193 }
194 else {
195 log_.trace(me(), "Sent IOR request '" + req + "'");
196 }
197 int numRead;
198 char buf[10];
199 while ((numRead = recv(s, buf, 10, 0)) > 0) {
200 authServerIOR.append(buf, numRead);
201 }
202 if (log_.dump()) log_.dump(me(), "Received IOR data: '" + authServerIOR + "'");
203 size_t pos = authServerIOR.find("IOR:");
204 // if (pos > 0)
205 if (pos != authServerIOR.npos) authServerIOR = authServerIOR.substr(pos);
206 else {
207 throw serverIdl::XmlBlasterException("communication.noConnection",
208 "client", me().c_str(), "en",
209 "can't access authentication Service", "", "", "", "",
210 "", "");
211 }
212 if (log_.trace()) log_.trace(me(), "Received IOR data: '" + authServerIOR + "'");
213 }
214 else {
215 log_.warn(me(), "Connecting to -bootstrapHostname=" + iorHost + " failed"); // errno
216 }
217 ::shutdown(s, 2); // SHUT_RDWR
218 ::closeSocket(s); // Added because of handle leak reported by James Cazier
219 }
220 }
221 if (!authServerIOR.empty()) {
222 CORBA::Object_var obj = orb_->string_to_object(authServerIOR.c_str());
223 if (!CORBA::is_nil(obj.in())) {
224 if (!CORBA::is_nil(authServer_)) {
225 CORBA::release(authServer_);
226 authServer_ = 0;
227 }
228 authServer_ = authenticateIdl::AuthServer::_narrow(obj.in());
229 string msg = "Accessing xmlBlaster using -bootstrapHostname "+iorHost;
230 log_.info(me(), msg);
231 return;
232 }
233 }
234 }
235 if (log_.trace()) log_.trace(me(), "No -bootstrapHostname and -bootstrapPort ...");
236
237
238 // 4) asking Name Service CORBA compliant
239 bool useNameService=global_.getProperty().getBoolProperty("dispatch/connection/plugin/ior/useNameService",true);
240 // -dispatch/connection/plugin/ior/useNameService default is to ask the naming service
241
242 string text = "Can't access xmlBlaster Authentication Service";
243 text += ", is the server running and ready?\n - try to specify ";
244 text += "'-dispatch/connection/plugin/ior/iorFile <fileName>' if server is running on same host\n";
245 text += " - try to specify '-bootstrapHostname <hostName> -bootstrapPort 3412' to ";
246 text += "locate xmlBlaster\n - or contact your ";
247 text += "system administrator to start a naming service";
248
249 if (useNameService) {
250 try {
251 if (!nameServerControl_) initNamingService();
252
253 string contextId = global_.getProperty().getStringProperty("NameService.context.id", "xmlBlaster");
254 string contextKind = global_.getProperty().getStringProperty("NameService.context.kind", "MOM");
255 string clusterId = global_.getProperty().getStringProperty("NameService.node.id", global_.getStrippedId());
256 string clusterKind = global_.getProperty().getStringProperty("NameService.node.kind", "MOM");
257
258 CORBA::Object_var obj = nameServerControl_->resolve(contextId, contextKind);
259 CosNaming::NamingContext_var relativeContext_obj = CosNaming::NamingContext::_narrow(obj.in());
260 NameServerControl relativeContext(relativeContext_obj);
261 log_.info(me(), "Retrieved NameService context " + contextId + "." + contextKind);
262
263 authenticateIdl::AuthServer_var authServerFirst;
264 string tmpId = ""; // for logging only
265 string tmpServerName = ""; // for logging only
266 string firstServerName = ""; // for logging only
267 int countServerFound = 0; // for logging only
268 string serverNameList = ""; // for logging only
269 try {
270 authServer_ = authenticateIdl::AuthServer::_narrow(relativeContext.resolve(clusterId, clusterKind));
271 }
272 catch (XmlBlasterException ex) {
273 log_.info(me(), "Narrow AuthServer failed: " + ex.toString());
274 }
275
276 /*============================
277 TestGet -ORBInitRef NameService=`cat /tmp/ns.ior` -trace true -call true
278 =============================*/
279
280 if ( CORBA::is_nil(authServer_) ) {
281 if (log_.trace()) log_.trace(me(), "Query NameServer to find a suitable xmlBlaster server for '" +
282 NameServerControl::getString(contextId, contextKind)+"/"+NameServerControl::getString(clusterId, clusterKind) +
283 "' failed, is nil");
284 CosNaming::BindingList_var bl;
285 CosNaming::BindingIterator_var bi;
286 CosNaming::NamingContext_var tmp = relativeContext.getNamingService();
287 tmp->list(0, bl, bi);
288
289 // process the remaining bindings if an iterator exists:
290 if (CORBA::is_nil(authServer_) && !CORBA::is_nil(bi.in())) {
291 int i = 0;
292 CORBA::Boolean more;
293 do {
294 more = bi->next_n(1, bl);
295 if (bl->length() != 1) {
296 if (log_.trace()) log_.trace(me(), "NameService entry id is nil");
297 break;
298 }
299 CORBA::ULong index = 0;
300 string id = lexical_cast<std::string>(bl[index].binding_name[0].id);
301 string kind = lexical_cast<std::string>(bl[index].binding_name[0].kind);
302 if (log_.trace()) log_.trace(me(), "id=" + id + " kind=" + kind);
303
304 tmpId = id;
305 countServerFound++;
306 tmpServerName = NameServerControl::getString(contextId, contextKind)+"/"+NameServerControl::getString(id, kind);
307 if (i>0) serverNameList += ", ";
308 i++;
309 serverNameList += tmpServerName;
310
311 if (clusterId == id && clusterKind == kind) {
312 try {
313 if (log_.trace()) log_.trace(me(), "Trying to resolve NameService entry '"+NameServerControl::getString(id, kind)+"'");
314 authServer_ = authenticateIdl::AuthServer::_narrow(relativeContext.resolve(id, kind));
315 if (! CORBA::is_nil(authServer_))
316 break; // found a matching server
317 else
318 log_.warn(me(), "Connecting to NameService entry '"+tmpServerName+"' failed, is_nil");
319 }
320 catch (const CORBA::Exception &exc) {
321 log_.warn(me(), "Connecting to NameService entry '"+tmpServerName+"' failed: " + to_string(exc));
322 }
323 }
324
325 if (CORBA::is_nil(authServerFirst.in())) {
326 if (log_.trace()) log_.trace(me(), "Remember the first server");
327 try {
328 firstServerName = tmpServerName;
329 if (log_.trace()) log_.trace(me(), "Remember the first reachable xmlBlaster server from NameService entry '"+firstServerName+"'");
330 authServerFirst = authenticateIdl::AuthServer::_narrow(relativeContext.resolve(id, kind));
331 }
332 catch (const CORBA::Exception &exc) {
333 log_.warn(me(), "Connecting to NameService entry '"+tmpServerName+"' failed: " + to_string(exc));
334 }
335 }
336 } while ( more );
337 }
338 bi->destroy(); // Clean up server side iteration resources
339 }
340
341 if (CORBA::is_nil(authServer_)) {
342 if (!CORBA::is_nil(authServerFirst.in())) {
343 if (countServerFound > 1) {
344 string str = string("Can't choose one of ") + lexical_cast<std::string>(countServerFound) +
345 " avalailable server in CORBA NameService: " + serverNameList +
346 ". Please choose one with e.g. -NameService.node.id " + tmpId;
347 log_.warn(me(), str);
348 throw XmlBlasterException("communication.noConnection", "client", me(), "en", str);
349 }
350 log_.info(me(), "Choosing only available server '" + firstServerName + "' in CORBA NameService");
351 this->authServer_ = authenticateIdl::AuthServer::_duplicate(authServerFirst.in());
352 return;
353 }
354 else {
355 log_.trace(me(), "No usable xmlBlaster server found in NameService: " + serverNameList);
356 throw XmlBlasterException("communication.noConnection", "client", me(), "en", text);
357 }
358 }
359
360 log_.info(me(), "Accessing xmlBlaster using CORBA naming service entry '" +
361 NameServerControl::getString(contextId, contextKind) +
362 "/" + NameServerControl::getString(clusterId, clusterKind));
363
364 return;
365 }
366 catch(serverIdl::XmlBlasterException &e ) {
367 log_.trace(me() + ".NoAuthService", text);
368 throw CorbaDriver::convertFromCorbaException(e);
369 }
370 } // if (useNameService)
371
372 if (log_.trace()) log_.trace(me(), "No -dispatch/connection/plugin/ior/useNameService ...");
373 throw XmlBlasterException("communication.noConnection", "client", me(), "en", text);
374 } // initAuthenticationService()
375
376 void CorbaConnection::createCallbackServer(POA_clientIdl::BlasterCallback *implObj)
377 {
378 if (implObj) {
379 if (log_.trace()) log_.trace(me(), "Trying resolve_initial_references ...");
380 CORBA::Object_var obj = orb_->resolve_initial_references("RootPOA");
381 if (log_.trace()) log_.trace(me(), "Trying narrowing POA ...");
382 poa_ = PortableServer::POA::_narrow(obj.in());
383 PortableServer::POAManager_var poa_mgr = poa_->the_POAManager();
384 // _this() incarnates with the servant ...
385 callback_ = implObj->_this();
386 if (log_.trace()) log_.trace(me(), "Trying object_to_string POA ...");
387 CORBA::String_var tmp = orb_->object_to_string(callback_);
388 callbackIOR_ = tmp;
389 if (log_.trace()) log_.trace(me(), "Trying activate POA ...");
390 poa_mgr->activate();
391 #if defined(XMLBLASTER_MICO) && defined(ORB_IS_THREAD_SAFE)
392 // - multi threaded mico 2.3.11 sometimes blocked forever in work_pending()
393 // - omniORB doesn't need perform_work() either but it doesn't harm
394 #else
395 // - TAO seems to need it (callback messages won't arrive without)
396 if (log_.trace()) log_.trace(me(), "Trying orb.work_pending ...");
397 while (orb_->work_pending()) {
398 if (log_.trace()) log_.trace(me(), "Entering perform_work ...");
399 orb_->perform_work();
400 }
401 if (log_.trace()) log_.trace(me(), "Trying work_pending POA done ...");
402 #endif
403 return;
404 // add exception handling here !!!!!
405 }
406 return;
407 }
408
409 ConnectReturnQosRef CorbaConnection::connect(const ConnectQosRef& connectQos)
410 {
411 if ( !CORBA::is_nil(xmlBlaster_)) {
412 string msg = "You are already logged in, returning cached handle";
413 msg += " on xmlBlaster";
414 log_.warn(me(), msg);
415 return connectReturnQos_;
416 }
417
418 loginName_ = connectQos->getUserId();
419 if (log_.call()) log_.call(me(),"connect(" + loginName_ + ") ...");
420 try {
421 if (CORBA::is_nil(authServer_)) initAuthenticationService();
422 ConnectQos help = *connectQos; // since it is a const
423 string reqQos = help.toXml();
424 if (log_.trace()) log_.trace(me(), string("connect req: ") + reqQos);
425 // If using wstring in xmlBlaster.idl:
426 //CORBA::WString_var ws1 = CORBA::wstring_dup(toWstring(reqQos).c_str());
427 //CORBA::WString_var ws2 = authServer_->connect(ws1);
428 //string retQos = toString(wstring(ws2));
429 // or
430 string retQos = corbaWStringToString(authServer_->connect(toCorbaWString(reqQos)));
431 //string retQos = authServer_->connect(reqQos.c_str());
432 if (log_.trace()) log_.trace(me(), string("connect ret: ") + retQos);
433 ConnectQosFactory factory(global_);
434 if (log_.dump()) log_.dump(me(), "connect: the connect return qos before parsing: " + retQos);
435 connectReturnQos_ = factory.readObject(retQos);
436 sessionId_ = connectReturnQos_->getSecretSessionId();
437 xmlBlasterIOR_ = connectReturnQos_->getServerRef().getAddress();
438
439 CORBA::Object_var obj = orb_->string_to_object(xmlBlasterIOR_.c_str());
440 xmlBlaster_ = serverIdl::Server::_narrow(obj.in());
441
442 numLogins_++;
443 if (log_.trace()) log_.trace(me(),"Success, connect for "+loginName_);
444 return connectReturnQos_;
445 }
446 catch(const XmlBlasterException &e) {
447 string msg = "Connect failed for ";
448 msg += loginName_; // + ", numLogins=" + numLogins_;
449 if (log_.trace()) log_.trace(me(), msg);
450 throw e;
451 }
452 }
453
454 bool CorbaConnection::shutdown()
455 {
456 bool ret = false;
457 if (!CORBA::is_nil(xmlBlaster_)) {
458 CORBA::release(xmlBlaster_);
459 xmlBlaster_ = NULL;
460 ret = true;
461 }
462 if (!CORBA::is_nil(authServer_)) {
463 CORBA::release(authServer_);
464 authServer_ = NULL;
465 ret = true;
466 }
467 return ret;
468 }
469
470 bool CorbaConnection::shutdownCb()
471 {
472 if (!CORBA::is_nil(callback_)) {
473 CORBA::release(callback_);
474 callback_ = NULL;
475 return true;
476 }
477 return false;
478 }
479
480 bool CorbaConnection::disconnect(const string& qos)
481 {
482 if (log_.call()) log_.call(me(), "disconnect() ...");
483 if (log_.dump()) log_.dump(me(), string("disconnect: the qos: ") + qos);
484
485 try {
486 if (!CORBA::is_nil(authServer_)) {
487 if (sessionId_=="") authServer_->logout(xmlBlaster_);
488 else authServer_->disconnect(sessionId_.c_str(), toCorbaWString(qos));
489 }
490 shutdown();
491 return true;
492 }
493 catch (...) {
494 }
495 shutdown();
496 return false;
497 }
498
499 /**
500 * Subscribe a message.
501 * <br />
502 * Note: You don't need to free anything
503 * @return The xml based QoS
504 */
505 string
506 CorbaConnection::subscribe(const string &xmlKey, const string &qos)
507 {
508 if (log_.call()) log_.call(me(), "subscribe() ...");
509 if (log_.dump()) {
510 log_.dump(me(), string("subscribe: the key: ") + xmlKey);
511 log_.dump(me(), string("subscribe: the qos: ") + qos);
512 }
513 if (CORBA::is_nil(xmlBlaster_)) {
514 string txt = "no auth.Server, you must login first";
515 throw serverIdl::XmlBlasterException("communication.noConnection",
516 "client", me().c_str(), "en",
517 txt.c_str(), "", "", "", "", "", "");
518 }
519 try {
520 return corbaWStringToString(xmlBlaster_->subscribe(toCorbaWString(xmlKey), toCorbaWString(qos)));
521 //CORBA::String_var ret = toString(xmlBlaster_->subscribe(xmlKey.c_str(), qos.c_str()));
522 //return static_cast<const char *>(ret);
523 } catch(serverIdl::XmlBlasterException &e) {
524 throw e;
525 }
526 //return "";
527 }
528
529
530 vector<std::string> CorbaConnection::unSubscribe(const string &xmlKey,
531 const string &qos)
532 {
533 if (log_.call()) log_.call(me(), "unSubscribe() ...");
534 if (log_.dump()) {
535 log_.dump(me(), string("unSubscribe: the key: ") + xmlKey);
536 log_.dump(me(), string("unSubscribe: the qos: ") + qos);
537 }
538
539 if (CORBA::is_nil(xmlBlaster_)) {
540 string txt = "no auth.Server, you must login first";
541 throw serverIdl::XmlBlasterException("communication.noConnection",
542 "client", me().c_str(), "en",
543 txt.c_str(), "", "", "", "", "", "");
544 }
545
546 try {
547 serverIdl::XmlTypeArr_var
548 retArr = xmlBlaster_->unSubscribe(toCorbaWString(xmlKey), toCorbaWString(qos));
549
550 vector<std::string> vecArr;
551 for (CORBA::ULong ii=0; ii<retArr->length(); ii++) {
552 vecArr.push_back(corbaWStringToString(retArr[ii].inout()));
553 }
554 return vecArr;
555 }
556 catch(serverIdl::XmlBlasterException e) {
557 throw e;
558 }
559 }
560
561 /**
562 * publish a message.
563 * <br />
564 * This method has a common interface which is not CORBA depending.
565 * <br />
566 * Note: You don't need to free anything
567 * @return The xml based QoS
568 */
569 string CorbaConnection::publish(const util::MessageUnit &msgUnitUtil) {
570 if (log_.trace()) log_.trace(me(), "Publishing the STL way ...");
571 if (log_.dump()) {
572 log_.dump(me(), string("publish: the msgUnit: ") + msgUnitUtil.toXml());
573 }
574
575 if (CORBA::is_nil(xmlBlaster_)) {
576 string txt = "no auth.Server, you must login first";
577 throw serverIdl::XmlBlasterException("communication.noConnection",
578 "client", me().c_str(), "en",
579 txt.c_str(), "", "", "", "", "", "");
580 }
581
582 try {
583 serverIdl::MessageUnit msgUnit;
584 // serverIdl::MessageUnit_var msgUnit;
585 copyToCorba(msgUnit, msgUnitUtil);
586 return corbaWStringToString(xmlBlaster_->publish(msgUnit));
587 //CORBA::String_var ret = xmlBlaster_->publish(msgUnit);
588 //return static_cast<char *>(ret);
589 }
590 catch(serverIdl::XmlBlasterException &e) {
591 string msg = "XmlBlasterException: ";
592 msg += e.message;
593 if (log_.trace()) log_.trace(me(), msg);
594 throw e;
595 }
596 // catch(CORBA::Exception &ex1) {
597 // throw serverIdl::XmlBlasterException(me().c_str(),to_string(ex1));
598 // }
599 }
600
601 /**
602 * @deprecated Please use the util::MessageUnit variant
603 */
604 string
605 CorbaConnection::publish(const serverIdl::MessageUnit &msgUnit)
606 {
607 if (log_.trace()) log_.trace(me(), "Publishing ...");
608
609 if (CORBA::is_nil(xmlBlaster_)) {
610 string txt = "no auth.Server, you must login first";
611 throw serverIdl::XmlBlasterException("communication.noConnection",
612 "client", me().c_str(), "en",
613 txt.c_str(), "", "", "", "", "", "");
614 }
615
616 try {
617 return corbaWStringToString(xmlBlaster_->publish(msgUnit));
618 //CORBA::String_var ret = xmlBlaster_->publish(msgUnit);
619 //return static_cast<char *>(ret);
620 }
621 catch(serverIdl::XmlBlasterException &e) {
622 string msg = "XmlBlasterException: ";
623 msg += e.message;
624 if (log_.trace()) log_.trace(me(), msg);
625 throw e;
626 }
627 // catch(CORBA::Exception &ex1) {
628 // throw serverIdl::XmlBlasterException(me().c_str(),to_string(ex1));
629 // }
630 }
631
632 /**
633 * Publish a bulk of messages.
634 * <br />
635 * This method has a common interface which is not CORBA depending.
636 * <br />
637 * Note: You don't need to free anything
638 * @param A vector with MessageUnit
639 * @return A vector of strings each is a publish return QoS.
640 */
641 vector<std::string>
642 CorbaConnection::publishArr(const vector<util::MessageUnit> &msgVec)
643 {
644 if (log_.call()) log_.call(me(), "publishArr() ...");
645
646 if (CORBA::is_nil(xmlBlaster_)) {
647 string txt = "no auth.Server, you must login first";
648 throw serverIdl::XmlBlasterException("communication.noConnection",
649 "client", me().c_str(), "en",
650 txt.c_str(), "", "", "", "", "", "");
651 }
652
653 try {
654 serverIdl::MessageUnitArr_var msgUnitArr = new serverIdl::MessageUnitArr;
655 copyToCorba(msgUnitArr, msgVec);
656 serverIdl::XmlTypeArr_var retArr = xmlBlaster_->publishArr(msgUnitArr);
657 vector<std::string> vecArr;
658 for (CORBA::ULong ii=0; ii<retArr->length(); ii++) {
659 vecArr.push_back(corbaWStringToString(retArr[ii].inout()));
660 //vecArr.push_back(static_cast<char *>(retArr[ii].inout()));
661 }
662 return vecArr;
663 }
664 catch(serverIdl::XmlBlasterException &e) {
665 if (log_.trace()) log_.trace(me(), "XmlBlasterException: "
666 + string(e.message) );
667 throw e;
668 }
669 }
670
671 /**
672 * @deprecated Please use the STL vector variant
673 */
674 serverIdl::XmlTypeArr*
675 CorbaConnection::publishArr(const serverIdl::MessageUnitArr& msgUnitArr)
676 {
677 if (log_.call()) log_.call(me(), "publishArr() ...");
678
679 if (CORBA::is_nil(xmlBlaster_)) {
680 string txt = "no auth.Server, you must login first";
681 throw serverIdl::XmlBlasterException("communication.noConnection",
682 "client", me().c_str(), "en",
683 txt.c_str(), "", "", "", "", "", "");
684 }
685
686 try {
687 return xmlBlaster_->publishArr(msgUnitArr);
688 }
689 catch(serverIdl::XmlBlasterException &e) {
690 if (log_.trace()) log_.trace(me(), "XmlBlasterException: "
691 + string(e.message) );
692 throw e;
693 }
694 return 0;
695 }
696
697 /**
698 * Publish a bulk of messages without ACK.
699 * <br />
700 * This method has a common interface which is not CORBA depending.
701 * <br />
702 * Note: You don't need to free anything
703 * @param The MessageUnit array as a STL vector
704 */
705 void
706 CorbaConnection::publishOneway(const vector<util::MessageUnit>& msgVec)
707 {
708 if (log_.call()) log_.call(me(), "publishOneway() ...");
709
710 if (CORBA::is_nil(xmlBlaster_)) {
711 string txt = "no auth.Server, you must login first";
712 throw serverIdl::XmlBlasterException("communication.noConnection",
713 "client", me().c_str(), "en",
714 txt.c_str(), "", "", "", "", "", "");
715 }
716
717 try {
718 serverIdl::MessageUnitArr_var msgUnitArr = new serverIdl::MessageUnitArr;
719 copyToCorba(msgUnitArr, msgVec);
720 xmlBlaster_->publishOneway(msgUnitArr);
721 }
722 catch (const exception& e) {
723 log_.error(me(), string("Exception caught in publishOneway, it is not transferred to client: ") + e.what());
724 }
725 catch(...) {
726 log_.error(me(), "Exception caught in publishOneway, it is not transferred to client");
727 }
728 }
729
730 /*
731 * Please use the STL based variant
732 * @param The MessageUnit array as a CORBA datatype
733 * @deprecated Use the vector<util::MessageUnit> variant
734 */
735 void
736 CorbaConnection::publishOneway(const serverIdl::MessageUnitArr& msgUnitArr)
737 {
738 if (log_.call()) log_.call(me(), "publishOneway() ...");
739
740 if (CORBA::is_nil(xmlBlaster_)) {
741 string txt = "no auth.Server, you must login first";
742 throw serverIdl::XmlBlasterException("communication.noConnection",
743 "client", me().c_str(), "en",
744 txt.c_str(), "", "", "", "", "", "");
745 }
746
747 try {
748 xmlBlaster_->publishOneway(msgUnitArr);
749 }
750 catch (const exception& e) {
751 log_.error(me(), string("Exception caught in publishOneway, it is not transferred to client: ") + e.what());
752 }
753 catch(...) {
754 log_.error(me(), "Exception caught in publishOneway, it is not transferred to client");
755 }
756 }
757
758 /**
759 * This method has a common interface which is not CORBA depending.
760 * <br />
761 * Note: You don't need to free anything
762 */
763 vector<std::string>
764 CorbaConnection::erase(const string &xmlKey, const string &qos)
765 {
766 if (log_.call()) log_.call(me(), "erase() ...");
767 if (log_.dump()) {
768 log_.dump(me(), string("erase: the key: ") + xmlKey);
769 log_.dump(me(), string("erase: the qos: ") + qos);
770 }
771
772 if (CORBA::is_nil(xmlBlaster_)) {
773 string txt = "no auth.Server, you must login first";
774 throw serverIdl::XmlBlasterException("communication.noConnection",
775 "client", me().c_str(), "en",
776 txt.c_str(), "", "", "", "", "", "");
777 }
778
779 try {
780 serverIdl::XmlTypeArr_var retArr = xmlBlaster_->erase(toCorbaWString(xmlKey), toCorbaWString(qos));
781 vector<std::string> vecArr;
782 for (CORBA::ULong ii=0; ii<retArr->length(); ii++) {
783 vecArr.push_back(corbaWStringToString(retArr[ii]));
784 //vecArr.push_back(static_cast<const char *>(retArr[ii]));
785 }
786 return vecArr;
787 }
788 catch(serverIdl::XmlBlasterException e) {
789 throw e;
790 }
791 }
792
793
794 /**
795 * Access messages the synchronous way.
796 * <br />
797 * Note: You don't need to free anything
798 * @return The STL MessageUnit vector, its a copy so if you have the variable on the
799 * stack it will free itself
800 */
801 vector<util::MessageUnit>
802 CorbaConnection::get(const string &xmlKey, const string &qos)
803 {
804
805 serverIdl::MessageUnitArr_var units;
806 if (log_.call()) log_.call(me(), "get() ...");
807 if (log_.dump()) {
808 log_.dump(me(), string("get: the key: ") + xmlKey);
809 log_.dump(me(), string("get: the qos: ") + qos);
810 }
811
812 if (CORBA::is_nil(xmlBlaster_)) {
813 string txt = "no auth.Server, you must login first";
814 throw serverIdl::XmlBlasterException("communication.noConnection",
815 "client", me().c_str(), "en",
816 txt.c_str(), "", "", "", "", "", "");
817 }
818
819 try {
820 units = xmlBlaster_->get(toCorbaWString(xmlKey), toCorbaWString(qos));
821 /*
822 string subId = xmlBlaster_->subscribe(xmlKey.c_str(),
823 qos.c_str());
824 log_.info(me(),"New Entry in Cache created (subId="+subId+")");
825 */
826 vector<util::MessageUnit> msgVec;
827 copyFromCorba(msgVec, units);
828 return msgVec;
829 }
830 catch(serverIdl::XmlBlasterException &e) {
831 throw e;
832 }
833 }
834
835 string
836 CorbaConnection::ping(const string &qos)
837 {
838 if (log_.call()) log_.call(me(), "ping(" + qos + ") ...");
839
840 if (CORBA::is_nil(xmlBlaster_)) {
841 string txt = "no auth.Server, you must login first";
842 throw serverIdl::XmlBlasterException("communication.noConnection",
843 "client", me().c_str(), "en",
844 txt.c_str(), "", "", "", "", "", "");
845 }
846
847 try {
848 CORBA::String_var ret = xmlBlaster_->ping("");
849 return static_cast<char *>(ret);
850 }
851 catch(serverIdl::XmlBlasterException &e) {
852 throw e;
853 }
854 }
855
856 /**
857 * Transform a util::MessageUnit to the corba variant
858 */
859 void
860 CorbaConnection::copyToCorba(serverIdl::MessageUnit &dest,
861 const util::MessageUnit &src) const
862 {
863 dest.xmlKey = toCorbaWString(src.getKey().toXml());
864 serverIdl::ContentType content(src.getContentLen(),
865 src.getContentLen(),
866 (CORBA::Octet*)src.getContent(),
867 false); // our src does memory management itself
868 dest.content = content; // dest.content and content point to same memory? memory leak?
869 dest.qos = toCorbaWString(src.getQos().toXml());
870 }
871
872
873 /**
874 * Transform STL vector to corba messageUnit array variant.
875 */
876 void
877 CorbaConnection::copyToCorba(serverIdl::MessageUnitArr_var &units,
878 const vector<util::MessageUnit> &msgVec) const
879 {
880 unsigned int len = msgVec.size();
881 units->length(len);
882 for (CORBA::ULong ii=0; ii<len; ii++) {
883 util::MessageUnit src = msgVec[ii];
884 serverIdl::MessageUnit dest;
885 copyToCorba(dest, src);
886 units[ii] = dest;
887 }
888 }
889
890 /**
891 * Transform corba messageUnit array to vector variant.
892 * @param units Is not const as [] operator does not like it
893 */
894 void
895 CorbaConnection::copyFromCorba(vector<util::MessageUnit> &msgVec,
896 serverIdl::MessageUnitArr_var &units)
897 {
898 unsigned int len = units->length();
899 msgVec.reserve(len);
900 for (CORBA::ULong ii=0; ii<len; ii++) {
901 const serverIdl::MessageUnit &msgUnit = static_cast<const serverIdl::MessageUnit>(units[ii]);
902 unsigned long len = static_cast<unsigned long>(msgUnit.content.length());
903 const unsigned char * blob = static_cast<const unsigned char *>(&msgUnit.content[0]);
904 if (log_.trace()) log_.trace(me(), "copyFromCorba() '" + string((const char *)blob) + "' len=" + lexical_cast<std::string>(len));
905 MsgKeyData key = msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey));
906 MsgQosData qos = msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos));
907 const util::MessageUnit msg(key, len, blob, qos);
908 msgVec.push_back(msg);
909 }
910 }
911
912 /**
913 * Transform corba messageUnit array to vector variant.
914 void CorbaConnection::copyFromCorba(util::MessageUnit &msgUnitUtil, serverIdl::MessageUnitArr_var &msgUnit) {
915 string key(units[ii].xmlKey);
916 unsigned long len = static_cast<unsigned long>(units[ii].content.length());
917 const unsigned char * blob = static_cast<const unsigned char *>(&units[ii].content[0]);
918 //unsigned char *blob = (unsigned char *)&units[ii].content[0];
919 string qos(units[ii].qos);
920 msgUnitUtil.setKey(key);
921 msgUnitUtil.setContentLen(len);
922 msgUnitUtil.setContent(blob);
923 msgUnitUtil.setQos(qos);
924 }
925 */
926
927 std::string CorbaConnection::usage()
928 {
929 std::string text = string("");
930 //text += string("\n");
931 text += string("\nThe CORBA plugin configuration:");
932 text += string("\n -bootstrapHostname <host>");
933 text += string("\n The host where to find xmlBlaster [localhost]");
934 text += string("\n -bootstrapPort <port>");
935 text += string("\n The bootstrap port where xmlBlaster publishes its IOR [3412]");
936 text += string("\n -dispatch/connection/plugin/ior/iorString <IOR:00...>");
937 text += string("\n The IOR string of the xmlBlaster-authentication server.");
938 text += string("\n -dispatch/connection/plugin/ior/iorFile <file>");
939 text += string("\n A file with the xmlBlaster-authentication server IOR.");
940 text += string("\n -dispatch/connection/plugin/ior/useNameService <true/false>");
941 text += string("\n Try to access xmlBlaster through a naming service [true]");
942 text += string("\n");
943 return text;
944 }
945
946 // CORBA::ORB_ptr CorbaConnection::orb_ = 0;
947 // unsigned short CorbaConnection::numOfSessions_ = 0;
948 // PortableServer::POA_ptr CorbaConnection::poa_ = 0;
949
950 }}}}} // end of namespace
// syntax highlighted by Code2HTML, v. 0.9.1