[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [xmlblaster] Missing messages
Hi Eugene,
many thanks for the deep analysis and patches. We will look at them in
the next weeks (quite busy right now).
We will also try to find a solution to fill the gaps which still remain
since my experience is that if something is "theoretically possible"
then it will occur for sure in real life ;)
Thanks
Michele
Eugene Wu wrote:
> Amendment on my fix:
> TopicHandler: moved destroyTimer check point from "private ArrayList
> timeout()" to "public final void timeout(Object userData)" in order to
> enable the immediate destruction when destroyDelay = 0.
>
> My approach still leaves some gap in order to avoid dead lock:
> After a publishing thread unset the destroyTimer and just began to do
> actual publishing, another publishing thread just finished its job and
> could reset the destroyTimer.
> In my testing, the chance didn't occur, but it's possible
> theoretically. So my approach only reduced the chance but not solved the
> problem completely.
>
>
> Eugene Wu wrote:
>
>> Hi, there:
>>
>> Figured out one cause for the issue. It could happen when a pulishing
>> thread and a timeout thread work on a same TopicHandler instance.
>>
>> For example, after thread A (publishing) retrieved a topicHandler from
>> topicHandlerMap in line 1695 of RoquestBroker:
>> Object obj = topicHandlerMap.get(msgUnit.getKeyOid());
>>
>> the topicHandler's publish method would be invoked. As a synchronized
>> lock on the topicHandler can not be applyed on the whole pubish()
>> method (dead-lock concerns), another thread B (timeout) could get a
>> chance to invoke timeout(Object userData) method of the topicHandler
>> before thread A sets the topicHandler's state to ALIVE. Therefore, the
>> topicHandler could be publishing a message and, at the same time, also
>> could be destroyed, which caused unpredictable results.
>>
>> In my fix (attached), both publishing thread and timeout thread have a
>> check point, in which three things are done to let a slightly ahead
>> thread be able to prevent a slightly behind competing thread from
>> running:
>>
>> 1) Get the lock on the topicHandler (synchronized lock)
>> 2) Check if the topicHandler is working with a competing thread. If
>> yes, stop the current thread for timeout or stop using the handler for
>> publishing. Otherwise, goto step 3.
>> 3) Setup a check mark to prevent its competing thread running. The
>> mark must be cleared by the framework in a proper time later on.
>>
>> Here are more detailed scenarios:
>>
>> a) Publishing thread ahead
>> After thread A got the topicHandler, its state will be checked. If
>> it is dead (and should be destroyed now because of a synchronized
>> lock in timeout method), a new TopicHandler will be created. If the
>> retrieved topicHandle is not dead, its destroyTimer will be removed.
>> Later on, thread B will be stoped at the point where it check for the
>> destroyTimer's null value.
>>
>> The destroyTimer will be reset by the framework when the
>> topicHandler is turned to UNREFERENCED.
>>
>> b) Timeout thread ahead
>> After thread B got the topicHandler, it will check for the
>> existence of destroyTimer. If not existed, stop the timeout event.
>> Otherwise, the thread will continue. Because of a synchronized lock,
>> it will set the topicHandler as DEAD eventually, which will prevent
>> thread A from using the obsolete handler.
>>
>> A dead topicHandle will be garbage collected. (need to confirm this!)
>>
>>
>> Regards,
>> Eugene
>>
>
> ------------------------------------------------------------------------
>
> Index: org/xmlBlaster/engine/TopicHandler.java
> ===================================================================
> --- org/xmlBlaster/engine/TopicHandler.java (revision 15178)
> +++ org/xmlBlaster/engine/TopicHandler.java (working copy)
> at at -1946,6 +1946,14 at at
> return notifyList;
> }
>
> + void unsetDestroyTimer() {
> + if (this.timerKey != null) {
> + log.finer("Unset destroyTimer for TopicHandler: "+this.getUniqueKey());
> + this.destroyTimer.removeTimeoutListener(this.timerKey);
> + this.timerKey = null;
> + }
> + }
> +
> /**
> * Merge the message DOM tree into the big xmlBlaster DOM tree
> */
> at at -2196,8 +2204,16 at at
> * This timeout occurs after a configured delay (destroyDelay) in UNREFERENCED state
> */
> public final void timeout(Object userData) {
> - if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic now ...");
> - ArrayList notifyList = timeout();
> + if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic (oid="+getUniqueKey()+") now ...");
> + ArrayList notifyList = null;
> + synchronized (this) {
> + if (timerKey==null) {
> + log.finer("Ignored timeout event for the timer was unset by a publishing thread");
> + return;
> + }
> + notifyList = timeout();
> + }
> +
> if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize
> }
>
> Index: org/xmlBlaster/engine/RequestBroker.java
> ===================================================================
> --- org/xmlBlaster/engine/RequestBroker.java (revision 15178)
> +++ org/xmlBlaster/engine/RequestBroker.java (working copy)
> at at -1686,6 +1686,7 at at
> // Handle local message
>
> // Find or create the topic
> + boolean newlycreated = false;
> TopicHandler topicHandler = null;
> synchronized(this.topicHandlerMap) {
> if (!msgKeyData.getOid().equals(msgUnit.getKeyOid())) {
> at at -1695,11 +1696,25 at at
> Object obj = topicHandlerMap.get(msgUnit.getKeyOid());
> if (obj == null) {
> topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid()); // adds itself to topicHandlerMap
> + newlycreated = true;
> }
> else {
> topicHandler = (TopicHandler)obj;
> }
> }
> +
> + if (!newlycreated && !topicHandler.isAlive()) {
> + log.finer("The topic is not ALIVE, need to check if it is dying or not");
> + synchronized(topicHandler) { // wiating if the topicHandler is dying.
> + if (topicHandler.isDead()) {
> + log.finer("The topic is DEAD, need to create a new one");
> + topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid());
> + }
> + else {
> + topicHandler.unsetDestroyTimer(); // to provent the topicHandler from being timed out
> + }
> + }
> + }
>
> // Process the message
> publishReturnQos = topicHandler.publish(sessionInfo, msgUnit, publishQos);