Rename "chunking" to "bundling"; Implement message splitting ("chunking") in Perl...
authorMike Rylander <mrylander@gmail.com>
Sun, 23 Feb 2014 19:51:13 +0000 (14:51 -0500)
committerMike Rylander <mrylander@gmail.com>
Thu, 21 Jul 2016 15:40:04 +0000 (11:40 -0400)
Signed-off-by: Mike Rylander <mrylander@gmail.com>
src/perl/lib/OpenSRF/AppSession.pm
src/perl/lib/OpenSRF/Application.pm
src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm

index 9e371c7..7454970 100644 (file)
@@ -858,15 +858,17 @@ sub new {
                        payload                 => $payload,
                        complete                => 0,
                        resp_count              => 0,
-                       max_chunk_count         => 0,
-                       current_chunk_count     => 0,
+                       max_bundle_count        => 0,
+                       current_bundle_count=> 0,
                        max_chunk_size          => 0,
-                       current_chunk_size      => 0,
-                       current_chunk           => [],
+                       max_bundle_size         => 0,
+                       current_bundle_size     => 0,
+                       current_bundle          => [],
                        timeout_reset           => 0,
                        recv_timeout            => 30,
                        remaining_recv_timeout  => 30,
                        recv_queue              => [],
+                       part_recv_buffer=> '',
        };
 
        bless $self => $class;
@@ -876,18 +878,18 @@ sub new {
        return $self;
 }
 
-sub max_chunk_count {
+sub max_bundle_count {
        my $self = shift;
        my $value = shift;
-       $self->{max_chunk_count} = $value if (defined($value));
-       return $self->{max_chunk_count};
+       $self->{max_bundle_count} = $value if (defined($value));
+       return $self->{max_bundle_count};
 }
 
-sub max_chunk_size {
+sub max_bundle_size {
        my $self = shift;
        my $value = shift;
-       $self->{max_chunk_size} = $value if (defined($value));
-       return $self->{max_chunk_size};
+       $self->{max_bundle_size} = $value if (defined($value));
+       return $self->{max_bundle_size};
 }
 
 sub recv_timeout {
@@ -932,9 +934,17 @@ sub complete {
        } else {
                $self->session->queue_wait(0);
        }
+    $self->completing(0) if ($self->{complete});
        return $self->{complete};
 }
 
+sub completing {
+       my $self = shift;
+       my $value = shift;
+       $self->{_completing} = $value if (defined($value));
+       return $self->{_completing};
+}
+
 sub duration {
        my $self = shift;
        $self->wait_complete;
@@ -969,6 +979,18 @@ sub push_queue {
                $self->complete(1);
                #return; eventually...
        }
+
+       if( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::Partial")) {
+               $self->{part_recv_buffer} .= $resp->content;
+               return 1;
+       } elsif( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::PartialComplete")) {
+               if ($self->{part_recv_buffer}) {
+                       $resp = new OpenSRF::DomainObject::oilsResult;
+                       $resp->content( OpenSRF::Utils::JSON->JSON2perl( $self->{part_recv_buffer} ) );
+                       $self->{part_recv_buffer} = '';
+               } 
+       }
+
        push @{ $self->{recv_queue} }, $resp;
 }
 
@@ -1012,35 +1034,50 @@ sub respond {
        my $msg = shift;
        return unless ($self and $self->session and !$self->complete);
 
+
+    my $type = 'RESULT';
        my $response;
-       if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
+       if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResponse')) {
                $response = $msg;
-       } else {
-               $response = new OpenSRF::DomainObject::oilsResult;
-               $response->content($msg);
-       }
+        $type = 'STATUS' if UNIVERSAL::isa($response, 'OpenSRF::DomainObject::oilsStatus');
+       } elsif ($self->max_chunk_size > 0) { # we might need to chunk
+        my $str = OpenSRF::Utils::JSON->perl2JSON($msg);
+        if (length($str) > $self->max_chunk_size) { # send partials ("chunking")
+            for (my $i = 0; $i < length($str); $i += $self->max_chunk_size) {
+                $response = new OpenSRF::DomainObject::oilsResult::Partial;
+                       $response->content( substr($str, $i, $self->max_chunk_size) );
+                   $self->session->send($type, $response, $self->threadTrace);
+            }
+            # This triggers reconstruction on the remote end
+            $response = new OpenSRF::DomainObject::oilsResult::PartialComplete;
+               return $self->session->send($type, $response, $self->threadTrace);
+        } else {
+            $response = new OpenSRF::DomainObject::oilsResult;
+            $response->content( $msg );
+        }
+    }
 
-    if ($self->{max_chunk_count} > 0 or $self->{max_chunk_size} > 0) { # we are chunking, and we need to test the size or count
+    if ($self->{max_bundle_count} > 0 or $self->{max_bundle_size} > 0) { # we are bundling, and we need to test the size or count
 
-        $self->{current_chunk_size} += length(OpenSRF::Utils::JSON->perl2JSON($response));
-        push @{$self->{current_chunk}}, $response;  
-        $self->{current_chunk_count}++;
+        $self->{current_bundle_size} += length(OpenSRF::Utils::JSON->perl2JSON($response));
+        push @{$self->{current_bundle}}, $type, $response;  
+        $self->{current_bundle_count}++;
 
-        if (
-                ($self->{max_chunk_size}  && $self->{current_chunk_size}  >= $self->{max_chunk_size} ) ||
-                ($self->{max_chunk_count} && $self->{current_chunk_count} >= $self->{max_chunk_count})
+        if ( $self->completing ||
+                ($self->{max_bundle_size}  && $self->{current_bundle_size}  >= $self->{max_bundle_size} ) ||
+                ($self->{max_bundle_count} && $self->{current_bundle_count} >= $self->{max_bundle_count})
         ) { # send chunk and reset
-            my $send_res = $self->session->send(( map { ('RESULT', $_) } @{$self->{current_chunk}} ), $self->threadTrace);
-            $self->{current_chunk} = [];
-            $self->{current_chunk_size} = 0;
-            $self->{current_chunk_count} = 0;
+            my $send_res = $self->session->send( @{$self->{current_bundle}}, $self->threadTrace);
+            $self->{current_bundle} = [];
+            $self->{current_bundle_size} = 0;
+            $self->{current_bundle_count} = 0;
             return $send_res;
         } else { # not at a chunk yet, just queue it up
             return $self->session->app_request( $self->threadTrace );
         }
     }
 
-       $self->session->send('RESULT', $response, $self->threadTrace);
+       $self->session->send($type, $response, $self->threadTrace);
 }
 
 sub respond_complete {
@@ -1048,23 +1085,15 @@ sub respond_complete {
        my $msg = shift;
        return unless ($self and $self->session and !$self->complete);
 
-    if (defined($msg)) {
-       my $response;
-           if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
-                   $response = $msg;
-       } else {
-               $response = new OpenSRF::DomainObject::oilsResult;
-                   $response->content($msg);
-       }
-
-        push @{$self->{current_chunk}}, $response;
-    }
-
-       my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
-               statusCode => STATUS_COMPLETE(),
-               status => 'Request Complete' );
+    $self->respond($msg) if (defined($msg));
 
-       $self->session->send( ( map { ('RESULT', $_) } @{$self->{current_chunk}} ), 'STATUS' => $stat, $self->threadTrace);
+    $self->completing(1);
+    $self->respond(
+        OpenSRF::DomainObject::oilsConnectStatus->new(
+            statusCode => STATUS_COMPLETE(),
+            status => 'Request Complete'
+        )
+    );
        $self->complete(1);
 }
 
@@ -1129,9 +1158,17 @@ sub complete {
        my $x = shift;
        my $c = shift;
        $x->{complete} = $c if ($c);
+    $x->completing(0) if ($c);
        return $x->{complete};
 }
 
+sub completing {
+       my $self = shift;
+       my $value = shift;
+       $self->{_completing} = $value if (defined($value));
+       return $self->{_completing};
+}
+
 sub status {}
 
 
index 023bb8d..9749a1d 100644 (file)
@@ -48,17 +48,24 @@ sub argc {
        return $self->{argc};
 }
 
-sub max_chunk_size {
+sub max_bundle_size {
        my $self = shift;
        return 0 unless ref($self);
-       return $self->{max_chunk_size} if (defined($self->{max_chunk_size}));
+       return $self->{max_bundle_size} if (defined($self->{max_bundle_size}));
        return 10240;
 }
 
-sub max_chunk_count {
+sub max_bundle_count {
+       my $self = shift;
+       return 0 unless ref($self);
+       return $self->{max_bundle_count} || 0;
+}
+
+sub max_chunk_size {
        my $self = shift;
        return 0 unless ref($self);
-       return $self->{max_chunk_count} || 0;
+       return $self->{max_chunk_size} if (defined($self->{max_chunk_size}));
+       return 2 * $self->max_bundle_size;
 }
 
 sub api_name {
@@ -173,8 +180,8 @@ sub handler {
                        my @args = $app_msg->params;
                        $coderef->session( $session );
                        my $appreq = OpenSRF::AppRequest->new( $session );
-                       $appreq->max_chunk_size( $coderef->max_chunk_size );
-                       $appreq->max_chunk_count( $coderef->max_chunk_count );
+                       $appreq->max_bundle_size( $coderef->max_bundle_size );
+                       $appreq->max_bundle_count( $coderef->max_bundle_count );
 
                        $log->debug( "in_request = $in_request : [" . $appreq->threadTrace."]", INTERNAL );
                        if( $in_request ) {
index 18e2e3c..25d8f50 100644 (file)
@@ -51,10 +51,13 @@ layer messages send between the client and server.
 
 sub STATUS_CONTINUE            { return 100 }
 
-sub STATUS_OK                          { return 200 }
+sub STATUS_OK                  { return 200 }
 sub STATUS_ACCEPTED            { return 202 }
 sub STATUS_COMPLETE            { return 205 }
 
+sub STATUS_PARTIAL             { return 206 }
+sub STATUS_NOCONTENT   { return 204 }
+
 sub STATUS_REDIRECTED  { return 307 }
 
 sub STATUS_BADREQUEST  { return 400 }
@@ -277,6 +280,85 @@ B<OpenSRF::DomainObject::oilsResponse>
 
 #-------------------------------------------------------------------------------
 
+package OpenSRF::DomainObject::oilsResult::Partial;
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
+use base 'OpenSRF::DomainObject::oilsResult';
+use vars qw/$status $statusCode/;
+OpenSRF::Utils::JSON->register_class_hint( hint => 'osrfResult', name => 'OpenSRF::DomainObject::oilsResult::Partial', type => 'hash' );
+
+
+$status = 'Partial Response';
+$statusCode = STATUS_PARTIAL;
+
+=head1 NAME
+
+OpenSRF::DomainObject::oilsResult::Partial
+
+=head1 SYNOPSIS
+
+This class is used internally to break apart large OpenSRF messages into small
+chunks, to reduce the maximum possible stanza size when sending a message over
+XMPP.
+
+=cut
+
+sub content {
+        my $self = shift;
+       my $val = shift;
+
+       $self->{content} = $val if (defined $val);
+       return $self->{content};
+}
+
+=head1 SEE ALSO
+
+B<OpenSRF::DomainObject::oilsResponse>
+
+=cut
+
+1;
+
+#-------------------------------------------------------------------------------
+
+package OpenSRF::DomainObject::oilsResult::PartialComplete;
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
+use base 'OpenSRF::DomainObject::oilsResult';
+use vars qw/$status $statusCode/;
+OpenSRF::Utils::JSON->register_class_hint( hint => 'osrfResult', name => 'OpenSRF::DomainObject::oilsResult::Partial', type => 'hash' );
+
+
+$status = 'Partial Response Finalized';
+$statusCode = STATUS_NOCONTENT;
+
+=head1 NAME
+
+OpenSRF::DomainObject::oilsResult::Partial
+
+=head1 SYNOPSIS
+
+This class is used internally to mark the end of a stream of small partial
+OpenSRF messages of type OpenSRF::DomainObject::oilsResult::Partial.
+
+=cut
+
+sub content {
+        my $self = shift;
+       my $val = shift;
+
+       $self->{content} = $val if (defined $val);
+       return $self->{content};
+}
+
+=head1 SEE ALSO
+
+B<OpenSRF::DomainObject::oilsResponse>
+
+=cut
+
+1;
+
+#-------------------------------------------------------------------------------
+
 package OpenSRF::DomainObject::oilsException;
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use OpenSRF::EX;