[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [xmlblaster] How to persist?



Hi Paul,
that is the correct behaviour, in order to get all messages you must make sure to be subscribed. If your publishes have been done before your subscription you get only the last published message. Persistent means that the message is saved persistently until delivery it does not mean you can get it even if you suscribe after having being published. I think the behaviour you are looking for is a request to the history queue. Read more on how to make a query to the history queue:


http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.qos.queryspec.QueueQuery.html

Also remember you have to configure the history queue to be sufficiently big to keep the messages you need.

Regards
Michele



Paul Babyak wrote:
Hello.
I'm trying to use persistence messages, but i think i missed something in examples.
When i publish message into topic only last persist. Moreover, this message exist every time when i subscribe to topic.


Here the sample code (formerly HelloWorld).
================Publish===================
        I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();

try {
String name = glob.getProperty().get("session.name <http://session.name>", "XBTestsPublish");
String passwd = glob.getProperty().get("passwd", "secret");


            ConnectQos qos = new ConnectQos(glob, name, passwd);
            qos.setPersistent(true);
            con.connect(qos, this);

PublishKey pk = new PublishKey(glob, "HelloWorld", "text/xml", "1.0");

            PublishQos pq = new PublishQos(glob);
            pq.setPersistent(true);

            MsgUnit msgUnit;
            PublishReturnQos prq;

for (int i = 0; i < 10; i++) {
msgUnit = new MsgUnit(pk, "Hi " + i, pq);
prq = con.publish(msgUnit);
log.info <http://log.info>("Got status='" + prq.getState() + "' for published message '" + prq.getKeyOid());
System.out.println("Got status='" + prq.getState() + "' for published message '" + prq.getKeyOid());
}


            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException i) {
            } // wait a second to receive update()

            DisconnectQos dq = new DisconnectQos(glob);
            con.disconnect(dq);
            glob.shutdown(); // free resources
        }catch (Throwable e) {
            log.severe(e.toString());
            e.printStackTrace();
        }
==============================
============Subscribe============
        I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();

try {
String name = glob.getProperty().get("session.name <http://session.name>", "XBTestSubscribe/5");
String passwd = glob.getProperty().get("passwd", "secret");


            ConnectQos qos = new ConnectQos(glob, name, passwd);
            ConnectReturnQos crq = con.connect(qos, this);

String subId = Constants.SUBSCRIPTIONID_PREFIX + crq.getSessionName().getRelativeName(true) + "-" + "4";

SubscribeKey sk = new SubscribeKey(glob, "HelloWorld");
SubscribeQos sq = new SubscribeQos(glob);
sq.setSubscriptionId(subId);
// sq.setWantUpdateOneway();
SubscribeReturnQos subRet = con.subscribe(sk, sq, new I_Callback() {
public String update(String cbSessionId, UpdateKey updateKey, byte[] bytes, UpdateQos updateQos) throws XmlBlasterException {
System.out.println("message = " + (new String(bytes)));
return null;
}
});


            try {
                Global.waitOnKeyboardHit("Success, hit a key to exit");
            }
            catch (Exception i) {
            } // wait a second to receive update()

UnSubscribeKey uk = new UnSubscribeKey(glob, subRet.getSubscriptionId());
UnSubscribeQos uq = new UnSubscribeQos(glob);
UnSubscribeReturnQos[] urq = con.unSubscribe(uk, uq);
if (urq.length > 0) log.info <http://log.info>("Unsubscribed from topic");


            DisconnectQos dq = new DisconnectQos(glob);
            con.disconnect(dq);
            glob.shutdown(); // free resources
        }catch (Throwable e) {
            log.severe(e.toString());
            e.printStackTrace();
        }
===============================