more java gateway
authorBill Erickson <berick@esilibrary.com>
Wed, 29 Feb 2012 14:18:44 +0000 (09:18 -0500)
committerBill Erickson <berick@esilibrary.com>
Thu, 15 Mar 2012 20:23:51 +0000 (16:23 -0400)
Signed-off-by: Bill Erickson <berick@esilibrary.com>
src/java/org/opensrf/net/http/GatewayConnection.java [deleted file]
src/java/org/opensrf/net/http/GatewayRequest.java
src/java/org/opensrf/net/http/GatewayRequestHandler.java [deleted file]
src/java/org/opensrf/net/http/HttpConnection.java [new file with mode: 0644]
src/java/org/opensrf/net/http/HttpRequest.java [new file with mode: 0644]
src/java/org/opensrf/net/http/HttpRequestHandler.java [new file with mode: 0644]

diff --git a/src/java/org/opensrf/net/http/GatewayConnection.java b/src/java/org/opensrf/net/http/GatewayConnection.java
deleted file mode 100644 (file)
index ac62169..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-package org.opensrf.net.http;
-
-import java.net.URL;
-import java.net.MalformedURLException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.opensrf.*;
-import org.opensrf.util.*;
-
-
-/**
- * Manages connection parameters and thread limiting for opensrf json gateway connections.
- */
-
-public class GatewayConnection {
-
-    /** http or https */
-    private String proto;
-    /** server name */
-    private String host;
-    /** URL path to the gateway, defaults to defaultPath */
-    private String path;
-    /** Compiled URL object */
-    private URL url;
-    /** Number of threads currently communicating with the server */
-    protected int activeThreads;
-    /** Queue of pending async requests */
-    protected Queue<GatewayRequest> pendingThreadQueue;
-    /** maximum number of actively communicating threads allowed */
-    protected int maxThreads = 10;
-    /** URL path for the gateway */
-    public static final String defaultPath = "/osrf-gateway-v1";
-
-    public GatewayConnection(String proto, String host) {
-        this.proto = proto;
-        this.host = host;
-        this.path = defaultPath;
-        activeThreads = 0;
-        pendingThreadQueue = new ConcurrentLinkedQueue();
-    }
-
-    public GatewayConnection(String proto, String host, String path) {
-        this(proto, host);
-        this.path = path; 
-    }
-
-    public int getMaxThreads() {
-        return maxThreads;
-    }
-
-    /** 
-     * Set the maximum number of actively communicating threads allowed 
-     */
-    public void setMaxThreads(int max) {
-        maxThreads = max;
-    }
-
-    protected URL getUrl() throws java.net.MalformedURLException {
-        try {
-            if (url == null) {
-                url = new URL(proto + "://" + host + path);
-            }
-        } catch (java.net.MalformedURLException ex) {
-            ex.printStackTrace(); 
-        }
-        return url;
-    }
-
-    public GatewayRequest request(String service, Method method) {
-        return new GatewayRequest(this, service, method);
-    }
-
-    /**
-     * Launches or queues an asynchronous request.
-     *
-     * If the maximum active thread count has not been reached,
-     * start a new thread and use it to send and receive the request.
-     * The response is passed to the request's GatewayRequestHandler
-     * onComplete().  After complete, if the number of active threads
-     * is still lower than the max, one request will be pulled (if 
-     * present) from the async queue and fired.
-     *
-     * If there are too many active threads, the main request is
-     * pushed onto the async queue for later processing
-     */
-    protected void manageAsyncRequest(final GatewayRequest request) {
-
-        if (activeThreads >= maxThreads) {
-            pendingThreadQueue.offer(request);
-            return;
-        }
-
-        activeThreads++;
-
-         //Send the request receive the response, fire off the next 
-         //thread if necessary, then pass the result to the handler
-        Runnable r = new Runnable() {
-            public void run() {
-
-                Object response = request.send().recv();
-                activeThreads--;
-
-                if (activeThreads < maxThreads) {
-                    try {
-                        manageAsyncRequest(pendingThreadQueue.remove());
-                    } catch (java.util.NoSuchElementException ex) {}
-                }
-
-                if (request.handler != null)
-                    request.handler.onComplete(response);
-            }
-        };
-
-        new Thread(r).start();
-    }
-}
-
-
index 5226fa5..bf9e071 100644 (file)
@@ -13,42 +13,27 @@ import java.net.URI;
 import java.net.HttpURLConnection;
 import java.lang.StringBuffer;
 import java.util.List;
+import java.util.LinkedList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class GatewayRequest {
-
-    private String service;
-    private Method method;
-    private HttpURLConnection urlConn;
-    private GatewayConnection gConn;
-    protected GatewayRequestHandler handler;
-
-    /**
-     * Invoked from GatewayConnection
-     */
-    protected GatewayRequest(GatewayConnection conn, String service, Method method) {
-        this.handler = null;
-        this.urlConn = null;
-        this.gConn = conn;
-        this.service = service;
-        this.method = method;
-    }
+public class GatewayRequest extends HttpRequest {
+
+    private List responseList;
 
-    public void sendAsync(final GatewayRequestHandler handler) {
-        this.handler = handler;
-        gConn.manageAsyncRequest(this);
+    public GatewayRequest(HttpConnection conn, String service, Method method) {
+        super(conn, service, method);
+        responseList = new LinkedList(); // TODO
     }
 
     public GatewayRequest send() {
         try {
-            URL url = gConn.getUrl();
             String postData = compilePostData(service, method);
 
-            urlConn = (HttpURLConnection) url.openConnection();
+            urlConn = (HttpURLConnection) httpConn.url.openConnection();
             urlConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); 
             urlConn.setDoInput(true);
             urlConn.setDoOutput(true);
@@ -67,6 +52,8 @@ public class GatewayRequest {
 
     public Object recv() {
 
+        if (complete) return null;
+
         Object payload = null;
         StringBuffer readBuf = new StringBuffer();
 
@@ -79,6 +66,9 @@ public class GatewayRequest {
             while ((bytesRead = netStream.read(buffer)) != -1) {
                 readBuf.append(new String(buffer, 0, bytesRead));
             }
+            
+            netStream.close();
+            urlConn = null;
 
             Map<String,?> result = null;
 
@@ -96,10 +86,12 @@ public class GatewayRequest {
 
             payload = result.get("payload"); 
 
+
         } catch (Exception ex) { // TODO inspect more closely
             ex.printStackTrace(); 
         }
 
+        complete = true;
         return payload;
     }
 
diff --git a/src/java/org/opensrf/net/http/GatewayRequestHandler.java b/src/java/org/opensrf/net/http/GatewayRequestHandler.java
deleted file mode 100644 (file)
index 48ceeda..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.opensrf.net.http;
-
-/*
- * Handler for async gateway request responses
- */
-
-public interface GatewayRequestHandler {
-    public void onComplete(Object payload);
-}
diff --git a/src/java/org/opensrf/net/http/HttpConnection.java b/src/java/org/opensrf/net/http/HttpConnection.java
new file mode 100644 (file)
index 0000000..32fdebc
--- /dev/null
@@ -0,0 +1,97 @@
+package org.opensrf.net.http;
+
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+
+/**
+ * Manages connection parameters and thread limiting for opensrf json gateway connections.
+ */
+
+public class HttpConnection {
+
+    /** Compiled URL object */
+    protected URL url;
+    /** Number of threads currently communicating with the server */
+    protected int activeThreads;
+    /** Queue of pending async requests */
+    protected Queue<HttpRequest> pendingThreadQueue;
+    /** maximum number of actively communicating threads allowed */
+    protected int maxThreads = 10;
+
+    public HttpConnection(String fullUrl) throws java.net.MalformedURLException {
+        activeThreads = 0;
+        pendingThreadQueue = new ConcurrentLinkedQueue();
+        url = new URL(fullUrl);
+    }
+
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    /** 
+     * Set the maximum number of actively communicating threads allowed 
+     */
+    public void setMaxThreads(int max) {
+        maxThreads = max;
+    }
+
+    /**
+     * Launches or queues an asynchronous request.
+     *
+     * If the maximum active thread count has not been reached,
+     * start a new thread and use it to send and receive the request.
+     * The response is passed to the request's HttpRequestHandler
+     * onComplete().  After complete, if the number of active threads
+     * is still lower than the max, one request will be pulled (if 
+     * present) from the async queue and fired.
+     *
+     * If there are too many active threads, the main request is
+     * pushed onto the async queue for later processing
+     */
+    protected void manageAsyncRequest(final HttpRequest request) {
+
+        if (activeThreads >= maxThreads) {
+            pendingThreadQueue.offer(request);
+            return;
+        }
+
+        activeThreads++;
+
+         //Send the request receive the response, fire off the next 
+         //thread if necessary, then pass the result to the handler
+        Runnable r = new Runnable() {
+            public void run() {
+
+                Object response;
+                request.send();
+
+                while ((response = request.recv()) != null) {
+                    if (request.handler != null) 
+                        request.handler.onResponse(request, response);
+                }
+
+                if (request.handler != null)
+                    request.handler.onComplete(request);
+
+                activeThreads--;
+
+                if (activeThreads < maxThreads) {
+                    try {
+                        manageAsyncRequest(pendingThreadQueue.remove());
+                    } catch (java.util.NoSuchElementException ex) {
+                        // may have been gobbled by another thread
+                    }
+                }
+            }
+        };
+
+        new Thread(r).start();
+    }
+}
+
+
diff --git a/src/java/org/opensrf/net/http/HttpRequest.java b/src/java/org/opensrf/net/http/HttpRequest.java
new file mode 100644 (file)
index 0000000..3548af3
--- /dev/null
@@ -0,0 +1,50 @@
+package org.opensrf.net.http;
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.net.HttpURLConnection;
+
+public abstract class HttpRequest {
+
+    protected String service;
+    protected Method method;
+    protected HttpURLConnection urlConn;
+    protected HttpConnection httpConn;
+    protected HttpRequestHandler handler;
+    private List<Object> responseList;
+    protected boolean complete = false;
+
+    public HttpRequest() {
+    }
+
+    public HttpRequest(HttpConnection conn, String service, Method method) {
+        this.handler = null;
+        this.urlConn = null;
+        this.httpConn = conn;
+        this.service = service;
+        this.method = method;
+    }
+
+    public void sendAsync(final HttpRequestHandler handler) {
+        this.handler = handler;
+        httpConn.manageAsyncRequest(this);
+    }
+
+    protected void pushResponse(Object response) {
+        if (responseList == null)
+            responseList = new LinkedList<Object>();
+        responseList.add(response);
+    }
+
+    protected List responses() {
+        return responseList;
+    }
+
+    public abstract HttpRequest send();
+
+    public abstract Object recv();
+}
+
+
diff --git a/src/java/org/opensrf/net/http/HttpRequestHandler.java b/src/java/org/opensrf/net/http/HttpRequestHandler.java
new file mode 100644 (file)
index 0000000..8d8484d
--- /dev/null
@@ -0,0 +1,26 @@
+package org.opensrf.net.http;
+
+import java.util.List;
+
+/*
+ * Handler for async gateway responses.
+ */
+public abstract class HttpRequestHandler {
+
+    /**
+     * Called when all responses have been received.
+     *
+     * If discardResponses() returns true, will be passed null.
+     */
+    public void onComplete(HttpRequest request) {
+    }
+
+    /**
+     * Called with each response received from the server.
+     * 
+     * @param payload the value returned from the server.
+     */
+    public void onResponse(HttpRequest request, Object response) {
+        request.pushResponse(response);
+    }
+}