params : any[];
observer : Observer<any>;
superseded : Boolean = false;
+ // If set, this will be used instead of a one-off OpenSRF.ClientSession.
+ session? : any;
// Last EgEvent encountered by this request.
// Most callers will not need to import EgEvent since the parsed
// event will be available here.
evt: EgEvent;
- constructor(service: String, method: String, params: any[]) {
+ constructor(service: String, method: String, params: any[], session?: any) {
this.service = service;
this.method = method;
this.params = params;
+ if (session) {
+ this.session = session;
+ } else {
+ this.session = new OpenSRF.ClientSession(service);
+ }
}
}
this.authExpired$ = new EventEmitter<EgNetRequest>();
}
- // Variadic params version
+ // Standard request call -- Variadic params version
request(service: String, method: String, ...params: any[]): Observable<any> {
return this.requestWithParamList(service, method, params);
}
// Array params version
requestWithParamList(service: String,
method: String, params: any[]): Observable<any> {
+ return this.requestCompiled(
+ new EgNetRequest(service, method, params));
+ }
- var request = new EgNetRequest(service, method, params);
-
+ requestCompiled(request: EgNetRequest): Observable<any> {
return Observable.create(
observer => {
request.observer = observer;
- this.sendRequest(request);
+ this.sendCompiledRequest(request);
}
);
}
- private sendRequest(request: EgNetRequest): void {
+ // Version with pre-compiled EgNetRequest object
+ sendCompiledRequest(request: EgNetRequest): void {
OpenSRF.Session.transport = OSRF_TRANSPORT_TYPE_WS;
var this_ = this;
- new OpenSRF.ClientSession(request.service).request({
+ request.session.request({
async : true,
method : request.method,
params : request.params,
let msg =
`${request.method} failed! stat=${statCode} msg=${statMsg}`;
console.error(msg);
+
+ if (request.service == 'open-ils.pcrud' && statCode == 401) {
+ // 401 is the PCRUD equivalent of a NO_SESSION event
+ this_.authExpired$.emit(request);
+ }
+
request.observer.error(msg);
}
--- /dev/null
+import {Injectable} from '@angular/core';
+import {Observable, Observer} from 'rxjs/Rx';
+//import {toPromise} from 'rxjs/operators';
+import {EgIdlService, EgIdlObject} from './idl.service';
+import {EgNetService, EgNetRequest} from './net.service';
+import {EgAuthService} from './auth.service';
+
+// Used for debugging.
+declare var js2JSON: (jsThing:any) => string;
+declare var OpenSRF: any; // creating sessions
+
+export interface EgPcrudReqOps {
+ authoritative?: boolean;
+ anonymous?: boolean;
+ idlist?: boolean;
+ atomic?: boolean;
+}
+
+// For for documentation purposes.
+type EgPcrudResponse = any;
+
+export class EgPcrudContext {
+
+ static verboseLogging: boolean = true; //
+ static identGenerator: number = 0; // for debug logging
+
+ private ident: number;
+ private authoritative: boolean;
+ private xactCloseMode: string;
+ private cudIdx: number;
+ private cudAction: string;
+ private cudLast: EgPcrudResponse;
+ private cudList: EgIdlObject[];
+
+ private egIdl: EgIdlService;
+ private egNet: EgNetService;
+ private egAuth: EgAuthService;
+
+ // Tracks nested CUD actions
+ cudObserver: Observer<EgPcrudResponse>;
+
+ session: any; // OpenSRF.ClientSession
+
+ constructor( // passed in by parent service -- not injected
+ egIdl: EgIdlService,
+ egNet: EgNetService,
+ egAuth: EgAuthService
+ ) {
+ this.egIdl = egIdl;
+ this.egNet = egNet;
+ this.egAuth = egAuth;
+ this.xactCloseMode = 'rollback';
+ this.ident = EgPcrudContext.identGenerator++;
+ this.session = new OpenSRF.ClientSession('open-ils.pcrud');
+ }
+
+ toString(): string {
+ return '[PCRUDContext ' + this.ident + ']';
+ }
+
+ log(msg: string): void {
+ if (EgPcrudContext.verboseLogging)
+ console.debug(this + ': ' + msg);
+ }
+
+ err(msg: string): void {
+ console.error(this + ': ' + msg);
+ }
+
+ token(reqOps?: EgPcrudReqOps): string {
+ return (reqOps && reqOps.anonymous) ?
+ 'ANONYMOUS' : this.egAuth.token();
+ }
+
+ connect(): Promise<EgPcrudContext> {
+ this.log('connect');
+ return new Promise( (resolve, reject) => {
+ this.session.connect({
+ onconnect : () => { resolve(this); }
+ });
+ })
+ }
+
+ disconnect(): void {
+ this.log('disconnect');
+ this.session.disconnect();
+ }
+
+ retrieve(fmClass: string, pkey: Number | string,
+ pcrudOps?: any, reqOps?: EgPcrudReqOps): Observable<EgPcrudResponse> {
+ if (!reqOps) reqOps = {};
+ this.authoritative = reqOps.authoritative || false;
+ return this.dispatch(
+ `open-ils.pcrud.retrieve.${fmClass}`,
+ [this.token(reqOps), pkey, pcrudOps]);
+ }
+
+ retrieveAll(fmClass: string, pcrudOps?: any,
+ reqOps?: EgPcrudReqOps): Observable<EgPcrudResponse> {
+ let search = {};
+ search[this.egIdl.classes[fmClass].pkey] = {'!=' : null};
+ return this.search(fmClass, search, pcrudOps, reqOps);
+ }
+
+ search(fmClass: string, search: any,
+ pcrudOps?: any, reqOps?: EgPcrudReqOps): Observable<EgPcrudResponse> {
+ reqOps = reqOps || {};
+ this.authoritative = reqOps.authoritative || false;
+
+ let returnType = reqOps.idlist ? 'id_list' : 'search';
+ let method = `open-ils.pcrud.${returnType}.${fmClass}`;
+
+ if (reqOps.atomic) method += '.atomic';
+
+ return this.dispatch(method, [this.token(reqOps), search, pcrudOps]);
+ }
+
+ create(list: EgIdlObject[]): Observable<EgPcrudResponse> {
+ return this.cud('create', list)
+ }
+ update(list: EgIdlObject[]): Observable<EgPcrudResponse> {
+ return this.cud('update', list)
+ }
+ remove(list: EgIdlObject[]): Observable<EgPcrudResponse> {
+ return this.cud('delete', list)
+ }
+ autoApply(list: EgIdlObject[]): Observable<EgPcrudResponse> { // RENAMED
+ return this.cud('auto', list)
+ }
+
+ xactClose(): Observable<EgPcrudResponse> {
+ return this.sendRequest(
+ 'open-ils.pcrud.transaction.' + this.xactCloseMode,
+ [this.token()]
+ );
+ };
+
+ xactBegin(): Observable<EgPcrudResponse> {
+ return this.sendRequest(
+ 'open-ils.pcrud.transaction.begin', [this.token()]
+ );
+ };
+
+ private dispatch(method: string, params: any[]): Observable<EgPcrudResponse> {
+ if (this.authoritative) {
+ return this.wrapXact(() => {
+ return this.sendRequest(method, params);
+ });
+ } else {
+ return this.sendRequest(method, params)
+ }
+ };
+
+
+ // => connect
+ // => xact_begin
+ // => action
+ // => xact_close(commit/rollback)
+ // => disconnect
+ wrapXact(mainFunc: () => Observable<EgPcrudResponse>): Observable<EgPcrudResponse> {
+ let this_ = this;
+
+ return Observable.create(observer => {
+
+ // 1. connect
+ this.connect()
+
+ // 2. start the transaction
+ .then(() => {return this_.xactBegin().toPromise()})
+
+ // 3. execute the main body
+ .then(() => {
+
+ mainFunc().subscribe(
+ res => observer.next(res),
+ err => observer.error(err),
+ () => {
+ this_.xactClose().toPromise().then(() => {
+ // 5. disconnect
+ this_.disconnect();
+ // 6. all done
+ observer.complete();
+ });
+ }
+ );
+ })
+ });
+ };
+
+ private sendRequest(method: string,
+ params: any[]): Observable<EgPcrudResponse> {
+
+ this.log(`sendRequest(${method})`);
+
+ return this.egNet.requestCompiled(
+ new EgNetRequest(
+ 'open-ils.pcrud', method, params, this.session)
+ );
+ }
+
+ private cud(action: string,
+ list: EgIdlObject | EgIdlObject[]): Observable<EgPcrudResponse> {
+
+ this.log(`CUD(): ${action}`);
+
+ this.cudIdx = 0;
+ this.cudAction = action;
+ this.xactCloseMode = 'commit';
+
+ if (!Array.isArray(list)) this.cudList = [list];
+
+ let this_ = this;
+
+ return this.wrapXact(() => {
+ return Observable.create(observer => {
+ this_.cudObserver = observer;
+ this_.nextCudRequest();
+ });
+ });
+ }
+
+ /**
+ * Loops through the list of objects to update and sends
+ * them one at a time to the server for processing. Once
+ * all are done, the cudObserver is resolved.
+ */
+ nextCudRequest(): void {
+ let this_ = this;
+
+ if (this.cudIdx >= this.cudList.length) {
+ this.cudObserver.complete();
+ return;
+ }
+
+ let action = this.cudAction;
+ let fmObj = this.cudList[this.cudIdx++];
+
+ if (action == 'auto') {
+ if (fmObj.ischanged()) action = 'update';
+ if (fmObj.isnew()) action = 'create';
+ if (fmObj.isdeleted()) action = 'delete';
+
+ if (action == 'auto') {
+ // object does not need updating; move along
+ this.nextCudRequest();
+ }
+ }
+
+ this.sendRequest(
+ `open-ils.pcrud.${action}.${fmObj.classname}`,
+ [this.token(), fmObj]
+ ).subscribe(
+ res => this_.cudObserver.next(res),
+ err => this_.cudObserver.error(err),
+ () => this_.nextCudRequest()
+ );
+ };
+}
+
+@Injectable()
+export class EgPcrudService {
+
+ constructor(
+ private egIdl: EgIdlService,
+ private egNet: EgNetService,
+ private egAuth: EgAuthService
+ ) {}
+
+ // Pass-thru functions for one-off PCRUD calls
+
+ connect(): Promise<EgPcrudContext> {
+ return this.newContext().connect();
+ }
+
+ newContext(): EgPcrudContext {
+ return new EgPcrudContext(this.egIdl, this.egNet, this.egAuth);
+ }
+
+ retrieve(fmClass: string, pkey: Number | string,
+ pcrudOps?: any, reqOps?: EgPcrudReqOps): Observable<EgPcrudResponse> {
+ return this.newContext().retrieve(fmClass, pkey, pcrudOps, reqOps);
+ }
+
+ retrieveAll(fmClass: string, pcrudOps?: any,
+ reqOps?: EgPcrudReqOps): Observable<EgPcrudResponse> {
+ return this.newContext().retrieveAll(fmClass, pcrudOps, reqOps);
+ }
+
+ search(fmClass: string, search: any,
+ pcrudOps?: any, reqOps?: EgPcrudReqOps): Observable<EgPcrudResponse> {
+ return this.newContext().search(fmClass, search, pcrudOps, reqOps);
+ }
+
+ create(list: EgIdlObject[]): Observable<EgPcrudResponse> {
+ return this.newContext().create(list);
+ }
+
+ update(list: EgIdlObject[]): Observable<EgPcrudResponse> {
+ return this.newContext().update(list);
+ }
+
+ remove(list: EgIdlObject[]): Observable<EgPcrudResponse> {
+ return this.newContext().remove(list);
+ }
+
+ autoApply(list: EgIdlObject[]): Observable<EgPcrudResponse> {
+ return this.newContext().autoApply(list);
+ }
+}
+
+