From 2dfb72b1b8059e79fd2babd519c5000bc653b5e2 Mon Sep 17 00:00:00 2001 From: erickson Date: Thu, 14 Oct 2010 20:48:58 +0000 Subject: [PATCH] Back-porting a number of action/trigger fixes from trunk: parallel collection and reaction with multisession cut down xact begin/rollbacks to reduce call overhead during collection force granularity-only when running w/ --granularity git-svn-id: svn://svn.open-ils.org/ILS/branches/rel_2_0@18350 dcc99617-32d9-48b4-a31d-7c20da2025e4 --- .../src/perlmods/OpenILS/Application/Trigger.pm | 182 +++++++++++++++------ .../perlmods/OpenILS/Application/Trigger/Event.pm | 40 ++--- .../src/support-scripts/action_trigger_runner.pl | 3 + 3 files changed, 160 insertions(+), 65 deletions(-) diff --git a/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm b/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm index b243e4aaf8..ae83a7755a 100644 --- a/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm +++ b/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm @@ -7,6 +7,7 @@ use OpenSRF::EX qw/:try/; use OpenSRF::Utils::JSON; use OpenSRF::AppSession; +use OpenSRF::MultiSession; use OpenSRF::Utils::SettingsClient; use OpenSRF::Utils::Logger qw/$logger/; use OpenSRF::Utils qw/:datetime/; @@ -21,8 +22,16 @@ use OpenILS::Application::Trigger::EventGroup; my $log = 'OpenSRF::Utils::Logger'; +my $parallel_collect; +my $parallel_react; -sub initialize {} +sub initialize { + + my $conf = OpenSRF::Utils::SettingsClient->new; + $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1; + $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1; + +} sub child_init {} sub create_active_events_for_object { @@ -619,6 +628,45 @@ __PACKAGE__->register_method( api_level=> 1 ); +sub gather_events { + my $self = shift; + my $client = shift; + my $e_ids = shift; + + $e_ids = [$e_ids] if (!ref($e_ids)); + + my @events; + for my $e_id (@$e_ids) { + my $e; + try { + $e = OpenILS::Application::Trigger::Event->new($e_id); + } catch Error with { + $logger->error("trigger: Event creation failed with ".shift()); + }; + + next if !$e or $e->event->state eq 'invalid'; + + try { + $e->build_environment; + } catch Error with { + $logger->error("trigger: Event environment building failed with ".shift()); + }; + + $e->editor->disconnect; + $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding + $client->respond($e); + } + + OpenILS::Application::Trigger::Event->ClearObjectCache(); + + return undef; +} +__PACKAGE__->register_method( + api_name => 'open-ils.trigger.event.gather', + method => 'gather_events', + api_level=> 1 +); + sub grouped_events { my $self = shift; my $client = shift; @@ -636,39 +684,56 @@ sub grouped_events { return \%groups; } - for my $e_id ( @$events ) { - $logger->info("trigger: processing event $e_id"); + my @fleshed_events; - # let the client know we're still chugging along TODO add osrf support for method_lookup $client's + if ($parallel_collect == 1 or @$events == 1) { # use method lookup + @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events); + } else { + my $self_multi = OpenSRF::MultiSession->new( + app => 'open-ils.trigger', + cap => $parallel_collect, + success_handler => sub { + my $self = shift; + my $req = shift; + + push @fleshed_events, + map { OpenILS::Application::Trigger::Event->new($_) } + map { $_->content } + @{ $req->{response} }; + }, + ); + + $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events ); $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); - my $e; - try { - $e = OpenILS::Application::Trigger::Event->new($e_id); - } catch Error with { - $logger->error("trigger: Event creation failed with ".shift()); - }; - - next unless $e; - - try { - $e->build_environment; - } catch Error with { - $logger->error("trigger: Event environment building failed with ".shift()); - }; + $self_multi->session_wait(1); + $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); + } + for my $e (@fleshed_events) { if (my $group = $e->event->event_def->group_field) { # split the grouping link steps my @steps = split /\./, $group; my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object - # find the grouping object - my $node = $e->target; - $node = $node->$_() for ( @steps ); + my $node; + eval { + $node = $e->target; + $node = $node->$_() for ( @steps ); + }; + + unless($node) { # should not get here, but to be safe.. + $e->update_state('invalid'); + next; + } # get the grouping value for the grouping object on this event my $ident_value = $node->$group_field(); + if(ref $ident_value) { + my $ident_field = $ident_value->Identity; + $ident_value = $ident_value->$ident_field() + } # push this event onto the event+grouping_value stack $groups{$e->event->event_def->id}{$ident_value} ||= []; @@ -677,11 +742,9 @@ sub grouped_events { # it's a non-grouped event push @{ $groups{'*'} }, $e; } - - $e->editor->disconnect; } - OpenILS::Application::Trigger::Event->ClearObjectCache(); + return \%groups; } __PACKAGE__->register_method( @@ -697,50 +760,77 @@ sub run_all_events { my $granflag = shift; my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag); - $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); - - # Could report on how the "found" events were grouped, but who's going to - # consume that information? -# for my $key (keys %$groups) { -# if (@{ $$groups{$key} }) { -# $client->respond({"status" => "found"}); -# last; -# } -# } + $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}}); + + my $self_multi; + if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) { + $self_multi = OpenSRF::MultiSession->new( + app => 'open-ils.trigger', + cap => $parallel_react, + session_hash_function => sub { + my $args = shift; + return $args->{target_id}; + }, + success_handler => sub { + my $me = shift; + my $req = shift; + $client->respond( $req->{response}->[0]->content ); + } + ); + } for my $def ( keys %$groups ) { if ($def eq '*') { $logger->info("trigger: run_all_events firing un-grouped events"); for my $event ( @{ $$groups{'*'} } ) { try { - $client->respond( - $self - ->method_lookup('open-ils.trigger.event.fire') - ->run($event) - ); + if ($self_multi) { + $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding + $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event); + } else { + $client->respond( + $self + ->method_lookup('open-ils.trigger.event.fire') + ->run($event) + ); + } } catch Error with { $logger->error("trigger: event firing failed with ".shift()); }; } - $logger->info("trigger: run_all_events completed firing un-grouped events"); + $logger->info("trigger: run_all_events completed queuing un-grouped events"); + $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); } else { my $defgroup = $$groups{$def}; $logger->info("trigger: run_all_events firing events for grouped event def=$def"); for my $ident ( keys %$defgroup ) { + $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident"); try { - $client->respond( - $self - ->method_lookup('open-ils.trigger.event_group.fire') - ->run($$defgroup{$ident}) - ); + if ($self_multi) { + $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding + $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident}); + } else { + $client->respond( + $self + ->method_lookup('open-ils.trigger.event_group.fire') + ->run($$defgroup{$ident}) + ); + } + $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); } catch Error with { $logger->error("trigger: event firing failed with ".shift()); }; } - $logger->info("trigger: run_all_events completed firing events for grouped event def=$def"); + $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def"); } } + + $self_multi->session_wait(1) if ($self_multi); + $logger->info("trigger: run_all_events completed firing events"); + + $client->respond_complete(); + return undef; } __PACKAGE__->register_method( api_name => 'open-ils.trigger.event.run_all_pending', diff --git a/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm b/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm index 1e534b597f..2ec5d1be61 100644 --- a/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm +++ b/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm @@ -2,13 +2,10 @@ package OpenILS::Application::Trigger::Event; use strict; use warnings; use OpenSRF::EX qw/:try/; use OpenSRF::Utils::JSON; - use OpenSRF::Utils::Logger qw/$logger/; - use OpenILS::Utils::Fieldmapper; use OpenILS::Utils::CStoreEditor q/:funcs/; use OpenILS::Application::Trigger::ModRunner; - use Safe; my $log = 'OpenSRF::Utils::Logger'; @@ -19,11 +16,17 @@ sub new { my $editor = shift; $class = ref($class) || $class; - return $id if (ref($id) && ref($id) eq $class); - my $standalone = $editor ? 0 : 1; $editor ||= new_editor(); + if (ref($id) && ref($id) eq $class) { + $id->environment->{EventProcessor} = $id + if ($id->environment->{complete}); # in case it came over an opensrf tube + $id->editor( $editor ); + $id->standalone( $standalone ); + return $id; + } + my $self = bless { id => $id, editor => $editor, standalone => $standalone } => $class; return $self->init() @@ -109,6 +112,11 @@ sub init { $self->editor->xact_rollback || return undef; } + unless($self->target) { + $self->update_state('invalid'); + $self->valid(0); + } + return $self; } @@ -473,11 +481,16 @@ sub _object_by_path { my $collector = shift; my $label = shift; my $path = shift; + my $ed = shift; + my $outer = 0; + if (!$ed) { + $ed = new_editor(xact=>1); + $outer = 1; + } my $step = shift(@$path); - my $fhint = Fieldmapper->publish_fieldmapper->{$context->class_name}{links}{$step}{class}; my $fclass = $self->_fm_class_by_hint( $fhint ); @@ -501,10 +514,6 @@ sub _object_by_path { $meth =~ s/Fieldmapper:://; $meth =~ s/::/_/g; - my $ed = grep( /open-ils.cstore/, @{$fclass->Controller} ) ? - $self->editor : - new_rstore_editor(($self->standalone ? () : (xact=>1))); - my $obj = $context->$step(); $logger->debug( @@ -521,18 +530,10 @@ sub _object_by_path { my $def_id = $self->event->event_def->id; my $str_path = join('.', @$path); - if ($self->standalone) { - $ed->xact_begin || return undef; - } - $obj = $_object_by_path_cache{$def_id}{$str_path}{$step}{$ffield}{$lval} || $ed->$meth( ($multi) ? { $ffield => $lval } : $lval); $_object_by_path_cache{$def_id}{$str_path}{$step}{$ffield}{$lval} ||= $obj; - - if ($self->standalone) { - $ed->xact_rollback || return undef; - } } } @@ -547,7 +548,7 @@ sub _object_by_path { for (@$obj_list) { my @path_clone = @$path; - $self->_object_by_path( $_, $collector, $label, \@path_clone ); + $self->_object_by_path( $_, $collector, $label, \@path_clone, $ed ); } $obj = $$obj_list[0] if (!$multi || $rtype eq 'might_have'); @@ -590,6 +591,7 @@ sub _object_by_path { } } + $ed->rollback if ($outer); return $obj; } diff --git a/Open-ILS/src/support-scripts/action_trigger_runner.pl b/Open-ILS/src/support-scripts/action_trigger_runner.pl index 59e981a77f..0225518315 100755 --- a/Open-ILS/src/support-scripts/action_trigger_runner.pl +++ b/Open-ILS/src/support-scripts/action_trigger_runner.pl @@ -58,6 +58,9 @@ GetOptions( my $max_sleep = $opt_max_sleep; +#XXX need to figure out why this is required... +$opt_gran_only = $opt_granularity ? 1 : 0; + $opt_lockfile .= '.' . $opt_granularity if ($opt_granularity && $opt_gran_only); # typical passive hook filters -- 2.11.0