From 317fd16dd1820427a263b68ae83ae8439b1e4fa4 Mon Sep 17 00:00:00 2001 From: Mike Rylander Date: Sun, 23 Feb 2014 14:51:13 -0500 Subject: [PATCH] Rename "chunking" to "bundling"; Implement message splitting ("chunking") in Perl using two new oilsResult subclasses Signed-off-by: Mike Rylander --- src/perl/lib/OpenSRF/AppSession.pm | 123 ++++++++++++++-------- src/perl/lib/OpenSRF/Application.pm | 19 ++-- src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm | 84 ++++++++++++++- 3 files changed, 176 insertions(+), 50 deletions(-) diff --git a/src/perl/lib/OpenSRF/AppSession.pm b/src/perl/lib/OpenSRF/AppSession.pm index 9e371c7..7454970 100644 --- a/src/perl/lib/OpenSRF/AppSession.pm +++ b/src/perl/lib/OpenSRF/AppSession.pm @@ -858,15 +858,17 @@ sub new { payload => $payload, complete => 0, resp_count => 0, - max_chunk_count => 0, - current_chunk_count => 0, + max_bundle_count => 0, + current_bundle_count=> 0, max_chunk_size => 0, - current_chunk_size => 0, - current_chunk => [], + max_bundle_size => 0, + current_bundle_size => 0, + current_bundle => [], timeout_reset => 0, recv_timeout => 30, remaining_recv_timeout => 30, recv_queue => [], + part_recv_buffer=> '', }; bless $self => $class; @@ -876,18 +878,18 @@ sub new { return $self; } -sub max_chunk_count { +sub max_bundle_count { my $self = shift; my $value = shift; - $self->{max_chunk_count} = $value if (defined($value)); - return $self->{max_chunk_count}; + $self->{max_bundle_count} = $value if (defined($value)); + return $self->{max_bundle_count}; } -sub max_chunk_size { +sub max_bundle_size { my $self = shift; my $value = shift; - $self->{max_chunk_size} = $value if (defined($value)); - return $self->{max_chunk_size}; + $self->{max_bundle_size} = $value if (defined($value)); + return $self->{max_bundle_size}; } sub recv_timeout { @@ -932,9 +934,17 @@ sub complete { } else { $self->session->queue_wait(0); } + $self->completing(0) if ($self->{complete}); return $self->{complete}; } +sub completing { + my $self = shift; + my $value = shift; + $self->{_completing} = $value if (defined($value)); + return $self->{_completing}; +} + sub duration { my $self = shift; $self->wait_complete; @@ -969,6 +979,18 @@ sub push_queue { $self->complete(1); #return; eventually... } + + if( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::Partial")) { + $self->{part_recv_buffer} .= $resp->content; + return 1; + } elsif( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::PartialComplete")) { + if ($self->{part_recv_buffer}) { + $resp = new OpenSRF::DomainObject::oilsResult; + $resp->content( OpenSRF::Utils::JSON->JSON2perl( $self->{part_recv_buffer} ) ); + $self->{part_recv_buffer} = ''; + } + } + push @{ $self->{recv_queue} }, $resp; } @@ -1012,35 +1034,50 @@ sub respond { my $msg = shift; return unless ($self and $self->session and !$self->complete); + + my $type = 'RESULT'; my $response; - if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) { + if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResponse')) { $response = $msg; - } else { - $response = new OpenSRF::DomainObject::oilsResult; - $response->content($msg); - } + $type = 'STATUS' if UNIVERSAL::isa($response, 'OpenSRF::DomainObject::oilsStatus'); + } elsif ($self->max_chunk_size > 0) { # we might need to chunk + my $str = OpenSRF::Utils::JSON->perl2JSON($msg); + if (length($str) > $self->max_chunk_size) { # send partials ("chunking") + for (my $i = 0; $i < length($str); $i += $self->max_chunk_size) { + $response = new OpenSRF::DomainObject::oilsResult::Partial; + $response->content( substr($str, $i, $self->max_chunk_size) ); + $self->session->send($type, $response, $self->threadTrace); + } + # This triggers reconstruction on the remote end + $response = new OpenSRF::DomainObject::oilsResult::PartialComplete; + return $self->session->send($type, $response, $self->threadTrace); + } else { + $response = new OpenSRF::DomainObject::oilsResult; + $response->content( $msg ); + } + } - if ($self->{max_chunk_count} > 0 or $self->{max_chunk_size} > 0) { # we are chunking, and we need to test the size or count + if ($self->{max_bundle_count} > 0 or $self->{max_bundle_size} > 0) { # we are bundling, and we need to test the size or count - $self->{current_chunk_size} += length(OpenSRF::Utils::JSON->perl2JSON($response)); - push @{$self->{current_chunk}}, $response; - $self->{current_chunk_count}++; + $self->{current_bundle_size} += length(OpenSRF::Utils::JSON->perl2JSON($response)); + push @{$self->{current_bundle}}, $type, $response; + $self->{current_bundle_count}++; - if ( - ($self->{max_chunk_size} && $self->{current_chunk_size} >= $self->{max_chunk_size} ) || - ($self->{max_chunk_count} && $self->{current_chunk_count} >= $self->{max_chunk_count}) + if ( $self->completing || + ($self->{max_bundle_size} && $self->{current_bundle_size} >= $self->{max_bundle_size} ) || + ($self->{max_bundle_count} && $self->{current_bundle_count} >= $self->{max_bundle_count}) ) { # send chunk and reset - my $send_res = $self->session->send(( map { ('RESULT', $_) } @{$self->{current_chunk}} ), $self->threadTrace); - $self->{current_chunk} = []; - $self->{current_chunk_size} = 0; - $self->{current_chunk_count} = 0; + my $send_res = $self->session->send( @{$self->{current_bundle}}, $self->threadTrace); + $self->{current_bundle} = []; + $self->{current_bundle_size} = 0; + $self->{current_bundle_count} = 0; return $send_res; } else { # not at a chunk yet, just queue it up return $self->session->app_request( $self->threadTrace ); } } - $self->session->send('RESULT', $response, $self->threadTrace); + $self->session->send($type, $response, $self->threadTrace); } sub respond_complete { @@ -1048,23 +1085,15 @@ sub respond_complete { my $msg = shift; return unless ($self and $self->session and !$self->complete); - if (defined($msg)) { - my $response; - if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) { - $response = $msg; - } else { - $response = new OpenSRF::DomainObject::oilsResult; - $response->content($msg); - } - - push @{$self->{current_chunk}}, $response; - } - - my $stat = OpenSRF::DomainObject::oilsConnectStatus->new( - statusCode => STATUS_COMPLETE(), - status => 'Request Complete' ); + $self->respond($msg) if (defined($msg)); - $self->session->send( ( map { ('RESULT', $_) } @{$self->{current_chunk}} ), 'STATUS' => $stat, $self->threadTrace); + $self->completing(1); + $self->respond( + OpenSRF::DomainObject::oilsConnectStatus->new( + statusCode => STATUS_COMPLETE(), + status => 'Request Complete' + ) + ); $self->complete(1); } @@ -1129,9 +1158,17 @@ sub complete { my $x = shift; my $c = shift; $x->{complete} = $c if ($c); + $x->completing(0) if ($c); return $x->{complete}; } +sub completing { + my $self = shift; + my $value = shift; + $self->{_completing} = $value if (defined($value)); + return $self->{_completing}; +} + sub status {} diff --git a/src/perl/lib/OpenSRF/Application.pm b/src/perl/lib/OpenSRF/Application.pm index 023bb8d..9749a1d 100644 --- a/src/perl/lib/OpenSRF/Application.pm +++ b/src/perl/lib/OpenSRF/Application.pm @@ -48,17 +48,24 @@ sub argc { return $self->{argc}; } -sub max_chunk_size { +sub max_bundle_size { my $self = shift; return 0 unless ref($self); - return $self->{max_chunk_size} if (defined($self->{max_chunk_size})); + return $self->{max_bundle_size} if (defined($self->{max_bundle_size})); return 10240; } -sub max_chunk_count { +sub max_bundle_count { + my $self = shift; + return 0 unless ref($self); + return $self->{max_bundle_count} || 0; +} + +sub max_chunk_size { my $self = shift; return 0 unless ref($self); - return $self->{max_chunk_count} || 0; + return $self->{max_chunk_size} if (defined($self->{max_chunk_size})); + return 2 * $self->max_bundle_size; } sub api_name { @@ -173,8 +180,8 @@ sub handler { my @args = $app_msg->params; $coderef->session( $session ); my $appreq = OpenSRF::AppRequest->new( $session ); - $appreq->max_chunk_size( $coderef->max_chunk_size ); - $appreq->max_chunk_count( $coderef->max_chunk_count ); + $appreq->max_bundle_size( $coderef->max_bundle_size ); + $appreq->max_bundle_count( $coderef->max_bundle_count ); $log->debug( "in_request = $in_request : [" . $appreq->threadTrace."]", INTERNAL ); if( $in_request ) { diff --git a/src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm b/src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm index 18e2e3c..25d8f50 100644 --- a/src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm +++ b/src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm @@ -51,10 +51,13 @@ layer messages send between the client and server. sub STATUS_CONTINUE { return 100 } -sub STATUS_OK { return 200 } +sub STATUS_OK { return 200 } sub STATUS_ACCEPTED { return 202 } sub STATUS_COMPLETE { return 205 } +sub STATUS_PARTIAL { return 206 } +sub STATUS_NOCONTENT { return 204 } + sub STATUS_REDIRECTED { return 307 } sub STATUS_BADREQUEST { return 400 } @@ -277,6 +280,85 @@ B #------------------------------------------------------------------------------- +package OpenSRF::DomainObject::oilsResult::Partial; +use OpenSRF::DomainObject::oilsResponse qw/:status/; +use base 'OpenSRF::DomainObject::oilsResult'; +use vars qw/$status $statusCode/; +OpenSRF::Utils::JSON->register_class_hint( hint => 'osrfResult', name => 'OpenSRF::DomainObject::oilsResult::Partial', type => 'hash' ); + + +$status = 'Partial Response'; +$statusCode = STATUS_PARTIAL; + +=head1 NAME + +OpenSRF::DomainObject::oilsResult::Partial + +=head1 SYNOPSIS + +This class is used internally to break apart large OpenSRF messages into small +chunks, to reduce the maximum possible stanza size when sending a message over +XMPP. + +=cut + +sub content { + my $self = shift; + my $val = shift; + + $self->{content} = $val if (defined $val); + return $self->{content}; +} + +=head1 SEE ALSO + +B + +=cut + +1; + +#------------------------------------------------------------------------------- + +package OpenSRF::DomainObject::oilsResult::PartialComplete; +use OpenSRF::DomainObject::oilsResponse qw/:status/; +use base 'OpenSRF::DomainObject::oilsResult'; +use vars qw/$status $statusCode/; +OpenSRF::Utils::JSON->register_class_hint( hint => 'osrfResult', name => 'OpenSRF::DomainObject::oilsResult::Partial', type => 'hash' ); + + +$status = 'Partial Response Finalized'; +$statusCode = STATUS_NOCONTENT; + +=head1 NAME + +OpenSRF::DomainObject::oilsResult::Partial + +=head1 SYNOPSIS + +This class is used internally to mark the end of a stream of small partial +OpenSRF messages of type OpenSRF::DomainObject::oilsResult::Partial. + +=cut + +sub content { + my $self = shift; + my $val = shift; + + $self->{content} = $val if (defined $val); + return $self->{content}; +} + +=head1 SEE ALSO + +B + +=cut + +1; + +#------------------------------------------------------------------------------- + package OpenSRF::DomainObject::oilsException; use OpenSRF::DomainObject::oilsResponse qw/:status/; use OpenSRF::EX; -- 2.11.0