Skip to content

Commit

Permalink
refactor(cardano-services): extend typeormProvider with typeormService
Browse files Browse the repository at this point in the history
  • Loading branch information
greatertomi authored and AngelCastilloB committed Aug 22, 2023
1 parent 5f2d478 commit c078a91
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -1,72 +1,27 @@
import { BehaviorSubject, Observable, Subscription, filter, firstValueFrom, tap } from 'rxjs';
import { DataSource } from 'typeorm';
import { HealthCheckResponse, Provider } from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import { Observable, skip } from 'rxjs';
import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm';
import { RunnableModule, isNotNil } from '@cardano-sdk/util';
import { createTypeormDataSource } from './util';
import { TypeormService } from '../TypeormService';

export interface TypeormProviderDependencies {
logger: Logger;
entities: Function[];
connectionConfig$: Observable<PgConnectionConfig>;
}

export abstract class TypeormProvider extends RunnableModule implements Provider {
#entities: Function[];
#subscription: Subscription | undefined;
#connectionConfig$: Observable<PgConnectionConfig>;
#dataSource$ = new BehaviorSubject<DataSource | null>(null);

export abstract class TypeormProvider extends TypeormService implements Provider {
health: HealthCheckResponse = { ok: false, reason: 'not started' };

constructor(name: string, { connectionConfig$, logger, entities }: TypeormProviderDependencies) {
super(name, logger);
this.#entities = entities;
this.#connectionConfig$ = connectionConfig$;
}

#subscribeToDataSource() {
this.#subscription = createTypeormDataSource(this.#connectionConfig$, this.#entities, this.logger)
.pipe(tap(() => (this.health = { ok: true })))
.subscribe((dataSource) => this.#dataSource$.next(dataSource));
}

#reset() {
this.#subscription?.unsubscribe();
this.#subscription = undefined;
this.#dataSource$.value !== null && this.#dataSource$.next(null);
}

onError(_: unknown) {
this.#reset();
this.health = { ok: false, reason: 'Provider error' };
this.#subscribeToDataSource();
}

async withDataSource<T>(callback: (dataSource: DataSource) => Promise<T>): Promise<T> {
try {
return await callback(await firstValueFrom(this.#dataSource$.pipe(filter(isNotNil))));
} catch (error) {
this.onError(error);
throw error;
}
super(name, { connectionConfig$, entities, logger });
// We skip 1 to omit the initial null value of the subject
this.dataSource$.pipe(skip(1)).subscribe((dataSource) => {
this.health = dataSource ? { ok: true } : { ok: false, reason: 'Provider error' };
});
}

async healthCheck(): Promise<HealthCheckResponse> {
return this.health;
}

async initializeImpl() {
return Promise.resolve();
}

async startImpl() {
this.#subscribeToDataSource();
}

async shutdownImpl() {
this.#reset();
this.#dataSource$.complete();
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './TypeormProvider';
export * from './util';
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,37 @@ import { DataSource } from 'typeorm';
import { Logger } from 'ts-log';
import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm';
import { RunnableModule, isNotNil } from '@cardano-sdk/util';
import { TypeormProviderDependencies, createTypeormDataSource } from '../TypeormProvider';
import { createTypeormDataSource } from '../createTypeormDataSource';

export class TypeormService extends RunnableModule {
interface TypeormServiceDependencies {
logger: Logger;
entities: Function[];
connectionConfig$: Observable<PgConnectionConfig>;
}

export abstract class TypeormService extends RunnableModule {
#entities: Function[];
#connectionConfig$: Observable<PgConnectionConfig>;
logger: Logger;
#dataSource$ = new BehaviorSubject<DataSource | null>(null);
protected dataSource$ = new BehaviorSubject<DataSource | null>(null);
#subscription: Subscription | undefined;

constructor(name: string, { connectionConfig$, logger, entities }: TypeormProviderDependencies) {
constructor(name: string, { connectionConfig$, logger, entities }: TypeormServiceDependencies) {
super(name, logger);
this.#entities = entities;
this.#connectionConfig$ = connectionConfig$;
}

#subscribeToDataSource() {
this.#subscription = createTypeormDataSource(this.#connectionConfig$, this.#entities, this.logger).subscribe(
(dataSource) => this.#dataSource$.next(dataSource)
(dataSource) => this.dataSource$.next(dataSource)
);
}

#reset() {
this.#subscription?.unsubscribe();
this.#subscription = undefined;
this.#dataSource$.value !== null && this.#dataSource$.next(null);
this.dataSource$.value !== null && this.dataSource$.next(null);
}

onError(_: unknown) {
Expand All @@ -37,7 +43,7 @@ export class TypeormService extends RunnableModule {

async withDataSource<T>(callback: (dataSource: DataSource) => Promise<T>): Promise<T> {
try {
return await callback(await firstValueFrom(this.#dataSource$.pipe(filter(isNotNil))));
return await callback(await firstValueFrom(this.dataSource$.pipe(filter(isNotNil))));
} catch (error) {
this.onError(error);
throw error;
Expand All @@ -54,6 +60,6 @@ export class TypeormService extends RunnableModule {

async shutdownImpl() {
this.#reset();
this.#dataSource$.complete();
this.dataSource$.complete();
}
}

0 comments on commit c078a91

Please sign in to comment.