[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[xmlblaster] Another volatile bug fix patch
Although we see this bug with volatile messages, I believe
it would affect all users.
The problem was that when one client subscribed multiple
times to the same message (xpath), the subscription-id
was wrong when a published message matched.
For example, if I subscribed looking for //foo 3 times from
the same client, then publish a message that matches //foo,
from another client, the first one will get 3 update callbacks,
but depending on the timing, the subscription-id will be
duplicated or wrong for the given subscription.
Attached is a patch which includes a comment in the relevant
section on what was happening specifically.
It's not pretty, but it works for now for me. A proper
fix will require some slight interface changes in the
callback worker classes.
--
David Kerry
--AqsLC8rIMeq19msA
Content-Type: text/plain; charset=us-ascii
Content-Disposition: attachment; filename="cb_volatile_patch.txt"
Index: src/java/org/xmlBlaster/engine/callback/CbConnection.java
===================================================================
RCS file: /opt/cvsroot/xmlBlaster/src/java/org/xmlBlaster/engine/callback/CbConnection.java,v
retrieving revision 1.10
diff -c -r1.10 CbConnection.java
*** src/java/org/xmlBlaster/engine/callback/CbConnection.java 2002/09/24 21:33:28 1.10
--- src/java/org/xmlBlaster/engine/callback/CbConnection.java 2002/11/01 00:04:11
***************
*** 190,195 ****
--- 190,198 ----
throw new XmlBlasterException(ME, "Internal problem: callback driver is in stater
IS_DEAD, msg.length=" + msg.length + " messages are lost");
}
+ String[] returnVal=new String[msg.length];
+ MsgQueueEntry[] onemsg=new MsgQueueEntry[1];
+
// First we export the message (call the interceptor) ...
for (int i=0; i<msg.length; i++) {
I_Session sessionSecCtx = msg[i].getSessionInfo().getSecuritySession();
***************
*** 198,230 ****
throw new XmlBlasterException(ME+".accessDenied", "No session security context!");
}
//cbAddress[0] REDUCE UPDATE QOS!!! TODO
msg[i].setMessageUnit(sessionSecCtx.exportMessage(msg[i].getMessageUnit(i, msg.length,
redeliver)));
if (log.DUMP) log.dump(ME, "CallbackQos=" + msg[i].getMessageUnit().getQos());
- }
! try {
! if (cbAddress.oneway()) {
! cbDriver.sendUpdateOneway(msg);
! if (log.TRACE) log.trace(ME, "Success, sent " + msg.length + " oneway messages.");
! return null;
! }
! if (log.TRACE) log.trace(ME, "Before update " + msg.length + " acknowledged messages ...");
! String[] returnVal = cbDriver.sendUpdate(msg);
! if (log.TRACE) log.trace(ME, "Success, sent " + msg.length + " acknowledged messages,
return value #1 is '" + returnVal[0] + "'");
! return returnVal;
! }
! catch (XmlBlasterException e) {
! if (isPolling() && log.TRACE) log.trace(ME, "Exception from update(), retryCounter=" +
retryCounter + ", state=" + getStateStr());
! handleTransition(false, true);
! throw e;
}
! catch (Throwable e) {
! log.error(ME, "Exception from update(), retryCounter=" + retryCounter + ", state=" +
getStateStr() + ": " + e.toString());
! handleTransition(false, true);
! //if (!(e instanceof IOException)) e.printStackTrace();
! throw new XmlBlasterException(ME, e.toString());
! }
}
/** Ping the callback server of the client */
--- 201,245 ----
throw new XmlBlasterException(ME+".accessDenied", "No session security context!");
}
//cbAddress[0] REDUCE UPDATE QOS!!! TODO
+
+ // Calling this in bulk on the array falls apart if we're delivering
+ // the same message multiple times to the same subscriber. The qos ends
+ // up being set wrong for the subscription (one message --> multiple QoS
+ // values).
+ //
+ // Work around the problem by submitting messages to the driver one at
+ // at a time instead of all at once.
msg[i].setMessageUnit(sessionSecCtx.exportMessage(msg[i].getMessageUnit(i, msg.length,
redeliver)));
if (log.DUMP) log.dump(ME, "CallbackQos=" + msg[i].getMessageUnit().getQos());
! onemsg[0]=msg[i];
!
! try {
! if (cbAddress.oneway()) {
! cbDriver.sendUpdateOneway(onemsg);
! if (log.TRACE) log.trace(ME, "Success, sent 1 oneway messages.");
! returnVal=null;
! continue;
! }
! if (log.TRACE) log.trace(ME, "Before update " + onemsg.length + " acknowledged messages
...");
! returnVal[i] = cbDriver.sendUpdate(onemsg)[0];
! if (log.TRACE) log.trace(ME, "Success, sent " + onemsg.length + " acknowledged
messages, return value #1 is '" + returnVal[i] + "'");
! }
! catch (XmlBlasterException e) {
! if (isPolling() && log.TRACE) log.trace(ME, "Exception from update(), retryCounter=" +
retryCounter + ", state=" + getStateStr());
! handleTransition(false, true);
! throw e;
! }
! catch (Throwable e) {
! log.error(ME, "Exception from update(), retryCounter=" + retryCounter + ", state=" +
getStateStr() + ": " + e.toString());
! handleTransition(false, true);
! //if (!(e instanceof IOException)) e.printStackTrace();
! throw new XmlBlasterException(ME, e.toString());
! }
}
!
! return returnVal;
}
/** Ping the callback server of the client */
--AqsLC8rIMeq19msA--