1 /*------------------------------------------------------------------------------
2 Name: RamQueuePlugin.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 #include <util/queue/RamQueuePlugin.h>
8 #include <util/XmlBlasterException.h>
9 #include <util/Global.h>
10
11 using namespace std;
12 using namespace org::xmlBlaster::util;
13 using namespace org::xmlBlaster::util::thread;
14 using namespace org::xmlBlaster::util::qos::storage;
15
16 namespace org { namespace xmlBlaster { namespace util { namespace queue {
17
18 RamQueuePlugin::RamQueuePlugin(Global& global, const ClientQueueProperty& property)
19 : ME("RamQueuePlugin"),
20 global_(global),
21 log_(global.getLog("org.xmlBlaster.util.queue")),
22 property_(property),
23 storage_(),
24 accessMutex_()
25 {
26 numOfBytes_ = 0;
27 log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]");
28 }
29
30 RamQueuePlugin::RamQueuePlugin(const RamQueuePlugin& queue)
31 : ME("RamQueuePlugin"),
32 global_(queue.global_),
33 log_(queue.log_),
34 property_(queue.property_),
35 storage_(queue.storage_),
36 accessMutex_()
37 {
38 numOfBytes_ = queue.numOfBytes_;
39 }
40
41 RamQueuePlugin& RamQueuePlugin::operator =(const RamQueuePlugin& queue)
42 {
43 Lock lock(queue.accessMutex_);
44 property_ = queue.property_;
45 storage_ = queue.storage_;
46 numOfBytes_ = queue.numOfBytes_;
47 return *this;
48
49 }
50
51 RamQueuePlugin::~RamQueuePlugin()
52 {
53 if (log_.call()) log_.call(ME, "destructor");
54 if (!storage_.empty()) {
55 Lock lock(accessMutex_);
56 storage_.erase(storage_.begin(), storage_.end());
57 }
58 }
59
60 void RamQueuePlugin::put(const MsgQueueEntry &entry)
61 {
62 if (log_.call()) log_.call(ME, "::put");
63 if (log_.dump()) log_.dump(ME, string("::put, the entry is: ") + entry.toXml());
64
65 Lock lock(accessMutex_);
66 if (numOfBytes_+entry.getSizeInBytes() > ((size_t)property_.getMaxBytes()) ) {
67 throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_BYTES, ME + "::put", "client queue");
68 }
69
70 if (storage_.size() >= (size_t)property_.getMaxEntries() ) {
71 throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME + "::put", "client queue");
72 }
73 try {
74 const EntryType help(*entry.getClone());
75 storage_.insert(help);
76 numOfBytes_ += entry.getSizeInBytes();
77 // add the sizeInBytes_ here ...
78 }
79 catch (exception& ex) {
80 throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", ex.what());
81 }
82 catch (...) {
83 throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", "the original type of this exception is unknown");
84 }
85 }
86
87 const vector<EntryType> RamQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
88 {
89 Lock lock(accessMutex_);
90 vector<EntryType> ret;
91 if (storage_.empty()) return ret;
92 StorageType::const_iterator iter = storage_.begin();
93 long numOfEntries = 0;
94 long numOfBytes = 0;
95 int referencePriority = (**iter).getPriority();
96 while (iter != storage_.end()) {
97 numOfBytes += (**iter).getSizeInBytes();
98 numOfEntries++;
99 if (numOfBytes > maxNumOfBytes && maxNumOfBytes > -1) break;
100 if (numOfEntries > maxNumOfEntries && maxNumOfEntries > -1) break;
101 if ((**iter).getPriority() != referencePriority ) break;
102 EntryType entry = (*iter);
103 ret.insert(ret.end(), entry);
104 iter++;
105 }
106 return ret;
107 }
108
109
110 long RamQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end)
111 {
112 Lock lock(accessMutex_);
113 if (start == end || storage_.empty()) return 0;
114 vector<EntryType>::const_iterator iter = start;
115 long count = 0;
116 while (iter != end) {
117 long entrySize = (*iter)->getSizeInBytes();
118 if (storage_.empty()) return 0;
119 string::size_type help = storage_.erase(*iter);
120 if (help > 0) {
121 count += help;
122 numOfBytes_ -= help * entrySize;
123 }
124 iter++;
125 }
126 return count;
127 }
128
129 long RamQueuePlugin::getNumOfEntries() const
130 {
131 return storage_.size();
132 }
133
134 long RamQueuePlugin::getMaxNumOfEntries() const
135 {
136 return property_.getMaxEntries();
137 }
138
139 int64_t RamQueuePlugin::getNumOfBytes() const
140 {
141 return numOfBytes_;
142 }
143
144 int64_t RamQueuePlugin::getMaxNumOfBytes() const
145 {
146 return property_.getMaxBytes();
147 }
148
149 void RamQueuePlugin::clear()
150 {
151 Lock lock(accessMutex_);
152 storage_.erase(storage_.begin(), storage_.end());
153 numOfBytes_ = 0;
154 }
155
156
157 bool RamQueuePlugin::empty() const
158 {
159 return storage_.empty();
160 }
161
162
163 }}}} // namespace
syntax highlighted by Code2HTML, v. 0.9.1