/** create a random thread */
long time = new Date().getTime();
Random rand = new Random(time);
- setThread(rand.nextInt()+""+rand.nextInt()+""+time);
+ setThread(rand.nextInt()+""+rand.nextInt()+""+time+Thread.currentThread().getId());
nextId = 0;
requests = new HashMap<Integer, Request>();
Request req = findRequest(msg.getId());
if(req == null) {
/** LOG that we've received a result to a non-existant request */
+ System.err.println(msg.getId() +" has no corresponding request");
return;
}
OSRFObject payload = (OSRFObject) msg.get("payload");
Result result = null;
+ if((result = resultQueue.poll()) != null)
+ return result;
+
if(millis < 0 && !complete) {
/** wait potentially forever for a result to arrive */
- session.waitForMessage(millis);
- if((result = resultQueue.poll()) != null)
- return result;
+ while(!complete) {
+ session.waitForMessage(millis);
+ if((result = resultQueue.poll()) != null)
+ return result;
+ }
} else {
xmsg.setTo(remoteNode);
xmsg.setThread(thread);
xmsg.setBody(new JSONWriter(Arrays.asList(new Message[] {omsg})).write());
- XMPPSession ses = XMPPSession.getGlobalSession();
try {
- XMPPSession.getGlobalSession().send(xmsg);
+ XMPPSession.getThreadSession().send(xmsg);
} catch(XMPPException e) {
connectState = ConnectState.DISCONNECTED;
throw new SessionException("Error sending message to " + remoteNode, e);
public static void waitForMessage(long millis) throws SessionException, MethodException {
try {
Stack.processXMPPMessage(
- XMPPSession.getGlobalSession().recv(millis));
+ XMPPSession.getThreadSession().recv(millis));
} catch(XMPPException e) {
throw new SessionException("Error waiting for message", e);
}
import org.opensrf.util.*;
import org.opensrf.net.xmpp.*;
+import java.util.Random;
+import java.util.Date;
+import java.net.InetAddress;
public class Sys {
public static void bootstrapClient(String configFile, String configContext)
throws ConfigException, SessionException {
+ /** see if the current thread already has a connection */
+ if(XMPPSession.getThreadSession() != null)
+ return;
+
/** create the config parser */
Config config = new Config(configContext);
config.parse(configFile);
String host = (String) config.getFirst("/domains/domain");
int port = config.getInt("/port");
+
+ /** Create a random login resource string */
+ String res = "java_";
try {
+ res += InetAddress.getLocalHost().getHostAddress();
+ } catch(java.net.UnknownHostException e) {}
+ res += "_"+Math.abs(new Random(new Date().getTime()).nextInt())
+ + "_t"+ Thread.currentThread().getId();
+
+
+ try {
+
/** Connect to the Jabber network */
XMPPSession xses = new XMPPSession(host, port);
- xses.connect(username, passwd, "test-java"); /* XXX */
- XMPPSession.setGlobalSession(xses);
+ System.out.println("resource = " + res);
+ xses.connect(username, passwd, res);
+ XMPPSession.setThreadSession(xses);
+
} catch(XMPPException e) {
throw new SessionException("Unable to bootstrap client", e);
}
* Shuts down the connection to the opensrf network
*/
public static void shutdown() {
- XMPPSession.getGlobalSession().disconnect();
+ XMPPSession.getThreadSession().disconnect();
}
}
import java.io.*;
import java.net.Socket;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
/**
public static final String JABBER_DISCONNECT = "</stream:stream>";
+ private static Map threadConnections = new ConcurrentHashMap();
+
/** jabber domain */
private String host;
/** jabber port */
/**
* Returns the global, process-wide session
*/
+ /*
public static XMPPSession getGlobalSession() {
return globalSession;
}
+ */
+
+ public static XMPPSession getThreadSession() {
+ return (XMPPSession) threadConnections.get(new Long(Thread.currentThread().getId()));
+ }
+
+ /**
+ * Sets the given session as the global session for the current thread
+ * @param ses The session
+ */
+ public static void setThreadSession(XMPPSession ses) {
+ /* every time we create a new connection, clean up any dead threads.
+ * this is cheaper than cleaning up the dead threads at every access. */
+ cleanupThreadSessions();
+ threadConnections.put(new Long(Thread.currentThread().getId()), ses);
+ }
+
+ /**
+ * Analyzes the threadSession data to see if there are any sessions
+ * whose controlling thread has gone away.
+ */
+ private static void cleanupThreadSessions() {
+ Thread threads[] = new Thread[Thread.activeCount()];
+ Thread.enumerate(threads);
+ for(Iterator i = threadConnections.keySet().iterator(); i.hasNext(); ) {
+ boolean found = false;
+ Long id = (Long) i.next();
+ for(Thread t : threads) {
+ if(t.getId() == id.longValue()) {
+ found = true;
+ break;
+ }
+ }
+ if(!found)
+ threadConnections.remove(id);
+ }
+ }
/**
* Sets the global, process-wide section
*/
+ /*
public static void setGlobalSession(XMPPSession ses) {
globalSession = ses;
}
+ */
/** true if this session is connected to the server */
} else {
while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */
+ msg = reader.popMessageQueue();
+ if( msg != null ) return msg;
timeout -= reader.waitCoreEvent(timeout);
msg = reader.popMessageQueue();
if( msg != null ) return msg;
}
}
- return null;
+ return reader.popMessageQueue();
}
--- /dev/null
+package org.opensrf.test;
+import org.opensrf.*;
+import org.opensrf.util.*;
+import java.util.Map;
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+import java.io.PrintStream;
+
+/**
+ * Connects to the opensrf network once per thread and runs
+ * and runs a series of request acccross all launched threads.
+ * The purpose is to verify that the java threaded client api
+ * is functioning as expected
+ */
+public class TestThread implements Runnable {
+
+ String args[];
+
+ public TestThread(String args[]) {
+ this.args = args;
+ }
+
+ public void run() {
+
+ try {
+
+ Sys.bootstrapClient(args[0], "/config/opensrf");
+ ClientSession session = new ClientSession(args[3]);
+
+ List params = new ArrayList<Object>();
+ for(int i = 5; i < args.length; i++)
+ params.add(new JSONReader(args[3]).read());
+
+ for(int i = 0; i < Integer.parseInt(args[2]); i++) {
+ System.out.println("thread " + Thread.currentThread().getId()+" sending request " + i);
+ Request request = session.request(args[4], params);
+ Result result = request.recv(3000);
+ if(result != null) {
+ System.out.println("thread " + Thread.currentThread().getId()+
+ " got result JSON: " + new JSONWriter(result.getContent()).write());
+ } else {
+ System.out.println("* thread " + Thread.currentThread().getId()+ " got NO result");
+ }
+ }
+
+ Sys.shutdown();
+ } catch(Exception e) {
+ System.err.println(e);
+ }
+ }
+
+ public static void main(String args[]) throws Exception {
+
+ if(args.length < 5) {
+ System.out.println( "usage: org.opensrf.test.TestClient "+
+ "<osrfConfigFile> <numthreads> <numiter> <service> <method> [<JSONparam1>, <JSONparam2>]");
+ return;
+ }
+
+ int numThreads = Integer.parseInt(args[1]);
+ for(int i = 0; i < numThreads; i++)
+ new Thread(new TestThread(args)).start();
+ }
+}
+
+
+