00001 /*----------------------------------------------------------------------------- 00002 Name: Timeout.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Allows you be called back after a given delay. 00006 -----------------------------------------------------------------------------*/ 00007 #include <algorithm> 00008 #include <string> 00009 00010 #include <util/Timeout.h> 00011 #include <util/lexical_cast.h> 00012 #include <util/Constants.h> 00013 #include <util/Global.h> 00014 00015 namespace org { namespace xmlBlaster { namespace util { 00016 00017 using namespace std; 00018 using namespace org::xmlBlaster::util::thread; 00019 00020 Timeout::Timeout(Global& global) 00021 : Thread(), ME("Timeout"), threadName_("Timeout-Thread"), 00022 timeoutMap_(), isRunning_(false), isReady_(false), 00023 mapHasNewEntry_(false), isActive_(true), 00024 isDebug_(false), detached_(false), timestampFactory_(TimestampFactory::getInstance()), 00025 global_(global), log_(global.getLog("org.xmlBlaster.util")), 00026 invocationMutex_(), waitForTimeoutMutex_(), waitForTimeoutCondition_() 00027 { 00028 ME += "-Timeout-Thread-" + lexical_cast<std::string>(this); 00029 // the thread will only be instantiated when starting 00030 if (log_.call()) log_.call(ME, " default constructor"); 00031 if (log_.trace()) log_.trace(ME, " default constructor: after creating timeout condition"); 00032 start(detached_); 00033 if (log_.trace()) log_.trace(ME, " default constructor: after starting the thread"); 00034 } 00035 00036 Timeout::Timeout(Global& global, const string &name) 00037 : Thread(), ME("Timeout"), threadName_(name), 00038 timeoutMap_(), isRunning_(false), isReady_(false), 00039 mapHasNewEntry_(false), isActive_(true), 00040 isDebug_(false), detached_(false), timestampFactory_(TimestampFactory::getInstance()), 00041 global_(global), log_(global.getLog("org.xmlBlaster.util")), 00042 invocationMutex_(), waitForTimeoutMutex_(), waitForTimeoutCondition_() 00043 { 00044 // the thread remains uninitialized ... 00045 ME += "-" + name + "-" + lexical_cast<std::string>(this); 00046 if (log_.call()) log_.call(ME, " alternative constructor"); 00047 start(detached_); 00048 if (log_.trace()) log_.trace(ME, " default constructor: after starting the thread"); 00049 } 00050 00051 Timeout::~Timeout() 00052 { 00053 if (log_.call()) log_.call(ME, " destructor"); 00054 00055 shutdown(); 00056 00057 if (!detached_) 00058 join(); 00059 00060 if (isActive_) { /* Should never happen */ 00061 for (int i=0; i<200; i++) { 00062 if (!isActive_) break; 00063 log_.warn(ME, "Waiting for timer thread to finish"); 00064 //Thread::yield(); 00065 Thread::sleep(10); 00066 } 00067 } 00068 } 00069 00070 00071 bool Timeout::start(bool detached) 00072 { 00073 if (log_.call()) log_.call(ME, " start" + lexical_cast<string>(detached)); 00074 isRunning_ = true; 00075 if (log_.trace()) log_.trace(ME, " before creating the running thread"); 00076 Thread::start(detached); 00077 00078 if (log_.trace()) log_.trace(ME, " start: waiting for the thread to be ready (waiting for the first timeout addition)"); 00079 while (!isReady_) { 00080 Thread::sleep(5); 00081 } 00082 if (log_.trace()) log_.trace(ME, " start: running thread created and ready"); 00083 return true; 00084 } 00085 00086 void Timeout::join() 00087 { 00088 Thread::join(); 00089 if (log_.trace()) log_.trace(ME, " start: running thread joined (i.e. thread started)"); 00090 } 00091 00092 Timestamp Timeout::addTimeoutListener(I_Timeout *listener, long delay, void *userData) 00093 { 00094 if (!isRunning_) 00095 throw org::xmlBlaster::util::XmlBlasterException(USER_WRONG_API_USAGE, "", ME + ".addTimeoutListener", "en", "The timer is not running"); 00096 00097 //if (log_.call()) log_.call(ME, " addTimeoutListener"); 00098 Timestamp key = 0; 00099 if (delay < 1) log_.error(ME, ": addTimeoutListener with delay = " + lexical_cast<std::string>(delay)); 00100 00101 { 00102 Lock lock(invocationMutex_); 00103 while (true) { 00104 key = timestampFactory_.getTimestamp() + Constants::MILLION * delay; 00105 TimeoutMap::iterator iter = timeoutMap_.find(key); 00106 if (iter == timeoutMap_.end()) { 00107 if (log_.trace()) log_.trace(ME, "addTimeoutListener, adding key: " + lexical_cast<std::string>(key)); 00108 Container cont(listener, userData); 00109 TimeoutMap::value_type el(key, cont); 00110 timeoutMap_.insert(el); 00111 mapHasNewEntry_ = true; 00112 break; 00113 } 00114 } 00115 } 00116 00117 if (log_.trace()) log_.trace(ME, "addTimeoutListener, going to notify"); 00118 Lock waitForTimeoutLock(waitForTimeoutMutex_); 00119 waitForTimeoutCondition_.notify(); 00120 //if (log_.trace()) log_.trace(ME, "addTimeoutListener, successfully notified"); 00121 return key; 00122 } 00123 00124 Timestamp Timeout::refreshTimeoutListener(Timestamp key, long delay) 00125 { 00126 if (log_.call()) log_.call(ME, " refreshTimeoutListener"); 00127 00128 if (!isRunning_) 00129 throw org::xmlBlaster::util::XmlBlasterException(USER_WRONG_API_USAGE, "", ME + ".refreshTimeoutListener", "en", "The timer is not running"); 00130 00131 if (key < 0) 00132 throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_ILLEGALARGUMENT, "", ME + ".refreshTimeoutListener", "en", "In Timeout.cpp refreshTimeoutListener() is key < 0"); 00133 00134 I_Timeout *callback = 0; 00135 void *userData = 0; 00136 { 00137 Lock lock(invocationMutex_); 00138 TimeoutMap::iterator iter = timeoutMap_.find(key); 00139 if (iter == timeoutMap_.end()) { 00140 if (log_.trace()) log_.trace(ME, "The timeout handle '" + lexical_cast<std::string>(key) + "' is unknown, no timeout refresh done"); 00141 return -1; 00142 } 00143 callback = (*iter).second.first; 00144 userData = (*iter).second.second; 00145 timeoutMap_.erase(key); 00146 } 00147 return addTimeoutListener(callback, delay, userData); 00148 } 00149 00150 Timestamp Timeout::addOrRefreshTimeoutListener(I_Timeout *listener, long delay, void *userData, Timestamp key) 00151 { 00152 if (log_.call()) log_.call(ME, " addOrRefreshTimeoutListener"); 00153 if (key <= 0) return addTimeoutListener(listener, delay, userData); 00154 key = refreshTimeoutListener(key, delay); 00155 if (key <= 0) return addTimeoutListener(listener, delay, userData); 00156 return key; 00157 } 00158 00159 void Timeout::removeTimeoutListener(Timestamp key) 00160 { 00161 if (log_.call()) log_.call(ME, " removeTimeoutListener"); 00162 Lock lock(invocationMutex_); 00163 timeoutMap_.erase(key); 00164 } 00165 00166 bool Timeout::isExpired(Timestamp key) 00167 { 00168 if (log_.call()) log_.call(ME, " isExpired"); 00169 Lock lock(invocationMutex_); 00170 return (timeoutMap_.find(key) == timeoutMap_.end()); 00171 } 00172 00173 long Timeout::spanToTimeout(Timestamp key) 00174 { 00175 if (log_.call()) log_.call(ME, " spanToTimeout"); 00176 Lock lock(invocationMutex_); 00177 TimeoutMap::iterator iter = timeoutMap_.find(key); 00178 if (iter == timeoutMap_.end()) return -1; 00179 Timestamp currentTimestamp = timestampFactory_.getTimestamp(); 00180 return getTimeout(key) - (long)(currentTimestamp / Constants::MILLION); 00181 } 00182 00183 long Timeout::getTimeout(Timestamp key) 00184 { 00185 if (log_.call()) log_.call(ME, " getTimeout"); 00186 if (key < 0) return -1; 00187 return (long)(key / Constants::MILLION); 00188 } 00189 00190 void Timeout::removeAll() 00191 { 00192 if (log_.call()) log_.call(ME, " removeAll"); 00193 Lock lock(invocationMutex_); 00194 timeoutMap_.clear(); 00195 } 00196 00197 void Timeout::shutdown() 00198 { 00199 if (log_.call()) log_.call(ME, " shutdown"); 00200 isRunning_ = false; 00201 removeAll(); 00202 Lock waitForTimeoutLock(waitForTimeoutMutex_); 00203 waitForTimeoutCondition_.notify(); 00204 } 00205 00206 00207 size_t Timeout::getTimeoutMapSize() 00208 { 00209 Lock lock(invocationMutex_); 00210 return timeoutMap_.size(); 00211 } 00212 00213 00214 void Timeout::run() 00215 { 00216 if (log_.call()) log_.call(ME, " run()"); 00217 isActive_ = true; 00218 00219 Container *container = NULL; 00220 Container tmpContainer; 00221 00222 try { 00223 while (isRunning_) { 00224 00225 if (log_.trace()) log_.trace(ME, " run(): is running"); 00226 Timestamp delay = 100000 * Constants::MILLION; // sleep veeery long 00227 00228 { 00229 Lock lock(invocationMutex_); 00230 00231 TimeoutMap::iterator iter = timeoutMap_.begin(); 00232 if (iter == timeoutMap_.end()) { 00233 if (log_.trace()) log_.trace(ME, "No timer is registered, nothing to do"); 00234 } 00235 else { 00236 if (log_.trace()) log_.trace(ME, " The timeout is not empty"); 00237 Timestamp nextWakeup = (*iter).first; 00238 if (log_.trace()) log_.trace(ME, "run, next event (Timestamp): " + lexical_cast<std::string>(nextWakeup) + " ns"); 00239 delay = nextWakeup - timestampFactory_.getTimestamp(); 00240 00241 if (log_.trace()) log_.trace(ME, "run, delay : " + lexical_cast<std::string>(delay) + " ns"); 00242 if ( delay < 0 ) delay = 0; 00243 00244 if (delay <= 0) { 00245 tmpContainer = (*iter).second; 00246 timeoutMap_.erase((*iter).first); 00247 container = &tmpContainer; 00248 if (log_.trace()) log_.trace(ME, "Timeout occurred, calling listener with real time error of " + lexical_cast<std::string>(delay) + " nanos"); 00249 } 00250 } 00251 mapHasNewEntry_ = false; 00252 } 00253 // must be outside the sync 00254 if (container != NULL) { 00255 (container->first)->timeout(container->second); 00256 container = NULL; 00257 } 00258 Timestamp milliDelay = delay / Constants::MILLION; 00259 if (milliDelay > 0) { 00260 if (log_.trace()) log_.trace(ME, "sleeping ... " + lexical_cast<std::string>(milliDelay) + " milliseconds"); 00261 Lock waitForTimeoutLock(waitForTimeoutMutex_); 00262 if (!mapHasNewEntry_) { 00263 isReady_ = true; 00264 if (!isRunning_) break; 00265 waitForTimeoutCondition_.wait(waitForTimeoutLock, (long)milliDelay); 00266 //if (log_.trace()) log_.trace(ME, "waking up ... "); 00267 } 00268 } 00269 } 00270 if (log_.trace()) log_.trace(ME, "The running thread is exiting"); 00271 } 00272 catch (const std::exception &e) { 00273 log_.error(ME, string("The running thread is exiting: ") + e.what()); 00274 } 00275 catch (...) { 00276 log_.error(ME, "The running thread is exiting with an unknown exception"); 00277 } 00278 isActive_ = false; 00279 } 00280 00281 }}} // namespaces 00282