Re: [xmlblaster] missing volatile messages
Marcel,
I fixed a multithreading race condition that caused messages to be lost.
The patch is attached to this mail.
It concerns org.xmlBlaster.engine.TopicHandler.publish() and
entryDestroyed(). Assume thread A has just called toAlive() in line
537, which sets the topic state to ALIVE. Before thread A pushes a
msgUnit into the callback queue(s) through
TopicHandler.invokeCallbackAndHandleFailure() in line 645 -->
invokeCallback() in line 1337, another thread B finishes its publish
on the same topic and may trigger the destruction of a message,
which, in turn, may switch the topic state to UNREFERENCED. When thread
A reaches TopicHandler.invokeCallback() in line 1321, it finds the
topic in an invalid state and drops the message.
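The interleaving described above can be replayed step by step in a single thread. The following is only a minimal sketch: the class UnguardedTopicReplay and its fields are hypothetical stand-ins, not xmlBlaster code, and only the method names toAlive(), entryDestroyed() and invokeCallback() are borrowed from the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a topic without any guard; not xmlBlaster's TopicHandler.
class UnguardedTopicReplay {
    enum State { UNREFERENCED, ALIVE }

    State state = State.UNREFERENCED;
    final List<String> callbackQueue = new ArrayList<>();

    void toAlive() { state = State.ALIVE; }

    // Without a guard, a destroyed entry always moves the topic to UNREFERENCED.
    void entryDestroyed() { state = State.UNREFERENCED; }

    // Delivery only happens while the topic is ALIVE; otherwise the message is dropped.
    boolean invokeCallback(String msg) {
        if (state != State.ALIVE) {
            return false;
        }
        callbackQueue.add(msg);
        return true;
    }

    public static void main(String[] args) {
        UnguardedTopicReplay topic = new UnguardedTopicReplay();
        topic.toAlive();                                 // thread A: publish() sets ALIVE
        topic.entryDestroyed();                          // thread B: a message is destroyed
        boolean delivered = topic.invokeCallback("m1");  // thread A: reaches the callback step
        // prints "delivered=false, queued=0"
        System.out.println("delivered=" + delivered + ", queued=" + topic.callbackQueue.size());
    }
}
```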
I used a publishCounter to record the number of threads currently
running in the publish method. When entryDestroyed() intends to change
the topic state to UNREFERENCED, it first checks the publishCounter. If
it is greater than zero, the topic state is not changed.
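The counter idea can be sketched in isolation as follows. This is a simplified illustration, not the patch itself: the class GuardedTopicSketch and the methods beginPublish() and endPublish() are hypothetical names. The sketch also takes the same lock for the increment, the decrement and the check. That is an assumption about what a race-free variant might need; the diff does not show whether the increment in publish() runs under a lock.

```java
// Hypothetical stand-in for the topic state handling; not xmlBlaster's TopicHandler.
class GuardedTopicSketch {
    enum State { UNREFERENCED, ALIVE }

    private State state = State.UNREFERENCED;
    // Number of threads currently inside publish; guarded by "this".
    private int publishCounter = 0;

    // Start of publish: register the publisher and make sure the topic is ALIVE.
    synchronized void beginPublish() {
        publishCounter++;
        if (state != State.ALIVE) {
            state = State.ALIVE;
        }
    }

    // End of publish: meant to be called from a finally block.
    synchronized void endPublish() {
        publishCounter--;
    }

    // Returns true if the state was switched to UNREFERENCED.
    synchronized boolean entryDestroyed() {
        if (publishCounter == 0) {
            state = State.UNREFERENCED;
            return true;
        }
        return false; // a publish is still in flight, keep the topic ALIVE
    }

    synchronized State getState() { return state; }

    public static void main(String[] args) {
        GuardedTopicSketch topic = new GuardedTopicSketch();
        topic.beginPublish();                      // thread A enters publish
        boolean changed = topic.entryDestroyed();  // thread B's destroy arrives meanwhile
        System.out.println(changed + " " + topic.getState()); // prints "false ALIVE"
        topic.endPublish();                        // thread A leaves publish
        changed = topic.entryDestroyed();
        System.out.println(changed + " " + topic.getState()); // prints "true UNREFERENCED"
    }
}
```

With the guard in place, a destroy that arrives while a publish is in flight leaves the topic ALIVE, so the callback step still finds a valid state.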
I only dealt with one scenario in one possible way, without looking at
the whole picture.
Eugene
--- xmlBlaster_1.1.1_orig/src/java/org/xmlBlaster/engine/TopicHandler.java 2006-01-03 19:01:41.000000000 +0000
+++ xmlBlaster_1.1.1_my/src/java/org/xmlBlaster/engine/TopicHandler.java 2006-04-12 15:16:53.000000000 +0000
@@ -139,6 +139,8 @@
private boolean isRegisteredInBigXmlDom = false;
+ private int publishCounter = 0; //count the threads running in publish method
+
/**
* This topic is destroyed after given timeout
* The timer is activated on state change to UNREFERENCED
@@ -566,7 +568,12 @@
}
msgUnitWrapper = new MsgUnitWrapper(glob, msgUnit, this.msgUnitCache, initialCounter, 0, -1);
-
+
+ publishCounter++;
+ if (!isAlive()) {
+ toAlive();
+ }
+
// Forcing RAM entry temporary (reset in finally below) to avoid performance critical harddisk IO during initialization, every callback/subject/history queue put()/take() is changing the reference counter of MsgUnitWrapper. For persistent messages this needs to be written to harddisk
// If the server crashed during this RAM operation it is not critical as the publisher didn't get an ACK yet
synchronized(this.msgUnitWrapperUnderConstruction) {
@@ -649,6 +656,7 @@
finally {
if (msgUnitWrapper != null) {
synchronized(this) {
+ publishCounter--;
synchronized(msgUnitWrapper) {
synchronized(this.msgUnitCache) {
try {
@@ -941,9 +949,12 @@
if (isSoftErased()) {
notifyList = toDead(this.creatorSessionName, false);
}
- else {
+ else if (publishCounter==0) {
notifyList = toUnreferenced(false);
}
+ else {
+ if (log.TRACE) log.trace(ME, "ignored the attempt to set topic unreferenced as other thread in publish");
+ }
}
catch (XmlBlasterException e) {
log.error(ME, "Internal problem with entryDestroyed: " + e.getMessage() + ": " + toXml());