From d57ea4eee5ff93b52dd68d3fdddba87e1192f191 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Wed, 20 Dec 2023 10:08:37 +0100 Subject: [PATCH] fix broken lineage for repeated runs Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 4 + .../main/java/marquez/db/JobVersionDao.java | 2 +- .../java/marquez/db/LineageTestUtils.java | 37 +++++++++- .../marquez/service/LineageServiceTest.java | 73 +++++++++++++++++++ 4 files changed, 114 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cfa0adf0b4..fc03ef50de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.43.0...HEAD) +### Fixed: +* API: fix broken lineage graph for multiple runs of the same job.[`#2710`](https://github.com/MarquezProject/marquez/pull/2710) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Problem: lineage graph was not available for jobs run multiple times of the same job as a result of bug introduced with recent release. In order to fix the inconsistent data, [this query](https://github.com/MarquezProject/marquez/blob/83608bb13bd4dc235c065f95bebf8a88dcb53c61/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java#L19) should be run. This is not required when upgrading directly to this version.* + ## [0.43.0](https://github.com/MarquezProject/marquez/compare/0.42.0...0.43.0) - 2023-12-15 ### Added * API: refactor the `RunDao` SQL query [`#2685`](https://github.com/MarquezProject/marquez/pull/2685) [@sophiely](https://github.com/sophiely) diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index b4be634f3f..77688e0dcd 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -204,7 +204,7 @@ ExtendedJobVersionRow upsertJobVersion( INSERT INTO job_versions_io_mapping ( job_version_uuid, dataset_uuid, io_type, job_uuid, job_symlink_target_uuid, is_current_job_version, made_current_at) VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE, NOW()) - ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO NOTHING + ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_current_job_version = TRUE """) void upsertCurrentInputOrOutputDatasetFor( UUID jobVersionUuid, diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index 0bcc138897..19d3752dee 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -116,13 +116,48 @@ public static UpdateLineageRow createLineageRow( List outputs, @Valid LineageEvent.ParentRunFacet parentRunFacet, ImmutableMap runFacets) { + return createLineageRow( + dao, + jobName, + UUID.randomUUID(), + status, + jobFacet, + inputs, + outputs, + parentRunFacet, + runFacets); + } + + /** + * Create an {@link UpdateLineageRow} from the input job details and datasets. + * + * @param dao + * @param jobName + * @param runId + * @param status + * @param jobFacet + * @param inputs + * @param outputs + * @param parentRunFacet + * @param runFacets + * @return + */ + public static UpdateLineageRow createLineageRow( + OpenLineageDao dao, + String jobName, + UUID runId, + String status, + JobFacet jobFacet, + List inputs, + List outputs, + @Valid LineageEvent.ParentRunFacet parentRunFacet, + ImmutableMap runFacets) { NominalTimeRunFacet nominalTimeRunFacet = new NominalTimeRunFacet(); nominalTimeRunFacet.setNominalStartTime( Instant.now().atZone(LOCAL_ZONE).truncatedTo(ChronoUnit.HOURS)); nominalTimeRunFacet.setNominalEndTime( nominalTimeRunFacet.getNominalStartTime().plus(1, ChronoUnit.HOURS)); - UUID runId = UUID.randomUUID(); LineageEvent event = LineageEvent.builder() .eventType(status) diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 9366bbb8b9..1063f00aa9 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -16,6 +16,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.stream.Collectors; import marquez.api.JdbiUtils; import marquez.common.models.DatasetId; @@ -56,6 +57,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @ExtendWith(MarquezJdbiExternalPostgresExtension.class) public class LineageServiceTest { @@ -427,6 +429,77 @@ public void testLineageWithWithCycle() { .matches(n -> n.isJobType() && n.asJobId().getName().getValue().equals("writeJob")); } + @Test + public void testGetLineageJobRunTwice() { + Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build(); + Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build(); + UUID runId = UUID.randomUUID(); + + // (1) Run batch job which outputs input-dataset + LineageTestUtils.createLineageRow( + openLineageDao, + "someJob", + runId, + "START", + jobFacet, + Arrays.asList(input), + Collections.emptyList(), + null, + ImmutableMap.of()); + + LineageTestUtils.createLineageRow( + openLineageDao, + "someJob", + runId, + "COMPLETE", + jobFacet, + Collections.emptyList(), + Arrays.asList(output), + null, + ImmutableMap.of()); + + // (2) Rerun it + LineageTestUtils.createLineageRow( + openLineageDao, + "someJob", + runId, + "START", + jobFacet, + Arrays.asList(input), + Collections.emptyList(), + null, + ImmutableMap.of()); + + LineageTestUtils.createLineageRow( + openLineageDao, + "someJob", + runId, + "COMPLETE", + jobFacet, + Collections.emptyList(), + Arrays.asList(output), + null, + ImmutableMap.of()); + + // (4) lineage on output dataset shall be same as lineage on input dataset + Lineage lineageFromInput = + lineageService.lineage( + NodeId.of( + new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))), + 5, + true); + + Lineage lineageFromOutput = + lineageService.lineage( + NodeId.of( + new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))), + 5, + true); + + assertThat(lineageFromInput.getGraph()).hasSize(3); // 2 datasets + 1 job + assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph()); + } + @Test public void testGetLineageForRunningStreamingJob() { Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();