[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[xmlblaster] How to persist?



Hello.
I'm trying to use persistence messages, but i think i missed something in examples.
When i publish message into topic only last persist. Moreover, this message exist every time when i subscribe to topic.

Here the sample code (formerly HelloWorld).
================Publish===================
        I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();

        try {
            String name = glob.getProperty().get("session.name", "XBTestsPublish");
            String passwd = glob.getProperty().get("passwd", "secret");

            ConnectQos qos = new ConnectQos(glob, name, passwd);
            qos.setPersistent(true);
            con.connect(qos, this);

            PublishKey pk = new PublishKey(glob, "HelloWorld", "text/xml", "1.0");

            PublishQos pq = new PublishQos(glob);
            pq.setPersistent(true);

            MsgUnit msgUnit;
            PublishReturnQos prq;

            for (int i = 0; i < 10; i++) {
                msgUnit = new MsgUnit(pk, "Hi " + i, pq);
                prq = con.publish(msgUnit);
                log.info("Got status='" + prq.getState() + "' for published message '" + prq.getKeyOid());
                System.out.println("Got status='" + prq.getState() + "' for published message '" + prq.getKeyOid());
            }

            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException i) {
            } // wait a second to receive update()

            DisconnectQos dq = new DisconnectQos(glob);
            con.disconnect(dq);
            glob.shutdown(); // free resources
        }catch (Throwable e) {
            log.severe(e.toString());
            e.printStackTrace();
        }
==============================
============Subscribe============
        I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();

        try {
            String name = glob.getProperty().get("session.name", "XBTestSubscribe/5");
            String passwd = glob.getProperty().get("passwd", "secret");

            ConnectQos qos = new ConnectQos(glob, name, passwd);
            ConnectReturnQos crq = con.connect(qos, this);

            String subId = Constants.SUBSCRIPTIONID_PREFIX + crq.getSessionName().getRelativeName(true) + "-" + "4";

            SubscribeKey sk = new SubscribeKey(glob, "HelloWorld");
            SubscribeQos sq = new SubscribeQos(glob);
            sq.setSubscriptionId(subId);
//            sq.setWantUpdateOneway();
            SubscribeReturnQos subRet = con.subscribe(sk, sq, new I_Callback() {
                public String update(String cbSessionId, UpdateKey updateKey, byte[] bytes, UpdateQos updateQos) throws XmlBlasterException {
                    System.out.println("message = " + (new String(bytes)));
                    return null;
                }
            });

            try {
                Global.waitOnKeyboardHit("Success, hit a key to exit");
            }
            catch (Exception i) {
            } // wait a second to receive update()

            UnSubscribeKey uk = new UnSubscribeKey(glob, subRet.getSubscriptionId());
            UnSubscribeQos uq = new UnSubscribeQos(glob);
            UnSubscribeReturnQos[] urq = con.unSubscribe(uk, uq);
            if (urq.length > 0) log.info("Unsubscribed from topic");

            DisconnectQos dq = new DisconnectQos(glob);
            con.disconnect(dq);
            glob.shutdown(); // free resources
        }catch (Throwable e) {
            log.severe(e.toString());
            e.printStackTrace();
        }
===============================