From b4944d7685be66324f65475463fdf90150856e4f Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Tue, 24 Oct 2023 14:53:37 -0700 Subject: [PATCH 1/7] upstream run level lineage implementation Signed-off-by: Julien Le Dem --- .../java/marquez/api/OpenLineageResource.java | 16 ++++++ api/src/main/java/marquez/db/LineageDao.java | 54 +++++++++++++++++++ .../db/mappers/UpstreamRunRowMapper.java | 48 +++++++++++++++++ .../java/marquez/service/LineageService.java | 34 ++++++++++++ 4 files changed, 152 insertions(+) create mode 100644 api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 259a500a53..ebbe24cd7e 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -36,6 +36,7 @@ import lombok.Value; import lombok.extern.slf4j.Slf4j; import marquez.api.models.SortDirection; +import marquez.common.models.RunId; import marquez.db.OpenLineageDao; import marquez.service.ServiceFactory; import marquez.service.models.BaseEvent; @@ -130,6 +131,21 @@ public Response getLineageEvents( return Response.ok(new Events(events, totalCount)).build(); } + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Consumes(APPLICATION_JSON) + @Produces(APPLICATION_JSON) + @Path("/runlineage/upstream") + public Response getRunLineageUpstream( + @QueryParam("runId") @NotNull RunId runId, + @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth, + @QueryParam("facets") String facets) { + throwIfNotExists(runId); + return Response.ok(lineageService.upstream(runId, depth, facets == null ? null : facets.split(","))).build(); + } + @Value static class Events { @NonNull diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index c45a06e5a9..e7f9dbc863 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -5,15 +5,22 @@ package marquez.db; +import java.time.Instant; import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.UUID; +import javax.validation.constraints.NotNull; +import marquez.common.models.DatasetName; +import marquez.common.models.JobName; +import marquez.common.models.NamespaceName; +import marquez.common.models.RunId; import marquez.db.mappers.DatasetDataMapper; import marquez.db.mappers.JobDataMapper; import marquez.db.mappers.JobRowMapper; import marquez.db.mappers.RunMapper; +import marquez.db.mappers.UpstreamRunRowMapper; import marquez.service.models.DatasetData; import marquez.service.models.JobData; import marquez.service.models.Run; @@ -25,8 +32,18 @@ @RegisterRowMapper(JobDataMapper.class) @RegisterRowMapper(RunMapper.class) @RegisterRowMapper(JobRowMapper.class) +@RegisterRowMapper(UpstreamRunRowMapper.class) public interface LineageDao { + public record JobSummary(NamespaceName namespace, JobName name, UUID version) {} + + public record RunSummary(RunId id, Instant start, Instant end, String status) {} + + public record DatasetSummary( + NamespaceName namespace, DatasetName name, UUID version, RunId producedByRunId) {} + + public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary input) {} + /** * Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the * input jobIds. This returns a single layer from the BFS using datasets as edges. 
Jobs that have
@@ -154,4 +171,41 @@ SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
   WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
   ORDER BY r.job_name, r.namespace_name, created_at DESC""")
   List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
+
+  @SqlQuery(
+      """
+WITH RECURSIVE
+  upstream_runs(
+    r_uuid, started_at, ended_at, state,
+    job_uuid, job_version_uuid, job_namespace, job_name,
+    dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name,
+    u_r_uuid, depth) AS (
+    SELECT
+      r.uuid, r.started_at, r.ended_at, r.current_run_state,
+      r.job_uuid, r.job_version_uuid, r.namespace_name, r.job_name,
+      dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
+      dv.run_uuid,
+      0 AS depth
+    FROM runs r
+    LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
+    LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid
+    LEFT JOIN runs r1 ON r1.uuid = dv.run_uuid
+    WHERE r.uuid = :runId
+    UNION
+    SELECT
+      ur.u_r_uuid, r2.started_at, r2.ended_at, r2.current_run_state,
+      r2.job_uuid, r2.job_version_uuid, r2.namespace_name, r2.job_name,
+      dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
+      dv2.run_uuid,
+      ur.depth + 1 AS depth
+    FROM upstream_runs ur
+    INNER JOIN runs r2 ON r2.uuid = ur.u_r_uuid
+    LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
+    LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
+    WHERE ur.u_r_uuid IS NOT NULL AND depth < :depth
+  )
+SELECT * FROM upstream_runs ORDER BY depth DESC;
+;
+""")
+  List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
 }
diff --git a/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java b/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
new file mode 100644
index 0000000000..af3ccf1e71
--- /dev/null
+++ b/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018-2023 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez.db.mappers;
+
+import static marquez.db.Columns.stringOrThrow;
+import static marquez.db.Columns.timestampOrThrow;
+import static marquez.db.Columns.uuidOrThrow;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+import lombok.NonNull;
+import marquez.common.models.DatasetName;
+import marquez.common.models.JobName;
+import marquez.common.models.NamespaceName;
+import marquez.common.models.RunId;
+import marquez.db.Columns;
+import marquez.db.LineageDao.DatasetSummary;
+import marquez.db.LineageDao.JobSummary;
+import marquez.db.LineageDao.RunSummary;
+import marquez.db.LineageDao.UpstreamRunRow;
+import org.jdbi.v3.core.mapper.RowMapper;
+import org.jdbi.v3.core.statement.StatementContext;
+
+public final class UpstreamRunRowMapper implements RowMapper<UpstreamRunRow> {
+  @Override
+  public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
+      throws SQLException {
+    return new UpstreamRunRow(
+        new JobSummary(
+            new NamespaceName(stringOrThrow(results, "job_namespace")),
+            new JobName(stringOrThrow(results, "job_name")),
+            UUID.fromString(stringOrThrow(results, "job_version_uuid"))),
+        new RunSummary(
+            new RunId(uuidOrThrow(results, "r_uuid")),
+            timestampOrThrow(results, Columns.STARTED_AT),
+            timestampOrThrow(results, Columns.ENDED_AT),
+            stringOrThrow(results, Columns.STATE)),
+        results.getObject("dataset_name") == null ? null : new DatasetSummary(
+            new NamespaceName(stringOrThrow(results, "dataset_namespace")),
+            new DatasetName(stringOrThrow(results, "dataset_name")),
+            UUID.fromString(stringOrThrow(results, "dataset_version_uuid")),
+            new RunId(UUID.fromString(stringOrThrow(results, "u_r_uuid")))));
+  }
+}
diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java
index 1c2dc34a05..2ebaac0f0a 100644
--- a/api/src/main/java/marquez/service/LineageService.java
+++ b/api/src/main/java/marquez/service/LineageService.java
@@ -5,6 +5,9 @@
 
 package marquez.service;
 
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
 import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
@@ -12,6 +15,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -21,14 +25,20 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.validation.constraints.NotNull;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import marquez.common.models.DatasetId;
 import marquez.common.models.JobId;
+import marquez.common.models.RunId;
 import marquez.db.JobDao;
 import marquez.db.LineageDao;
+import marquez.db.LineageDao.DatasetSummary;
+import marquez.db.LineageDao.JobSummary;
+import marquez.db.LineageDao.RunSummary;
 import marquez.db.models.JobRow;
 import marquez.service.DelegatingDaos.DelegatingLineageDao;
+import marquez.service.LineageService.UpstreamRunLineage;
 import marquez.service.models.DatasetData;
 import marquez.service.models.Edge;
 import marquez.service.models.Graph;
@@ -41,6 +51,11 @@
 @Slf4j
 public class LineageService extends DelegatingLineageDao {
+
+  public record UpstreamRunLineage(List<UpstreamRun> runs) {}
+
+  public record UpstreamRun(JobSummary job, RunSummary run, List<DatasetSummary> inputs) {}
+
   private final JobDao jobDao;
 
   public LineageService(LineageDao delegate, JobDao jobDao) {
@@ -252,4 +267,23 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
           String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
     }
   }
+
+  public UpstreamRunLineage upstream(@NotNull RunId runId, int depth, String[] facets
+      /** TODO */
+      ) {
+    List<UpstreamRunRow> upstreamRuns = getUpstreamRuns(runId.getValue(), depth);
+    Map<UUID, List<UpstreamRunRow>> collect =
+        upstreamRuns.stream().collect(groupingBy(r -> r.run().id(), LinkedHashMap::new, toList()));
+    List<UpstreamRun> runs =
+        collect.entrySet().stream()
+            .map(
+                row -> {
+                  UpstreamRunRow upstreamRunRow = row.getValue().get(0);
+                  List<DatasetSummary> inputs =
+                      row.getValue().stream().map(UpstreamRunRow::input).collect(toList());
+                  return new UpstreamRun(upstreamRunRow.job(), upstreamRunRow.run(), inputs);
+                })
+            .collect(toList());
+    return new UpstreamRunLineage(runs);
+  }
 }

From 2007c82b1e63600d8353315e58ae36187f97da72 Mon Sep 17 00:00:00 2001
From: Julien Le Dem
Date: Tue, 24 Oct 2023 16:52:22 -0700
Subject: [PATCH 2/7] fix ordering and spurious nulls bug

Signed-off-by: Julien Le Dem
---
 .../main/java/marquez/api/OpenLineageResource.java |  4 +++-
 api/src/main/java/marquez/db/LineageDao.java       |  2 +-
 .../marquez/db/mappers/UpstreamRunRowMapper.java   | 12 +++++++-----
 .../main/java/marquez/service/LineageService.java  |  5 ++++-
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java
index ebbe24cd7e..18105786e3 100644
--- a/api/src/main/java/marquez/api/OpenLineageResource.java
+++ b/api/src/main/java/marquez/api/OpenLineageResource.java
@@ -143,7 +143,9 @@ public Response getRunLineageUpstream(
       @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
       @QueryParam("facets") String facets) {
     throwIfNotExists(runId);
-    return Response.ok(lineageService.upstream(runId, depth, facets == null ? null : facets.split(","))).build();
+    return Response.ok(
+            lineageService.upstream(runId, depth, facets == null ? null : facets.split(",")))
+        .build();
   }
 
   @Value
diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index e7f9dbc863..24021720e5 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -204,7 +204,7 @@ WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
     LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
     WHERE ur.u_r_uuid IS NOT NULL AND depth < :depth
   )
-SELECT * FROM upstream_runs ORDER BY depth DESC;
+SELECT * FROM upstream_runs ORDER BY depth ASC;
 ;
 """)
   List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
diff --git a/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java b/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
index af3ccf1e71..1024605115 100644
--- a/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
+++ b/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
@@ -39,10 +39,12 @@ public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext
             timestampOrThrow(results, Columns.STARTED_AT),
             timestampOrThrow(results, Columns.ENDED_AT),
             stringOrThrow(results, Columns.STATE)),
-        results.getObject("dataset_name") == null ? null : new DatasetSummary(
-            new NamespaceName(stringOrThrow(results, "dataset_namespace")),
-            new DatasetName(stringOrThrow(results, "dataset_name")),
-            UUID.fromString(stringOrThrow(results, "dataset_version_uuid")),
-            new RunId(UUID.fromString(stringOrThrow(results, "u_r_uuid")))));
+        results.getObject("dataset_name") == null
+            ? null
+            : new DatasetSummary(
+                new NamespaceName(stringOrThrow(results, "dataset_namespace")),
+                new DatasetName(stringOrThrow(results, "dataset_name")),
+                UUID.fromString(stringOrThrow(results, "dataset_version_uuid")),
+                new RunId(UUID.fromString(stringOrThrow(results, "u_r_uuid")))));
   }
 }
diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java
index 2ebaac0f0a..f0d7db7bda 100644
--- a/api/src/main/java/marquez/service/LineageService.java
+++ b/api/src/main/java/marquez/service/LineageService.java
@@ -280,7 +280,10 @@ public UpstreamRunLineage upstream(@NotNull RunId runId, int depth, String[] fac
                 row -> {
                   UpstreamRunRow upstreamRunRow = row.getValue().get(0);
                   List<DatasetSummary> inputs =
-                      row.getValue().stream().map(UpstreamRunRow::input).collect(toList());
+                      row.getValue().stream()
+                          .map(UpstreamRunRow::input)
+                          .filter(i -> i != null)
+                          .collect(toList());
                   return new UpstreamRun(upstreamRunRow.job(), upstreamRunRow.run(), inputs);
                 })
             .collect(toList());

From 059c7da45798f85d80a002c11326fb955c1887ec Mon Sep 17 00:00:00 2001
From: Julien Le Dem
Date: Wed, 25 Oct 2023 09:14:48 -0700
Subject: [PATCH 3/7] move runs table out of the recursion

Signed-off-by: Julien Le Dem
---
 api/src/main/java/marquez/db/LineageDao.java | 26 +++++++++-----------
 1 file changed, 11 insertions(+), 15 deletions(-)

diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index 24021720e5..677a0d280d 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -176,35 +176,31 @@ WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
   @SqlQuery(
       """
 WITH RECURSIVE
   upstream_runs(
-    r_uuid, started_at, ended_at, state,
-    job_uuid, job_version_uuid, job_namespace, job_name,
+    r_uuid,
     dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name,
     u_r_uuid, depth) AS (
-    SELECT
-      r.uuid, r.started_at, r.ended_at, r.current_run_state,
-      r.job_uuid, r.job_version_uuid, r.namespace_name, r.job_name,
-      dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
-      dv.run_uuid,
-      0 AS depth
-    FROM runs r
+    select r.uuid,
+      dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
+      dv.run_uuid,
+      0 AS depth
+    FROM (SELECT :runId::uuid AS uuid) r
     LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
     LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid
-    LEFT JOIN runs r1 ON r1.uuid = dv.run_uuid
-    WHERE r.uuid = :runId
     UNION
     SELECT
-      ur.u_r_uuid, r2.started_at, r2.ended_at, r2.current_run_state,
-      r2.job_uuid, r2.job_version_uuid, r2.namespace_name, r2.job_name,
+      ur.u_r_uuid,
       dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
       dv2.run_uuid,
       ur.depth + 1 AS depth
     FROM upstream_runs ur
-    INNER JOIN runs r2 ON r2.uuid = ur.u_r_uuid
     LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
     LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
     WHERE ur.u_r_uuid IS NOT NULL AND depth < :depth
   )
-SELECT * FROM upstream_runs ORDER BY depth ASC;
+SELECT upstream_runs.*,
+  r.started_at, r.ended_at, r.current_run_state as state,
+  r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
+FROM upstream_runs, runs r where upstream_runs.r_uuid = r.uuid;
 ;
 """)
   List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);

From 400644adf8e5a8d73569e1bc2485cce9aa8699f7 Mon Sep 17 00:00:00 2001
From: Julien Le Dem
Date: Wed, 25 Oct 2023 15:59:15 -0700
Subject: [PATCH 4/7] add comments and small improvement to the query
Signed-off-by: Julien Le Dem
---
 api/src/main/java/marquez/db/LineageDao.java | 29 ++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index 677a0d280d..fcab6ffed4 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -176,28 +176,41 @@ WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
   @SqlQuery(
       """
 WITH RECURSIVE
   upstream_runs(
-    r_uuid,
-    dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name,
-    u_r_uuid, depth) AS (
+    r_uuid, -- run uuid
+    dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name, -- input dataset version to the run
+    u_r_uuid, -- upstream run that produced that dataset version
+    depth -- current depth of traversal
+  ) AS (
+
+    -- initial case: find the inputs of the initial runs
     select r.uuid,
       dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
       dv.run_uuid,
-      0 AS depth
-    FROM (SELECT :runId::uuid AS uuid) r
+      0 AS depth -- starts at 0
+    FROM (SELECT :runId::uuid AS uuid) r -- initial run
     LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
     LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid
+
     UNION
+
+    -- recursion: find the inputs of the inputs found on the previous iteration and increase depth to know when to stop
     SELECT
       ur.u_r_uuid,
       dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
       dv2.run_uuid,
-      ur.depth + 1 AS depth
+      ur.depth + 1 AS depth -- increase depth to check end condition
    FROM upstream_runs ur
     LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
     LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
-    WHERE ur.u_r_uuid IS NOT NULL AND depth < :depth
+    -- end condition of the recursion: no input or depth is over the maximum set
+    -- also avoid following cycles (merge statement)
+    WHERE ur.u_r_uuid IS NOT NULL AND ur.u_r_uuid <> ur.r_uuid AND depth < :depth
   )
+
+ -- present the result: use Distinct as we may have traversed the same edge multiple times if there are diamonds in the graph.
+SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, upstream_runs.u_r_uuid)
+  upstream_runs.*,
+  -- we add the run information after the recursion so that we join with the large run table only once
   r.started_at, r.ended_at, r.current_run_state as state,
   r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
 FROM upstream_runs, runs r where upstream_runs.r_uuid = r.uuid;
 ;
 """)

From a118c0632123962f78ea4d6772cce31067caa71e Mon Sep 17 00:00:00 2001
From: Julien Le Dem
Date: Tue, 31 Oct 2023 15:07:14 -0700
Subject: [PATCH 5/7] add tests

Signed-off-by: Julien Le Dem
---
 CHANGELOG.md                                  |  2 +
 .../java/marquez/api/OpenLineageResource.java | 15 ++--
 api/src/main/java/marquez/db/Columns.java     | 43 ++++------
 api/src/main/java/marquez/db/LineageDao.java  | 85 ++++++++++---------
 .../db/mappers/UpstreamRunRowMapper.java      | 13 ++-
 .../java/marquez/service/LineageService.java  | 12 ++-
 .../test/java/marquez/db/LineageDaoTest.java  | 69 +++++++++++++++
 .../marquez/service/LineageServiceTest.java   | 25 ++++++
 8 files changed, 183 insertions(+), 81 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 13c8a535c3..8d12b5538f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,8 @@
 Web: add IO tab [`#2613`](https://github.com/MarquezProject/marquez/pull/2613) [@davidsharp7](https://github.com/davidsharp7)
 *Improves experience with large graphs by adding a new tab to move between graph elements without looking at the graph itself.*
 Web: add hover-over Tag tooltip to datasets [`#2630`](https://github.com/MarquezProject/marquez/pull/2630) [@davidsharp7](https://github.com/davidsharp7)
 *For parity with columns in the GUI, this adds a Tag tooltip to datasets.*
+* API: upstream run-level lineage API [`#2658`](https://github.com/MarquezProject/marquez/pull/2658) [@julienledem](https://github.com/julienledem)
+  *When troubleshooting an issue and doing root cause analysis, it is useful to get the upstream run-level lineage to know exactly what version of each job and dataset a run depends on.*
 ### Changed
 Docker: upgrade to Docker Compose V2 [`#2644`](https://github.com/MarquezProject/marquez/pull/2644) [@merobi-hub](https://github.com/merobi-hub)
diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java
index 18105786e3..aa43065d33 100644
--- a/api/src/main/java/marquez/api/OpenLineageResource.java
+++ b/api/src/main/java/marquez/api/OpenLineageResource.java
@@ -131,6 +131,14 @@ public Response getLineageEvents(
     return Response.ok(new Events(events, totalCount)).build();
   }
 
+  /**
+   * Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
+   * -> the run that produced it
+   *
+   * @param runId the run to get upstream lineage from
+   * @param depth the maximum depth of the upstream lineage
+   * @return the upstream lineage for that run up to `depth` levels
+   */
   @Timed
   @ResponseMetered
   @ExceptionMetered
   @GET
   @Consumes(APPLICATION_JSON)
   @Produces(APPLICATION_JSON)
   @Path("/runlineage/upstream")
   public Response getRunLineageUpstream(
       @QueryParam("runId") @NotNull RunId runId,
-      @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
-      @QueryParam("facets") String facets) {
+      @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
     throwIfNotExists(runId);
-    return Response.ok(
-            lineageService.upstream(runId, depth, facets == null ? null : facets.split(",")))
-        .build();
+    return Response.ok(lineageService.upstream(runId, depth)).build();
   }
 
   @Value
diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java
index dfdf67492a..fe7c1fcf5f 100644
--- a/api/src/main/java/marquez/db/Columns.java
+++ b/api/src/main/java/marquez/db/Columns.java
@@ -150,9 +150,7 @@ public static UUID uuidOrNull(final ResultSet results, final String column) thro
   }
 
   public static UUID uuidOrThrow(final ResultSet results, final String column) throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getObject(column, UUID.class);
   }
 
@@ -166,9 +164,7 @@ public static Instant timestampOrNull(final ResultSet results, final String colu
 
   public static Instant timestampOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getTimestamp(column).toInstant();
   }
 
@@ -182,9 +178,7 @@ public static String stringOrNull(final ResultSet results, final String column)
 
   public static String stringOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getString(column);
   }
 
@@ -199,40 +193,30 @@ public static boolean booleanOrDefault(
 
   public static boolean booleanOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getBoolean(column);
   }
 
   public static int intOrThrow(final ResultSet results, final String column) throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getInt(column);
   }
 
   public static PGInterval pgIntervalOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return new PGInterval(results.getString(column));
   }
 
   public static BigDecimal bigDecimalOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getBigDecimal(column);
   }
 
   public static List<UUID> uuidArrayOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return Arrays.asList((UUID[]) results.getArray(column).getArray());
   }
 
@@ -246,9 +230,7 @@ public static List<UUID> uuidArrayOrEmpty(final ResultSet results, final String
 
   public static List<String> stringArrayOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return Arrays.asList((String[]) results.getArray(column).getArray());
  }
 
@@ -311,4 +293,11 @@ public static PGobject toPgObject(@NonNull final Object object) {
     }
     return jsonObject;
   }
+
+  private static void checkNotNull(final ResultSet results, final String column)
+      throws SQLException {
+    if (results.getObject(column) == null) {
+      throw new IllegalArgumentException(column + " not found in result");
+    }
+  }
 }
diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index fcab6ffed4..f3b1eaa294 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -174,47 +174,48 @@ WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
   @SqlQuery(
       """
-WITH RECURSIVE
-  upstream_runs(
-    r_uuid, -- run uuid
-    dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name, -- input dataset version to the run
-    u_r_uuid, -- upstream run that produced that dataset version
-    depth -- current depth of traversal
-  ) AS (
-
-    -- initial case: find the inputs of the initial runs
-    select r.uuid,
-      dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
-      dv.run_uuid,
-      0 AS depth -- starts at 0
-    FROM (SELECT :runId::uuid AS uuid) r -- initial run
-    LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
-    LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid
-
-    UNION
-
-    -- recursion: find the inputs of the inputs found on the previous iteration and increase depth to know when to stop
-    SELECT
-      ur.u_r_uuid,
-      dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
-      dv2.run_uuid,
-      ur.depth + 1 AS depth -- increase depth to check end condition
-    FROM upstream_runs ur
-    LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
-    LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
-    -- end condition of the recursion: no input or depth is over the maximum set
-    -- also avoid following cycles (merge statement)
-    WHERE ur.u_r_uuid IS NOT NULL AND ur.u_r_uuid <> ur.r_uuid AND depth < :depth
-  )
-
- -- present the result: use Distinct as we may have traversed the same edge multiple times if there are diamonds in the graph.
-SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, upstream_runs.u_r_uuid) - upstream_runs.*, - -- we add the run information after the recursion so that we join with the large run table only once - r.started_at, r.ended_at, r.current_run_state as state, - r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name -FROM upstream_runs, runs r where upstream_runs.r_uuid = r.uuid; -; -""") + WITH RECURSIVE + upstream_runs( + r_uuid, -- run uuid + dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name, -- input dataset version to the run + u_r_uuid, -- upstream run that produced that dataset version + depth -- current depth of traversal + ) AS ( + + -- initial case: find the inputs of the initial runs + select r.uuid, + dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name, + dv.run_uuid, + 0 AS depth -- starts at 0 + FROM (SELECT :runId::uuid AS uuid) r -- initial run + LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid + LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid + + UNION + + -- recursion: find the inputs of the inputs found on the previous iteration and increase depth to know when to stop + SELECT + ur.u_r_uuid, + dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name, + dv2.run_uuid, + ur.depth + 1 AS depth -- increase depth to check end condition + FROM upstream_runs ur + LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid + LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid + -- end condition of the recursion: no input or depth is over the maximum set + -- also avoid following cycles (ex: merge statement) + WHERE ur.u_r_uuid IS NOT NULL AND ur.u_r_uuid <> ur.r_uuid AND depth < :depth + ) + + -- present the result: use Distinct as we may have traversed the same edge multiple times if there are diamonds in the graph. 
+      SELECT * FROM ( -- we need the extra statement to sort after the DISTINCT
+        SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, upstream_runs.u_r_uuid)
+          upstream_runs.*,
+          r.started_at, r.ended_at, r.current_run_state as state,
+          r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
+        FROM upstream_runs, runs r WHERE upstream_runs.r_uuid = r.uuid
+      ) sub
+      ORDER BY depth ASC;
+      """)
   List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
 }
diff --git a/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java b/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
index 1024605115..37d49a4cad 100644
--- a/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
+++ b/api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
@@ -5,12 +5,14 @@
 
 package marquez.db.mappers;
 
+import static marquez.db.Columns.stringOrNull;
 import static marquez.db.Columns.stringOrThrow;
-import static marquez.db.Columns.timestampOrThrow;
+import static marquez.db.Columns.timestampOrNull;
 import static marquez.db.Columns.uuidOrThrow;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Optional;
 import java.util.UUID;
 import lombok.NonNull;
 import marquez.common.models.DatasetName;
@@ -25,6 +27,7 @@
 import org.jdbi.v3.core.mapper.RowMapper;
 import org.jdbi.v3.core.statement.StatementContext;
 
+/** Maps the upstream query result set to a UpstreamRunRow */
 public final class UpstreamRunRowMapper implements RowMapper<UpstreamRunRow> {
   @Override
   public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
@@ -33,11 +36,13 @@ public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext
         new JobSummary(
             new NamespaceName(stringOrThrow(results, "job_namespace")),
             new JobName(stringOrThrow(results, "job_name")),
-            UUID.fromString(stringOrThrow(results, "job_version_uuid"))),
+            Optional.ofNullable(stringOrNull(results, "job_version_uuid"))
+                .map(UUID::fromString)
+                .orElse(null)),
         new RunSummary(
             new RunId(uuidOrThrow(results, "r_uuid")),
-            timestampOrThrow(results, Columns.STARTED_AT),
-            timestampOrThrow(results, Columns.ENDED_AT),
+            timestampOrNull(results, Columns.STARTED_AT),
+            timestampOrNull(results, Columns.ENDED_AT),
             stringOrThrow(results, Columns.STATE)),
diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java
index f0d7db7bda..7eddec4fd9 100644
--- a/api/src/main/java/marquez/service/LineageService.java
+++ b/api/src/main/java/marquez/service/LineageService.java
@@ -268,9 +268,15 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
     }
   }
 
-  public UpstreamRunLineage upstream(@NotNull RunId runId, int depth, String[] facets
-      /** TODO */
-      ) {
+  /**
+   * Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
+   * -> the run that produced it
+   *
+   * @param runId the run to get upstream lineage from
+   * @param depth the maximum depth of the upstream lineage
+   * @return the upstream lineage for that run up to `depth` levels
+   */
+  public UpstreamRunLineage upstream(@NotNull RunId runId, int depth) {
     List<UpstreamRunRow> upstreamRuns = getUpstreamRuns(runId.getValue(), depth);
     Map<UUID, List<UpstreamRunRow>> collect =
         upstreamRuns.stream().collect(groupingBy(r -> r.run().id(), LinkedHashMap::new, toList()));
diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java
index 354ab495dc..57fcea63ce 100644
--- a/api/src/test/java/marquez/db/LineageDaoTest.java
+++ b/api/src/test/java/marquez/db/LineageDaoTest.java
@@ -29,6 +29,7 @@
 import java.util.stream.Stream;
 import marquez.api.JdbiUtils;
 import marquez.common.models.JobType;
+import marquez.db.LineageDao.UpstreamRunRow;
 import marquez.db.LineageTestUtils.DatasetConsumerJob;
 import marquez.db.LineageTestUtils.JobLineage;
 import marquez.db.models.JobRow;
@@ -888,4 +889,72 @@ public void testGetCurrentRunsGetsLatestRun() {
         .extracting(r -> r.getId().getValue())
         .containsAll(expectedRunIds);
   }
+
+  @Test
+  public void testGetRunLineage() {
+
+    UpdateLineageRow writeJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "writeJob",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(),
+            Arrays.asList(dataset));
+    List<JobLineage> jobRows =
+        writeDownstreamLineage(
+            openLineageDao,
+            new LinkedList<>(
+                Arrays.asList(
+                    new DatasetConsumerJob("readJob", 20, Optional.of("outputData")),
+                    new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))),
+            jobFacet,
+            dataset);
+
+    // don't expect a failed job in the returned lineage
+    UpdateLineageRow failedJobRow =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "readJobFailed",
+            "FAILED",
+            jobFacet,
+            Arrays.asList(dataset),
+            Arrays.asList());
+
+    // don't expect a disjoint job in the returned lineage
+    UpdateLineageRow disjointJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "writeRandomDataset",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(
+                new Dataset(
+                    NAMESPACE,
+                    "randomDataset",
+                    newDatasetFacet(
+                        new SchemaField("firstname", "string", "the first name"),
+                        new SchemaField("lastname", "string", "the last name")))),
+            Arrays.asList());
+
+    {
+      List<UpstreamRunRow> upstream =
+          lineageDao.getUpstreamRuns(failedJobRow.getRun().getUuid(), 10);
+
+      assertThat(upstream).size().isEqualTo(2);
+      assertThat(upstream.get(0).job().name().getValue())
+          .isEqualTo(failedJobRow.getJob().getName());
+      assertThat(upstream.get(0).input().name().getValue()).isEqualTo(dataset.getName());
+      assertThat(upstream.get(1).job().name().getValue()).isEqualTo(writeJob.getJob().getName());
+    }
+
+    {
+      List<UpstreamRunRow> upstream2 = lineageDao.getUpstreamRuns(jobRows.get(0).getRunId(), 10);
+
+      assertThat(upstream2).size().isEqualTo(2);
+      assertThat(upstream2.get(0).job().name().getValue()).isEqualTo(jobRows.get(0).getName());
+      assertThat(upstream2.get(0).input().name().getValue()).isEqualTo(dataset.getName());
+      assertThat(upstream2.get(1).job().name().getValue()).isEqualTo(writeJob.getJob().getName());
+    }
+  }
 }
diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java
index 7f6828dfa0..de8d0e15c5 100644
--- a/api/src/test/java/marquez/service/LineageServiceTest.java
+++ b/api/src/test/java/marquez/service/LineageServiceTest.java
@@ -9,11 +9,13 @@ import static marquez.db.LineageTestUtils.newDatasetFacet;
 import static marquez.db.LineageTestUtils.writeDownstreamLineage;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import marquez.api.JdbiUtils;
 import marquez.common.models.DatasetName;
 import marquez.common.models.InputDatasetVersion;
@@ -30,7 +32,9 @@
 import marquez.db.OpenLineageDao;
 import marquez.db.models.UpdateLineageRow;
 import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
+import marquez.service.LineageService.UpstreamRunLineage;
 import marquez.service.models.Edge;
+import marquez.service.models.Job;
 import marquez.service.models.JobData;
 import marquez.service.models.Lineage;
 import marquez.service.models.LineageEvent.Dataset;
@@ -195,6 +199,27 @@ && jobNameEquals(n, "downstreamJob0<-outputData<-readJob0<-commonDataset"))
         NodeId.of(
             new NamespaceName(NAMESPACE),
             new DatasetName("outputData<-readJob0<-commonDataset")));
+
+    List<Job> jobs = jobDao.findAllWithRun(NAMESPACE, 1000, 0);
+    jobs =
+        jobs.stream()
+            .filter(j -> j.getName().getValue().contains("newDownstreamJob"))
+            .collect(Collectors.toList());
+    assertTrue(jobs.size() > 0);
+    Job job = jobs.get(0);
+    assertTrue(job.getLatestRun().isPresent());
+    UpstreamRunLineage upstreamLineage =
+        lineageService.upstream(job.getLatestRun().get().getId(), 10);
+    assertThat(upstreamLineage.runs()).size().isEqualTo(3);
+    assertThat(upstreamLineage.runs().get(0).job().name().getValue())
+        .matches("newDownstreamJob.*<-outputData.*<-newReadJob.*<-commonDataset");
+    assertThat(upstreamLineage.runs().get(0).inputs().get(0).name().getValue())
+        .matches("outputData.*<-newReadJob.*<-commonDataset");
+    assertThat(upstreamLineage.runs().get(1).job().name().getValue())
+        .matches("newReadJob.*<-commonDataset");
+    assertThat(upstreamLineage.runs().get(1).inputs().get(0).name().getValue())
+        .isEqualTo("commonDataset");
+    assertThat(upstreamLineage.runs().get(2).job().name().getValue()).isEqualTo("writeJob");
   }
 
   @Test

From 61da2808be470ab7170172d10b46cf3cf7f6db4c Mon Sep 17 00:00:00 2001
From: Julien Le Dem
Date: Thu, 2 Nov 2023 16:33:30 -0700
Subject: [PATCH 6/7] add openapi spec

Signed-off-by: Julien Le Dem
---
 spec/openapi.yml | 75 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)

diff --git a/spec/openapi.yml b/spec/openapi.yml
index 5b79d0dbfa..b8d4d0488d 100644
--- a/spec/openapi.yml
+++ b/spec/openapi.yml
@@ -603,6 +603,23 @@ paths:
             schema:
               $ref: '#/components/schemas/LineageGraph'
 
+  /runlineage/upstream:
+    get:
+      operationId: getRunLineageUpstream
+      parameters:
+        - $ref: '#/components/parameters/runId'
+        - $ref: '#/components/parameters/depth'
+      tags:
+        - Lineage
+      summary: Get the upstream lineage for a given run
+      responses:
+        '200':
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/UpstreamRunLineage'
+
   /column-lineage:
     get:
      operationId: getLineage
@@ -1913,6 +1930,64 @@ components:
         example:
           description: My first tag!
+
+    UpstreamRunLineage:
+      type: object
+      properties:
+        runs:
+          description: the list of upstream runs, including the selected run, in topological order upstream
+          type: array
+          items:
+            type: object
+            properties:
+              job:
+                type: object
+                description: identifier of the job version for this run
+                properties:
+                  namespace:
+                    type: string
+                  name:
+                    type: string
+                  version:
+                    type: string
+                    format: uuid
+              run:
+                description: run information
+                type: object
+                properties:
+                  id:
+                    description: the run identifier
+                    type: string
+                    format: uuid
+                  start:
+                    description: start time of the run
+                    type: string
+                    format: date-time
+                  end:
+                    description: end time of the run
+                    type: string
+                    format: date-time
+                  status:
+                    description: whether the run completed successfully or not
+                    type: string
+              inputs:
+                description: upstream input dataset versions
+                type: array
+                items:
+                  description: identifier of the upstream dataset version consumed by this run
+                  type: object
+                  properties:
+                    namespace:
+                      type: string
+                    name:
+                      type: string
+                    version:
+                      type: string
+                      format: uuid
+                    producedByRunId:
+                      description: the run that produced this dataset version
+                      type: string
+                      format: uuid
+
     LineageEvent:
       example:
         eventType: COMPLETE

From 870557f6b6ba75681008f559ba6c9e4fab2069d1 Mon Sep 17 00:00:00 2001
From: Julien Le Dem
Date: Mon, 6 Nov 2023 10:26:11 -0800
Subject: [PATCH 7/7] review feedback: improve tests; query

Signed-off-by: Julien Le Dem
---
 api/src/main/java/marquez/db/LineageDao.java |  2 +-
 .../test/java/marquez/db/LineageDaoTest.java | 22 ++++++++++++++++---
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index f3b1eaa294..5e520b22a6 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -215,7 +215,7 @@ SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, up
           r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
         FROM upstream_runs, runs r WHERE upstream_runs.r_uuid = r.uuid
       ) sub
-      ORDER BY depth ASC;
+      ORDER BY depth ASC, job_name ASC;
       """)
   List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
 }
diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java
index 57fcea63ce..aed23901a8 100644
--- a/api/src/test/java/marquez/db/LineageDaoTest.java
+++ b/api/src/test/java/marquez/db/LineageDaoTest.java
@@ -893,13 +893,24 @@ public void testGetCurrentRunsGetsLatestRun() {
 
   @Test
   public void testGetRunLineage() {
+    Dataset upstreamDataset = new Dataset(NAMESPACE, "upstreamDataset", null);
+
+    UpdateLineageRow upstreamJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "upstreamJob",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(),
+            Arrays.asList(upstreamDataset));
 
     UpdateLineageRow writeJob =
         LineageTestUtils.createLineageRow(
             openLineageDao,
             "writeJob",
             "COMPLETE",
             jobFacet,
-            Arrays.asList(),
+            Arrays.asList(upstreamDataset),
             Arrays.asList(dataset));
     List<JobLineage> jobRows =
         writeDownstreamLineage(
@@ -941,20 +952,25 @@ public void testGetRunLineage() {
       List<UpstreamRunRow> upstream =
          lineageDao.getUpstreamRuns(failedJobRow.getRun().getUuid(), 10);
 
-      assertThat(upstream).size().isEqualTo(2);
+      assertThat(upstream).size().isEqualTo(3);
       assertThat(upstream.get(0).job().name().getValue())
          .isEqualTo(failedJobRow.getJob().getName());
       assertThat(upstream.get(0).input().name().getValue()).isEqualTo(dataset.getName());
       assertThat(upstream.get(1).job().name().getValue()).isEqualTo(writeJob.getJob().getName());
+      assertThat(upstream.get(1).input().name().getValue()).isEqualTo(upstreamDataset.getName());
+      assertThat(upstream.get(2).job().name().getValue()).isEqualTo(upstreamJob.getJob().getName());
     }
 
     {
       List<UpstreamRunRow> upstream2 = lineageDao.getUpstreamRuns(jobRows.get(0).getRunId(), 10);
 
-      assertThat(upstream2).size().isEqualTo(2);
+      assertThat(upstream2).size().isEqualTo(3);
       assertThat(upstream2.get(0).job().name().getValue()).isEqualTo(jobRows.get(0).getName());
       assertThat(upstream2.get(0).input().name().getValue()).isEqualTo(dataset.getName());
       assertThat(upstream2.get(1).job().name().getValue()).isEqualTo(writeJob.getJob().getName());
+      assertThat(upstream2.get(1).input().name().getValue()).isEqualTo(upstreamDataset.getName());
+      assertThat(upstream2.get(2).job().name().getValue())
+          .isEqualTo(upstreamJob.getJob().getName());
     }
   }
 }
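
---

Usage note (illustrative, not part of the patch series): once these patches are applied, the new
endpoint can be exercised as sketched below. The host, port, base path, run UUID, and all
namespace/job/dataset names here are assumptions for illustration only -- Marquez serves its HTTP
API under /api/v1 on port 5000 by default, and the run UUID is hypothetical. The response shape
follows the UpstreamRunLineage schema added in PATCH 6/7; elided values are shown as "...".

    # fetch up to 10 levels of upstream run-level lineage for a run (hypothetical UUID)
    curl "http://localhost:5000/api/v1/runlineage/upstream?runId=2b6cd523-9282-4a4a-87cd-8b6a7f2e4f1c&depth=10"

    {
      "runs": [
        {
          "job": {"namespace": "my-namespace", "name": "downstreamJob", "version": "..."},
          "run": {"id": "2b6cd523-...", "start": "...", "end": "...", "status": "COMPLETED"},
          "inputs": [
            {"namespace": "my-namespace", "name": "outputData", "version": "...",
             "producedByRunId": "..."}
          ]
        },
        ...
      ]
    }

The first element is the queried run itself (depth 0); each subsequent element is a run that
produced one of the dataset versions read upstream, returned in increasing depth order per the
ORDER BY depth ASC, job_name ASC added in PATCH 7/7.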