
Re: Fwd: Re: Fwd: Re: [xmlblaster] Question on pausing the update method long enough to process messages...



> To: xmlblaster at server.xmlblaster.org
> From: "Kelli Fuller" <kfuller at caci.com>
> Date: Thu, 28 Nov 2002 22:16:09 -0500
>
>
> I actually thought about splitting the processing up. My concern was that
> after the messages in the queue were subscribed to once, I didn't want them
> to be subscribed to again. I thought that if I set the initial update to
> FALSE, that I would only be able to subscribe to new messages on the queue
> once and only once. But instead what I found was that I couldn't subscribe
> to anything ever with this setting. The following code is what I used to
> implement this:
>
> String sSubscribeKey = "/xmlBlaster/key/TRANSACTION[@type='billing' or @type='estimate']";
> SubscribeKeyWrapper sk = new SubscribeKeyWrapper(sSubscribeKey, Constants.XPATH);
> SubscribeQosWrapper sq = new SubscribeQosWrapper();
> sq.setInitialUpdate(false);
> SubscribeRetQos subRet = con.subscribe(sk.toXml(), sq.toXml());
>
> Am I misunderstanding what setInitialUpdate() does?
> Is it possible that the messages that are being published to the xmlBlaster
> queue are not being published as 'new' messages for some reason?
>
> Thanks so much for your help,
>
> Kelli
>
> Forwarded by "Kelli Fuller" <kellibfuller at hotmail.com> to kfuller at caci.com on 11/28/2002 09:59 PM
> Subject: Fwd: Re: Fwd: Re: [xmlblaster] Question on pausing the update method long enough to process messages...
>
> >From: Heinrich Götzger <Heinrich.Goetzger at exploding-systems.de>
> >Reply-To: xmlblaster at server.xmlBlaster.org
> >To: xmlBlaster List <xmlblaster at server.xmlblaster.org>
> >Subject: Re: Fwd: Re: [xmlblaster] Question on pausing the update method
> >long enough to process messages...
> >Date: Thu, 28 Nov 2002 09:35:59 +0100 (CET)
> >
> >Kelli,
> >
> >hmm, this sounds quite interesting. I'm still not sure I understand all
> >your requirements correctly.
> >
> >As I see it, your problem needs to be broken into small tasks. I would
> >think about putting each processing step into its own task/thread and
> >try to decouple them.
> >
> >For example, have an updateWorker thread in sys2 that receives all
> >messages from sys1 and does the processing independently of the
> >subscriber itself.
> >
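
A minimal sketch of the updateWorker idea in plain Java, assuming the callback only needs to hand the message content to another thread. The class name UpdateWorker, its methods, and the "processed:" stand-in for the real work are hypothetical and not part of xmlBlaster; in a real client, update() would call onUpdate(...) and return immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical helper: hands messages from the callback thread to one worker thread. */
class UpdateWorker {
    // Unique sentinel object; compared by identity, so a real message "STOP" is still processed.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    private final List<String> processed = new ArrayList<>();
    private final Thread worker = new Thread(this::drain, "updateWorker");

    UpdateWorker() {
        worker.start();
    }

    /** Call this from update(): it only enqueues and returns immediately. */
    void onUpdate(String content) {
        inbox.add(content);
    }

    /** Lets the worker finish everything queued so far, then returns the results. */
    List<String> shutdownAndGetProcessed() {
        inbox.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (processed) {
            return new ArrayList<>(processed);
        }
    }

    /** Worker loop: takes messages in arrival order until the sentinel shows up. */
    private void drain() {
        try {
            while (true) {
                String content = inbox.take();
                if (content == STOP) {
                    return;
                }
                String edited = "processed:" + content; // stand-in for DOM edit, republish, file write
                synchronized (processed) {
                    processed.add(edited);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        UpdateWorker w = new UpdateWorker();
        w.onUpdate("billing-1");
        w.onUpdate("estimate-2");
        System.out.println(w.shutdownAndGetProcessed());
    }
}
```

With this split, the subscriber's callback stays short, and the code that must run "after everything is processed" can call shutdownAndGetProcessed() instead of sleeping.
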
> >Once a subscription is placed, update() is invoked if there are messages
> >in the queue, and it is called again whenever further messages arrive.
> >While a call to update() has not finished, the next one is blocked.
> >
> >If you want to know whether you have received all the messages you are
> >waiting for, you need to publish the number of them first. Right? Or send
> >some 'end_of_messages' message last.
> >
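
If the number of expected messages is known up front (published first, or taken from the con.get(...) count used elsewhere in this thread), one way to wait without sleeping is a java.util.concurrent.CountDownLatch. The sketch below is plain Java; UpdateBarrier and its method names are hypothetical, and the thread in main() only simulates the callback. Note the assumption that the count really matches the number of update() calls: if messages arrive or disappear between the count and the subscribe, the timeout is what keeps the caller from waiting forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: lets the main thread wait until N update() calls have finished. */
class UpdateBarrier {
    private final CountDownLatch remaining;

    UpdateBarrier(int expectedUpdates) {
        this.remaining = new CountDownLatch(expectedUpdates);
    }

    /** Call as the last step of update(), ideally from a finally block. */
    void updateFinished() {
        remaining.countDown();
    }

    /** Returns true if all expected updates finished within the timeout. */
    boolean awaitAll(long timeoutMillis) {
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        int updateCnt = 3; // assumed known, e.g. the length of the array returned by a prior get
        UpdateBarrier barrier = new UpdateBarrier(updateCnt);

        // Simulates the callback thread delivering updateCnt messages.
        Thread callbacks = new Thread(() -> {
            for (int i = 0; i < updateCnt; i++) {
                try {
                    // the real update() body would process message i here
                } finally {
                    barrier.updateFinished();
                }
            }
        });
        callbacks.start();

        boolean done = barrier.awaitAll(5000);
        System.out.println("all updates finished: " + done);
    }
}
```

The main thread would call awaitAll(...) right after subscribing, in place of the per-message sleep loop, and only then go on to read the acknowledgement files.
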
> >It would probably help to introduce some more publishers or subscribers
> >(in different threads?).
> >
> >I'm not sure if this can be solved with xmlBlaster settings itself without
> >a redesign of the clients. Otherwise I would need to spend more time to
> >capture the whole problem to find some solution.
> >
> >regards
> >
> >Heinrich
> >
> >
> >On Wed, 27 Nov 2002, Kelli Fuller wrote:
> >
> > >
> > > I'm sorry for the confusion. I am a bit confused myself so I know I'm not
> > > explaining myself well. Let me try again...
> > >
> > > I'm working on a project that involves interfacing 2 systems, near real
> > > time. I'll call those systems, 'Sys1' and 'Sys2' for this example.
> > >
> > > We've written 2 Java applications, I'll call those 'Publish' and
> > > 'Subscribe', that run every 2 minutes.
> > >
> > > 'Publish' publishes 'Sys1's' messages to the xmlBlaster queue, and also
> > > checks for (subscribes to) acknowledgements from 'Subscribe'. We're
> > > estimating 20 or so messages every 2 minutes to be published.
> > >
> > > 'Subscribe' subscribes to those messages, converts them to xml DOM, edits
> > > them, republishes them, erases the original messages subscribed to, then
> > > creates xml files containing the republished content and stores them in a
> > > local directory for 'Sys2' to pick up at some specified time. Right after
> > > that, 'Subscribe' checks for acknowledgements sent from 'Sys2' and, if some
> > > are found, 'Subscribe' publishes those to the xmlBlaster queue for
> > > 'Publish' to pick up.
> > >
> > > I have put display statements all over the 'Subscribe' code to watch what's
> > > happening. My problem is that after I call con.subscribe(sk.toXml(),
> > > sq.toXml()), the update() method is invoked, which is fine, but within that
> > > update() method, I have all the code that converts the messages to xml DOM,
> > > edits them, republishes them, erases the original messages subscribed to,
> > > then creates xml files containing the republished content and stores them
> > > in a local directory.
> > >
> > > Based on my display statements, I have found that the update() method gets
> > > invoked for each message that the subscribe retrieved, which is exactly
> > > what I need it to do. However, I have more code following the con.subscribe
> > > that does other processing, and it is getting executed BEFORE update()
> > > has been called for all the messages retrieved. This caused problems, so to
> > > get around that, I increased the sleep time. The following code snippet
> > > demonstrates what I did:
> > >
> > > START CODE SNIPPET
> > > *****************************************************************************************************
> > > // First see how many messages we are going to retrieve when we subscribe...
> > > String sSubscribeKey = "/xmlBlaster/key/TRANSACTION[@type='billing' or @type='estimate']";
> > > GetKeyWrapper gk = new GetKeyWrapper(sSubscribeKey, Constants.XPATH);
> > > GetQosWrapper gq = new GetQosWrapper();
> > > MessageUnit[] messages = con.get(gk.toXml(), gq.toXml());
> > >
> > > // Get count of messages we're going to subscribe to...
> > > int updateCnt = messages.length;
> > >
> > > // If we have messages to subscribe to, go ahead and subscribe...
> > > if(updateCnt > 0)
> > > {
> > >       SubscribeKeyWrapper sk = new SubscribeKeyWrapper(sSubscribeKey, Constants.XPATH);
> > >       SubscribeQosWrapper sq = new SubscribeQosWrapper();
> > >       SubscribeRetQos subRet = con.subscribe(sk.toXml(), sq.toXml());
> > >
> > >       // Wait 3 secs for each message to ensure the update method has
> > >       // time to complete...
> > >       // This is fine for a few messages in the queue, but it will not
> > >       // work for large numbers of messages in the queue.
> > >       for(int ii=0; ii<updateCnt; ii++)
> > >       {
> > >             try { Thread.sleep(3000); }
> > >             catch(InterruptedException i) {}
> > >       }
> > > }
> > >
> > >
> > > // Once the update() processing has run for all messages, I want to get all
> > > // the xml files currently sitting in the acks folder plus do other things,
> > > // but the following code will get executed BEFORE the update() has completed
> > > // for ALL messages if I don't have the 'sleep' code above. This is my problem...
> > > File tempUploadDir = new File("D:\\xml\\acknowledgements");
> > > String[] fileNameList = tempUploadDir.list();
> > > log.info("","****** There are "+fileNameList.length+" ack files to process.");
> > >
> > > String msg = "";
> > > String msgKey = "";
> > > String msgInfo = "";
> > >
> > > // For each file in the acks directory, read it in as an xml document,
> > > // create a string version of the xml doc, and publish message to xmlBlaster
> > > for (int i = 0; i < fileNameList.length; i++)
> > > {
> > >
> > >       File curUploadDir = new File ("D:\\xml\\acknowledgements", fileNameList[i]);
> > >       File f = new File(curUploadDir.toString());
> > >       Document dcXmlDoc = docbuilder.parse(f);
> > >
> > >       ...
> > >       ...
> > >       ...
> > >
> > >       msgInfo = "<TRANSACTION type='acknowledgement' url='"+sURL+"'/>";
> > >
> > >       //Send the message on its way
> > >       PublishKeyWrapper pk = new PublishKeyWrapper(msgKey, "text/xml");
> > >
> > >       pk.wrap(msgInfo);
> > >       PublishQosWrapper pq = new PublishQosWrapper();
> > >       MessageUnit msgUnit = new MessageUnit(pk.toXml(), msg.getBytes(), pq.toXml());
> > >
> > >       log.info("","****** PUBLISH: xmlString contains: " + xmlString);
> > >
> > >       con.publish(msgUnit);
> > >
> > > }
> > >
> > > //Disconnect from xmlBlaster
> > > DisconnectQos dq = new DisconnectQos();
> > > con.disconnect(dq);
> > > END CODE SNIPPET
> > > *****************************************************************************************************
> > >
> > > So what I am trying to figure out is, how do I know when the update handler
> > > is complete for ALL messages subscribed to? I am aware of the command line
> > > arg, burstMode.collectTime 400, but this did not solve my problem. I've
> > > also tried using the code:
> > >
> > > **************************************************
> > > ConnectQos qos = new ConnectQos(glob, name, passwd);
> > > CallbackAddress cbAddress = new CallbackAddress(glob);
> > > cbAddress.setCollectTime(2000);      // sets burst mode to 2000 milliseconds
> > > qos.addCallbackAddress(cbAddress);
> > > ConnectReturnQos connectReturnQos = con.connect(qos, this);
> > > **************************************************
> > >
> > > However, this seemed to stall prior to even calling the update. What I need
> > > is some way to stall so that once the update is invoked, everything gets
> > > done before moving on.
> > >
> > > Are there other settings in the qos that I should try? Or is the qos not
> > > where I need to focus?
> > >
> > > I hope this helps to clarify what I'm asking. If not, please let me know
> > > and I will try again.
> > >
> > > Thank you very much,
> > > Kelli
> > >
> > > Forwarded by "Kelli Fuller" <kellibfuller at hotmail.com> to kfuller at caci.com on 11/27/2002 12:06 PM
> > > Subject: Fwd: Re: [xmlblaster] Question on pausing the update method long enough to process messages...
> > >
> > > >From: Heinrich Götzger <Heinrich.Goetzger at exploding-systems.de>
> > > >Reply-To: xmlblaster at server.xmlBlaster.org
> > > >To: xmlBlaster List <xmlblaster at server.xmlblaster.org>
> > > >Subject: Re: [xmlblaster] Question on pausing the update method long enough to process messages...
> > > >Date: Wed, 27 Nov 2002 16:54:53 +0100 (CET)
> > > >
> > > >Kelli,
> > > >
> > > >I'm not sure if I got your problem right.
> > > >
> > > >Do you want some store and forward queue which will be filled by the
> > > >update method?
> > > >
> > > >
> > > >Heinrich
> > > >
> > > >
> > > >On Wed, 27 Nov 2002, Kelli Fuller wrote:
> > > >
> > > > > > I am trying to learn how to use xmlBlaster. With that, I have written a
> > > > > > pretty basic class to subscribe to and publish messages. In this class, I
> > > > > > only subscribe to messages once and I only have one update() method.
> > > > > > However, in that method I take the content of the message, convert it to
> > > > > > an xml DOM, edit some node values, republish the message with a new key,
> > > > > > erase the original message, and finally create an xml file to store in a
> > > > > > directory for another system to pick up. This class is on a scheduler
> > > > > > that runs every 2 minutes.
> > > > > >
> > > > > > Currently, I am forcing a sleep period of 3 secs per message right after
> > > > > > the code that subscribes, like this:
> > > > >
> > > > > > if(updateCnt > 0)
> > > > > > {
> > > > > >       SubscribeKeyWrapper sk = new SubscribeKeyWrapper(sSubscribeKey, Constants.XPATH);
> > > > > >       SubscribeQosWrapper sq = new SubscribeQosWrapper();
> > > > > >       SubscribeRetQos subRet = con.subscribe(sk.toXml(), sq.toXml());
> > > > > >
> > > > > >       // Wait 3 secs for each message to ensure the update method has time
> > > > > >       // to complete...
> > > > > >       for(int ii=0; ii<updateCnt; ii++)
> > > > > >       {
> > > > > >             try { Thread.sleep(3000); }
> > > > > >             catch(InterruptedException i) {}
> > > > > >       }
> > > > > > }
> > > > >
> > > > > > I have to do this to prevent the code below this from being executed
> > > > > > too early. This is not a viable solution though, because of the number
> > > > > > of messages that could be in the queue at any given time.
> > > > > >
> > > > > > How can I ensure that all messages I've subscribed to run through the
> > > > > > update method before continuing with any other processing? I've read
> > > > > > about setting the burstMode.collectTime as an argument, which I've tried,
> > > > > > but this didn't seem to solve my problem.
> > > > > >
> > > > > > I also need to ensure that, for the current call to my class, the
> > > > > > update method doesn't get called by xmlBlaster when new messages are
> > > > > > published. I want the next call to my class to get any newly published
> > > > > > messages, and so on. In other words, I want to subscribe to one set of
> > > > > > messages, call update for only that set of messages, given enough time
> > > > > > to process as described above, then stop and wait 2 minutes for this
> > > > > > class to be called again and repeat.
> > > > >
> > > > > How can I control this?
> > > > >
> > > > > Thanks so much,
> > > > >
> > > > > Kelli
> > > > >

Best regards

Heinrich Götzger
--
Heinrich Götzger, Dipl.-Inform. (FH)              Exploding Systems GmbH
fon: +49-0170-9334400                                  Rheingutstraße 45
mailto:Heinrich.Goetzger at Exploding-Systems.de             78462 Konstanz
http://www.Exploding-Systems.de