Skip to content
This repository has been archived by the owner on Mar 14, 2024. It is now read-only.

Commit

Permalink
calculate and compare report data during change detection
Browse files Browse the repository at this point in the history
  • Loading branch information
sleidig committed Feb 15, 2024
1 parent 6ae92b5 commit 4474d74
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 40 deletions.
3 changes: 2 additions & 1 deletion src/domain/report-data-change-event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Reference } from './reference';
import { ReportCalculation } from './report-calculation';

/**
* Used as core that a report's calculated results have changed, due to updates in the underlying database.
Expand All @@ -8,5 +9,5 @@ export interface ReportDataChangeEvent {
report: Reference;

/** The calculation containing the latest data after the change, ready to be fetched */
calculation: Reference;
calculation: ReportCalculation;
}
72 changes: 49 additions & 23 deletions src/report-changes/core/couchdb-report-changes.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { EntityDoc, ReportChangeDetector } from './report-change.detector';
import { NotificationService } from '../../notification/core/notification.service';
import { Reference } from '../../domain/reference';
import { ReportDataChangeEvent } from '../../domain/report-data-change-event';
import { ReportCalculation } from '../../domain/report-calculation';
import { ReportCalculationOutcomeSuccess } from '../../domain/report-calculation';
import {
CouchDbChangeResult,
CouchDbChangesResponse,
Expand All @@ -12,7 +12,11 @@ import { Report } from '../../domain/report';
import { ReportChangesService } from './report-changes.service';
import { CouchdbChangesService } from '../storage/couchdb-changes.service';
import { DefaultReportStorage } from '../../report/storage/report-storage.service';
import { map, mergeAll, tap } from 'rxjs';
import { filter, map, mergeAll, Observable, switchMap, tap, zip } from 'rxjs';
import {
CreateReportCalculationFailed,
CreateReportCalculationUseCase,
} from '../../report/core/use-cases/create-report-calculation-use-case.service';

@Injectable()
export class CouchdbReportChangesService implements ReportChangesService {
Expand All @@ -22,6 +26,7 @@ export class CouchdbReportChangesService implements ReportChangesService {
private notificationService: NotificationService,
private reportStorage: DefaultReportStorage,
private couchdbChangesRepository: CouchdbChangesService,
private createReportCalculation: CreateReportCalculationUseCase,
) {
this.notificationService
.activeReports()
Expand Down Expand Up @@ -78,13 +83,15 @@ export class CouchdbReportChangesService implements ReportChangesService {
this.checkReportConfigUpdate(change),
),
map((c: CouchDbChangeResult) => this.getChangeDetails(c)),
map((change: DocChangeDetails) => this.changeIsAffectingReport(change)),
switchMap((change: DocChangeDetails) =>
this.changeIsAffectingReport(change),
),
// TODO: collect a batch of changes for a while before checking?
)
.subscribe((affectedReports: ReportDataChangeEvent[]) => {
affectedReports.forEach((event) => {
this.notificationService.triggerNotification(event),
console.log('Report change detected:', event);
this.notificationService.triggerNotification(event);
console.log('Report change detected:', event);
});
});
}
Expand All @@ -108,8 +115,8 @@ export class CouchdbReportChangesService implements ReportChangesService {

private changeIsAffectingReport(
docChange: DocChangeDetails,
): ReportDataChangeEvent[] {
const affectedReports = [];
): Observable<ReportDataChangeEvent[]> {
const affectedReports: Observable<ReportDataChangeEvent>[] = [];

for (const [reportId, changeDetector] of this.reportMonitors.entries()) {
if (!changeDetector.affectsReport(docChange)) {
Expand All @@ -118,24 +125,43 @@ export class CouchdbReportChangesService implements ReportChangesService {

const reportRef = new Reference(reportId);

// (!!!) TODO: calculate a new report calculation here (or in ChangeDetector?)
// --> move ReportCalculationController implementation into a core service with .triggerCalculation(reportId) method
// const newResult = await this.reportService.runReportCalculation(reportId);
// if (newResult.hash !== oldResult.hash)
const calculation: ReportCalculation = new ReportCalculation(
'x',
reportRef,
);

const event: ReportDataChangeEvent = {
report: reportRef,
calculation: reportRef,
};

affectedReports.push(event);
const reportChangeEventObservable = this.createReportCalculation
.startReportCalculation(changeDetector.report)
.pipe(
switchMap((outcome) => {
if (outcome instanceof CreateReportCalculationFailed) {
// TODO: what do we do here in case of failure?
throw new Error('Report calculation failed');
}

return this.createReportCalculation.getCompletedReportCalculation(
new Reference(outcome.result.id),
);
}),
filter(
(calcUpdate) =>
(calcUpdate.outcome as ReportCalculationOutcomeSuccess)
?.result_hash !== changeDetector.lastCalculationHash,
),
tap(
(calcUpdate) =>
(changeDetector.lastCalculationHash = (
calcUpdate.outcome as ReportCalculationOutcomeSuccess
)?.result_hash),
),
map(
(result) =>
({
report: result.report,
calculation: result,
}) as ReportDataChangeEvent,
),
);

affectedReports.push(reportChangeEventObservable);
}

return affectedReports;
return zip(affectedReports);
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/report-changes/core/report-change.detector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import { Report } from '../../domain/report';
import { DocChangeDetails } from './couchdb-report-changes.service';

export class ReportChangeDetector {
private report?: Report;
public report: Report;
public lastCalculationHash: string | undefined;

private sqlTableNames: string[] = [];

constructor(report: Report) {
this.report = report;
this.updateReportConfig(report);
}

Expand Down
31 changes: 19 additions & 12 deletions src/report/controller/report-calculation.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
Controller,
Get,
Headers,
InternalServerErrorException,
NotFoundException,
Param,
Post,
Expand All @@ -11,11 +12,17 @@ import { map, Observable, switchMap } from 'rxjs';
import { ReportCalculation } from '../../domain/report-calculation';
import { Reference } from '../../domain/reference';
import { ReportData } from '../../domain/report-data';
import { v4 as uuidv4 } from 'uuid';
import {
CreateReportCalculationFailed,
CreateReportCalculationUseCase,
} from '../core/use-cases/create-report-calculation-use-case.service';

@Controller('/api/v1/reporting')
export class ReportCalculationController {
constructor(private reportStorage: DefaultReportStorage) {}
constructor(
private reportStorage: DefaultReportStorage,
private createReportCalculation: CreateReportCalculationUseCase,
) {}

@Post('/report-calculation/report/:reportId')
startCalculation(
Expand All @@ -28,16 +35,16 @@ export class ReportCalculationController {
throw new NotFoundException();
}

return this.reportStorage
.storeCalculation(
new ReportCalculation(
`ReportCalculation:${uuidv4()}`,
new Reference(reportId),
),
)
.pipe(
map((reportCalculation) => new Reference(reportCalculation.id)),
);
return this.createReportCalculation.startReportCalculation(value).pipe(
map((outcome) => {
if (outcome instanceof CreateReportCalculationFailed) {
// TODO: other error codes?
throw new InternalServerErrorException();
}

return new Reference(outcome.result.id);
}),
);
}),
);
}
Expand Down
4 changes: 3 additions & 1 deletion src/report/core/report-storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Reference } from '../../domain/reference';
import { Report } from '../../domain/report';
import { Observable } from 'rxjs';
import { Observable, Subject } from 'rxjs';
import { ReportCalculation } from '../../domain/report-calculation';
import { ReportData } from '../../domain/report-data';

Expand All @@ -27,4 +27,6 @@ export interface ReportStorage {
fetchData(runRef: Reference): Observable<ReportData | undefined>;

isCalculationOngoing(reportRef: Reference): Observable<boolean>;

reportCalculationUpdated: Subject<ReportCalculation>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Test, TestingModule } from '@nestjs/testing';
import { CreateReportCalculationUseCase } from './create-report-calculation-use-case.service';

describe('CreateReportCalculationUseCaseService', () => {
let service: CreateReportCalculationUseCase;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [CreateReportCalculationUseCase],
}).compile();

service = module.get<CreateReportCalculationUseCase>(
CreateReportCalculationUseCase,
);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { Injectable } from '@nestjs/common';
import { Reference } from '../../../domain/reference';
import { filter, map, merge, Observable, take } from 'rxjs';
import {
ReportCalculation,
ReportCalculationStatus,
} from '../../../domain/report-calculation';
import { v4 as uuidv4 } from 'uuid';
import { Report } from '../../../domain/report';
import { DefaultReportStorage } from '../../storage/report-storage.service';

@Injectable()
export class CreateReportCalculationUseCase {
constructor(private reportStorage: DefaultReportStorage) {}

startReportCalculation(
report: Report,
): Observable<CreateReportCalculationOutcome> {
const calculation = new ReportCalculation(
`ReportCalculation:${uuidv4()}`,
new Reference(report.id),
);
return this.reportStorage
.storeCalculation(calculation)
.pipe(
map((calculation) => new CreateReportCalculationSuccess(calculation)),
);
}

getCompletedReportCalculation(
reportCalculation: Reference,
): Observable<ReportCalculation> {
return merge(
this.reportStorage.fetchCalculation(reportCalculation).pipe(
map((calc) => {
if (!calc) {
throw new Error('Report calculation not found');
// TODO: can this really return undefined? Looks like it would throw instead (which seems a good way to handle it to me)
}
return calc as ReportCalculation;
}),
),
this.reportStorage.reportCalculationUpdated,
).pipe(
filter((calcUpdate) => calcUpdate?.id === reportCalculation.id),
filter(
(calcUpdate) =>
calcUpdate?.status === ReportCalculationStatus.FINISHED_SUCCESS ||
calcUpdate?.status === ReportCalculationStatus.FINISHED_ERROR,
),
take(1),
);
}
}

export type CreateReportCalculationOutcome =
| CreateReportCalculationSuccess
| CreateReportCalculationFailed;

export class CreateReportCalculationSuccess {
constructor(public result: ReportCalculation) {}
}

export class CreateReportCalculationFailed {
constructor(
public errorMessage: string,
public errorCode: CreateReportCalculationError,
public error?: any,
) {}
}

export enum CreateReportCalculationError {
NotImplemented,
}
4 changes: 3 additions & 1 deletion src/report/report.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ReportCalculationTask } from './tasks/report-calculation-task.service';
import { ReportCalculationProcessor } from './tasks/report-calculation-processor.service';
import { SqsReportCalculator } from './core/sqs-report-calculator.service';
import { CouchDbClient } from '../couchdb/couch-db-client.service';
import { CreateReportCalculationUseCase } from './core/use-cases/create-report-calculation-use-case.service';

@Module({
controllers: [ReportController, ReportCalculationController],
Expand All @@ -21,7 +22,8 @@ import { CouchDbClient } from '../couchdb/couch-db-client.service';
ReportCalculationProcessor,
SqsReportCalculator,
CouchDbClient,
CreateReportCalculationUseCase,
],
exports: [DefaultReportStorage],
exports: [DefaultReportStorage, CreateReportCalculationUseCase],
})
export class ReportModule {}
5 changes: 4 additions & 1 deletion src/report/storage/report-storage.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Reference } from '../../domain/reference';
import { Report } from '../../domain/report';
import { ReportStorage } from '../core/report-storage';
import { ReportRepository } from '../repository/report-repository.service';
import { map, Observable, switchMap } from 'rxjs';
import { map, Observable, Subject, switchMap, tap } from 'rxjs';
import { Injectable, NotFoundException } from '@nestjs/common';
import {
ReportCalculation,
Expand All @@ -21,6 +21,8 @@ export class DefaultReportStorage implements ReportStorage {
private reportCalculationRepository: ReportCalculationRepository,
) {}

reportCalculationUpdated = new Subject<ReportCalculation>();

fetchAllReports(authToken: string, mode = 'sql'): Observable<Report[]> {
return this.reportRepository.fetchReports(authToken).pipe(
map((response) => {
Expand Down Expand Up @@ -137,6 +139,7 @@ export class DefaultReportStorage implements ReportStorage {
return value;
}
}),
tap((calculation) => this.reportCalculationUpdated.next(calculation)),
);
}

Expand Down

0 comments on commit 4474d74

Please sign in to comment.