req_method = osrf_msg.payload()
params = req_method.params() or []
- method = Application.methods[req_method.method()]
+ method_name = req_method.method()
+ method = Application.methods.get(method_name)
+
+ if method is None:
+ session.send_method_not_found(osrf_msg.threadTrace(), method_name)
+ return
+
handler = method.get_func()
param_json = osrf.json.to_json(params)
idx -= 1
# ---------------------------------------------------------
+ # Session data test
+ # ---------------------------------------------------------
Application.register_method(
api_name = 'opensrf.stateful_session_test',
return c
# ---------------------------------------------------------
+ # Session callbacks test
+ # ---------------------------------------------------------
+ Application.register_method(
+ api_name = 'opensrf.session_callback_test',
+ method = 'callback_test',
+ argc = 0
+ )
+
+ def callback_test(self, request):
+
+ def pre_req_cb(ses):
+ osrf.log.log_info("running pre_request callback")
+
+ def post_req_cb(ses):
+ osrf.log.log_info("running post_request callback")
+
+ def disconnect_cb(ses):
+ osrf.log.log_info("running disconnect callback")
+
+ def death_cb(ses):
+ osrf.log.log_info("running death callback")
+
+ ses = request.session
+
+ ses.register_callback('pre_request', pre_req_cb)
+ ses.register_callback('post_request', post_req_cb)
+ ses.register_callback('disconnect', disconnect_cb)
+ ses.register_callback('death', death_cb)
+
+ c = ses.session_data.get('counter', 0) + 1
+ ses.session_data['counter'] = c
+ return c
+
+
+ # ---------------------------------------------------------
# These example methods override methods from
# osrf.app.Application. They are not required.
# ---------------------------------------------------------
handle = THREAD_SESSIONS.get(threading.currentThread().getName())
if handle:
osrf.log.log_internal("clearing network handle %s" % handle.jid.as_utf8())
- #handle.disconnect()
del THREAD_SESSIONS[threading.currentThread().getName()]
return handle
return msg
+ def flush_inbound_data(self):
+ ''' Read all pending inbound messages from the socket and discard them '''
+ cb = self.receive_callback
+ self.receive_callback = None
+ while self.recv(0): pass
+ self.receive_callback = cb
+
+
+
size = int(self.read_data.recv(SIZE_PAD) or 0)
data = self.read_data.recv(size)
osrf.log.log_internal("recv'd data " + data)
+ osrf.net.get_network_handle().flush_inbound_data()
session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
self.keepalive_loop(session)
self.num_requests += 1
while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
- start = time.time()
- session.wait(keepalive)
- end = time.time()
+ status = session.wait(keepalive)
if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
osrf.log.log_internal("client sent disconnect, exiting keepalive")
break
- if (end - start) >= keepalive: # exceeded keepalive timeout
+ if status is None: # no msg received before keepalive timeout expired
osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
break
+ session.run_callback('death')
return
def send_status(self):
"""Wait up to <timeout> seconds for data to arrive on the network"""
osrf.log.log_internal("Session.wait(%d)" % timeout)
handle = osrf.net.get_network_handle()
- handle.recv(timeout)
+ return handle.recv(timeout)
def send(self, omessages):
"""Sends an OpenSRF message"""
})
self.send_status(thread_trace, status_msg)
+ def send_method_not_found(self, thread_trace, method_name):
+ status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
+ 'status' : 'Method [%s] not found for %s' % (method_name, self.service),
+ 'statusCode': osrf.const.OSRF_STATUS_NOTFOUND
+ })
+ self.send_status(thread_trace, status_msg)
+
+
+ def run_callback(self, type):
+ if type in self.callbacks:
+ self.callbacks[type](self)
+
+ def register_callback(self, type, func):
+ self.callbacks[type] = func
+
def cleanup(self):
Session.cleanup(self)
- if 'death' in self.callbacks:
- self.callbacks['death'](self)
+ self.run_callbacks('death')
class ServerRequest(Request):
if message.type() == osrf.const.OSRF_MESSAGE_TYPE_REQUEST:
osrf.log.log_debug("server received REQUEST from %s" % session.remote_id)
+ session.run_callback('pre_request')
osrf.app.Application.handle_request(session, message)
+ session.run_callback('post_request')
return
if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
if message.type() == osrf.const.OSRF_MESSAGE_TYPE_DISCONNECT:
osrf.log.log_debug("server received DISCONNECT from %s" % session.remote_id)
session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
+ session.run_callback('disconnect')
return
if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
- # Should never get here
- osrf.log.log_warn("server received STATUS from %s" % session.remote_id)
+ osrf.log.log_debug("server ignoring STATUS from %s" % session.remote_id)
+ return
+
+ if message.type() == osrf.const.OSRF_MESSAGE_TYPE_RESULT:
+ osrf.log.log_debug("server ignoring RESULT from %s" % session.remote_id)
return