added java jabber layer
authorerickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Wed, 9 May 2007 17:45:42 +0000 (17:45 +0000)
committererickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Wed, 9 May 2007 17:45:42 +0000 (17:45 +0000)
git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@876 9efc2488-bf62-4759-914b-345cdb29e865

src/java/org/opensrf/net/xmpp/XMPPException.java [new file with mode: 0644]
src/java/org/opensrf/net/xmpp/XMPPMessage.java [new file with mode: 0644]
src/java/org/opensrf/net/xmpp/XMPPReader.java [new file with mode: 0644]
src/java/org/opensrf/net/xmpp/XMPPSession.java [new file with mode: 0644]
src/java/org/opensrf/test/TestXMPP.java [new file with mode: 0644]

diff --git a/src/java/org/opensrf/net/xmpp/XMPPException.java b/src/java/org/opensrf/net/xmpp/XMPPException.java
new file mode 100644 (file)
index 0000000..29ee0e7
--- /dev/null
@@ -0,0 +1,18 @@
+package org.opensrf.net.xmpp;
+
+/**
+ * Used for XMPP stream/authentication errors
+ */
+public class XMPPException extends Exception {
+    private String info;
+
+    /**
+     * @param info Runtime exception information.
+     */
+    public XMPPException(String info) {
+        this.info = info;
+    }
+    public String toString() {
+        return this.info;
+    }
+}
diff --git a/src/java/org/opensrf/net/xmpp/XMPPMessage.java b/src/java/org/opensrf/net/xmpp/XMPPMessage.java
new file mode 100644 (file)
index 0000000..f1f0d55
--- /dev/null
@@ -0,0 +1,159 @@
+package org.opensrf.net.xmpp;
+
+import java.io.*;
+
+
+/*
+ * uncomment to use the DOM serialization code...
+import org.w3c.dom.*;
+import org.apache.xerces.dom.DocumentImpl;
+import org.apache.xerces.dom.DOMImplementationImpl;
+import org.apache.xml.serialize.OutputFormat;
+import org.apache.xml.serialize.Serializer;
+import org.apache.xml.serialize.SerializerFactory;
+import org.apache.xml.serialize.XMLSerializer;
+*/
+
+
+/**
+ * Models a single XMPP message.
+ */
+public class XMPPMessage {
+
+    /** Message body */
+    private String body;
+    /** Message recipient */
+    private String to;
+    /** Message sender */
+    private String from;
+    /** Message thread */
+    private String thread;
+    /** Message xid */
+    private String xid;
+
+    public XMPPMessage() {
+    }
+
+    public String getBody() {
+        return body;
+    }
+    public String getTo() { 
+        return to; 
+    }
+    public String getFrom() { 
+        return from;
+    }
+    public String getThread() { 
+        return thread; 
+    }
+    public String getXid() {
+        return xid;
+    }
+    public void setBody(String body) {
+        this.body = body;
+    }
+    public void setTo(String to) { 
+        this.to = to; 
+    }
+    public void setFrom(String from) { 
+        this.from = from; 
+    }
+    public void setThread(String thread) { 
+        this.thread = thread; 
+    }
+    public void setXid(String xid) {
+        this.xid = xid; 
+    }
+
+
+    /**
+     * Generates the XML representation of this message.
+     */
+    public String toXML() {
+        StringBuffer sb = new StringBuffer("<message to='");
+        escapeXML(to, sb);
+        sb.append("' osrf_xid='");
+        escapeXML(xid, sb);
+        sb.append("'><thread>");
+        escapeXML(thread, sb);
+        sb.append("</thread><body>");
+        escapeXML(body, sb);
+        sb.append("</body></message>");
+        return sb.toString();
+    }
+
+
+    /**
+     * Escapes non-valid XML characters.
+     * @param s The string to escape.
+     * @param sb The StringBuffer to append new data to.
+     */
+    private void escapeXML(String s, StringBuffer sb) {
+        if( s == null ) return;
+        char c;
+        int l = s.length();
+        for( int i = 0; i < l; i++ ) {
+            c = s.charAt(i);
+            switch(c) {
+                case '<': 
+                    sb.append("&lt;");
+                    break;
+                case '>': 
+                    sb.append("&gt;");
+                    break;
+                case '&': 
+                    sb.append("&amp;");
+                    break;
+                default:
+                    sb.append(c);
+            }
+        }
+    }
+
+
+
+    /**
+     * This is a DOM implementataion of message serialization. 
+     * I'm inclined to think the stringbuffer version is faster, but 
+     * I have no proof.
+     */
+    /*
+    public String __toXML() {
+
+        Document doc = new DocumentImpl();
+        Element message = doc.createElement("message");
+        Element body = doc.createElement("body");
+        Element thread = doc.createElement("thread");
+
+        doc.appendChild(message);
+        message.setAttribute("to", getTo());
+        message.setAttribute("from", getFrom());
+        message.appendChild(body);
+        message.appendChild(thread);
+
+        body.appendChild(doc.createTextNode(getBody()));
+        thread.appendChild(doc.createTextNode(getThread()));
+
+        XMLSerializer serializer = new XMLSerializer();
+        StringWriter strWriter = new StringWriter();
+        OutputFormat outFormat = new OutputFormat();
+
+        outFormat.setEncoding("UTF-8");
+        outFormat.setVersion("1.0");
+        outFormat.setIndenting(false);
+        outFormat.setOmitXMLDeclaration(true);
+
+        serializer.setOutputCharStream(strWriter);
+        serializer.setOutputFormat(outFormat);
+
+        try {
+            serializer.serialize(doc);
+        } catch(IOException ioe) {
+        }
+        return strWriter.toString();
+    }
+    */
+}
+
+
diff --git a/src/java/org/opensrf/net/xmpp/XMPPReader.java b/src/java/org/opensrf/net/xmpp/XMPPReader.java
new file mode 100644 (file)
index 0000000..39ea0c2
--- /dev/null
@@ -0,0 +1,279 @@
+package org.opensrf.net.xmpp;
+
+import javax.xml.stream.*;
+import javax.xml.stream.events.* ;
+import javax.xml.namespace.QName;
+import java.util.Queue;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Date;
+
+
+/**
+ * Slim XMPP Stream reader.  This reader only understands enough XMPP
+ * to handle logins and recv messages.
+ * @author Bill Erickson, Georgia Public Library Systems
+ */
+public class XMPPReader implements Runnable {
+
+    /** Queue of received messages. */
+    private Queue<XMPPMessage> msgQueue;
+    /** Incoming XMPP XML stream */
+    private InputStream inStream;
+    /** Current message body */
+    private StringBuffer msgBody;
+    /** Current message thread */
+    private StringBuffer msgThread;
+    /** Current message status */
+    private StringBuffer msgStatus;
+    /** Current message error type */
+    private StringBuffer msgErrType;
+    /** Current message sender */
+    private String msgFrom;
+    /** Current message recipient */
+    private String msgTo;
+    /** Current message error code */
+    private int msgErrCode;
+
+    /** Where this reader currently is in the document */
+    private XMLState xmlState;
+
+    /** The current connect state to the XMPP server */
+    private XMPPStreamState streamState;
+
+
+    /** Used to represent out connection state to the XMPP server */
+    public static enum XMPPStreamState {
+        DISCONNECTED,   /* not connected to the server */
+        CONNECT_SENT,   /* we've sent the initial connect message */
+        CONNECT_RECV,   /* we've received a response to our connect message */
+        AUTH_SENT,      /* we've sent an authentication request */
+        CONNECTED       /* authentication is complete */
+    };
+
+
+    /** Used to represents where we are in the XML document stream. */
+    public static enum XMLState {
+        IN_NOTHING,
+        IN_BODY,
+        IN_THREAD,
+        IN_STATUS
+    };
+
+
+    /**
+     * Creates a new reader. Initializes the message queue.
+     * Sets the stream state to disconnected, and the xml
+     * state to in_nothing.
+     * @param inStream the inbound XML stream
+     */
+    public XMPPReader(InputStream inStream) {
+        msgQueue = new ConcurrentLinkedQueue();
+        this.inStream = inStream;
+        resetBuffers();
+        xmlState = XMLState.IN_NOTHING;
+        streamState = XMPPStreamState.DISCONNECTED;
+    }
+
+    /**
+     * Change the connect state and notify that a core 
+     * event has occurred.
+     */
+    protected void setXMPPStreamState(XMPPStreamState state) {
+        streamState = state;
+        notifyCoreEvent();
+    }
+
+    /**
+     * @return The current stream state of the reader 
+     */
+    public XMPPStreamState getXMPPStreamState() {
+        return streamState;
+    }
+
+
+    /**
+     * @return The next message in the queue, or null
+     */
+    public XMPPMessage popMessageQueue() {
+        return (XMPPMessage) msgQueue.poll();
+    }
+
+
+    /**
+     * Initializes the message buffers 
+     */
+    private void resetBuffers() {
+        msgBody = new StringBuffer();
+        msgThread = new StringBuffer();
+        msgStatus = new StringBuffer(); 
+        msgErrType = new StringBuffer();
+        msgFrom = "";
+        msgTo = "";
+    }
+
+
+    /**
+     * Notifies the waiting thread that a core event has occurred.
+     * Each reader should have exactly one dependent session thread. 
+     */
+    private synchronized void notifyCoreEvent() {
+        notify();
+    }
+
+
+    /**
+     * Waits up to timeout milliseconds for a core event to occur. 
+     * Also, having a message already waiting in the queue 
+     * constitutes a core event.
+     * @param timeout The number of milliseconds to wait.  If 
+     * timeout is negative, waits potentially forever.
+     * @return The number of milliseconds in wait
+     */
+    public synchronized long waitCoreEvent(int timeout) {
+
+        if(msgQueue.peek() != null || timeout == 0) return 0;
+
+        long start = new Date().getTime();
+        try{
+            if(timeout < 0) wait();
+            else wait(timeout);
+        } catch(InterruptedException ie) {}
+
+        return new Date().getTime() - start;
+    }
+
+
+    /** Thread kickoff point */
+    public void run() {
+        read();
+    }
+
+
+    /**
+     * Parses XML data from the provided XMPP stream.
+     * @param inStream The stream to parse.
+     */
+    public void read() {
+
+        try {
+            XMLInputFactory factory = XMLInputFactory.newInstance();
+
+            /** disable as many features as possible to speed up the parsing */
+            factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
+            factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
+            factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
+            factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
+            factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
+
+            /** create the stream reader */
+            XMLStreamReader reader = factory.createXMLStreamReader(inStream);
+            int eventType;
+
+            while(reader.hasNext()) {
+                /** cycle through the XML events */
+
+                eventType = reader.next();
+
+                switch(eventType) {
+
+                    case XMLEvent.START_ELEMENT:
+                        handleStartElement(reader);
+                        break;
+
+                    case XMLEvent.CHARACTERS:
+                        switch(xmlState) {
+                            case IN_BODY:
+                                msgBody.append(reader.getText());
+                                break;
+                            case IN_THREAD:
+                                msgThread.append(reader.getText());
+                                break;
+                            case IN_STATUS:
+                                msgStatus.append(reader.getText());
+                                break;
+                        }
+                        break;
+
+                    case XMLEvent.END_ELEMENT: 
+                        xmlState = XMLState.IN_NOTHING;
+                        if("message".equals(reader.getName().toString())) {
+
+                           /** build a message and add it to the message queue */
+                           XMPPMessage msg = new XMPPMessage();
+                           msg.setFrom(msgFrom);
+                           msg.setTo(msgTo);
+                           msg.setBody(msgBody.toString());
+                           msg.setThread(msgThread.toString());
+
+                           msgQueue.offer(msg);
+                           resetBuffers(); 
+                           notifyCoreEvent();
+                        }
+                        break;
+                }
+            }
+
+        } catch(javax.xml.stream.XMLStreamException se) {
+            /* XXX log an error, set a state, and notify */
+        }
+    }
+
+
+    /**
+     * Handles the start_element event.
+     */
+    private void handleStartElement(XMLStreamReader reader) {
+
+        String name = reader.getName().toString();
+
+        if("stream:stream".equals(name)) {
+            setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
+            return;
+        }
+
+        if("iq".equals(name)) {
+            if("result".equals(reader.getAttributeValue(null, "type")))
+                setXMPPStreamState(XMPPStreamState.CONNECTED);
+            return;
+        }
+
+        if("message".equals(name)) {
+            xmlState = XMLState.IN_BODY;
+            /** add a special case for the opensrf "router_from" attribute */
+            String rf = reader.getAttributeValue(null, "router_from");
+            if( rf != null )
+                msgFrom = rf;
+            else
+                msgFrom = reader.getAttributeValue(null, "from");
+            msgTo = reader.getAttributeValue(null, "to");
+            return;
+        }
+
+        if("thread".equals(name)) {
+            xmlState = XMLState.IN_THREAD;
+            return;
+        }
+
+        if("status".equals(name)) {
+            xmlState = XMLState.IN_STATUS;
+            return;
+        }
+
+        if("stream:error".equals(name)) {
+            setXMPPStreamState(XMPPStreamState.DISCONNECTED);
+            return;
+        }
+
+        if("error".equals(name)) {
+            msgErrType.append(reader.getAttributeValue(null, "type"));
+            msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
+            setXMPPStreamState(XMPPStreamState.DISCONNECTED);
+            return;
+        }
+    }
+}
+
+
+
+
diff --git a/src/java/org/opensrf/net/xmpp/XMPPSession.java b/src/java/org/opensrf/net/xmpp/XMPPSession.java
new file mode 100644 (file)
index 0000000..98b555f
--- /dev/null
@@ -0,0 +1,178 @@
+package org.opensrf.net.xmpp;
+
+import java.io.*;
+import java.net.Socket;
+
+
+/**
+ * Represents a single XMPP session.  Sessions are responsible for writing to
+ * the stream and for managing a stream reader.
+ */
+public class XMPPSession {
+
+    /** Initial jabber message */
+    public static final String JABBER_CONNECT = 
+        "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
+
+    /** Basic auth message */
+    public static final String JABBER_BASIC_AUTH =  
+        "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" +
+        "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
+
+    /** jabber domain */
+    private String host;
+    /** jabber port */
+    private int port;
+    /** jabber username */
+    private String username;
+    /** jabber password */
+    private String password;
+    /** jabber resource */
+    private String resource;
+
+    /** XMPP stream reader */
+    XMPPReader reader;
+    /** Fprint-capable socket writer */
+    PrintWriter writer;
+    /** Raw socket output stream */
+    OutputStream outStream;
+
+
+
+    /**
+     * Creates a new session.
+     * @param host The jabber domain
+     * @param port The jabber port
+     */
+    public XMPPSession( String host, int port ) {
+        this.host = host;
+        this.port = port;
+    }
+
+
+    /** true if this session is connected to the server */
+    public boolean connected() {
+        return (
+            reader != null && 
+            reader.getXMPPStreamState() == XMPPReader.XMPPStreamState.CONNECTED);
+    }
+
+
+    /**
+     * Connects to the network.
+     * @param username The jabber username
+     * @param password The jabber password
+     * @param resource The Jabber resource
+     */
+    public void connect(String username, String password, String resource) throws XMPPException {
+
+        this.username = username;
+        this.password = password;
+        this.resource = resource;
+
+        Socket socket;
+
+        try { 
+            /* open the socket and associated streams */
+            socket = new Socket(host, port);
+
+            /** the session maintains control over the output stream */
+            outStream = socket.getOutputStream();
+            writer = new PrintWriter(outStream, true);
+
+            /** pass the input stream to the reader */
+            reader = new XMPPReader(socket.getInputStream());
+
+        } catch(IOException ioe) {
+            throw new 
+                XMPPException("unable to communicate with host " + host + " on port " + port);
+        }
+
+        /* build the reader thread */
+        Thread thread = new Thread(reader);
+        thread.setDaemon(true);
+        thread.start();
+
+        /* send the initial jabber message */
+        sendConnect();
+        reader.waitCoreEvent(10000);
+        if( reader.getXMPPStreamState() != XMPPReader.XMPPStreamState.CONNECT_RECV ) 
+            throw new XMPPException("unable to connect to jabber server");
+
+        /* send the basic auth message */
+        sendBasicAuth(); /* XXX add support for other auth mechanisms */
+        reader.waitCoreEvent(10000);
+        if(!connected())
+            throw new XMPPException("Authentication failed");
+    }
+
+    /** Sends the initial jabber message */
+    private void sendConnect() {
+        writer.printf(JABBER_CONNECT, host);
+        reader.setXMPPStreamState(XMPPReader.XMPPStreamState.CONNECT_SENT);
+    }
+
+    /** Send the basic auth message */
+    private void sendBasicAuth() {
+        writer.printf(JABBER_BASIC_AUTH, username, password, resource);
+        reader.setXMPPStreamState(XMPPReader.XMPPStreamState.AUTH_SENT);
+    }
+
+
+    /**
+     * Sends an XMPPMessage.
+     * @param msg The message to send.
+     */
+    public void send(XMPPMessage msg) throws XMPPException {
+        checkConnected();
+        try {
+            outStream.write(msg.toXML().getBytes()); 
+        } catch (Exception e) {
+            throw new XMPPException(e.toString());
+        }
+    }
+
+
+    /**
+     * @throws XMPPException if we are no longer connected.
+     */
+    private void checkConnected() throws XMPPException {
+        if(!connected())
+            throw new XMPPException("Disconnected stream");
+    }
+
+
+    /**
+     * Receives messages from the network.  
+     * @param timeout Maximum number of milliseconds to wait for a message to arrive.
+     * If timeout is negative, this method will wait indefinitely.
+     * If timeout is 0, this method will not block at all, but will return a 
+     * message if there is already a message available.
+     */
+    public XMPPMessage recv(int timeout) throws XMPPException {
+
+        XMPPMessage msg;
+
+        if(timeout < 0) {
+
+            while(true) { /* wait indefinitely for a message to arrive */
+                reader.waitCoreEvent(timeout);
+                msg = reader.popMessageQueue();
+                if( msg != null ) return msg;
+                checkConnected();
+            }
+
+        } else {
+
+            while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */
+                timeout -= reader.waitCoreEvent(timeout);
+                msg = reader.popMessageQueue();
+                if( msg != null ) return msg;
+                checkConnected();
+            }
+        }
+
+        return null;
+    }
+}
+
diff --git a/src/java/org/opensrf/test/TestXMPP.java b/src/java/org/opensrf/test/TestXMPP.java
new file mode 100644 (file)
index 0000000..8a34a2e
--- /dev/null
@@ -0,0 +1,57 @@
+package org.opensrf.test;
+
+import org.opensrf.net.xmpp.XMPPReader;
+import org.opensrf.net.xmpp.XMPPMessage;
+import org.opensrf.net.xmpp.XMPPSession;
+
+public class TestXMPP {
+
+    public static void main(String args[]) throws Exception {
+
+        String host;
+        int port;
+        String username;
+        String password;
+        String resource;
+        String recipient;
+
+        try {
+            host = args[0];
+            port = Integer.parseInt(args[1]);
+            username = args[2];
+            password = args[3];
+            resource = args[4];
+
+        } catch(ArrayIndexOutOfBoundsException e) {
+            System.err.println("usage: org.opensrf.test.TestXMPP <host> <port> <username> <password> <resource>");
+            return;
+        }
+
+        XMPPSession session = new XMPPSession(host, port);
+        session.connect(username, password, resource);
+
+        XMPPMessage msg;
+
+        if( args.length == 6 ) {
+            /** they specified a recipient */
+            recipient = args[5];
+            msg = new XMPPMessage();
+            msg.setTo(recipient);
+            msg.setThread("test-thread");
+            msg.setBody("Hello, from java-xmpp");
+            System.out.println("Sending message to " + recipient);
+            session.send(msg);
+        }
+
+        while(true) {
+            System.out.println("waiting for message...");
+            msg = session.recv(-1);
+            System.out.println("got message: " + msg.toXML());
+        }
+    }
+}
+
+
+
+
+