1 /*-----------------------------------------------------------------------------
2 Name: Timeout.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Allows you be called back after a given delay.
6 -----------------------------------------------------------------------------*/
7 #include <algorithm>
8 #include <string>
9
10 #include <util/Timeout.hpp>
11 #include <util/lexical_cast.h>
12 #include <util/Constants.h>
13 #include <util/Global.h>
14
15 namespace org { namespace xmlBlaster { namespace util {
16
17 using namespace std;
18 using namespace org::xmlBlaster::util::thread;
19
20 Timeout::Timeout(Global& global)
21 : Thread(), ME("Timeout"), threadName_("Timeout-Thread"),
22 timeoutMap_(), isRunning_(false), isReady_(false),
23 mapHasNewEntry_(false), isActive_(true),
24 isDebug_(false), detached_(false), timestampFactory_(TimestampFactory::getInstance()),
25 global_(global), log_(global.getLog("org.xmlBlaster.util")),
26 invocationMutex_(), waitForTimeoutMutex_(), waitForTimeoutCondition_()
27 {
28 ME += "-Timeout-Thread-" + lexical_cast<std::string>(this);
29 // the thread will only be instantiated when starting
30 if (log_.call()) log_.call(ME, " default constructor");
31 if (log_.trace()) log_.trace(ME, " default constructor: after creating timeout condition");
32 start(detached_);
33 if (log_.trace()) log_.trace(ME, " default constructor: after starting the thread");
34 }
35
36 Timeout::Timeout(Global& global, const string &name)
37 : Thread(), ME("Timeout"), threadName_(name),
38 timeoutMap_(), isRunning_(false), isReady_(false),
39 mapHasNewEntry_(false), isActive_(true),
40 isDebug_(false), detached_(false), timestampFactory_(TimestampFactory::getInstance()),
41 global_(global), log_(global.getLog("org.xmlBlaster.util")),
42 invocationMutex_(), waitForTimeoutMutex_(), waitForTimeoutCondition_()
43 {
44 // the thread remains uninitialized ...
45 ME += "-" + name + "-" + lexical_cast<std::string>(this);
46 if (log_.call()) log_.call(ME, " alternative constructor");
47 start(detached_);
48 if (log_.trace()) log_.trace(ME, " default constructor: after starting the thread");
49 }
50
51 Timeout::~Timeout()
52 {
53 if (log_.call()) log_.call(ME, " destructor");
54
55 shutdown();
56
57 if (!detached_)
58 join();
59
60 if (isActive_) { /* Should never happen */
61 for (int i=0; i<200; i++) {
62 if (!isActive_) break;
63 log_.warn(ME, "Waiting for timer thread to finish");
64 //Thread::yield();
65 Thread::sleep(10);
66 }
67 }
68 }
69
70
71 bool Timeout::start(bool detached)
72 {
73 if (log_.call()) log_.call(ME, " start" + lexical_cast<string>(detached));
74 isRunning_ = true;
75 if (log_.trace()) log_.trace(ME, " before creating the running thread");
76 Thread::start(detached);
77
78 if (log_.trace()) log_.trace(ME, " start: waiting for the thread to be ready (waiting for the first timeout addition)");
79 while (!isReady_) {
80 Thread::sleep(5);
81 }
82 if (log_.trace()) log_.trace(ME, " start: running thread created and ready");
83 return true;
84 }
85
86 void Timeout::join()
87 {
88 Thread::join();
89 if (log_.trace()) log_.trace(ME, " start: running thread joined (i.e. thread started)");
90 }
91
92 Timestamp Timeout::addTimeoutListener(I_Timeout *listener, long delay, void *userData)
93 {
94 if (!isRunning_)
95 throw org::xmlBlaster::util::XmlBlasterException(USER_WRONG_API_USAGE, "", ME + ".addTimeoutListener", "en", "The timer is not running");
96
97 //if (log_.call()) log_.call(ME, " addTimeoutListener");
98 Timestamp key = 0;
99 if (delay < 1) log_.error(ME, ": addTimeoutListener with delay = " + lexical_cast<std::string>(delay));
100
101 {
102 Lock lock(invocationMutex_);
103 while (true) {
104 key = timestampFactory_.getTimestamp() + Constants::MILLION * delay;
105 TimeoutMap::iterator iter = timeoutMap_.find(key);
106 if (iter == timeoutMap_.end()) {
107 if (log_.trace()) log_.trace(ME, "addTimeoutListener, adding key: " + lexical_cast<std::string>(key));
108 Container cont(listener, userData);
109 TimeoutMap::value_type el(key, cont);
110 timeoutMap_.insert(el);
111 mapHasNewEntry_ = true;
112 break;
113 }
114 }
115 }
116
117 if (log_.trace()) log_.trace(ME, "addTimeoutListener, going to notify");
118 Lock waitForTimeoutLock(waitForTimeoutMutex_);
119 waitForTimeoutCondition_.notify();
120 //if (log_.trace()) log_.trace(ME, "addTimeoutListener, successfully notified");
121 return key;
122 }
123
124 Timestamp Timeout::refreshTimeoutListener(Timestamp key, long delay)
125 {
126 if (log_.call()) log_.call(ME, " refreshTimeoutListener");
127
128 if (!isRunning_)
129 throw org::xmlBlaster::util::XmlBlasterException(USER_WRONG_API_USAGE, "", ME + ".refreshTimeoutListener", "en", "The timer is not running");
130
131 if (key < 0)
132 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_ILLEGALARGUMENT, "", ME + ".refreshTimeoutListener", "en", "In Timeout.cpp refreshTimeoutListener() is key < 0");
133
134 I_Timeout *callback = 0;
135 void *userData = 0;
136 {
137 Lock lock(invocationMutex_);
138 TimeoutMap::iterator iter = timeoutMap_.find(key);
139 if (iter == timeoutMap_.end()) {
140 if (log_.trace()) log_.trace(ME, "The timeout handle '" + lexical_cast<std::string>(key) + "' is unknown, no timeout refresh done");
141 return -1;
142 }
143 callback = (*iter).second.first;
144 userData = (*iter).second.second;
145 timeoutMap_.erase(key);
146 }
147 return addTimeoutListener(callback, delay, userData);
148 }
149
150 Timestamp Timeout::addOrRefreshTimeoutListener(I_Timeout *listener, long delay, void *userData, Timestamp key)
151 {
152 if (log_.call()) log_.call(ME, " addOrRefreshTimeoutListener");
153 if (key <= 0) return addTimeoutListener(listener, delay, userData);
154 key = refreshTimeoutListener(key, delay);
155 if (key <= 0) return addTimeoutListener(listener, delay, userData);
156 return key;
157 }
158
159 void Timeout::removeTimeoutListener(Timestamp key)
160 {
161 if (log_.call()) log_.call(ME, " removeTimeoutListener");
162 Lock lock(invocationMutex_);
163 timeoutMap_.erase(key);
164 }
165
166 bool Timeout::isExpired(Timestamp key)
167 {
168 if (log_.call()) log_.call(ME, " isExpired");
169 Lock lock(invocationMutex_);
170 return (timeoutMap_.find(key) == timeoutMap_.end());
171 }
172
173 long Timeout::spanToTimeout(Timestamp key)
174 {
175 if (log_.call()) log_.call(ME, " spanToTimeout");
176 Lock lock(invocationMutex_);
177 TimeoutMap::iterator iter = timeoutMap_.find(key);
178 if (iter == timeoutMap_.end()) return -1;
179 Timestamp currentTimestamp = timestampFactory_.getTimestamp();
180 return getTimeout(key) - (long)(currentTimestamp / Constants::MILLION);
181 }
182
183 long Timeout::getTimeout(Timestamp key)
184 {
185 if (log_.call()) log_.call(ME, " getTimeout");
186 if (key < 0) return -1;
187 return (long)(key / Constants::MILLION);
188 }
189
190 void Timeout::removeAll()
191 {
192 if (log_.call()) log_.call(ME, " removeAll");
193 Lock lock(invocationMutex_);
194 timeoutMap_.clear();
195 }
196
197 void Timeout::shutdown()
198 {
199 if (log_.call()) log_.call(ME, " shutdown");
200 isRunning_ = false;
201 removeAll();
202 Lock waitForTimeoutLock(waitForTimeoutMutex_);
203 waitForTimeoutCondition_.notify();
204 }
205
206
207 size_t Timeout::getTimeoutMapSize()
208 {
209 Lock lock(invocationMutex_);
210 return timeoutMap_.size();
211 }
212
213
214 void Timeout::run()
215 {
216 if (log_.call()) log_.call(ME, " run()");
217 isActive_ = true;
218
219 Container *container = NULL;
220 Container tmpContainer;
221
222 try {
223 while (isRunning_) {
224
225 if (log_.trace()) log_.trace(ME, " run(): is running");
226 Timestamp delay = 100000 * Constants::MILLION; // sleep veeery long
227
228 {
229 Lock lock(invocationMutex_);
230
231 TimeoutMap::iterator iter = timeoutMap_.begin();
232 if (iter == timeoutMap_.end()) {
233 if (log_.trace()) log_.trace(ME, "No timer is registered, nothing to do");
234 }
235 else {
236 if (log_.trace()) log_.trace(ME, " The timeout is not empty");
237 Timestamp nextWakeup = (*iter).first;
238 if (log_.trace()) log_.trace(ME, "run, next event (Timestamp): " + lexical_cast<std::string>(nextWakeup) + " ns");
239 delay = nextWakeup - timestampFactory_.getTimestamp();
240
241 if (log_.trace()) log_.trace(ME, "run, delay : " + lexical_cast<std::string>(delay) + " ns");
242 if ( delay < 0 ) delay = 0;
243
244 if (delay <= 0) {
245 tmpContainer = (*iter).second;
246 timeoutMap_.erase((*iter).first);
247 container = &tmpContainer;
248 if (log_.trace()) log_.trace(ME, "Timeout occurred, calling listener with real time error of " + lexical_cast<std::string>(delay) + " nanos");
249 }
250 }
251 mapHasNewEntry_ = false;
252 }
253 // must be outside the sync
254 if (container != NULL) {
255 (container->first)->timeout(container->second);
256 container = NULL;
257 }
258 Timestamp milliDelay = delay / Constants::MILLION;
259 if (milliDelay > 0) {
260 if (log_.trace()) log_.trace(ME, "sleeping ... " + lexical_cast<std::string>(milliDelay) + " milliseconds");
261 Lock waitForTimeoutLock(waitForTimeoutMutex_);
262 if (!mapHasNewEntry_) {
263 isReady_ = true;
264 if (!isRunning_) break;
265 waitForTimeoutCondition_.wait(waitForTimeoutLock, (long)milliDelay);
266 //if (log_.trace()) log_.trace(ME, "waking up ... ");
267 }
268 }
269 }
270 if (log_.trace()) log_.trace(ME, "The running thread is exiting");
271 }
272 catch (const std::exception &e) {
273 log_.error(ME, string("The running thread is exiting: ") + e.what());
274 }
275 catch (...) {
276 log_.error(ME, "The running thread is exiting with an unknown exception");
277 }
278 isActive_ = false;
279 }
280
281 }}} // namespaces
syntax highlighted by Code2HTML, v. 0.9.1