From e11a792779888a3881b43cea7f529fcc870d0bda Mon Sep 17 00:00:00 2001 From: Willy Lulciuc Date: Tue, 13 Dec 2022 01:27:23 +0100 Subject: [PATCH] Ensure job data in lineage query is not null or empty (#2253) * Ensure job data in lineage query is not null or empty Signed-off-by: wslulciuc * continued: Ensure job data in lineage query is not null or empty Signed-off-by: wslulciuc * Add toLineageWithOrphanDataset() to build orphan graph Signed-off-by: wslulciuc * continued: Add toLineageWithOrphanDataset() to build orphan graph Signed-off-by: wslulciuc * continued: Add toLineageWithOrphanDataset() to build orphan graph Signed-off-by: wslulciuc * Return orphan graph on failed lookup for job when dataset nodeID provided Signed-off-by: wslulciuc Signed-off-by: wslulciuc --- .../main/java/marquez/api/BaseResource.java | 32 ++++++++++++++ .../java/marquez/api/OpenLineageResource.java | 1 + .../java/marquez/service/LineageService.java | 44 ++++++++++++++----- .../marquez/api/OpenLineageResourceTest.java | 9 +++- 4 files changed, 73 insertions(+), 13 deletions(-) diff --git a/api/src/main/java/marquez/api/BaseResource.java b/api/src/main/java/marquez/api/BaseResource.java index 7b116ab596..ee96be6a9b 100644 --- a/api/src/main/java/marquez/api/BaseResource.java +++ b/api/src/main/java/marquez/api/BaseResource.java @@ -18,9 +18,11 @@ import marquez.api.exceptions.RunAlreadyExistsException; import marquez.api.exceptions.RunNotFoundException; import marquez.api.exceptions.SourceNotFoundException; +import marquez.common.models.DatasetFieldId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.FieldName; +import marquez.common.models.JobId; import marquez.common.models.JobName; import marquez.common.models.NamespaceName; import marquez.common.models.RunId; @@ -37,6 +39,7 @@ import marquez.service.ServiceFactory; import marquez.service.SourceService; import marquez.service.TagService; +import marquez.service.models.NodeId; import marquez.service.models.Run; public class BaseResource { @@ -74,6 +77,10 @@ void throwIfNotExists(@NonNull NamespaceName namespaceName) { } } + void throwIfNotExists(@NonNull DatasetId datasetId) { + throwIfNotExists(datasetId.getNamespace(), datasetId.getName()); + } + void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull DatasetName datasetName) { if (!datasetService.exists(namespaceName.getValue(), datasetName.getValue())) { throw new DatasetNotFoundException(datasetName); @@ -86,6 +93,13 @@ void throwIfSourceNotExists(SourceName sourceName) { } } + void throwIfNotExists(@NonNull DatasetFieldId datasetFieldId) { + throwIfNotExists( + datasetFieldId.getDatasetId().getNamespace(), + datasetFieldId.getDatasetId().getName(), + datasetFieldId.getFieldName()); + } + void throwIfNotExists( @NonNull NamespaceName namespaceName, @NonNull DatasetName datasetName, @@ -96,6 +110,10 @@ void throwIfNotExists( } } + void throwIfNotExists(@NonNull JobId jobId) { + throwIfNotExists(jobId.getNamespace(), jobId.getName()); + } + void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull JobName jobName) { if (!jobService.exists(namespaceName.getValue(), jobName.getValue())) { throw new JobNotFoundException(jobName); @@ -137,6 +155,20 @@ void throwIfDatasetsNotExist(ImmutableSet datasets) { } } + void throwIfNotExists(@NonNull NodeId nodeId) { + if (!nodeId.hasVersion()) { + if (nodeId.isDatasetType()) { + throwIfNotExists(nodeId.asDatasetId()); + } else if (nodeId.isDatasetFieldType()) { + throwIfNotExists(nodeId.asDatasetFieldId()); + } else if (nodeId.isJobType()) { + throwIfNotExists(nodeId.asJobId()); + } else if (nodeId.isRunType()) { + throwIfNotExists(nodeId.asRunId()); + } + } + } + URI locationFor(@NonNull UriInfo uriInfo, @NonNull Run run) { return uriInfo .getBaseUriBuilder() diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index a534a744d9..1dd59eaa82 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -96,6 +96,7 @@ private int determineStatusCode(Throwable e) { public Response getLineage( @QueryParam("nodeId") @NotNull NodeId nodeId, @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) { + throwIfNotExists(nodeId); return Response.ok(lineageService.lineage(nodeId, depth, true)).build(); } diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java index 684dde5d08..3014b32f74 100644 --- a/api/src/main/java/marquez/service/LineageService.java +++ b/api/src/main/java/marquez/service/LineageService.java @@ -21,6 +21,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import marquez.common.models.DatasetId; import marquez.common.models.JobId; @@ -49,14 +50,30 @@ public LineageService(LineageDao delegate, JobDao jobDao) { // TODO make input parameters easily extendable if adding more options like 'withJobFacets' public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) { + log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth); Optional optionalUUID = getJobUuid(nodeId); if (optionalUUID.isEmpty()) { - throw new NodeIdNotFoundException("Could not find node"); + log.warn( + "Failed to get job associated with node '{}', returning orphan graph...", + nodeId.getValue()); + return toLineageWithOrphanDataset(nodeId.asDatasetId()); } UUID job = optionalUUID.get(); - + log.debug("Attempting to get lineage for job '{}'", job); Set jobData = getLineage(Collections.singleton(job), depth); + // Ensure job data is not empty, an empty set cannot be passed to LineageDao.getCurrentRuns() or + // LineageDao.getCurrentRunsWithFacets(). + if (jobData.isEmpty()) { + // Log warning, then return an orphan lineage graph; a graph should contain at most one + // job->dataset relationship. + log.warn( + "Failed to get lineage for job '{}' associated with node '{}', returning orphan graph...", + job, + nodeId.getValue()); + return toLineageWithOrphanDataset(nodeId.asDatasetId()); + } + List runs = withRunFacets ? getCurrentRunsWithFacets( @@ -85,19 +102,23 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) { if (nodeId.isDatasetType() && datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) { log.warn( - "Found jobs {} which no longer share lineage with dataset {} - discarding", - jobData.stream().map(JobData::getId).toList()); - DatasetId datasetId = nodeId.asDatasetId(); - DatasetData datasetData = - getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue()); - return new Lineage( - ImmutableSortedSet.of( - Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build())); + "Found jobs {} which no longer share lineage with dataset '{}' - discarding", + jobData.stream().map(JobData::getId).toList(), + nodeId.getValue()); + return toLineageWithOrphanDataset(nodeId.asDatasetId()); } return toLineage(jobData, datasets); } + private Lineage toLineageWithOrphanDataset(@NonNull DatasetId datasetId) { + final DatasetData datasetData = + getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue()); + return new Lineage( + ImmutableSortedSet.of( + Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build())); + } + private Lineage toLineage(Set jobData, Set datasets) { Set nodes = new LinkedHashSet<>(); // build mapping for later @@ -227,7 +248,8 @@ public Optional getJobUuid(NodeId nodeId) { return getJobFromInputOrOutput( datasetId.getName().getValue(), datasetId.getNamespace().getValue()); } else { - throw new NodeIdNotFoundException("Node must be a dataset node or job node"); + throw new NodeIdNotFoundException( + String.format("Node '%s' must be of type dataset or job!", nodeId.getValue())); } } } diff --git a/api/src/test/java/marquez/api/OpenLineageResourceTest.java b/api/src/test/java/marquez/api/OpenLineageResourceTest.java index fac1bb08ec..32965b0d4d 100644 --- a/api/src/test/java/marquez/api/OpenLineageResourceTest.java +++ b/api/src/test/java/marquez/api/OpenLineageResourceTest.java @@ -9,6 +9,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -20,6 +21,7 @@ import javax.ws.rs.core.Response; import marquez.common.Utils; import marquez.db.OpenLineageDao; +import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.ServiceFactory; import marquez.service.models.Lineage; @@ -36,6 +38,8 @@ class OpenLineageResourceTest { static { LineageService lineageService = mock(LineageService.class); OpenLineageDao openLineageDao = mock(OpenLineageDao.class); + JobService jobService = mock(JobService.class); + when(jobService.exists(anyString(), anyString())).thenReturn(true); Node testNode = Utils.fromJson( @@ -45,7 +49,8 @@ class OpenLineageResourceTest { when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE); ServiceFactory serviceFactory = - ApiTestUtils.mockServiceFactory(Map.of(LineageService.class, lineageService)); + ApiTestUtils.mockServiceFactory( + Map.of(LineageService.class, lineageService, JobService.class, jobService)); UNDER_TEST = ResourceExtension.builder() @@ -58,7 +63,7 @@ public void testGetLineage() { final Lineage lineage = UNDER_TEST .target("/api/v1/lineage") - .queryParam("nodeId", "job:test") + .queryParam("nodeId", "job:test-namespace:test-job") .request() .get() .readEntity(Lineage.class);