+"""
+Implements an OpenSRF forking request server
+"""
# -----------------------------------------------------------------------
# Copyright (C) 2008-2010 Equinox Software, Inc.
# Bill Erickson <erickson@esilibrary.com>
# 02110-1301, USA
# -----------------------------------------------------------------------
-import os, sys, threading, fcntl, socket, errno, signal, time
-import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
+import os, sys, fcntl, socket, errno, signal, time
+import osrf.log, osrf.conf, osrf.net, osrf.system
+import osrf.stack, osrf.app, osrf.const
# used to define the size of the PID/size leader in
'''
def __init__(self, service):
+ '''Initialize the Controller object'''
self.service = service # service name
self.max_requests = 0 # max child requests
self.max_children = 0 # max num of child processes
self.read_status, self.write_status = socket.socketpair()
self.read_status.setblocking(0)
- def load_app(self):
- settings = osrf.set.get('activeapps.%s' % self.service)
-
-
def cleanup(self):
''' Closes management sockets, kills children, reaps children, exits '''
def handle_signals(self):
''' Installs SIGINT and SIGTERM handlers '''
+
def handler(signum, frame):
+ ''' Handler implementation '''
self.cleanup()
+
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
def run(self):
+ ''' Run the OpenSRF service, spawning and reaping children '''
osrf.net.get_network_handle().disconnect()
osrf.net.clear_network_handle()
self.spawn_children()
self.handle_signals()
- time.sleep(.5) # give children a chance to connect before we start taking data
+ # give children a chance to connect before we start taking data
+ time.sleep(.5)
self.osrf_handle = osrf.system.System.net_connect(
resource = '%s_listener' % self.service,
service = self.service
)
- # clear the recv callback so inbound messages do not filter through the opensrf stack
+ # clear the recv callback so inbound messages do not filter
+ # through the opensrf stack
self.osrf_handle.receive_callback = None
# connect to our listening routers
if len(self.idle_list) > 0:
child = self.idle_list.pop()
self.active_list.append(child)
- osrf.log.log_internal("server: sending data to available child %d" % child.pid)
+ osrf.log.log_internal(
+ "server: sending data to available child %d" % child.pid
+ )
elif self.num_children < self.max_children:
child = self.spawn_child(True)
- osrf.log.log_internal("server: sending data to new child %d" % child.pid)
+ osrf.log.log_internal(
+ "server: sending data to new child %d" % child.pid
+ )
else:
- osrf.log.log_warn("server: no children available, waiting...")
+ osrf.log.log_warn("server: no children available, \
+waiting... consider increasing max_children for this application higher than \
+%d in the OpenSRF configuration if this message occurs frequently" \
+ % self.max_children)
child = self.check_status(True)
self.write_child(child, data)
except KeyboardInterrupt:
osrf.log.log_info("server: exiting with keyboard interrupt")
- except Exception, e:
- osrf.log.log_error("server: exiting with exception: %s" % e.message)
+ except Exception, exc:
+ osrf.log.log_error(
+ "server: exiting with exception: %s" % exc.message
+ )
finally:
self.cleanup()
try:
child.write_data.sendall(data)
- except Exception, e:
- osrf.log.log_error("server: error sending data to child %d: %s" % (child.pid, str(e)))
+ except Exception, ex:
+ osrf.log.log_error(
+ "server: error sending data to child %d: %s"
+ % (child.pid, str(ex))
+ )
self.cleanup_child(child.pid, True)
return False
try:
pid = self.read_status.recv(SIZE_PAD)
- except socket.error, e:
- if e.args[0] == errno.EAGAIN:
+ except socket.error, exc:
+ if exc.args[0] == errno.EAGAIN:
break # no data left to read in nonblocking mode
- osrf.log.log_error("server: child status check failed: %s" % str(e))
+ osrf.log.log_error(
+ "server: child status check failed: %s" % str(exc)
+ )
if not wait or ret_child:
break
if pid:
child = self.pid_map[int(pid)]
- osrf.log.log_internal("server: child process %d reporting for duty" % child.pid)
+ osrf.log.log_internal(
+ "server: child process %d reporting for duty" % child.pid
+ )
if wait and ret_child is None:
- # caller is waiting for a free child, leave it in the active list
+ # caller is waiting for a free child;
+ # leave it in the active list
ret_child = child
else:
self.active_list.remove(child)
def reap_children(self, done=False):
- ''' Uses waitpid() to reap the children. If necessary, new children are spawned '''
+ '''
+ Uses waitpid() to reap the children. If necessary, spawns new children.
+ '''
options = 0
if not done:
return
def cleanup_child(self, pid, kill=False):
+ '''
+ Removes the child from the active or idle list.
+
+ Kills the process if requested.
+ '''
if kill:
os.kill(pid, signal.SIGKILL)
child = Child(self)
child.read_data, child.write_data = socket.socketpair()
child.pid = os.fork()
+ sys.stdin.close()
+ sys.stdin = open(os.devnull, 'r')
+ sys.stdout.close()
+ sys.stdout = open(os.devnull, 'w')
+ sys.stderr.close()
+ sys.stderr = open(os.devnull, 'w')
+
- if child.pid: # parent process
+ if child.pid:
self.num_children += 1
self.pid_map[child.pid] = child
if active:
self.active_list.append(child)
else:
self.idle_list.append(child)
- osrf.log.log_internal("server: %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
+ osrf.log.log_internal(
+ "server: %s spawned child %d : %d total"
+ % (self.service, child.pid, self.num_children)
+ )
return child
else:
child.pid = os.getpid()
''' Models a single child process '''
def __init__(self, controller):
- self.controller = controller # our Controller object
- self.num_requests = 0 # how many requests we've served so far
- self.read_data = None # the child reads data from the controller on this socket
- self.write_data = None # the controller sends data to the child on this socket
- self.pid = 0 # my process id
+ ''' Initializes child process instance '''
+
+ # our Controller object
+ self.controller = controller
+
+ # how many requests we've served so far
+ self.num_requests = 0
+
+ # the child reads data from the controller on this socket
+ self.read_data = None
+
+ # the controller sends data to the child on this socket
+ self.write_data = None
+
+ # my process id
+ self.pid = 0
def run(self):
''' Loops, processing data, until max_requests is reached '''
buf = None
try:
buf = self.read_data.recv(2048)
- except socket.error, e:
- if e.args[0] == errno.EAGAIN:
+ except socket.error, exc:
+ if exc.args[0] == errno.EAGAIN:
break
- osrf.log.log_error("server: child data read failed: %s" % str(e))
+ osrf.log.log_error(
+ "server: child data read failed: %s" % str(exc)
+ )
osrf.app.Application.application.child_exit()
return
osrf.log.log_internal("server: child received message: " + data)
osrf.net.get_network_handle().flush_inbound_data()
- session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
+ session = osrf.stack.push(
+ osrf.net.NetworkMessage.from_xml(data)
+ )
self.keepalive_loop(session)
self.num_requests += 1
osrf.app.Application.application.child_exit()
def keepalive_loop(self, session):
+ '''
+ Keeps session alive while client is connected.
+
+ If timeout occurs, session disconnects and gets cleaned up.
+ '''
keepalive = self.controller.keepalive
while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
status = session.wait(keepalive)
if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
- osrf.log.log_internal("server: client sent disconnect, exiting keepalive")
+ osrf.log.log_internal(
+ "server: client sent disconnect, exiting keepalive"
+ )
break
- if status is None: # no msg received before keepalive timeout expired
+ # if no msg received before keepalive timeout expired
+ if status is None:
osrf.log.log_info(
- "server: no request was received in %d seconds from %s, exiting stateful session" % (
- session.remote_id, int(keepalive)));
+ "server: no request was received in %d seconds from %s, "
+ "exiting stateful session"
+ % (session.remote_id, int(keepalive))
+ )
session.send_status(
session.thread,