[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [xmlblaster] Missing messages
Hi, Michele,
Tryed another approach without affecting your current design. This
approach seems no gaps, but is not a pretty solution, as it exposes a
TopicHandler's thread counter to public and requires caller to invoke
three related methods in order.
How about using topic handler pooling and make each handler single
thread access?
Regards,
Eugene
Michele Laghi wrote:
Hi Eugene,
many thanks for the deep analysis and patches. We will look at them in
the next weeks (quite busy right now).
We will also try to find a solution to fill the gaps which still remain
since my experience is that if something is "theoretically possible"
then it will occur for sure in real life ;)
Thanks
Michele
Index: org/xmlBlaster/engine/TopicHandler.java
===================================================================
--- org/xmlBlaster/engine/TopicHandler.java (revision 15178)
+++ org/xmlBlaster/engine/TopicHandler.java (working copy)
at at -142,7 +142,8 at at
private boolean isRegisteredInBigXmlDom = false;
- private int publishCounter = 0; //count the threads running in publish method
+ private int publishCounter = 0; // count the threads running in publish method (for internal use)
+ private int publishCounter2 = 0; // count the threads invoking publish method (exposed to outside)
/**
* This topic is destroyed after given timeout
at at -482,6 +483,8 at at
* <br />
* Publish filter plugin checks are done already<br />
* Cluster forwards are done already.
+ *
+ * at see incrementPublishCounter()
*
* at param publisherSessionInfo The publisher
* at param msgUnit The new message
at at -1947,6 +1950,21 at at
}
/**
+ * Must call the method just befor calling publish(), and must guarantee
+ * decrementPublishCounter() be called after called publish().
+ */
+ public synchronized void incrementPublishCounter() {
+ publishCounter2 ++;
+ }
+
+ /**
+ * at see incrementPublishCounter()
+ */
+ public synchronized void decrementPublishCounter() {
+ publishCounter2 --;
+ }
+
+ /**
* Merge the message DOM tree into the big xmlBlaster DOM tree
*/
private void addToBigDom() throws XmlBlasterException {
at at -2196,7 +2214,7 at at
* This timeout occurs after a configured delay (destroyDelay) in UNREFERENCED state
*/
public final void timeout(Object userData) {
- if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic now ...");
+ if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic (oid="+getUniqueKey()+") now ...");
ArrayList notifyList = timeout();
if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize
}
at at -2207,6 +2225,10 at at
synchronized (this) {
if (isAlive()) // interim message arrived?
return null;
+ if (publishCounter2 != 0) {
+ log.finer("Ignored timeout event for publishing thread running");
+ return null;
+ }
return toDead(this.creatorSessionName, null, null);
}
}
Index: org/xmlBlaster/engine/RequestBroker.java
===================================================================
--- org/xmlBlaster/engine/RequestBroker.java (revision 15178)
+++ org/xmlBlaster/engine/RequestBroker.java (working copy)
at at -1686,6 +1686,7 at at
// Handle local message
// Find or create the topic
+ boolean newlycreated = false;
TopicHandler topicHandler = null;
synchronized(this.topicHandlerMap) {
if (!msgKeyData.getOid().equals(msgUnit.getKeyOid())) {
at at -1695,14 +1696,37 at at
Object obj = topicHandlerMap.get(msgUnit.getKeyOid());
if (obj == null) {
topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid()); // adds itself to topicHandlerMap
+ newlycreated = true;
}
else {
topicHandler = (TopicHandler)obj;
}
}
+
+ boolean counterIncreased = false;
+ if (!newlycreated && !topicHandler.isAlive()) {
+ log.finer("The topic is not ALIVE, need to check if it is dying or not");
+ synchronized(topicHandler) { // wiating if the topicHandler is dying.
+ if (topicHandler.isDead()) {
+ log.finer("The topic is DEAD, need to create a new one");
+ topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid());
+ }
+ else {
+ topicHandler.incrementPublishCounter(); // to provent the topicHandler from being timed out
+ counterIncreased = true; // do NOT insert any code that might throws any exception between this line and the line of topicHandler.publish() below.
+ }
+ }
+ }
// Process the message
- publishReturnQos = topicHandler.publish(sessionInfo, msgUnit, publishQos);
+ try {
+ publishReturnQos = topicHandler.publish(sessionInfo, msgUnit, publishQos);
+ }
+ finally {
+ if (counterIncreased) {
+ topicHandler.decrementPublishCounter(); // balance counter
+ }
+ }
if (publishReturnQos == null) { // assert only
StatusQosData qos = new StatusQosData(glob, MethodName.PUBLISH);