Java HTTP gateway
authorBill Erickson <berick@esilibrary.com>
Mon, 27 Feb 2012 23:05:07 +0000 (18:05 -0500)
committerBill Erickson <berick@esilibrary.com>
Thu, 15 Mar 2012 20:23:51 +0000 (16:23 -0400)
Signed-off-by: Bill Erickson <berick@esilibrary.com>
src/java/org/opensrf/net/http/GatewayConnection.java [new file with mode: 0644]
src/java/org/opensrf/net/http/GatewayRequest.java [new file with mode: 0644]
src/java/org/opensrf/net/http/GatewayRequestHandler.java [new file with mode: 0644]

diff --git a/src/java/org/opensrf/net/http/GatewayConnection.java b/src/java/org/opensrf/net/http/GatewayConnection.java
new file mode 100644 (file)
index 0000000..beab024
--- /dev/null
@@ -0,0 +1,92 @@
+package org.opensrf.net.http;
+
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class GatewayConnection {
+
+    private String proto; /* http, https */
+    private String host;
+    private String path;
+    private URL url;
+    protected int activeThreads;
+    protected Queue<GatewayRequest> pendingThreadQueue;
+    protected int maxThreads = 10;
+    public static final String defaultPath = "/osrf-gateway-v1";
+
+    public GatewayConnection(String proto, String host) {
+        this.proto = proto;
+        this.host = host;
+        this.path = defaultPath;
+        activeThreads = 0;
+        pendingThreadQueue = new ConcurrentLinkedQueue();
+    }
+
+    public GatewayConnection(String proto, String host, String path) {
+        this(proto, host);
+        this.path = path; 
+    }
+
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    public void setMaxThreads(int max) {
+        maxThreads = max;
+    }
+
+    public URL getUrl() throws java.net.MalformedURLException {
+        try {
+            if (url == null) {
+                url = new URL(proto + "://" + host + path);
+            }
+        } catch (java.net.MalformedURLException ex) {
+            ex.printStackTrace(); 
+        }
+        return url;
+    }
+
+    public GatewayRequest request(String service, Method method) {
+        return new GatewayRequest(this, service, method);
+    }
+
+    protected void manageAsyncRequest(final GatewayRequest request) {
+
+        if (activeThreads >= maxThreads) {
+            pendingThreadQueue.offer(request);
+            return;
+        }
+
+        activeThreads++;
+
+        /*
+         * Send the request receive the response, fire off the next 
+         * thread if necessary, then pass the result to the handler
+         */
+        Runnable r = new Runnable() {
+            public void run() {
+
+                Object response = request.send().recv();
+                activeThreads--;
+
+                if (activeThreads < maxThreads) {
+                    try {
+                        manageAsyncRequest(pendingThreadQueue.remove());
+                    } catch (java.util.NoSuchElementException ex) {}
+                }
+
+                if (request.handler != null)
+                    request.handler.onComplete(response);
+            }
+        };
+
+        new Thread(r).start();
+    }
+}
+
+
diff --git a/src/java/org/opensrf/net/http/GatewayRequest.java b/src/java/org/opensrf/net/http/GatewayRequest.java
new file mode 100644 (file)
index 0000000..5226fa5
--- /dev/null
@@ -0,0 +1,134 @@
+package org.opensrf.net.http;
+
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+import java.io.IOException;
+import java.io.BufferedInputStream;
+import java.io.OutputStreamWriter;
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URI;
+import java.net.HttpURLConnection;
+import java.lang.StringBuffer;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class GatewayRequest {
+
+    private String service;
+    private Method method;
+    private HttpURLConnection urlConn;
+    private GatewayConnection gConn;
+    protected GatewayRequestHandler handler;
+
+    /**
+     * Invoked from GatewayConnection
+     */
+    protected GatewayRequest(GatewayConnection conn, String service, Method method) {
+        this.handler = null;
+        this.urlConn = null;
+        this.gConn = conn;
+        this.service = service;
+        this.method = method;
+    }
+
+    public void sendAsync(final GatewayRequestHandler handler) {
+        this.handler = handler;
+        gConn.manageAsyncRequest(this);
+    }
+
+    public GatewayRequest send() {
+        try {
+            URL url = gConn.getUrl();
+            String postData = compilePostData(service, method);
+
+            urlConn = (HttpURLConnection) url.openConnection();
+            urlConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); 
+            urlConn.setDoInput(true);
+            urlConn.setDoOutput(true);
+
+            OutputStreamWriter wr = new OutputStreamWriter(urlConn.getOutputStream());
+            wr.write(postData);
+            wr.flush();
+            wr.close();
+
+        } catch (Exception ex) { // TODO inspect more closely
+            ex.printStackTrace(); 
+        }
+
+        return this;
+    }
+
+    public Object recv() {
+
+        Object payload = null;
+        StringBuffer readBuf = new StringBuffer();
+
+        try {
+            InputStream netStream = new BufferedInputStream(urlConn.getInputStream());
+
+            int bytesRead = 0;
+            byte[] buffer = new byte[1024];
+
+            while ((bytesRead = netStream.read(buffer)) != -1) {
+                readBuf.append(new String(buffer, 0, bytesRead));
+            }
+
+            Map<String,?> result = null;
+
+            try {
+                result = (Map<String, ?>) new JSONReader(readBuf.toString()).readObject();
+            } catch (org.opensrf.util.JSONException ex) {
+                ex.printStackTrace();
+                return null;
+            }
+
+            String status = result.get("status").toString(); 
+            if (!"200".equals(status)) {
+                // throw exception
+            }
+
+            payload = result.get("payload"); 
+
+        } catch (Exception ex) { // TODO inspect more closely
+            ex.printStackTrace(); 
+        }
+
+        return payload;
+    }
+
+    private String compilePostData(String service, Method method) {
+        URI uri = null;
+        StringBuffer postData = new StringBuffer();
+
+        postData.append("service=");
+        postData.append(service);
+        postData.append("&method=");
+        postData.append(method.getName());
+
+        List params = method.getParams();
+        Iterator itr = params.iterator();
+
+        while (itr.hasNext()) {
+            postData.append("&param=");
+            postData.append(new JSONWriter(itr.next()).write());
+        }
+
+        try {
+            // not using URLEncoder because it replaces ' ' with '+'.
+            uri = new URI("http", "", null, postData.toString(), null);
+        } catch (java.net.URISyntaxException ex) {
+            ex.printStackTrace(); 
+        }
+
+        return uri.getRawQuery();
+    }
+}
+
+
diff --git a/src/java/org/opensrf/net/http/GatewayRequestHandler.java b/src/java/org/opensrf/net/http/GatewayRequestHandler.java
new file mode 100644 (file)
index 0000000..3a62769
--- /dev/null
@@ -0,0 +1,5 @@
+package org.opensrf.net.http;
+
+public interface GatewayRequestHandler {
+    public void onComplete(Object payload);
+}