$self->{args} = {
lid => 0,
li => 0,
+ vqbr => 0,
copies => 0,
bibs => 0,
progress => 0,
picklist => undef,
complete => 0,
indexed => 0,
+ queue => undef,
total => 0
};
- $self->{ingest_queue} = [];
$self->{cache} = {};
$self->throttle(5) unless $self->throttle;
$self->{post_proc_queue} = [];
$self->{args}->{progress} += 1;
return $self;
}
+sub add_vqbr {
+ my $self = shift;
+ $self->{args}->{vqbr} += 1;
+ $self->{args}->{progress} += 1;
+ return $self;
+}
sub add_copy {
my $self = shift;
$self->{args}->{copies} += 1;
return $self;
}
-sub ingest_ses {
- my($self, $val) = @_;
- $self->{ingest_ses} = $val if $val;
- return $self->{ingest_ses};
-}
-
-sub push_ingest_queue {
- my($self, $rec_id) = @_;
-
- $self->ingest_ses(OpenSRF::AppSession->connect('open-ils.ingest'))
- unless $self->ingest_ses;
-
- my $req = $self->ingest_ses->request('open-ils.ingest.full.biblio.record', $rec_id);
-
- push(@{$self->{ingest_queue}}, $req);
-}
-
-sub process_ingest_records {
- my $self = shift;
- return unless @{$self->{ingest_queue}};
-
- for my $req (@{$self->{ingest_queue}}) {
-
- try {
- $req->gather(1);
- $self->{args}->{indexed} += 1;
- $self->{args}->{progress} += 1;
- } otherwise {};
-
- $self->respond;
- }
- $self->ingest_ses->disconnect;
-}
-
-
sub cache {
my($self, $org, $key, $val) = @_;
$self->{cache}->{$org} = {} unless $self->{cache}->{org};
use MARC::Record;
use MARC::Batch;
use MARC::File::XML (BinaryEncoding => 'UTF-8');
+use Digest::MD5 qw(md5_hex);
+use Data::Dumper;
+$Data::Dumper::Indent = 0;
my $U = 'OpenILS::Application::AppUtils';
# begins and commit transactions as it goes
sub create_lineitem_list_assets {
- my($mgr, $li_ids) = @_;
- return undef if check_import_li_marc_perms($mgr, $li_ids);
+ my($mgr, $li_ids, $vandelay) = @_;
- # create the bibs/volumes/copies and ingest the records
- for my $li_id (@$li_ids) {
+ if (check_import_li_marc_perms($mgr, $li_ids)) { # event on error
+ $logger->error("acq-vl: user does not have permission to import acq records");
+ return undef;
+ }
+
+ my $res = import_li_bibs_via_vandelay($mgr, $li_ids, $vandelay);
+ return undef unless $res;
+
+ # create the bibs/volumes/copies for the successfully imported records
+ for my $li_id (@{$res->{li_ids}}) {
$mgr->editor->xact_begin;
my $data = create_lineitem_assets($mgr, $li_id) or return undef;
$mgr->editor->xact_commit;
- # XXX ingest is in-db now
- #$mgr->push_ingest_queue($data->{li}->eg_bib_id) if $data->{new_bib};
$mgr->respond;
}
- $mgr->process_ingest_records;
- return 1;
+
+ return $res;
+}
+
+sub verify_vandelay_import_args {
+ my $vandelay = shift;
+
+ # we need a queue
+ return 0 unless $vandelay and
+ ($vandelay->{queue_name} or $vandelay->{existing_queue});
+
+ # match-based merge/overlay import
+ return 1 if $vandelay->{merge_profile} and (
+ $vandelay->{auto_overlay_exact} or
+ $vandelay->{auto_overlay_1match} or
+ $vandelay->{auto_overlay_best_match}
+ );
+
+ # no-match import
+ return 1 if $vandelay->{import_no_match};
+
+ return 0;
+}
+
+sub find_or_create_vandelay_queue {
+ my ($e, $vandelay) = @_;
+
+ my $queue;
+ if (my $name = $vandelay->{queue_name}) {
+
+ # first, see if a queue w/ this name already exists
+ # for this user. If so, use that instead.
+
+ $queue = $e->search_vandelay_bib_queue(
+ {name => $name, owner => $e->requestor->id})->[0];
+
+ if ($queue) {
+
+ $logger->info("acq-vl: using existing queue $name");
+
+ } else {
+
+ $logger->info("acq-vl: creating new vandelay queue $name");
+
+ $queue = new Fieldmapper::vandelay::bib_queue;
+ $queue->name($name);
+ $queue->queue_type('acq');
+ $queue->owner($e->requestor->id);
+ $queue->match_set($vandelay->{match_set} || undef); # avoid ''
+ $queue = $e->create_vandelay_bib_queue($queue) or return undef;
+ }
+
+ } else {
+ $queue = $e->retrieve_vandelay_bib_queue($vandelay->{existing_queue})
+ or return undef;
+ }
+
+ return $queue;
+}
+
+
+sub import_li_bibs_via_vandelay {
+ my ($mgr, $li_ids, $vandelay) = @_;
+ my $res = {li_ids => []};
+ my $e = $mgr->editor;
+ $e->xact_begin;
+
+ my $needs_importing = $e->search_acq_lineitem(
+ {id => $li_ids, eg_bib_id => undef},
+ {idlist => 1}
+ );
+
+ if (!@$needs_importing) {
+ $logger->info("acq-vl: all records already imported. no Vandelay work to do");
+ return {li_ids => $li_ids};
+ }
+
+ # add the already-imported records to the response list
+ push(@{$res->{li_ids}}, grep { $_ != @$needs_importing } @$li_ids);
+
+ $logger->info("acq-vl: processing recs via Vandelay with args: ".Dumper($vandelay));
+
+ if (!verify_vandelay_import_args($vandelay)) {
+ $logger->error("acq-vl: invalid vandelay arguments for acq import");
+ return $res;
+ }
+
+ my $queue = find_or_create_vandelay_queue($e, $vandelay) or return $res;
+ $mgr->{args}->{queue} = $queue;
+
+ # load the lineitems into the queue for merge processing
+ my @vqbr_ids;
+ my @lis;
+ for my $li_id (@$needs_importing) {
+
+ my $li = $e->retrieve_acq_lineitem($li_id) or return $res;
+
+ my $vqbr = Fieldmapper::vandelay::queued_bib_record->new;
+ $vqbr->marc($li->marc);
+ $vqbr->queue($queue->id);
+ $vqbr->bib_source($vandelay->{bib_source} || undef); # avoid ''
+ $vqbr = $e->create_vandelay_queued_bib_record($vqbr) or return $res;
+ push(@vqbr_ids, $vqbr->id);
+ $mgr->add_vqbr;
+ $mgr->respond;
+
+ # tell the acq record which vandelay record it's linked to
+ $li->queued_record($vqbr->id);
+ $e->update_acq_lineitem($li) or return $res;
+ push(@lis, $li);
+ }
+
+ $logger->info("acq-vl: created vandelay records [@vqbr_ids]");
+
+ # we have to commit the transaction now since
+ # vandelay uses its own transactions.
+ $e->commit;
+
+ # Import the bibs via vandelay. Note: Vandely will
+ # update acq.lineitem.eg_bib_id on successful import.
+
+ $vandelay->{report_all} = 1;
+ my $ses = OpenSRF::AppSession->create('open-ils.vandelay');
+ my $req = $ses->request(
+ 'open-ils.vandelay.bib_record.list.import',
+ $e->authtoken, \@vqbr_ids, $vandelay);
+
+ # pull the responses, noting all that were successfully imported
+ my @success_lis;
+ while (my $resp = $req->recv(timeout => 600)) {
+ my $stat = $resp->content;
+
+ if(!$stat or $U->event_code($stat)) { # import failure
+ $logger->error("acq-vl: error importing vandelay record " . Dumper($stat));
+ next;
+ }
+
+ # "imported" refers to the vqbr id, not the
+ # success/failure of the vqbr merge attempt
+ next unless $stat->{imported};
+
+ my ($imported) = grep {$_->queued_record eq $stat->{imported}} @lis;
+ my $li_id = $imported->id;
+
+ if ($stat->{no_import}) {
+ $logger->info("acq-vl: acq lineitem $li_id did not import");
+
+ } else { # successful import
+
+ push(@success_lis, $li_id);
+ $mgr->add_bib;
+ $mgr->respond;
+ $logger->info("acq-vl: acq lineitem $li_id successfully merged/imported");
+ }
+ }
+
+ $ses->kill_me;
+ $logger->info("acq-vl: successfully imported lineitems [@success_lis]");
+
+ # add the successfully imported lineitems to the already-imported lineitems
+ push (@{$res->{li_ids}}, @success_lis);
+
+ return $res;
}
# returns event on error, undef on success
}
]) or return 0;
- # -----------------------------------------------------------------
- # first, create the bib record if necessary
- # -----------------------------------------------------------------
- my $new_bib = 0;
- unless($li->eg_bib_id) {
- create_bib($mgr, $li) or return 0;
- $new_bib = 1;
- }
-
+ # note: at this point, the bib record this LI links to should already be created
# -----------------------------------------------------------------
# The lineitem is going live, promote user request holds to real holds
create_copy($mgr, $volume, $lid, $li) or return 0;
}
- return { li => $li, new_bib => $new_bib };
-}
-
-sub create_bib {
- my($mgr, $li) = @_;
-
- my $record = OpenILS::Application::Cat::BibCommon->biblio_record_xml_import(
- $mgr->editor,
- $li->marc,
- undef, # bib source
- undef,
- 1, # override tcn collisions
- );
-
- if($U->event_code($record)) {
- $mgr->editor->event($record);
- $mgr->editor->rollback;
- return 0;
- }
-
- $li->eg_bib_id($record->id);
- $mgr->add_bib;
- return update_lineitem($mgr, $li);
+ return { li => $li };
}
sub create_volume {
method => 'upload_records',
api_name => 'open-ils.acq.process_upload_records',
stream => 1,
+ max_chunk_count => 1
);
sub upload_records {
- my($self, $conn, $auth, $key) = @_;
+ my($self, $conn, $auth, $key, $args) = @_;
+ $args ||= {};
my $e = new_editor(authtoken => $auth, xact => 1);
return $e->die_event unless $e->checkauth;
my $cache = OpenSRF::Utils::Cache->new;
my $data = $cache->get_cache("vandelay_import_spool_$key");
- my $purpose = $data->{purpose};
my $filename = $data->{path};
- my $provider = $data->{provider};
- my $picklist = $data->{picklist};
- my $create_po = $data->{create_po};
- my $activate_po = $data->{activate_po};
- my $ordering_agency = $data->{ordering_agency};
- my $create_assets = $data->{create_assets};
+ my $provider = $args->{provider};
+ my $picklist = $args->{picklist};
+ my $create_po = $args->{create_po};
+ my $activate_po = $args->{activate_po};
+ my $ordering_agency = $args->{ordering_agency};
+ my $vandelay = $args->{vandelay};
my $po;
my $evt;
$mgr->respond;
}
- my $die_event = activate_purchase_order_impl($mgr, $po->id) if $po and $activate_po;
- return $die_event if $die_event;
-
$e->commit;
unlink($filename);
$cache->delete_cache('vandelay_import_spool_' . $key);
- if ($create_assets) {
- create_lineitem_list_assets($mgr, \@li_list) or return $e->die_event;
+ if ($po and $activate_po) {
+ my $die_event = activate_purchase_order_impl($mgr, $po->id, $vandelay);
+ return $die_event if $die_event;
+
+ } elsif ($vandelay) {
+ create_lineitem_list_assets($mgr, \@li_list, $vandelay) or return $e->die_event;
}
return $mgr->respond_complete;
{desc => 'The purchase order id', type => 'number'},
],
return => {desc => 'Streams a total versus completed counts object, event on error'}
- }
+ },
+ max_chunk_count => 1
);
sub create_po_assets {
- my($self, $conn, $auth, $po_id) = @_;
+ my($self, $conn, $auth, $po_id, $args) = @_;
+ $args ||= {};
my $e = new_editor(authtoken=>$auth, xact=>1);
return $e->die_event unless $e->checkauth;
$mgr->total(scalar(@$li_ids) + $lid_total);
- create_lineitem_list_assets($mgr, $li_ids) or return $e->die_event;
+ create_lineitem_list_assets($mgr, $li_ids, $args->{vandelay})
+ or return $e->die_event;
$e->xact_begin;
update_purchase_order($mgr, $po) or return $e->die_event;
{desc => 'purchase_order to create', type => 'object'}
],
return => {desc => 'The purchase order id, Event on failure'}
- }
+ },
+ max_chunk_count => 1
);
sub create_purchase_order_api {
$pargs{provider} = $po->provider if $po->provider;
$pargs{ordering_agency} = $po->ordering_agency if $po->ordering_agency;
$pargs{prepayment_required} = $po->prepayment_required if $po->prepayment_required;
+ my $vandelay = $args->{vandelay};
$po = create_purchase_order($mgr, %pargs) or return $e->die_event;
# commit before starting the asset creation
$e->xact_commit;
- if($li_ids and $$args{create_assets}) {
- create_lineitem_list_assets($mgr, $li_ids) or return $e->die_event;
+ if($li_ids and $vandelay) {
+ create_lineitem_list_assets($mgr, $li_ids, $vandelay) or return $e->die_event;
}
return $mgr->respond_complete;
my $res = receive_lineitem($mgr, $li_id) or return $e->die_event;
$e->commit;
$conn->respond_complete($res);
- $mgr->run_post_response_hooks;
+ $mgr->run_post_response_hooks
}
);
sub activate_purchase_order {
- my($self, $conn, $auth, $po_id) = @_;
+ my($self, $conn, $auth, $po_id, $vandelay) = @_;
my $dry_run = ($self->api_name =~ /\.dry_run/) ? 1 : 0;
- my $e = new_editor(xact=>1, authtoken=>$auth);
+ my $e = new_editor(authtoken=>$auth);
return $e->die_event unless $e->checkauth;
my $mgr = OpenILS::Application::Acq::BatchManager->new(editor => $e, conn => $conn);
- my $die_event = activate_purchase_order_impl($mgr, $po_id, $dry_run);
+ my $die_event = activate_purchase_order_impl($mgr, $po_id, $vandelay, $dry_run);
return $e->die_event if $die_event;
- if ($dry_run) {
- $e->rollback;
- } else {
- $e->commit;
- }
$conn->respond_complete(1);
- $mgr->run_post_response_hooks;
+ $mgr->run_post_response_hooks unless $dry_run;
return undef;
}
+# xacts managed within
sub activate_purchase_order_impl {
- my ($mgr, $po_id, $dry_run) = @_;
+ my ($mgr, $po_id, $vandelay, $dry_run) = @_;
+
+ # read-only until lineitem asset creation
my $e = $mgr->editor;
+ $e->xact_begin;
my $po = $e->retrieve_acq_purchase_order($po_id) or return $e->die_event;
return $e->die_event unless $e->allowed('CREATE_PURCHASE_ORDER', $po->ordering_agency);
-
my $provider = $e->retrieve_acq_provider($po->provider);
- $po->state('on-order');
- $po->order_date('now');
- update_purchase_order($mgr, $po) or return $e->die_event;
+ # find lineitems and create assets for all
- my $query = [
- {
- purchase_order => $po_id,
- state => [qw/pending-order new order-ready/]
- },
- {limit => 1}
- ];
+ my $query = {
+ purchase_order => $po_id,
+ state => [qw/pending-order new order-ready/]
+ };
- while( my $li_id = $e->search_acq_lineitem($query, {idlist => 1})->[0] ) {
+ my $li_ids = $e->search_acq_lineitem($query, {idlist => 1});
- my $li;
- if($dry_run) {
- $li = $e->retrieve_acq_lineitem($li_id);
- } else {
- # can't activate a PO w/o assets. Create lineitem assets as necessary
- my $data = create_lineitem_assets($mgr, $li_id) or return $e->die_event;
- $li = $data->{li};
+ my $vl_resp; # imported li's and the queue the managing queue
+ if (!$dry_run) {
+ $e->rollback; # read-only thus far
+ $vl_resp = create_lineitem_list_assets($mgr, $li_ids, $vandelay)
+ or return OpenILS::Event->new('ACQ_LI_IMPORT_FAILED');
+ $e->xact_begin;
+ }
+
+ # create fund debits for lineitems
+
+ for my $li_id (@$li_ids) {
+ my $li = $e->retrieve_acq_lineitem($li_id);
+
+ if (!$li->eg_bib_id and !$dry_run) {
+ # we encountered a lineitem that was not successfully imported.
+ # we cannot continue. rollback and report.
+ $e->rollback;
+ return OpenILS::Event->new('ACQ_LI_IMPORT_FAILED', {queue => $vl_resp->{queue}});
}
$li->state('on-order');
$mgr->respond;
}
+ # create po-item debits
+
for my $po_item (@{$e->search_acq_po_item({purchase_order => $po_id})}) {
my $debit = create_fund_debit(
$mgr->respond;
}
+ # mark PO as ordered
+
+ $po->state('on-order');
+ $po->order_date('now');
+ update_purchase_order($mgr, $po) or return $e->die_event;
+
+ # clean up the xact
+ $dry_run and $e->rollback or $e->commit;
+
# tell the world we activated a PO
$U->create_events_for_hook('acqpo.activated', $po, $po->ordering_agency) unless $dry_run;