00001 /*------------------------------------------------------------------------------ 00002 Name: RamQueuePlugin.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 ------------------------------------------------------------------------------*/ 00006 00007 #include <util/queue/RamQueuePlugin.h> 00008 #include <util/XmlBlasterException.h> 00009 #include <util/Global.h> 00010 00011 using namespace std; 00012 using namespace org::xmlBlaster::util; 00013 using namespace org::xmlBlaster::util::thread; 00014 using namespace org::xmlBlaster::util::qos::storage; 00015 00016 namespace org { namespace xmlBlaster { namespace util { namespace queue { 00017 00018 RamQueuePlugin::RamQueuePlugin(Global& global, const ClientQueueProperty& property) 00019 : ME("RamQueuePlugin"), 00020 global_(global), 00021 log_(global.getLog("org.xmlBlaster.util.queue")), 00022 property_(property), 00023 storage_(), 00024 accessMutex_() 00025 { 00026 numOfBytes_ = 0; 00027 log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]"); 00028 } 00029 00030 RamQueuePlugin::RamQueuePlugin(const RamQueuePlugin& queue) 00031 : ME("RamQueuePlugin"), 00032 global_(queue.global_), 00033 log_(queue.log_), 00034 property_(queue.property_), 00035 storage_(queue.storage_), 00036 accessMutex_() 00037 { 00038 numOfBytes_ = queue.numOfBytes_; 00039 } 00040 00041 RamQueuePlugin& RamQueuePlugin::operator =(const RamQueuePlugin& queue) 00042 { 00043 Lock lock(queue.accessMutex_); 00044 property_ = queue.property_; 00045 storage_ = queue.storage_; 00046 numOfBytes_ = queue.numOfBytes_; 00047 return *this; 00048 00049 } 00050 00051 RamQueuePlugin::~RamQueuePlugin() 00052 { 00053 if (log_.call()) log_.call(ME, "destructor"); 00054 if (!storage_.empty()) { 00055 Lock lock(accessMutex_); 00056 storage_.erase(storage_.begin(), storage_.end()); 00057 } 00058 } 00059 00060 void RamQueuePlugin::put(const MsgQueueEntry &entry) 00061 { 00062 if (log_.call()) log_.call(ME, "::put"); 00063 if (log_.dump()) log_.dump(ME, string("::put, the entry is: ") + entry.toXml()); 00064 00065 Lock lock(accessMutex_); 00066 if (numOfBytes_+entry.getSizeInBytes() > ((size_t)property_.getMaxBytes()) ) { 00067 throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_BYTES, ME + "::put", "client queue"); 00068 } 00069 00070 if (storage_.size() >= (size_t)property_.getMaxEntries() ) { 00071 throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME + "::put", "client queue"); 00072 } 00073 try { 00074 const EntryType help(*entry.getClone()); 00075 storage_.insert(help); 00076 numOfBytes_ += entry.getSizeInBytes(); 00077 // add the sizeInBytes_ here ... 00078 } 00079 catch (exception& ex) { 00080 throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", ex.what()); 00081 } 00082 catch (...) { 00083 throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", "the original type of this exception is unknown"); 00084 } 00085 } 00086 00087 const vector<EntryType> RamQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const 00088 { 00089 Lock lock(accessMutex_); 00090 vector<EntryType> ret; 00091 if (storage_.empty()) return ret; 00092 StorageType::const_iterator iter = storage_.begin(); 00093 long numOfEntries = 0; 00094 long numOfBytes = 0; 00095 int referencePriority = (**iter).getPriority(); 00096 while (iter != storage_.end()) { 00097 numOfBytes += (**iter).getSizeInBytes(); 00098 numOfEntries++; 00099 if (numOfBytes > maxNumOfBytes && maxNumOfBytes > -1) break; 00100 if (numOfEntries > maxNumOfEntries && maxNumOfEntries > -1) break; 00101 if ((**iter).getPriority() != referencePriority ) break; 00102 EntryType entry = (*iter); 00103 ret.insert(ret.end(), entry); 00104 iter++; 00105 } 00106 return ret; 00107 } 00108 00109 00110 long RamQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) 00111 { 00112 Lock lock(accessMutex_); 00113 if (start == end || storage_.empty()) return 0; 00114 vector<EntryType>::const_iterator iter = start; 00115 long count = 0; 00116 while (iter != end) { 00117 long entrySize = (*iter)->getSizeInBytes(); 00118 if (storage_.empty()) return 0; 00119 string::size_type help = storage_.erase(*iter); 00120 if (help > 0) { 00121 count += help; 00122 numOfBytes_ -= help * entrySize; 00123 } 00124 iter++; 00125 } 00126 return count; 00127 } 00128 00129 long RamQueuePlugin::getNumOfEntries() const 00130 { 00131 return storage_.size(); 00132 } 00133 00134 long RamQueuePlugin::getMaxNumOfEntries() const 00135 { 00136 return property_.getMaxEntries(); 00137 } 00138 00139 int64_t RamQueuePlugin::getNumOfBytes() const 00140 { 00141 return numOfBytes_; 00142 } 00143 00144 int64_t RamQueuePlugin::getMaxNumOfBytes() const 00145 { 00146 return property_.getMaxBytes(); 00147 } 00148 00149 void RamQueuePlugin::clear() 00150 { 00151 Lock lock(accessMutex_); 00152 storage_.erase(storage_.begin(), storage_.end()); 00153 numOfBytes_ = 0; 00154 } 00155 00156 00157 bool RamQueuePlugin::empty() const 00158 { 00159 return storage_.empty(); 00160 } 00161 00162 00163 }}}} // namespace 00164 00165 00166