From f1da4a453e2c1186f6455fb80225bf8777e84a21 Mon Sep 17 00:00:00 2001 From: erickson Date: Thu, 7 Oct 2010 14:56:43 +0000 Subject: [PATCH] Parallel action/trigger collection and reaction QA'ed patch from miker to support parallel a/t event collection and reaction. Max parallel procs is controlled by two new opensrf.xml trigger app_settings. Sample config included, settings disabled by default. git-svn-id: svn://svn.open-ils.org/ILS/trunk@18219 dcc99617-32d9-48b4-a31d-7c20da2025e4 --- Open-ILS/examples/opensrf.xml.example | 9 ++ .../src/perlmods/OpenILS/Application/Trigger.pm | 169 +++++++++++++++------ .../perlmods/OpenILS/Application/Trigger/Event.pm | 13 +- 3 files changed, 143 insertions(+), 48 deletions(-) diff --git a/Open-ILS/examples/opensrf.xml.example b/Open-ILS/examples/opensrf.xml.example index 74c28a1cf8..acbd585dd5 100644 --- a/Open-ILS/examples/opensrf.xml.example +++ b/Open-ILS/examples/opensrf.xml.example @@ -590,6 +590,15 @@ vim:et:ts=4:sw=4: 1 5 + + + + diff --git a/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm b/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm index b243e4aaf8..c9c1553e43 100644 --- a/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm +++ b/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm @@ -7,6 +7,7 @@ use OpenSRF::EX qw/:try/; use OpenSRF::Utils::JSON; use OpenSRF::AppSession; +use OpenSRF::MultiSession; use OpenSRF::Utils::SettingsClient; use OpenSRF::Utils::Logger qw/$logger/; use OpenSRF::Utils qw/:datetime/; @@ -21,8 +22,16 @@ use OpenILS::Application::Trigger::EventGroup; my $log = 'OpenSRF::Utils::Logger'; +my $parallel_collect; +my $parallel_react; -sub initialize {} +sub initialize { + + my $conf = OpenSRF::Utils::SettingsClient->new; + $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1; + $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1; + +} sub child_init {} sub create_active_events_for_object { @@ -619,6 +628,45 @@ __PACKAGE__->register_method( api_level=> 1 ); +sub gather_events { + my $self = shift; + my $client = shift; + my $e_ids = shift; + + $e_ids = [$e_ids] if (!ref($e_ids)); + + my @events; + for my $e_id (@$e_ids) { + my $e; + try { + $e = OpenILS::Application::Trigger::Event->new($e_id); + } catch Error with { + $logger->error("trigger: Event creation failed with ".shift()); + }; + + next unless $e; + + try { + $e->build_environment; + } catch Error with { + $logger->error("trigger: Event environment building failed with ".shift()); + }; + + $e->editor->disconnect; + $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding + $client->respond($e); + } + + OpenILS::Application::Trigger::Event->ClearObjectCache(); + + return undef; +} +__PACKAGE__->register_method( + api_name => 'open-ils.trigger.event.gather', + method => 'gather_events', + api_level=> 1 +); + sub grouped_events { my $self = shift; my $client = shift; @@ -636,27 +684,33 @@ sub grouped_events { return \%groups; } - for my $e_id ( @$events ) { - $logger->info("trigger: processing event $e_id"); + my @fleshed_events; - # let the client know we're still chugging along TODO add osrf support for method_lookup $client's + if ($parallel_collect == 1 or @$events == 1) { # use method lookup + @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events); + } else { + my $self_multi = OpenSRF::MultiSession->new( + app => 'open-ils.trigger', + cap => $parallel_collect, + success_handler => sub { + my $self = shift; + my $req = shift; + + push @fleshed_events, + map { OpenILS::Application::Trigger::Event->new($_) } + map { $_->content } + @{ $req->{response} }; + }, + ); + + $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events ); $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); - my $e; - try { - $e = OpenILS::Application::Trigger::Event->new($e_id); - } catch Error with { - $logger->error("trigger: Event creation failed with ".shift()); - }; - - next unless $e; - - try { - $e->build_environment; - } catch Error with { - $logger->error("trigger: Event environment building failed with ".shift()); - }; + $self_multi->session_wait(1); + $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); + } + for my $e (@fleshed_events) { if (my $group = $e->event->event_def->group_field) { # split the grouping link steps @@ -669,6 +723,10 @@ sub grouped_events { # get the grouping value for the grouping object on this event my $ident_value = $node->$group_field(); + if(ref $ident_value) { + my $ident_field = $ident_value->Identity; + $ident_value = $ident_value->$ident_field() + } # push this event onto the event+grouping_value stack $groups{$e->event->event_def->id}{$ident_value} ||= []; @@ -677,11 +735,9 @@ sub grouped_events { # it's a non-grouped event push @{ $groups{'*'} }, $e; } - - $e->editor->disconnect; } - OpenILS::Application::Trigger::Event->ClearObjectCache(); + return \%groups; } __PACKAGE__->register_method( @@ -697,50 +753,77 @@ sub run_all_events { my $granflag = shift; my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag); - $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); - - # Could report on how the "found" events were grouped, but who's going to - # consume that information? -# for my $key (keys %$groups) { -# if (@{ $$groups{$key} }) { -# $client->respond({"status" => "found"}); -# last; -# } -# } + $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}}); + + my $self_multi; + if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) { + $self_multi = OpenSRF::MultiSession->new( + app => 'open-ils.trigger', + cap => $parallel_react, + session_hash_function => sub { + my $args = shift; + return $args->{target_id}; + }, + success_handler => sub { + my $me = shift; + my $req = shift; + $client->respond( $req->{response}->[0]->content ); + } + ); + } for my $def ( keys %$groups ) { if ($def eq '*') { $logger->info("trigger: run_all_events firing un-grouped events"); for my $event ( @{ $$groups{'*'} } ) { try { - $client->respond( - $self - ->method_lookup('open-ils.trigger.event.fire') - ->run($event) - ); + if ($self_multi) { + $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding + $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event); + } else { + $client->respond( + $self + ->method_lookup('open-ils.trigger.event.fire') + ->run($event) + ); + } } catch Error with { $logger->error("trigger: event firing failed with ".shift()); }; } - $logger->info("trigger: run_all_events completed firing un-grouped events"); + $logger->info("trigger: run_all_events completed queuing un-grouped events"); + $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); } else { my $defgroup = $$groups{$def}; $logger->info("trigger: run_all_events firing events for grouped event def=$def"); for my $ident ( keys %$defgroup ) { + $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident"); try { - $client->respond( - $self - ->method_lookup('open-ils.trigger.event_group.fire') - ->run($$defgroup{$ident}) - ); + if ($self_multi) { + $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding + $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident}); + } else { + $client->respond( + $self + ->method_lookup('open-ils.trigger.event_group.fire') + ->run($$defgroup{$ident}) + ); + } + $client->status( new OpenSRF::DomainObject::oilsContinueStatus ); } catch Error with { $logger->error("trigger: event firing failed with ".shift()); }; } - $logger->info("trigger: run_all_events completed firing events for grouped event def=$def"); + $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def"); } } + + $self_multi->session_wait(1) if ($self_multi); + $logger->info("trigger: run_all_events completed firing events"); + + $client->respond_complete(); + return undef; } __PACKAGE__->register_method( api_name => 'open-ils.trigger.event.run_all_pending', diff --git a/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm b/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm index 1e534b597f..1a1989550a 100644 --- a/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm +++ b/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm @@ -2,13 +2,10 @@ package OpenILS::Application::Trigger::Event; use strict; use warnings; use OpenSRF::EX qw/:try/; use OpenSRF::Utils::JSON; - use OpenSRF::Utils::Logger qw/$logger/; - use OpenILS::Utils::Fieldmapper; use OpenILS::Utils::CStoreEditor q/:funcs/; use OpenILS::Application::Trigger::ModRunner; - use Safe; my $log = 'OpenSRF::Utils::Logger'; @@ -19,11 +16,17 @@ sub new { my $editor = shift; $class = ref($class) || $class; - return $id if (ref($id) && ref($id) eq $class); - my $standalone = $editor ? 0 : 1; $editor ||= new_editor(); + if (ref($id) && ref($id) eq $class) { + $id->environment->{EventProcessor} = $id + if ($id->environment->{complete}); # in case it came over an opensrf tube + $id->editor( $editor ); + $id->standalone( $standalone ); + return $id; + } + my $self = bless { id => $id, editor => $editor, standalone => $standalone } => $class; return $self->init() -- 2.11.0