Skip to content

Commit

Permalink
Fix lineage for orphaned datasets (#2314)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Collado <[email protected]>

Signed-off-by: Michael Collado <[email protected]>
Co-authored-by: Willy Lulciuc <[email protected]>
  • Loading branch information
collado-mike and wslulciuc authored Dec 12, 2022
1 parent 67027dd commit 3212c8f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 0 deletions.
8 changes: 8 additions & 0 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids,
WHERE ds.uuid IN (<dsUuids>)""")
Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids);

@SqlQuery(
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
FROM datasets_view ds
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName""")
DatasetData getDatasetData(String namespaceName, String datasetName);

@SqlQuery(
"""
SELECT j.uuid FROM jobs j
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -81,6 +82,18 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
if (!datasetIds.isEmpty()) {
datasets.addAll(this.getDatasetData(datasetIds));
}
if (nodeId.isDatasetType()
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
log.warn(
"Found jobs {} which no longer share lineage with dataset {} - discarding",
jobData.stream().map(JobData::getId).toList());
DatasetId datasetId = nodeId.asDatasetId();
DatasetData datasetData =
getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
return new Lineage(
ImmutableSortedSet.of(
Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
}

return toLineage(jobData, datasets);
}
Expand Down
33 changes: 33 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import marquez.api.JdbiUtils;
import marquez.common.models.DatasetName;
import marquez.common.models.DatasetVersionId;
import marquez.common.models.JobId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.db.DatasetDao;
Expand Down Expand Up @@ -393,6 +394,38 @@ public void testLineageWithWithCycle() {
.matches(n -> n.isJobType() && n.asJobId().getName().getValue().equals("writeJob"));
}

@Test
public void testLineageForOrphanedDataset() {
UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
openLineageDao,
"writeJob",
"COMPLETE",
jobFacet,
Arrays.asList(),
Arrays.asList(dataset));

NodeId datasetNodeId =
NodeId.of(new NamespaceName(dataset.getNamespace()), new DatasetName(dataset.getName()));
Lineage lineage = lineageService.lineage(datasetNodeId, 2, false);
assertThat(lineage.getGraph())
.hasSize(2)
.extracting(Node::getId)
.containsExactlyInAnyOrder(
NodeId.of(new JobId(new NamespaceName(NAMESPACE), new JobName("writeJob"))),
datasetNodeId);

UpdateLineageRow updatedWriteJob =
LineageTestUtils.createLineageRow(
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());

lineage = lineageService.lineage(datasetNodeId, 2, false);
assertThat(lineage.getGraph())
.hasSize(1)
.extracting(Node::getId)
.containsExactlyInAnyOrder(datasetNodeId);
}

private boolean jobNameEquals(Node node, String writeJob) {
return node.getId().asJobId().getName().getValue().equals(writeJob);
}
Expand Down

0 comments on commit 3212c8f

Please sign in to comment.