--- /dev/null
+package org.opensrf.net.http;
+
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+import java.io.IOException;
+import java.io.BufferedInputStream;
+import java.io.OutputStreamWriter;
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URI;
+import java.net.HttpURLConnection;
+import java.lang.StringBuffer;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class GatewayRequest extends HttpRequest {
+
+ private boolean readComplete;
+
+ public GatewayRequest(HttpConnection conn, String service, Method method) {
+ super(conn, service, method);
+ readComplete = false;
+ }
+
+ public GatewayRequest send() {
+ try {
+
+ String postData = compilePostData(service, method);
+
+ urlConn = (HttpURLConnection) httpConn.url.openConnection();
+ urlConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+ urlConn.setDoInput(true);
+ urlConn.setDoOutput(true);
+
+ OutputStreamWriter wr = new OutputStreamWriter(urlConn.getOutputStream());
+ wr.write(postData);
+ wr.flush();
+ wr.close();
+
+ } catch (java.io.IOException ex) {
+ failed = true;
+ failure = ex;
+ }
+
+ return this;
+ }
+
+ public Object recv() {
+
+ if (readComplete)
+ return nextResponse();
+
+ try {
+
+ InputStream netStream = new BufferedInputStream(urlConn.getInputStream());
+ StringBuffer readBuf = new StringBuffer();
+
+ int bytesRead = 0;
+ byte[] buffer = new byte[1024];
+
+ while ((bytesRead = netStream.read(buffer)) != -1) {
+ readBuf.append(new String(buffer, 0, bytesRead));
+ }
+
+ netStream.close();
+ urlConn = null;
+
+ Map<String,?> result = null;
+
+ try {
+ result = (Map<String, ?>) new JSONReader(readBuf.toString()).readObject();
+ } catch (org.opensrf.util.JSONException ex) {
+ ex.printStackTrace();
+ return null;
+ }
+
+ String status = result.get("status").toString();
+ if (!"200".equals(status)) {
+ failed = true;
+ // failure = <some new exception>
+ }
+
+ // gateway always returns a wrapper array with the full results set
+ responseList = (List) result.get("payload");
+
+ } catch (java.io.IOException ex) {
+ failed = true;
+ failure = ex;
+ }
+
+ readComplete = true;
+ return nextResponse();
+ }
+
+ private String compilePostData(String service, Method method) {
+ URI uri = null;
+ StringBuffer postData = new StringBuffer();
+
+ postData.append("service=");
+ postData.append(service);
+ postData.append("&method=");
+ postData.append(method.getName());
+
+ List params = method.getParams();
+ Iterator itr = params.iterator();
+
+ while (itr.hasNext()) {
+ postData.append("¶m=");
+ postData.append(new JSONWriter(itr.next()).write());
+ }
+
+ try {
+ // not using URLEncoder because it replaces ' ' with '+'.
+ uri = new URI("http", "", null, postData.toString(), null);
+ } catch (java.net.URISyntaxException ex) {
+ ex.printStackTrace();
+ }
+
+ return uri.getRawQuery();
+ }
+}
+
+
--- /dev/null
+package org.opensrf.net.http;
+
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+
+/**
+ * Manages connection parameters and thread limiting for opensrf json gateway connections.
+ */
+
+public class HttpConnection {
+
+ /** Compiled URL object */
+ protected URL url;
+ /** Number of threads currently communicating with the server */
+ protected int activeThreads;
+ /** Queue of pending async requests */
+ protected Queue<HttpRequest> pendingThreadQueue;
+ /** maximum number of actively communicating threads allowed */
+ protected int maxThreads = 10;
+
+ public HttpConnection(String fullUrl) throws java.net.MalformedURLException {
+ activeThreads = 0;
+ pendingThreadQueue = new ConcurrentLinkedQueue();
+ url = new URL(fullUrl);
+ }
+
+ public int getMaxThreads() {
+ return maxThreads;
+ }
+
+ /**
+ * Set the maximum number of actively communicating threads allowed
+ */
+ public void setMaxThreads(int max) {
+ maxThreads = max;
+ }
+
+ /**
+ * Launches or queues an asynchronous request.
+ *
+ * If the maximum active thread count has not been reached,
+ * start a new thread and use it to send and receive the request.
+ * The response is passed to the request's HttpRequestHandler
+ * onComplete(). After complete, if the number of active threads
+ * is still lower than the max, one request will be pulled (if
+ * present) from the async queue and fired.
+ *
+ * If there are too many active threads, the main request is
+ * pushed onto the async queue for later processing
+ */
+ protected void manageAsyncRequest(final HttpRequest request) {
+
+ if (activeThreads >= maxThreads) {
+ pendingThreadQueue.offer(request);
+ return;
+ }
+
+ activeThreads++;
+
+ //Send the request receive the response, fire off the next
+ //thread if necessary, then pass the result to the handler
+ Runnable r = new Runnable() {
+ public void run() {
+
+ Object response;
+ request.send();
+
+ while ((response = request.recv()) != null) {
+ if (request.handler != null)
+ request.handler.onResponse(request, response);
+ }
+
+ if (request.handler != null)
+ request.handler.onComplete(request);
+
+ activeThreads--;
+
+ if (activeThreads < maxThreads) {
+ try {
+ manageAsyncRequest(pendingThreadQueue.remove());
+ } catch (java.util.NoSuchElementException ex) {
+ // may have been gobbled by another thread
+ }
+ }
+ }
+ };
+
+ new Thread(r).start();
+ }
+}
+
+
--- /dev/null
+package org.opensrf.net.http;
+import org.opensrf.*;
+import org.opensrf.util.*;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.net.HttpURLConnection;
+
+public abstract class HttpRequest {
+
+ protected String service;
+ protected Method method;
+ protected HttpURLConnection urlConn;
+ protected HttpConnection httpConn;
+ protected HttpRequestHandler handler;
+ protected List<Object> responseList;
+ protected Exception failure;
+ protected boolean failed;
+ protected boolean complete;
+
+ public HttpRequest() {
+ failed = false;
+ complete = false;
+ handler = null;
+ urlConn = null;
+ }
+
+ public HttpRequest(HttpConnection conn, String service, Method method) {
+ this();
+ this.httpConn = conn;
+ this.service = service;
+ this.method = method;
+ }
+
+ public void sendAsync(final HttpRequestHandler handler) {
+ this.handler = handler;
+ httpConn.manageAsyncRequest(this);
+ }
+
+ protected void pushResponse(Object response) {
+ if (responseList == null)
+ responseList = new LinkedList<Object>();
+ responseList.add(response);
+ }
+
+ protected List responses() {
+ return responseList;
+ }
+
+ protected Object nextResponse() {
+ if (complete || failed) return null;
+ if (responseList.size() > 0)
+ return responseList.remove(0);
+ return null;
+ }
+
+ public Exception getFailure() {
+ return failure;
+ }
+
+ public abstract HttpRequest send();
+
+ public abstract Object recv();
+}
+
+
--- /dev/null
+package org.opensrf.net.http;
+
+import java.util.List;
+
+/*
+ * Handler for async gateway responses.
+ */
+public abstract class HttpRequestHandler {
+
+ /**
+ * Called when all responses have been received.
+ *
+ * If discardResponses() returns true, will be passed null.
+ */
+ public void onComplete(HttpRequest request) {
+ }
+
+ /**
+ * Called with each response received from the server.
+ *
+ * @param payload the value returned from the server.
+ */
+ public void onResponse(HttpRequest request, Object response) {
+ }
+}