--- /dev/null
+package org.opensrf.net.xmpp;
+
+/**
+ * Used for XMPP stream/authentication errors
+ */
+public class XMPPException extends Exception {
+ private String info;
+
+ /**
+ * @param info Runtime exception information.
+ */
+ public XMPPException(String info) {
+ this.info = info;
+ }
+ public String toString() {
+ return this.info;
+ }
+}
--- /dev/null
+package org.opensrf.net.xmpp;
+
+import java.io.*;
+
+
+/*
+ * uncomment to use the DOM serialization code...
+
+import org.w3c.dom.*;
+import org.apache.xerces.dom.DocumentImpl;
+import org.apache.xerces.dom.DOMImplementationImpl;
+import org.apache.xml.serialize.OutputFormat;
+import org.apache.xml.serialize.Serializer;
+import org.apache.xml.serialize.SerializerFactory;
+import org.apache.xml.serialize.XMLSerializer;
+*/
+
+
+/**
+ * Models a single XMPP message.
+ */
+public class XMPPMessage {
+
+ /** Message body */
+ private String body;
+ /** Message recipient */
+ private String to;
+ /** Message sender */
+ private String from;
+ /** Message thread */
+ private String thread;
+ /** Message xid */
+ private String xid;
+
+ public XMPPMessage() {
+ }
+
+ public String getBody() {
+ return body;
+ }
+ public String getTo() {
+ return to;
+ }
+ public String getFrom() {
+ return from;
+ }
+ public String getThread() {
+ return thread;
+ }
+ public String getXid() {
+ return xid;
+ }
+ public void setBody(String body) {
+ this.body = body;
+ }
+ public void setTo(String to) {
+ this.to = to;
+ }
+ public void setFrom(String from) {
+ this.from = from;
+ }
+ public void setThread(String thread) {
+ this.thread = thread;
+ }
+ public void setXid(String xid) {
+ this.xid = xid;
+ }
+
+
+ /**
+ * Generates the XML representation of this message.
+ */
+ public String toXML() {
+ StringBuffer sb = new StringBuffer("<message to='");
+ escapeXML(to, sb);
+ sb.append("' osrf_xid='");
+ escapeXML(xid, sb);
+ sb.append("'><thread>");
+ escapeXML(thread, sb);
+ sb.append("</thread><body>");
+ escapeXML(body, sb);
+ sb.append("</body></message>");
+ return sb.toString();
+ }
+
+
+ /**
+ * Escapes non-valid XML characters.
+ * @param s The string to escape.
+ * @param sb The StringBuffer to append new data to.
+ */
+ private void escapeXML(String s, StringBuffer sb) {
+ if( s == null ) return;
+ char c;
+ int l = s.length();
+ for( int i = 0; i < l; i++ ) {
+ c = s.charAt(i);
+ switch(c) {
+ case '<':
+ sb.append("<");
+ break;
+ case '>':
+ sb.append(">");
+ break;
+ case '&':
+ sb.append("&");
+ break;
+ default:
+ sb.append(c);
+ }
+ }
+ }
+
+
+
+ /**
+ * This is a DOM implementataion of message serialization.
+ * I'm inclined to think the stringbuffer version is faster, but
+ * I have no proof.
+ */
+ /*
+ public String __toXML() {
+
+ Document doc = new DocumentImpl();
+ Element message = doc.createElement("message");
+ Element body = doc.createElement("body");
+ Element thread = doc.createElement("thread");
+
+ doc.appendChild(message);
+ message.setAttribute("to", getTo());
+ message.setAttribute("from", getFrom());
+ message.appendChild(body);
+ message.appendChild(thread);
+
+ body.appendChild(doc.createTextNode(getBody()));
+ thread.appendChild(doc.createTextNode(getThread()));
+
+ XMLSerializer serializer = new XMLSerializer();
+ StringWriter strWriter = new StringWriter();
+ OutputFormat outFormat = new OutputFormat();
+
+ outFormat.setEncoding("UTF-8");
+ outFormat.setVersion("1.0");
+ outFormat.setIndenting(false);
+ outFormat.setOmitXMLDeclaration(true);
+
+ serializer.setOutputCharStream(strWriter);
+ serializer.setOutputFormat(outFormat);
+
+ try {
+ serializer.serialize(doc);
+ } catch(IOException ioe) {
+ }
+ return strWriter.toString();
+ }
+ */
+}
+
+
--- /dev/null
+package org.opensrf.net.xmpp;
+
+import javax.xml.stream.*;
+import javax.xml.stream.events.* ;
+import javax.xml.namespace.QName;
+import java.util.Queue;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Date;
+
+
+/**
+ * Slim XMPP Stream reader. This reader only understands enough XMPP
+ * to handle logins and recv messages.
+ * @author Bill Erickson, Georgia Public Library Systems
+ */
+public class XMPPReader implements Runnable {
+
+ /** Queue of received messages. */
+ private Queue<XMPPMessage> msgQueue;
+ /** Incoming XMPP XML stream */
+ private InputStream inStream;
+ /** Current message body */
+ private StringBuffer msgBody;
+ /** Current message thread */
+ private StringBuffer msgThread;
+ /** Current message status */
+ private StringBuffer msgStatus;
+ /** Current message error type */
+ private StringBuffer msgErrType;
+ /** Current message sender */
+ private String msgFrom;
+ /** Current message recipient */
+ private String msgTo;
+ /** Current message error code */
+ private int msgErrCode;
+
+ /** Where this reader currently is in the document */
+ private XMLState xmlState;
+
+ /** The current connect state to the XMPP server */
+ private XMPPStreamState streamState;
+
+
+ /** Used to represent out connection state to the XMPP server */
+ public static enum XMPPStreamState {
+ DISCONNECTED, /* not connected to the server */
+ CONNECT_SENT, /* we've sent the initial connect message */
+ CONNECT_RECV, /* we've received a response to our connect message */
+ AUTH_SENT, /* we've sent an authentication request */
+ CONNECTED /* authentication is complete */
+ };
+
+
+ /** Used to represents where we are in the XML document stream. */
+ public static enum XMLState {
+ IN_NOTHING,
+ IN_BODY,
+ IN_THREAD,
+ IN_STATUS
+ };
+
+
+ /**
+ * Creates a new reader. Initializes the message queue.
+ * Sets the stream state to disconnected, and the xml
+ * state to in_nothing.
+ * @param inStream the inbound XML stream
+ */
+ public XMPPReader(InputStream inStream) {
+ msgQueue = new ConcurrentLinkedQueue();
+ this.inStream = inStream;
+ resetBuffers();
+ xmlState = XMLState.IN_NOTHING;
+ streamState = XMPPStreamState.DISCONNECTED;
+ }
+
+ /**
+ * Change the connect state and notify that a core
+ * event has occurred.
+ */
+ protected void setXMPPStreamState(XMPPStreamState state) {
+ streamState = state;
+ notifyCoreEvent();
+ }
+
+ /**
+ * @return The current stream state of the reader
+ */
+ public XMPPStreamState getXMPPStreamState() {
+ return streamState;
+ }
+
+
+ /**
+ * @return The next message in the queue, or null
+ */
+ public XMPPMessage popMessageQueue() {
+ return (XMPPMessage) msgQueue.poll();
+ }
+
+
+ /**
+ * Initializes the message buffers
+ */
+ private void resetBuffers() {
+ msgBody = new StringBuffer();
+ msgThread = new StringBuffer();
+ msgStatus = new StringBuffer();
+ msgErrType = new StringBuffer();
+ msgFrom = "";
+ msgTo = "";
+ }
+
+
+ /**
+ * Notifies the waiting thread that a core event has occurred.
+ * Each reader should have exactly one dependent session thread.
+ */
+ private synchronized void notifyCoreEvent() {
+ notify();
+ }
+
+
+ /**
+ * Waits up to timeout milliseconds for a core event to occur.
+ * Also, having a message already waiting in the queue
+ * constitutes a core event.
+ * @param timeout The number of milliseconds to wait. If
+ * timeout is negative, waits potentially forever.
+ * @return The number of milliseconds in wait
+ */
+ public synchronized long waitCoreEvent(int timeout) {
+
+ if(msgQueue.peek() != null || timeout == 0) return 0;
+
+ long start = new Date().getTime();
+ try{
+ if(timeout < 0) wait();
+ else wait(timeout);
+ } catch(InterruptedException ie) {}
+
+ return new Date().getTime() - start;
+ }
+
+
+ /** Thread kickoff point */
+ public void run() {
+ read();
+ }
+
+
+ /**
+ * Parses XML data from the provided XMPP stream.
+ * @param inStream The stream to parse.
+ */
+ public void read() {
+
+ try {
+ XMLInputFactory factory = XMLInputFactory.newInstance();
+
+ /** disable as many features as possible to speed up the parsing */
+ factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
+ factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
+ factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
+ factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
+ factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
+
+ /** create the stream reader */
+ XMLStreamReader reader = factory.createXMLStreamReader(inStream);
+ int eventType;
+
+ while(reader.hasNext()) {
+ /** cycle through the XML events */
+
+ eventType = reader.next();
+
+ switch(eventType) {
+
+ case XMLEvent.START_ELEMENT:
+ handleStartElement(reader);
+ break;
+
+ case XMLEvent.CHARACTERS:
+ switch(xmlState) {
+ case IN_BODY:
+ msgBody.append(reader.getText());
+ break;
+ case IN_THREAD:
+ msgThread.append(reader.getText());
+ break;
+ case IN_STATUS:
+ msgStatus.append(reader.getText());
+ break;
+ }
+ break;
+
+ case XMLEvent.END_ELEMENT:
+ xmlState = XMLState.IN_NOTHING;
+ if("message".equals(reader.getName().toString())) {
+
+ /** build a message and add it to the message queue */
+ XMPPMessage msg = new XMPPMessage();
+ msg.setFrom(msgFrom);
+ msg.setTo(msgTo);
+ msg.setBody(msgBody.toString());
+ msg.setThread(msgThread.toString());
+
+ msgQueue.offer(msg);
+ resetBuffers();
+ notifyCoreEvent();
+ }
+ break;
+ }
+ }
+
+ } catch(javax.xml.stream.XMLStreamException se) {
+ /* XXX log an error, set a state, and notify */
+ }
+ }
+
+
+ /**
+ * Handles the start_element event.
+ */
+ private void handleStartElement(XMLStreamReader reader) {
+
+ String name = reader.getName().toString();
+
+ if("stream:stream".equals(name)) {
+ setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
+ return;
+ }
+
+ if("iq".equals(name)) {
+ if("result".equals(reader.getAttributeValue(null, "type")))
+ setXMPPStreamState(XMPPStreamState.CONNECTED);
+ return;
+ }
+
+ if("message".equals(name)) {
+ xmlState = XMLState.IN_BODY;
+ /** add a special case for the opensrf "router_from" attribute */
+ String rf = reader.getAttributeValue(null, "router_from");
+ if( rf != null )
+ msgFrom = rf;
+ else
+ msgFrom = reader.getAttributeValue(null, "from");
+ msgTo = reader.getAttributeValue(null, "to");
+ return;
+ }
+
+ if("thread".equals(name)) {
+ xmlState = XMLState.IN_THREAD;
+ return;
+ }
+
+ if("status".equals(name)) {
+ xmlState = XMLState.IN_STATUS;
+ return;
+ }
+
+ if("stream:error".equals(name)) {
+ setXMPPStreamState(XMPPStreamState.DISCONNECTED);
+ return;
+ }
+
+ if("error".equals(name)) {
+ msgErrType.append(reader.getAttributeValue(null, "type"));
+ msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
+ setXMPPStreamState(XMPPStreamState.DISCONNECTED);
+ return;
+ }
+ }
+}
+
+
+
+
--- /dev/null
+package org.opensrf.net.xmpp;
+
+import java.io.*;
+import java.net.Socket;
+
+
+/**
+ * Represents a single XMPP session. Sessions are responsible for writing to
+ * the stream and for managing a stream reader.
+ */
+public class XMPPSession {
+
+ /** Initial jabber message */
+ public static final String JABBER_CONNECT =
+ "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
+
+ /** Basic auth message */
+ public static final String JABBER_BASIC_AUTH =
+ "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" +
+ "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
+
+ /** jabber domain */
+ private String host;
+ /** jabber port */
+ private int port;
+ /** jabber username */
+ private String username;
+ /** jabber password */
+ private String password;
+ /** jabber resource */
+ private String resource;
+
+ /** XMPP stream reader */
+ XMPPReader reader;
+ /** Fprint-capable socket writer */
+ PrintWriter writer;
+ /** Raw socket output stream */
+ OutputStream outStream;
+
+
+
+ /**
+ * Creates a new session.
+ * @param host The jabber domain
+ * @param port The jabber port
+ */
+ public XMPPSession( String host, int port ) {
+ this.host = host;
+ this.port = port;
+ }
+
+
+ /** true if this session is connected to the server */
+ public boolean connected() {
+ return (
+ reader != null &&
+ reader.getXMPPStreamState() == XMPPReader.XMPPStreamState.CONNECTED);
+ }
+
+
+ /**
+ * Connects to the network.
+ * @param username The jabber username
+ * @param password The jabber password
+ * @param resource The Jabber resource
+ */
+ public void connect(String username, String password, String resource) throws XMPPException {
+
+ this.username = username;
+ this.password = password;
+ this.resource = resource;
+
+ Socket socket;
+
+ try {
+ /* open the socket and associated streams */
+ socket = new Socket(host, port);
+
+ /** the session maintains control over the output stream */
+ outStream = socket.getOutputStream();
+ writer = new PrintWriter(outStream, true);
+
+ /** pass the input stream to the reader */
+ reader = new XMPPReader(socket.getInputStream());
+
+ } catch(IOException ioe) {
+ throw new
+ XMPPException("unable to communicate with host " + host + " on port " + port);
+ }
+
+ /* build the reader thread */
+ Thread thread = new Thread(reader);
+ thread.setDaemon(true);
+ thread.start();
+
+ /* send the initial jabber message */
+ sendConnect();
+ reader.waitCoreEvent(10000);
+ if( reader.getXMPPStreamState() != XMPPReader.XMPPStreamState.CONNECT_RECV )
+ throw new XMPPException("unable to connect to jabber server");
+
+ /* send the basic auth message */
+ sendBasicAuth(); /* XXX add support for other auth mechanisms */
+ reader.waitCoreEvent(10000);
+ if(!connected())
+ throw new XMPPException("Authentication failed");
+ }
+
+ /** Sends the initial jabber message */
+ private void sendConnect() {
+ writer.printf(JABBER_CONNECT, host);
+ reader.setXMPPStreamState(XMPPReader.XMPPStreamState.CONNECT_SENT);
+ }
+
+ /** Send the basic auth message */
+ private void sendBasicAuth() {
+ writer.printf(JABBER_BASIC_AUTH, username, password, resource);
+ reader.setXMPPStreamState(XMPPReader.XMPPStreamState.AUTH_SENT);
+ }
+
+
+ /**
+ * Sends an XMPPMessage.
+ * @param msg The message to send.
+ */
+ public void send(XMPPMessage msg) throws XMPPException {
+ checkConnected();
+ try {
+ outStream.write(msg.toXML().getBytes());
+ } catch (Exception e) {
+ throw new XMPPException(e.toString());
+ }
+ }
+
+
+ /**
+ * @throws XMPPException if we are no longer connected.
+ */
+ private void checkConnected() throws XMPPException {
+ if(!connected())
+ throw new XMPPException("Disconnected stream");
+ }
+
+
+ /**
+ * Receives messages from the network.
+ * @param timeout Maximum number of milliseconds to wait for a message to arrive.
+ * If timeout is negative, this method will wait indefinitely.
+ * If timeout is 0, this method will not block at all, but will return a
+ * message if there is already a message available.
+ */
+ public XMPPMessage recv(int timeout) throws XMPPException {
+
+ XMPPMessage msg;
+
+ if(timeout < 0) {
+
+ while(true) { /* wait indefinitely for a message to arrive */
+ reader.waitCoreEvent(timeout);
+ msg = reader.popMessageQueue();
+ if( msg != null ) return msg;
+ checkConnected();
+ }
+
+ } else {
+
+ while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */
+ timeout -= reader.waitCoreEvent(timeout);
+ msg = reader.popMessageQueue();
+ if( msg != null ) return msg;
+ checkConnected();
+ }
+ }
+
+ return null;
+ }
+}
+
--- /dev/null
+package org.opensrf.test;
+
+import org.opensrf.net.xmpp.XMPPReader;
+import org.opensrf.net.xmpp.XMPPMessage;
+import org.opensrf.net.xmpp.XMPPSession;
+
+public class TestXMPP {
+
+ public static void main(String args[]) throws Exception {
+
+ String host;
+ int port;
+ String username;
+ String password;
+ String resource;
+ String recipient;
+
+ try {
+ host = args[0];
+ port = Integer.parseInt(args[1]);
+ username = args[2];
+ password = args[3];
+ resource = args[4];
+
+ } catch(ArrayIndexOutOfBoundsException e) {
+ System.err.println("usage: org.opensrf.test.TestXMPP <host> <port> <username> <password> <resource>");
+ return;
+ }
+
+ XMPPSession session = new XMPPSession(host, port);
+ session.connect(username, password, resource);
+
+ XMPPMessage msg;
+
+ if( args.length == 6 ) {
+ /** they specified a recipient */
+ recipient = args[5];
+ msg = new XMPPMessage();
+ msg.setTo(recipient);
+ msg.setThread("test-thread");
+ msg.setBody("Hello, from java-xmpp");
+ System.out.println("Sending message to " + recipient);
+ session.send(msg);
+ }
+
+ while(true) {
+ System.out.println("waiting for message...");
+ msg = session.recv(-1);
+ System.out.println("got message: " + msg.toXML());
+ }
+ }
+}
+
+
+
+
+