indexed => 0,
total => 0
};
- $self->{ingest_queue} = [];
$self->{cache} = {};
$self->throttle(5) unless $self->throttle;
$self->{post_proc_queue} = [];
return $self;
}
-sub ingest_ses {
- my($self, $val) = @_;
- $self->{ingest_ses} = $val if $val;
- return $self->{ingest_ses};
-}
-
-sub push_ingest_queue {
- my($self, $rec_id) = @_;
-
- $self->ingest_ses(OpenSRF::AppSession->connect('open-ils.ingest'))
- unless $self->ingest_ses;
-
- my $req = $self->ingest_ses->request('open-ils.ingest.full.biblio.record', $rec_id);
-
- push(@{$self->{ingest_queue}}, $req);
-}
-
-sub process_ingest_records {
- my $self = shift;
- return unless @{$self->{ingest_queue}};
-
- for my $req (@{$self->{ingest_queue}}) {
-
- try {
- $req->gather(1);
- $self->{args}->{indexed} += 1;
- $self->{args}->{progress} += 1;
- } otherwise {};
-
- $self->respond;
- }
- $self->ingest_ses->disconnect;
-}
-
-
sub cache {
my($self, $org, $key, $val) = @_;
$self->{cache}->{$org} = {} unless $self->{cache}->{org};
my($mgr, $li_ids, $vandelay) = @_;
return undef if check_import_li_marc_perms($mgr, $li_ids);
- $li_ids = import_lineitems_via_vandelay($mgr, $li_ids, $vandelay);
- return undef unless $li_ids;
+ my $res = import_li_bibs_via_vandelay($mgr, $li_ids, $vandelay);
+ return undef unless $res;
- # create the bibs/volumes/copies and ingest the records
- for my $li_id (@$li_ids) {
+ # create the bibs/volumes/copies for the successfully imported records
+ for my $li_id (@{$res->{li_ids}}) {
$mgr->editor->xact_begin;
my $data = create_lineitem_assets($mgr, $li_id) or return undef;
$mgr->editor->xact_commit;
$mgr->respond;
}
- $mgr->process_ingest_records;
- return 1;
+
+ return $res;
}
-sub import_lineitems_via_vandelay {
+sub import_li_bibs_via_vandelay {
my ($mgr, $li_ids, $vandelay) = @_;
+ my $res = {li_ids => []};
+ my $e = $mgr->editor;
+ $e->xact_begin;
my $queue_id = $vandelay->{named_queue};
if ($vandelay->{tmp_queue}) {
my $queue = new Fieldmapper::vandelay::bib_queue;
$queue->name(md5_hex($$.rand().time)); # since it's temp, should the DB generate the name?
- $queue->owner($mgr->editor->requestor->id);
+ $queue->owner($e->requestor->id);
$queue->queue_type('acq');
$queue->temp(1);
- $queue = $mgr->editor->create_vandelay_bib_queue($queue) or return undef;
+ $queue = $e->create_vandelay_bib_queue($queue) or return $res;
$queue_id = $queue->id;
}
- return undef unless $queue_id;
+ return $res unless $queue_id;
# load the lineitems into the queue for merge processing
my @vqbr_ids;
+ my @lis;
for my $li_id (@$li_ids) {
- my $li = $mgr->editor->retrieve_acq_lineitem($li_id) or return undef;
+ my $li = $e->retrieve_acq_lineitem($li_id) or return $res;
my $rec = new Fieldmapper::vandelay::queued_bib_record;
$rec->marc($li->marc);
$rec->queue($queue_id);
$rec->bib_source($vandelay->{bib_source});
- $rec = $mgr->editor->create_vandelay_queued_bib_record->new($rec) or return undef;
+ $rec = $e->create_vandelay_queued_bib_record->new($rec) or return $res;
push(@vqbr_ids, $rec->id);
# tell the acq record which vandelay record it's linked to
$li->queued_record($rec->id);
- $mgr->editor->update_acq_lineitem($li) or return undef;
+ $e->update_acq_lineitem($li) or return $res;
+ push(@lis, $li);
}
- # attempt to import the queue and collect the results
- # any lineitems that successfully merged or imported are
- # safe for acq asset importing
+ # we have to commit the transaction now since
+ # vandelay uses its own transactions.
+ $e->commit;
- # we have to commit the transaction now since vandelay
- # importing uses its own transactions.
- $mgr->editor->commit;
+ # Import the bibs via vandelay. Note: Vandely will update
+ # the acq.lineitem's eg_bib_id on successful import.
$vandelay->{report_all} = 1;
my $ses = OpenSRF::AppSession->create('open-ils.vandelay');
'open-ils.vandelay.bib_record.list.import',
\@vqbr_ids, $vandelay);
- my @imported;
+ # pull the responses, noting all that were successfully imported
+ my @success_lis;
while (my $resp = $req->recv) {
my $stat = $resp->content;
- push(@imported, $stat->{imported})
+ push(@success_lis, map {$_->queued_rec eq $stat->{imported}} @lis)
if $stat and $stat->{imported};
}
- # start a new transaction to manage the rest of asset creation
- $mgr->editor->xact_begin;
-
- # find the acq lineitems linked to imported vandelay records
- $li_ids = $mgr->editor->json_query({
- select => {jub => ['id']},
- from => 'jub',
- where => {queued_record => \@imported}
- });
-
- return [ map {$_->{id}} @$li_ids ];
+ return {queue => $queue_id, li_ids => \@success_lis};
}
# returns event on error, undef on success
$mgr->respond;
}
- my $die_event = activate_purchase_order_impl($mgr, $po->id) if $po and $activate_po;
- return $die_event if $die_event;
-
$e->commit;
unlink($filename);
$cache->delete_cache('vandelay_import_spool_' . $key);
- if ($vandelay) {
+ if ($po and $activate_po) {
+ my $die_event = activate_purchase_order_impl($mgr, $po->id, $vandelay);
+ return $die_event if $die_event;
+
+ } elsif ($vandelay) {
create_lineitem_list_assets($mgr, \@li_list, $vandelay) or return $e->die_event;
}
$pargs{provider} = $po->provider if $po->provider;
$pargs{ordering_agency} = $po->ordering_agency if $po->ordering_agency;
$pargs{prepayment_required} = $po->prepayment_required if $po->prepayment_required;
+ my $vandelay = $args->{vandelay};
$po = create_purchase_order($mgr, %pargs) or return $e->die_event;
# commit before starting the asset creation
$e->xact_commit;
- if($li_ids and $$args{create_assets}) {
+ if($li_ids and $vandelay) {
create_lineitem_list_assets($mgr, $li_ids) or return $e->die_event;
}
my $res = receive_lineitem($mgr, $li_id) or return $e->die_event;
$e->commit;
$conn->respond_complete($res);
- $mgr->run_post_response_hooks;
+ $mgr->run_post_response_hooks
}
);
sub activate_purchase_order {
- my($self, $conn, $auth, $po_id) = @_;
+ my($self, $conn, $auth, $po_id, $vandelay) = @_;
my $dry_run = ($self->api_name =~ /\.dry_run/) ? 1 : 0;
- my $e = new_editor(xact=>1, authtoken=>$auth);
+ my $e = new_editor(authtoken=>$auth);
return $e->die_event unless $e->checkauth;
my $mgr = OpenILS::Application::Acq::BatchManager->new(editor => $e, conn => $conn);
- my $die_event = activate_purchase_order_impl($mgr, $po_id, $dry_run);
+ my $die_event = activate_purchase_order_impl($mgr, $po_id, $vandelay, $dry_run);
return $e->die_event if $die_event;
- if ($dry_run) {
- $e->rollback;
- } else {
- $e->commit;
- }
$conn->respond_complete(1);
- $mgr->run_post_response_hooks;
+ $mgr->run_post_response_hooks unless $dry_run;
return undef;
}
+# xacts managed within
sub activate_purchase_order_impl {
- my ($mgr, $po_id, $dry_run) = @_;
+ my ($mgr, $po_id, $vandelay, $dry_run) = @_;
+
+ # read-only until lineitem asset creation
my $e = $mgr->editor;
+ $e->xact_begin;
my $po = $e->retrieve_acq_purchase_order($po_id) or return $e->die_event;
return $e->die_event unless $e->allowed('CREATE_PURCHASE_ORDER', $po->ordering_agency);
-
my $provider = $e->retrieve_acq_provider($po->provider);
- $po->state('on-order');
- $po->order_date('now');
- update_purchase_order($mgr, $po) or return $e->die_event;
+ # find lineitems and create assets for all
- my $query = [
- {
- purchase_order => $po_id,
- state => [qw/pending-order new order-ready/]
- },
- {limit => 1}
- ];
+ my $query = {
+ purchase_order => $po_id,
+ state => [qw/pending-order new order-ready/]
+ };
- while( my $li_id = $e->search_acq_lineitem($query, {idlist => 1})->[0] ) {
+ my $li_ids = $e->search_acq_lineitem($query, {idlist => 1});
- my $li;
- if($dry_run) {
- $li = $e->retrieve_acq_lineitem($li_id);
- } else {
- # can't activate a PO w/o assets. Create lineitem assets as necessary
- my $data = create_lineitem_assets($mgr, $li_id) or return $e->die_event;
- $li = $data->{li};
+ my $vl_resp; # imported li's and the queue the managing queue
+ if (!$dry_run) {
+ $e->rollback; # read-only thus far
+ $vl_resp = create_lineitem_list_assets($mgr, $li_ids, $vandelay) or return $e->die_event;
+ $e->xact_begin;
+ }
+
+ # create fund debits for lineitems
+
+ for my $li_id (@$li_ids) {
+ my $li = $e->retrieve_acq_lineitem($li_id);
+
+ if (!$li->eg_bib_id and !$dry_run) {
+ # we encountered a lineitem that was not successfully imported.
+ # we cannot continue. rollback and report.
+ my $queue = $e->retrieve_vandelay_queue($vl_resp->{queue});
+ $e->rollback;
+ return OpenILS::Event->new('ACQ_LI_IMPORT_FAILED', {queue => $queue});
}
$li->state('on-order');
$mgr->respond;
}
+ # create po-item debits
+
for my $po_item (@{$e->search_acq_po_item({purchase_order => $po_id})}) {
my $debit = create_fund_debit(
$mgr->respond;
}
+ # mark PO as ordered
+
+ $po->state('on-order');
+ $po->order_date('now');
+ update_purchase_order($mgr, $po) or return $e->die_event;
+
+ # clean up the xact
+ $dry_run and $e->rollback or $e->commit;
+
# tell the world we activated a PO
$U->create_events_for_hook('acqpo.activated', $po, $po->ordering_agency) unless $dry_run;