use OpenILS::Event;
use OpenSRF::EX qw(:try);
+use OpenSRF::MultiSession;
use OpenILS::Utils::ModsParser;
use OpenSRF::Utils::SettingsClient;
use OpenILS::Application::AppUtils;
method => 'bucket_search_queue',
api_name => 'open-ils.search.z3950.bucket_search_queue',
stream => 1,
+ # disable opensrf chunking so the caller can receive timely responses
+ max_chunk_size => 0,
signature => {
desc => q/
Performs a Z39.50 search for every record in a bucket, using the
params => [
{desc => q/Authentication token/, type => 'string'},
{desc => q/Bucket ID/, type => 'number'},
- {desc => q/Destination Queue name/, type => 'string'},
{desc => q/Z39 Sources. List of czs.name/, type => 'array'},
- {desc => q/Z39 Index Maps. List of czifm.id/, type => 'array'}
+ {desc => q/Z39 Index Maps. List of czifm.id/, type => 'array'},
+ { desc => q/Vandelay arguments
+ queue_name -- required
+ match_set
+ ...
+ /,
+ type => 'object'
+ }
],
return => {
q/Object containing status information about the on-going search
and queue operation.
{
- searches : {
- $zsource : {
- pending : $pending,
- sent : $sent,
- received : $received
- },
- ...
- },
- queue_count : $queue_count, # on completion; 0 on no-op
- queue : $queue_obj # on completion and action
+ bre_count : $num, -- number of bibs to search against
+ search_count : $num,
+ search_complete : $num,
+ queue_count : $num
+ queue : $queue_obj
}
This object will be streamed back with each milestone (search
result or complete).
my $conn = shift;
my $auth = shift;
my $bucket_id = shift;
- my $qname = shift;
my $z_sources = shift;
my $z_indexes = shift;
+ my $vandelay = shift;
my $e = new_editor(authtoken => $auth);
return $e->event unless
});
# empty bucket
- return {queue_count => 0} unless @$bre_ids;
+ return {bre_count => 0} unless @$bre_ids;
$bre_ids = [ map {$_->{target_biblio_record_entry}} @$bre_ids ];
$e, $bre_ids, $z_sources, $z_indexes);
return $e->event unless $z_searches;
+ return {bre_count => 0} unless keys %$z_searches;
+
+ my $queue = create_z39_bucket_queue($e, $bucket_id, $vandelay);
+ return $e->event unless $queue;
+
+ send_and_queue_bucket_searches($conn, $e, $queue, $z_searches);
+
+ # TODO: process vandelay queue if requested
+
+ return undef;
+}
+
+ # create the queue for storing search results
+sub create_z39_bucket_queue {
+ my ($e, $bucket_id, $vandelay) = @_;
+
+ my $existing = $e->search_vandelay_bib_queue({
+ name => $vandelay->{queue_name},
+ owner => $e->requestor->id
+ })->[0];
+
+ return $existing if $existing;
+
+ my $queue = Fieldmapper::vandelay::bib_queue->new;
+ $queue->match_bucket($bucket_id);
+ $queue->owner($e->requestor->id);
+ $queue->name($vandelay->{queue_name});
+ $queue->match_set($vandelay->{match_set});
+
+ $e->xact_begin;
+ unless ($e->create_vandelay_bib_queue($queue)) {
+ $e->rollback;
+ return undef;
+ }
+ $e->commit;
+
+ return $queue;
+}
+
+sub send_and_queue_bucket_searches {
+ my ($conn, $e, $queue, $z_searches) = @_;
+
+ my $max_parallel = 5; # TODO org setting
+ my $search_limit = 5; # TODO org setting
+
+ my $response = {
+ bre_count => scalar(keys %$z_searches),
+ search_count => 0,
+ search_complete => 0,
+ queue_count => 0
+ };
+
+ # searches are about to be in flight
+ $conn->respond($response);
+
+ my $handle_search_result = sub {
+ my ($self, $req) = @_;
+
+ # there will be one response per z-source
+ for my $resp (@{$req->{response}}) {
+ next unless $resp->content;
+
+ my $result = $resp->content;
+ $response->{search_complete}++;
+
+ my $service = $result->{service};
+
+ for my $rec (@{$result->{records}}) {
+ # TODO stamp zsource 901z; add to queue
+ $response->{queue_count}++;
+ $logger->info("z39: got result $rec");
+ }
+ }
+
+ $conn->respond($response);
+ };
+
+ my $multi_ses = OpenSRF::MultiSession->new(
+ app => 'open-ils.search',
+ cap => $max_parallel,
+ success_handler => $handle_search_result
+ );
+
+ # fire a Z39 search for each distinct combination of z-sources
+ for my $bre_id (keys %$z_searches) {
+
+ # build a search blob for each distinct combination of z-sources
+ my @searches;
+
+ for my $z_index (keys %{$z_searches->{$bre_id}}) {
+ my $search = $z_searches->{$bre_id}{$z_index};
+
+ my ($blob) = grep {
+ # array compare, weee
+ @{$_->{service}} ~~ @{$search->{z_source}}
+ } @searches;
+
+ if ($blob) {
+ $blob->{search}{$search->{z_index_name}} =
+ $search->{bib_value};
+ } else {
+ $blob = {
+ service => $search->{z_source},
+ $search->{z_index_name} => $search->{bib_value},
+ limit => $search_limit,
+ offset => 0
+ };
+ push(@searches, $blob);
+ }
+
+ # search count is incremented by one for every
+ # service we sent a search blob to
+ $response->{search_count}++ for @{$blob->{service}};
+ }
+
+ $multi_ses->request(
+ 'open-ils.search.z3950.search_class',
+ $e->authtoken, $_) for @searches;
+ }
+
+ $conn->respond($response); # searches in flight
+ $multi_ses->session_wait(1);
- return $z_searches; # TODO
+ $response->{queue} = $queue;
+ $conn->respond($response);
}