1 /*------------------------------------------------------------------------------
2 Name: CorbaDriver.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: The client driver for the corba protocol
6 ------------------------------------------------------------------------------*/
7
8 #include <client/protocol/corba/CorbaDriverFactory.h>
9 #include <util/ErrorCode.h>
10 #include <util/XmlBlasterException.h>
11 #include <util/Global.h>
12 #include <util/lexical_cast.h>
13
14 namespace org {
15 namespace xmlBlaster {
16 namespace client {
17 namespace protocol {
18 namespace corba {
19
20 using namespace std;
21 using namespace org::xmlBlaster::util;
22 using namespace org::xmlBlaster::util::thread;
23
24 CorbaDriverFactory::CorbaDriverFactory(Global& global, CORBA::ORB_ptr orb)
25 : Thread(),
26 ME("CorbaDriverFactory"),
27 drivers_(),
28 mutex_(),
29 getterMutex_(),
30 orbIsThreadSafe_(ORB_IS_THREAD_SAFE)
31 {
32 org::xmlBlaster::util::I_Log& log = global.getLog("org.xmlBlaster.client.protocol.corba");
33 if (log.call())
34 log.call("CorbaDriver", string("Constructor orbIsThreadSafe_=") + lexical_cast<std::string>(orbIsThreadSafe_));
35 doRun_ = true;
36 isRunning_ = false;
37
38 if (orb) {
39 orb_ = orb;
40 isOwnOrb_ = false;
41 }
42 else {
43 int args = global.getArgs();
44 const char * const* argc = global.getArgc();
45 orb_ = CORBA::ORB_init(args, const_cast<char **>(argc));
46 isOwnOrb_ = true;
47 }
48 }
49
50 CorbaDriverFactory::CorbaDriverFactory(const CorbaDriverFactory& factory)
51 : Thread(),
52 ME(factory.ME),
53 drivers_(),
54 doRun_(true),
55 isRunning_(false),
56 mutex_(),
57 getterMutex_(),
58 orbIsThreadSafe_(factory.orbIsThreadSafe_),
59 orb_(factory.orb_),
60 isOwnOrb_(factory.isOwnOrb_)
61 {
62 throw XmlBlasterException(INTERNAL_NOTIMPLEMENTED, ME, "private copy constructor");
63 }
64
65 CorbaDriverFactory& CorbaDriverFactory::operator =(const CorbaDriverFactory&)
66 {
67 throw XmlBlasterException(INTERNAL_NOTIMPLEMENTED, ME, "private assignement operator");
68 }
69
70 CorbaDriverFactory::~CorbaDriverFactory()
71 {
72 //if (log_.call()) log_.call(ME, "Destructor start");
73 Lock lock(getterMutex_);
74 DriversMap::iterator iter = drivers_.begin();
75 while (iter != drivers_.end()) {
76 delete ((*iter).second).first;
77 iter++;
78 }
79 drivers_.erase(drivers_.begin(), drivers_.end());
80 //if (log_.trace()) log_.trace(ME, "erased all drivers");
81 if (!orbIsThreadSafe_) { // stop the running thread
82 if (isRunning_) {
83 //if (log_.trace()) log_.trace(ME, "stopping the thread which performs orb work");
84 doRun_ = false;
85 join();
86 }
87 }
88 if (isOwnOrb_) {
89 if (!CORBA::is_nil(orb_)) {
90 //if (log_.trace()) log_.trace(ME, "shutting down the orb");
91 orb_->shutdown(true);
92 #if !(defined(_WINDOWS) && defined(XMLBLASTER_TAO))
93 //if (log_.trace()) log_.trace(ME, "destroying the orb");
94 orb_->destroy(); // blocks forever on Windows XP VC7 with TAO 1.3
95 #endif
96 //if (log_.trace()) log_.trace(ME, "releasing the orb");
97 CORBA::release(orb_);
98 }
99 }
100 //if (log_.trace()) log_.trace(ME, "Destructor end");
101 }
102
103 CorbaDriverFactory* CorbaDriverFactory::factory_ = NULL;
104
105 CorbaDriverFactory& CorbaDriverFactory::getFactory(Global& global, CORBA::ORB_ptr orb)
106 {
107 //static CorbaDriverFactory factory(global, orb);
108 //return factory;
109 if(factory_ == NULL)
110 {
111 factory_ = new CorbaDriverFactory(global, orb);
112 org::xmlBlaster::util::Object_Lifetime_Manager::instance()->manage_object("XB_CorbaDriverFactory", factory_); // if not pre-allocated.
113 }
114 return *factory_;
115 }
116
117 CorbaDriver& CorbaDriverFactory::getDriverInstance(Global* global)
118 {
119 std::string instanceName = lexical_cast<std::string>(global);
120 I_Log& log = global->getLog("org.xmlBlaster.client.protocol.corba");
121 if (log.call()) log.call("CorbaDriver", string("getInstance for ") + instanceName);
122 CorbaDriver* driver = NULL;
123 int count = 1;
124 {
125 Lock lock(getterMutex_);
126 DriversMap::iterator iter = drivers_.find(global);
127 if (iter == drivers_.end()) {
128 if (log.trace()) log.trace("CorbaDriver", string("created a new instance for ") + instanceName);
129
130 CORBA::ORB_ptr orb = CORBA::ORB::_duplicate(orb_);
131 driver = new CorbaDriver(*global, mutex_, instanceName, orb);
132 // initially the counter is set to 1
133 drivers_.insert(DriversMap::value_type(global, pair<CorbaDriver*, int>(driver, 1)));
134
135 // In a thread to support single thread orb->performwork to dispatch corba main loop
136 const bool detached = false;
137 if (!isRunning_) start(detached); // if threadSafe isRunning_ will never be set to true
138 }
139 else {
140 driver = ((*iter).second).first;
141 count = ((*iter).second).second++; // this is the counter ...
142 }
143 }
144 if (log.trace())
145 log.trace("CorbaDriver", string("number of instances for '") + instanceName + "' are " + lexical_cast<std::string>(count));
146 return *driver;
147 }
148
149
150 int CorbaDriverFactory::killDriverInstance(Global* global)
151 {
152 std::string instanceName = lexical_cast<std::string>(global);
153 I_Log& log = global->getLog("org.xmlBlaster.client.protocol.corba");
154 log.call(ME, "killDriverInstance");
155 Lock lock(getterMutex_);
156 DriversMap::iterator iter = drivers_.find(global);
157 if (iter == drivers_.end()) return -1;
158 int ret = --(*iter).second.second;
159 if (log.trace()) log.trace(ME, string("instances before deleting ") + lexical_cast<std::string>(ret));
160 if (ret <= 0) {
161 if (log.trace()) log.trace(ME, string("kill instance '") + instanceName + "' will be deleted now");
162 // do remove it since the counter is zero
163 CorbaDriver* driver = (*iter).second.first;
164 drivers_.erase(iter);
165 delete driver;
166 if (drivers_.empty() && isOwnOrb_) {
167 if (!orbIsThreadSafe_) {
168 if (isRunning_) {
169 doRun_ = false;
170 join(); // wait until the run thread has returned ...
171 }
172 }
173 // orb_->shutdown(true);
174 // orb_->destroy();
175 return 0;
176 }
177 }
178 if (log.trace())
179 log.trace("CorbaDriver", string("kill instance '") + instanceName + "' the number of references is " + lexical_cast<std::string>(ret));
180 return ret;
181 }
182
183 bool CorbaDriverFactory::orbRun()
184 {
185 if (orb_ == NULL) return false;
186 orb_->run();
187 return true;
188 }
189
190 void CorbaDriverFactory::run()
191 {
192 //if (log_.trace()) log_.trace(ME, "the corba loop starts now");
193 if (!isOwnOrb_) return;
194
195 if (orbIsThreadSafe_) {
196 orbRun(); // e.g. TAO
197 }
198 else {
199 doRun_ = true; // e.g. MICO
200 if (isRunning_) return;
201 //log_.info(ME, "the corba loop starts now");
202 isRunning_ = true;
203
204 try {
205 while (doRun_) {
206 { // this is for the scope of the lock ...
207 Lock lock(mutex_, orbIsThreadSafe_);
208 //if (log_.trace()) log_.trace(ME, "sweep in running thread");
209 while (orb_->work_pending()) orb_->perform_work();
210 }
211 //if (log_.trace()) log_.trace(ME, "sleeping for 20 millis");
212 sleep(20); // sleep 20 milliseconds
213 //if (log_.trace()) log_.trace(ME, string("awakening, doRun is: ") + lexical_cast<std::string>(doRun_));
214 }
215 }
216 catch(CORBA::Exception &ex) {
217 //log_.warn(ME, string("a corba exception occured in the running thread. It has now been stopped: ") + to_string(ex));
218 std::cerr << ME << " " << string("a corba exception occured in the running thread. It has now been stopped: ") << to_string(ex) << std::endl;
219 }
220 catch (exception &ex) {
221 //log_.warn(ME, string("an exception occured in the running thread. It has now been stopped: ") + ex.what());
222 std::cerr << ME << string("an exception occured in the running thread. It has now been stopped: ") << ex.what() << std::endl;
223 }
224
225 catch (...) {
226 //log_.warn(ME, "an unknown exception occured in the running thread. It has now been stopped");
227 std::cerr << ME << "an unknown exception occured in the running thread. It has now been stopped" << std::endl;
228 }
229
230 //log_.info(ME, "the corba loop has ended now");
231 isRunning_ = false;
232 }
233 }
234
235
236 }}}}} // namespaces
// syntax highlighted by Code2HTML, v. 0.9.1