From 022c83e8b00220349a64a8fdae6d39cc161ad9e2 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 27 Feb 2012 18:05:07 -0500 Subject: [PATCH] Java HTTP gateway interface Supports sync and async requests. Async requests support onResponse, onComplete, and onError handlers. Supports a max-threads value to limit the number of activately communicating threads over any connection. When max-threads is reached, requests are queued and delivered as soon as there is room. Note that since this is talking to the OpenSRF gateway and not the translater, responses are simply collected and passed one at a time to onResponse. They are not streamed. The goal of supporting onResponse is to provide the same client API for both the gateway and translator. Signed-off-by: Bill Erickson Signed-off-by: Dan Scott --- src/java/org/opensrf/net/http/GatewayRequest.java | 129 +++++++++++++++++++++ src/java/org/opensrf/net/http/HttpConnection.java | 97 ++++++++++++++++ src/java/org/opensrf/net/http/HttpRequest.java | 66 +++++++++++ .../org/opensrf/net/http/HttpRequestHandler.java | 25 ++++ 4 files changed, 317 insertions(+) create mode 100644 src/java/org/opensrf/net/http/GatewayRequest.java create mode 100644 src/java/org/opensrf/net/http/HttpConnection.java create mode 100644 src/java/org/opensrf/net/http/HttpRequest.java create mode 100644 src/java/org/opensrf/net/http/HttpRequestHandler.java diff --git a/src/java/org/opensrf/net/http/GatewayRequest.java b/src/java/org/opensrf/net/http/GatewayRequest.java new file mode 100644 index 0000000..00631bf --- /dev/null +++ b/src/java/org/opensrf/net/http/GatewayRequest.java @@ -0,0 +1,129 @@ +package org.opensrf.net.http; + +import org.opensrf.*; +import org.opensrf.util.*; + +import java.io.IOException; +import java.io.BufferedInputStream; +import java.io.OutputStreamWriter; +import java.io.InputStream; +import java.io.IOException; +import java.net.URL; +import java.net.URI; +import java.net.HttpURLConnection; +import java.lang.StringBuffer; +import java.util.List; +import java.util.Iterator; +import java.util.Map; +import java.util.HashMap; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class GatewayRequest extends HttpRequest { + + private boolean readComplete; + + public GatewayRequest(HttpConnection conn, String service, Method method) { + super(conn, service, method); + readComplete = false; + } + + public GatewayRequest send() { + try { + + String postData = compilePostData(service, method); + + urlConn = (HttpURLConnection) httpConn.url.openConnection(); + urlConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + urlConn.setDoInput(true); + urlConn.setDoOutput(true); + + OutputStreamWriter wr = new OutputStreamWriter(urlConn.getOutputStream()); + wr.write(postData); + wr.flush(); + wr.close(); + + } catch (java.io.IOException ex) { + failed = true; + failure = ex; + } + + return this; + } + + public Object recv() { + + if (readComplete) + return nextResponse(); + + try { + + InputStream netStream = new BufferedInputStream(urlConn.getInputStream()); + StringBuffer readBuf = new StringBuffer(); + + int bytesRead = 0; + byte[] buffer = new byte[1024]; + + while ((bytesRead = netStream.read(buffer)) != -1) { + readBuf.append(new String(buffer, 0, bytesRead)); + } + + netStream.close(); + urlConn = null; + + Map result = null; + + try { + result = (Map) new JSONReader(readBuf.toString()).readObject(); + } catch (org.opensrf.util.JSONException ex) { + ex.printStackTrace(); + return null; + } + + String status = result.get("status").toString(); + if (!"200".equals(status)) { + failed = true; + // failure = + } + + // gateway always returns a wrapper array with the full results set + responseList = (List) result.get("payload"); + + } catch (java.io.IOException ex) { + failed = true; + failure = ex; + } + + readComplete = true; + return nextResponse(); + } + + private String compilePostData(String service, Method method) { + URI uri = null; + StringBuffer postData = new StringBuffer(); + + postData.append("service="); + postData.append(service); + postData.append("&method="); + postData.append(method.getName()); + + List params = method.getParams(); + Iterator itr = params.iterator(); + + while (itr.hasNext()) { + postData.append("¶m="); + postData.append(new JSONWriter(itr.next()).write()); + } + + try { + // not using URLEncoder because it replaces ' ' with '+'. + uri = new URI("http", "", null, postData.toString(), null); + } catch (java.net.URISyntaxException ex) { + ex.printStackTrace(); + } + + return uri.getRawQuery(); + } +} + + diff --git a/src/java/org/opensrf/net/http/HttpConnection.java b/src/java/org/opensrf/net/http/HttpConnection.java new file mode 100644 index 0000000..32fdebc --- /dev/null +++ b/src/java/org/opensrf/net/http/HttpConnection.java @@ -0,0 +1,97 @@ +package org.opensrf.net.http; + +import java.net.URL; +import java.net.MalformedURLException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.opensrf.*; +import org.opensrf.util.*; + + +/** + * Manages connection parameters and thread limiting for opensrf json gateway connections. + */ + +public class HttpConnection { + + /** Compiled URL object */ + protected URL url; + /** Number of threads currently communicating with the server */ + protected int activeThreads; + /** Queue of pending async requests */ + protected Queue pendingThreadQueue; + /** maximum number of actively communicating threads allowed */ + protected int maxThreads = 10; + + public HttpConnection(String fullUrl) throws java.net.MalformedURLException { + activeThreads = 0; + pendingThreadQueue = new ConcurrentLinkedQueue(); + url = new URL(fullUrl); + } + + public int getMaxThreads() { + return maxThreads; + } + + /** + * Set the maximum number of actively communicating threads allowed + */ + public void setMaxThreads(int max) { + maxThreads = max; + } + + /** + * Launches or queues an asynchronous request. + * + * If the maximum active thread count has not been reached, + * start a new thread and use it to send and receive the request. + * The response is passed to the request's HttpRequestHandler + * onComplete(). After complete, if the number of active threads + * is still lower than the max, one request will be pulled (if + * present) from the async queue and fired. + * + * If there are too many active threads, the main request is + * pushed onto the async queue for later processing + */ + protected void manageAsyncRequest(final HttpRequest request) { + + if (activeThreads >= maxThreads) { + pendingThreadQueue.offer(request); + return; + } + + activeThreads++; + + //Send the request receive the response, fire off the next + //thread if necessary, then pass the result to the handler + Runnable r = new Runnable() { + public void run() { + + Object response; + request.send(); + + while ((response = request.recv()) != null) { + if (request.handler != null) + request.handler.onResponse(request, response); + } + + if (request.handler != null) + request.handler.onComplete(request); + + activeThreads--; + + if (activeThreads < maxThreads) { + try { + manageAsyncRequest(pendingThreadQueue.remove()); + } catch (java.util.NoSuchElementException ex) { + // may have been gobbled by another thread + } + } + } + }; + + new Thread(r).start(); + } +} + + diff --git a/src/java/org/opensrf/net/http/HttpRequest.java b/src/java/org/opensrf/net/http/HttpRequest.java new file mode 100644 index 0000000..d91a127 --- /dev/null +++ b/src/java/org/opensrf/net/http/HttpRequest.java @@ -0,0 +1,66 @@ +package org.opensrf.net.http; +import org.opensrf.*; +import org.opensrf.util.*; + +import java.util.List; +import java.util.LinkedList; +import java.net.HttpURLConnection; + +public abstract class HttpRequest { + + protected String service; + protected Method method; + protected HttpURLConnection urlConn; + protected HttpConnection httpConn; + protected HttpRequestHandler handler; + protected List responseList; + protected Exception failure; + protected boolean failed; + protected boolean complete; + + public HttpRequest() { + failed = false; + complete = false; + handler = null; + urlConn = null; + } + + public HttpRequest(HttpConnection conn, String service, Method method) { + this(); + this.httpConn = conn; + this.service = service; + this.method = method; + } + + public void sendAsync(final HttpRequestHandler handler) { + this.handler = handler; + httpConn.manageAsyncRequest(this); + } + + protected void pushResponse(Object response) { + if (responseList == null) + responseList = new LinkedList(); + responseList.add(response); + } + + protected List responses() { + return responseList; + } + + protected Object nextResponse() { + if (complete || failed) return null; + if (responseList.size() > 0) + return responseList.remove(0); + return null; + } + + public Exception getFailure() { + return failure; + } + + public abstract HttpRequest send(); + + public abstract Object recv(); +} + + diff --git a/src/java/org/opensrf/net/http/HttpRequestHandler.java b/src/java/org/opensrf/net/http/HttpRequestHandler.java new file mode 100644 index 0000000..9c0f9e5 --- /dev/null +++ b/src/java/org/opensrf/net/http/HttpRequestHandler.java @@ -0,0 +1,25 @@ +package org.opensrf.net.http; + +import java.util.List; + +/* + * Handler for async gateway responses. + */ +public abstract class HttpRequestHandler { + + /** + * Called when all responses have been received. + * + * If discardResponses() returns true, will be passed null. + */ + public void onComplete(HttpRequest request) { + } + + /** + * Called with each response received from the server. + * + * @param payload the value returned from the server. + */ + public void onResponse(HttpRequest request, Object response) { + } +} -- 2.11.0