Add response chunking support to the Perl implementation of OpenSRF
authormiker <miker@9efc2488-bf62-4759-914b-345cdb29e865>
Thu, 29 Jul 2010 17:40:03 +0000 (17:40 +0000)
committermiker <miker@9efc2488-bf62-4759-914b-345cdb29e865>
Thu, 29 Jul 2010 17:40:03 +0000 (17:40 +0000)
Two new optional paramters to register_method are now supported:
 * max_chunk_size
 * max_chunk_count

OpenSRF has always supported message bundling, but only respond_complete made
use of this fact by sending the final result message and the completion status
message in the same XMPP envelope.  Now, on a per method basis, RESULT messages
can be bundled (cached) until one of three conditions occurs:

 * The size of the JSON of the RESULT messages matches or exceeds max_chunk_size
 * The number of RESULT messages cached matches or exceeds max_chunk_count
 * respond_complete is called (which happens implicitly by returning from a method)

Because the overhead of sending multiple XMPP messages far outweighs the caching
and cache management costs of chunking, the default for max_chunk_size is set
at 10240 bytes (10k).  The default for max_chunk_count is 0.  To turn off chunking
completely, set the max_chunk_size register_method parameter to 0.

git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1985 9efc2488-bf62-4759-914b-345cdb29e865

src/perl/lib/OpenSRF/AppSession.pm
src/perl/lib/OpenSRF/Application.pm

index 851f02c..d450159 100644 (file)
@@ -826,6 +826,12 @@ sub new {
                        threadTrace             => $threadTrace,
                        payload                 => $payload,
                        complete                => 0,
+                       resp_count              => 0,
+                       max_chunk_count         => 0,
+                       current_chunk_count     => 0,
+                       max_chunk_size          => 0,
+                       current_chunk_size      => 0,
+                       current_chunk           => [],
                        timeout_reset           => 0,
                        recv_timeout            => 30,
                        remaining_recv_timeout  => 30,
@@ -839,6 +845,20 @@ sub new {
        return $self;
 }
 
+sub max_chunk_count {
+       my $self = shift;
+       my $value = shift;
+       $self->{max_chunk_count} = $value if (defined($value));
+       return $self->{max_chunk_count};
+}
+
+sub max_chunk_size {
+       my $self = shift;
+       my $value = shift;
+       $self->{max_chunk_size} = $value if (defined($value));
+       return $self->{max_chunk_size};
+}
+
 sub recv_timeout {
        my $self = shift;
        my $timeout = shift;
@@ -969,6 +989,26 @@ sub respond {
                $response->content($msg);
        }
 
+    if ($self->{max_chunk_count} > 0 or $self->{max_chunk_size} > 0) { # we are chunking, and we need to test the size or count
+
+        $self->{current_chunk_size} += OpenSRF::Utils::JSON->perl2JSON($response);
+        push @{$self->{current_chunk}}, $response;  
+        $self->{current_chunk_count}++;
+
+        if (
+                ($self->{max_chunk_size}  && $self->{current_chunk_size}  >= $self->{max_chunk_size} ) ||
+                ($self->{max_chunk_count} && $self->{current_chunk_count} >= $self->{max_chunk_count})
+        ) { # send chunk and reset
+            my $send_res = $self->session->send(( map { ('RESULT', $_) } @{$self->{current_chunk}} ), $self->threadTrace);
+            $self->{current_chunk} = [];
+            $self->{current_chunk_size} = 0;
+            $self->{current_chunk_count} = 0;
+            return $send_res;
+        } else { # not at a chunk yet, just queue it up
+            return $self->session->app_request( $self->threadTrace );
+        }
+    }
+
        $self->session->send('RESULT', $response, $self->threadTrace);
 }
 
@@ -985,12 +1025,14 @@ sub respond_complete {
                $response->content($msg);
        }
 
+    push @{$self->{current_chunk}}, $response;
+
        my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
                statusCode => STATUS_COMPLETE(),
                status => 'Request Complete' );
 
 
-       $self->session->send( 'RESULT' => $response, 'STATUS' => $stat, $self->threadTrace);
+       $self->session->send( ( map { ('RESULT', $_) } @{$self->{current_chunk}} ), 'STATUS' => $stat, $self->threadTrace);
        $self->complete(1);
 }
 
index 0329a02..d1e3d8f 100644 (file)
@@ -48,6 +48,19 @@ sub argc {
        return $self->{argc};
 }
 
+sub max_chunk_size {
+       my $self = shift;
+       return 0 unless ref($self);
+       return $self->{max_chunk_size} if (defined($self->{max_chunk_size}));
+       return 10240;
+}
+
+sub max_chunk_count {
+       my $self = shift;
+       return 0 unless ref($self);
+       return $self->{max_chunk_count} || 0;
+}
+
 sub api_name {
        my $self = shift;
        return 1 unless ref($self);
@@ -138,6 +151,8 @@ sub handler {
                if (ref $coderef) {
                        my @args = $app_msg->params;
                        my $appreq = OpenSRF::AppRequest->new( $session );
+                       $appreq->max_chunk_size( $coderef->max_chunk_size );
+                       $appreq->max_chunk_count( $coderef->max_chunk_count );
 
                        $log->debug( "in_request = $in_request : [" . $appreq->threadTrace."]", INTERNAL );
                        if( $in_request ) {