added final bits for stateful sessions (drone keepalive)
authorerickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Tue, 19 May 2009 14:12:40 +0000 (14:12 +0000)
committererickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Tue, 19 May 2009 14:12:40 +0000 (14:12 +0000)
git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1703 9efc2488-bf62-4759-914b-345cdb29e865

src/python/osrf/server.py
src/python/osrf/ses.py
src/python/osrf/stack.py

index 10f4d66..6f2307d 100644 (file)
@@ -19,7 +19,7 @@
 # -----------------------------------------------------------------------
 
 import os, sys, threading, logging, fcntl, socket, errno, signal, time
-import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app
+import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
 
 
 # used to define the size of the PID/size leader in 
@@ -42,6 +42,7 @@ class Controller(object):
         self.children = [] # list of children
         self.osrf_handle = None # xmpp handle
         self.routers = [] # list of registered routers
+        self.keepalive = 0 # how long to wait for subsequent, stateful requests
 
         # Global status socketpair.  All children relay their 
         # availability info to the parent through this socketpair. 
@@ -296,7 +297,8 @@ class Child(object):
                 size = int(self.read_data.recv(SIZE_PAD) or 0)
                 data = self.read_data.recv(size)
                 osrf.log.log_internal("recv'd data " + data)
-                osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
+                session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
+                self.keepalive_loop(session)
                 self.num_requests += 1
                 if self.num_requests == self.controller.max_requests:
                     break
@@ -306,6 +308,35 @@ class Child(object):
         # run the exit handler
         osrf.app.Application.application.child_exit()
 
+    def keepalive_loop(self, session):
+        keepalive = self.controller.keepalive
+
+        while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
+
+            start = time.time()
+            session.wait(keepalive)
+            end = time.time()
+
+            if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
+                osrf.log.log_internal("client sent disconnect, exiting keepalive")
+                break
+
+            if (end - start) >= keepalive: # exceeded keepalive timeout
+
+                osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
+
+                session.send_status(
+                    session.thread, 
+                    osrf.net_obj.NetworkObject.osrfConnectStatus({   
+                        'status' : 'Disconnected on timeout',
+                        'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
+                    })
+                )
+
+                break
+
+        return
+
     def send_status(self):
         ''' Informs the controller that we are done processing this request '''
         fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
index 63c3a35..09182a0 100644 (file)
@@ -46,6 +46,7 @@ class Session(object):
         self.thread = None
         self.service = None
 
+
     @staticmethod
     def find_or_create(thread):
         if thread in Session.session_cache:
@@ -114,6 +115,7 @@ class ClientSession(Session):
         # cache this session in the global session cache
         Session.session_cache[self.thread] = self
 
+
     def reset_request_timeout(self, rid):
         req = self.find_request(rid)
         if req:
@@ -334,6 +336,7 @@ class ServerSession(Session):
     def __init__(self, thread):
         Session.__init__(self)
         self.thread = thread
+        Session.session_cache[thread] = self
 
     def send_status(self, thread_trace, payload):
         self.send(
index 3426ada..417b431 100644 (file)
@@ -36,6 +36,8 @@ def push(net_msg):
     if isinstance(ses, osrf.ses.ServerSession):
         osrf.log.log_info("Message processing duration %f" % duration)
 
+    return ses
+
 def handle_message(session, message):
 
     osrf.log.log_internal("handle_message(): processing message of "
@@ -104,7 +106,7 @@ def handle_server(session, message):
 
     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
         osrf.log.log_debug("server received CONNECT from %s" % session.remote_id)
-        session.state == osrf.const.OSRF_APP_SESSION_CONNECTED 
+        session.state = osrf.const.OSRF_APP_SESSION_CONNECTED 
         session.send_connect_ok(message.threadTrace())
         return