From f974fe368903b7f19fd9cfabf6111cd3fef4f8c0 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Wed, 29 Feb 2012 09:18:44 -0500 Subject: [PATCH] more java gateway Signed-off-by: Bill Erickson --- src/java/org/opensrf/net/http/GatewayRequest.java | 38 +++++-------- .../opensrf/net/http/GatewayRequestHandler.java | 9 --- ...{GatewayConnection.java => HttpConnection.java} | 65 ++++++++-------------- src/java/org/opensrf/net/http/HttpRequest.java | 50 +++++++++++++++++ .../org/opensrf/net/http/HttpRequestHandler.java | 26 +++++++++ 5 files changed, 113 insertions(+), 75 deletions(-) delete mode 100644 src/java/org/opensrf/net/http/GatewayRequestHandler.java rename src/java/org/opensrf/net/http/{GatewayConnection.java => HttpConnection.java} (61%) create mode 100644 src/java/org/opensrf/net/http/HttpRequest.java create mode 100644 src/java/org/opensrf/net/http/HttpRequestHandler.java diff --git a/src/java/org/opensrf/net/http/GatewayRequest.java b/src/java/org/opensrf/net/http/GatewayRequest.java index 5226fa5..bf9e071 100644 --- a/src/java/org/opensrf/net/http/GatewayRequest.java +++ b/src/java/org/opensrf/net/http/GatewayRequest.java @@ -13,42 +13,27 @@ import java.net.URI; import java.net.HttpURLConnection; import java.lang.StringBuffer; import java.util.List; +import java.util.LinkedList; import java.util.Iterator; import java.util.Map; import java.util.HashMap; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -public class GatewayRequest { - - private String service; - private Method method; - private HttpURLConnection urlConn; - private GatewayConnection gConn; - protected GatewayRequestHandler handler; - - /** - * Invoked from GatewayConnection - */ - protected GatewayRequest(GatewayConnection conn, String service, Method method) { - this.handler = null; - this.urlConn = null; - this.gConn = conn; - this.service = service; - this.method = method; - } +public class GatewayRequest extends HttpRequest { + + private List responseList; - public void sendAsync(final GatewayRequestHandler handler) { - this.handler = handler; - gConn.manageAsyncRequest(this); + public GatewayRequest(HttpConnection conn, String service, Method method) { + super(conn, service, method); + responseList = new LinkedList(); // TODO } public GatewayRequest send() { try { - URL url = gConn.getUrl(); String postData = compilePostData(service, method); - urlConn = (HttpURLConnection) url.openConnection(); + urlConn = (HttpURLConnection) httpConn.url.openConnection(); urlConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); urlConn.setDoInput(true); urlConn.setDoOutput(true); @@ -67,6 +52,8 @@ public class GatewayRequest { public Object recv() { + if (complete) return null; + Object payload = null; StringBuffer readBuf = new StringBuffer(); @@ -79,6 +66,9 @@ public class GatewayRequest { while ((bytesRead = netStream.read(buffer)) != -1) { readBuf.append(new String(buffer, 0, bytesRead)); } + + netStream.close(); + urlConn = null; Map result = null; @@ -96,10 +86,12 @@ public class GatewayRequest { payload = result.get("payload"); + } catch (Exception ex) { // TODO inspect more closely ex.printStackTrace(); } + complete = true; return payload; } diff --git a/src/java/org/opensrf/net/http/GatewayRequestHandler.java b/src/java/org/opensrf/net/http/GatewayRequestHandler.java deleted file mode 100644 index 48ceeda..0000000 --- a/src/java/org/opensrf/net/http/GatewayRequestHandler.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.opensrf.net.http; - -/* - * Handler for async gateway request responses - */ - -public interface GatewayRequestHandler { - public void onComplete(Object payload); -} diff --git a/src/java/org/opensrf/net/http/GatewayConnection.java b/src/java/org/opensrf/net/http/HttpConnection.java similarity index 61% rename from src/java/org/opensrf/net/http/GatewayConnection.java rename to src/java/org/opensrf/net/http/HttpConnection.java index ac62169..32fdebc 100644 --- a/src/java/org/opensrf/net/http/GatewayConnection.java +++ b/src/java/org/opensrf/net/http/HttpConnection.java @@ -12,36 +12,21 @@ import org.opensrf.util.*; * Manages connection parameters and thread limiting for opensrf json gateway connections. */ -public class GatewayConnection { - - /** http or https */ - private String proto; - /** server name */ - private String host; - /** URL path to the gateway, defaults to defaultPath */ - private String path; +public class HttpConnection { + /** Compiled URL object */ - private URL url; + protected URL url; /** Number of threads currently communicating with the server */ protected int activeThreads; /** Queue of pending async requests */ - protected Queue pendingThreadQueue; + protected Queue pendingThreadQueue; /** maximum number of actively communicating threads allowed */ protected int maxThreads = 10; - /** URL path for the gateway */ - public static final String defaultPath = "/osrf-gateway-v1"; - public GatewayConnection(String proto, String host) { - this.proto = proto; - this.host = host; - this.path = defaultPath; + public HttpConnection(String fullUrl) throws java.net.MalformedURLException { activeThreads = 0; pendingThreadQueue = new ConcurrentLinkedQueue(); - } - - public GatewayConnection(String proto, String host, String path) { - this(proto, host); - this.path = path; + url = new URL(fullUrl); } public int getMaxThreads() { @@ -55,27 +40,12 @@ public class GatewayConnection { maxThreads = max; } - protected URL getUrl() throws java.net.MalformedURLException { - try { - if (url == null) { - url = new URL(proto + "://" + host + path); - } - } catch (java.net.MalformedURLException ex) { - ex.printStackTrace(); - } - return url; - } - - public GatewayRequest request(String service, Method method) { - return new GatewayRequest(this, service, method); - } - /** * Launches or queues an asynchronous request. * * If the maximum active thread count has not been reached, * start a new thread and use it to send and receive the request. - * The response is passed to the request's GatewayRequestHandler + * The response is passed to the request's HttpRequestHandler * onComplete(). After complete, if the number of active threads * is still lower than the max, one request will be pulled (if * present) from the async queue and fired. @@ -83,7 +53,7 @@ public class GatewayConnection { * If there are too many active threads, the main request is * pushed onto the async queue for later processing */ - protected void manageAsyncRequest(final GatewayRequest request) { + protected void manageAsyncRequest(final HttpRequest request) { if (activeThreads >= maxThreads) { pendingThreadQueue.offer(request); @@ -97,17 +67,26 @@ public class GatewayConnection { Runnable r = new Runnable() { public void run() { - Object response = request.send().recv(); + Object response; + request.send(); + + while ((response = request.recv()) != null) { + if (request.handler != null) + request.handler.onResponse(request, response); + } + + if (request.handler != null) + request.handler.onComplete(request); + activeThreads--; if (activeThreads < maxThreads) { try { manageAsyncRequest(pendingThreadQueue.remove()); - } catch (java.util.NoSuchElementException ex) {} + } catch (java.util.NoSuchElementException ex) { + // may have been gobbled by another thread + } } - - if (request.handler != null) - request.handler.onComplete(response); } }; diff --git a/src/java/org/opensrf/net/http/HttpRequest.java b/src/java/org/opensrf/net/http/HttpRequest.java new file mode 100644 index 0000000..3548af3 --- /dev/null +++ b/src/java/org/opensrf/net/http/HttpRequest.java @@ -0,0 +1,50 @@ +package org.opensrf.net.http; +import org.opensrf.*; +import org.opensrf.util.*; + +import java.util.List; +import java.util.LinkedList; +import java.net.HttpURLConnection; + +public abstract class HttpRequest { + + protected String service; + protected Method method; + protected HttpURLConnection urlConn; + protected HttpConnection httpConn; + protected HttpRequestHandler handler; + private List responseList; + protected boolean complete = false; + + public HttpRequest() { + } + + public HttpRequest(HttpConnection conn, String service, Method method) { + this.handler = null; + this.urlConn = null; + this.httpConn = conn; + this.service = service; + this.method = method; + } + + public void sendAsync(final HttpRequestHandler handler) { + this.handler = handler; + httpConn.manageAsyncRequest(this); + } + + protected void pushResponse(Object response) { + if (responseList == null) + responseList = new LinkedList(); + responseList.add(response); + } + + protected List responses() { + return responseList; + } + + public abstract HttpRequest send(); + + public abstract Object recv(); +} + + diff --git a/src/java/org/opensrf/net/http/HttpRequestHandler.java b/src/java/org/opensrf/net/http/HttpRequestHandler.java new file mode 100644 index 0000000..8d8484d --- /dev/null +++ b/src/java/org/opensrf/net/http/HttpRequestHandler.java @@ -0,0 +1,26 @@ +package org.opensrf.net.http; + +import java.util.List; + +/* + * Handler for async gateway responses. + */ +public abstract class HttpRequestHandler { + + /** + * Called when all responses have been received. + * + * If discardResponses() returns true, will be passed null. + */ + public void onComplete(HttpRequest request) { + } + + /** + * Called with each response received from the server. + * + * @param payload the value returned from the server. + */ + public void onResponse(HttpRequest request, Object response) { + request.pushResponse(response); + } +} -- 2.11.0