From b4bb62d5a05b0b40b96103b375b8cee107909ae2 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Thu, 31 Jan 2013 12:37:46 -0500 Subject: [PATCH] Z39 Batch ML Signed-off-by: Bill Erickson --- .../lib/OpenILS/Application/Search/Z3950.pm | 157 +++++++++++++++++++-- 1 file changed, 142 insertions(+), 15 deletions(-) diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm b/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm index 0036df0463..de2f2255aa 100644 --- a/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm +++ b/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm @@ -11,6 +11,7 @@ use XML::LibXML; use OpenILS::Event; use OpenSRF::EX qw(:try); +use OpenSRF::MultiSession; use OpenILS::Utils::ModsParser; use OpenSRF::Utils::SettingsClient; use OpenILS::Application::AppUtils; @@ -484,6 +485,8 @@ __PACKAGE__->register_method( method => 'bucket_search_queue', api_name => 'open-ils.search.z3950.bucket_search_queue', stream => 1, + # disable opensrf chunking so the caller can receive timely responses + max_chunk_size => 0, signature => { desc => q/ Performs a Z39.50 search for every record in a bucket, using the @@ -494,24 +497,25 @@ __PACKAGE__->register_method( params => [ {desc => q/Authentication token/, type => 'string'}, {desc => q/Bucket ID/, type => 'number'}, - {desc => q/Destination Queue name/, type => 'string'}, {desc => q/Z39 Sources. List of czs.name/, type => 'array'}, - {desc => q/Z39 Index Maps. List of czifm.id/, type => 'array'} + {desc => q/Z39 Index Maps. List of czifm.id/, type => 'array'}, + { desc => q/Vandelay arguments + queue_name -- required + match_set + ... + /, + type => 'object' + } ], return => { q/Object containing status information about the on-going search and queue operation. { - searches : { - $zsource : { - pending : $pending, - sent : $sent, - received : $received - }, - ... - }, - queue_count : $queue_count, # on completion; 0 on no-op - queue : $queue_obj # on completion and action + bre_count : $num, -- number of bibs to search against + search_count : $num, + search_complete : $num, + queue_count : $num + queue : $queue_obj } This object will be streamed back with each milestone (search result or complete). @@ -526,9 +530,9 @@ sub bucket_search_queue { my $conn = shift; my $auth = shift; my $bucket_id = shift; - my $qname = shift; my $z_sources = shift; my $z_indexes = shift; + my $vandelay = shift; my $e = new_editor(authtoken => $auth); return $e->event unless @@ -546,7 +550,7 @@ sub bucket_search_queue { }); # empty bucket - return {queue_count => 0} unless @$bre_ids; + return {bre_count => 0} unless @$bre_ids; $bre_ids = [ map {$_->{target_biblio_record_entry}} @$bre_ids ]; @@ -561,8 +565,131 @@ sub bucket_search_queue { $e, $bre_ids, $z_sources, $z_indexes); return $e->event unless $z_searches; + return {bre_count => 0} unless keys %$z_searches; + + my $queue = create_z39_bucket_queue($e, $bucket_id, $vandelay); + return $e->event unless $queue; + + send_and_queue_bucket_searches($conn, $e, $queue, $z_searches); + + # TODO: process vandelay queue if requested + + return undef; +} + + # create the queue for storing search results +sub create_z39_bucket_queue { + my ($e, $bucket_id, $vandelay) = @_; + + my $existing = $e->search_vandelay_bib_queue({ + name => $vandelay->{queue_name}, + owner => $e->requestor->id + })->[0]; + + return $existing if $existing; + + my $queue = Fieldmapper::vandelay::bib_queue->new; + $queue->match_bucket($bucket_id); + $queue->owner($e->requestor->id); + $queue->name($vandelay->{queue_name}); + $queue->match_set($vandelay->{match_set}); + + $e->xact_begin; + unless ($e->create_vandelay_bib_queue($queue)) { + $e->rollback; + return undef; + } + $e->commit; + + return $queue; +} + +sub send_and_queue_bucket_searches { + my ($conn, $e, $queue, $z_searches) = @_; + + my $max_parallel = 5; # TODO org setting + my $search_limit = 5; # TODO org setting + + my $response = { + bre_count => scalar(keys %$z_searches), + search_count => 0, + search_complete => 0, + queue_count => 0 + }; + + # searches are about to be in flight + $conn->respond($response); + + my $handle_search_result = sub { + my ($self, $req) = @_; + + # there will be one response per z-source + for my $resp (@{$req->{response}}) { + next unless $resp->content; + + my $result = $resp->content; + $response->{search_complete}++; + + my $service = $result->{service}; + + for my $rec (@{$result->{records}}) { + # TODO stamp zsource 901z; add to queue + $response->{queue_count}++; + $logger->info("z39: got result $rec"); + } + } + + $conn->respond($response); + }; + + my $multi_ses = OpenSRF::MultiSession->new( + app => 'open-ils.search', + cap => $max_parallel, + success_handler => $handle_search_result + ); + + # fire a Z39 search for each distinct combination of z-sources + for my $bre_id (keys %$z_searches) { + + # build a search blob for each distinct combination of z-sources + my @searches; + + for my $z_index (keys %{$z_searches->{$bre_id}}) { + my $search = $z_searches->{$bre_id}{$z_index}; + + my ($blob) = grep { + # array compare, weee + @{$_->{service}} ~~ @{$search->{z_source}} + } @searches; + + if ($blob) { + $blob->{search}{$search->{z_index_name}} = + $search->{bib_value}; + } else { + $blob = { + service => $search->{z_source}, + $search->{z_index_name} => $search->{bib_value}, + limit => $search_limit, + offset => 0 + }; + push(@searches, $blob); + } + + # search count is incremented by one for every + # service we sent a search blob to + $response->{search_count}++ for @{$blob->{service}}; + } + + $multi_ses->request( + 'open-ils.search.z3950.search_class', + $e->authtoken, $_) for @searches; + } + + $conn->respond($response); # searches in flight + $multi_ses->session_wait(1); - return $z_searches; # TODO + $response->{queue} = $queue; + $conn->respond($response); } -- 2.11.0