From: erickson Date: Fri, 11 May 2007 15:45:27 +0000 (+0000) Subject: implemented enough of the stack/session logic to make stateless requests and receive... X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=afc3f92e923288fec47b18d703d839c886502092;p=opensrf%2Fbjwebb.git implemented enough of the stack/session logic to make stateless requests and receive responses git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@884 9efc2488-bf62-4759-914b-345cdb29e865 --- diff --git a/src/java/org/opensrf/ClientSession.java b/src/java/org/opensrf/ClientSession.java index 9f3483e..db44606 100644 --- a/src/java/org/opensrf/ClientSession.java +++ b/src/java/org/opensrf/ClientSession.java @@ -3,6 +3,7 @@ import java.util.Date; import java.util.List; import java.util.ArrayList; import java.util.Random; +import java.util.Arrays; import org.opensrf.util.*; @@ -21,20 +22,103 @@ public class ClientSession extends Session { private List requests; /** - * @param service The remove service to communicate with + * Creates a new client session. Initializes the + * @param service The remove service. */ public ClientSession(String service) throws ConfigException { this.service = service; - domain = (String) Config.getFirst("/domain/domains"); - router = (String) Config.getString("/router_name"); + + /** generate the remote node string */ + domain = (String) Config.getFirst("/domains/domain"); + router = Config.getString("/router_name"); setRemoteNode(router + "@" + domain + "/" + service); origRemoteNode = getRemoteNode(); - requests = new ArrayList(); - nextId = 0; + + + /* create a random thread */ long time = new Date().getTime(); Random rand = new Random(time); - thread = rand.nextInt()+""+rand.nextInt()+""+time; + setThread(rand.nextInt()+""+rand.nextInt()+""+time); + + requests = new ArrayList(); + nextId = 0; cacheSession(); } + + /** + * Creates a new request to send to our remote service. + * @param method The method API name + * @param params The list of method parameters + * @return The request object. + */ + public Request request(String method, List params) throws SessionException { + return request(new Request(this, nextId++, method, params)); + } + + /** + * Creates a new request to send to our remote service. + * @param method The method API name + * @param params The list of method parameters + * @return The request object. + */ + public Request request(String method, Object[] params) throws SessionException { + return request(new Request(this, nextId++, method, Arrays.asList(params))); + } + + + /** + * Creates a new request to send to our remote service. + * @param method The method API name + * @return The request object. + */ + public Request request(String method) throws SessionException { + return request(new Request(this, nextId++, method)); + } + + + public Request request(Request req) throws SessionException { + if(getConnectState() != ConnectState.CONNECTED) + resetRemoteId(); + //requests.set(req.getId(), req); + requests.add(req); + req.send(); + return req; + } + + + /** + * Resets the remoteNode to its original state. + */ + public void resetRemoteId() { + setRemoteNode(origRemoteNode); + } + + + /** + * Pushes a response onto the queue of the appropriate request. + * @param msg The the received RESULT Message whose payload + * contains a Result object. + */ + public void pushResponse(Message msg) { + + Request req; + + try { + req = requests.get(msg.getId()); + } catch(IndexOutOfBoundsException e) { + /** LOG that an unexpected response arrived */ + return; + } + + OSRFObject payload = (OSRFObject) msg.get("payload"); + + req.pushResponse( + new Result( + payload.getString("status"), + payload.getInt("statusCode"), + payload.get("content") + ) + ); + } } diff --git a/src/java/org/opensrf/Message.java b/src/java/org/opensrf/Message.java index f1db9c7..35e3e59 100644 --- a/src/java/org/opensrf/Message.java +++ b/src/java/org/opensrf/Message.java @@ -4,19 +4,16 @@ import org.opensrf.util.*; public class Message implements OSRFSerializable { - /** Message types */ - public enum Type { - REQUEST, - STATUS, - RESULT, - CONNECT, - DISCONNECT, - }; + public static final String REQUEST = "REQUEST"; + public static final String STATUS = "STATUS"; + public static final String RESULT = "RESULT"; + public static final String CONNECT = "CONNECT"; + public static final String DISCONNECT = "DISCONNECT"; /** Message ID. This number is used to relate requests to responses */ private int id; - /** Type of message. */ - private Type type; + /** String of message. */ + private String type; /** message payload */ private Object payload; @@ -31,11 +28,11 @@ public class Message implements OSRFSerializable { * @param id This message's ID * @param type The type of message */ - public Message(int id, Type type) { + public Message(int id, String type) { setId(id); - setType(type); + setString(type); } - public Message(int id, Type type, Object payload) { + public Message(int id, String type, Object payload) { this(id, type); setPayload(payload); } @@ -44,7 +41,7 @@ public class Message implements OSRFSerializable { public int getId() { return id; } - public Type getType() { + public String getType() { return type; } public Object getPayload() { @@ -53,7 +50,7 @@ public class Message implements OSRFSerializable { public void setId(int id) { this.id = id; } - public void setType(Type type) { + public void setString(String type) { this.type = type; } public void setPayload(Object p) { diff --git a/src/java/org/opensrf/Request.java b/src/java/org/opensrf/Request.java index b646950..4c61ec5 100644 --- a/src/java/org/opensrf/Request.java +++ b/src/java/org/opensrf/Request.java @@ -2,6 +2,7 @@ package org.opensrf; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.List; +import java.util.Date; import org.opensrf.net.xmpp.XMPPException; public class Request { @@ -22,11 +23,49 @@ public class Request { resetTimeout = false; } + public Request(ClientSession ses, int id, String methodName) { + this(ses, id, new Method(methodName)); + } + public Request(ClientSession ses, int id, String methodName, List params) { this(ses, id, new Method(methodName, params)); } - public void send() throws XMPPException { - session.send(new Message(id, Message.Type.REQUEST, method)); + public void send() throws SessionException { + session.send(new Message(id, Message.REQUEST, method)); + } + + public Result recv(long millis) throws SessionException { + + Result result = null; + + if(millis < 0) { + session.waitForMessage(millis); + if((result = resultQueue.poll()) != null) + return result; + + } else { + + while(millis >= 0) { + long start = new Date().getTime(); + session.waitForMessage(millis); + millis -= new Date().getTime() - start; + if((result = resultQueue.poll()) != null) + return result; + } + } + + return null; + } + + public void pushResponse(Result result) { + resultQueue.offer(result); + } + + /** + * @return This request's ID + */ + public int getId() { + return id; } } diff --git a/src/java/org/opensrf/Session.java b/src/java/org/opensrf/Session.java index 2eb9aa9..01f1d21 100644 --- a/src/java/org/opensrf/Session.java +++ b/src/java/org/opensrf/Session.java @@ -3,6 +3,7 @@ import org.opensrf.util.JSONWriter; import org.opensrf.net.xmpp.*; import java.util.Map; import java.util.HashMap; +import java.util.Arrays; public abstract class Session { @@ -28,7 +29,7 @@ public abstract class Session { * In other words, each session has a unique thread, and all messages * in that session will carry this thread around as an indicator. */ - protected String thread; + private String thread; public Session() { connectState = ConnectState.DISCONNECTED; @@ -37,20 +38,20 @@ public abstract class Session { /** * Sends a Message to our remoteNode. */ - public void send(Message omsg) throws XMPPException { + public void send(Message omsg) throws SessionException { /** construct the XMPP message */ XMPPMessage xmsg = new XMPPMessage(); xmsg.setTo(remoteNode); xmsg.setThread(thread); - xmsg.setBody(new JSONWriter(omsg).write()); + xmsg.setBody(new JSONWriter(Arrays.asList(new Message[] {omsg})).write()); XMPPSession ses = XMPPSession.getGlobalSession(); try { XMPPSession.getGlobalSession().send(xmsg); } catch(XMPPException e) { - /* XXX log.. what else? */ connectState = ConnectState.DISCONNECTED; + throw new SessionException("Error sending message to " + remoteNode, e); } } @@ -59,12 +60,12 @@ public abstract class Session { * all received messages to the stack for processing * @param millis The number of milliseconds to wait for a message to arrive */ - public static void waitForMessage(long millis) { + public static void waitForMessage(long millis) throws SessionException { try { Stack.processXMPPMessage( XMPPSession.getGlobalSession().recv(millis)); } catch(XMPPException e) { - /* XXX log.. what else? */ + throw new SessionException("Error waiting for message", e); } } @@ -94,4 +95,37 @@ public abstract class Session { public String getRemoteNode() { return remoteNode; } + + + /** + * Get thread. + * @return thread as String. + */ + public String getThread() { + return thread; + } + + /** + * Set thread. + * @param thread the value to set. + */ + public void setThread(String thread) { + this.thread = thread; + } + + /** + * Get connectState. + * @return connectState as ConnectState. + */ + public ConnectState getConnectState() { + return connectState; + } + + /** + * Set connectState. + * @param connectState the value to set. + */ + public void setConnectState(ConnectState connectState) { + this.connectState = connectState; + } } diff --git a/src/java/org/opensrf/Stack.java b/src/java/org/opensrf/Stack.java index 8ee753d..58a39a5 100644 --- a/src/java/org/opensrf/Stack.java +++ b/src/java/org/opensrf/Stack.java @@ -10,6 +10,8 @@ public class Stack { public static void processXMPPMessage(XMPPMessage msg) { + if(msg == null) return; + Session ses = Session.findCachedSession(msg.getThread()); if(ses == null) { @@ -24,6 +26,7 @@ public class Stack { msgList = new JSONReader(msg.getBody()).readArray(); } catch(JSONException e) { /** XXX LOG error */ + e.printStackTrace(); return; } @@ -33,13 +36,18 @@ public class Stack { long start = new Date().getTime(); while(itr.hasNext()) { + /** Construct a Message object from the generic OSRFObject returned from parsing */ obj = (OSRFObject) itr.next(); - processOSRFMessage(ses, + + processOSRFMessage( + ses, new Message( - ((Integer) obj.get("threadTrace")).intValue(), - (Message.Type) obj.get("type"), - obj.get("payload"))); + obj.getInt("threadTrace"), + obj.getString("type"), + obj.get("payload") + ) + ); } /** LOG the duration */ @@ -47,14 +55,18 @@ public class Stack { public static void processOSRFMessage(Session ses, Message msg) { if( ses instanceof ClientSession ) - processServerResponse((ClientSession) ses, msg); + processResponse((ClientSession) ses, msg); else - processClientRequest((ServerSession) ses, msg); + processRequest((ServerSession) ses, msg); } - public static void processServerResponse(ClientSession session, Message msg) { + public static void processResponse(ClientSession session, Message msg) { + if(msg.RESULT.equals(msg.getType())) { + session.pushResponse(msg); + return; + } } - public static void processClientRequest(ServerSession session, Message msg) { + public static void processRequest(ServerSession session, Message msg) { } } diff --git a/src/java/org/opensrf/net/xmpp/XMPPSession.java b/src/java/org/opensrf/net/xmpp/XMPPSession.java index 68147aa..bcd304e 100644 --- a/src/java/org/opensrf/net/xmpp/XMPPSession.java +++ b/src/java/org/opensrf/net/xmpp/XMPPSession.java @@ -138,7 +138,8 @@ public class XMPPSession { public synchronized void send(XMPPMessage msg) throws XMPPException { checkConnected(); try { - outStream.write(msg.toXML().getBytes()); + String xml = msg.toXML(); + outStream.write(xml.getBytes()); } catch (Exception e) { throw new XMPPException(e.toString()); } diff --git a/src/java/org/opensrf/test/TestClient.java b/src/java/org/opensrf/test/TestClient.java new file mode 100644 index 0000000..3b9920f --- /dev/null +++ b/src/java/org/opensrf/test/TestClient.java @@ -0,0 +1,62 @@ +package org.opensrf.test; +import org.opensrf.*; +import org.opensrf.util.*; +import org.opensrf.net.xmpp.*; +import java.io.PrintStream; +import java.util.Map; + + +public class TestClient { + public static void main(String args[]) throws Exception { + + PrintStream out = System.out; + + try { + + /** setup the config parser */ + String configFile = args[0]; + Config config = new Config("/config/opensrf"); + config.parse(configFile); + Config.setConfig(config); + + /** Connect to jabber */ + String username = Config.getString("/username"); + String passwd = Config.getString("/passwd"); + String host = (String) Config.getFirst("/domains/domain"); + int port = Config.getInt("/port"); + XMPPSession xses = new XMPPSession(host, port); + xses.connect(username, passwd, "test-java-client"); + XMPPSession.setGlobalSession(xses); + + /** build the client session and send the request */ + ClientSession session = new ClientSession("opensrf.settings"); + Request request = session.request( + "opensrf.settings.host_config.get", + new String[] {args[1]} + ); + + Result result = request.recv(10000); + if(result == null) { + out.println("no result"); + return; + } + + out.println("status = " + result.getStatus()); + out.println("status code = " + result.getStatusCode()); + + out.println("setting config memcache server(s) = " + + new JSONWriter( + Utils.findPath( (Map) result.getContent(), + "/cache/global/servers/server") + ).write()); + + + } catch(ArrayIndexOutOfBoundsException e) { + out.println("usage: org.opensrf.test.TestClient "); + return; + } + } +} + + + diff --git a/src/java/org/opensrf/test/TestJSON.java b/src/java/org/opensrf/test/TestJSON.java index 75cb7eb..b19d408 100644 --- a/src/java/org/opensrf/test/TestJSON.java +++ b/src/java/org/opensrf/test/TestJSON.java @@ -37,7 +37,7 @@ public class TestJSON { System.out.println(new JSONWriter(map).write() + "\n"); - Message m = new Message(1, Message.Type.REQUEST); + Message m = new Message(1, Message.REQUEST); Method method = new Method("opensrf.settings.host_config.get"); method.addParam("app07.dev.gapines.org"); m.setPayload(method); diff --git a/src/java/org/opensrf/util/Config.java b/src/java/org/opensrf/util/Config.java index 6f430c4..388afc7 100644 --- a/src/java/org/opensrf/util/Config.java +++ b/src/java/org/opensrf/util/Config.java @@ -56,6 +56,10 @@ public class Config { } } + public static int getInt(String path) throws ConfigException { + return Integer.parseInt(getString(path)); + } + /** * Returns the configuration object found at the requested path. * @see org.opensrf.util.Utils.findPath for path description. diff --git a/src/java/org/opensrf/util/OSRFObject.java b/src/java/org/opensrf/util/OSRFObject.java index 6ea0db3..65a340b 100644 --- a/src/java/org/opensrf/util/OSRFObject.java +++ b/src/java/org/opensrf/util/OSRFObject.java @@ -46,4 +46,12 @@ public class OSRFObject extends HashMap implements OSRFSerializa public Object get(String field) { return super.get(field); } + + public String getString(String field) { + return (String) get(field); + } + + public int getInt(String field) { + return ((Integer) get(field)).intValue(); + } }