From 446284353bb7f6072198af09a0ff8b9379f2a481 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Wed, 30 Jan 2013 17:44:30 -0500 Subject: [PATCH] Z39.50 Batch Search/Overlay API New API for performing Z39.50 queries for a set of bib records across a set of Z39 sources. Searches are performed in parallel (up to a configurable maximum number of active searches and maximum results) and results are stored in a vandelay MARC Import/Export queue. Search fields and values are determined by Z39 index field maps. At search time, the caller provides a set of field maps to use for the search. Search values for each bib record are found by following the index maps to the indexed values in metabib.*_field_entry or metabib.record_attr. Signed-off-by: Bill Erickson --- .../lib/OpenILS/Application/Search/Z3950.pm | 375 ++++++++++++++++++++- 1 file changed, 374 insertions(+), 1 deletion(-) diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm b/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm index 1f17a0ccc0..e01460ade8 100644 --- a/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm +++ b/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm @@ -11,11 +11,14 @@ use XML::LibXML; use OpenILS::Event; use OpenSRF::EX qw(:try); +use OpenSRF::MultiSession; use OpenILS::Utils::ModsParser; use OpenSRF::Utils::SettingsClient; +use OpenSRF::Utils::JSON; use OpenILS::Application::AppUtils; use OpenSRF::Utils::Logger qw/$logger/; use OpenILS::Utils::CStoreEditor q/:funcs/; +use OpenILS::Utils::Normalize qw/clean_marc/; MARC::Charset->assume_unicode(1); MARC::Charset->ignore_errors(1); @@ -437,7 +440,7 @@ sub process_results { my $res = {}; my $count = $$res{count} = $results->size; - $logger->info("z3950: search returned $count hits"); + $logger->info("z3950: '$service' search returned $count hits"); my $tend = $limit + $offset; @@ -525,5 +528,375 @@ sub compile_query { return $str; } + +__PACKAGE__->register_method( + method => 'bucket_search_queue', + api_name => 'open-ils.search.z3950.bucket_search_queue', + stream => 1, + # disable opensrf chunking so the caller can receive timely responses + max_chunk_size => 0, + signature => { + desc => q/ + Performs a Z39.50 search for every record in a bucket, using the + provided Z39.50 fields. Add all search results to the specified + Vandelay queue. If no source records or search results are found, + no queue is created. + /, + params => [ + {desc => q/Authentication token/, type => 'string'}, + {desc => q/Bucket ID/, type => 'number'}, + {desc => q/Z39 Sources. List of czs.name/, type => 'array'}, + {desc => q/Z39 Index Maps. List of czifm.id/, type => 'array'}, + { desc => q/Vandelay arguments + queue_name -- required + match_set + ... + /, + type => 'object' + } + ], + return => { + q/Object containing status information about the on-going search + and queue operation. + { + bre_count : $num, -- number of bibs to search against + search_count : $num, + search_complete : $num, + queue_count : $num + queue : $queue_obj + } + This object will be streamed back with each milestone (search + result or complete). + Event object returned on failure + / + } + } +); + +sub bucket_search_queue { + my $self = shift; + my $conn = shift; + my $auth = shift; + my $bucket_id = shift; + my $z_sources = shift; + my $z_indexes = shift; + my $vandelay = shift; + + my $e = new_editor(authtoken => $auth); + return $e->event unless + $e->checkauth and + $e->allowed('REMOTE_Z3950_QUERY') and + $e->allowed('CREATE_BIB_IMPORT_QUEUE'); + + # find the source bib records + + my $bre_ids = $e->json_query({ + select => {cbrebi => ['target_biblio_record_entry']}, + from => 'cbrebi', + where => {bucket => $bucket_id}, + distinct => 1 + }); + + # empty bucket + return {bre_count => 0} unless @$bre_ids; + + $bre_ids = [ map {$_->{target_biblio_record_entry}} @$bre_ids ]; + + $z_indexes = $e->search_config_z3950_index_field_map({id => $z_indexes}); + + return OpenILS::Event->new('BAD_PARAMS', + note => q/No z_indexes/) unless @$z_indexes; + + # build the Z39 queries for the source bib records + + my $z_searches = compile_bucket_zsearch( + $e, $bre_ids, $z_sources, $z_indexes); + + return $e->event unless $z_searches; + return {bre_count => 0} unless @$z_searches; + + my $queue = create_z39_bucket_queue($e, $bucket_id, $vandelay); + return $e->event unless $queue; + + send_and_queue_bucket_searches($conn, $e, $queue, $z_searches); + + return undef; +} + + # create the queue for storing search results +sub create_z39_bucket_queue { + my ($e, $bucket_id, $vandelay) = @_; + + my $existing = $e->search_vandelay_bib_queue({ + name => $vandelay->{queue_name}, + owner => $e->requestor->id + })->[0]; + + return $existing if $existing; + + my $queue = Fieldmapper::vandelay::bib_queue->new; + $queue->match_bucket($bucket_id); + $queue->owner($e->requestor->id); + $queue->name($vandelay->{queue_name}); + $queue->match_set($vandelay->{match_set}); + + $e->xact_begin; + unless ($e->create_vandelay_bib_queue($queue)) { + $e->rollback; + return undef; + } + $e->commit; + + return $queue; +} + +# sets the 901c value to the Z39 service and +# adds the record to the growing vandelay queue +# returns the number of successfully queued records +sub stamp_and_queue_results { + my ($e, $queue, $service, $bre_id, $result) = @_; + my $qcount = 0; + + for my $rec (@{$result->{records}}) { + # insert z39 service as the 901z + my $marc = MARC::Record->new_from_xml( + $rec->{marcxml}, 'UTF-8', 'USMARC'); + + $marc->insert_fields_ordered( + MARC::Field->new('901', '', '', z => $service)); + + # put the record into the queue + my $qrec = Fieldmapper::vandelay::queued_bib_record->new; + $qrec->marc(clean_marc($marc)); + $qrec->queue($queue->id); + + $e->xact_begin; + if ($e->create_vandelay_queued_bib_record($qrec)) { + $e->commit; + $qcount++; + } else { + my $evt = $e->die_event; + $logger->error("z39: unable to queue record: $evt"); + } + } + + return $qcount; +} + +sub send_and_queue_bucket_searches { + my ($conn, $e, $queue, $z_searches) = @_; + + my $max_parallel = 5; # TODO org setting + my $search_limit = 5; # TODO org setting + + my $response = { + bre_count => 0, + search_count => 0, + search_complete => 0, + queue_count => 0 + }; + + # searches are about to be in flight + # let the caller know we're still alive + $conn->respond($response); + + my $handle_search_result = sub { + my ($self, $req) = @_; + my $bre_id = $req->{req}->{_bre_id}; + + my @p = $req->{req}->payload->params; + $logger->debug("z39: multi-search response for request [$bre_id]". + OpenSRF::Utils::JSON->perl2JSON(\@p)); + + for my $resp (@{$req->{response}}) { + $response->{search_complete}++; + my $result = $resp->content or next; + my $service = $result->{service}; + $response->{queue_count} += + stamp_and_queue_results($e, $queue, $service, $bre_id, $result); + } + + $conn->respond($response); + }; + + my $multi_ses = OpenSRF::MultiSession->new( + app => 'open-ils.search', + cap => $max_parallel, + timeout => 120, + success_handler => $handle_search_result + ); + + # note: mult-session blocks new requests when it hits max + # parallel, so we need to cacluate summary values up front. + my %bre_uniq; + $bre_uniq{$_->{bre_id}} = 1 for @$z_searches; + $response->{bre_count} = scalar(keys %bre_uniq); + $response->{search_count} += scalar(@$z_searches); + + # let the caller know searches are on their way out + $conn->respond($response); + + for my $search (@$z_searches) { + + my $bre_id = delete $search->{bre_id}; + $search->{limit} = $search_limit; + + # toss it onto the multi-pile + my $req = $multi_ses->request( + 'open-ils.search.z3950.search_class', $e->authtoken, $search); + + $req->{_bre_id} = $bre_id; + } + + $multi_ses->session_wait(1); + $response->{queue} = $queue; + $conn->respond($response); +} + + +# creates a series of Z39.50 searchs based on the +# in-bucket records and the selected sources and indexes +sub compile_bucket_zsearch { + my ($e, $bre_ids, $z_sources, $z_indexes) = @_; + + # pre-load the metabib_field's we'll need for this batch + + my %mb_fields; + my @mb_fields = grep { $_->metabib_field } @$z_indexes; + if (@mb_fields) { + @mb_fields = map { $_->metabib_field } @mb_fields; + my $field_objs = $e->search_config_metabib_field({id => \@mb_fields}); + %mb_fields = map {$_->id => $_} @$field_objs; + } + + # pre-load the z3950_attrs we'll need for this batch + + my %z3950_attrs; + my @z3950_attrs = grep { $_->z3950_attr } @$z_indexes; + if (@z3950_attrs) { + @z3950_attrs = map { $_->z3950_attr } @z3950_attrs; + my $attr_objs = $e->search_config_z3950_attr({id => \@z3950_attrs}); + %z3950_attrs = map {$_->id => $_} @$attr_objs; + } + + # indexes with specific z3950_attr's take precedence + my @z_index_attrs = grep { $_->z3950_attr } @$z_indexes; + my @z_index_types = grep { !$_->z3950_attr } @$z_indexes; + + # for each bib record, extract the indexed value for the selected indexes. + my %z_searches; + + for my $bre_id (@$bre_ids) { + + $z_searches{$bre_id} = {}; + + for my $z_index (@z_index_attrs, @z_index_types) { + + my $bre_val; + if ($z_index->record_attr) { + + my $attrs = $U->get_bre_attrs($bre_id, $e); + $bre_val = $attrs->{$bre_id}{$z_index->record_attr}{code}; + + } else { # metabib_field + my $fid = $z_index->metabib_field; + + # the value for each field will be in the + # index class-specific table + my $entry_query = sprintf( + 'search_metabib_%s_field_entry', + $mb_fields{$fid}->field_class); + + my $entry = $e->$entry_query( + {field => $fid, source => $bre_id})->[0]; + + $bre_val = $entry->value if $entry; + } + + # no value means no search + next unless $bre_val; + + # determine which z3950 source to send this search field to + + my $z_source = []; + my $z_index_name; + if ($z_index->z3950_attr) { + + # a specific z3950_attr means this search index + # only applies to the z_source linked to the attr + + $z_index_name = $z3950_attrs{$z_index->z3950_attr}->name; + my $src = $z3950_attrs{$z_index->z3950_attr}->source; + + if (grep { $_ eq $src } @$z_sources) { + $z_searches{$bre_id}{$src} ||= { + service => [$src], + search => {} + }; + $z_searches{$bre_id}{$src}{search}{$z_index_name} = $bre_val; + + } else { + $logger->warn("z39: z3950_attr '$z_index_name' for '$src'". + " selected, but $src is not in the search list. Skipping..."); + } + + } else { + + # when a generic attr type is used, it applies to all + # z-sources, except those for which a more specific + # z3950_attr has already been applied + + $z_index_name = $z_index->z3950_attr_type; + + my @excluded; + for my $attr (values %z3950_attrs) { + push(@excluded, $attr->source) + if $attr->name eq $z_index_name; + } + + for my $src (@$z_sources) { + next if grep {$_ eq $src} @excluded; + $z_searches{$bre_id}{$src} ||= { + service => [$src], + search => {} + }; + $z_searches{$bre_id}{$src}{search}{$z_index_name} = $bre_val; + } + } + } + } + + # NOTE: ISBNs are sent through the translate_isbn1013 normalize + # before entring metabib.identifier_field_entry. As such, there + # will always be at minimum 2 ISBNs per record w/ ISBN and the + # data will be pre-sanitized. The first ISBN in the list is the + # ISBN from the record. Use that for these searches. + for my $bre_id (keys %z_searches) { + for my $src (keys %{$z_searches{$bre_id}}) { + my $blob = $z_searches{$bre_id}{$src}; + + # Sanitized ISBNs are space-separated. + # kill everything past the first space + $blob->{search}{isbn} =~ s/\s.*//g if $blob->{search}{isbn}; + } + } + + # let's turn this into something slightly more digestable + my @searches; + for my $bre_id (keys %z_searches) { + for my $blobset (values %{$z_searches{$bre_id}}) { + $blobset = [$blobset] unless ref $blobset eq 'ARRAY'; + for my $blob (@$blobset) { + $blob->{bre_id} = $bre_id; + push(@searches, $blob); + } + } + } + + return \@searches; +} + + + 1; # vim:et:ts=4:sw=4: -- 2.11.0