From: Bill Erickson Date: Wed, 29 Feb 2012 14:18:44 +0000 (-0500) Subject: more java gateway X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=f974fe368903b7f19fd9cfabf6111cd3fef4f8c0;p=working%2FOpenSRF.git more java gateway Signed-off-by: Bill Erickson --- diff --git a/src/java/org/opensrf/net/http/GatewayConnection.java b/src/java/org/opensrf/net/http/GatewayConnection.java deleted file mode 100644 index ac62169..0000000 --- a/src/java/org/opensrf/net/http/GatewayConnection.java +++ /dev/null @@ -1,118 +0,0 @@ -package org.opensrf.net.http; - -import java.net.URL; -import java.net.MalformedURLException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.opensrf.*; -import org.opensrf.util.*; - - -/** - * Manages connection parameters and thread limiting for opensrf json gateway connections. - */ - -public class GatewayConnection { - - /** http or https */ - private String proto; - /** server name */ - private String host; - /** URL path to the gateway, defaults to defaultPath */ - private String path; - /** Compiled URL object */ - private URL url; - /** Number of threads currently communicating with the server */ - protected int activeThreads; - /** Queue of pending async requests */ - protected Queue pendingThreadQueue; - /** maximum number of actively communicating threads allowed */ - protected int maxThreads = 10; - /** URL path for the gateway */ - public static final String defaultPath = "/osrf-gateway-v1"; - - public GatewayConnection(String proto, String host) { - this.proto = proto; - this.host = host; - this.path = defaultPath; - activeThreads = 0; - pendingThreadQueue = new ConcurrentLinkedQueue(); - } - - public GatewayConnection(String proto, String host, String path) { - this(proto, host); - this.path = path; - } - - public int getMaxThreads() { - return maxThreads; - } - - /** - * Set the maximum number of actively communicating threads allowed - */ - public void setMaxThreads(int max) { - maxThreads = max; - } - - protected URL getUrl() throws java.net.MalformedURLException { - try { - if (url == null) { - url = new URL(proto + "://" + host + path); - } - } catch (java.net.MalformedURLException ex) { - ex.printStackTrace(); - } - return url; - } - - public GatewayRequest request(String service, Method method) { - return new GatewayRequest(this, service, method); - } - - /** - * Launches or queues an asynchronous request. - * - * If the maximum active thread count has not been reached, - * start a new thread and use it to send and receive the request. - * The response is passed to the request's GatewayRequestHandler - * onComplete(). After complete, if the number of active threads - * is still lower than the max, one request will be pulled (if - * present) from the async queue and fired. - * - * If there are too many active threads, the main request is - * pushed onto the async queue for later processing - */ - protected void manageAsyncRequest(final GatewayRequest request) { - - if (activeThreads >= maxThreads) { - pendingThreadQueue.offer(request); - return; - } - - activeThreads++; - - //Send the request receive the response, fire off the next - //thread if necessary, then pass the result to the handler - Runnable r = new Runnable() { - public void run() { - - Object response = request.send().recv(); - activeThreads--; - - if (activeThreads < maxThreads) { - try { - manageAsyncRequest(pendingThreadQueue.remove()); - } catch (java.util.NoSuchElementException ex) {} - } - - if (request.handler != null) - request.handler.onComplete(response); - } - }; - - new Thread(r).start(); - } -} - - diff --git a/src/java/org/opensrf/net/http/GatewayRequest.java b/src/java/org/opensrf/net/http/GatewayRequest.java index 5226fa5..bf9e071 100644 --- a/src/java/org/opensrf/net/http/GatewayRequest.java +++ b/src/java/org/opensrf/net/http/GatewayRequest.java @@ -13,42 +13,27 @@ import java.net.URI; import java.net.HttpURLConnection; import java.lang.StringBuffer; import java.util.List; +import java.util.LinkedList; import java.util.Iterator; import java.util.Map; import java.util.HashMap; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -public class GatewayRequest { - - private String service; - private Method method; - private HttpURLConnection urlConn; - private GatewayConnection gConn; - protected GatewayRequestHandler handler; - - /** - * Invoked from GatewayConnection - */ - protected GatewayRequest(GatewayConnection conn, String service, Method method) { - this.handler = null; - this.urlConn = null; - this.gConn = conn; - this.service = service; - this.method = method; - } +public class GatewayRequest extends HttpRequest { + + private List responseList; - public void sendAsync(final GatewayRequestHandler handler) { - this.handler = handler; - gConn.manageAsyncRequest(this); + public GatewayRequest(HttpConnection conn, String service, Method method) { + super(conn, service, method); + responseList = new LinkedList(); // TODO } public GatewayRequest send() { try { - URL url = gConn.getUrl(); String postData = compilePostData(service, method); - urlConn = (HttpURLConnection) url.openConnection(); + urlConn = (HttpURLConnection) httpConn.url.openConnection(); urlConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); urlConn.setDoInput(true); urlConn.setDoOutput(true); @@ -67,6 +52,8 @@ public class GatewayRequest { public Object recv() { + if (complete) return null; + Object payload = null; StringBuffer readBuf = new StringBuffer(); @@ -79,6 +66,9 @@ public class GatewayRequest { while ((bytesRead = netStream.read(buffer)) != -1) { readBuf.append(new String(buffer, 0, bytesRead)); } + + netStream.close(); + urlConn = null; Map result = null; @@ -96,10 +86,12 @@ public class GatewayRequest { payload = result.get("payload"); + } catch (Exception ex) { // TODO inspect more closely ex.printStackTrace(); } + complete = true; return payload; } diff --git a/src/java/org/opensrf/net/http/GatewayRequestHandler.java b/src/java/org/opensrf/net/http/GatewayRequestHandler.java deleted file mode 100644 index 48ceeda..0000000 --- a/src/java/org/opensrf/net/http/GatewayRequestHandler.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.opensrf.net.http; - -/* - * Handler for async gateway request responses - */ - -public interface GatewayRequestHandler { - public void onComplete(Object payload); -} diff --git a/src/java/org/opensrf/net/http/HttpConnection.java b/src/java/org/opensrf/net/http/HttpConnection.java new file mode 100644 index 0000000..32fdebc --- /dev/null +++ b/src/java/org/opensrf/net/http/HttpConnection.java @@ -0,0 +1,97 @@ +package org.opensrf.net.http; + +import java.net.URL; +import java.net.MalformedURLException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.opensrf.*; +import org.opensrf.util.*; + + +/** + * Manages connection parameters and thread limiting for opensrf json gateway connections. + */ + +public class HttpConnection { + + /** Compiled URL object */ + protected URL url; + /** Number of threads currently communicating with the server */ + protected int activeThreads; + /** Queue of pending async requests */ + protected Queue pendingThreadQueue; + /** maximum number of actively communicating threads allowed */ + protected int maxThreads = 10; + + public HttpConnection(String fullUrl) throws java.net.MalformedURLException { + activeThreads = 0; + pendingThreadQueue = new ConcurrentLinkedQueue(); + url = new URL(fullUrl); + } + + public int getMaxThreads() { + return maxThreads; + } + + /** + * Set the maximum number of actively communicating threads allowed + */ + public void setMaxThreads(int max) { + maxThreads = max; + } + + /** + * Launches or queues an asynchronous request. + * + * If the maximum active thread count has not been reached, + * start a new thread and use it to send and receive the request. + * The response is passed to the request's HttpRequestHandler + * onComplete(). After complete, if the number of active threads + * is still lower than the max, one request will be pulled (if + * present) from the async queue and fired. + * + * If there are too many active threads, the main request is + * pushed onto the async queue for later processing + */ + protected void manageAsyncRequest(final HttpRequest request) { + + if (activeThreads >= maxThreads) { + pendingThreadQueue.offer(request); + return; + } + + activeThreads++; + + //Send the request receive the response, fire off the next + //thread if necessary, then pass the result to the handler + Runnable r = new Runnable() { + public void run() { + + Object response; + request.send(); + + while ((response = request.recv()) != null) { + if (request.handler != null) + request.handler.onResponse(request, response); + } + + if (request.handler != null) + request.handler.onComplete(request); + + activeThreads--; + + if (activeThreads < maxThreads) { + try { + manageAsyncRequest(pendingThreadQueue.remove()); + } catch (java.util.NoSuchElementException ex) { + // may have been gobbled by another thread + } + } + } + }; + + new Thread(r).start(); + } +} + + diff --git a/src/java/org/opensrf/net/http/HttpRequest.java b/src/java/org/opensrf/net/http/HttpRequest.java new file mode 100644 index 0000000..3548af3 --- /dev/null +++ b/src/java/org/opensrf/net/http/HttpRequest.java @@ -0,0 +1,50 @@ +package org.opensrf.net.http; +import org.opensrf.*; +import org.opensrf.util.*; + +import java.util.List; +import java.util.LinkedList; +import java.net.HttpURLConnection; + +public abstract class HttpRequest { + + protected String service; + protected Method method; + protected HttpURLConnection urlConn; + protected HttpConnection httpConn; + protected HttpRequestHandler handler; + private List responseList; + protected boolean complete = false; + + public HttpRequest() { + } + + public HttpRequest(HttpConnection conn, String service, Method method) { + this.handler = null; + this.urlConn = null; + this.httpConn = conn; + this.service = service; + this.method = method; + } + + public void sendAsync(final HttpRequestHandler handler) { + this.handler = handler; + httpConn.manageAsyncRequest(this); + } + + protected void pushResponse(Object response) { + if (responseList == null) + responseList = new LinkedList(); + responseList.add(response); + } + + protected List responses() { + return responseList; + } + + public abstract HttpRequest send(); + + public abstract Object recv(); +} + + diff --git a/src/java/org/opensrf/net/http/HttpRequestHandler.java b/src/java/org/opensrf/net/http/HttpRequestHandler.java new file mode 100644 index 0000000..8d8484d --- /dev/null +++ b/src/java/org/opensrf/net/http/HttpRequestHandler.java @@ -0,0 +1,26 @@ +package org.opensrf.net.http; + +import java.util.List; + +/* + * Handler for async gateway responses. + */ +public abstract class HttpRequestHandler { + + /** + * Called when all responses have been received. + * + * If discardResponses() returns true, will be passed null. + */ + public void onComplete(HttpRequest request) { + } + + /** + * Called with each response received from the server. + * + * @param payload the value returned from the server. + */ + public void onResponse(HttpRequest request, Object response) { + request.pushResponse(response); + } +}