added multi-threaded client support to the opensrf network/xmpp layer
authorerickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Fri, 17 Aug 2007 21:57:16 +0000 (21:57 +0000)
committererickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Fri, 17 Aug 2007 21:57:16 +0000 (21:57 +0000)
this is all managed below the covers so that clients can continue to safely
use bootstrapClient and will all "just work"

we now allow 1 xmpp connection per thread, as opposed to 1 per process.

added a test module

git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1079 9efc2488-bf62-4759-914b-345cdb29e865

src/java/org/opensrf/ClientSession.java
src/java/org/opensrf/Request.java
src/java/org/opensrf/Session.java
src/java/org/opensrf/Sys.java
src/java/org/opensrf/net/xmpp/XMPPSession.java
src/java/org/opensrf/test/TestThread.java [new file with mode: 0644]

index cc68505..e36ef07 100644 (file)
@@ -51,7 +51,7 @@ public class ClientSession extends Session {
         /** create a random thread */
         long time = new Date().getTime();
         Random rand = new Random(time);
-        setThread(rand.nextInt()+""+rand.nextInt()+""+time);
+        setThread(rand.nextInt()+""+rand.nextInt()+""+time+Thread.currentThread().getId());
 
         nextId = 0;
         requests = new HashMap<Integer, Request>();
@@ -115,6 +115,7 @@ public class ClientSession extends Session {
         Request req = findRequest(msg.getId());
         if(req == null) {
             /** LOG that we've received a result to a non-existant request */
+            System.err.println(msg.getId() +" has no corresponding request");
             return;
         }
         OSRFObject payload = (OSRFObject) msg.get("payload");
index e5d0ef6..8fe537f 100644 (file)
@@ -73,11 +73,16 @@ public class Request {
 
         Result result = null;
 
+        if((result = resultQueue.poll()) != null)
+            return result;
+
         if(millis < 0 && !complete) {
             /** wait potentially forever for a result to arrive */
-            session.waitForMessage(millis);
-            if((result = resultQueue.poll()) != null)
-                return result;
+            while(!complete) {
+                session.waitForMessage(millis);
+                if((result = resultQueue.poll()) != null)
+                    return result;
+            }
 
         } else {
 
index e2b41c3..f5045b3 100644 (file)
@@ -45,10 +45,9 @@ public abstract class Session {
         xmsg.setTo(remoteNode);
         xmsg.setThread(thread);
         xmsg.setBody(new JSONWriter(Arrays.asList(new Message[] {omsg})).write());
-        XMPPSession ses = XMPPSession.getGlobalSession();
 
         try {
-            XMPPSession.getGlobalSession().send(xmsg);
+            XMPPSession.getThreadSession().send(xmsg);
         } catch(XMPPException e) {
             connectState = ConnectState.DISCONNECTED;
             throw new SessionException("Error sending message to " + remoteNode, e);
@@ -63,7 +62,7 @@ public abstract class Session {
     public static void waitForMessage(long millis) throws SessionException, MethodException {
         try {
             Stack.processXMPPMessage(
-                XMPPSession.getGlobalSession().recv(millis));
+                XMPPSession.getThreadSession().recv(millis));
         } catch(XMPPException e) {
             throw new SessionException("Error waiting for message", e);
         }
index 8520838..85fb118 100644 (file)
@@ -2,6 +2,9 @@ package org.opensrf;
 
 import org.opensrf.util.*;
 import org.opensrf.net.xmpp.*;
+import java.util.Random;
+import java.util.Date;
+import java.net.InetAddress;
 
 
 public class Sys {
@@ -16,6 +19,10 @@ public class Sys {
     public static void bootstrapClient(String configFile, String configContext) 
             throws ConfigException, SessionException  {
 
+        /** see if the current thread already has a connection */
+        if(XMPPSession.getThreadSession() != null)
+            return;
+
         /** create the config parser */
         Config config = new Config(configContext);
         config.parse(configFile);
@@ -27,11 +34,24 @@ public class Sys {
         String host = (String) config.getFirst("/domains/domain");
         int port = config.getInt("/port");
 
+
+        /** Create a random login resource string */
+        String res = "java_";
         try {
+            res += InetAddress.getLocalHost().getHostAddress();
+        } catch(java.net.UnknownHostException e) {}
+        res += "_"+Math.abs(new Random(new Date().getTime()).nextInt()) 
+            + "_t"+ Thread.currentThread().getId();
+
+
+        try {
+
             /** Connect to the Jabber network */
             XMPPSession xses = new XMPPSession(host, port);
-            xses.connect(username, passwd, "test-java"); /* XXX */
-            XMPPSession.setGlobalSession(xses);
+            System.out.println("resource = " + res);
+            xses.connect(username, passwd, res);
+            XMPPSession.setThreadSession(xses);
+
         } catch(XMPPException e) {
             throw new SessionException("Unable to bootstrap client", e);
         }
@@ -41,7 +61,7 @@ public class Sys {
      * Shuts down the connection to the opensrf network
      */
     public static void shutdown() {
-        XMPPSession.getGlobalSession().disconnect();
+        XMPPSession.getThreadSession().disconnect();
     }
 }
 
index 11e0c7d..cc2a9f6 100644 (file)
@@ -2,6 +2,9 @@ package org.opensrf.net.xmpp;
 
 import java.io.*;
 import java.net.Socket;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
@@ -21,6 +24,8 @@ public class XMPPSession {
 
     public static final String JABBER_DISCONNECT = "</stream:stream>";
 
+    private static Map threadConnections = new ConcurrentHashMap();
+
     /** jabber domain */
     private String host;
     /** jabber port */
@@ -59,16 +64,56 @@ public class XMPPSession {
     /**
      * Returns the global, process-wide session
      */
+    /*
     public static XMPPSession getGlobalSession() {
         return globalSession;
     }
+    */
+
+    public static XMPPSession getThreadSession() {
+        return (XMPPSession) threadConnections.get(new Long(Thread.currentThread().getId()));
+    }
+
+    /**
+     * Sets the given session as the global session for the current thread
+     * @param ses The session
+     */
+    public static void setThreadSession(XMPPSession ses) {
+        /* every time we create a new connection, clean up any dead threads. 
+         * this is cheaper than cleaning up the dead threads at every access. */
+        cleanupThreadSessions();
+        threadConnections.put(new Long(Thread.currentThread().getId()), ses);
+    }
+
+    /**
+     * Analyzes the threadSession data to see if there are any sessions
+     * whose controlling thread has gone away.  
+     */
+    private static void cleanupThreadSessions() {
+        Thread threads[] = new Thread[Thread.activeCount()]; 
+        Thread.enumerate(threads);
+        for(Iterator i = threadConnections.keySet().iterator(); i.hasNext(); ) {
+            boolean found = false;
+            Long id = (Long) i.next();
+            for(Thread t : threads) {
+                if(t.getId() == id.longValue()) {
+                    found = true;
+                    break;
+                }
+            }
+            if(!found) 
+                threadConnections.remove(id);
+        }
+    }
 
     /**
      * Sets the global, process-wide section
      */
+    /*
     public static void setGlobalSession(XMPPSession ses) {
         globalSession = ses;
     }
+    */
 
 
     /** true if this session is connected to the server */
@@ -190,6 +235,8 @@ public class XMPPSession {
         } else {
 
             while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */
+                msg = reader.popMessageQueue();
+                if( msg != null ) return msg;
                 timeout -= reader.waitCoreEvent(timeout);
                 msg = reader.popMessageQueue();
                 if( msg != null ) return msg;
@@ -197,7 +244,7 @@ public class XMPPSession {
             }
         }
 
-        return null;
+        return reader.popMessageQueue();
     }
 
 
diff --git a/src/java/org/opensrf/test/TestThread.java b/src/java/org/opensrf/test/TestThread.java
new file mode 100644 (file)
index 0000000..bb4cf06
--- /dev/null
@@ -0,0 +1,68 @@
+package org.opensrf.test;
+import org.opensrf.*;
+import org.opensrf.util.*;
+import java.util.Map;
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+import java.io.PrintStream;
+
+/**
+ * Connects to the opensrf network once per thread and runs
+ * and runs a series of request acccross all launched threads.
+ * The purpose is to verify that the java threaded client api 
+ * is functioning as expected
+ */
+public class TestThread implements Runnable {
+
+    String args[];
+
+    public TestThread(String args[]) {
+        this.args = args;
+    }
+
+    public void run() {
+
+        try {
+
+            Sys.bootstrapClient(args[0], "/config/opensrf");
+            ClientSession session = new ClientSession(args[3]);
+    
+            List params = new ArrayList<Object>();
+            for(int i = 5; i < args.length; i++) 
+                params.add(new JSONReader(args[3]).read());
+    
+            for(int i = 0; i < Integer.parseInt(args[2]); i++) {
+                System.out.println("thread " + Thread.currentThread().getId()+" sending request " + i);
+                Request request = session.request(args[4], params);
+                Result result = request.recv(3000);
+                if(result != null) {
+                    System.out.println("thread " + Thread.currentThread().getId()+ 
+                        " got result JSON: " + new JSONWriter(result.getContent()).write());
+                } else {
+                    System.out.println("* thread " + Thread.currentThread().getId()+ " got NO result");
+                }
+            }
+    
+            Sys.shutdown();
+        } catch(Exception e) {
+            System.err.println(e);
+        }
+    }
+
+    public static void main(String args[]) throws Exception {
+
+        if(args.length < 5) {
+            System.out.println( "usage: org.opensrf.test.TestClient "+
+                "<osrfConfigFile> <numthreads> <numiter> <service> <method> [<JSONparam1>, <JSONparam2>]");
+            return;
+        }
+
+        int numThreads = Integer.parseInt(args[1]);
+        for(int i = 0; i < numThreads; i++) 
+            new Thread(new TestThread(args)).start();
+    }
+}
+
+
+