Z39 Batch ML
authorBill Erickson <berick@esilibrary.com>
Thu, 31 Jan 2013 17:37:46 +0000 (12:37 -0500)
committerBill Erickson <berick@esilibrary.com>
Thu, 31 Jan 2013 17:37:46 +0000 (12:37 -0500)
Signed-off-by: Bill Erickson <berick@esilibrary.com>
Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Z3950.pm

index 0036df0..de2f225 100644 (file)
@@ -11,6 +11,7 @@ use XML::LibXML;
 
 use OpenILS::Event;
 use OpenSRF::EX qw(:try);
+use OpenSRF::MultiSession;
 use OpenILS::Utils::ModsParser;
 use OpenSRF::Utils::SettingsClient;
 use OpenILS::Application::AppUtils;
@@ -484,6 +485,8 @@ __PACKAGE__->register_method(
     method    => 'bucket_search_queue',
     api_name  => 'open-ils.search.z3950.bucket_search_queue',
     stream    => 1,
+    # disable opensrf chunking so the caller can receive timely responses
+    max_chunk_size => 0,
     signature => {
         desc => q/
             Performs a Z39.50 search for every record in a bucket, using the
@@ -494,24 +497,25 @@ __PACKAGE__->register_method(
         params => [
             {desc => q/Authentication token/, type => 'string'},
             {desc => q/Bucket ID/, type => 'number'},
-            {desc => q/Destination Queue name/, type => 'string'},
             {desc => q/Z39 Sources.  List of czs.name/, type => 'array'},
-            {desc => q/Z39 Index Maps.  List of czifm.id/, type => 'array'}
+            {desc => q/Z39 Index Maps.  List of czifm.id/, type => 'array'},
+            {   desc => q/Vandelay arguments
+                    queue_name -- required
+                    match_set
+                    ...
+                    /, 
+                type => 'object'
+            }
         ],
         return => {
             q/Object containing status information about the on-going search
             and queue operation. 
             {
-                searches : { 
-                    $zsource  : {
-                        pending : $pending,
-                        sent : $sent,
-                        received : $received
-                    },
-                    ...
-                },
-                queue_count : $queue_count, # on completion; 0 on no-op
-                queue : $queue_obj # on completion and action
+                bre_count : $num, -- number of bibs to search against
+                search_count : $num,
+                search_complete  : $num,
+                queue_count  : $num
+                queue        : $queue_obj
             }
             This object will be streamed back with each milestone (search
             result or complete).
@@ -526,9 +530,9 @@ sub bucket_search_queue {
     my $conn = shift;
     my $auth = shift;
     my $bucket_id = shift;
-    my $qname = shift;
     my $z_sources = shift;
     my $z_indexes = shift;
+    my $vandelay = shift;
 
     my $e = new_editor(authtoken => $auth);
     return $e->event unless 
@@ -546,7 +550,7 @@ sub bucket_search_queue {
     });
 
     # empty bucket
-    return {queue_count => 0} unless @$bre_ids;
+    return {bre_count => 0} unless @$bre_ids;
 
     $bre_ids = [ map {$_->{target_biblio_record_entry}} @$bre_ids ];
 
@@ -561,8 +565,131 @@ sub bucket_search_queue {
         $e, $bre_ids, $z_sources, $z_indexes);
 
     return $e->event unless $z_searches;
+    return {bre_count => 0} unless keys %$z_searches;
+
+    my $queue = create_z39_bucket_queue($e, $bucket_id, $vandelay);
+    return $e->event unless $queue;
+
+    send_and_queue_bucket_searches($conn, $e, $queue, $z_searches);
+
+    # TODO: process vandelay queue if requested
+
+    return undef;
+}
+
+ # create the queue for storing search results
+sub create_z39_bucket_queue {
+    my ($e, $bucket_id, $vandelay) = @_;
+
+    my $existing = $e->search_vandelay_bib_queue({
+        name => $vandelay->{queue_name},
+        owner => $e->requestor->id
+    })->[0];
+
+    return $existing if $existing;
+
+    my $queue = Fieldmapper::vandelay::bib_queue->new;
+    $queue->match_bucket($bucket_id);
+    $queue->owner($e->requestor->id);
+    $queue->name($vandelay->{queue_name});
+    $queue->match_set($vandelay->{match_set});
+
+    $e->xact_begin;
+    unless ($e->create_vandelay_bib_queue($queue)) {
+        $e->rollback;
+        return undef;
+    }
+    $e->commit;
+
+    return $queue;
+}
+
+sub send_and_queue_bucket_searches {
+    my ($conn, $e, $queue, $z_searches) = @_;
+
+    my $max_parallel = 5; # TODO org setting
+    my $search_limit = 5; # TODO org setting
+
+    my $response = {
+        bre_count => scalar(keys %$z_searches),
+        search_count => 0,
+        search_complete => 0,
+        queue_count => 0
+    };
+
+    # searches are about to be in flight
+    $conn->respond($response);
+
+    my $handle_search_result = sub {
+        my ($self, $req) = @_;
+
+        # there will be one response per z-source
+        for my $resp (@{$req->{response}}) {
+            next unless $resp->content;
+
+            my $result = $resp->content;
+            $response->{search_complete}++;
+
+            my $service = $result->{service};
+
+            for my $rec (@{$result->{records}}) {
+                # TODO stamp zsource 901z; add to queue
+                $response->{queue_count}++;
+                $logger->info("z39: got result $rec");
+            }
+        }
+
+        $conn->respond($response);
+    };
+
+    my $multi_ses = OpenSRF::MultiSession->new(
+        app             => 'open-ils.search',
+        cap             => $max_parallel,
+        success_handler => $handle_search_result
+    );
+
+    # fire a Z39 search for each distinct combination of z-sources
+    for my $bre_id (keys %$z_searches) {
+
+        # build a search blob for each distinct combination of z-sources
+        my @searches;
+
+        for my $z_index (keys %{$z_searches->{$bre_id}}) {
+            my $search = $z_searches->{$bre_id}{$z_index};
+
+            my ($blob) = grep { 
+                # array compare, weee
+                @{$_->{service}} ~~ @{$search->{z_source}} 
+            } @searches;
+
+            if ($blob) {
+                $blob->{search}{$search->{z_index_name}} = 
+                    $search->{bib_value};
+            } else {
+                $blob = {
+                    service => $search->{z_source},
+                    $search->{z_index_name} => $search->{bib_value},
+                    limit => $search_limit,
+                    offset => 0
+                };
+                push(@searches, $blob);
+            }
+
+            # search count is incremented by one for every 
+            # service we sent a search blob to
+            $response->{search_count}++ for @{$blob->{service}};
+        }
+
+        $multi_ses->request(
+            'open-ils.search.z3950.search_class', 
+            $e->authtoken, $_) for @searches;
+    }
+
+    $conn->respond($response); # searches in flight
+    $multi_ses->session_wait(1);
 
-    return $z_searches; # TODO
+    $response->{queue} = $queue;
+    $conn->respond($response);
 }