1 /*------------------------------------------------------------------------------
2 Name: CacheQueuePlugin.h
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 #ifndef _UTIL_QUEUE_CACHEQUEUE_H
7 #define _UTIL_QUEUE_CACHEQUEUE_H
8
9 #include <util/xmlBlasterDef.h>
10 #include <util/ReferenceHolder.h>
11 #include <util/queue/I_Queue.h>
12 #include <util/queue/MsgQueueEntry.h>
13 #include <util/thread/ThreadImpl.h>
14 #include <util/I_Log.h>
15 #include <set>
16 #include <functional>
17
18 namespace org { namespace xmlBlaster { namespace util { namespace queue {
19
20 typedef std::set<EntryType, std::greater<EntryType> > StorageType;
21
22 /**
23 * This class implements a very simple cache around the RAM and SQLite queue.
24 * Note there is no swapping support for transient or persistent messages
25 * all transient messages are hold in RAM, and all persistent messages are
26 * duplicated to harddisk.
27 * When time permits we will add swapping support similar to the Java CACHE
28 * implementation.
29 * <br />
30 * If you have mainly persistent messages and need to take care on
31 * your RAM consumption with many messages in queue consider to use
32 * the "SQLite" persistent queue directly (without any RAM or CACHE)
33 * with the option <code>-connection/queue/type SQLite</code> instead of the default
34 * <code>-connection/queue/type CACHE</code>.
35 * <br />
36 * On the other hand if you use only transient message consider using the RAM queue directly
37 * with the option <code>-connection/queue/type RAM</code> instead of the default
38 * <code>-connection/queue/type CACHE</code>.
39 * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a>
40 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/client.cpp.queue.html">The client.cpp.queue requirement</a>
41 */
42 class Dll_Export CacheQueuePlugin : public I_Queue
43 {
44 private:
45 CacheQueuePlugin(const CacheQueuePlugin& queue);
46 CacheQueuePlugin& operator =(const CacheQueuePlugin& queue);
47
48 /**
49 * Shuffle messages from persistent store to transient store.
50 * On startup we can only load the highest priority in a bulk
51 * NOTE: This must be called from inside a synchronization lock.
52 * @return Number of entries loaded
53 */
54 long reloadFromPersistentStore() const;
55
56 protected:
57 std::string ME;
58 org::xmlBlaster::util::Global& global_;
59 org::xmlBlaster::util::I_Log& log_;
60 org::xmlBlaster::util::qos::storage::ClientQueueProperty property_;
61 mutable I_Queue *transientQueueP_;
62 I_Queue *persistentQueueP_;
63 org::xmlBlaster::util::thread::Mutex accessMutex_;
64
65 public:
66 CacheQueuePlugin(org::xmlBlaster::util::Global& global, const org::xmlBlaster::util::qos::storage::ClientQueueProperty& property);
67
68 virtual ~CacheQueuePlugin();
69
70 /**
71 * puts a new entry into the queue.
72 * Note that this method takes the entry pointed to by the argument
73 * and puts a reference to it into the queue. This means that you can not destroy the entry before the
74 * reference to it has been removed from the queue (which normally happens on a remove or when destroying
75 * the queue.
76 */
77 void put(const MsgQueueEntry &entry);
78
79 /**
80 * Returns the entries with the highest priority in the queue. If 'maxNumOfEntries' is positive,
81 * this is the maximum number of entries to return. If maxNumOfBytes is positive, only the entries
82 * which fit into the range specified are returned. If there are no such entries, an empty std::vector is
83 * returned.
84 */
85 const std::vector<EntryType> peekWithSamePriority(long maxNumOfEntries=-1, long maxNumOfBytes=-1) const;
86
87 /**
88 * Deletes the entries specified in the std::vector in the argument list. If this std::vector is empty or if
89 * the queue is empty, zero (0) is returned, otherwise it returns the number of entries really deleted.
90 */
91 long randomRemove(const std::vector<EntryType>::const_iterator &start, const std::vector<EntryType>::const_iterator &end);
92
93 /**
94 * Access the current number of entries.
95 * @return The number of entries in the queue
96 */
97 long getNumOfEntries() const;
98
99 /**
100 * Access the configured maximum number of elements for this queue.
101 * @return The maximum number of elements in the queue
102 */
103 long getMaxNumOfEntries() const;
104
105 /**
106 * Returns the amount of bytes currently in the queue.
107 * If the implementation of this interface is not able to return the correct
108 * number of entries (for example if the implementation must make a remote
109 * call to a DB which is temporarly not available) it will return -1.
110 * @return The amount of bytes currently in the queue, returns -1 on error
111 */
112 int64_t getNumOfBytes() const;
113
114 /**
115 * Access the configured capacity (maximum bytes) for this queue.
116 * @return The maximum capacity for the queue in bytes
117 */
118 int64_t getMaxNumOfBytes() const;
119
120 /**
121 * Clears (removes all entries) this queue
122 */
123 void clear();
124
125 /**
126 * returns true if the queue is empty, false otherwise
127 */
128 bool empty() const;
129
130 static std::string usage();
131
132 /**
133 * Get the name of the plugin.
134 * @return "CACHE"
135 * @enforcedBy I_Plugin
136 */
137 std::string getType() { static std::string type = "CACHE"; return type; }
138
139 /**
140 * Get the version of the plugin.
141 * @return "1.0"
142 * @enforcedBy I_Plugin
143 */
144 std::string getVersion() { static std::string version = "1.0"; return version; }
145
146 void destroy();
147 };
148
149 }}}} // namespace
150
151 #endif
// syntax highlighted by Code2HTML, v. 0.9.1