1 // xmlBlaster/demo/HelloWorld4.java
  2 import java.util.List;
  3 import java.util.logging.Level;
  4 import java.util.logging.Logger;
  5 
  6 import org.xmlBlaster.client.I_Callback;
  7 import org.xmlBlaster.client.I_ConnectionStateListener;
  8 import org.xmlBlaster.client.I_XmlBlasterAccess;
  9 import org.xmlBlaster.client.key.EraseKey;
 10 import org.xmlBlaster.client.key.GetKey;
 11 import org.xmlBlaster.client.key.PublishKey;
 12 import org.xmlBlaster.client.key.SubscribeKey;
 13 import org.xmlBlaster.client.key.UpdateKey;
 14 import org.xmlBlaster.client.qos.ConnectQos;
 15 import org.xmlBlaster.client.qos.ConnectReturnQos;
 16 import org.xmlBlaster.client.qos.DisconnectQos;
 17 import org.xmlBlaster.client.qos.EraseQos;
 18 import org.xmlBlaster.client.qos.GetQos;
 19 import org.xmlBlaster.client.qos.GetReturnQos;
 20 import org.xmlBlaster.client.qos.PublishQos;
 21 import org.xmlBlaster.client.qos.PublishReturnQos;
 22 import org.xmlBlaster.client.qos.SubscribeQos;
 23 import org.xmlBlaster.client.qos.UpdateQos;
 24 import org.xmlBlaster.util.Global;
 25 import org.xmlBlaster.util.MsgUnit;
 26 import org.xmlBlaster.util.XmlBlasterException;
 27 import org.xmlBlaster.util.def.MethodName;
 28 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 29 import org.xmlBlaster.util.dispatch.I_PostSendListener;
 30 import org.xmlBlaster.util.error.I_MsgErrorHandler;
 31 import org.xmlBlaster.util.error.I_MsgErrorInfo;
 32 import org.xmlBlaster.util.qos.QosData;
 33 import org.xmlBlaster.util.queue.I_Entry;
 34 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
 35 
 36 
 37 /**
 38  * This client connects to xmlBlaster in failsafe mode and uses specific update handlers. 
 39  * <p />
 40  * In fail save mode the client will poll for the xmlBlaster server and
 41  * queue messages until the server is available.
 42  * We show all available control of a client in failsafe mode.
 43  * <p />
 44  * Invoke: java HelloWorld4 -session.name joe/2 -passwd secret
 45  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
 46  */
 47 public class HelloWorld4
 48 {
 49    private final Global glob;
 50    private static Logger log = Logger.getLogger(HelloWorld4.class.getName());
 51    private I_XmlBlasterAccess con = null;
 52    private ConnectReturnQos conRetQos = null;
 53 
 54    public HelloWorld4(final Global glob) {
 55       this.glob = glob;
 56 
 57 
 58       try {
 59          con = glob.getXmlBlasterAccess();
 60 
 61          // Do all client side error handling our self
 62          // this error handler is called when we are/were polling for the server:
 63          con.setClientErrorHandler(new I_MsgErrorHandler() {
 64 
 65                public void handleError(I_MsgErrorInfo msgErrorInfo) {
 66                   if (msgErrorInfo == null) return;
 67                   XmlBlasterException ex = msgErrorInfo.getXmlBlasterException();
 68                   if (ex.isUser()) {
 69                      log.severe("Connection failed: " + msgErrorInfo.getXmlBlasterException().getMessage());
 70                      if (msgErrorInfo.getDispatchManager() != null) {
 71                         msgErrorInfo.getDispatchManager().toDead(msgErrorInfo.getXmlBlasterException());
 72                         if (msgErrorInfo.getQueue() != null)
 73                            msgErrorInfo.getQueue().clear();
 74                         msgErrorInfo.getDispatchManager().shutdown();
 75                         return;
 76                      }
 77                   }
 78                   MsgQueueEntry[] entries = msgErrorInfo.getMsgQueueEntries();
 79                   for (int i=0; i<entries.length; i++)
 80                      log.severe("Message '" + entries[i].getEmbeddedType() + "' '" +
 81                                    entries[i].getLogId() + "' is lost: " + msgErrorInfo.getXmlBlasterException().getMessage());
 82                   if (msgErrorInfo.getQueue() != null)
 83                      msgErrorInfo.getQueue().clear();
 84                }
 85 
 86                public String handleErrorSync(I_MsgErrorInfo msgErrorInfo) throws XmlBlasterException {
 87                   if (msgErrorInfo.getXmlBlasterException().isCommunication()) {
 88                      handleError(msgErrorInfo);
 89                      return "";
 90                   }
 91                   throw msgErrorInfo.getXmlBlasterException(); // Throw back to client
 92                }
 93 
 94                public void shutdown() {
 95                }
 96             }
 97          );
 98          
 99          // This listener receives only events from asynchronously send messages from queue.
100          // e.g. after a reconnect when client side queued messages are delivered
101          con.registerPostSendListener(new I_PostSendListener() {
102             /**
103              * @see I_PostSendListener#postSend(MsgQueueEntry[])
104              */
105             public void postSend(MsgQueueEntry[] entries) {
106                try {
107                   for (int i=0; i<entries.length; i++) {
108                      if (MethodName.PUBLISH.equals(entries[i].getMethodName())) { 
109                         MsgUnit msg = entries[i].getMsgUnit();
110                         PublishReturnQos retQos = (PublishReturnQos)entries[i].getReturnObj();
111                         log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue: " + retQos.toXml());
112                      }
113                      else
114                         log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
115                   }
116                } catch (Throwable e) {
117                   e.printStackTrace();
118                }
119             }
120 
121             /**
122              * @see I_PostSendListener#sendingFailed(MsgQueueEntry[], XmlBlasterException)
123              */
124             public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException ex) {
125                try {
126                   for (int i=0; i<entries.length; i++) {
127                      if (MethodName.PUBLISH.equals(entries[i].getMethodName())) { 
128                         MsgUnit msg = entries[i].getMsgUnit();
129                         log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue failed: " + ex.getMessage());
130                      }
131                      else
132                         log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
133                   }
134                } catch (Throwable e) {
135                   e.printStackTrace();
136                }
137                return false; // false: message remains in queue and we go to dead
138             }
139          });
140 
141 
142          // Listen on status changes of our connection to xmlBlaster
143          con.registerConnectionListener(new I_ConnectionStateListener() {
144                
145                public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
146                   log.info("I_ConnectionStateListener.reachedAlive(): We were lucky, connected to " +
147                            connection.getConnectReturnQos().getSessionName());
148                   if (connection.getQueue().getNumOfEntries() > 0) {
149                      log.info("I_ConnectionStateListener.reachedAlive(): Queue contains " +
150                               connection.getQueue().getNumOfEntries() + " messages: " +
151                               connection.getQueue().toXml(""));
152                      try {
153                       List<I_Entry> list = connection.getQueue().peek(-1, -1);
154                         for (int i=0; i<list.size(); i++) {
155                            log.info(((MsgQueueEntry)list.get(i)).toXml());
156                         }
157                         /*
158                         MsgQueueEntry entry = (MsgQueueEntry)connection.getQueue().peek();
159                         log.info("I_ConnectionStateListener.reachedAlive(): Discarding messages from queue");
160                         connection.getQueue().clear(); // e.g. discard all msgs (it is our choice)
161                         if (MethodName.CONNECT == entry.getMethodName()) {
162                            connection.getQueue().put(entry, false);
163                         }
164                         */
165                      }
166                      catch (XmlBlasterException e) {
167                      }
168                   }
169                   if (!connection.getConnectReturnQos().isReconnected()) {
170                      log.info("I_ConnectionStateListener.reachedAlive(): New server instance found");
171                      if (connection.isConnected())
172                         initClient();    // initialize subscription etc. again
173                   }
174                   else {
175                      log.info("I_ConnectionStateListener.reachedAlive(): Same server instance found");
176                   }
177                }
178 
179                public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
180                   log.warning("I_ConnectionStateListener.reachedPolling(): No connection to " + glob.getId() + ", we are polling ...");
181                   if (!connection.isConnected())
182                      initClient();
183                }
184 
185                public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
186                   log.severe("I_ConnectionStateListener.reachedDead(): Connection to " + glob.getId() + " is dead, good bye");
187                   System.exit(1);
188                }
189                public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
190                }
191 
192             });
193 
194 
195          ConnectQos qos = new ConnectQos(glob);
196          conRetQos = con.connect(qos, new I_Callback() {
197 
198             public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
199                if (log.isLoggable(Level.FINEST)) log.finest("UpdateKey.toString()=" + updateKey.toString() +
200                                           "UpdateQos.toString()=" + updateQos.toString());
201                if (updateKey.isInternal()) {
202                   log.severe("Receiving unexpected asynchronous internal message '" + updateKey.getOid() +
203                                 "' in default handler");
204                   return "";
205                }
206                if (updateQos.isErased()) {
207                   log.info("Message '" + updateKey.getOid() + "' is erased");
208                   return "";
209                }
210                if (updateKey.getOid().equals("Banking"))
211                   log.info("Receiving asynchronous message '" + updateKey.getOid() +
212                                "' state=" + updateQos.getState() + " in default handler");
213                else
214                   log.severe("Receiving unexpected asynchronous message '" + updateKey.getOid() +
215                                    "' in default handler");
216                return "";
217             }
218 
219          });  // Login to xmlBlaster, default handler for updates
220 
221 
222          if (con.isAlive())
223             log.info("Connected as " + qos.getUserId() + " to xmlBlaster, your public session ID is " + conRetQos.getSessionName());
224          else
225             log.info("Not connected to xmlBlaster, proceeding in fail save mode ...");
226 
227          while (true) {
228             // Wait a second for messages to arrive before we logout
229             try { Thread.sleep(1000); } catch( InterruptedException i) {}
230             int key = Global.waitOnKeyboardHit("Hit a key: 'p'=publish, 'g'=get, 'q'=exit");
231             if (key == 'p') {
232                publishMessages();
233                continue;
234             }
235             else if (key == 'g') {
236                GetKey gk = new GetKey(glob, "Banking");
237                GetQos gq = new GetQos(glob);
238                try {
239                   MsgUnit[] msgs = con.get(gk, gq);
240                   if (msgs.length > 0) {
241                      GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
242                      log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
243                                "' and status=" + grq.getState());
244                   }
245                   else {
246                      log.info("No message matched get() call on " + gk.getOid());
247                   }
248                }
249                catch (XmlBlasterException e) {
250                   log.warning("get() failed:" + e.getMessage());
251                }
252                continue;
253             }
254             else if (key == 'q') {
255                break;
256             }
257          }
258       }
259       catch (XmlBlasterException e) {
260          log.severe("Houston, we have a problem: " + e.getMessage());
261       }
262       finally {
263          if (con != null) {
264             if (con.isConnected()) {
265                try {
266                   EraseQos eq = new EraseQos(glob);
267 
268                   EraseKey ek = new EraseKey(glob, "HelloWorld4");
269                   con.erase(ek, eq);
270                   
271                   ek = new EraseKey(glob, "Banking");
272                   con.erase(ek, eq);
273 
274                   // Wait on message erase events
275                   try { Thread.sleep(1000); } catch( InterruptedException i) {}
276                }
277                catch (XmlBlasterException e) {
278                   log.severe("Houston, we have a problem: " + e.getMessage());
279                   e.printStackTrace();
280                }
281             }
282             con.disconnect(new DisconnectQos(glob));
283          }
284       }
285    }
286 
287    /**
288     * We subscribe to some messages on startup or on reconnect
289     * to a new server instance. 
290     */
291    private void initClient() {
292       log.info("Entering initClient() and doing subscribes");
293       try {   
294          SubscribeKey sk = new SubscribeKey(glob, "Banking");
295          SubscribeQos sq = new SubscribeQos(glob);
296          sq.setWantInitialUpdate(false);
297          con.subscribe(sk, sq);
298 
299 
300          sk = new SubscribeKey(glob, "HelloWorld4");
301          sq = new SubscribeQos(glob);
302          sq.setWantInitialUpdate(false);
303          con.subscribe(sk, sq, new I_Callback() {
304             public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
305                if (updateKey.getOid().equals("HelloWorld4"))
306                   log.info("Receiving asynchronous message '" + updateKey.getOid() +
307                            "' state=" + updateQos.getState() + " in HelloWorld4 handler");
308                else
309                   log.severe("Receiving unexpected asynchronous message '" + updateKey.getOid() +
310                             "' with state '" + updateQos.getState() + "' in HelloWorld4 handler");
311                return "";
312             }
313          });  // subscribe with our specific update handler
314       }
315       catch (XmlBlasterException e) {
316          log.severe("Client initialization failed: " + e.getMessage());
317       }
318    }
319 
320    /**
321     * We publish some messages. 
322     */
323    private void publishMessages() {
324       try {
325 
326          PublishKey pk = new PublishKey(glob, "HelloWorld4", "text/plain", "1.0");
327          PublishQos pq = new PublishQos(glob);
328          MsgUnit msgUnit = new MsgUnit(pk, "Hi", pq);
329          con.publish(msgUnit);
330          log.info("Published message '" + pk.getOid() + "'");
331 
332 
333          pk = new PublishKey(glob, "Banking", "text/plain", "1.0");
334          pk.setClientTags("<Account><withdraw/></Account>"); // Add banking specific meta data
335          pq = new PublishQos(glob);
336          msgUnit = new MsgUnit(pk, "Ho".getBytes(), pq);
337          con.publish(msgUnit);
338          log.info("Published message '" + pk.getOid() + "'");
339 
340       }
341       catch (XmlBlasterException e) {
342          log.severe("Houston, we have a problem: " + e.getMessage());
343       }
344    }
345 
346    /**
347     * Try
348     * <pre>
349     *   java HelloWorld4 -help
350     * </pre>
351     * for usage help
352     */
353    public static void main(String args[]) {
354       Global glob = new Global();
355       
356       if (glob.init(args) != 0) { // Get help with -help
357          System.out.println(glob.usage());
358          System.err.println("Example: java HelloWorld4 -session.name Jeff\n");
359          System.exit(1);
360       }
361 
362       new HelloWorld4(glob);
363    }
364 }


syntax highlighted by Code2HTML, v. 0.9.1