Java HTTP gateway interface
authorBill Erickson <berick@esilibrary.com>
Mon, 27 Feb 2012 23:05:07 +0000 (18:05 -0500)
committerDan Scott <dscott@laurentian.ca>
Sun, 29 Apr 2012 18:22:26 +0000 (14:22 -0400)
Supports sync and async requests.  Async requests support onResponse,
onComplete, and onError handlers.

Supports a max-threads value to limit the number of activately
communicating threads over any connection.  When max-threads is reached,
requests are queued and delivered as soon as there is room.

Note that since this is talking to the OpenSRF gateway and not the
translater, responses are simply collected and passed one at a time to
onResponse.  They are not streamed.  The goal of supporting onResponse
is to provide the same client API for both the gateway and translator.

Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Dan Scott <dscott@laurentian.ca>
src/java/org/opensrf/net/http/GatewayRequest.java [new file with mode: 0644]
src/java/org/opensrf/net/http/HttpConnection.java [new file with mode: 0644]
src/java/org/opensrf/net/http/HttpRequest.java [new file with mode: 0644]
src/java/org/opensrf/net/http/HttpRequestHandler.java [new file with mode: 0644]

diff --git a/src/java/org/opensrf/net/http/GatewayRequest.java b/src/java/org/opensrf/net/http/GatewayRequest.java
new file mode 100644 (file)
index 0000000..00631bf
--- /dev/null
@@ -0,0 +1,129 @@
+package org.opensrf.net.http;
+
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+import java.io.IOException;
+import java.io.BufferedInputStream;
+import java.io.OutputStreamWriter;
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URI;
+import java.net.HttpURLConnection;
+import java.lang.StringBuffer;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class GatewayRequest extends HttpRequest {
+
+    private boolean readComplete;
+
+    public GatewayRequest(HttpConnection conn, String service, Method method) {
+        super(conn, service, method);
+        readComplete = false;
+    }
+
+    public GatewayRequest send() {
+        try {
+
+            String postData = compilePostData(service, method);
+
+            urlConn = (HttpURLConnection) httpConn.url.openConnection();
+            urlConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); 
+            urlConn.setDoInput(true);
+            urlConn.setDoOutput(true);
+
+            OutputStreamWriter wr = new OutputStreamWriter(urlConn.getOutputStream());
+            wr.write(postData);
+            wr.flush();
+            wr.close();
+
+        } catch (java.io.IOException ex) {
+            failed = true;
+            failure = ex;
+        }
+
+        return this;
+    }
+
+    public Object recv() {
+
+        if (readComplete) 
+            return nextResponse();
+
+        try {
+
+            InputStream netStream = new BufferedInputStream(urlConn.getInputStream());
+            StringBuffer readBuf = new StringBuffer();
+
+            int bytesRead = 0;
+            byte[] buffer = new byte[1024];
+
+            while ((bytesRead = netStream.read(buffer)) != -1) {
+                readBuf.append(new String(buffer, 0, bytesRead));
+            }
+            
+            netStream.close();
+            urlConn = null;
+
+            Map<String,?> result = null;
+
+            try {
+                result = (Map<String, ?>) new JSONReader(readBuf.toString()).readObject();
+            } catch (org.opensrf.util.JSONException ex) {
+                ex.printStackTrace();
+                return null;
+            }
+
+            String status = result.get("status").toString(); 
+            if (!"200".equals(status)) {
+                failed = true;
+                // failure = <some new exception>
+            }
+
+             // gateway always returns a wrapper array with the full results set
+             responseList = (List) result.get("payload"); 
+
+        } catch (java.io.IOException ex) { 
+            failed = true;
+            failure = ex;
+        }
+
+        readComplete = true;
+        return nextResponse();
+    }
+
+    private String compilePostData(String service, Method method) {
+        URI uri = null;
+        StringBuffer postData = new StringBuffer();
+
+        postData.append("service=");
+        postData.append(service);
+        postData.append("&method=");
+        postData.append(method.getName());
+
+        List params = method.getParams();
+        Iterator itr = params.iterator();
+
+        while (itr.hasNext()) {
+            postData.append("&param=");
+            postData.append(new JSONWriter(itr.next()).write());
+        }
+
+        try {
+            // not using URLEncoder because it replaces ' ' with '+'.
+            uri = new URI("http", "", null, postData.toString(), null);
+        } catch (java.net.URISyntaxException ex) {
+            ex.printStackTrace(); 
+        }
+
+        return uri.getRawQuery();
+    }
+}
+
+
diff --git a/src/java/org/opensrf/net/http/HttpConnection.java b/src/java/org/opensrf/net/http/HttpConnection.java
new file mode 100644 (file)
index 0000000..32fdebc
--- /dev/null
@@ -0,0 +1,97 @@
+package org.opensrf.net.http;
+
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+
+/**
+ * Manages connection parameters and thread limiting for opensrf json gateway connections.
+ */
+
+public class HttpConnection {
+
+    /** Compiled URL object */
+    protected URL url;
+    /** Number of threads currently communicating with the server */
+    protected int activeThreads;
+    /** Queue of pending async requests */
+    protected Queue<HttpRequest> pendingThreadQueue;
+    /** maximum number of actively communicating threads allowed */
+    protected int maxThreads = 10;
+
+    public HttpConnection(String fullUrl) throws java.net.MalformedURLException {
+        activeThreads = 0;
+        pendingThreadQueue = new ConcurrentLinkedQueue();
+        url = new URL(fullUrl);
+    }
+
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    /** 
+     * Set the maximum number of actively communicating threads allowed 
+     */
+    public void setMaxThreads(int max) {
+        maxThreads = max;
+    }
+
+    /**
+     * Launches or queues an asynchronous request.
+     *
+     * If the maximum active thread count has not been reached,
+     * start a new thread and use it to send and receive the request.
+     * The response is passed to the request's HttpRequestHandler
+     * onComplete().  After complete, if the number of active threads
+     * is still lower than the max, one request will be pulled (if 
+     * present) from the async queue and fired.
+     *
+     * If there are too many active threads, the main request is
+     * pushed onto the async queue for later processing
+     */
+    protected void manageAsyncRequest(final HttpRequest request) {
+
+        if (activeThreads >= maxThreads) {
+            pendingThreadQueue.offer(request);
+            return;
+        }
+
+        activeThreads++;
+
+         //Send the request receive the response, fire off the next 
+         //thread if necessary, then pass the result to the handler
+        Runnable r = new Runnable() {
+            public void run() {
+
+                Object response;
+                request.send();
+
+                while ((response = request.recv()) != null) {
+                    if (request.handler != null) 
+                        request.handler.onResponse(request, response);
+                }
+
+                if (request.handler != null)
+                    request.handler.onComplete(request);
+
+                activeThreads--;
+
+                if (activeThreads < maxThreads) {
+                    try {
+                        manageAsyncRequest(pendingThreadQueue.remove());
+                    } catch (java.util.NoSuchElementException ex) {
+                        // may have been gobbled by another thread
+                    }
+                }
+            }
+        };
+
+        new Thread(r).start();
+    }
+}
+
+
diff --git a/src/java/org/opensrf/net/http/HttpRequest.java b/src/java/org/opensrf/net/http/HttpRequest.java
new file mode 100644 (file)
index 0000000..d91a127
--- /dev/null
@@ -0,0 +1,66 @@
+package org.opensrf.net.http;
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.net.HttpURLConnection;
+
+public abstract class HttpRequest {
+
+    protected String service;
+    protected Method method;
+    protected HttpURLConnection urlConn;
+    protected HttpConnection httpConn;
+    protected HttpRequestHandler handler;
+    protected List<Object> responseList;
+    protected Exception failure;
+    protected boolean failed;
+    protected boolean complete;
+
+    public HttpRequest() {
+        failed = false;
+        complete = false;
+        handler = null;
+        urlConn = null;
+    }
+
+    public HttpRequest(HttpConnection conn, String service, Method method) {
+        this();
+        this.httpConn = conn;
+        this.service = service;
+        this.method = method;
+    }
+
+    public void sendAsync(final HttpRequestHandler handler) {
+        this.handler = handler;
+        httpConn.manageAsyncRequest(this);
+    }
+
+    protected void pushResponse(Object response) {
+        if (responseList == null)
+            responseList = new LinkedList<Object>();
+        responseList.add(response);
+    }
+
+    protected List responses() {
+        return responseList;
+    }
+    
+    protected Object nextResponse() {
+        if (complete || failed) return null;
+        if (responseList.size() > 0)
+            return responseList.remove(0);
+        return null;
+    }
+
+    public Exception getFailure() {
+        return failure;
+    }
+
+    public abstract HttpRequest send();
+
+    public abstract Object recv();
+}
+
+
diff --git a/src/java/org/opensrf/net/http/HttpRequestHandler.java b/src/java/org/opensrf/net/http/HttpRequestHandler.java
new file mode 100644 (file)
index 0000000..9c0f9e5
--- /dev/null
@@ -0,0 +1,25 @@
+package org.opensrf.net.http;
+
+import java.util.List;
+
+/*
+ * Handler for async gateway responses.
+ */
+public abstract class HttpRequestHandler {
+
+    /**
+     * Called when all responses have been received.
+     *
+     * If discardResponses() returns true, will be passed null.
+     */
+    public void onComplete(HttpRequest request) {
+    }
+
+    /**
+     * Called with each response received from the server.
+     * 
+     * @param payload the value returned from the server.
+     */
+    public void onResponse(HttpRequest request, Object response) {
+    }
+}