xmlBlaster 2.0.0 API

org.xmlBlaster.util.dispatch
Class DispatchManager

java.lang.Object
  extended by org.xmlBlaster.util.dispatch.DispatchManager
All Implemented Interfaces:
java.util.EventListener, I_Timeout, I_QueuePutListener

public final class DispatchManager
extends java.lang.Object
implements I_Timeout, I_QueuePutListener

Manages the sending of messages and commands and does error recovery further we communicate with the dispatcher plugin if one is configured.

There is one instance of this class per queue and remote connection.

Author:
xmlBlaster@marcelruff.info

Field Summary
private  java.lang.Object ALIVE_TRANSITION_MONITOR
           
private  long burstModeMaxBytes
           
private  int burstModeMaxEntries
           
private  long collectTime
          If > 0 does burst mode
private  java.util.HashSet connectionStatusListeners
           
private  DispatchConnectionsHandler dispatchConnectionsHandler
           
private  boolean dispatcherActive
          async delivery is activated only when this flag is 'true'.
private  boolean dispatchWorkerIsActive
           
private  I_MsgErrorHandler failureListener
           
private  Global glob
           
private  boolean inAliveTransition
           
private  boolean inDispatchManagerCtor
           
private  boolean isShutdown
           
private  boolean isSyncMode
           
private static java.util.logging.Logger log
           
 java.lang.String ME
           
private  I_MsgDispatchInterceptor msgInterceptor
           
private  I_Queue msgQueue
           
private  int notifyCounter
           
private  I_MsgSecurityInterceptor securityInterceptor
           
private  SessionName sessionName
           
private  boolean shallCallToAliveSync
           
private  DispatchWorker syncDispatchWorker
          The worker for synchronous invocations
private  Timestamp timerKey
           
private  long toAliveTime
           
private  long toPollingTime
           
private  boolean trySyncMode
           
private  java.lang.String typeVersion
           
 
Constructor Summary
DispatchManager(Global glob, I_MsgErrorHandler failureListener, I_MsgSecurityInterceptor securityInterceptor, I_Queue msgQueue, I_ConnectionStatusListener connectionStatusListener, AddressBase[] addrArr, SessionName sessionName)
           
 
Method Summary
private  void activateDispatchWorker()
          Give the callback worker thread a kick to deliver the messages.
 boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener)
           
 boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener, boolean fireInitial)
           
private  void callToAliveSync()
           
private  boolean checkSending(boolean isPublisherThread)
           
 java.util.ArrayList filterDistributorEntries(java.util.ArrayList entries, java.lang.Throwable ex)
           
 void finalize()
           
 long getAliveSinceTime()
          Get timestamp when we went to ALIVE state.
 long getBurstModeMaxBytes()
          How many bytes maximum shall the callback thread take in one bulk out of the callback queue and deliver in one bulk.
 int getBurstModeMaxEntries()
          How many messages maximum shall the callback thread take in one bulk out of the callback queue and deliver in one bulk.
 I_ConnectionStatusListener[] getConnectionStatusListeners()
           
 DispatchConnectionsHandler getDispatchConnectionsHandler()
           
 DispatchStatistic getDispatchStatistic()
           
 java.lang.String getId()
          For logging
 I_MsgDispatchInterceptor getMsgDispatchInterceptor()
           
 I_MsgErrorHandler getMsgErrorHandler()
           
 I_MsgSecurityInterceptor getMsgSecurityInterceptor()
           
 int getNotifyCounter()
          Counts how often a new entry was added since the current worker thread was started.
 long getPollingSinceTime()
          Get timestamp when we went to POLLING state.
 I_Queue getQueue()
           
 SessionName getSessionName()
           
 java.lang.String getTypeVersion()
          The name in the configuration file for the plugin
private  void givingUpDelivery(XmlBlasterException ex)
           
(package private)  void handleSyncWorkerException(java.util.List<I_Entry> entryList, java.lang.Throwable throwable)
          Called by DispatchWorker if an Exception occured in sync mode Only on client side
(package private)  void handleWorkerException(java.util.List<I_Entry> entryList, java.lang.Throwable throwable)
          Called by DispatchWorker if an Exception occurred in async mode.
private  void initDispatcherActive(AddressBase[] addrArr)
          Switch on/off the sending of messages.
 void internalError(java.lang.Throwable throwable)
          Called locally and from TopicHandler when internal error (Throwable) occurred to avoid infinite looping
 boolean isAlive()
           
 boolean isDead()
           
 boolean isDispatcherActive()
           
 boolean isPolling()
           
 boolean isShutdown()
           
 boolean isSyncMode()
           
 void lostClientConnection()
          Can be called when client connection is lost (NOT the callback connection).
 void notifyAboutNewEntry()
          When somebody puts a new entry into the queue, we want to be notified about this after the entry is fed.
 boolean pingCallbackServer(boolean sync, boolean connectionIsDown)
           
 void postSendNotification(MsgQueueEntry entry)
           
 void postSendNotification(MsgQueueEntry[] entries)
           
 java.util.ArrayList prepareMsgsFromQueue(java.util.List<I_Entry> entryList)
          Here we prepare messages which are coming directly from the queue.
static java.util.ArrayList prepareMsgsFromQueue(java.lang.String logId, java.util.logging.Logger log, I_Queue queue, java.util.List<I_Entry> entryList)
           
 void putPost(I_QueueEntry queueEntry)
          Called by I_Queue implementation before leaving put() and somebody has registered for such events.
 void putPost(I_QueueEntry[] queueEntries)
          Called by I_Queue implementation before leaving put() and somebody has registered for such events.
 boolean putPre(I_QueueEntry queueEntry)
          Called by I_Queue implementation when a put() is invoked and somebody has registered for such events
 boolean putPre(I_QueueEntry[] queueEntries)
          Called by I_Queue implementation when a put() is invoked and somebody has registered for such events
 void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection)
           
private  void removeBurstModeTimer()
          Remove the burst mode timer
 boolean removeConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener)
          Remove the given listener
 void removeFromQueue(MsgQueueEntry[] entries, boolean postSendNotify)
          Messages are successfully sent, remove them now from queue (sort of a commit()): We remove filtered/destroyed messages as well (which doen't show up in entryListChecked)
 boolean sendingFailedNotification(MsgQueueEntry[] entries, XmlBlasterException ex)
          Notify I_PostSendListener about problem.
 void setAddresses(AddressBase[] addr)
          Set new callback addresses, typically after a session login/logout
 void setDispatcherActive(boolean dispatcherActive)
          Inhibits/activates the delivery of asynchronous dispatches of messages.
(package private)  void setDispatchWorkerIsActive(boolean val)
          The worker notifies us that it is finished, if messages are available it is triggered again.
 void shutdown()
          Stop all callback drivers of this client.
(package private)  void shutdownFomAnyState(ConnectionStateEnum oldState, XmlBlasterException ex)
          Call by DispatchConnectionsHandler on state transition
private  void startWorkerThread(boolean fromTimeout)
           
 void switchToASyncMode()
          Switch back to asynchronous mode.
 void switchToSyncMode()
          We register a QueuePutListener and all put() into the queue are intercepted - our put() is called instead.
 void timeout(java.lang.Object userData)
          We are notified about the burst mode timeout through this method.
(package private)  void toAlive(ConnectionStateEnum oldState)
          Call by DispatchConnectionsHandler on state transition NOTE: toAlive is called initially when a protocol plugin is successfully loaded but we don't know yet if it ever is able to connect
 void toDead(XmlBlasterException ex)
           
(package private)  void toPolling(ConnectionStateEnum oldState)
          Call by DispatchConnectionsHandler on state transition
 java.lang.String toXml(java.lang.String extraOffset)
          Dump state of this object into a XML ASCII string.
 void trySyncMode(boolean trySyncMode)
          Set behavior of dispatch framework.
 void updateProperty(CallbackAddress[] addressArr)
          Reconfigure dispatcher with given properties.
private  boolean useBurstModeTimer()
           
 
Methods inherited from class java.lang.Object
clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

ME

public final java.lang.String ME

glob

private final Global glob

log

private static java.util.logging.Logger log

msgQueue

private final I_Queue msgQueue

dispatchConnectionsHandler

private final DispatchConnectionsHandler dispatchConnectionsHandler

failureListener

private final I_MsgErrorHandler failureListener

securityInterceptor

private final I_MsgSecurityInterceptor securityInterceptor

msgInterceptor

private final I_MsgDispatchInterceptor msgInterceptor

connectionStatusListeners

private java.util.HashSet connectionStatusListeners

typeVersion

private final java.lang.String typeVersion

collectTime

private long collectTime
If > 0 does burst mode


toAliveTime

private long toAliveTime

toPollingTime

private long toPollingTime

dispatchWorkerIsActive

private boolean dispatchWorkerIsActive

syncDispatchWorker

private DispatchWorker syncDispatchWorker
The worker for synchronous invocations


timerKey

private Timestamp timerKey

notifyCounter

private int notifyCounter

isShutdown

private boolean isShutdown

isSyncMode

private boolean isSyncMode

trySyncMode

private boolean trySyncMode

inAliveTransition

private boolean inAliveTransition

ALIVE_TRANSITION_MONITOR

private final java.lang.Object ALIVE_TRANSITION_MONITOR

burstModeMaxEntries

private int burstModeMaxEntries

burstModeMaxBytes

private long burstModeMaxBytes

dispatcherActive

private boolean dispatcherActive
async delivery is activated only when this flag is 'true'. Used to temporarly inhibit dispatch of messages


shallCallToAliveSync

private boolean shallCallToAliveSync

inDispatchManagerCtor

private boolean inDispatchManagerCtor

sessionName

private SessionName sessionName
Constructor Detail

DispatchManager

public DispatchManager(Global glob,
                       I_MsgErrorHandler failureListener,
                       I_MsgSecurityInterceptor securityInterceptor,
                       I_Queue msgQueue,
                       I_ConnectionStatusListener connectionStatusListener,
                       AddressBase[] addrArr,
                       SessionName sessionName)
                throws XmlBlasterException
Parameters:
msgQueue - The message queue which i use (!!! TODO: this changes, we should pass it on every method where needed)
connectionStatusListener - The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java), or null
addrArr - The addresses i shall connect to
Throws:
XmlBlasterException
Method Detail

getSessionName

public SessionName getSessionName()
Returns:
Never null

isSyncMode

public boolean isSyncMode()

trySyncMode

public void trySyncMode(boolean trySyncMode)
Set behavior of dispatch framework.

Parameters:
trySyncMode - true: client side queue embedding, false: server side callback queue defaults to false

updateProperty

public final void updateProperty(CallbackAddress[] addressArr)
                          throws XmlBlasterException
Reconfigure dispatcher with given properties. Note that only a limited re-configuration is supported

Parameters:
addressArr - The new configuration
Throws:
XmlBlasterException

finalize

public void finalize()
Overrides:
finalize in class java.lang.Object

getQueue

public I_Queue getQueue()

addConnectionStatusListener

public boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener)

addConnectionStatusListener

public boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener,
                                           boolean fireInitial)

removeConnectionStatusListener

public boolean removeConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener)
Remove the given listener

Parameters:
connectionStatusListener -
Returns:
true if it was removed

getConnectionStatusListeners

public I_ConnectionStatusListener[] getConnectionStatusListeners()

getTypeVersion

public java.lang.String getTypeVersion()
The name in the configuration file for the plugin

Returns:
e.g. "Priority,1.0"

getMsgSecurityInterceptor

public I_MsgSecurityInterceptor getMsgSecurityInterceptor()
Returns:
The import/export encrypt handle or null if created by a SubjectInfo (no session info available)

getDispatchConnectionsHandler

public final DispatchConnectionsHandler getDispatchConnectionsHandler()
Returns:
The handler of all callback plugins, is never null

getBurstModeMaxEntries

public final int getBurstModeMaxEntries()
How many messages maximum shall the callback thread take in one bulk out of the callback queue and deliver in one bulk.


getBurstModeMaxBytes

public final long getBurstModeMaxBytes()
How many bytes maximum shall the callback thread take in one bulk out of the callback queue and deliver in one bulk.


getAliveSinceTime

public final long getAliveSinceTime()
Get timestamp when we went to ALIVE state.

Returns:
millis timestamp

getPollingSinceTime

public final long getPollingSinceTime()
Get timestamp when we went to POLLING state.

Returns:
millis timestamp

toAlive

void toAlive(ConnectionStateEnum oldState)
Call by DispatchConnectionsHandler on state transition NOTE: toAlive is called initially when a protocol plugin is successfully loaded but we don't know yet if it ever is able to connect


reachedAliveSync

public void reachedAliveSync(ConnectionStateEnum oldState,
                             I_XmlBlasterAccess connection)

toPolling

void toPolling(ConnectionStateEnum oldState)
Call by DispatchConnectionsHandler on state transition


toDead

public void toDead(XmlBlasterException ex)
Parameters:
ex -

shutdownFomAnyState

void shutdownFomAnyState(ConnectionStateEnum oldState,
                         XmlBlasterException ex)
Call by DispatchConnectionsHandler on state transition


givingUpDelivery

private void givingUpDelivery(XmlBlasterException ex)

postSendNotification

public void postSendNotification(MsgQueueEntry entry)

postSendNotification

public void postSendNotification(MsgQueueEntry[] entries)

sendingFailedNotification

public boolean sendingFailedNotification(MsgQueueEntry[] entries,
                                         XmlBlasterException ex)
Notify I_PostSendListener about problem.

Typically XmlBlasterAccess is notified when message came asynchronously from queue

Parameters:
entryList -
ex -
Returns:
true if processed
See Also:
for explanation

handleSyncWorkerException

void handleSyncWorkerException(java.util.List<I_Entry> entryList,
                               java.lang.Throwable throwable)
                         throws XmlBlasterException
Called by DispatchWorker if an Exception occured in sync mode Only on client side

Throws:
XmlBlasterException

removeFromQueue

public void removeFromQueue(MsgQueueEntry[] entries,
                            boolean postSendNotify)
                     throws XmlBlasterException
Messages are successfully sent, remove them now from queue (sort of a commit()): We remove filtered/destroyed messages as well (which doen't show up in entryListChecked)

Parameters:
postSendNotify - TODO
Throws:
XmlBlasterException

handleWorkerException

void handleWorkerException(java.util.List<I_Entry> entryList,
                           java.lang.Throwable throwable)
                     throws XmlBlasterException
Called by DispatchWorker if an Exception occurred in async mode.

Throws:
XmlBlasterException - should never happen but is possible during removing entries from queue

getMsgErrorHandler

public I_MsgErrorHandler getMsgErrorHandler()

switchToSyncMode

public void switchToSyncMode()
We register a QueuePutListener and all put() into the queue are intercepted - our put() is called instead. We then deliver this QueueEntry directly to the remote connection and return synchronously the returned value or the Exception if one is thrown.


callToAliveSync

private void callToAliveSync()

switchToASyncMode

public void switchToASyncMode()
Switch back to asynchronous mode. Our thread pool will take the messages out of the queue and deliver them in asynchronous mode.


putPre

public boolean putPre(I_QueueEntry queueEntry)
               throws XmlBlasterException
Description copied from interface: I_QueuePutListener
Called by I_Queue implementation when a put() is invoked and somebody has registered for such events

Specified by:
putPre in interface I_QueuePutListener
Parameters:
queueEntry - Is guaranteed to never be null
Returns:
true: Continue to put message into queue, false: return without putting entry into queue
Throws:
XmlBlasterException
See Also:
I_QueuePutListener.putPre(I_QueueEntry)

putPre

public boolean putPre(I_QueueEntry[] queueEntries)
               throws XmlBlasterException
Description copied from interface: I_QueuePutListener
Called by I_Queue implementation when a put() is invoked and somebody has registered for such events

Specified by:
putPre in interface I_QueuePutListener
Parameters:
queueEntries - Is guaranteed to never be null
Returns:
true: Continue to put message into queue, false: return without putting entry into queue
Throws:
XmlBlasterException
See Also:
putPre(I_QueueEntry), I_QueuePutListener.putPre(I_QueueEntry[])

putPost

public void putPost(I_QueueEntry queueEntry)
             throws XmlBlasterException
Description copied from interface: I_QueuePutListener
Called by I_Queue implementation before leaving put() and somebody has registered for such events. The message is already safely entered to the queue.

Specified by:
putPost in interface I_QueuePutListener
Parameters:
queueEntry - Is guaranteed to never be null
Throws:
XmlBlasterException
See Also:
I_QueuePutListener.putPost(I_QueueEntry)

putPost

public void putPost(I_QueueEntry[] queueEntries)
             throws XmlBlasterException
Description copied from interface: I_QueuePutListener
Called by I_Queue implementation before leaving put() and somebody has registered for such events. The message is already safely entered to the queue.

Specified by:
putPost in interface I_QueuePutListener
Parameters:
queueEntries - Is guaranteed to never be null
Throws:
XmlBlasterException
See Also:
putPost(I_QueueEntry), I_QueuePutListener.putPost(I_QueueEntry[])

prepareMsgsFromQueue

public java.util.ArrayList prepareMsgsFromQueue(java.util.List<I_Entry> entryList)
Here we prepare messages which are coming directly from the queue.
  1. We eliminate destroyed messages
  2. We make a shallow copy of the message. We need to do this, out messages are references directly into the queue. The delivery framework is later changing the QoS and plugins may change the content - and this should not modify the queue entries


prepareMsgsFromQueue

public static java.util.ArrayList prepareMsgsFromQueue(java.lang.String logId,
                                                       java.util.logging.Logger log,
                                                       I_Queue queue,
                                                       java.util.List<I_Entry> entryList)

notifyAboutNewEntry

public void notifyAboutNewEntry()
When somebody puts a new entry into the queue, we want to be notified about this after the entry is fed.

Called by I_Queue.putPost()


getNotifyCounter

public int getNotifyCounter()
Counts how often a new entry was added since the current worker thread was started.


activateDispatchWorker

private void activateDispatchWorker()
Give the callback worker thread a kick to deliver the messages. Throws no exception.


useBurstModeTimer

private boolean useBurstModeTimer()
Returns:
true if a burst mode timer was activated

removeBurstModeTimer

private void removeBurstModeTimer()
Remove the burst mode timer


startWorkerThread

private void startWorkerThread(boolean fromTimeout)
Parameters:
fromTimeout - for logging only

isDead

public boolean isDead()

isPolling

public boolean isPolling()

isAlive

public boolean isAlive()

lostClientConnection

public void lostClientConnection()
Can be called when client connection is lost (NOT the callback connection). Currently only detected by the SOCKET protocol plugin. Others can only detect lost clients with their callback protocol pings


pingCallbackServer

public boolean pingCallbackServer(boolean sync,
                                  boolean connectionIsDown)

checkSending

private boolean checkSending(boolean isPublisherThread)
Parameters:
isPublisherThread - We take care that the publisher thread, coming through putPost() does never too much work to return fast enough and avoid possible dead locks.
Returns:
true is status is OK and we can try to send a message

timeout

public void timeout(java.lang.Object userData)
We are notified about the burst mode timeout through this method.

Specified by:
timeout in interface I_Timeout
Parameters:
userData - You get bounced back your userData which you passed with Timeout.addTimeoutListener()

getMsgDispatchInterceptor

public I_MsgDispatchInterceptor getMsgDispatchInterceptor()
Returns:
The interceptor plugin if available, otherwise null

setAddresses

public void setAddresses(AddressBase[] addr)
                  throws XmlBlasterException
Set new callback addresses, typically after a session login/logout

Throws:
XmlBlasterException

initDispatcherActive

private void initDispatcherActive(AddressBase[] addrArr)
Switch on/off the sending of messages.


setDispatchWorkerIsActive

void setDispatchWorkerIsActive(boolean val)
The worker notifies us that it is finished, if messages are available it is triggered again.


internalError

public void internalError(java.lang.Throwable throwable)
Called locally and from TopicHandler when internal error (Throwable) occurred to avoid infinite looping


getDispatchStatistic

public DispatchStatistic getDispatchStatistic()
Returns:
A container holding some statistical delivery information

isShutdown

public boolean isShutdown()

shutdown

public void shutdown()
Stop all callback drivers of this client. Possibly invoked twice (givingUpDelivery() calls it indirectly as well) We don't shutdown the corresponding queue.


getId

public java.lang.String getId()
For logging


toXml

public java.lang.String toXml(java.lang.String extraOffset)
Dump state of this object into a XML ASCII string.

Parameters:
extraOffset - indenting of tags for nice output
Returns:
internal state as a XML ASCII string

setDispatcherActive

public void setDispatcherActive(boolean dispatcherActive)
Inhibits/activates the delivery of asynchronous dispatches of messages.

Parameters:
dispatcherActive -

isDispatcherActive

public boolean isDispatcherActive()
Returns:
true if the dispacher is currently activated, i.e. if it is able to deliver asynchronousy messages from the callback queue.

filterDistributorEntries

public java.util.ArrayList filterDistributorEntries(java.util.ArrayList entries,
                                                    java.lang.Throwable ex)

xmlBlaster 2.0.0 API

Copyright © 1999-2010 The xmlBlaster.org contributers.