Topic |
XmlBlaster has a sophisticated message queueing engine |
Description |
Messages are queued only when necessary. This typically applies to messages that
need to be sent back to clients.
Queue Features:
- High performing and thread safe
The queue is based on a sorted bounded buffer from Doug Lea.
It outperforms a thread-safe java.util TreeSet collection, taking about
100 microseconds instead of 200 microseconds to insert and remove one message.
- Priority support
The priority for messages can be specified when publishing.
The default value is 5. A value of 9 is the highest priority, whereas 0 is the lowest.
- Guarantee sequence
The sequence of incoming messages is recorded with a unique timestamp.
On delivery this sequence is guaranteed to be preserved.
Different message priorities reorder the sequence of message delivery, since
priority is a higher-ranking sort criterion than the incoming timestamp.
- Bounded (max size)
Every queue has a maximum size. The limit is adjustable either
as the number of entries in the queue or as the maximum memory consumption.
On queue overflow xmlBlaster sends a dead message (see below).
'Falling through' (removing the oldest message when the queue is full and
a new message arrives) is currently not supported.
- Dead message
On queue overflow or on callback failure a "dead message" mode can be configured (MQSeries calls it dead letter).
In this case a dead message is published, containing the lost message.
Interested clients, such as exception handlers, can subscribe to "__sys__deadMessage" messages to
receive them and react accordingly. In such a case a log entry is also written to
the server's log file.
- Queue types
A queue is installed on the fly when needed. If a message destination is
not reachable, a queue is created to hold the messages addressed to this destination.
Two message queue types are created on login.
- SessionQueue (callback:)
The life cycle of this queue is bound to the login session life cycle. Usually it
is created on login and destroyed on logout (see requirement engine.login.qos.session).
It can be used as the ReplyTo address for sent messages.
This is the default callback for session based subscriptions.
- SubjectQueue (subject:)
This queue is created on login if it does not exist already.
The queue name is derived from the login name of the client.
It may outlive a logout, or may be created before login if
a PtP message addresses it.
It is destroyed on logout of the last session of a specific client if it contains no messages
(or if forced by the QoS of a disconnect(), see API of DisconnectQos).
If the last message in this queue expires and no login session exists it is destroyed [TODO].
The callback server specified for the SessionQueue is used unless the login QoS specifies otherwise,
see callback attribute useForSubjectQueue='true'.
The messages from this queue are delivered to all current sessions of the user that have
set useForSubjectQueue='true'.
This means that with multiple logins the same message is delivered multiple times.
- Zero or one callback address
Every message queue has zero or one associated callback address:
The SessionQueue has exactly one for asynchronous
updates or zero when browsed synchronously.
The SubjectQueue has no callback address; it uses the session's callback.
- Synchronous browsing
A message queue may be browsed with the synchronous get() method.
- Sequence on XPath subscription
A subscription with XPath may collect many messages with one query.
The sequence of those messages is guaranteed (priority and timestamp subordering).
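
The priority, sequence, bounding and dead message rules above can be sketched in a few lines of Java. This is only a minimal illustration, not xmlBlaster's implementation: the class QueueSketch and its methods are hypothetical, a java.util.TreeSet stands in for Doug Lea's sorted bounded buffer, and a simple counter replaces the unique timestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch, not the xmlBlaster MsgQueue API.
class QueueSketch {
    /** One queued message: priority 0 (lowest) to 9 (highest) plus an arrival stamp. */
    static final class Entry {
        final int priority;
        final long stamp;
        final String content;

        Entry(int priority, long stamp, String content) {
            this.priority = priority;
            this.stamp = stamp;
            this.content = content;
        }
    }

    // Priority is the primary sort key (highest first); the arrival stamp
    // keeps the incoming order among entries of equal priority.
    private static final Comparator<Entry> ORDER =
            Comparator.comparingInt((Entry e) -> e.priority).reversed()
                      .thenComparingLong(e -> e.stamp);

    private final TreeSet<Entry> entries = new TreeSet<>(ORDER);
    private final List<String> deadMessages = new ArrayList<>();
    private final int maxEntries;
    private long nextStamp = 0;

    QueueSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    /** Queues a message; on overflow it is recorded as dead and false is returned. */
    synchronized boolean put(int priority, String content) {
        if (entries.size() >= maxEntries) {
            deadMessages.add(content);
            return false;
        }
        entries.add(new Entry(priority, nextStamp++, content));
        return true;
    }

    /** Removes and returns the next message to deliver, or null if the queue is empty. */
    synchronized String take() {
        Entry next = entries.pollFirst();
        return next == null ? null : next.content;
    }

    /** Returns a copy of the messages rejected because the queue was full. */
    synchronized List<String> deadMessages() {
        return new ArrayList<>(deadMessages);
    }

    public static void main(String[] args) {
        QueueSketch queue = new QueueSketch(3);
        queue.put(5, "first");
        queue.put(9, "urgent");
        queue.put(5, "second");
        queue.put(5, "too many"); // queue is full: becomes a dead message
        for (String msg = queue.take(); msg != null; msg = queue.take()) {
            System.out.println(msg);
        }
        System.out.println("dead: " + queue.deadMessages());
    }
}
```

Running main delivers "urgent" first, then "first" and "second" in their arrival order, and reports "too many" as the only dead message.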
|
Example
Java
|
These are example QoS of a connect() invocation:
================================================
<qos>
...
<!-- SessionQueue callback: Use CORBA to send messages back -->
<queue relating='callback' maxEntries='1600' expires='360000000'
onOverflow='deadMessage' onFailure='deadMessage'>
<callback type='IOR' sessionId='sd3lXjs9Fdlggh'
pingInterval='60000' useForSubjectQueue='true'>
IOR:00011200070009990000....
</callback>
</queue>
</qos>
<qos>
...
<!-- specify subjectQueue parameters -->
<queue relating='subject' maxEntries='1600' expires='360000000'
onOverflow='deadMessage'>
</queue>
</qos>
<qos>
...
<!-- SessionQueue callback:
Use CORBA to send messages back with default queue parameters
-->
<callback type='IOR'>
IOR:00011200070009990000....
<compress type='gzip' minSize='1000' />
<!-- compress messages bigger than 1000 bytes before sending them to me -->
<burstMode collectTime='400' />
<!-- Collect messages for 400 milliseconds and update
them in one callback (burst mode) -->
</callback>
</qos>
These are example QoS of subscribe() invocations:
================================================
<!-- The subscribed messages are delivered via the SessionQueue of the subscriber -->
<qos>
</qos>
<!-- Same as above -->
<qos>
<queue relating='callback'/>
</qos>
Any other queue overrides session delivery:
<!-- The subscribed messages are only delivered via the
SubjectQueue of the current client -->
<qos>
<queue relating='subject'/>
<queue relating='subject:somebodyElse'/>
</qos>
<qos>
<queue maxEntries='1000' maxBytes='4000000' onOverflow='deadMessage'>
<callback type='EMAIL'>
et@mars.universe
<!-- Sends messages to et with specified queue attributes -->
</callback>
</queue>
<callback type='EMAIL' onFailure='deadMessage'>
tolkien@mars.universe
<!-- Sends messages to tolkien, with default queue settings -->
</callback>
<callback type='XMLRPC' sessionId='8sd3lXjspx9Fdlggh'>
http://www.mars.universe:8080/RPC2
</callback>
<!-- The session id is passed to the client callback server, which can check
whether it trusts this sender -->
</qos>
This is an example QoS of a disconnect() invocation:
====================================================
<qos>
<deleteSubjectQueue>false</deleteSubjectQueue>
</qos>
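
Since the QoS examples above are ordinary XML, their queue settings can be inspected with any XML parser. The following is a minimal sketch using the JAXP classes of the Java standard library. The class QosQueueReader and its method are hypothetical helpers for illustration and are not part of the xmlBlaster API, which has its own QoS classes such as ConnectQos.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of xmlBlaster.
class QosQueueReader {
    /**
     * Returns the attribute 'name' of the first <queue relating='...'> element
     * in the given QoS, or 'fallback' if no such queue or attribute exists.
     */
    static String queueAttribute(String qosXml, String relating,
                                 String name, String fallback) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(qosXml)));
        NodeList queues = doc.getElementsByTagName("queue");
        for (int i = 0; i < queues.getLength(); i++) {
            Element queue = (Element) queues.item(i);
            if (relating.equals(queue.getAttribute("relating"))
                    && queue.hasAttribute(name)) {
                return queue.getAttribute(name);
            }
        }
        return fallback;
    }

    public static void main(String[] args) throws Exception {
        String qos =
            "<qos>"
          + "<queue relating='subject' maxEntries='1600' expires='360000000'"
          + " onOverflow='deadMessage'></queue>"
          + "</qos>";
        // maxEntries is set explicitly in the QoS above
        System.out.println(queueAttribute(qos, "subject", "maxEntries", "1000"));
        // onFailure is not set, so the fallback value is returned
        System.out.println(queueAttribute(qos, "subject", "onFailure", "deadMessage"));
    }
}
```

The fallback argument mirrors the idea that a queue attribute missing from the QoS takes a server default.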
|
Configure |
These parameters configure the default behavior of the xmlBlaster server.
Property | Default | Description | Implemented |
queue/maxEntries | 1000 | The max setting allowed for queue size (number of messages in queue until it overflows) | |
queue/maxBytes | 4000 (4 MB) | The max setting allowed for queue max size in bytes | |
queue/expires.min | 1000 | The min span of life is one second | |
queue/expires.max | 0 | The max span of life of a queue is currently forever (=0) | |
queue/expires | 0 | If not otherwise noted a queue dies after the max value (0 is forever) | |
queue/onOverflow | deadMessage | Error handling when queue is full | |
queue/onFailure | deadMessage | Error handling when callback failed (after all retries etc.) | |
NOTE: Configuration parameters are specified on the command line (-someValue 17) or in the
xmlBlaster.properties file (someValue=17). See requirement "util.property" for details.
Columns named Impl tell you whether the feature is implemented.
Columns named Hot tell you whether the configuration can be changed in hot operation.
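
How a setting from a properties file can override the defaults listed above may be sketched with java.util.Properties. This is an illustration under assumptions only: the class QueueDefaultsSketch is hypothetical, the file content is passed as a string, and xmlBlaster's own property handling (see requirement "util.property") may behave differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch, not xmlBlaster's property implementation.
class QueueDefaultsSketch {
    /** Some of the server defaults from the table above. */
    static Properties defaults() {
        Properties d = new Properties();
        d.setProperty("queue/maxEntries", "1000");
        d.setProperty("queue/expires", "0");
        d.setProperty("queue/onOverflow", "deadMessage");
        d.setProperty("queue/onFailure", "deadMessage");
        return d;
    }

    /** Loads key=value lines; keys not present fall back to defaults(). */
    static Properties load(String fileContent) throws IOException {
        Properties p = new Properties(defaults());
        p.load(new StringReader(fileContent));
        return p;
    }

    public static void main(String[] args) throws IOException {
        // Assumed file content, following the someValue=17 syntax
        Properties p = load("queue/maxEntries=1600\n");
        System.out.println(p.getProperty("queue/maxEntries")); // overridden value
        System.out.println(p.getProperty("queue/onOverflow")); // default value
    }
}
```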
|
See API |
org.xmlBlaster.engine.queue.MsgQueue
|
See API |
org.xmlBlaster.engine.callback.CbWorker
|
See API |
org.xmlBlaster.util.qos.address.CallbackAddress
|
See API |
org.xmlBlaster.util.qos.storage.QueueProperty
|
See API |
org.xmlBlaster.client.qos.ConnectQos
|
See API |
org.xmlBlaster.client.qos.DisconnectQos
|
See REQ |
engine.callback
|
See REQ |
engine.qos.login.callback
|
See REQ |
util.property
|
See REQ |
util.property.args
|
See REQ |
util.property.env
|
See REQ |
admin.errorHandling
|
This page is generated from the requirement XML file xmlBlaster/doc/requirements/engine.queue.xml