1 /*-----------------------------------------------------------------------------
  2 Name:      Timeout.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Allows you to be called back after a given delay.
  6 -----------------------------------------------------------------------------*/
  7 #include <algorithm>
  8 #include <string>
  9 
 10 #include <util/Timeout.hpp>
 11 #include <util/lexical_cast.h>
 12 #include <util/Constants.h>
 13 #include <util/Global.h>
 14 
 15 namespace org { namespace xmlBlaster { namespace util {
 16 
 17 using namespace std;
 18 using namespace org::xmlBlaster::util::thread;
 19 
 20 Timeout::Timeout(Global& global)
 21    : Thread(), ME("Timeout"), threadName_("Timeout-Thread"),
 22      timeoutMap_(), isRunning_(false), isReady_(false),
 23      mapHasNewEntry_(false), isActive_(true),
 24      isDebug_(false), detached_(false), timestampFactory_(TimestampFactory::getInstance()),
 25      global_(global), log_(global.getLog("org.xmlBlaster.util")),
 26      invocationMutex_(), waitForTimeoutMutex_(), waitForTimeoutCondition_()
 27 {
 28    ME += "-Timeout-Thread-" + lexical_cast<std::string>(this);
 29    // the thread will only be instantiated when starting
 30    if (log_.call()) log_.call(ME, " default constructor");
 31    if (log_.trace()) log_.trace(ME, " default constructor: after creating timeout condition");
 32    start(detached_);
 33    if (log_.trace()) log_.trace(ME, " default constructor: after starting the thread");
 34 }
 35 
 36 Timeout::Timeout(Global& global, const string &name)
 37    : Thread(), ME("Timeout"), threadName_(name),
 38      timeoutMap_(), isRunning_(false), isReady_(false),
 39      mapHasNewEntry_(false), isActive_(true),
 40      isDebug_(false), detached_(false), timestampFactory_(TimestampFactory::getInstance()),
 41      global_(global), log_(global.getLog("org.xmlBlaster.util")),
 42      invocationMutex_(), waitForTimeoutMutex_(), waitForTimeoutCondition_()
 43 {
 44    // the thread remains uninitialized ...
 45    ME += "-" + name + "-" + lexical_cast<std::string>(this);
 46    if (log_.call()) log_.call(ME, " alternative constructor");
 47    start(detached_);
 48    if (log_.trace()) log_.trace(ME, " default constructor: after starting the thread");
 49 }
 50 
 51 Timeout::~Timeout() 
 52 {
 53    if (log_.call()) log_.call(ME, " destructor");
 54 
 55    shutdown();
 56 
 57    if (!detached_)
 58       join();
 59 
 60    if (isActive_) { /* Should never happen */
 61       for (int i=0; i<200; i++) {
 62          if (!isActive_) break;
 63          log_.warn(ME, "Waiting for timer thread to finish");
 64          //Thread::yield();
 65          Thread::sleep(10);
 66       }
 67    }
 68 }
 69 
 70 
 71 bool Timeout::start(bool detached) 
 72 {
 73    if (log_.call()) log_.call(ME, " start" + lexical_cast<string>(detached));
 74    isRunning_ = true;
 75    if (log_.trace()) log_.trace(ME, " before creating the running thread");
 76    Thread::start(detached);
 77 
 78    if (log_.trace()) log_.trace(ME, " start: waiting for the thread to be ready (waiting for the first timeout addition)");
 79    while (!isReady_) {
 80       Thread::sleep(5);
 81    }
 82    if (log_.trace()) log_.trace(ME, " start: running thread created and ready");
 83    return true;
 84 }
 85 
 86 void Timeout::join() 
 87 {
 88    Thread::join();
 89    if (log_.trace()) log_.trace(ME, " start: running thread joined (i.e. thread started)");
 90 }
 91 
 92 Timestamp Timeout::addTimeoutListener(I_Timeout *listener, long delay, void *userData) 
 93 {
 94    if (!isRunning_) 
 95       throw org::xmlBlaster::util::XmlBlasterException(USER_WRONG_API_USAGE, "", ME + ".addTimeoutListener", "en", "The timer is not running");
 96 
 97    //if (log_.call()) log_.call(ME, " addTimeoutListener");
 98    Timestamp key = 0;
 99    if (delay < 1) log_.error(ME, ": addTimeoutListener with delay = " + lexical_cast<std::string>(delay));
100 
101    {
102       Lock lock(invocationMutex_);
103       while (true) {
104          key = timestampFactory_.getTimestamp() + Constants::MILLION * delay;
105          TimeoutMap::iterator iter = timeoutMap_.find(key);
106          if (iter == timeoutMap_.end()) {
107             if (log_.trace()) log_.trace(ME, "addTimeoutListener, adding key: " + lexical_cast<std::string>(key));
108             Container cont(listener, userData);
109             TimeoutMap::value_type el(key, cont);
110             timeoutMap_.insert(el);
111             mapHasNewEntry_ = true;
112             break;
113          }
114       }
115    }
116 
117    if (log_.trace()) log_.trace(ME, "addTimeoutListener, going to notify");
118    Lock waitForTimeoutLock(waitForTimeoutMutex_);
119    waitForTimeoutCondition_.notify();
120    //if (log_.trace()) log_.trace(ME, "addTimeoutListener, successfully notified");
121    return key;
122 }
123 
124 Timestamp Timeout::refreshTimeoutListener(Timestamp key, long delay) 
125 {
126    if (log_.call()) log_.call(ME, " refreshTimeoutListener");
127 
128    if (!isRunning_) 
129       throw org::xmlBlaster::util::XmlBlasterException(USER_WRONG_API_USAGE, "", ME + ".refreshTimeoutListener", "en", "The timer is not running");
130 
131    if (key < 0)
132       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_ILLEGALARGUMENT, "", ME + ".refreshTimeoutListener", "en", "In Timeout.cpp refreshTimeoutListener() is key < 0");
133 
134    I_Timeout *callback = 0;
135    void *userData = 0;
136    {
137       Lock lock(invocationMutex_);
138       TimeoutMap::iterator iter = timeoutMap_.find(key);
139       if (iter == timeoutMap_.end()) {
140          if (log_.trace()) log_.trace(ME, "The timeout handle '" + lexical_cast<std::string>(key) + "' is unknown, no timeout refresh done");
141          return -1;
142       }
143       callback = (*iter).second.first;
144       userData = (*iter).second.second;
145       timeoutMap_.erase(key);
146    }
147    return addTimeoutListener(callback, delay, userData);
148 }
149 
150 Timestamp Timeout::addOrRefreshTimeoutListener(I_Timeout *listener, long delay, void *userData, Timestamp key) 
151 {
152    if (log_.call()) log_.call(ME, " addOrRefreshTimeoutListener");
153    if (key <= 0) return addTimeoutListener(listener, delay, userData);
154    key = refreshTimeoutListener(key, delay);
155    if (key <= 0) return addTimeoutListener(listener, delay, userData);
156    return key;
157 }
158 
159 void Timeout::removeTimeoutListener(Timestamp key) 
160 {
161    if (log_.call()) log_.call(ME, " removeTimeoutListener");
162    Lock lock(invocationMutex_);
163    timeoutMap_.erase(key);
164 }
165 
166 bool Timeout::isExpired(Timestamp key) 
167 {
168    if (log_.call()) log_.call(ME, " isExpired");
169    Lock lock(invocationMutex_);
170    return (timeoutMap_.find(key) == timeoutMap_.end());
171 }
172 
173 long Timeout::spanToTimeout(Timestamp key) 
174 {
175    if (log_.call()) log_.call(ME, " spanToTimeout");
176    Lock lock(invocationMutex_);
177    TimeoutMap::iterator iter = timeoutMap_.find(key);
178    if (iter == timeoutMap_.end()) return -1;
179    Timestamp currentTimestamp = timestampFactory_.getTimestamp();
180    return getTimeout(key) - (long)(currentTimestamp / Constants::MILLION);
181 }
182 
183 long Timeout::getTimeout(Timestamp key) 
184 {
185    if (log_.call()) log_.call(ME, " getTimeout");
186    if (key < 0) return -1;
187    return (long)(key / Constants::MILLION);
188 }
189 
190 void Timeout::removeAll() 
191 {
192    if (log_.call()) log_.call(ME, " removeAll");
193    Lock lock(invocationMutex_);
194    timeoutMap_.clear();
195 }
196 
197 void Timeout::shutdown() 
198 {
199    if (log_.call()) log_.call(ME, " shutdown");
200    isRunning_ = false;
201    removeAll();
202    Lock waitForTimeoutLock(waitForTimeoutMutex_);
203    waitForTimeoutCondition_.notify();
204 }
205 
206 
207 size_t Timeout::getTimeoutMapSize()
208 {
209    Lock lock(invocationMutex_);
210    return timeoutMap_.size();
211 }
212 
213 
214 void Timeout::run()
215 {
216    if (log_.call()) log_.call(ME, " run()");
217    isActive_ = true;
218 
219    Container *container = NULL;
220    Container tmpContainer;
221 
222    try {
223       while (isRunning_) {
224 
225          if (log_.trace()) log_.trace(ME, " run(): is running");
226          Timestamp delay = 100000 * Constants::MILLION; // sleep veeery long
227 
228          {
229             Lock lock(invocationMutex_);
230 
231             TimeoutMap::iterator iter = timeoutMap_.begin();
232             if (iter == timeoutMap_.end()) {
233                if (log_.trace()) log_.trace(ME, "No timer is registered, nothing to do");
234             }
235             else {
236                if (log_.trace()) log_.trace(ME, " The timeout is not empty");
237                Timestamp nextWakeup = (*iter).first;
238                if (log_.trace()) log_.trace(ME, "run, next event (Timestamp): " + lexical_cast<std::string>(nextWakeup) + " ns");
239                delay = nextWakeup - timestampFactory_.getTimestamp();
240 
241                if (log_.trace()) log_.trace(ME, "run, delay       : " + lexical_cast<std::string>(delay) + " ns");
242                if ( delay < 0 ) delay = 0;
243 
244                if (delay <= 0) {
245                   tmpContainer = (*iter).second;
246                   timeoutMap_.erase((*iter).first);
247                   container = &tmpContainer;
248                   if (log_.trace()) log_.trace(ME, "Timeout occurred, calling listener with real time error of " + lexical_cast<std::string>(delay) + " nanos");
249                }
250             }
251             mapHasNewEntry_ = false;
252          }
253          // must be outside the sync
254          if (container != NULL) {
255              (container->first)->timeout(container->second);
256              container = NULL;
257          }
258          Timestamp milliDelay = delay / Constants::MILLION;
259          if (milliDelay > 0) {
260             if (log_.trace()) log_.trace(ME, "sleeping ... " + lexical_cast<std::string>(milliDelay) + " milliseconds");
261             Lock waitForTimeoutLock(waitForTimeoutMutex_);
262             if (!mapHasNewEntry_) {
263                isReady_ = true;
264                if (!isRunning_) break;
265                waitForTimeoutCondition_.wait(waitForTimeoutLock, (long)milliDelay);
266                //if (log_.trace()) log_.trace(ME, "waking up ... ");
267             }
268          }
269       }
270       if (log_.trace()) log_.trace(ME, "The running thread is exiting");
271    }
272    catch (const std::exception &e) {
273       log_.error(ME, string("The running thread is exiting: ") + e.what());
274    }
275    catch (...) {
276       log_.error(ME, "The running thread is exiting with an unknown exception");
277    }
278    isActive_ = false;
279 }
280 
281 }}} // namespaces


syntax highlighted by Code2HTML, v. 0.9.1