|
-- José Enrique García Maciñeiras <kszosze at yahoo.es> |
// xmlBlaster/demo/HelloWorld6.java
import org.jutils.log.LogChannel;
import org.xmlBlaster.util.Global;
import org.xmlBlaster.util.XmlBlasterException;
import org.xmlBlaster.util.MsgUnit;
import org.xmlBlaster.util.qos.address.Address;
import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
import org.xmlBlaster.util.qos.address.CallbackAddress;
import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
import org.xmlBlaster.client.qos.ConnectQos;
import org.xmlBlaster.client.qos.ConnectReturnQos;
import org.xmlBlaster.client.qos.DisconnectQos;
import org.xmlBlaster.client.I_ConnectionStateListener;
import org.xmlBlaster.client.I_Callback;
import org.xmlBlaster.client.key.SubscribeKey;
import org.xmlBlaster.client.key.PublishKey;
import org.xmlBlaster.client.key.UpdateKey;
import org.xmlBlaster.client.key.EraseKey;
import org.xmlBlaster.client.qos.PublishQos;
import org.xmlBlaster.client.qos.PublishReturnQos;
import org.xmlBlaster.client.qos.UpdateQos;
import org.xmlBlaster.client.qos.SubscribeQos;
import org.xmlBlaster.client.qos.SubscribeReturnQos;
import org.xmlBlaster.client.qos.EraseQos;
import org.xmlBlaster.client.qos.EraseReturnQos;
import org.xmlBlaster.client.I_XmlBlasterAccess;
/**
* This client connects to xmlBlaster in fail save mode and uses specific update handlers.
* <p />
* In fail save mode the client will poll for the xmlBlaster server and
* queue messages until the server is available.<br />
* Further you see how to configure the connection behavior hard coded.
* <p />
* Invoke: java HelloWorld6 -session.name jack/5
* <p />
* Invoke: java HelloWorld6 -session.name joe/2 -passwd secret -dispatch/connection/protocol XMLRPC
* at see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
*/
public class fantasmaEntidad {
private final String ME = "HelloWorld6";
private final LogChannel log;
private I_XmlBlasterAccess con = null;
private ConnectReturnQos conRetQos = null;
public fantasmaEntidad(final Global glob) {
log = glob.getLog(null);
try {
con = glob.getXmlBlasterAccess();
/*
// Change hard-coded the protocol and server lookup:
String[] args = { "-protocol", "SOCKET",
"-dispatch/connection/plugin/socket/hostname", "server.xmlBlaster.org",
"-dispatch/connection/plugin/socket/port", "9455",
//"-dispatch/connection/plugin/socket/localHostname", "myHost.com",
//"-dispatch/connection/plugin/socket/localPort", "8888"
};
glob.init(args);
*/
ConnectQos connectQos = new ConnectQos(glob);
ClientQueueProperty prop = new ClientQueueProperty(glob, null);
prop.setMaxEntries(10000); // Client side queue up to 10000 entries if not connected
Address address = new Address(glob);
address.setDelay(4000L); // retry connecting every 4 sec
address.setRetries(-1); // -1 == forever
address.setPingInterval(2000L); // ping every 2 sec
// Example how to hardcode a XmlRpc server:
address.setType("XMLRPC"); // force XmlRpc protocol
address.setRawAddress("http://samael:8080/"); // Address to find the server
// Example how to hardcode a SOCKET server:
//address.setType("SOCKET"); // force SOCKET protocol
//address.setRawAddress("socket://noty:9988"); // Address to find the server
prop.setAddress(address);
connectQos.addClientQueueProperty(prop);
CallbackAddress cbAddress = new CallbackAddress(glob);
cbAddress.setDelay(4000L); // retry connecting every 4 sec
cbAddress.setRetries(-1); // -1 == forever
cbAddress.setPingInterval(4000L); // ping every 4 seconds
// Example how to hardcode a SOCKET server:
//cbAddress.setType("SOCKET"); // force SOCKET protocol for callback
connectQos.addCallbackAddress(cbAddress);
connectQos.addClientQueueProperty(prop);
// We want to be notified about connection states:
con.registerConnectionListener(new I_ConnectionStateListener() {
public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
conRetQos = connection.getConnectReturnQos();
log.info(ME, "I_ConnectionStateListener: We were lucky, connected to " + glob.getId() + " as " + conRetQos.getSessionName());
// we can access the queue via connectionHandler and for example erase the entries ...
}
public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
log.warn(ME, "I_ConnectionStateListener: No connection to " + glob.getId() + ", we are polling ...");
}
public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
log.warn(ME, "I_ConnectionStateListener: Connection to " + glob.getId() + " is DEAD -> Good bye");
System.exit(1);
}
});
// We connect to xmlBlaster and register the callback handle:
this.conRetQos = con.connect(connectQos, new I_Callback() {
public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
if (log.DUMP) log.dump(ME, "UpdateKey.toString()=" + updateKey.toString());
if (updateKey.isInternal()) {
log.error(ME, "Receiving unexpected asynchronous internal message '" + updateKey.getOid() +
"' in default handler");
return "";
}
if (updateQos.isErased()) {
log.info(ME, "Message '" + updateKey.getOid() + "' is erased");
return "";
}
if (updateKey.getOid().equals("Banking"))
log.info(ME, "Receiving asynchronous message '" + updateKey.getOid() +
"' state=" + updateQos.getState() + " in default handler");
else
log.error(ME, "Receiving unexpected asynchronous message '" + updateKey.getOid() +
"' in default handler");
return "";
}
}); // Login to xmlBlaster, default handler for updates
if (con.isAlive())
log.info(ME, "Connected as " + connectQos.getUserId() + " to xmlBlaster: " + this.conRetQos.getSessionName());
else
log.info(ME, "Not connected to xmlBlaster, proceeding in fail save mode ...");
SubscribeKey sk = new SubscribeKey(glob, "Banking");
SubscribeQos sq = new SubscribeQos(glob);
SubscribeReturnQos sr1 = con.subscribe(sk, sq);
sk = new SubscribeKey(glob, "HelloWorld6");
sq = new SubscribeQos(glob);
SubscribeReturnQos sr2 = con.subscribe(sk, sq, new I_Callback() {
public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
if (updateKey.getOid().equals("HelloWorld6"))
log.info(ME, "Receiving asynchronous message '" + updateKey.getOid() +
"' state=" + updateQos.getState() + " in HelloWorld6 handler");
else
log.error(ME, "Receiving unexpected asynchronous message '" + updateKey.getOid() +
"' with state '" + updateQos.getState() + "' in HelloWorld6 handler");
return "";
}
}); // subscribe with our specific update handler
PublishKey pk = new PublishKey(glob, "HelloWorld6", "text/plain", "1.0");
PublishQos pq = new PublishQos(glob);
java.io.BufferedReader entrada = new java.io.BufferedReader(new java.io.InputStreamReader(System.in));
try{
int i = 0;
String mensaje;
while (true){
while (!entrada.ready()){}
mensaje = entrada.readLine();
MsgUnit msgUnit = new MsgUnit(pk, mensaje.getBytes(), pq);
System.err.println(i++ +" - Envio esto "+mensaje);
PublishReturnQos retQos = con.publish(msgUnit);
log.info(ME, "Published message '" + pk.getOid() + "'");
}
}catch (java.io.IOException IOEx){
System.err.println("Warning Error ");
IOEx.printStackTrace();
}
//
// pk = new PublishKey(glob, "Banking", "text/plain", "1.0");
// pk.setClientTags("<Account><withdraw/></Account>"); // Add banking specific meta data
// pq = new PublishQos(glob);
// msgUnit = new MsgUnit(pk, "Ho".getBytes(), pq);
// retQos = con.publish(msgUnit);
// log.info(ME, "Published message '" + pk.getOid() + "'");
}
catch (XmlBlasterException e) {
log.error(ME, "Houston, we have a problem: " + e.toString());
}
finally {
// Wait a second for messages to arrive before we logout
try { Thread.currentThread().sleep(1000); } catch( InterruptedException i) {}
Global.waitOnKeyboardHit("Success, hit a key to exit");
if (con != null && con.isConnected()) {
try {
EraseQos eq = new EraseQos(glob);
EraseKey ek = new EraseKey(glob, "HelloWorld6");
EraseReturnQos[] er = con.erase(ek, eq);
ek = new EraseKey(glob, "Banking");
er = con.erase(ek, eq);
// Wait on message erase events
try { Thread.currentThread().sleep(1000); } catch( InterruptedException i) {}
}
catch (XmlBlasterException e) {
log.error(ME, "Houston, we have a problem: " + e.toString());
e.printStackTrace();
}
con.disconnect(new DisconnectQos(glob));
}
}
}
/**
* Try
* <pre>
* java HelloWorld6 -help
* </pre>
* for usage help
*/
public static void main(String args[]) {
Global glob = new Global();
if (glob.init(args) != 0) { // Get help with -help
System.out.println(glob.usage());
System.err.println("Example: java HelloWorld6 -session.name Jeff\n");
System.exit(1);
}
new fantasmaEntidad(glob);
}
}