[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [xmlblaster] missing volatile messages



Marcel,

I fixed a multithread racing issue which caused message losing. The patch is attached with the mail.

It was concerned with org.xmlBlaster.engine.TopicHandler.publish() and entryDestoyed(). Let's assume thread A just called toAlive() in line 537, which set topic state to ALIVE. Before thread A pushs a msgUnit into callback queue(s) through TopicHandler.invokeCallbackAndHandleFailure() in line 645 --> invokeCallback() in line 1337, another thread B just finished its publish in the same topic and might triggered a message distroying, which, in turn, might switch topic state to UNREFERENCED. When thread A reached TopicHandler.invokeCallback() in line 1321, it would find the topic was in invalid state and droped the message.

I used a publishCounter to record the number of running threads in the publish method. When entryDetryed() intents to change topic state to UNREFERENCED, it will first check the publishCounter. If it is greater than one, don't change topic state.

I just dealed with one scenario in one possible way without looking at the whole picture.

Eugene
--- xmlBlaster_1.1.1_orig/src/java/org/xmlBlaster/engine/TopicHandler.java	2006-01-03 19:01:41.000000000 +0000
+++ xmlBlaster_1.1.1_my/src/java/org/xmlBlaster/engine/TopicHandler.java	2006-04-12 15:16:53.000000000 +0000
 at  at  -139,6 +139,8  at  at 
 
    private boolean isRegisteredInBigXmlDom = false;
 
+   private int publishCounter = 0; //count the threads running in publish method
+   
    /**
     * This topic is destroyed after given timeout
     * The timer is activated on state change to UNREFERENCED
 at  at  -566,7 +568,12  at  at 
             }
 
             msgUnitWrapper = new MsgUnitWrapper(glob, msgUnit, this.msgUnitCache, initialCounter, 0, -1);
-
+            
+            publishCounter++;
+            if (!isAlive()) {
+                toAlive();
+            }
+            
             // Forcing RAM entry temporary (reset in finally below) to avoid performance critical harddisk IO during initialization, every callback/subject/history queue put()/take() is changing the reference counter of MsgUnitWrapper. For persistent messages this needs to be written to harddisk
             // If the server crashed during this RAM operation it is not critical as the publisher didn't get an ACK yet
             synchronized(this.msgUnitWrapperUnderConstruction) {
 at  at  -649,6 +656,7  at  at 
       finally {
          if (msgUnitWrapper != null) {
             synchronized(this) {
+               publishCounter--;
                synchronized(msgUnitWrapper) {
                   synchronized(this.msgUnitCache) {
                      try {
 at  at  -941,9 +949,12  at  at 
                if (isSoftErased()) {
                   notifyList = toDead(this.creatorSessionName, false);
                }
-               else {
+               else if (publishCounter==0) {
                   notifyList = toUnreferenced(false);
                }
+               else {
+            	   if (log.TRACE) log.trace(ME, "ignored the attempt to set topic unreferenced as other thread in publish");
+               }
             }
             catch (XmlBlasterException e) {
                log.error(ME, "Internal problem with entryDestroyed: " + e.getMessage() + ": " + toXml());