1 /*------------------------------------------------------------------------------
  2 Name:      RamQueuePlugin.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 
  7 #include <util/queue/RamQueuePlugin.h>
  8 #include <util/XmlBlasterException.h>
  9 #include <util/Global.h>
 10 
 11 using namespace std;
 12 using namespace org::xmlBlaster::util;
 13 using namespace org::xmlBlaster::util::thread;
 14 using namespace org::xmlBlaster::util::qos::storage;
 15 
 16 namespace org { namespace xmlBlaster { namespace util { namespace queue {
 17 
 18 RamQueuePlugin::RamQueuePlugin(Global& global, const ClientQueueProperty& property)
 19    : ME("RamQueuePlugin"), 
 20      global_(global), 
 21      log_(global.getLog("org.xmlBlaster.util.queue")), 
 22      property_(property), 
 23      storage_(), 
 24      accessMutex_()
 25 {
 26    numOfBytes_ = 0;
 27    log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]");
 28 }
 29 
 30 RamQueuePlugin::RamQueuePlugin(const RamQueuePlugin& queue)
 31    : ME("RamQueuePlugin"), 
 32      global_(queue.global_), 
 33      log_(queue.log_), 
 34      property_(queue.property_), 
 35      storage_(queue.storage_), 
 36      accessMutex_()
 37 {
 38    numOfBytes_ = queue.numOfBytes_;
 39 }
 40 
 41 RamQueuePlugin& RamQueuePlugin::operator =(const RamQueuePlugin& queue)
 42 {
 43    Lock lock(queue.accessMutex_);
 44    property_   = queue.property_;
 45    storage_    = queue.storage_;
 46    numOfBytes_ = queue.numOfBytes_;
 47    return *this;
 48 
 49 }
 50 
 51 RamQueuePlugin::~RamQueuePlugin()
 52 {
 53    if (log_.call()) log_.call(ME, "destructor");
 54    if (!storage_.empty()) {
 55       Lock lock(accessMutex_);
 56       storage_.erase(storage_.begin(), storage_.end());
 57    }
 58 } 
 59 
 60 void RamQueuePlugin::put(const MsgQueueEntry &entry)
 61 {
 62    if (log_.call()) log_.call(ME, "::put");
 63    if (log_.dump()) log_.dump(ME, string("::put, the entry is: ")  + entry.toXml());
 64 
 65    Lock lock(accessMutex_);
 66    if (numOfBytes_+entry.getSizeInBytes() > ((size_t)property_.getMaxBytes()) ) {
 67       throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_BYTES, ME + "::put", "client queue");
 68    }
 69 
 70    if (storage_.size() >= (size_t)property_.getMaxEntries() ) {
 71       throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME + "::put", "client queue");
 72    }
 73    try {
 74       const EntryType help(*entry.getClone());
 75       storage_.insert(help);
 76       numOfBytes_ += entry.getSizeInBytes();
 77       // add the sizeInBytes_ here ...
 78    }
 79    catch (exception& ex) {
 80       throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", ex.what());
 81    }      
 82    catch (...) {
 83       throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", "the original type of this exception is unknown");
 84    }
 85 }
 86 
 87 const vector<EntryType> RamQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
 88 {
 89    Lock lock(accessMutex_);
 90    vector<EntryType> ret;
 91    if (storage_.empty()) return ret;
 92    StorageType::const_iterator iter = storage_.begin();
 93    long numOfEntries = 0;
 94    long numOfBytes = 0;
 95    int referencePriority = (**iter).getPriority();
 96    while (iter != storage_.end()) {
 97       numOfBytes += (**iter).getSizeInBytes();
 98       numOfEntries++;
 99       if (numOfBytes > maxNumOfBytes && maxNumOfBytes > -1) break;
100       if (numOfEntries > maxNumOfEntries && maxNumOfEntries > -1) break;
101       if ((**iter).getPriority() != referencePriority ) break;
102       EntryType entry = (*iter);
103       ret.insert(ret.end(), entry); 
104       iter++;
105    }
106    return ret;
107 }
108 
109 
110 long RamQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) 
111 {
112    Lock lock(accessMutex_);
113    if (start == end || storage_.empty()) return 0;
114    vector<EntryType>::const_iterator iter = start;
115    long count = 0;
116    while (iter != end) {
117       long entrySize = (*iter)->getSizeInBytes();
118       if (storage_.empty()) return 0;
119       string::size_type help = storage_.erase(*iter);
120       if (help > 0) {
121          count += help;
122          numOfBytes_ -= help * entrySize;
123       }
124       iter++;
125    }
126    return count;
127 }
128 
129 long RamQueuePlugin::getNumOfEntries() const
130 {
131    return storage_.size();
132 }
133 
134 long RamQueuePlugin::getMaxNumOfEntries() const
135 {
136    return property_.getMaxEntries();
137 }
138 
139 int64_t RamQueuePlugin::getNumOfBytes() const
140 {
141    return numOfBytes_;
142 }
143 
144 int64_t RamQueuePlugin::getMaxNumOfBytes() const
145 {
146    return property_.getMaxBytes();
147 }
148 
149 void RamQueuePlugin::clear()
150 {
151    Lock lock(accessMutex_);
152    storage_.erase(storage_.begin(), storage_.end());
153    numOfBytes_ = 0;
154 }
155 
156 
157 bool RamQueuePlugin::empty() const
158 {
159    return storage_.empty();
160 }
161 
162 
163 }}}} // namespace


syntax highlighted by Code2HTML, v. 0.9.1