import java.util.List;
import java.util.ArrayList;
import java.util.Random;
+import java.util.Arrays;
import org.opensrf.util.*;
private List<Request> requests;
/**
- * @param service The remove service to communicate with
+ * Creates a new client session. Initializes the
+ * @param service The remove service.
*/
public ClientSession(String service) throws ConfigException {
this.service = service;
- domain = (String) Config.getFirst("/domain/domains");
- router = (String) Config.getString("/router_name");
+
+ /** generate the remote node string */
+ domain = (String) Config.getFirst("/domains/domain");
+ router = Config.getString("/router_name");
setRemoteNode(router + "@" + domain + "/" + service);
origRemoteNode = getRemoteNode();
- requests = new ArrayList<Request>();
- nextId = 0;
+
+
+ /* create a random thread */
long time = new Date().getTime();
Random rand = new Random(time);
- thread = rand.nextInt()+""+rand.nextInt()+""+time;
+ setThread(rand.nextInt()+""+rand.nextInt()+""+time);
+
+ requests = new ArrayList<Request>();
+ nextId = 0;
cacheSession();
}
+
+ /**
+ * Creates a new request to send to our remote service.
+ * @param method The method API name
+ * @param params The list of method parameters
+ * @return The request object.
+ */
+ public Request request(String method, List<Object> params) throws SessionException {
+ return request(new Request(this, nextId++, method, params));
+ }
+
+ /**
+ * Creates a new request to send to our remote service.
+ * @param method The method API name
+ * @param params The list of method parameters
+ * @return The request object.
+ */
+ public Request request(String method, Object[] params) throws SessionException {
+ return request(new Request(this, nextId++, method, Arrays.asList(params)));
+ }
+
+
+ /**
+ * Creates a new request to send to our remote service.
+ * @param method The method API name
+ * @return The request object.
+ */
+ public Request request(String method) throws SessionException {
+ return request(new Request(this, nextId++, method));
+ }
+
+
+ public Request request(Request req) throws SessionException {
+ if(getConnectState() != ConnectState.CONNECTED)
+ resetRemoteId();
+ //requests.set(req.getId(), req);
+ requests.add(req);
+ req.send();
+ return req;
+ }
+
+
+ /**
+ * Resets the remoteNode to its original state.
+ */
+ public void resetRemoteId() {
+ setRemoteNode(origRemoteNode);
+ }
+
+
+ /**
+ * Pushes a response onto the queue of the appropriate request.
+ * @param msg The the received RESULT Message whose payload
+ * contains a Result object.
+ */
+ public void pushResponse(Message msg) {
+
+ Request req;
+
+ try {
+ req = requests.get(msg.getId());
+ } catch(IndexOutOfBoundsException e) {
+ /** LOG that an unexpected response arrived */
+ return;
+ }
+
+ OSRFObject payload = (OSRFObject) msg.get("payload");
+
+ req.pushResponse(
+ new Result(
+ payload.getString("status"),
+ payload.getInt("statusCode"),
+ payload.get("content")
+ )
+ );
+ }
}
public class Message implements OSRFSerializable {
- /** Message types */
- public enum Type {
- REQUEST,
- STATUS,
- RESULT,
- CONNECT,
- DISCONNECT,
- };
+ public static final String REQUEST = "REQUEST";
+ public static final String STATUS = "STATUS";
+ public static final String RESULT = "RESULT";
+ public static final String CONNECT = "CONNECT";
+ public static final String DISCONNECT = "DISCONNECT";
/** Message ID. This number is used to relate requests to responses */
private int id;
- /** Type of message. */
- private Type type;
+ /** String of message. */
+ private String type;
/** message payload */
private Object payload;
* @param id This message's ID
* @param type The type of message
*/
- public Message(int id, Type type) {
+ public Message(int id, String type) {
setId(id);
- setType(type);
+ setString(type);
}
- public Message(int id, Type type, Object payload) {
+ public Message(int id, String type, Object payload) {
this(id, type);
setPayload(payload);
}
public int getId() {
return id;
}
- public Type getType() {
+ public String getType() {
return type;
}
public Object getPayload() {
public void setId(int id) {
this.id = id;
}
- public void setType(Type type) {
+ public void setString(String type) {
this.type = type;
}
public void setPayload(Object p) {
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.List;
+import java.util.Date;
import org.opensrf.net.xmpp.XMPPException;
public class Request {
resetTimeout = false;
}
+ public Request(ClientSession ses, int id, String methodName) {
+ this(ses, id, new Method(methodName));
+ }
+
public Request(ClientSession ses, int id, String methodName, List<Object> params) {
this(ses, id, new Method(methodName, params));
}
- public void send() throws XMPPException {
- session.send(new Message(id, Message.Type.REQUEST, method));
+ public void send() throws SessionException {
+ session.send(new Message(id, Message.REQUEST, method));
+ }
+
+ public Result recv(long millis) throws SessionException {
+
+ Result result = null;
+
+ if(millis < 0) {
+ session.waitForMessage(millis);
+ if((result = resultQueue.poll()) != null)
+ return result;
+
+ } else {
+
+ while(millis >= 0) {
+ long start = new Date().getTime();
+ session.waitForMessage(millis);
+ millis -= new Date().getTime() - start;
+ if((result = resultQueue.poll()) != null)
+ return result;
+ }
+ }
+
+ return null;
+ }
+
+ public void pushResponse(Result result) {
+ resultQueue.offer(result);
+ }
+
+ /**
+ * @return This request's ID
+ */
+ public int getId() {
+ return id;
}
}
import org.opensrf.net.xmpp.*;
import java.util.Map;
import java.util.HashMap;
+import java.util.Arrays;
public abstract class Session {
* In other words, each session has a unique thread, and all messages
* in that session will carry this thread around as an indicator.
*/
- protected String thread;
+ private String thread;
public Session() {
connectState = ConnectState.DISCONNECTED;
/**
* Sends a Message to our remoteNode.
*/
- public void send(Message omsg) throws XMPPException {
+ public void send(Message omsg) throws SessionException {
/** construct the XMPP message */
XMPPMessage xmsg = new XMPPMessage();
xmsg.setTo(remoteNode);
xmsg.setThread(thread);
- xmsg.setBody(new JSONWriter(omsg).write());
+ xmsg.setBody(new JSONWriter(Arrays.asList(new Message[] {omsg})).write());
XMPPSession ses = XMPPSession.getGlobalSession();
try {
XMPPSession.getGlobalSession().send(xmsg);
} catch(XMPPException e) {
- /* XXX log.. what else? */
connectState = ConnectState.DISCONNECTED;
+ throw new SessionException("Error sending message to " + remoteNode, e);
}
}
* all received messages to the stack for processing
* @param millis The number of milliseconds to wait for a message to arrive
*/
- public static void waitForMessage(long millis) {
+ public static void waitForMessage(long millis) throws SessionException {
try {
Stack.processXMPPMessage(
XMPPSession.getGlobalSession().recv(millis));
} catch(XMPPException e) {
- /* XXX log.. what else? */
+ throw new SessionException("Error waiting for message", e);
}
}
public String getRemoteNode() {
return remoteNode;
}
+
+
+ /**
+ * Get thread.
+ * @return thread as String.
+ */
+ public String getThread() {
+ return thread;
+ }
+
+ /**
+ * Set thread.
+ * @param thread the value to set.
+ */
+ public void setThread(String thread) {
+ this.thread = thread;
+ }
+
+ /**
+ * Get connectState.
+ * @return connectState as ConnectState.
+ */
+ public ConnectState getConnectState() {
+ return connectState;
+ }
+
+ /**
+ * Set connectState.
+ * @param connectState the value to set.
+ */
+ public void setConnectState(ConnectState connectState) {
+ this.connectState = connectState;
+ }
}
public static void processXMPPMessage(XMPPMessage msg) {
+ if(msg == null) return;
+
Session ses = Session.findCachedSession(msg.getThread());
if(ses == null) {
msgList = new JSONReader(msg.getBody()).readArray();
} catch(JSONException e) {
/** XXX LOG error */
+ e.printStackTrace();
return;
}
long start = new Date().getTime();
while(itr.hasNext()) {
+
/** Construct a Message object from the generic OSRFObject returned from parsing */
obj = (OSRFObject) itr.next();
- processOSRFMessage(ses,
+
+ processOSRFMessage(
+ ses,
new Message(
- ((Integer) obj.get("threadTrace")).intValue(),
- (Message.Type) obj.get("type"),
- obj.get("payload")));
+ obj.getInt("threadTrace"),
+ obj.getString("type"),
+ obj.get("payload")
+ )
+ );
}
/** LOG the duration */
public static void processOSRFMessage(Session ses, Message msg) {
if( ses instanceof ClientSession )
- processServerResponse((ClientSession) ses, msg);
+ processResponse((ClientSession) ses, msg);
else
- processClientRequest((ServerSession) ses, msg);
+ processRequest((ServerSession) ses, msg);
}
- public static void processServerResponse(ClientSession session, Message msg) {
+ public static void processResponse(ClientSession session, Message msg) {
+ if(msg.RESULT.equals(msg.getType())) {
+ session.pushResponse(msg);
+ return;
+ }
}
- public static void processClientRequest(ServerSession session, Message msg) {
+ public static void processRequest(ServerSession session, Message msg) {
}
}
public synchronized void send(XMPPMessage msg) throws XMPPException {
checkConnected();
try {
- outStream.write(msg.toXML().getBytes());
+ String xml = msg.toXML();
+ outStream.write(xml.getBytes());
} catch (Exception e) {
throw new XMPPException(e.toString());
}
--- /dev/null
+package org.opensrf.test;
+import org.opensrf.*;
+import org.opensrf.util.*;
+import org.opensrf.net.xmpp.*;
+import java.io.PrintStream;
+import java.util.Map;
+
+
+public class TestClient {
+ public static void main(String args[]) throws Exception {
+
+ PrintStream out = System.out;
+
+ try {
+
+ /** setup the config parser */
+ String configFile = args[0];
+ Config config = new Config("/config/opensrf");
+ config.parse(configFile);
+ Config.setConfig(config);
+
+ /** Connect to jabber */
+ String username = Config.getString("/username");
+ String passwd = Config.getString("/passwd");
+ String host = (String) Config.getFirst("/domains/domain");
+ int port = Config.getInt("/port");
+ XMPPSession xses = new XMPPSession(host, port);
+ xses.connect(username, passwd, "test-java-client");
+ XMPPSession.setGlobalSession(xses);
+
+ /** build the client session and send the request */
+ ClientSession session = new ClientSession("opensrf.settings");
+ Request request = session.request(
+ "opensrf.settings.host_config.get",
+ new String[] {args[1]}
+ );
+
+ Result result = request.recv(10000);
+ if(result == null) {
+ out.println("no result");
+ return;
+ }
+
+ out.println("status = " + result.getStatus());
+ out.println("status code = " + result.getStatusCode());
+
+ out.println("setting config memcache server(s) = " +
+ new JSONWriter(
+ Utils.findPath( (Map) result.getContent(),
+ "/cache/global/servers/server")
+ ).write());
+
+
+ } catch(ArrayIndexOutOfBoundsException e) {
+ out.println("usage: org.opensrf.test.TestClient <osrfConfigFile> <domain>");
+ return;
+ }
+ }
+}
+
+
+
System.out.println(new JSONWriter(map).write() + "\n");
- Message m = new Message(1, Message.Type.REQUEST);
+ Message m = new Message(1, Message.REQUEST);
Method method = new Method("opensrf.settings.host_config.get");
method.addParam("app07.dev.gapines.org");
m.setPayload(method);
}
}
+ public static int getInt(String path) throws ConfigException {
+ return Integer.parseInt(getString(path));
+ }
+
/**
* Returns the configuration object found at the requested path.
* @see org.opensrf.util.Utils.findPath for path description.
public Object get(String field) {
return super.get(field);
}
+
+ public String getString(String field) {
+ return (String) get(field);
+ }
+
+ public int getInt(String field) {
+ return ((Integer) get(field)).intValue();
+ }
}