1 // xmlBlaster/demo/HelloWorld4.java
2 import java.util.List;
3 import java.util.logging.Level;
4 import java.util.logging.Logger;
5
6 import org.xmlBlaster.client.I_Callback;
7 import org.xmlBlaster.client.I_ConnectionStateListener;
8 import org.xmlBlaster.client.I_XmlBlasterAccess;
9 import org.xmlBlaster.client.key.EraseKey;
10 import org.xmlBlaster.client.key.GetKey;
11 import org.xmlBlaster.client.key.PublishKey;
12 import org.xmlBlaster.client.key.SubscribeKey;
13 import org.xmlBlaster.client.key.UpdateKey;
14 import org.xmlBlaster.client.qos.ConnectQos;
15 import org.xmlBlaster.client.qos.ConnectReturnQos;
16 import org.xmlBlaster.client.qos.DisconnectQos;
17 import org.xmlBlaster.client.qos.EraseQos;
18 import org.xmlBlaster.client.qos.GetQos;
19 import org.xmlBlaster.client.qos.GetReturnQos;
20 import org.xmlBlaster.client.qos.PublishQos;
21 import org.xmlBlaster.client.qos.PublishReturnQos;
22 import org.xmlBlaster.client.qos.SubscribeQos;
23 import org.xmlBlaster.client.qos.UpdateQos;
24 import org.xmlBlaster.util.Global;
25 import org.xmlBlaster.util.MsgUnit;
26 import org.xmlBlaster.util.XmlBlasterException;
27 import org.xmlBlaster.util.def.MethodName;
28 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
29 import org.xmlBlaster.util.dispatch.I_PostSendListener;
30 import org.xmlBlaster.util.error.I_MsgErrorHandler;
31 import org.xmlBlaster.util.error.I_MsgErrorInfo;
32 import org.xmlBlaster.util.qos.QosData;
33 import org.xmlBlaster.util.queue.I_Entry;
34 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
35
36
37 /**
38 * This client connects to xmlBlaster in failsafe mode and uses specific update handlers.
39 * <p />
40 * In fail save mode the client will poll for the xmlBlaster server and
41 * queue messages until the server is available.
42 * We show all available control of a client in failsafe mode.
43 * <p />
44 * Invoke: java HelloWorld4 -session.name joe/2 -passwd secret
45 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
46 */
47 public class HelloWorld4
48 {
49 private final Global glob;
50 private static Logger log = Logger.getLogger(HelloWorld4.class.getName());
51 private I_XmlBlasterAccess con = null;
52 private ConnectReturnQos conRetQos = null;
53
54 public HelloWorld4(final Global glob) {
55 this.glob = glob;
56
57
58 try {
59 con = glob.getXmlBlasterAccess();
60
61 // Do all client side error handling our self
62 // this error handler is called when we are/were polling for the server:
63 con.setClientErrorHandler(new I_MsgErrorHandler() {
64
65 public void handleError(I_MsgErrorInfo msgErrorInfo) {
66 if (msgErrorInfo == null) return;
67 XmlBlasterException ex = msgErrorInfo.getXmlBlasterException();
68 if (ex.isUser()) {
69 log.severe("Connection failed: " + msgErrorInfo.getXmlBlasterException().getMessage());
70 if (msgErrorInfo.getDispatchManager() != null) {
71 msgErrorInfo.getDispatchManager().toDead(msgErrorInfo.getXmlBlasterException());
72 if (msgErrorInfo.getQueue() != null)
73 msgErrorInfo.getQueue().clear();
74 msgErrorInfo.getDispatchManager().shutdown();
75 return;
76 }
77 }
78 MsgQueueEntry[] entries = msgErrorInfo.getMsgQueueEntries();
79 for (int i=0; i<entries.length; i++)
80 log.severe("Message '" + entries[i].getEmbeddedType() + "' '" +
81 entries[i].getLogId() + "' is lost: " + msgErrorInfo.getXmlBlasterException().getMessage());
82 if (msgErrorInfo.getQueue() != null)
83 msgErrorInfo.getQueue().clear();
84 }
85
86 public String handleErrorSync(I_MsgErrorInfo msgErrorInfo) throws XmlBlasterException {
87 if (msgErrorInfo.getXmlBlasterException().isCommunication()) {
88 handleError(msgErrorInfo);
89 return "";
90 }
91 throw msgErrorInfo.getXmlBlasterException(); // Throw back to client
92 }
93
94 public void shutdown() {
95 }
96 }
97 );
98
99 // This listener receives only events from asynchronously send messages from queue.
100 // e.g. after a reconnect when client side queued messages are delivered
101 con.registerPostSendListener(new I_PostSendListener() {
102 /**
103 * @see I_PostSendListener#postSend(MsgQueueEntry[])
104 */
105 public void postSend(MsgQueueEntry[] entries) {
106 try {
107 for (int i=0; i<entries.length; i++) {
108 if (MethodName.PUBLISH.equals(entries[i].getMethodName())) {
109 MsgUnit msg = entries[i].getMsgUnit();
110 PublishReturnQos retQos = (PublishReturnQos)entries[i].getReturnObj();
111 log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue: " + retQos.toXml());
112 }
113 else
114 log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
115 }
116 } catch (Throwable e) {
117 e.printStackTrace();
118 }
119 }
120
121 /**
122 * @see I_PostSendListener#sendingFailed(MsgQueueEntry[], XmlBlasterException)
123 */
124 public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException ex) {
125 try {
126 for (int i=0; i<entries.length; i++) {
127 if (MethodName.PUBLISH.equals(entries[i].getMethodName())) {
128 MsgUnit msg = entries[i].getMsgUnit();
129 log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue failed: " + ex.getMessage());
130 }
131 else
132 log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
133 }
134 } catch (Throwable e) {
135 e.printStackTrace();
136 }
137 return false; // false: message remains in queue and we go to dead
138 }
139 });
140
141
142 // Listen on status changes of our connection to xmlBlaster
143 con.registerConnectionListener(new I_ConnectionStateListener() {
144
145 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
146 log.info("I_ConnectionStateListener.reachedAlive(): We were lucky, connected to " +
147 connection.getConnectReturnQos().getSessionName());
148 if (connection.getQueue().getNumOfEntries() > 0) {
149 log.info("I_ConnectionStateListener.reachedAlive(): Queue contains " +
150 connection.getQueue().getNumOfEntries() + " messages: " +
151 connection.getQueue().toXml(""));
152 try {
153 List<I_Entry> list = connection.getQueue().peek(-1, -1);
154 for (int i=0; i<list.size(); i++) {
155 log.info(((MsgQueueEntry)list.get(i)).toXml());
156 }
157 /*
158 MsgQueueEntry entry = (MsgQueueEntry)connection.getQueue().peek();
159 log.info("I_ConnectionStateListener.reachedAlive(): Discarding messages from queue");
160 connection.getQueue().clear(); // e.g. discard all msgs (it is our choice)
161 if (MethodName.CONNECT == entry.getMethodName()) {
162 connection.getQueue().put(entry, false);
163 }
164 */
165 }
166 catch (XmlBlasterException e) {
167 }
168 }
169 if (!connection.getConnectReturnQos().isReconnected()) {
170 log.info("I_ConnectionStateListener.reachedAlive(): New server instance found");
171 if (connection.isConnected())
172 initClient(); // initialize subscription etc. again
173 }
174 else {
175 log.info("I_ConnectionStateListener.reachedAlive(): Same server instance found");
176 }
177 }
178
179 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
180 log.warning("I_ConnectionStateListener.reachedPolling(): No connection to " + glob.getId() + ", we are polling ...");
181 if (!connection.isConnected())
182 initClient();
183 }
184
185 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
186 log.severe("I_ConnectionStateListener.reachedDead(): Connection to " + glob.getId() + " is dead, good bye");
187 System.exit(1);
188 }
189 public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
190 }
191
192 });
193
194
195 ConnectQos qos = new ConnectQos(glob);
196 conRetQos = con.connect(qos, new I_Callback() {
197
198 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
199 if (log.isLoggable(Level.FINEST)) log.finest("UpdateKey.toString()=" + updateKey.toString() +
200 "UpdateQos.toString()=" + updateQos.toString());
201 if (updateKey.isInternal()) {
202 log.severe("Receiving unexpected asynchronous internal message '" + updateKey.getOid() +
203 "' in default handler");
204 return "";
205 }
206 if (updateQos.isErased()) {
207 log.info("Message '" + updateKey.getOid() + "' is erased");
208 return "";
209 }
210 if (updateKey.getOid().equals("Banking"))
211 log.info("Receiving asynchronous message '" + updateKey.getOid() +
212 "' state=" + updateQos.getState() + " in default handler");
213 else
214 log.severe("Receiving unexpected asynchronous message '" + updateKey.getOid() +
215 "' in default handler");
216 return "";
217 }
218
219 }); // Login to xmlBlaster, default handler for updates
220
221
222 if (con.isAlive())
223 log.info("Connected as " + qos.getUserId() + " to xmlBlaster, your public session ID is " + conRetQos.getSessionName());
224 else
225 log.info("Not connected to xmlBlaster, proceeding in fail save mode ...");
226
227 while (true) {
228 // Wait a second for messages to arrive before we logout
229 try { Thread.sleep(1000); } catch( InterruptedException i) {}
230 int key = Global.waitOnKeyboardHit("Hit a key: 'p'=publish, 'g'=get, 'q'=exit");
231 if (key == 'p') {
232 publishMessages();
233 continue;
234 }
235 else if (key == 'g') {
236 GetKey gk = new GetKey(glob, "Banking");
237 GetQos gq = new GetQos(glob);
238 try {
239 MsgUnit[] msgs = con.get(gk, gq);
240 if (msgs.length > 0) {
241 GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
242 log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
243 "' and status=" + grq.getState());
244 }
245 else {
246 log.info("No message matched get() call on " + gk.getOid());
247 }
248 }
249 catch (XmlBlasterException e) {
250 log.warning("get() failed:" + e.getMessage());
251 }
252 continue;
253 }
254 else if (key == 'q') {
255 break;
256 }
257 }
258 }
259 catch (XmlBlasterException e) {
260 log.severe("Houston, we have a problem: " + e.getMessage());
261 }
262 finally {
263 if (con != null) {
264 if (con.isConnected()) {
265 try {
266 EraseQos eq = new EraseQos(glob);
267
268 EraseKey ek = new EraseKey(glob, "HelloWorld4");
269 con.erase(ek, eq);
270
271 ek = new EraseKey(glob, "Banking");
272 con.erase(ek, eq);
273
274 // Wait on message erase events
275 try { Thread.sleep(1000); } catch( InterruptedException i) {}
276 }
277 catch (XmlBlasterException e) {
278 log.severe("Houston, we have a problem: " + e.getMessage());
279 e.printStackTrace();
280 }
281 }
282 con.disconnect(new DisconnectQos(glob));
283 }
284 }
285 }
286
287 /**
288 * We subscribe to some messages on startup or on reconnect
289 * to a new server instance.
290 */
291 private void initClient() {
292 log.info("Entering initClient() and doing subscribes");
293 try {
294 SubscribeKey sk = new SubscribeKey(glob, "Banking");
295 SubscribeQos sq = new SubscribeQos(glob);
296 sq.setWantInitialUpdate(false);
297 con.subscribe(sk, sq);
298
299
300 sk = new SubscribeKey(glob, "HelloWorld4");
301 sq = new SubscribeQos(glob);
302 sq.setWantInitialUpdate(false);
303 con.subscribe(sk, sq, new I_Callback() {
304 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
305 if (updateKey.getOid().equals("HelloWorld4"))
306 log.info("Receiving asynchronous message '" + updateKey.getOid() +
307 "' state=" + updateQos.getState() + " in HelloWorld4 handler");
308 else
309 log.severe("Receiving unexpected asynchronous message '" + updateKey.getOid() +
310 "' with state '" + updateQos.getState() + "' in HelloWorld4 handler");
311 return "";
312 }
313 }); // subscribe with our specific update handler
314 }
315 catch (XmlBlasterException e) {
316 log.severe("Client initialization failed: " + e.getMessage());
317 }
318 }
319
320 /**
321 * We publish some messages.
322 */
323 private void publishMessages() {
324 try {
325
326 PublishKey pk = new PublishKey(glob, "HelloWorld4", "text/plain", "1.0");
327 PublishQos pq = new PublishQos(glob);
328 MsgUnit msgUnit = new MsgUnit(pk, "Hi", pq);
329 con.publish(msgUnit);
330 log.info("Published message '" + pk.getOid() + "'");
331
332
333 pk = new PublishKey(glob, "Banking", "text/plain", "1.0");
334 pk.setClientTags("<Account><withdraw/></Account>"); // Add banking specific meta data
335 pq = new PublishQos(glob);
336 msgUnit = new MsgUnit(pk, "Ho".getBytes(), pq);
337 con.publish(msgUnit);
338 log.info("Published message '" + pk.getOid() + "'");
339
340 }
341 catch (XmlBlasterException e) {
342 log.severe("Houston, we have a problem: " + e.getMessage());
343 }
344 }
345
346 /**
347 * Try
348 * <pre>
349 * java HelloWorld4 -help
350 * </pre>
351 * for usage help
352 */
353 public static void main(String args[]) {
354 Global glob = new Global();
355
356 if (glob.init(args) != 0) { // Get help with -help
357 System.out.println(glob.usage());
358 System.err.println("Example: java HelloWorld4 -session.name Jeff\n");
359 System.exit(1);
360 }
361
362 new HelloWorld4(glob);
363 }
364 }
syntax highlighted by Code2HTML, v. 0.9.1