use OpenSRF::Utils::JSON;
use OpenSRF::AppSession;
+use OpenSRF::MultiSession;
use OpenSRF::Utils::SettingsClient;
use OpenSRF::Utils::Logger qw/$logger/;
use OpenSRF::Utils qw/:datetime/;
my $log = 'OpenSRF::Utils::Logger';
+my $parallel_collect;
+my $parallel_react;
-sub initialize {}
+sub initialize {
+
+ my $conf = OpenSRF::Utils::SettingsClient->new;
+ $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
+ $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
+
+}
sub child_init {}
sub create_active_events_for_object {
api_level=> 1
);
+sub gather_events {
+ my $self = shift;
+ my $client = shift;
+ my $e_ids = shift;
+
+ $e_ids = [$e_ids] if (!ref($e_ids));
+
+ my @events;
+ for my $e_id (@$e_ids) {
+ my $e;
+ try {
+ $e = OpenILS::Application::Trigger::Event->new($e_id);
+ } catch Error with {
+ $logger->error("trigger: Event creation failed with ".shift());
+ };
+
+ next if !$e or $e->event->state eq 'invalid';
+
+ try {
+ $e->build_environment;
+ } catch Error with {
+ $logger->error("trigger: Event environment building failed with ".shift());
+ };
+
+ $e->editor->disconnect;
+ $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
+ $client->respond($e);
+ }
+
+ OpenILS::Application::Trigger::Event->ClearObjectCache();
+
+ return undef;
+}
+__PACKAGE__->register_method(
+ api_name => 'open-ils.trigger.event.gather',
+ method => 'gather_events',
+ api_level=> 1
+);
+
sub grouped_events {
my $self = shift;
my $client = shift;
return \%groups;
}
- for my $e_id ( @$events ) {
- $logger->info("trigger: processing event $e_id");
+ my @fleshed_events;
- # let the client know we're still chugging along TODO add osrf support for method_lookup $client's
+ if ($parallel_collect == 1 or @$events == 1) { # use method lookup
+ @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
+ } else {
+ my $self_multi = OpenSRF::MultiSession->new(
+ app => 'open-ils.trigger',
+ cap => $parallel_collect,
+ success_handler => sub {
+ my $self = shift;
+ my $req = shift;
+
+ push @fleshed_events,
+ map { OpenILS::Application::Trigger::Event->new($_) }
+ map { $_->content }
+ @{ $req->{response} };
+ },
+ );
+
+ $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
$client->status( new OpenSRF::DomainObject::oilsContinueStatus );
- my $e;
- try {
- $e = OpenILS::Application::Trigger::Event->new($e_id);
- } catch Error with {
- $logger->error("trigger: Event creation failed with ".shift());
- };
-
- next unless $e;
-
- try {
- $e->build_environment;
- } catch Error with {
- $logger->error("trigger: Event environment building failed with ".shift());
- };
+ $self_multi->session_wait(1);
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
+ }
+ for my $e (@fleshed_events) {
if (my $group = $e->event->event_def->group_field) {
# split the grouping link steps
my @steps = split /\./, $group;
my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object
- # find the grouping object
- my $node = $e->target;
- $node = $node->$_() for ( @steps );
+ my $node;
+ eval {
+ $node = $e->target;
+ $node = $node->$_() for ( @steps );
+ };
+
+ unless($node) { # should not get here, but to be safe..
+ $e->update_state('invalid');
+ next;
+ }
# get the grouping value for the grouping object on this event
my $ident_value = $node->$group_field();
+ if(ref $ident_value) {
+ my $ident_field = $ident_value->Identity;
+ $ident_value = $ident_value->$ident_field()
+ }
# push this event onto the event+grouping_value stack
$groups{$e->event->event_def->id}{$ident_value} ||= [];
# it's a non-grouped event
push @{ $groups{'*'} }, $e;
}
-
- $e->editor->disconnect;
}
- OpenILS::Application::Trigger::Event->ClearObjectCache();
+
return \%groups;
}
__PACKAGE__->register_method(
my $granflag = shift;
my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
- $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
-
- # Could report on how the "found" events were grouped, but who's going to
- # consume that information?
-# for my $key (keys %$groups) {
-# if (@{ $$groups{$key} }) {
-# $client->respond({"status" => "found"});
-# last;
-# }
-# }
+ $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
+
+ my $self_multi;
+ if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
+ $self_multi = OpenSRF::MultiSession->new(
+ app => 'open-ils.trigger',
+ cap => $parallel_react,
+ session_hash_function => sub {
+ my $args = shift;
+ return $args->{target_id};
+ },
+ success_handler => sub {
+ my $me = shift;
+ my $req = shift;
+ $client->respond( $req->{response}->[0]->content );
+ }
+ );
+ }
for my $def ( keys %$groups ) {
if ($def eq '*') {
$logger->info("trigger: run_all_events firing un-grouped events");
for my $event ( @{ $$groups{'*'} } ) {
try {
- $client->respond(
- $self
- ->method_lookup('open-ils.trigger.event.fire')
- ->run($event)
- );
+ if ($self_multi) {
+ $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
+ $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
+ } else {
+ $client->respond(
+ $self
+ ->method_lookup('open-ils.trigger.event.fire')
+ ->run($event)
+ );
+ }
} catch Error with {
$logger->error("trigger: event firing failed with ".shift());
};
}
- $logger->info("trigger: run_all_events completed firing un-grouped events");
+ $logger->info("trigger: run_all_events completed queuing un-grouped events");
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
} else {
my $defgroup = $$groups{$def};
$logger->info("trigger: run_all_events firing events for grouped event def=$def");
for my $ident ( keys %$defgroup ) {
+ $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
try {
- $client->respond(
- $self
- ->method_lookup('open-ils.trigger.event_group.fire')
- ->run($$defgroup{$ident})
- );
+ if ($self_multi) {
+ $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
+ $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
+ } else {
+ $client->respond(
+ $self
+ ->method_lookup('open-ils.trigger.event_group.fire')
+ ->run($$defgroup{$ident})
+ );
+ }
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
} catch Error with {
$logger->error("trigger: event firing failed with ".shift());
};
}
- $logger->info("trigger: run_all_events completed firing events for grouped event def=$def");
+ $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
}
}
+
+ $self_multi->session_wait(1) if ($self_multi);
+ $logger->info("trigger: run_all_events completed firing events");
+
+ $client->respond_complete();
+ return undef;
}
__PACKAGE__->register_method(
api_name => 'open-ils.trigger.event.run_all_pending',
use strict; use warnings;
use OpenSRF::EX qw/:try/;
use OpenSRF::Utils::JSON;
-
use OpenSRF::Utils::Logger qw/$logger/;
-
use OpenILS::Utils::Fieldmapper;
use OpenILS::Utils::CStoreEditor q/:funcs/;
use OpenILS::Application::Trigger::ModRunner;
-
use Safe;
my $log = 'OpenSRF::Utils::Logger';
my $editor = shift;
$class = ref($class) || $class;
- return $id if (ref($id) && ref($id) eq $class);
-
my $standalone = $editor ? 0 : 1;
$editor ||= new_editor();
+ if (ref($id) && ref($id) eq $class) {
+ $id->environment->{EventProcessor} = $id
+ if ($id->environment->{complete}); # in case it came over an opensrf tube
+ $id->editor( $editor );
+ $id->standalone( $standalone );
+ return $id;
+ }
+
my $self = bless { id => $id, editor => $editor, standalone => $standalone } => $class;
return $self->init()
$self->editor->xact_rollback || return undef;
}
+ unless($self->target) {
+ $self->update_state('invalid');
+ $self->valid(0);
+ }
+
return $self;
}
my $collector = shift;
my $label = shift;
my $path = shift;
+ my $ed = shift;
+ my $outer = 0;
+ if (!$ed) {
+ $ed = new_editor(xact=>1);
+ $outer = 1;
+ }
my $step = shift(@$path);
-
my $fhint = Fieldmapper->publish_fieldmapper->{$context->class_name}{links}{$step}{class};
my $fclass = $self->_fm_class_by_hint( $fhint );
$meth =~ s/Fieldmapper:://;
$meth =~ s/::/_/g;
- my $ed = grep( /open-ils.cstore/, @{$fclass->Controller} ) ?
- $self->editor :
- new_rstore_editor(($self->standalone ? () : (xact=>1)));
-
my $obj = $context->$step();
$logger->debug(
my $def_id = $self->event->event_def->id;
my $str_path = join('.', @$path);
- if ($self->standalone) {
- $ed->xact_begin || return undef;
- }
-
$obj = $_object_by_path_cache{$def_id}{$str_path}{$step}{$ffield}{$lval} ||
$ed->$meth( ($multi) ? { $ffield => $lval } : $lval);
$_object_by_path_cache{$def_id}{$str_path}{$step}{$ffield}{$lval} ||= $obj;
-
- if ($self->standalone) {
- $ed->xact_rollback || return undef;
- }
}
}
for (@$obj_list) {
my @path_clone = @$path;
- $self->_object_by_path( $_, $collector, $label, \@path_clone );
+ $self->_object_by_path( $_, $collector, $label, \@path_clone, $ed );
}
$obj = $$obj_list[0] if (!$multi || $rtype eq 'might_have');
}
}
+ $ed->rollback if ($outer);
return $obj;
}