def __init__(self, service):
self.service = service # service name
- self.application = None # the application we're serving
self.max_requests = 0 # max child requests
self.max_children = 0 # max num of child processes
self.min_childen = 0 # min num of child processes
self.num_children = 0 # current num children
- self.child_idx = 0 # current index into the children array
- self.children = [] # list of children
self.osrf_handle = None # xmpp handle
self.routers = [] # list of registered routers
self.keepalive = 0 # how long to wait for subsequent, stateful requests
+ self.active_list = [] # list of active children
+ self.idle_list = [] # list of idle children
# Global status socketpair. All children relay their
# availability info to the parent through this socketpair.
self.read_status.close()
self.write_status.close()
- for child in self.children:
+ for child in self.idle_list + self.active_list:
child.read_data.shutdown(socket.SHUT_RDWR)
child.write_data.shutdown(socket.SHUT_RDWR)
child.read_data.close()
def try_avail_child(self, data):
''' Trys to send current request data to an available child process '''
- ctr = 0
- while ctr < self.num_children:
- if self.child_idx >= self.num_children:
- self.child_idx = 0
- child = self.children[self.child_idx]
+ if len(self.idle_list) == 0:
+ return False
- if child.available:
- osrf.log.log_internal("sending data to available child")
- self.write_child(child, data)
- return True
-
- ctr += 1
- self.child_idx += 1
- return False
+ child = self.idle_list.pop(0) # remove from idle list
+ osrf.log.log_internal("sending data to available child %d" % child.pid)
+ self.write_child(child, data)
+ self.active_list.insert(0, child) # add to active list
+ return True
def try_new_child(self, data):
''' Tries to spawn a new child to send request data to '''
+
if self.num_children < self.max_children:
osrf.log.log_internal("spawning new child to handle data")
- child = self.spawn_child()
+ child = self.spawn_child(True)
self.write_child(child, data)
return True
return False
def try_wait_child(self, data):
''' Waits for a child to become available '''
+
osrf.log.log_warn("No children available, waiting...")
child = self.check_status(True)
self.write_child(child, data)
def write_child(self, child, data):
''' Sends data to the child process '''
- child.available = False
+ # Do we need to watch for sigpipe, etc?
child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
- self.child_idx += 1
def check_status(self, block=False):
indefinitely for a child to be free. '''
pid = None
- child = None
if block:
pid = self.read_status.recv(SIZE_PAD)
else:
if pid:
pid = int(pid)
- child = [c for c in self.children if c.pid == pid][0]
- child.available = True
+ child = [c for c in self.active_list if c.pid == pid][0]
+ self.active_list.remove(child)
+ self.idle_list.insert(0, child)
+ return child
- return child
+ return None
def reap_children(self, done=False):
''' Uses waitpid() to reap the children. If necessary, new children are spawned '''
+
options = 0
if not done:
options = os.WNOHANG
if not done:
self.spawn_children()
return
+
osrf.log.log_debug("reaping child %d" % pid)
self.num_children -= 1
- self.children = [c for c in self.children if c.pid != pid]
+
+ # locate the child in the active or idle list and remove it
+ # Note: typically, a dead child will be in the active list, since
+ # exiting children do not send a cleanup status to the controller
+
+ child = [c for c in self.active_list if c.pid == pid]
+ if len(child) > 0:
+ self.active_list.remove(child[0])
+ else:
+ child = [c for c in self.idle_list if c.pid == pid]
+ self.idle_list.remove(child[0])
+
except OSError:
return
while self.num_children < self.min_children:
self.spawn_child()
- def spawn_child(self):
+ def spawn_child(self, active=False):
''' Spawns a new child process '''
child = Child(self)
if child.pid:
self.num_children += 1
- self.children.append(child)
+ if active:
+ self.active_list.insert(0, child)
+ else:
+ self.idle_list.insert(0, child)
osrf.log.log_debug("spawned child %d : %d total" % (child.pid, self.num_children))
return child
else:
def register_router(self, target):
''' Registers with a single router '''
+
osrf.log.log_info("registering with router %s" % target)
self.routers.append(target)
def cleanup_routers(self):
''' Un-registers with all connected routers '''
+
for target in self.routers:
osrf.log.log_info("un-registering with router %s" % target)
unreg_msg = osrf.net.NetworkMessage(
self.num_requests = 0 # how many requests we've served so far
self.read_data = None # the child reads data from the controller on this socket
self.write_data = None # the controller sends data to the child on this socket
- self.available = True # true if this child is not currently serving a request
self.pid = 0 # my process id
-
def run(self):
''' Loops, processing data, until max_requests is reached '''
+
while True:
try:
size = int(self.read_data.recv(SIZE_PAD) or 0)
self.send_status()
except KeyboardInterrupt:
pass
+
# run the exit handler
osrf.app.Application.application.child_exit()