
[xmlblaster] Another volatile bug fix patch



Although we see this bug with volatile messages, I believe
it would affect all users.

The problem was that when one client subscribed multiple
times to the same message (the same XPath query), the
subscription-id delivered with the update was wrong when a
published message matched.

For example, if one client subscribes to //foo three times and
another client then publishes a message that matches //foo, the
subscriber gets three update callbacks, but depending on the
timing, the subscription-id in a callback may be duplicated or
may not match the subscription it was delivered for.
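
To illustrate the kind of failure described above, here is a minimal,
single-threaded sketch. It is not xmlBlaster code: the classes Message
and Entry and both deliver methods are hypothetical, and it assumes
that all entries for one client share a single message object whose
QoS is overwritten on export. That assumption is only a simplified
reading of the symptom; the real code path may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedQosDemo {
    /** Hypothetical stand-in for a message with one mutable QoS field. */
    static class Message {
        String qos;
    }

    /** Hypothetical stand-in for a queue entry: one per subscription. */
    static class Entry {
        final Message shared;          // same object for every subscription
        final String subscriptionId;

        Entry(Message shared, String subscriptionId) {
            this.shared = shared;
            this.subscriptionId = subscriptionId;
        }

        /** Writes this entry's subscription id into the shared message. */
        void export() {
            shared.qos = subscriptionId;
        }
    }

    /** Export all entries first, then deliver: each delivery sees the last QoS written. */
    static List<String> deliverInBulk(Entry[] entries) {
        for (Entry e : entries) {
            e.export();
        }
        List<String> delivered = new ArrayList<>();
        for (Entry e : entries) {
            delivered.add(e.shared.qos);
        }
        return delivered;
    }

    /** Export and deliver one entry at a time: each delivery sees its own QoS. */
    static List<String> deliverOneByOne(Entry[] entries) {
        List<String> delivered = new ArrayList<>();
        for (Entry e : entries) {
            e.export();
            delivered.add(e.shared.qos);
        }
        return delivered;
    }

    /** Three subscriptions from one client, all matching the same message. */
    static Entry[] threeSubscriptions() {
        Message m = new Message();
        return new Entry[] {
            new Entry(m, "sub-1"), new Entry(m, "sub-2"), new Entry(m, "sub-3")
        };
    }

    public static void main(String[] args) {
        System.out.println(deliverInBulk(threeSubscriptions()));   // [sub-3, sub-3, sub-3]
        System.out.println(deliverOneByOne(threeSubscriptions())); // [sub-1, sub-2, sub-3]
    }
}
```

In this model the bulk path always reports the last subscription-id
three times, while the one-at-a-time path reports each id once. The
real server is multi-threaded, which would explain why the wrong id
there depends on timing rather than always being the last one.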

Attached is a patch which includes a comment in the relevant
section on what was happening specifically.

It's not pretty, but it works for me for now. A proper
fix will require some slight interface changes in the
callback worker classes.

--
David Kerry


--AqsLC8rIMeq19msA Content-Type: text/plain; charset=us-ascii Content-Disposition: attachment; filename="cb_volatile_patch.txt"

Index: src/java/org/xmlBlaster/engine/callback/CbConnection.java
===================================================================
RCS file: /opt/cvsroot/xmlBlaster/src/java/org/xmlBlaster/engine/callback/CbConnection.java,v
retrieving revision 1.10
diff -c -r1.10 CbConnection.java
*** src/java/org/xmlBlaster/engine/callback/CbConnection.java 2002/09/24 21:33:28 1.10
--- src/java/org/xmlBlaster/engine/callback/CbConnection.java 2002/11/01 00:04:11
***************
*** 190,195 ****
--- 190,198 ----
throw new XmlBlasterException(ME, "Internal problem: callback driver is in stater IS_DEAD, msg.length=" + msg.length + " messages are lost");
}


+ String[] returnVal=new String[msg.length];
+ MsgQueueEntry[] onemsg=new MsgQueueEntry[1];
+
// First we export the message (call the interceptor) ...
for (int i=0; i<msg.length; i++) {
I_Session sessionSecCtx = msg[i].getSessionInfo().getSecuritySession();
***************
*** 198,230 ****
throw new XmlBlasterException(ME+".accessDenied", "No session security context!");
}
//cbAddress[0] REDUCE UPDATE QOS!!! TODO
msg[i].setMessageUnit(sessionSecCtx.exportMessage(msg[i].getMessageUnit(i, msg.length, redeliver)));
if (log.DUMP) log.dump(ME, "CallbackQos=" + msg[i].getMessageUnit().getQos());
- }


!       try {
!          if (cbAddress.oneway()) {
!             cbDriver.sendUpdateOneway(msg);
!             if (log.TRACE) log.trace(ME, "Success, sent " + msg.length + " oneway messages.");
!             return null;
!          }

! if (log.TRACE) log.trace(ME, "Before update " + msg.length + " acknowledged messages ...");
! String[] returnVal = cbDriver.sendUpdate(msg);
! if (log.TRACE) log.trace(ME, "Success, sent " + msg.length + " acknowledged messages, return value #1 is '" + returnVal[0] + "'");
! return returnVal;
! }
! catch (XmlBlasterException e) {
! if (isPolling() && log.TRACE) log.trace(ME, "Exception from update(), retryCounter=" + retryCounter + ", state=" + getStateStr());
! handleTransition(false, true);
! throw e;
}
! catch (Throwable e) {
! log.error(ME, "Exception from update(), retryCounter=" + retryCounter + ", state=" + getStateStr() + ": " + e.toString());
! handleTransition(false, true);
! //if (!(e instanceof IOException)) e.printStackTrace();
! throw new XmlBlasterException(ME, e.toString());
! }
}


/** Ping the callback server of the client */
--- 201,245 ----
throw new XmlBlasterException(ME+".accessDenied", "No session security context!");
}
//cbAddress[0] REDUCE UPDATE QOS!!! TODO
+
+ // Calling this in bulk on the array falls apart if we're delivering
+ // the same message multiple times to the same subscriber. The qos ends
+ // up being set wrong for the subscription (one message --> multiple QoS
+ // values).
+ //
// Work around the problem by submitting messages to the driver one at
// a time instead of all at once.
msg[i].setMessageUnit(sessionSecCtx.exportMessage(msg[i].getMessageUnit(i, msg.length, redeliver)));
if (log.DUMP) log.dump(ME, "CallbackQos=" + msg[i].getMessageUnit().getQos());


!          onemsg[0]=msg[i];
!
!          try {
!            if (cbAddress.oneway()) {
!               cbDriver.sendUpdateOneway(onemsg);
!               if (log.TRACE) log.trace(ME, "Success, sent 1 oneway messages.");
!               returnVal=null;
!               continue;
!            }

! if (log.TRACE) log.trace(ME, "Before update " + onemsg.length + " acknowledged messages ...");
! returnVal[i] = cbDriver.sendUpdate(onemsg)[0];
! if (log.TRACE) log.trace(ME, "Success, sent " + onemsg.length + " acknowledged messages, return value #1 is '" + returnVal[i] + "'");
! }
! catch (XmlBlasterException e) {
! if (isPolling() && log.TRACE) log.trace(ME, "Exception from update(), retryCounter=" + retryCounter + ", state=" + getStateStr());
! handleTransition(false, true);
! throw e;
! }
! catch (Throwable e) {
! log.error(ME, "Exception from update(), retryCounter=" + retryCounter + ", state=" + getStateStr() + ": " + e.toString());
! handleTransition(false, true);
! //if (!(e instanceof IOException)) e.printStackTrace();
! throw new XmlBlasterException(ME, e.toString());
! }
}
!
! return returnVal;
}


     /** Ping the callback server of the client */

--AqsLC8rIMeq19msA--