package org.opensrf;
import java.util.Date;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.ArrayList;
import java.util.Random;
import java.util.Arrays;
/** The remote service to communicate with */
private String service;
+ /** OpenSRF domain */
private String domain;
+ /** Router name */
private String router;
+
+ /**
+ * original remote node. The current remote node will change based
+ * on server responses. This is used to reset the remote node to
+ * its original state.
+ */
private String origRemoteNode;
+ /** The next request id */
private int nextId;
- private List<Request> requests;
+ /** The requests this session has sent */
+ private Map<Integer, Request> requests;
/**
* Creates a new client session. Initializes the
origRemoteNode = getRemoteNode();
- /* create a random thread */
+ /** create a random thread */
long time = new Date().getTime();
Random rand = new Random(time);
setThread(rand.nextInt()+""+rand.nextInt()+""+time);
- requests = new ArrayList<Request>();
nextId = 0;
+ requests = new HashMap<Integer, Request>();
cacheSession();
}
}
- public Request request(Request req) throws SessionException {
+ private Request request(Request req) throws SessionException {
if(getConnectState() != ConnectState.CONNECTED)
resetRemoteId();
- //requests.set(req.getId(), req);
- requests.add(req);
+ requests.put(new Integer(req.getId()), req);
req.send();
return req;
}
/**
- * Pushes a response onto the queue of the appropriate request.
- * @param msg The the received RESULT Message whose payload
- * contains a Result object.
+ * Pushes a response onto the result queue of the appropriate request.
+ * @param msg The received RESULT Message
*/
public void pushResponse(Message msg) {
- Request req;
-
- try {
- req = requests.get(msg.getId());
- } catch(IndexOutOfBoundsException e) {
- /** LOG that an unexpected response arrived */
+ Request req = requests.get(new Integer(msg.getId()));
+ if(req == null) {
+ /** LOG that we've received a result to a non-existant request */
return;
}
-
OSRFObject payload = (OSRFObject) msg.get("payload");
+ /** build a result and push it onto the request's result queue */
req.pushResponse(
new Result(
payload.getString("status"),
)
);
}
+
+ /**
+ * Removes a request for this session's request set
+ */
+ public void cleanupRequest(int reqId) {
+ requests.remove(new Integer(reqId));
+ }
}
public class Message implements OSRFSerializable {
+ /** Message types */
public static final String REQUEST = "REQUEST";
public static final String STATUS = "STATUS";
public static final String RESULT = "RESULT";
/** Message ID. This number is used to relate requests to responses */
private int id;
- /** String of message. */
+ /** type of message. */
private String type;
/** message payload */
private Object payload;
- /** Go ahead and register the Message object */
+ /** Create a registry for the osrfMessage object */
private static OSRFRegistry registry =
OSRFRegistry.registerObject(
"osrfMessage",
setId(id);
setString(type);
}
+
+ /**
+ * @param id This message's ID
+ * @param type The type of message
+ * @param payload The message payload
+ */
public Message(int id, String type, Object payload) {
this(id, type);
setPayload(payload);
return null;
}
+ /**
+ * @return The osrfMessage registry.
+ */
public OSRFRegistry getRegistry() {
return registry;
}
public class Method extends OSRFObject {
+ /** The method API name */
private String name;
+ /** The ordered list of method params */
private List<Object> params;
+ /** Create a registry for the osrfMethod object */
private static OSRFRegistry registry =
OSRFRegistry.registerObject(
"osrfMethod",
OSRFRegistry.WireProtocol.HASH,
new String[] {"method", "params"});
+ /**
+ * @param name The method API name
+ */
public Method(String name) {
this.name = name;
this.params = new ArrayList<Object>(8);
}
+ /**
+ * @param name The method API name
+ * @param params The ordered list of params
+ */
public Method(String name, List<Object> params) {
this.name = name;
this.params = params;
}
+ /**
+ * @return The method API name
+ */
public String getName() {
return name;
}
+ /**
+ * @return The ordered list of params
+ */
public List<Object> getParams() {
return params;
}
return null;
}
+ /**
+ * @return The osrfMethod registry.
+ */
public OSRFRegistry getRegistry() {
return registry;
}
public class Request {
+ /** This request's controlling session */
private ClientSession session;
+ /** The method */
private Method method;
+ /** The ID of this request */
private int id;
+ /** Queue of Results */
private Queue<Result> resultQueue;
+ /** If true, the receive timeout for this request should be reset */
private boolean resetTimeout;
+
+ /** If true, the server has indicated that this request is complete. */
private boolean complete;
+ /**
+ * @param ses The controlling session for this request.
+ * @param id This request's ID.
+ * @param method The requested method.
+ */
public Request(ClientSession ses, int id, Method method) {
this.session = ses;
this.id = id;
resetTimeout = false;
}
+ /**
+ * @param ses The controlling session for this request.
+ * @param id This request's ID.
+ * @param methodName The requested method's API name.
+ */
public Request(ClientSession ses, int id, String methodName) {
this(ses, id, new Method(methodName));
}
+ /**
+ * @param ses The controlling session for this request.
+ * @param id This request's ID.
+ * @param methodName The requested method's API name.
+ * @param params The list of request params
+ */
public Request(ClientSession ses, int id, String methodName, List<Object> params) {
this(ses, id, new Method(methodName, params));
}
+ /**
+ * Sends the request to the server.
+ */
public void send() throws SessionException {
session.send(new Message(id, Message.REQUEST, method));
}
+ /**
+ * Receives the next result for this request. This method
+ * will wait up to the specified number of milliseconds for
+ * a response.
+ * @param millis Number of milliseconds to wait for a result. If
+ * negative, this method will wait indefinitely.
+ * @return The result or null if none arrives in time
+ */
public Result recv(long millis) throws SessionException {
Result result = null;
if(millis < 0) {
+ /** wait potentially forever for a result to arrive */
session.waitForMessage(millis);
if((result = resultQueue.poll()) != null)
return result;
} else {
while(millis >= 0) {
+
+ /** wait up to millis milliseconds for a result. waitForMessage()
+ * will return if a response to any request arrives, so we keep track
+ * of how long we've been waiting in total for a response to
+ * this request
+ */
long start = new Date().getTime();
session.waitForMessage(millis);
millis -= new Date().getTime() - start;
return null;
}
+ /**
+ * Pushes a result onto the result queue
+ * @param result The result to push
+ */
public void pushResponse(Result result) {
resultQueue.offer(result);
}
public int getId() {
return id;
}
+
+ /**
+ * Removes this request from the controlling session's request set
+ */
+ public void cleanup() {
+ session.cleanupRequest(id);
+ }
}
new String[] {"status", "statusCode", "content"});
+ /**
+ * @param status The status message for this result
+ * @param statusCode The status code
+ * @param content The content of the result
+ */
public Result(String status, int statusCode, Object content) {
this.status = status;
this.statusCode = statusCode;
return null;
}
+ /**
+ * @return The osrfMethod registry.
+ */
public OSRFRegistry getRegistry() {
return registry;
}
/** the current connection state */
private ConnectState connectState;
- /** The (jabber) address of the remote party we are communicating with */
+ /** The address of the remote party we are communicating with */
private String remoteNode;
/**
return sessionCache.get(thread);
}
+ /**
+ * Puts this session into session cache.
+ */
protected void cacheSession() {
sessionCache.put(thread, this);
}
+ /**
+ * Sets the remote address
+ * @param nodeName The name of the remote node.
+ */
public void setRemoteNode(String nodeName) {
remoteNode = nodeName;
}
+ /**
+ * @return The remote node
+ */
public String getRemoteNode() {
return remoteNode;
}
if(msg == null) return;
+ /** fetch this session from the cache */
Session ses = Session.findCachedSession(msg.getThread());
if(ses == null) {
OSRFObject obj = null;
long start = new Date().getTime();
+ /** cycle through the messages and push them up the stack */
while(itr.hasNext()) {
- /** Construct a Message object from the generic OSRFObject returned from parsing */
+ /** Construct a Message object from the parsed generic OSRFObject */
obj = (OSRFObject) itr.next();
processOSRFMessage(
/** LOG the duration */
}
- public static void processOSRFMessage(Session ses, Message msg) {
+ private static void processOSRFMessage(Session ses, Message msg) {
if( ses instanceof ClientSession )
processResponse((ClientSession) ses, msg);
else
processRequest((ServerSession) ses, msg);
}
- public static void processResponse(ClientSession session, Message msg) {
+ /**
+ * Process a server response
+ */
+ private static void processResponse(ClientSession session, Message msg) {
if(msg.RESULT.equals(msg.getType())) {
session.pushResponse(msg);
return;
}
}
- public static void processRequest(ServerSession session, Message msg) {
+ /**
+ * Process a client request
+ */
+ private static void processRequest(ServerSession session, Message msg) {
}
}
import java.io.*;
-
-/*
- * uncomment to use the DOM serialization code...
-
-import org.w3c.dom.*;
-import org.apache.xerces.dom.DocumentImpl;
-import org.apache.xerces.dom.DOMImplementationImpl;
-import org.apache.xml.serialize.OutputFormat;
-import org.apache.xml.serialize.Serializer;
-import org.apache.xml.serialize.SerializerFactory;
-import org.apache.xml.serialize.XMLSerializer;
-*/
-
-
/**
* Models a single XMPP message.
*/
}
}
}
-
-
-
- /**
- * This is a DOM implementataion of message serialization.
- * I'm inclined to think the stringbuffer version is faster, but
- * I have no proof.
- */
- /*
- public String __toXML() {
-
- Document doc = new DocumentImpl();
- Element message = doc.createElement("message");
- Element body = doc.createElement("body");
- Element thread = doc.createElement("thread");
-
- doc.appendChild(message);
- message.setAttribute("to", getTo());
- message.setAttribute("from", getFrom());
- message.appendChild(body);
- message.appendChild(thread);
-
- body.appendChild(doc.createTextNode(getBody()));
- thread.appendChild(doc.createTextNode(getThread()));
-
- XMLSerializer serializer = new XMLSerializer();
- StringWriter strWriter = new StringWriter();
- OutputFormat outFormat = new OutputFormat();
-
- outFormat.setEncoding("UTF-8");
- outFormat.setVersion("1.0");
- outFormat.setIndenting(false);
- outFormat.setOmitXMLDeclaration(true);
-
- serializer.setOutputCharStream(strWriter);
- serializer.setOutputFormat(outFormat);
-
- try {
- serializer.serialize(doc);
- } catch(IOException ioe) {
- }
- return strWriter.toString();
- }
- */
}
this.port = port;
}
+ /**
+ * Returns the global, process-wide session
+ */
public static XMPPSession getGlobalSession() {
return globalSession;
}
+ /**
+ * Sets the global, process-wide section
+ */
public static void setGlobalSession(XMPPSession ses) {
globalSession = ses;
}
import org.opensrf.net.xmpp.*;
import java.io.PrintStream;
import java.util.Map;
+import java.util.Date;
public class TestClient {
private static Config config;
/** The object form of the parsed config */
private Map configObject;
+ /**
+ * The log parsing context. This is used as a prefix to the
+ * config item search path. This allows config XML chunks to
+ * be inserted into arbitrary XML files.
+ */
private String context;
+ /**
+ * @param context The config context
+ */
public Config(String context) {
this.context = context;
}
}
}
+ /**
+ * Gets the int value at the given path
+ * @param path The search path
+ */
public static int getInt(String path) throws ConfigException {
- return Integer.parseInt(getString(path));
+ try {
+ return Integer.parseInt(getString(path));
+ } catch(Exception e) {
+ throw new
+ ConfigException("No config int found at " + path);
+ }
}
/**
}
}
+ /**
+ * Returns the first item in the list found at the given path. If
+ * no list is found, ConfigException is thrown.
+ * @param path The search path
+ */
public static Object getFirst(String path) throws ConfigException {
Object obj = get(path);
if(obj instanceof List)
package org.opensrf.util;
+/**
+ * Used to indicate JSON parsing errors
+ */
public class JSONException extends Exception {
public JSONException(String s) {
super(s);
/** Special OpenSRF serializable object payload key */
public static final String JSON_PAYLOAD_KEY = "__p";
+ /** The JSON string to parser */
private String json;
+ /**
+ * @param The JSON to parse
+ */
public JSONReader(String json) {
this.json = json;
}
}
}
+ /**
+ * Assumes that a JSON array will be read. Returns
+ * the resulting array as a list.
+ */
public List<?> readArray() throws JSONException {
Object o = read();
try {
}
}
+ /**
+ * Assumes that a JSON object will be read. Returns
+ * the resulting object as a map.
+ */
public Map<?,?> readObject() throws JSONException {
Object o = read();
try {
}
+ /**
+ * Recurse through the object and turn items into maps, lists, etc.
+ */
private Object readSubObject(Object obj) throws JSONException {
if( obj == null ||
/** The object to serialize to JSON */
private Object obj;
+ /**
+ * @param obj The object to encode
+ */
public JSONWriter(Object obj) {
this.obj = obj;
}
write(obj, sb);
}
+ /**
+ * Encodes the object as JSON into the provided buffer
+ */
public void write(Object obj, StringBuffer sb) {
/** JSON null */
if( reg.getWireProtocol() == OSRFRegistry.WireProtocol.ARRAY ) {
+ /** encode arrays as lists */
List<Object> list = new ArrayList<Object>(fields.length);
for(String s : fields)
list.add(obj.get(s));
} else {
+ /** encode hashes as maps */
Map<String, Object> subMap = new HashMap<String, Object>();
for(String s : fields)
subMap.put(s, obj.get(s));
public OSRFObject() {
}
-
- /*
- public OSRFObject(String netClass, Map map) {
- super(map);
- registry = OSRFRegistry.getRegistry(netClass);
- }
- */
-
/**
* Creates a new object with the provided registry
*/
registry = reg;
}
-
/**
* @return This object's registry
*/
return super.get(field);
}
+ /** Returns the string value found at the given field */
public String getString(String field) {
return (String) get(field);
}
+ /** Returns the int value found at the given field */
public int getInt(String field) {
+ Object o = get(field);
+ if(o instanceof String)
+ return Integer.parseInt((String) o);
return ((Integer) get(field)).intValue();
}
}
return -1;
}
+ /** Returns the wire protocol of this object */
public WireProtocol getWireProtocol() {
return this.wireProtocol;
}
+ /** Returns the netClass ("hint") of this object */
public String getNetClass() {
return this.netClass;
}
/**
- * Creates a new registry object */
+ * Creates a new registry object.
+ * @param netClass The network class/hint
+ * @param wireProtocol The wire protocol
+ * @param fields The array of field names. For array-based objects,
+ * the fields array must be sorted in accordance with the sorting
+ * of the objects in the array.
+ */
public OSRFRegistry(String netClass, WireProtocol wireProtocol, String fields[]) {
this.netClass = netClass;
this.wireProtocol = wireProtocol;
*/
public abstract OSRFRegistry getRegistry();
+ /**
+ * Returns the object found at the given field
+ */
public abstract Object get(String field);
}