
Re: [xmlblaster] Missing messages



Hi, there:

Figured out one cause for the issue. It can happen when a publishing thread and a timeout thread work on the same TopicHandler instance.

For example, after thread A (publishing) has retrieved a topicHandler from topicHandlerMap in line 1695 of RequestBroker:
Object obj = topicHandlerMap.get(msgUnit.getKeyOid());


the topicHandler's publish method is invoked. Because a synchronized lock on the topicHandler cannot be applied to the whole publish() method (deadlock concerns), another thread B (timeout) can get a chance to invoke the timeout(Object userData) method of the topicHandler before thread A sets the topicHandler's state to ALIVE. The topicHandler can therefore be publishing a message and being destroyed at the same time, which causes unpredictable results.

In my fix (attached), both the publishing thread and the timeout thread pass through a check point. Three things are done there so that the thread that is slightly ahead prevents the competing thread that is slightly behind from running:

1) Get the lock on the topicHandler (synchronized lock).
2) Check whether the topicHandler is already being worked on by the competing thread. If yes, abort the timeout (timeout thread) or stop using the handler (publishing thread). Otherwise, go to step 3.
3) Set a check mark that prevents the competing thread from running. The framework must clear the mark at a suitable time later on.
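
The three steps above can be sketched as follows. This is only a minimal, self-contained illustration and not the attached patch: the class Handler, its methods claimForPublish, onTimeout, toAlive and toUnreferenced, and the use of a plain Object as timer key are all made up for the example.

```java
// Hypothetical, simplified stand-in for TopicHandler; all names are illustrative.
final class CheckPointSketch {

    enum State { UNREFERENCED, ALIVE, DEAD }

    static final class Handler {
        private State state = State.UNREFERENCED;
        // Non-null while a destroy timer is armed; doubles as the "check mark".
        private Object timerKey = new Object();

        /** Publisher check point: returns false if the handler is already DEAD. */
        synchronized boolean claimForPublish() {      // step 1: take the lock
            if (state == State.DEAD) return false;    // step 2: the timeout thread won
            timerKey = null;                          // step 3: mark that blocks the timeout
            return true;
        }

        /** Timeout check point: returns true only if the handler was destroyed. */
        synchronized boolean onTimeout() {            // step 1: take the lock
            if (state == State.ALIVE) return false;   // interim message arrived
            if (timerKey == null) return false;       // step 2: the publisher won
            state = State.DEAD;                       // step 3: mark that blocks publishers
            return true;
        }

        /** Called once publishing has stored the message. */
        synchronized void toAlive() {
            if (state != State.DEAD) state = State.ALIVE;
        }

        /** Framework side: clears the publisher's mark by re-arming the timer. */
        synchronized void toUnreferenced() {
            if (state == State.DEAD) return;
            state = State.UNREFERENCED;
            timerKey = new Object();
        }

        synchronized State state() { return state; }
    }

    public static void main(String[] args) {
        Handler publishFirst = new Handler();
        System.out.println(publishFirst.claimForPublish()); // publisher is ahead
        System.out.println(publishFirst.onTimeout());       // late timeout is ignored

        Handler timeoutFirst = new Handler();
        System.out.println(timeoutFirst.onTimeout());        // timeout is ahead
        System.out.println(timeoutFirst.claimForPublish());  // publisher must not use it
    }
}
```

Both check points are short synchronized methods, so the lock is never held across the whole publish, which is the deadlock concern mentioned above.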


Here are more detailed scenarios:

a) Publishing thread ahead
After thread A has got the topicHandler, its state is checked. If it is dead (its destruction should be complete by now, because the timeout method holds a synchronized lock), a new TopicHandler is created. If the retrieved topicHandler is not dead, its destroyTimer is removed. Later on, thread B is stopped at the point where it checks whether the destroyTimer has been unset (timerKey == null in the patch).


The destroyTimer will be reset by the framework when the topicHandler is turned to UNREFERENCED.

b) Timeout thread ahead
After thread B has got the topicHandler, it checks whether the destroyTimer still exists. If it does not, the timeout event is ignored. Otherwise, the thread continues. Because of the synchronized lock, it eventually sets the topicHandler to DEAD, which prevents thread A from using the obsolete handler.


   A dead topicHandler will be garbage collected. (need to confirm this!)
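
Seen from the publishing side, the two scenarios amount to "look up the handler, try to claim it, and replace it if it turned out to be dead". Here is a rough sketch of that lookup under simplifying assumptions. It is not the code from RequestBroker: LookupSketch, Handler, handlerForPublish, claimForPublish and armDestroyTimer are hypothetical names, and the retry loop is my own simplification for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the publisher-side lookup; not the real RequestBroker.
final class LookupSketch {

    static final class Handler {
        final String oid;
        private boolean dead;
        private boolean timerArmed;

        Handler(String oid) { this.oid = oid; }

        /** Framework side: the topic became UNREFERENCED, start the destroy delay. */
        synchronized void armDestroyTimer() { if (!dead) timerArmed = true; }

        /** Scenario a: cancels the timer unless the handler is already dead. */
        synchronized boolean claimForPublish() {
            if (dead) return false;
            timerArmed = false;
            return true;
        }

        /** Scenario b: destroys the handler unless a publisher cancelled the timer. */
        synchronized boolean timeout() {
            if (dead || !timerArmed) return false;
            dead = true;
            return true;
        }
    }

    private final Map<String, Handler> handlers = new HashMap<>();

    /** Returns a handler that is guaranteed not to be timed out under the caller. */
    Handler handlerForPublish(String oid) {
        while (true) {
            Handler h;
            synchronized (handlers) {
                h = handlers.computeIfAbsent(oid, Handler::new);
            }
            if (h.claimForPublish()) return h;  // blocks while a timeout is in progress
            synchronized (handlers) {
                handlers.remove(oid, h);        // drop the dead handler if still mapped
            }
        }
    }

    public static void main(String[] args) {
        LookupSketch broker = new LookupSketch();
        Handler first = broker.handlerForPublish("http_post");
        first.armDestroyTimer();
        first.timeout();                                  // scenario b: timeout ahead
        Handler second = broker.handlerForPublish("http_post");
        System.out.println(second != first);              // a fresh handler was created

        second.armDestroyTimer();
        Handler third = broker.handlerForPublish("http_post"); // scenario a: publisher ahead
        System.out.println(third == second);              // same handler is reused
        System.out.println(second.timeout());             // the late timeout is ignored
    }
}
```

Once the map no longer references the dead handler, nothing in this sketch holds on to it, so it becomes eligible for garbage collection. Whether the same holds in the real server still needs to be confirmed, as noted above.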


Regards, Eugene



Marcel Ruff wrote:

David Kerry wrote:

Some more info on this...

Not sure if it will help, but I discovered this related exception in our logs
that is tied to this event as well:


java.lang.Exception: Stack trace
at java.lang.Thread.dumpStack(Thread.java:1158)
at org.xmlBlaster.engine.TopicHandler.entryDestroyed(TopicHandler.java:952)
at org.xmlBlaster.engine.MsgUnitWrapper.toDestroyed(MsgUnitWrapper.java:596)
at org.xmlBlaster.engine.MsgUnitWrapper.setReferenceCounter(MsgUnitWrapper.java:308)
at org.xmlBlaster.engine.TopicHandler.publish(TopicHandler.java:681)
at org.xmlBlaster.engine.RequestBroker.publish(RequestBroker.java:1704)
at org.xmlBlaster.engine.RequestBroker.publish(RequestBroker.java:1495)
at org.xmlBlaster.engine.RequestBroker.publish(RequestBroker.java:1489)
at org.xmlBlaster.engine.XmlBlasterImpl.publishArr(XmlBlasterImpl.java:198)
at org.xmlBlaster.util.protocol.RequestReplyExecutor.receiveReply(RequestReplyExecutor.java:405)
at org.xmlBlaster.protocol.socket.HandleClient.handleMessage(HandleClient.java:232)
at org.xmlBlaster.protocol.socket.HandleClient.run(HandleClient.java:354)
at java.lang.Thread.run(Thread.java:595)

Yes, that is the stack trace which relates to the problem you described in your last mail.
This seems to be a multithreading issue that occurs while a topic is being destroyed;
the traversed states are UNCONFIGURED -> UNREFERENCED -> DEAD


As a workaround you should keep the topic "http_post" alive (instead of create/delete/create/delete ... because of your volatile messages).
Please try to set


 -topic.destroyDelay 86400000

on xmlBlaster server command line or in xmlBlaster.properties (topic.destroyDelay=86400000)

I think it prevents your problem (but it is not a general fix yet).

regards
Marcel


Index: org/xmlBlaster/engine/TopicHandler.java
===================================================================
--- org/xmlBlaster/engine/TopicHandler.java	(revision 15178)
+++ org/xmlBlaster/engine/TopicHandler.java	(working copy)
@@ -1946,6 +1946,13 @@
       return notifyList;
    }
 
+   void unsetDestroyTimer() {
+      if (this.timerKey != null) {
+         this.destroyTimer.removeTimeoutListener(this.timerKey);
+         this.timerKey = null;
+      }
+   }
+   
    /**
     * Merge the message DOM tree into the big xmlBlaster DOM tree
     */
@@ -2196,7 +2203,7 @@
     * This timeout occurs after a configured delay (destroyDelay) in UNREFERENCED state
     */
    public final void timeout(Object userData) {
-      if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic now ...");
+      if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic (oid="+getUniqueKey()+") now ...");
       ArrayList notifyList = timeout();
       if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize
    }
@@ -2207,6 +2214,10 @@
       synchronized (this) {
          if (isAlive()) // interim message arrived?
             return null;
+         if (timerKey==null) {
+            log.finer("Ignored timeout event for the timer was unset by a publishing thread");
+            return null;
+         }
          return toDead(this.creatorSessionName, null, null);
       }
    }
Index: org/xmlBlaster/engine/RequestBroker.java
===================================================================
--- org/xmlBlaster/engine/RequestBroker.java	(revision 15178)
+++ org/xmlBlaster/engine/RequestBroker.java	(working copy)
@@ -1686,6 +1686,7 @@
          // Handle local message
 
          // Find or create the topic
+         boolean newlycreated = false;
          TopicHandler topicHandler = null;
          synchronized(this.topicHandlerMap) {
             if (!msgKeyData.getOid().equals(msgUnit.getKeyOid())) {
@@ -1695,11 +1696,25 @@
             Object obj = topicHandlerMap.get(msgUnit.getKeyOid());
             if (obj == null) {
                topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid()); // adds itself to topicHandlerMap
+               newlycreated = true;
             }
             else {
                topicHandler = (TopicHandler)obj;
             }
          }
+         
+         if (!newlycreated && !topicHandler.isAlive()) {
+            log.finer("The topic is not ALIVE, need to check if it is dying or not");
+            synchronized(topicHandler) { // waits here if the topicHandler is dying
+               if (topicHandler.isDead()) {
+                  log.finer("The topic is DEAD, need to create a new one");
+                  topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid());
+               }
+               else {
+                  topicHandler.unsetDestroyTimer(); // to prevent the topicHandler from being timed out
+               }
+            }
+         }
 
          // Process the message
          publishReturnQos = topicHandler.publish(sessionInfo, msgUnit, publishQos);