[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [xmlblaster] How to persist?



Hi,

you should also mark your subscription to be persistent.
Like this after server restart the messages are collected even
if your subscriber has not yet connected,
Marcel


Michele wrote:
Hi Paul,
that is the correct behaviour, in order to get all messages you must make sure to be subscribed. If your publishes have been done before your subscription you get only the last published message. Persistent means that the message is saved persistently until delivery it does not mean you can get it even if you suscribe after having being published. I think the behaviour you are looking for is a request to the history queue. Read more on how to make a query to the history queue:


http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.qos.queryspec.QueueQuery.html


Also remember you have to configure the history queue to be sufficiently big to keep the messages you need.


Regards
Michele



Paul Babyak wrote:
Hello.
I'm trying to use persistence messages, but i think i missed something in examples.
When i publish message into topic only last persist. Moreover, this message exist every time when i subscribe to topic.


Here the sample code (formerly HelloWorld).
================Publish===================
        I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();

try {
String name = glob.getProperty().get("session.name <http://session.name>", "XBTestsPublish");
String passwd = glob.getProperty().get("passwd", "secret");


            ConnectQos qos = new ConnectQos(glob, name, passwd);
            qos.setPersistent(true);
            con.connect(qos, this);

PublishKey pk = new PublishKey(glob, "HelloWorld", "text/xml", "1.0");

            PublishQos pq = new PublishQos(glob);
            pq.setPersistent(true);

            MsgUnit msgUnit;
            PublishReturnQos prq;

for (int i = 0; i < 10; i++) {
msgUnit = new MsgUnit(pk, "Hi " + i, pq);
prq = con.publish(msgUnit);
log.info <http://log.info>("Got status='" + prq.getState() + "' for published message '" + prq.getKeyOid());
System.out.println("Got status='" + prq.getState() + "' for published message '" + prq.getKeyOid());
}


            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException i) {
            } // wait a second to receive update()

            DisconnectQos dq = new DisconnectQos(glob);
            con.disconnect(dq);
            glob.shutdown(); // free resources
        }catch (Throwable e) {
            log.severe(e.toString());
            e.printStackTrace();
        }
==============================
============Subscribe============
        I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();

try {
String name = glob.getProperty().get("session.name <http://session.name>", "XBTestSubscribe/5");
String passwd = glob.getProperty().get("passwd", "secret");


            ConnectQos qos = new ConnectQos(glob, name, passwd);
            ConnectReturnQos crq = con.connect(qos, this);

String subId = Constants.SUBSCRIPTIONID_PREFIX + crq.getSessionName().getRelativeName(true) + "-" + "4";

SubscribeKey sk = new SubscribeKey(glob, "HelloWorld");
SubscribeQos sq = new SubscribeQos(glob);
sq.setSubscriptionId(subId);
// sq.setWantUpdateOneway();
SubscribeReturnQos subRet = con.subscribe(sk, sq, new I_Callback() {
public String update(String cbSessionId, UpdateKey updateKey, byte[] bytes, UpdateQos updateQos) throws XmlBlasterException {
System.out.println("message = " + (new String(bytes)));
return null;
}
});


            try {
                Global.waitOnKeyboardHit("Success, hit a key to exit");
            }
            catch (Exception i) {
            } // wait a second to receive update()

UnSubscribeKey uk = new UnSubscribeKey(glob, subRet.getSubscriptionId());
UnSubscribeQos uq = new UnSubscribeQos(glob);
UnSubscribeReturnQos[] urq = con.unSubscribe(uk, uq);
if (urq.length > 0) log.info <http://log.info>("Unsubscribed from topic");


            DisconnectQos dq = new DisconnectQos(glob);
            con.disconnect(dq);
            glob.shutdown(); // free resources
        }catch (Throwable e) {
            log.severe(e.toString());
            e.printStackTrace();
        }
===============================



--
Marcel Ruff
http://www.xmlBlaster.org
http://watchee.net
Phone: +49 7551 309371