1 /*------------------------------------------------------------------------------
2 Name: XmlBlasterAccess.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 #include <client/XmlBlasterAccess.h>
8 #include <util/Global.h>
9 #include <util/lexical_cast.h>
10 #include <util/Timestamp.h>
11 #include <util/dispatch/DispatchManager.h>
12 #include <util/parser/ParserFactory.h>
13 #include <util/queue/MsgQueueEntry.h>
14 #include <util/queue/I_Queue.h>
15
16
17 namespace org { namespace xmlBlaster { namespace client {
18
19 using namespace std;
20 using namespace org::xmlBlaster::util;
21 using namespace org::xmlBlaster::util::qos;
22 using namespace org::xmlBlaster::util::dispatch;
23 using namespace org::xmlBlaster::util::queue;
24 using namespace org::xmlBlaster::util::qos::storage;
25 using namespace org::xmlBlaster::util::qos::address;
26 using namespace org::xmlBlaster::authentication;
27 using namespace org::xmlBlaster::client::protocol;
28 using namespace org::xmlBlaster::client::key;
29 using namespace org::xmlBlaster::client::qos;
30
31 XmlBlasterAccess::XmlBlasterAccess(Global& global)
32 : ME(string("XmlBlasterAccess-UNCONNECTED")),
33 global_(global),
34 globalRef_(NULL),
35 log_(global.getLog("org.xmlBlaster.client")),
36 serverNodeId_("xmlBlaster"),
37 connectQos_(new ConnectQos(global)),
38 connectReturnQos_((ConnectReturnQos*)0),
39 subscriptionCallbackMap_(),
40 updateMutex_(),
41 invocationMutex_(global.getProperty().get("xmlBlaster/invocationMutex/recursive", true)),
42 postSendListener_(0)
43 {
44 log_.call(ME, "::constructor");
45 cbServer_ = NULL;
46 updateClient_ = NULL;
47 connection_ = NULL;
48 dispatchManager_ = NULL;
49 connectionProblems_ = NULL;
50 instanceName_ = lexical_cast<std::string>(TimestampFactory::getInstance().getTimestamp());
51
52 // Hack for Windows: Initialize it from main thread, using the callback thread fails undeterminable (with xerces)
53 org::xmlBlaster::util::parser::ParserFactory::getFactory().initialize(global);
54 }
55
56 XmlBlasterAccess::XmlBlasterAccess(GlobalRef globalRef)
57 : ME(string("XmlBlasterAccess-UNCONNECTED")),
58 global_(*globalRef),
59 globalRef_(globalRef),
60 log_(global_.getLog("org.xmlBlaster.client")),
61 serverNodeId_("xmlBlaster"),
62 connectQos_(new ConnectQos(global_)),
63 connectReturnQos_((ConnectReturnQos*)0),
64 subscriptionCallbackMap_(),
65 updateMutex_(),
66 invocationMutex_(globalRef->getProperty().get("xmlBlaster/invocationMutex/recursive", true)),
67 postSendListener_(0)
68 {
69 log_.call(ME, "::constructor");
70 cbServer_ = NULL;
71 updateClient_ = NULL;
72 connection_ = NULL;
73 dispatchManager_ = NULL;
74 connectionProblems_ = NULL;
75 instanceName_ = lexical_cast<std::string>(TimestampFactory::getInstance().getTimestamp());
76
77 // Hack for Windows: Initialize it from main thread, using the callback thread fails undeterminable (with xerces)
78 org::xmlBlaster::util::parser::ParserFactory::getFactory().initialize(global_);
79 }
80
81 XmlBlasterAccess::~XmlBlasterAccess()
82 {
83 if (log_.call()) log_.call(ME, "destructor");
84 cleanup(true);
85 dispatchManager_ = NULL;
86 updateClient_ = NULL;
87 connectionProblems_ = NULL;
88 if (log_.trace()) log_.trace(ME, "destructor ended");
89 }
90
91 void XmlBlasterAccess::cleanup(bool doLock)
92 {
93 if (log_.call()) log_.call(ME, "cleanup");
94 if (doLock) {
95 // synchronization
96 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
97 org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
98 subscriptionCallbackMap_.clear();
99 }
100 else {
101 org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
102 subscriptionCallbackMap_.clear();
103 }
104
105 if (cbServer_) {
106 CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
107 const AddressBaseRef& addr = prop.getCurrentCallbackAddress(); // c++ may not return null
108 global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
109 cbServer_ = NULL;
110 }
111 if (connection_) {
112 if (log_.trace()) log_.trace(ME, "destructor: going to delete the connection");
113 connection_->shutdown();
114 delete connection_;
115 connection_ = NULL;
116 }
117 }
118
119
120 ConnectReturnQos XmlBlasterAccess::connect(const ConnectQos& qos, I_Callback *clientCb)
121 {
122 ME = string("XmlBlasterAccess-") + qos.getSessionQos().getAbsoluteName();
123 if (log_.call()) log_.call(ME, "::connect");
124 if (log_.dump()) log_.dump(ME, string("::connect: qos: ") + qos.toXml());
125
126 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
127
128 cleanup(false);
129
130 global_.setId(qos.getSessionQos().getAbsoluteName()); // global_.setId(loginName + currentTimeMillis());
131 connectQos_ = new ConnectQos(qos);
132 connectQos_->setInstanceId(global_.getInstanceId());
133
134 SecurityQos securityQos = connectQos_->getSecurityQos();
135
136 ME = string("XmlBlasterAccess-") + getId();
137 string typeVersion = global_.getProperty().getStringProperty("queue/defaultPlugin", "CACHE,1.0");
138 typeVersion = global_.getProperty().getStringProperty("queue/connection/defaultPlugin", "typeVersion");
139 updateClient_ = clientCb;
140
141 if (updateClient_) createDefaultCbServer();
142
143 if (log_.trace()) log_.trace(ME, string("::connect. CbServer done"));
144 // currently the simple version will do it ...
145 if (!dispatchManager_) dispatchManager_ = &(global_.getDispatchManager());
146
147 if (!connection_) {
148 connection_ = dispatchManager_->getConnectionsHandler(instanceName_);
149 connection_->registerPostSendListener(this);
150 }
151
152 if (connectionProblems_) {
153 if (log_.trace()) log_.trace(ME, "::connect. Registering initFailsafe");
154 connection_->initFailsafe(connectionProblems_);
155 connectionProblems_ = NULL;
156 }
157 if (log_.trace()) log_.trace(ME, string("::connect. connectQos: ") + connectQos_->toXml());
158 // do connect() now:
159 connectReturnQos_ = connection_->connect(connectQos_);
160
161 ME = string("XmlBlasterAccess-") + connectReturnQos_->getSessionQos().getAbsoluteName();
162
163 setServerNodeId(connectReturnQos_->getSessionQos().getClusterNodeId());
164
165 // Is done in ConnectionsHandler.cpp
166 //global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName());
167
168 return *connectReturnQos_;
169 }
170
171 org::xmlBlaster::util::Global& XmlBlasterAccess::getGlobal()
172 {
173 return this->global_;
174 }
175
176 org::xmlBlaster::util::queue::I_Queue* XmlBlasterAccess::getQueue()
177 {
178 if (connection_) {
179 return connection_->getQueue();
180 }
181 return 0;
182 }
183
184
185 org::xmlBlaster::client::I_Callback* XmlBlasterAccess::getCallback()
186 {
187 return this->updateClient_;
188 }
189
190 void XmlBlasterAccess::setCallbackDispatcherActive(bool isActive)
191 {
192 string command = getSessionName() + "/?dispatcherActive=" + lexical_cast<string>(isActive);
193 sendAdministrativeCommand(command);
194 connectQos_->getCbAddress()->setDispatcherActive(isActive);
195 }
196
197 string XmlBlasterAccess::sendAdministrativeCommand(const string &command, PublishQos *publishQosP)
198 {
199 bool isGet = command.find("get ") == 0 || command.find("GET ") == 0;
200 bool isSet = command.find("set ") == 0 || command.find("SET ") == 0;
201 const string cmd = ((isGet || isSet)) ? command.substr(4) : command;
202
203 if (publishQosP ||(isSet || (!isGet && cmd.find("=") != string::npos)) ) {
204 string oid = string("__cmd:") + cmd;
205 PublishKey key(global_, oid); // oid="__cmd:/client/joe/1/?dispatcherActive=false"
206 PublishQos qos(global_);
207 MessageUnit msgUnit(key, "", ( publishQosP ) ? *publishQosP : qos );
208 try {
209 PublishReturnQos ret = publish(msgUnit);
210 if (log_.trace()) log_.trace(ME, "Send '" + cmd + " '");
211 return ret.getState();
212 }
213 catch (XmlBlasterException &e) {
214 if (log_.trace()) log_.trace(ME, "Sending of '" + cmd + " ' failed: " + e.getMessage());
215 throw e;
216 }
217 }
218 else {
219 string oid = string("__cmd:") + cmd;
220 GetKey getKey(global_);
221 getKey.setOid(oid);
222 GetQos getQos(global_);
223 try {
224 vector<MessageUnit> msgVec = get(getKey, getQos);
225 if (log_.trace()) log_.trace(ME, "Send '" + cmd + " ', got array of size " + lexical_cast<string>(msgVec.size()));
226 if (msgVec.size() == 0)
227 return "";
228 return msgVec[0].getContentStr();
229 }
230 catch (XmlBlasterException &e) {
231 if (log_.trace()) log_.trace(ME, "Sending of '" + cmd + " ' failed: " + e.getMessage());
232 throw e;
233 }
234 }
235 }
236
237
238 void XmlBlasterAccess::createDefaultCbServer()
239 {
240 log_.call(ME, "::createDefaultCbServer");
241
242 CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
243 const AddressBaseRef &addr = prop.getCurrentCallbackAddress();
244
245 if(!cbServer_)
246 cbServer_ = initCbServer(getLoginName(), addr->getType(), addr->getVersion());
247
248 addr->setAddress(cbServer_->getCbAddress());
249 addr->setType(cbServer_->getCbProtocol());
250 // !!!!! prop.setCallbackAddress(addr);
251 connectQos_->setSessionCbQueueProperty(prop);
252 if (log_.trace()) log_.trace(ME, string("::createDefaultCbServer: connectQos: ") + connectQos_->toXml());
253 log_.info(ME, "Callback settings: " + prop.getSettings());
254 }
255
256 I_CallbackServer*
257 XmlBlasterAccess::initCbServer(const string& loginName, const string& type, const string& version)
258 {
259 if (log_.call()) log_.call(ME, string("::initCbServer: loginName='") + loginName + "' type='" + type + "' version='" + version +"'");
260 if (log_.trace()) log_.trace(ME, string("Using 'client.cbProtocol=") + type + string("' to be used by ") + getServerNodeId() + string(", trying to create the callback server ..."));
261 I_CallbackServer* server = &(global_.getCbServerPluginManager().getPlugin(instanceName_, type, version));
262 if (log_.trace()) log_.trace(ME, "After callback plugin creation");
263 server->initialize(loginName, *this);
264 if (log_.trace()) log_.trace(ME, "After callback plugin initialize");
265 return server;
266 }
267
268 org::xmlBlaster::util::dispatch::I_PostSendListener* XmlBlasterAccess::registerPostSendListener(org::xmlBlaster::util::dispatch::I_PostSendListener *listener) {
269 I_PostSendListener* old = this->postSendListener_;
270 this->postSendListener_ = listener;
271 //if (connection_)
272 // return connection_->registerPostSendListener(this);
273 return old;
274 }
275
276 // I_PostSendListener
277 void XmlBlasterAccess::postSend(const std::vector<EntryType> &entries)
278 {
279 I_PostSendListener* l = this->postSendListener_;
280 if (l)
281 l->postSend(entries);
282 }
283
284 // I_PostSendListener
285 bool XmlBlasterAccess::sendingFailed(const std::vector<EntryType> &entries, const XmlBlasterException &exception)
286 {
287 I_PostSendListener* l = this->postSendListener_;
288 if (l)
289 return l->sendingFailed(entries, exception);
290 return false;
291 }
292
293 org::xmlBlaster::client::protocol::I_ProgressListener* XmlBlasterAccess::registerProgressListener(org::xmlBlaster::client::protocol::I_ProgressListener *listener)
294 {
295 return (this->cbServer_) ? this->cbServer_->registerProgressListener(listener) : 0;
296 }
297
298 org::xmlBlaster::util::qos::ConnectQosRef XmlBlasterAccess::getConnectQos() {
299 return connectQos_;
300 }
301
302 //org::xmlBlaster::util::qos::ConnectReturnQosRef XmlBlasterAccess::getConnectReturnQos() {
303 //}
304
305 void
306 XmlBlasterAccess::initSecuritySettings(const string& /*secMechanism*/, const string& /*secVersion*/)
307 {
308 log_.error(ME, "initSecuritySettings not implemented yet");
309 }
310
311 void XmlBlasterAccess::leaveServer(const StringMap &/*map*/)
312 {
313 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
314 if (!isConnected()) {
315 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::leaveServer", "You are not connected to the xmlBlaster");
316 }
317
318 if (cbServer_) {
319 if (log_.trace()) log_.trace(ME, "destructor: going to delete the callback connection");
320 cbServer_->shutdownCb();
321 }
322
323 if (connection_) {
324 if (log_.trace()) log_.trace(ME, "destructor: going to delete the connection");
325 connection_->shutdown();
326 }
327
328 if (cbServer_) {
329 CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
330 AddressBaseRef addr = prop.getCurrentCallbackAddress(); // c++ may not return null
331 global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
332 cbServer_ = NULL;
333 }
334
335 if (connection_) {
336 delete connection_;
337 connection_ = NULL;
338 }
339 log_.info(ME, "leaveServer() done");
340 }
341
342
343 bool
344 XmlBlasterAccess::disconnect(const DisconnectQos& qos, bool flush, bool shutdown, bool shutdownCb)
345 {
346 // locking until finished
347 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
348 bool ret1 = true;
349 bool ret3 = true;
350 if (log_.call()) {
351 log_.call(ME, string("disconnect called with flush='") + Global::getBoolAsString(flush) +
352 "' shutdown='" + Global::getBoolAsString(shutdown) +
353 "' shutdownCb='" + Global::getBoolAsString(shutdownCb) + "'");
354 }
355
356 if (log_.trace()) log_.trace(ME, "disconnecting the client connection");
357 if (log_.dump()) log_.dump(ME, string("disconnect: the qos is:\n") + qos.toXml());
358 if (connection_ != NULL) {
359 ret1 = connection_->disconnect(qos);
360 if (shutdown) connection_->shutdown();
361 }
362 else {
363 ret1 = false;
364 }
365 if (shutdownCb) {
366 if (cbServer_) {
367 ret3 = cbServer_->shutdownCb();
368
369 CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
370 const AddressBaseRef &addr = prop.getCurrentCallbackAddress();
371 global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
372 cbServer_ = NULL;
373 }
374 else ret3 = false;
375 }
376 return ret1 && ret3;
377 }
378
379 string XmlBlasterAccess::getId()
380 {
381 return getSessionName();
382 }
383
384 SessionNameRef XmlBlasterAccess::getSessionNameRef()
385 {
386 if (!connectReturnQos_.isNull()) return connectReturnQos_->getSessionQos().getSessionName();
387 return connectQos_->getSessionQos().getSessionName();
388 }
389
390 string XmlBlasterAccess::getSessionName()
391 {
392 string ret;
393 if (!connectReturnQos_.isNull()) ret = connectReturnQos_->getSessionQos().getAbsoluteName();
394 if (ret == "") ret = connectQos_->getSessionQos().getAbsoluteName();
395 return ret;
396 }
397
398 string XmlBlasterAccess::getLoginName()
399 {
400 try {
401 string nm = connectQos_->getSecurityQos().getUserId();
402 if (nm != "") return nm;
403 }
404 catch (XmlBlasterException e) {
405 log_.warn(ME, e.toString());
406 }
407 return string("client?");
408 }
409
410 void XmlBlasterAccess::setServerNodeId(const string& nodeId)
411 {
412 serverNodeId_ = nodeId;
413 }
414
415 string XmlBlasterAccess::getServerNodeId() const
416 {
417 return serverNodeId_;
418 }
419
420 /*
421 MsgQueueEntry
422 XmlBlasterAccess::queueMessage(const MsgQueueEntry& entry)
423 {
424 return entry;
425 }
426
427 vector<MsgQueueEntry*>
428 XmlBlasterAccess::queueMessage(const vector<MsgQueueEntry*>& entries)
429 {
430 return entries;
431 }
432 */
433
434 SubscribeReturnQos XmlBlasterAccess::subscribe(const SubscribeKey& key, const SubscribeQos& qos, I_Callback *callback)
435 {
436 // locking until finished
437 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
438
439 if (!isConnected()) {
440 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::subscribe", "you are not connected to the xmlBlaster");
441 }
442 if (log_.call()) log_.call(ME, "subscribe");
443 if (log_.dump()) {
444 log_.dump(ME, string("subscribe. The key:\n") + key.toXml());
445 log_.dump(ME, string("subscribe. The Qos:\n") + qos.toXml());
446 }
447
448 SessionNameRef sessionName = getSessionNameRef();
449 if (sessionName->getPubSessionId() > 0 &&
450 qos.getMultiSubscribe()==false &&
451 !qos.hasSubscriptionId()) {
452 // For failsave clients we generate on client side the subscriptionId
453 // In case of offline/clientSideQueued operation we guarantee like this a not changing
454 // subscriptionId and the client code can reliably use the subscriptionId for further dispatching
455 // of update() messages.
456 SubscribeQos& q = const_cast<SubscribeQos&>(qos);
457 q.generateSubscriptionId(sessionName, key);
458 if (log_.trace()) log_.trace(ME, "subscribe: generated client side subscriptionId=" + q.getData().getSubscriptionId());
459 }
460
461 if (callback != 0) { // using a subscribe specific callback?
462 if (log_.trace()) log_.trace(ME, "subscribe: inserting individual callback in callback map");
463 org::xmlBlaster::util::thread::Lock lockUpdate(updateMutex_);
464 SubscribeReturnQos retQos = connection_->subscribe(key, qos);
465 std::string subId = retQos.getSubscriptionId();
466 subscriptionCallbackMap_.insert(std::map<std::string, I_Callback*>::value_type(subId, callback));
467 return retQos;
468 }
469 else {
470 if (log_.trace()) log_.trace(ME, "subscribe: no specific callback");
471 return connection_->subscribe(key, qos);
472 }
473 }
474
475 vector<MessageUnit> XmlBlasterAccess::get(const GetKey& key, const GetQos& qos)
476 {
477 // locking until finished
478 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
479 if (!isConnected()) {
480 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::get", "you are not connected to the xmlBlaster");
481 }
482 if (log_.call()) log_.call(ME, "get");
483 if (log_.dump()) {
484 log_.dump(ME, string("get. The key:\n") + key.toXml());
485 log_.dump(ME, string("get. The Qos:\n") + qos.toXml());
486 }
487 return connection_->get(key, qos);
488 }
489
490 vector<MessageUnit> XmlBlasterAccess::receive(string oid, int maxEntries, long timeout, bool consumable) {
491 if (!isConnected()) {
492 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::receive", "you are not connected to the xmlBlaster");
493 }
494 if (log_.call()) log_.call(ME, "receive");
495
496 //topic/hello to access a history queue,
497 //client/joe to access a subject queue or
498 //client/joe/session/1
499 if (oid.find("topic") != string::npos)
500 oid = "__cmd:"+oid+"/?historyQueueEntries"; // "__cmd:topic/hello/?historyQueueEntries"
501 else if (oid.find("session") != string::npos)
502 oid = "__cmd:"+oid+"/?callbackQueueEntries"; // "__cmd:client/joe/session/1/?callbackQueueEntries";
503 else if (oid.find("subject") != string::npos)
504 oid = "__cmd:"+oid+"/?subjectQueueEntries"; // "__cmd:client/joe/?subjectQueueEntries"
505 else
506 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::receive", "Can't parse '" + oid + "'");
507
508 GetKey getKey(global_, oid);
509 QueryQosData data(global_);
510 data.setQueryQos(maxEntries, timeout, consumable);
511 GetQos getQos(global_, data);
512 vector<MessageUnit> msgs = get(getKey, getQos);
513 if (log_.trace()) log_.trace(ME, string("receive - got '") + lexical_cast<std::string>(msgs.size()) + "'");
514 return msgs;
515 }
516
517 vector<MessageUnit> XmlBlasterAccess::request(MessageUnit &msgUnit, long timeout, int maxEntries) {
518 if (log_.call()) log_.call(ME, "request");
519
520 // Create a temporary reply topic ...
521 long destroyDelay = timeout+86400000; // on client crash, cleanup after one day; //long destroyDelay = -1;
522 string tempTopicOid = createTemporaryTopic(destroyDelay, maxEntries);
523
524 try {
525 // Send the request ...
526 // "__jms:JMSReplyTo"
527 org::xmlBlaster::util::qos::QosData &qos = const_cast<org::xmlBlaster::util::qos::QosData&>(msgUnit.getQos());
528 qos.addClientProperty(string(Constants::JMS_REPLY_TO), tempTopicOid);
529 publish(msgUnit);
530 // Access the reply ...
531 vector<MessageUnit> msgs = receive("topic/"+tempTopicOid, maxEntries, timeout, true);
532 { // Clean up temporary topic ...
533 EraseKey ek(global_, tempTopicOid);
534 EraseQos eq(global_);
535 eq.setForceDestroy(true);
536 erase(ek, eq);
537 }
538 return msgs;
539 }
540 catch (exception &ex) {
541 { // Clean up temporary topic ...
542 EraseKey ek(global_, tempTopicOid);
543 EraseQos eq(global_);
544 eq.setForceDestroy(true);
545 erase(ek, eq);
546 }
547 throw ex;
548 }
549 }
550
551 std::string XmlBlasterAccess::createTemporaryTopic(long destroyDelay, int historyMaxMsg) {
552 PublishKey pk(global_);
553 PublishQos pq(global_);
554 TopicProperty topicProperty(global_);
555 topicProperty.setDestroyDelay(destroyDelay);
556 topicProperty.setCreateDomEntry(false);
557 topicProperty.setReadonly(false);
558 pq.setAdministrative(true);
559 if (historyMaxMsg >= 0L) {
560 HistoryQueueProperty prop(global_, "");
561 prop.setMaxEntries(historyMaxMsg);
562 topicProperty.setHistoryQueueProperty(prop);
563 }
564 pq.setTopicProperty(topicProperty);
565 MessageUnit msgUnit(pk, "", pq);
566 PublishReturnQos prq = publish(msgUnit);
567 if (log_.call()) log_.call(ME, string("Created temporary topic ") + prq.getKeyOid());
568 return prq.getKeyOid();
569 }
570
571
572 vector<UnSubscribeReturnQos>
573 XmlBlasterAccess::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
574 {
575 // locking until finished
576 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
577 if (!isConnected()) {
578 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::unsSubscribe", "you are not connected to the xmlBlaster");
579 }
580 if (log_.call()) log_.call(ME, "unSubscribe");
581 if (log_.dump()) {
582 log_.dump(ME, string("unSubscribe. The key:\n") + key.toXml());
583 log_.dump(ME, string("unSubscribe. The Qos:\n") + qos.toXml());
584 }
585 // synchronization
586 org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
587 vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos);
588 vector<UnSubscribeReturnQos>::iterator iter = ret.begin();
589 while (iter != ret.end()) {
590 if (log_.trace()) log_.trace(ME, std::string("unSubscribe: removing callback for '") + (*iter).getSubscriptionId() + "'");
591 subscriptionCallbackMap_.erase((*iter).getSubscriptionId());
592 iter++;
593 }
594 return ret;
595 }
596
597 PublishReturnQos XmlBlasterAccess::publish(const MessageUnit& msgUnit)
598 {
599 // locking until finished
600 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
601 if (!isConnected()) {
602 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publish", "you are not connected to the xmlBlaster");
603 }
604 if (log_.call()) log_.call(ME, "publish");
605 if (log_.dump()) {
606 log_.dump(ME, string("publish. The msgUnit:\n") + msgUnit.toXml());
607 }
608 return connection_->publish(msgUnit);
609 }
610
611 void XmlBlasterAccess::publishOneway(const vector<MessageUnit>& msgUnitArr)
612 {
613 // locking until finished
614 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
615 if (!isConnected()) {
616 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publishOneway", "you are not connected to the xmlBlaster");
617 }
618 if (log_.call()) log_.call(ME, "publishOneway");
619 if (log_.dump()) {
620 for (vector<MessageUnit>::size_type i=0; i < msgUnitArr.size(); i++) {
621 log_.dump(ME, string("publishOneway. The msgUnit[") + lexical_cast<std::string>(i) + "]:\n" + msgUnitArr[i].toXml());
622 }
623 }
624 connection_->publishOneway(msgUnitArr);
625 }
626
627 vector<PublishReturnQos> XmlBlasterAccess::publishArr(const vector<MessageUnit> &msgUnitArr)
628 {
629 // locking until finished
630 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
631 if (!isConnected()) {
632 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publishArr", "you are not connected to the xmlBlaster");
633 }
634 if (log_.call()) log_.call(ME, "publishArr");
635 if (log_.dump()) {
636 for (vector<MessageUnit>::size_type i=0; i < msgUnitArr.size(); i++) {
637 log_.dump(ME, string("publishArr. The msgUnit[") + lexical_cast<std::string>(i) + "]:\n" + msgUnitArr[i].toXml());
638 }
639 }
640 return connection_->publishArr(msgUnitArr);
641 }
642
643 vector<EraseReturnQos> XmlBlasterAccess::erase(const EraseKey& key, const EraseQos& qos)
644 {
645 // locking until finished
646 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
647 if (!isConnected()) {
648 throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::erase", "you are not connected to the xmlBlaster");
649 }
650 if (log_.call()) log_.call(ME, "erase");
651 if (log_.dump()) {
652 log_.dump(ME, string("erase. The key:\n") + key.toXml());
653 log_.dump(ME, string("erase. The Qos:\n") + qos.toXml());
654 }
655 return connection_->erase(key, qos);
656 }
657
658 string
659 XmlBlasterAccess::update(const string &sessionId, UpdateKey &updateKey, const unsigned char *content, long contentSize, UpdateQos &updateQos)
660 {
661 if (log_.call()) log_.call(ME, "::update");
662 if (log_.trace()) log_.trace(ME, string("update. The sessionId is '") + sessionId + "'");
663 if (log_.dump()) {
664 log_.dump(ME, string("update. The key:\n") + updateKey.toXml());
665 log_.dump(ME, string("update. The Qos:\n") + updateQos.toXml());
666 }
667
668 if (!subscriptionCallbackMap_.empty()) {
669 // This is synchronized but you must ensure the callback is still in scope when the update method is
670 // invoked. This could be more robust with a reference counted I_Callback.
671 I_Callback* subscriptionCallback = 0;
672 {
673 org::xmlBlaster::util::thread::Lock lock(updateMutex_);
674 CallbackMapType::iterator iter = subscriptionCallbackMap_.end();
675 iter = subscriptionCallbackMap_.find(updateQos.getSubscriptionId());
676 if (iter != subscriptionCallbackMap_.end()) subscriptionCallback = (*iter).second;
677 }
678
679 if (subscriptionCallback != 0) {
680 if (log_.trace()) log_.trace(ME, std::string("update: invoking specific subscription callback"));
681 return subscriptionCallback->update(sessionId, updateKey, content, contentSize, updateQos);
682 }
683 }
684
685 if (updateClient_)
686 return updateClient_->update(sessionId, updateKey, content, contentSize, updateQos);
687 else {
688 // See similar behavior in XmlBlasterAccess.java
689 log_.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + updateKey.toXml() + "" + updateQos.toXml());
690 }
691
692 return Constants::RET_OK; // "<qos><state id='OK'/></qos>";
693 }
694
695 std::string XmlBlasterAccess::usage()
696 {
697 string text = string("\n");
698 text += string("Choose a connection protocol:\n");
699 text += string(" -protocol Specify a protocol to talk with xmlBlaster, choose 'SOCKET' or 'IOR' depending on your compilation.\n");
700 text += string(" Current setting is '") + Global::getInstance().getProperty().getStringProperty("protocol", Global::getDefaultProtocol());
701 text += string("\n\n");
702 text += string("Security features:\n");
703 text += string(" -Security.Client.DefaultPlugin \"gui,1.0\"\n");
704 text += string(" Force the given authentication schema, here the GUI is enforced\n");
705 text += string(" Clients can overwrite this with ConnectQos.java\n");
706
707 return text; // std::cout << text << std::endl;
708 }
709
710 void XmlBlasterAccess::initFailsafe(I_ConnectionProblems* connectionProblems)
711 {
712 if (connection_) connection_->initFailsafe(connectionProblems);
713 else connectionProblems_ = connectionProblems;
714 }
715
716 string XmlBlasterAccess::ping()
717 {
718 // locking until finished
719 org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
720 return connection_->ping("<qos/>");
721 }
722
723 long XmlBlasterAccess::flushQueue()
724 {
725 if (!connection_) {
726 throw XmlBlasterException(INTERNAL_NULLPOINTER, ME + "::flushQueue", "no connection exists when trying to flush the queue: try to connect to xmlBlaster first");
727 }
728 return connection_->flushQueue();
729 }
730
731
732 bool XmlBlasterAccess::isConnected() const
733 {
734 if (!connection_) return false;
735 return connection_->isConnected();
736 }
737
738 bool XmlBlasterAccess::isAlive() const
739 {
740 if (!connection_) return false;
741 return connection_->isAlive();
742 }
743
744 bool XmlBlasterAccess::isPolling() const
745 {
746 if (!connection_) return false;
747 return connection_->isPolling();
748 }
749
750 bool XmlBlasterAccess::isDead() const
751 {
752 if (!connection_) return false;
753 return connection_->isDead();
754 }
755
756
757 std::string XmlBlasterAccess::getStatusString() const
758 {
759 if (!connection_) return "DEAD";
760 return connection_->getStatusString();
761 }
762
763
764 }}} // namespaces
765
766
767 #ifdef _XMLBLASTER_CLASSTEST
768
769 #include <util/Timestamp.h>
770 #include <util/thread/ThreadImpl.h>
771
772 using namespace std;
773 using namespace org::xmlBlaster::client;
774 using namespace org::xmlBlaster::util::thread;
775
776 int main(int args, char* argv[])
777 {
778 // Init the XML platform
779 try
780 {
781 Global& glob = Global::getInstance();
782 glob.initialize(args, argv);
783 Log& log = glob.getLog("org.xmlBlaster.client");
784
785 XmlBlasterAccess xmlBlasterAccess(glob);
786 ConnectQos connectQos(glob);
787
788 log.info("main", string("the connect qos is: ") + connectQos.toXml());
789
790 ConnectReturnQosRef retQos = xmlBlasterAccess.connect(connectQos, NULL);
791 log.info("", "Successfully connect to xmlBlaster");
792
793 if (log.trace()) log.trace("main", "Subscribing using XPath syntax ...");
794 SubscribeKey subKey(glob,"//test","XPATH");
795 log.info("main", string("subscribe key: ") + subKey.toXml());
796 SubscribeQos subQos(glob);
797 log.info("main", string("subscribe qos: ") + subQos.toXml());
798 try {
799 SubscribeReturnQos subReturnQos = xmlBlasterAccess.subscribe(subKey, subQos);
800 log.info("main", string("Success: Subscribe return qos=") +
801 subReturnQos.toXml() + " done");
802 }
803 catch (XmlBlasterException &ex) {
804 log.error("main", ex.toXml());
805 }
806
807 PublishKey pubKey(glob);
808 pubKey.setOid("HelloWorld");
809 pubKey.setClientTags("<test></test>");
810 PublishQos pubQos(glob);
811 MessageUnit msgUnit(pubKey, string("Hi"), pubQos);
812
813 PublishReturnQos pubRetQos = xmlBlasterAccess.publish(msgUnit);
814 log.info("main", string("successfully published, publish return qos: ") + pubRetQos.toXml());
815
816 log.info("", "Successfully published a message to xmlBlaster");
817 log.info("", "Sleeping");
818 Timestamp delay = 10000000000ll; // 10 seconds
819 Thread::sleep(delay);
820 }
821 catch (XmlBlasterException &ex) {
822 std::cout << ex.toXml() << std::endl;
823 }
824 return 0;
825 }
826
827 #endif
// syntax highlighted by Code2HTML, v. 0.9.1