+++ /dev/null
-package org.opensrf.net.http;
-
-import java.net.URL;
-import java.net.MalformedURLException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.opensrf.*;
-import org.opensrf.util.*;
-
-
-/**
- * Manages connection parameters and thread limiting for opensrf json gateway connections.
- */
-
-public class GatewayConnection {
-
- /** http or https */
- private String proto;
- /** server name */
- private String host;
- /** URL path to the gateway, defaults to defaultPath */
- private String path;
- /** Compiled URL object */
- private URL url;
- /** Number of threads currently communicating with the server */
- protected int activeThreads;
- /** Queue of pending async requests */
- protected Queue<GatewayRequest> pendingThreadQueue;
- /** maximum number of actively communicating threads allowed */
- protected int maxThreads = 10;
- /** URL path for the gateway */
- public static final String defaultPath = "/osrf-gateway-v1";
-
- public GatewayConnection(String proto, String host) {
- this.proto = proto;
- this.host = host;
- this.path = defaultPath;
- activeThreads = 0;
- pendingThreadQueue = new ConcurrentLinkedQueue();
- }
-
- public GatewayConnection(String proto, String host, String path) {
- this(proto, host);
- this.path = path;
- }
-
- public int getMaxThreads() {
- return maxThreads;
- }
-
- /**
- * Set the maximum number of actively communicating threads allowed
- */
- public void setMaxThreads(int max) {
- maxThreads = max;
- }
-
- protected URL getUrl() throws java.net.MalformedURLException {
- try {
- if (url == null) {
- url = new URL(proto + "://" + host + path);
- }
- } catch (java.net.MalformedURLException ex) {
- ex.printStackTrace();
- }
- return url;
- }
-
- public GatewayRequest request(String service, Method method) {
- return new GatewayRequest(this, service, method);
- }
-
- /**
- * Launches or queues an asynchronous request.
- *
- * If the maximum active thread count has not been reached,
- * start a new thread and use it to send and receive the request.
- * The response is passed to the request's GatewayRequestHandler
- * onComplete(). After complete, if the number of active threads
- * is still lower than the max, one request will be pulled (if
- * present) from the async queue and fired.
- *
- * If there are too many active threads, the main request is
- * pushed onto the async queue for later processing
- */
- protected void manageAsyncRequest(final GatewayRequest request) {
-
- if (activeThreads >= maxThreads) {
- pendingThreadQueue.offer(request);
- return;
- }
-
- activeThreads++;
-
- //Send the request receive the response, fire off the next
- //thread if necessary, then pass the result to the handler
- Runnable r = new Runnable() {
- public void run() {
-
- Object response = request.send().recv();
- activeThreads--;
-
- if (activeThreads < maxThreads) {
- try {
- manageAsyncRequest(pendingThreadQueue.remove());
- } catch (java.util.NoSuchElementException ex) {}
- }
-
- if (request.handler != null)
- request.handler.onComplete(response);
- }
- };
-
- new Thread(r).start();
- }
-}
-
-
import java.net.HttpURLConnection;
import java.lang.StringBuffer;
import java.util.List;
+import java.util.LinkedList;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class GatewayRequest {
-
- private String service;
- private Method method;
- private HttpURLConnection urlConn;
- private GatewayConnection gConn;
- protected GatewayRequestHandler handler;
-
- /**
- * Invoked from GatewayConnection
- */
- protected GatewayRequest(GatewayConnection conn, String service, Method method) {
- this.handler = null;
- this.urlConn = null;
- this.gConn = conn;
- this.service = service;
- this.method = method;
- }
+public class GatewayRequest extends HttpRequest {
+
+ private List responseList;
- public void sendAsync(final GatewayRequestHandler handler) {
- this.handler = handler;
- gConn.manageAsyncRequest(this);
+ public GatewayRequest(HttpConnection conn, String service, Method method) {
+ super(conn, service, method);
+ responseList = new LinkedList(); // TODO
}
public GatewayRequest send() {
try {
- URL url = gConn.getUrl();
String postData = compilePostData(service, method);
- urlConn = (HttpURLConnection) url.openConnection();
+ urlConn = (HttpURLConnection) httpConn.url.openConnection();
urlConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
urlConn.setDoInput(true);
urlConn.setDoOutput(true);
public Object recv() {
+ if (complete) return null;
+
Object payload = null;
StringBuffer readBuf = new StringBuffer();
while ((bytesRead = netStream.read(buffer)) != -1) {
readBuf.append(new String(buffer, 0, bytesRead));
}
+
+ netStream.close();
+ urlConn = null;
Map<String,?> result = null;
payload = result.get("payload");
+
} catch (Exception ex) { // TODO inspect more closely
ex.printStackTrace();
}
+ complete = true;
return payload;
}
+++ /dev/null
-package org.opensrf.net.http;
-
-/*
- * Handler for async gateway request responses
- */
-
-public interface GatewayRequestHandler {
- public void onComplete(Object payload);
-}
--- /dev/null
+package org.opensrf.net.http;
+
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+
+/**
+ * Manages connection parameters and thread limiting for opensrf json gateway connections.
+ */
+
+public class HttpConnection {
+
+ /** Compiled URL object */
+ protected URL url;
+ /** Number of threads currently communicating with the server */
+ protected int activeThreads;
+ /** Queue of pending async requests */
+ protected Queue<HttpRequest> pendingThreadQueue;
+ /** maximum number of actively communicating threads allowed */
+ protected int maxThreads = 10;
+
+ public HttpConnection(String fullUrl) throws java.net.MalformedURLException {
+ activeThreads = 0;
+ pendingThreadQueue = new ConcurrentLinkedQueue();
+ url = new URL(fullUrl);
+ }
+
+ public int getMaxThreads() {
+ return maxThreads;
+ }
+
+ /**
+ * Set the maximum number of actively communicating threads allowed
+ */
+ public void setMaxThreads(int max) {
+ maxThreads = max;
+ }
+
+ /**
+ * Launches or queues an asynchronous request.
+ *
+ * If the maximum active thread count has not been reached,
+ * start a new thread and use it to send and receive the request.
+ * The response is passed to the request's HttpRequestHandler
+ * onComplete(). After complete, if the number of active threads
+ * is still lower than the max, one request will be pulled (if
+ * present) from the async queue and fired.
+ *
+ * If there are too many active threads, the main request is
+ * pushed onto the async queue for later processing
+ */
+ protected void manageAsyncRequest(final HttpRequest request) {
+
+ if (activeThreads >= maxThreads) {
+ pendingThreadQueue.offer(request);
+ return;
+ }
+
+ activeThreads++;
+
+ //Send the request receive the response, fire off the next
+ //thread if necessary, then pass the result to the handler
+ Runnable r = new Runnable() {
+ public void run() {
+
+ Object response;
+ request.send();
+
+ while ((response = request.recv()) != null) {
+ if (request.handler != null)
+ request.handler.onResponse(request, response);
+ }
+
+ if (request.handler != null)
+ request.handler.onComplete(request);
+
+ activeThreads--;
+
+ if (activeThreads < maxThreads) {
+ try {
+ manageAsyncRequest(pendingThreadQueue.remove());
+ } catch (java.util.NoSuchElementException ex) {
+ // may have been gobbled by another thread
+ }
+ }
+ }
+ };
+
+ new Thread(r).start();
+ }
+}
+
+
--- /dev/null
+package org.opensrf.net.http;
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.net.HttpURLConnection;
+
+public abstract class HttpRequest {
+
+ protected String service;
+ protected Method method;
+ protected HttpURLConnection urlConn;
+ protected HttpConnection httpConn;
+ protected HttpRequestHandler handler;
+ private List<Object> responseList;
+ protected boolean complete = false;
+
+ public HttpRequest() {
+ }
+
+ public HttpRequest(HttpConnection conn, String service, Method method) {
+ this.handler = null;
+ this.urlConn = null;
+ this.httpConn = conn;
+ this.service = service;
+ this.method = method;
+ }
+
+ public void sendAsync(final HttpRequestHandler handler) {
+ this.handler = handler;
+ httpConn.manageAsyncRequest(this);
+ }
+
+ protected void pushResponse(Object response) {
+ if (responseList == null)
+ responseList = new LinkedList<Object>();
+ responseList.add(response);
+ }
+
+ protected List responses() {
+ return responseList;
+ }
+
+ public abstract HttpRequest send();
+
+ public abstract Object recv();
+}
+
+
--- /dev/null
+package org.opensrf.net.http;
+
+import java.util.List;
+
+/*
+ * Handler for async gateway responses.
+ */
+public abstract class HttpRequestHandler {
+
+ /**
+ * Called when all responses have been received.
+ *
+ * If discardResponses() returns true, will be passed null.
+ */
+ public void onComplete(HttpRequest request) {
+ }
+
+ /**
+ * Called with each response received from the server.
+ *
+ * @param payload the value returned from the server.
+ */
+ public void onResponse(HttpRequest request, Object response) {
+ request.pushResponse(response);
+ }
+}