import java.util.logging.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.nio.ByteBuffer;
-import javax.json.*;
import java.io.IOException;
+import org.json.*;
+
public class MessageIO {
- private LinkedBlockingQueue<JsonValue> inQueue;
- private LinkedBlockingQueue<JsonValue> outQueue;
+ private LinkedBlockingQueue<JSONObject> inQueue;
+ private LinkedBlockingQueue<JSONObject> outQueue;
//private static Logger logger = Logger.getLogger("org.evergreen_ils.hatch");
private static Logger logger = Hatch.getLogger();
private MessageWriter writer;
public MessageIO() {
- inQueue = new LinkedBlockingQueue<JsonValue>();
- outQueue = new LinkedBlockingQueue<JsonValue>();
+ inQueue = new LinkedBlockingQueue<JSONObject>();
+ outQueue = new LinkedBlockingQueue<JSONObject>();
reader = new MessageReader();
writer = new MessageWriter();
}
return
(bytes[3] << 24) & 0xff000000
| (bytes[2] << 16) & 0x00ff0000
- | (bytes[1] << 8) & 0x0000ff00
- | (bytes[0] << 0) & 0x000000ff;
+ | (bytes[1] << 8) & 0x0000ff00
+ | (bytes[0] << 0) & 0x000000ff;
}
private String readOneMessage() throws EndOfStreamException, IOException {
while (true) {
+ String message = "";
+ JSONObject jsonMsg = null;
+
try {
- String message = readOneMessage();
+ message = readOneMessage();
+ jsonMsg = new JSONObject(message);
} catch (EndOfStreamException eose) {
- logger.warning("STDIN closed. MessageReader thread exiting");
- return;
+ logger.warning("STDIN closed... exiting");
+ System.exit(1);
} catch (IOException ioe) {
- logger.warning(ioe);
+ logger.warning(ioe.toString());
+
+ } catch (JSONException je) {
+
+ logger.warning("Error parsing JSON message on STDIN " +
+ je.toString() + " : " + message);
+
+ continue;
}
- // TODO: convert to JSON
- // TODO: push onto inQueue
+ inQueue.offer(jsonMsg);
+
+ logger.info("inQueue contains " + inQueue.size() + " messages");
}
-
}
}
return bytes;
}
-
public void writeOneMessage(String message) throws IOException {
-
System.out.write(intToBytes(message.length()));
System.out.write(message.getBytes("UTF-8"));
System.out.flush();
public void run() {
+ while (true) {
+ logger.info("MessageWriter waiting for outQueue message");
+
+ try {
+
+ // take() blocks the thread until a message is available
+ JSONObject jsonMsg = outQueue.take();
+
+ writeOneMessage(jsonMsg.toString());
+
+ } catch (InterruptedException e) {
+ // interrupted, go back and listen
+ continue;
+ } catch (IOException ioe) {
+ logger.warning(
+ "Error writing message to STDOUT: " + ioe.toString());
+ }
+ }
}
}
-
}
+