[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[xmlblaster] problem about xmBlaster and System.in



Hi!,

    I'm making a program to give to realtimeblaster(RTB) game the distributed capability, so, I must trap the message that RTB send to the robots. It makes it with classical C pipes to standard IN and standard Out.

    I'm making all in java to make possible the integration of any S.O. (actually RTB it's only limited to work in Linux and may in windows).

    The problem that I have now and I couldn't fix, is this:

        I have a little java class that read from System.in and send de message to a xmlBlaster server. it's only has the code to read a property file to get the address of te server, the connect to the xmlBlaster server,  a thread to read de messages and a subclass to write the messages that received from xmlBlaster.

        well, I launch the RTB that launch this class by a script. this class start to read and send messages. all it's ok, but when it reach some number of messages it give me an IOException -> Resource Temporarily unavailable. in System.in ¿?

    I make some test, and, when I'm not connected to the xmlBlaster server it's works fine. When I'm connected it's fail. I try to change the Stream of reading. I change the System.in by a text plain file and it's works fine.

    Thinking that I have a mistake with the config of the client I make some modifications in helloword6 and try it (I add the file) but it make the same fail. when it's connected to the server, and after some time it crashs and don't read.

    I don't know why, and how I could solve it, and I'm very hurry to fix it.

    Any idea?

thanks.
--
José Enrique García Maciñeiras <kszosze at yahoo.es>
// xmlBlaster/demo/HelloWorld6.java
import org.jutils.log.LogChannel;
import org.xmlBlaster.util.Global;
import org.xmlBlaster.util.XmlBlasterException;
import org.xmlBlaster.util.MsgUnit;
import org.xmlBlaster.util.qos.address.Address;
import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
import org.xmlBlaster.util.qos.address.CallbackAddress;
import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
import org.xmlBlaster.client.qos.ConnectQos;
import org.xmlBlaster.client.qos.ConnectReturnQos;
import org.xmlBlaster.client.qos.DisconnectQos;
import org.xmlBlaster.client.I_ConnectionStateListener;
import org.xmlBlaster.client.I_Callback;
import org.xmlBlaster.client.key.SubscribeKey;
import org.xmlBlaster.client.key.PublishKey;
import org.xmlBlaster.client.key.UpdateKey;
import org.xmlBlaster.client.key.EraseKey;
import org.xmlBlaster.client.qos.PublishQos;
import org.xmlBlaster.client.qos.PublishReturnQos;
import org.xmlBlaster.client.qos.UpdateQos;
import org.xmlBlaster.client.qos.SubscribeQos;
import org.xmlBlaster.client.qos.SubscribeReturnQos;
import org.xmlBlaster.client.qos.EraseQos;
import org.xmlBlaster.client.qos.EraseReturnQos;
import org.xmlBlaster.client.I_XmlBlasterAccess;


/**
 * This client connects to xmlBlaster in fail save mode and uses specific update handlers.
 * <p />
 * In fail save mode the client will poll for the xmlBlaster server and
 * queue messages until the server is available.<br />
 * Further you see how to configure the connection behavior hard coded.
 * <p />
 * Invoke: java HelloWorld6 -session.name jack/5
 * <p />
 * Invoke: java HelloWorld6 -session.name joe/2 -passwd secret -dispatch/connection/protocol XMLRPC
 *  at see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html"; target="others">xmlBlaster interface</a>
 */
public class fantasmaEntidad {
    private final String ME = "HelloWorld6";
    private final LogChannel log;
    private I_XmlBlasterAccess con = null;
    private ConnectReturnQos conRetQos = null;
    
    public fantasmaEntidad(final Global glob) {
        
        log = glob.getLog(null);
        
        try {
            con = glob.getXmlBlasterAccess();
            
         /*
         // Change hard-coded the protocol and server lookup:
         String[] args = { "-protocol", "SOCKET",
                           "-dispatch/connection/plugin/socket/hostname", "server.xmlBlaster.org",
                           "-dispatch/connection/plugin/socket/port", "9455",
                           //"-dispatch/connection/plugin/socket/localHostname", "myHost.com",
                           //"-dispatch/connection/plugin/socket/localPort", "8888"
                         };
         glob.init(args);
          */
            
            ConnectQos connectQos = new ConnectQos(glob);
            
            ClientQueueProperty prop = new ClientQueueProperty(glob, null);
            prop.setMaxEntries(10000);    // Client side queue up to 10000 entries if not connected
            
            Address address = new Address(glob);
            address.setDelay(4000L);      // retry connecting every 4 sec
            address.setRetries(-1);       // -1 == forever
            address.setPingInterval(2000L);  // ping every 2 sec
            
            // Example how to hardcode a XmlRpc server:
            address.setType("XMLRPC");    // force XmlRpc protocol
            address.setRawAddress("http://samael:8080/";); // Address to find the server
            
            // Example how to hardcode a SOCKET server:
            //address.setType("SOCKET");    // force SOCKET protocol
            //address.setRawAddress("socket://noty:9988"); // Address to find the server
            
            prop.setAddress(address);
            connectQos.addClientQueueProperty(prop);
            
            CallbackAddress cbAddress = new CallbackAddress(glob);
            cbAddress.setDelay(4000L);      // retry connecting every 4 sec
            cbAddress.setRetries(-1);       // -1 == forever
            cbAddress.setPingInterval(4000L); // ping every 4 seconds
            
            // Example how to hardcode a SOCKET server:
            //cbAddress.setType("SOCKET");    // force SOCKET protocol for callback
            
            connectQos.addCallbackAddress(cbAddress);
            
            connectQos.addClientQueueProperty(prop);
            
            // We want to be notified about connection states:
            con.registerConnectionListener(new I_ConnectionStateListener() {
                
                public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
                    conRetQos = connection.getConnectReturnQos();
                    log.info(ME, "I_ConnectionStateListener: We were lucky, connected to " + glob.getId() + " as " + conRetQos.getSessionName());
                    // we can access the queue via connectionHandler and for example erase the entries ...
                }
                public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
                    log.warn(ME, "I_ConnectionStateListener: No connection to " + glob.getId() + ", we are polling ...");
                }
                public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
                    log.warn(ME, "I_ConnectionStateListener: Connection to " + glob.getId() + " is DEAD -> Good bye");
                    System.exit(1);
                }
            });
            
            // We connect to xmlBlaster and register the callback handle:
            this.conRetQos = con.connect(connectQos, new I_Callback() {
                
                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
                    if (log.DUMP) log.dump(ME, "UpdateKey.toString()=" + updateKey.toString());
                    if (updateKey.isInternal()) {
                        log.error(ME, "Receiving unexpected asynchronous internal message '" + updateKey.getOid() +
                        "' in default handler");
                        return "";
                    }
                    if (updateQos.isErased()) {
                        log.info(ME, "Message '" + updateKey.getOid() + "' is erased");
                        return "";
                    }
                    if (updateKey.getOid().equals("Banking"))
                        log.info(ME, "Receiving asynchronous message '" + updateKey.getOid() +
                        "' state=" + updateQos.getState() + " in default handler");
                    else
                        log.error(ME, "Receiving unexpected asynchronous message '" + updateKey.getOid() +
                        "' in default handler");
                    return "";
                }
                
            });  // Login to xmlBlaster, default handler for updates
            
            
            if (con.isAlive())
                log.info(ME, "Connected as " + connectQos.getUserId() + " to xmlBlaster: " + this.conRetQos.getSessionName());
            else
                log.info(ME, "Not connected to xmlBlaster, proceeding in fail save mode ...");
            
            
            SubscribeKey sk = new SubscribeKey(glob, "Banking");
            SubscribeQos sq = new SubscribeQos(glob);
            SubscribeReturnQos sr1 = con.subscribe(sk, sq);
            
            
            sk = new SubscribeKey(glob, "HelloWorld6");
            sq = new SubscribeQos(glob);
            SubscribeReturnQos sr2 = con.subscribe(sk, sq, new I_Callback() {
                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
                    if (updateKey.getOid().equals("HelloWorld6"))
                        log.info(ME, "Receiving asynchronous message '" + updateKey.getOid() +
                        "' state=" + updateQos.getState() + " in HelloWorld6 handler");
                    else
                        log.error(ME, "Receiving unexpected asynchronous message '" + updateKey.getOid() +
                        "' with state '" + updateQos.getState() + "' in HelloWorld6 handler");
                    return "";
                }
            });  // subscribe with our specific update handler
            
            
            PublishKey pk = new PublishKey(glob, "HelloWorld6", "text/plain", "1.0");
            PublishQos pq = new PublishQos(glob);
            java.io.BufferedReader entrada = new java.io.BufferedReader(new java.io.InputStreamReader(System.in));
            try{
                int i = 0;
                String mensaje;
                while (true){
                    
                    while (!entrada.ready()){}
                    mensaje = entrada.readLine();
                    MsgUnit msgUnit = new MsgUnit(pk, mensaje.getBytes(), pq);
                    System.err.println(i++ +" - Envio esto "+mensaje);
                    PublishReturnQos retQos = con.publish(msgUnit);
                    log.info(ME, "Published message '" + pk.getOid() + "'");
                }
            }catch (java.io.IOException IOEx){
                System.err.println("Warning Error ");
                IOEx.printStackTrace();
            }
            //
            //            pk = new PublishKey(glob, "Banking", "text/plain", "1.0");
            //            pk.setClientTags("<Account><withdraw/></Account>"); // Add banking specific meta data
            //            pq = new PublishQos(glob);
            //            msgUnit = new MsgUnit(pk, "Ho".getBytes(), pq);
            //            retQos = con.publish(msgUnit);
            //            log.info(ME, "Published message '" + pk.getOid() + "'");
        }
        catch (XmlBlasterException e) {
            log.error(ME, "Houston, we have a problem: " + e.toString());
        }
        finally {
            // Wait a second for messages to arrive before we logout
            try { Thread.currentThread().sleep(1000); } catch( InterruptedException i) {}
            Global.waitOnKeyboardHit("Success, hit a key to exit");
            
            if (con != null && con.isConnected()) {
                try {
                    EraseQos eq = new EraseQos(glob);
                    
                    EraseKey ek = new EraseKey(glob, "HelloWorld6");
                    EraseReturnQos[] er = con.erase(ek, eq);
                    
                    ek = new EraseKey(glob, "Banking");
                    er = con.erase(ek, eq);
                    
                    // Wait on message erase events
                    try { Thread.currentThread().sleep(1000); } catch( InterruptedException i) {}
                }
                catch (XmlBlasterException e) {
                    log.error(ME, "Houston, we have a problem: " + e.toString());
                    e.printStackTrace();
                }
                
                con.disconnect(new DisconnectQos(glob));
            }
        }
    }
    
    /**
     * Try
     * <pre>
     *   java HelloWorld6 -help
     * </pre>
     * for usage help
     */
    public static void main(String args[]) {
        Global glob = new Global();
        
        if (glob.init(args) != 0) { // Get help with -help
            System.out.println(glob.usage());
            System.err.println("Example: java HelloWorld6 -session.name Jeff\n");
            System.exit(1);
        }
        
        new fantasmaEntidad(glob);
    }
}