MultiSession, YAY
authormiker <miker@9efc2488-bf62-4759-914b-345cdb29e865>
Wed, 5 Apr 2006 21:09:47 +0000 (21:09 +0000)
committermiker <miker@9efc2488-bf62-4759-914b-345cdb29e865>
Wed, 5 Apr 2006 21:09:47 +0000 (21:09 +0000)
git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@677 9efc2488-bf62-4759-914b-345cdb29e865

src/perlmods/OpenSRF/MultiSession.pm [new file with mode: 0644]

diff --git a/src/perlmods/OpenSRF/MultiSession.pm b/src/perlmods/OpenSRF/MultiSession.pm
new file mode 100644 (file)
index 0000000..c25de3c
--- /dev/null
@@ -0,0 +1,263 @@
+package OpenSRF::MultiSession;
+use OpenSRF::AppSession;
+use OpenSRF::Utils::Logger;
+use Time::HiRes qw/time usleep/;
+
+my $log = 'OpenSRF::Utils::Logger';
+
+sub new {
+       my $class = shift;
+       $class = ref($class) || $class;
+
+       my $self = bless {@_} => $class;
+
+       $self->{api_level} = 1 if (!defined($self->{api_level}));
+       $self->{session_hash_function} = \&_dummy_session_hash_function
+               if (!defined($self->{session_hash_function}));
+
+       if ($self->{cap}) {
+               $self->session_cap($self->{cap});
+               $self->request_cap($self->{cap});
+       }
+
+       if (!$self->session_cap) {
+               # XXX make adaptive the default once the logic is in place
+               #$self->adaptive(1);
+
+               $self->session_cap(10);
+       }
+       if (!$self->request_cap) {
+               # XXX make adaptive the default once the logic is in place
+               #$self->adaptive(1);
+
+               $self->request_cap(10);
+       }
+
+       $self->{sessions} = [];
+       $self->{running} = [];
+       $self->{completed} = [];
+       $self->{failed} = [];
+
+       for ( 1 .. $self->session_cap) {
+               push @{ $self->{sessions} },
+                       OpenSRF::AppSession->create(
+                               $self->{app},
+                               $self->{api_level},
+                               1
+                       );
+               #print "Creating connection ".$self->{sessions}->[-1]->session_id." ...\n";
+               $log->debug("Creating connection ".$self->{sessions}->[-1]->session_id." ...");
+       }
+
+       return $self;
+}
+
+sub _dummy_session_hash_function {
+       my $self = shift;
+       $self->{_dummy_hash_counter} = 0 if (!exists($self->{_dummy_hash_counter}));
+       $self->{_dummy_hash_counter}++;
+       return ( $self->{_dummy_hash_counter} % $self->session_cap ) - 1;
+}
+
+sub connect {
+       my $self = shift;
+       $_->connect for (@{$self->{sessions}});
+}
+
+sub finish {
+       my $self = shift;
+       $_->finish for (@{$self->{sessions}});
+}
+
+sub disconnect {
+       my $self = shift;
+       $_->disconnect for (@{$self->{sessions}});
+}
+
+sub session_hash_function {
+       my $self = shift;
+       my $session_hash_function = shift;
+       return unless (ref $self);
+
+       $self->{session_hash_function} = $session_hash_function if (defined $session_hash_function);
+       return $self->{session_hash_function};
+}
+
+sub failure_handler {
+       my $self = shift;
+       my $failure_handler = shift;
+       return unless (ref $self);
+
+       $self->{failure_handler} = $failure_handler if (defined $failure_handler);
+       return $self->{failure_handler};
+}
+
+sub success_handler {
+       my $self = shift;
+       my $success_handler = shift;
+       return unless (ref $self);
+
+       $self->{success_handler} = $success_handler if (defined $success_handler);
+       return $self->{success_handler};
+}
+
+sub session_cap {
+       my $self = shift;
+       my $cap = shift;
+       return unless (ref $self);
+
+       $self->{session_cap} = $cap if (defined $cap);
+       return $self->{session_cap};
+}
+
+sub request_cap {
+       my $self = shift;
+       my $cap = shift;
+       return unless (ref $self);
+
+       $self->{request_cap} = $cap if (defined $cap);
+       return $self->{request_cap};
+}
+
+sub adaptive {
+       my $self = shift;
+       my $adapt = shift;
+       return unless (ref $self);
+
+       $self->{adaptive} = $adapt if (defined $adapt);
+       return $self->{adaptive};
+}
+
+sub completed {
+       my $self = shift;
+       my $count = shift;
+       return unless (ref $self);
+
+
+       if (wantarray) {
+               $count ||= scalar @{$self->{completed}}; 
+       }
+
+       if (defined $count) {
+               return () unless (@{$self->{completed}});
+               return splice @{$self->{completed}}, 0, $count;
+       }
+
+       return scalar @{$self->{completed}};
+}
+
+sub failed {
+       my $self = shift;
+       my $count = shift;
+       return unless (ref $self);
+
+
+       if (wantarray) {
+               $count ||= scalar @{$self->{failed}}; 
+       }
+
+       if (defined $count) {
+               return () unless (@{$self->{failed}});
+               return splice @{$self->{failed}}, 0, $count;
+       }
+
+       return scalar @{$self->{failed}};
+}
+
+sub running {
+       my $self = shift;
+       return unless (ref $self);
+       return scalar(@{ $self->{running} });
+}
+
+
+sub request {
+       my $self = shift;
+       my $method = shift;
+       my @params = @_;
+
+       if ($self->running < $self->request_cap ) {
+               my $index = $self->session_hash_function->($self, $method, @params);
+               my $ses = $self->{sessions}->[$index]; 
+
+               #print "Running $method using session ".$ses->session_id."\n";
+
+               my $req = $ses->request( $method, @params );
+
+               push @{ $self->{running} },
+                       { req => $req,
+                         meth => $method,
+                         params => [@params],
+                         start => time,
+                       };
+
+               $log->debug("Making request [$method] ".$self->running."...");
+
+               return $req;
+       } elsif (!$self->adaptive) {
+               #print "Oops.  Too many running: ".$self->running."\n";
+               $self->session_wait;
+               return $self->request($method => @params);
+       } else {
+               # XXX do addaptive stuff ...
+       }
+}
+
+sub session_wait {
+       my $self = shift;
+       my $all = shift;
+
+       if ($all) {
+               while ($self->running) {
+                       $self->session_reap;
+               }
+       } else {
+               while(!$self->session_reap) {
+                       usleep 10;
+               }
+       }
+}
+
+sub session_reap {
+       my $self = shift;
+
+       my $done = 0;
+       my @running;
+       while ( my $req = shift @{ $self->{running} } ) {
+               if ($req->{req}->complete) {
+                       #print "Currently running: ".$self->running."\n";
+
+                       $req->{response} = [$req->{req}->recv];
+
+                       $req->{end} = time;
+                       $req->{duration} = $req->{end} - $req->{start};
+
+                       #print "Completed ".$req->{meth}." in session ".$req->{req}->session->session_id." [$req->{duration}]\n";
+
+                       if ($req->{req}->failed) {
+                               #print "ARG!!!! Failed command $req->{meth} in session ".$req->{req}->session->session_id."\n";
+                               $req->{error} = $req->{req}->failed;
+                               push @{ $self->{failed} }, $req;
+                       } else {
+                               push @{ $self->{completed} }, $req;
+                       }
+
+                       $req->{req}->finish;
+                       delete $$req{req};
+
+                       my $handler = $req->{error} ? $self->failure_handler : $self->success_handler;
+
+                       $handler->($self, $req) if ($handler);
+                       
+                       $done++;
+               } else {
+                       #print "Still running ".$req->{meth}." in session ".$req->{req}->session->session_id."\n";
+                       push @running, $req;
+               }
+       }
+       push @{ $self->{running} }, @running;
+       return $done
+}
+
+1;
+