Skip to content

Commit

Permalink
fix daily update (#148)
Browse files Browse the repository at this point in the history
* test: daily update

* fix: handle daily updates over time using imports filtered on yesterday clinical trials

* fix: handle load of no documents
  • Loading branch information
sbedeau authored Jan 10, 2024
1 parent ca210ca commit 6c5c72e
Show file tree
Hide file tree
Showing 17 changed files with 4,636 additions and 106 deletions.
2 changes: 1 addition & 1 deletion cron.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"jobs": [
{
"command": "0 4 * * * node dist/src/etl/console.js import && node dist/src/etl/console.js translate && node dist/src/etl/console.js update-meddra-labels",
"command": "0 4 * * * node dist/src/etl/console.js daily-update",
"size": "S"
}
]
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"dev": "nest start --watch",
"etl:create-index": "ts-node src/etl/console.ts create-index",
"etl:create-policies": "ts-node src/etl/console.ts create-policies",
"etl:daily-update:everything": "ts-node src/etl/console.ts daily-update 1970-01-01",
"etl:daily-update:yesterday": "ts-node src/etl/console.ts daily-update",
"etl:delete-pipelines": "ts-node src/etl/console.ts delete-pipelines",
"etl:delete-policies": "ts-node src/etl/console.ts delete-policies",
"etl:import": "ts-node src/etl/console.ts import",
Expand Down
187 changes: 185 additions & 2 deletions src/etl/EtlService.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { errors } from '@elastic/elasticsearch'
import { TransportRequestCallback } from '@elastic/elasticsearch/lib/Transport'
import fs from 'fs'
import { afterEach, beforeEach } from 'vitest'

import { EtlService } from './EtlService'
import { convertFhirParsedQueryParamsToElasticsearchQuery } from '../api/research-study/gateways/converter/convertFhirParsedQueryParamsToElasticsearchQuery'
import { ElasticsearchBodyType } from '../shared/elasticsearch/ElasticsearchBody'
import { SearchResponse } from '../shared/elasticsearch/ElasticsearchService'
import { setupDependencies } from '../shared/test/helpers/elasticsearchHelper'
import { RiphDtoTestFactory } from '../shared/test/helpers/RiphDtoTestFactory'
import { setupTranslationService } from '../shared/test/helpers/translationHelper'
import { LocalTranslator } from '../shared/translation/LocalTranslator'
import { TranslationService } from '../shared/translation/TranslationService'
import { Translator } from '../shared/translation/Translator'

describe('extract transform load service', () => {
describe('when index is created', () => {
Expand Down Expand Up @@ -66,6 +71,15 @@ describe('extract transform load service', () => {
})

describe('when import is performed', () => {
beforeEach(() => {
vi.useFakeTimers()
vi.setSystemTime(new Date('2023-03-17'))
})

afterEach(() => {
vi.useRealTimers()
})

it('should find data for each clinical trials type (CTIS, DM, JARDE)', async () => {
// GIVEN
const { databaseService, etlService, medDraFile, readerService } = await setup()
Expand Down Expand Up @@ -329,6 +343,173 @@ describe('extract transform load service', () => {
await expect(etlService.updateMeddraLabels()).rejects.toThrow('ES pipelines serach operation has failed')
})
})

describe('when daily update is performed', () => {
afterEach(() => {
vi.useRealTimers()
})

it('should import every clinical trials, translate them and update their meddra labels', async () => {
// GIVEN
const { databaseService, etlService, readerService } = await setup()
vi.spyOn(readerService, 'read')
.mockResolvedValueOnce([RiphDtoTestFactory.ctis()])
.mockResolvedValueOnce([RiphDtoTestFactory.dm()])
.mockResolvedValueOnce([RiphDtoTestFactory.jarde()])
await etlService.createIndex()
await databaseService.createMedDraIndex()
await databaseService.bulkMedDraDocuments([
{
code: '10070575',
label: 'Cancer du sein à récepteurs aux oestrogènes positifs',
},
{
code: '10065430',
label: 'Cancer du sein HER2 positif',
},
])
await databaseService.createPolicies()

// WHEN
await etlService.dailyUpdate('1970-01-01')

// THEN
const query: ElasticsearchBodyType = convertFhirParsedQueryParamsToElasticsearchQuery([{ name: '_count', value: '1000' }])
const result: SearchResponse = await databaseService.search(query)

await expect(result).toMatchFileSnapshot('../shared/test/snapshots/DailyUpdate.snap.json')
})

it('should import clinical trials from yesterday, translate them and update their meddra labels', async () => {
// GIVEN
vi.useFakeTimers()
vi.setSystemTime(new Date('2024-01-07'))

const { databaseService, etlService, readerService } = await setup()
vi.spyOn(readerService, 'read')
.mockResolvedValueOnce([
RiphDtoTestFactory.ctis(),
RiphDtoTestFactory.ctis({
dates_avis_favorable_ms_mns: '22.00800.000094-SM-1:2022-11-07, 22.00800.000094-SM-2:2024-01-06',
historique: '2024-01-06: En cours',
numero_ctis: '2024-500014-26-99',
}),
])
.mockResolvedValueOnce([RiphDtoTestFactory.dm()])
.mockResolvedValueOnce([RiphDtoTestFactory.jarde()])
await etlService.createIndex()
await databaseService.createMedDraIndex()
await databaseService.bulkMedDraDocuments([
{
code: '10070575',
label: 'Cancer du sein à récepteurs aux oestrogènes positifs',
},
{
code: '10065430',
label: 'Cancer du sein HER2 positif',
},
])
await databaseService.createPolicies()

// WHEN
await etlService.dailyUpdate()

// THEN
const query: ElasticsearchBodyType = convertFhirParsedQueryParamsToElasticsearchQuery([{ name: '_count', value: '1000' }])
const result: SearchResponse = await databaseService.search(query)

await expect(result).toMatchFileSnapshot('../shared/test/snapshots/DailyUpdateSinceYesterday.snap.json')
})

it('should not import CTIS clinical trials, translate them and update their meddra labels when there is no new ones', async () => {
// GIVEN
vi.useFakeTimers()
vi.setSystemTime(new Date('2024-01-08'))

const { databaseService, etlService, readerService } = await setup()
vi.spyOn(readerService, 'read')
.mockResolvedValueOnce([
RiphDtoTestFactory.ctis(),
RiphDtoTestFactory.ctis({
dates_avis_favorable_ms_mns: '22.00800.000094-SM-1:2022-11-07, 22.00800.000094-SM-2:2024-01-06',
historique: '2024-01-06: En cours',
numero_ctis: '2024-500014-26-99',
}),
])
.mockResolvedValueOnce([RiphDtoTestFactory.dm()])
.mockResolvedValueOnce([RiphDtoTestFactory.jarde()])
await etlService.createIndex()
await databaseService.createMedDraIndex()
await databaseService.bulkMedDraDocuments([
{
code: '10070575',
label: 'Cancer du sein à récepteurs aux oestrogènes positifs',
},
{
code: '10065430',
label: 'Cancer du sein HER2 positif',
},
])
await databaseService.createPolicies()

// WHEN
await etlService.dailyUpdate()

// THEN
const query: ElasticsearchBodyType = convertFhirParsedQueryParamsToElasticsearchQuery([{ name: '_count', value: '1000' }])
const result: SearchResponse = await databaseService.search(query)

await expect(result).toMatchFileSnapshot('../shared/test/snapshots/DailyUpdateWithoutNewOnes.snap.json')
})

it('should import clinical trials from yesterday, translate them and update their meddra labels and preserve clinical trials from the past', async () => {
// GIVEN
vi.useFakeTimers()
vi.setSystemTime(new Date('2024-01-07'))
const { databaseService, etlService, readerService } = await setup()

vi.spyOn(readerService, 'read')
.mockResolvedValueOnce([RiphDtoTestFactory.ctis()])
.mockResolvedValueOnce([RiphDtoTestFactory.dm()])
.mockResolvedValueOnce([RiphDtoTestFactory.jarde()])

await etlService.createIndex()
await databaseService.createMedDraIndex()
await databaseService.bulkMedDraDocuments([
{
code: '10070575',
label: 'Cancer du sein à récepteurs aux oestrogènes positifs',
},
{
code: '10065430',
label: 'Cancer du sein HER2 positif',
},
])
await databaseService.createPolicies()

// WHEN
await etlService.dailyUpdate('1970-01-01')

vi.spyOn(readerService, 'read')
.mockResolvedValueOnce([
RiphDtoTestFactory.ctis({
dates_avis_favorable_ms_mns: '22.00800.000094-SM-1:2022-11-07, 22.00800.000094-SM-2:2024-01-06',
historique: '2024-01-06: En cours',
numero_ctis: '2024-500014-26-99',
}),
])
.mockResolvedValueOnce([RiphDtoTestFactory.dm()])
.mockResolvedValueOnce([RiphDtoTestFactory.jarde()])

await etlService.dailyUpdate()

// THEN
const query: ElasticsearchBodyType = convertFhirParsedQueryParamsToElasticsearchQuery([{ name: '_count', value: '1000' }])
const result: SearchResponse = await databaseService.search(query)

await expect(result).toMatchFileSnapshot('../shared/test/snapshots/DailyUpdateOverTime.snap.json')
})
})
})

async function setup() {
Expand All @@ -343,7 +524,9 @@ async function setup() {
await databaseService.deleteMedDraIndex()
await databaseService.deleteAnIndex()

const translationService: TranslationService = setupTranslationService()
const translator: Translator = new LocalTranslator()
const translationService: TranslationService = new TranslationService(translator)

const etlService = new EtlService(logger, databaseService, readerService, translationService)

const medDraFile = '10000001$Pneumopathie due à la ventilation$10081988$$$$$$$N$$\n10000002$Déficience en 11-bêta-hydroxylase$10000002$$$$$$$Y$$'
Expand Down
20 changes: 14 additions & 6 deletions src/etl/EtlService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,19 @@ export class EtlService {
}
}

async import(): Promise<void> {
async dailyUpdate(startingDate?: string): Promise<void> {
this.loggerService.info('-- Début de la mise à jour quotidienne des essais cliniques du RIPH.')
await this.import(startingDate)
await this.translate(startingDate)
await this.updateMeddraLabels(startingDate)
this.loggerService.info('-- Fin de la mise à jour quotidienne des essais cliniques du RIPH.')
}

async import(startingDate?: string): Promise<void> {
this.loggerService.info('-- Début de l’import des essais cliniques du RIPH.')

const ingestPipelines: IngestPipeline[] = [
new IngestPipelineCtis(this.loggerService, this.databaseService, this.readerService),
new IngestPipelineCtis(this.loggerService, this.databaseService, this.readerService, startingDate),
new IngestPipelineDmDmdiv(this.loggerService, this.databaseService, this.readerService),
new IngestPipelineJarde(this.loggerService, this.databaseService, this.readerService),
]
Expand Down Expand Up @@ -158,12 +166,12 @@ export class EtlService {
}
}

async translate(date?: string): Promise<void> {
async translate(startingDate?: string): Promise<void> {
this.loggerService.info('-- Début de la traduction des essais cliniques CTIS.')

try {
const translationPipeline: TranslationPipeline = new TranslationPipeline(this.databaseService, this.translationService)
await translationPipeline.execute(date)
await translationPipeline.execute(startingDate)
} catch (error) {
if (error instanceof errors.ResponseError) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
Expand All @@ -175,12 +183,12 @@ export class EtlService {
this.loggerService.info('-- Fin de la traduction des essais cliniques CTIS.')
}

async updateMeddraLabels(date?: string): Promise<void> {
async updateMeddraLabels(startingDate?: string): Promise<void> {
this.loggerService.info('-- Début de la mise à jour des labels Meddra pour les essais cliniques CTIS.')

try {
const medDraPipeline: MedDraPipeline = new MedDraPipeline(this.databaseService)
await medDraPipeline.execute(date)
await medDraPipeline.execute(startingDate)
} catch (error) {
if (error instanceof errors.ResponseError) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
Expand Down
11 changes: 7 additions & 4 deletions src/etl/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import { AppModule } from '../AppModule'
async function console(): Promise<void> {
const application = await NestFactory.createApplicationContext(AppModule)
const command = process.argv[2]
const date = process.argv[3]
const startingDate = process.argv[3]
const etlService = application.get(EtlService)

switch (command) {
case 'create-index':
await etlService.createIndex()
break
case 'import':
await etlService.import()
await etlService.import(startingDate)
break
case 'reset-index':
await etlService.deleteIndex()
Expand All @@ -33,10 +33,13 @@ async function console(): Promise<void> {
await etlService.deletePipelines()
break
case 'translate':
await etlService.translate(date)
await etlService.translate(startingDate)
break
case 'update-meddra-labels':
await etlService.updateMeddraLabels(date)
await etlService.updateMeddraLabels(startingDate)
break
case 'daily-update':
await etlService.dailyUpdate(startingDate)
break
default:
process.exit(1)
Expand Down
17 changes: 14 additions & 3 deletions src/etl/pipelines/ingest/IngestPipeline.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
import { ElasticsearchService } from '../../../shared/elasticsearch/ElasticsearchService'
import { LoggerService } from '../../../shared/logger/LoggerService'
import { ResearchStudyModel } from '../../../shared/models/domain-resources/ResearchStudyModel'
import { ModelUtils } from '../../../shared/models/eclaire/ModelUtils'
import { RiphCtisDto } from '../../dto/RiphCtisDto'
import { RiphDmDto } from '../../dto/RiphDmDto'
import { RiphJardeDto } from '../../dto/RiphJardeDto'
import { S3Service } from '../../s3/S3Service'

export abstract class IngestPipeline {
protected abstract readonly type: string
protected readonly startingDate: string

constructor(
protected readonly logger: LoggerService,
private readonly databaseService: ElasticsearchService,
private readonly readerService: S3Service
) {}
private readonly readerService: S3Service,
startingDate?: string
) {
if (startingDate) {
this.startingDate = ModelUtils.convertDateToIsoFormatWithoutTime(new Date(startingDate))
} else {
this.startingDate = ModelUtils.getDateOfYesterdayInIsoFormatAndWithoutTime()
}
}

abstract execute(): Promise<void>
abstract transform(riphDtos: RiphDto[]): ResearchStudyModel[]
Expand All @@ -26,7 +35,9 @@ export abstract class IngestPipeline {
}

async load(documents: ResearchStudyModel[]): Promise<void> {
await this.databaseService.bulkDocuments<ResearchStudyModel>(documents)
if (documents.length > 0) {
await this.databaseService.bulkDocuments<ResearchStudyModel>(documents)
}
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/etl/pipelines/ingest/IngestPipelineCtis.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { afterEach } from 'vitest'

import { IngestPipelineCtis } from './IngestPipelineCtis'
import { ResearchStudyModel } from '../../../shared/models/domain-resources/ResearchStudyModel'
import { setupDependencies } from '../../../shared/test/helpers/elasticsearchHelper'
Expand All @@ -21,8 +23,14 @@ describe('etl | IngestPipelineCtis', () => {
})

describe('transform', () => {
afterEach(() => {
vi.useRealTimers()
})

it('should transform array of raw data into a collection of research study documents', () => {
// given
vi.useFakeTimers()
vi.setSystemTime(new Date('2022-10-07'))
const riphCtisDtos = [RiphDtoTestFactory.ctis()]
const { ingestPipelineCtis } = setup()

Expand Down Expand Up @@ -58,7 +66,7 @@ function setup() {
readerService,
} = setupDependencies()

const ingestPipelineCtis = new IngestPipelineCtis(logger, databaseService, readerService)
const ingestPipelineCtis = new IngestPipelineCtis(logger, databaseService, readerService, '1970-01-01')

return { databaseService, ingestPipelineCtis, readerService }
}
Loading

0 comments on commit 6c5c72e

Please sign in to comment.