# -----------------------------------------------------------------------
import os, sys, threading, logging, fcntl, socket, errno, signal, time
-import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app
+import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
# used to define the size of the PID/size leader in
self.children = [] # list of children
self.osrf_handle = None # xmpp handle
self.routers = [] # list of registered routers
+ self.keepalive = 0 # how long to wait for subsequent, stateful requests
# Global status socketpair. All children relay their
# availability info to the parent through this socketpair.
size = int(self.read_data.recv(SIZE_PAD) or 0)
data = self.read_data.recv(size)
osrf.log.log_internal("recv'd data " + data)
- osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
+ session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
+ self.keepalive_loop(session)
self.num_requests += 1
if self.num_requests == self.controller.max_requests:
break
# run the exit handler
osrf.app.Application.application.child_exit()
+ def keepalive_loop(self, session):
+ keepalive = self.controller.keepalive
+
+ while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
+
+ start = time.time()
+ session.wait(keepalive)
+ end = time.time()
+
+ if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
+ osrf.log.log_internal("client sent disconnect, exiting keepalive")
+ break
+
+ if (end - start) >= keepalive: # exceeded keepalive timeout
+
+ osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
+
+ session.send_status(
+ session.thread,
+ osrf.net_obj.NetworkObject.osrfConnectStatus({
+ 'status' : 'Disconnected on timeout',
+ 'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
+ })
+ )
+
+ break
+
+ return
+
def send_status(self):
''' Informs the controller that we are done processing this request '''
fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
self.thread = None
self.service = None
+
@staticmethod
def find_or_create(thread):
if thread in Session.session_cache:
# cache this session in the global session cache
Session.session_cache[self.thread] = self
+
def reset_request_timeout(self, rid):
req = self.find_request(rid)
if req:
def __init__(self, thread):
Session.__init__(self)
self.thread = thread
+ Session.session_cache[thread] = self
def send_status(self, thread_trace, payload):
self.send(
if isinstance(ses, osrf.ses.ServerSession):
osrf.log.log_info("Message processing duration %f" % duration)
+ return ses
+
def handle_message(session, message):
osrf.log.log_internal("handle_message(): processing message of "
if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
osrf.log.log_debug("server received CONNECT from %s" % session.remote_id)
- session.state == osrf.const.OSRF_APP_SESSION_CONNECTED
+ session.state = osrf.const.OSRF_APP_SESSION_CONNECTED
session.send_connect_ok(message.threadTrace())
return