implemented enough of the stack/session logic to make stateless requests and receive...
authorerickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Fri, 11 May 2007 15:45:27 +0000 (15:45 +0000)
committererickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Fri, 11 May 2007 15:45:27 +0000 (15:45 +0000)
git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@884 9efc2488-bf62-4759-914b-345cdb29e865

src/java/org/opensrf/ClientSession.java
src/java/org/opensrf/Message.java
src/java/org/opensrf/Request.java
src/java/org/opensrf/Session.java
src/java/org/opensrf/Stack.java
src/java/org/opensrf/net/xmpp/XMPPSession.java
src/java/org/opensrf/test/TestClient.java [new file with mode: 0644]
src/java/org/opensrf/test/TestJSON.java
src/java/org/opensrf/util/Config.java
src/java/org/opensrf/util/OSRFObject.java

index 9f3483e..db44606 100644 (file)
@@ -3,6 +3,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Random;
+import java.util.Arrays;
 
 import org.opensrf.util.*;
 
@@ -21,20 +22,103 @@ public class ClientSession extends Session {
     private List<Request> requests;
 
     /**
-     * @param service The remove service to communicate with
+     * Creates a new client session.  Initializes the 
+     * @param service The remove service.
      */
     public ClientSession(String service) throws ConfigException {
         this.service = service;
-        domain = (String) Config.getFirst("/domain/domains");
-        router = (String) Config.getString("/router_name");
+
+        /** generate the remote node string */
+        domain = (String) Config.getFirst("/domains/domain");
+        router = Config.getString("/router_name");
         setRemoteNode(router + "@" + domain + "/" + service);
         origRemoteNode = getRemoteNode();
-        requests = new ArrayList<Request>();
-        nextId = 0;
+
+
+        /* create a random thread */
         long time = new Date().getTime();
         Random rand = new Random(time);
-        thread = rand.nextInt()+""+rand.nextInt()+""+time;
+        setThread(rand.nextInt()+""+rand.nextInt()+""+time);
+
+        requests = new ArrayList<Request>();
+        nextId = 0;
         cacheSession();
     }
+
+    /**
+     * Creates a new request to send to our remote service.
+     * @param method The method API name
+     * @param params The list of method parameters
+     * @return The request object.
+     */
+    public Request request(String method, List<Object> params) throws SessionException {
+        return request(new Request(this, nextId++, method, params));
+    }
+
+    /**
+     * Creates a new request to send to our remote service.
+     * @param method The method API name
+     * @param params The list of method parameters
+     * @return The request object.
+     */
+    public Request request(String method, Object[] params) throws SessionException {
+        return request(new Request(this, nextId++, method, Arrays.asList(params)));
+    }
+
+
+    /**
+     * Creates a new request to send to our remote service.
+     * @param method The method API name
+     * @return The request object.
+     */
+    public Request request(String method) throws SessionException {
+        return request(new Request(this, nextId++, method));
+    }
+
+
+    public Request request(Request req) throws SessionException {
+        if(getConnectState() != ConnectState.CONNECTED)
+            resetRemoteId();
+        //requests.set(req.getId(), req); 
+        requests.add(req); 
+        req.send();
+        return req;
+    }
+
+
+    /**
+     * Resets the remoteNode to its original state.
+     */
+    public void resetRemoteId() {
+        setRemoteNode(origRemoteNode);
+    }
+
+
+    /**
+     * Pushes a response onto the queue of the appropriate request.
+     * @param msg The the received RESULT Message whose payload 
+     * contains a Result object.
+     */
+    public void pushResponse(Message msg) {
+
+        Request req;
+
+        try {
+           req = requests.get(msg.getId());
+        } catch(IndexOutOfBoundsException e) {
+            /** LOG that an unexpected response arrived */
+            return;
+        }
+
+        OSRFObject payload = (OSRFObject) msg.get("payload");
+
+        req.pushResponse(
+            new Result( 
+                payload.getString("status"), 
+                payload.getInt("statusCode"),
+                payload.get("content")
+            )
+        );
+    }
 }
 
index f1db9c7..35e3e59 100644 (file)
@@ -4,19 +4,16 @@ import org.opensrf.util.*;
 
 public class Message implements OSRFSerializable {
 
-    /** Message types */
-    public enum Type {
-        REQUEST,
-        STATUS,
-        RESULT,
-        CONNECT,
-        DISCONNECT,
-    };
+    public static final String REQUEST = "REQUEST";
+    public static final String STATUS = "STATUS";
+    public static final String RESULT = "RESULT";
+    public static final String CONNECT = "CONNECT";
+    public static final String DISCONNECT = "DISCONNECT";
 
     /** Message ID.  This number is used to relate requests to responses */
     private int id;
-    /** Type of message. */
-    private Type type;
+    /** String of message. */
+    private String type;
     /** message payload */
     private Object payload;
 
@@ -31,11 +28,11 @@ public class Message implements OSRFSerializable {
      * @param id This message's ID
      * @param type The type of message
      */
-    public Message(int id, Type type) {
+    public Message(int id, String type) {
         setId(id);
-        setType(type);
+        setString(type);
     }
-    public Message(int id, Type type, Object payload) {
+    public Message(int id, String type, Object payload) {
         this(id, type);
         setPayload(payload);
     }
@@ -44,7 +41,7 @@ public class Message implements OSRFSerializable {
     public int getId() {
         return id;
     }   
-    public Type getType() {
+    public String getType() {
         return type;
     }
     public Object getPayload() {
@@ -53,7 +50,7 @@ public class Message implements OSRFSerializable {
     public void setId(int id) {
         this.id = id;
     }
-    public void setType(Type type) {
+    public void setString(String type) {
         this.type = type;
     }
     public void setPayload(Object p) {
index b646950..4c61ec5 100644 (file)
@@ -2,6 +2,7 @@ package org.opensrf;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.List;
+import java.util.Date;
 import org.opensrf.net.xmpp.XMPPException;
 
 public class Request {
@@ -22,11 +23,49 @@ public class Request {
         resetTimeout = false;
     }
 
+    public Request(ClientSession ses, int id, String methodName) {
+        this(ses, id, new Method(methodName));
+    }
+
     public Request(ClientSession ses, int id, String methodName, List<Object> params) {
         this(ses, id, new Method(methodName, params));
     }
 
-    public void send() throws XMPPException {
-        session.send(new Message(id, Message.Type.REQUEST, method));
+    public void send() throws SessionException {
+        session.send(new Message(id, Message.REQUEST, method));
+    }
+
+    public Result recv(long millis) throws SessionException {
+
+        Result result = null;
+
+        if(millis < 0) {
+            session.waitForMessage(millis);
+            if((result = resultQueue.poll()) != null)
+                return result;
+
+        } else {
+
+            while(millis >= 0) {
+                long start = new Date().getTime();
+                session.waitForMessage(millis);
+                millis -= new Date().getTime() - start;
+                if((result = resultQueue.poll()) != null)
+                    return result;
+            }
+        }
+
+        return null;
+    }
+
+    public void pushResponse(Result result) {
+        resultQueue.offer(result);
+    }
+
+    /**
+     * @return This request's ID
+     */
+    public int getId() {
+        return id;
     }
 }
index 2eb9aa9..01f1d21 100644 (file)
@@ -3,6 +3,7 @@ import org.opensrf.util.JSONWriter;
 import org.opensrf.net.xmpp.*;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Arrays;
 
 public abstract class Session {
 
@@ -28,7 +29,7 @@ public abstract class Session {
      * In other words, each session has a unique thread, and all messages 
      * in that session will carry this thread around as an indicator.
      */
-    protected String thread;
+    private String thread;
 
     public Session() {
         connectState = ConnectState.DISCONNECTED;
@@ -37,20 +38,20 @@ public abstract class Session {
     /**
      * Sends a Message to our remoteNode.
      */
-    public void send(Message omsg) throws XMPPException {
+    public void send(Message omsg) throws SessionException {
 
         /** construct the XMPP message */
         XMPPMessage xmsg = new XMPPMessage();
         xmsg.setTo(remoteNode);
         xmsg.setThread(thread);
-        xmsg.setBody(new JSONWriter(omsg).write());
+        xmsg.setBody(new JSONWriter(Arrays.asList(new Message[] {omsg})).write());
         XMPPSession ses = XMPPSession.getGlobalSession();
 
         try {
             XMPPSession.getGlobalSession().send(xmsg);
         } catch(XMPPException e) {
-            /* XXX log.. what else? */
             connectState = ConnectState.DISCONNECTED;
+            throw new SessionException("Error sending message to " + remoteNode, e);
         }
     }
 
@@ -59,12 +60,12 @@ public abstract class Session {
      * all received messages to the stack for processing
      * @param millis The number of milliseconds to wait for a message to arrive
      */
-    public static void waitForMessage(long millis) {
+    public static void waitForMessage(long millis) throws SessionException {
         try {
             Stack.processXMPPMessage(
                 XMPPSession.getGlobalSession().recv(millis));
         } catch(XMPPException e) {
-            /* XXX log.. what else? */
+            throw new SessionException("Error waiting for message", e);
         }
     }
 
@@ -94,4 +95,37 @@ public abstract class Session {
     public String getRemoteNode() {
         return remoteNode;
     }
+
+
+    /**
+     * Get thread.
+     * @return thread as String.
+     */
+    public String getThread() {
+        return thread;
+    }
+    
+    /**
+     * Set thread.
+     * @param thread the value to set.
+     */
+    public void setThread(String thread) {
+        this.thread = thread;
+    }
+    
+    /**
+     * Get connectState.
+     * @return connectState as ConnectState.
+     */
+    public ConnectState getConnectState() {
+        return connectState;
+    }
+    
+    /**
+     * Set connectState.
+     * @param connectState the value to set.
+     */
+    public void setConnectState(ConnectState connectState) {
+        this.connectState = connectState;
+    }
 }
index 8ee753d..58a39a5 100644 (file)
@@ -10,6 +10,8 @@ public class Stack {
 
     public static void processXMPPMessage(XMPPMessage msg) {
 
+        if(msg == null) return;
+
         Session ses = Session.findCachedSession(msg.getThread());
 
         if(ses == null) {
@@ -24,6 +26,7 @@ public class Stack {
             msgList = new JSONReader(msg.getBody()).readArray();
         } catch(JSONException e) {
             /** XXX LOG error */
+            e.printStackTrace();
             return;
         }
 
@@ -33,13 +36,18 @@ public class Stack {
         long start = new Date().getTime();
 
         while(itr.hasNext()) {
+
             /** Construct a Message object from the generic OSRFObject returned from parsing */
             obj = (OSRFObject) itr.next();
-            processOSRFMessage(ses, 
+
+            processOSRFMessage(
+                ses, 
                 new Message(
-                    ((Integer) obj.get("threadTrace")).intValue(),
-                    (Message.Type) obj.get("type"),
-                    obj.get("payload")));
+                    obj.getInt("threadTrace"),
+                    obj.getString("type"),
+                    obj.get("payload")
+                )
+            );
         }
 
         /** LOG the duration */
@@ -47,14 +55,18 @@ public class Stack {
 
     public static void processOSRFMessage(Session ses, Message msg) {
         if( ses instanceof ClientSession ) 
-            processServerResponse((ClientSession) ses, msg);
+            processResponse((ClientSession) ses, msg);
         else
-            processClientRequest((ServerSession) ses, msg);
+            processRequest((ServerSession) ses, msg);
     }
 
-    public static void processServerResponse(ClientSession session, Message msg) {
+    public static void processResponse(ClientSession session, Message msg) {
+        if(msg.RESULT.equals(msg.getType())) {
+            session.pushResponse(msg);
+            return;
+        }
     }
 
-    public static void processClientRequest(ServerSession session, Message msg) {
+    public static void processRequest(ServerSession session, Message msg) {
     }
 }
index 68147aa..bcd304e 100644 (file)
@@ -138,7 +138,8 @@ public class XMPPSession {
     public synchronized void send(XMPPMessage msg) throws XMPPException {
         checkConnected();
         try {
-            outStream.write(msg.toXML().getBytes()); 
+            String xml = msg.toXML();
+            outStream.write(xml.getBytes()); 
         } catch (Exception e) {
             throw new XMPPException(e.toString());
         }
diff --git a/src/java/org/opensrf/test/TestClient.java b/src/java/org/opensrf/test/TestClient.java
new file mode 100644 (file)
index 0000000..3b9920f
--- /dev/null
@@ -0,0 +1,62 @@
+package org.opensrf.test;
+import org.opensrf.*;
+import org.opensrf.util.*;
+import org.opensrf.net.xmpp.*;
+import java.io.PrintStream;
+import java.util.Map;
+
+
+public class TestClient {
+    public static void main(String args[]) throws Exception {
+        
+        PrintStream out = System.out;
+
+        try {
+
+            /** setup the config parser */
+            String configFile = args[0];
+            Config config = new Config("/config/opensrf");
+            config.parse(configFile);
+            Config.setConfig(config);
+
+            /** Connect to jabber */
+            String username = Config.getString("/username");
+            String passwd = Config.getString("/passwd");
+            String host = (String) Config.getFirst("/domains/domain");
+            int port = Config.getInt("/port");
+            XMPPSession xses = new XMPPSession(host, port);
+            xses.connect(username, passwd, "test-java-client");
+            XMPPSession.setGlobalSession(xses);
+    
+            /** build the client session and send the request */
+            ClientSession session = new ClientSession("opensrf.settings");
+            Request request = session.request(
+                "opensrf.settings.host_config.get", 
+                new String[] {args[1]}
+            );
+
+            Result result = request.recv(10000);
+            if(result == null) {
+                out.println("no result");
+                return;
+            }
+
+            out.println("status = " + result.getStatus());
+            out.println("status code = " + result.getStatusCode());
+
+            out.println("setting config memcache server(s) = " +
+                new JSONWriter(
+                    Utils.findPath( (Map) result.getContent(), 
+                    "/cache/global/servers/server")
+                ).write());
+
+
+        } catch(ArrayIndexOutOfBoundsException e) {
+            out.println("usage: org.opensrf.test.TestClient <osrfConfigFile> <domain>");
+            return;
+        }
+    }
+}
+
+
+
index 75cb7eb..b19d408 100644 (file)
@@ -37,7 +37,7 @@ public class TestJSON {
         System.out.println(new JSONWriter(map).write() + "\n");
 
 
-        Message m = new Message(1, Message.Type.REQUEST);
+        Message m = new Message(1, Message.REQUEST);
         Method method = new Method("opensrf.settings.host_config.get");
         method.addParam("app07.dev.gapines.org");
         m.setPayload(method);
index 6f430c4..388afc7 100644 (file)
@@ -56,6 +56,10 @@ public class Config {
         }
     }
 
+    public static int getInt(String path) throws ConfigException {
+        return Integer.parseInt(getString(path));
+    }
+
     /**
      * Returns the configuration object found at the requested path.
      * @see org.opensrf.util.Utils.findPath for path description.
index 6ea0db3..65a340b 100644 (file)
@@ -46,4 +46,12 @@ public class OSRFObject extends HashMap<String, Object> implements OSRFSerializa
     public Object get(String field) {
         return super.get(field);
     }
+
+    public String getString(String field) {
+        return (String) get(field);
+    }
+
+    public int getInt(String field) {
+        return ((Integer) get(field)).intValue();
+    }
 }