Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream run level lineage implementation #2658

Merged
merged 9 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Web: add IO tab [`#2613`](https://github.com/MarquezProject/marquez/pull/2613) [
*Improves experience with large graphs by adding a new tab to move between graph elements without looking at the graph itself.*
Web: add hover-over Tag tooltip to datasets [`#2630`](https://github.com/MarquezProject/marquez/pull/2630) [@davidsharp7](https://github.com/davidsharp7)
*For parity with columns in the GUI, this adds a Tag tooltip to datasets.*
* API: upstream run-level lineage API [`#2658`](https://github.com/MarquezProject/marquez/pull/2658) [@julienledem]( https://github.com/julienledem)
*When trouble shooting an issue and doing root cause analysis, it is usefull to get the upstream run-level lineage to know exactly what version of each job and dataset a run is depending on.*

### Changed
Docker: upgrade to Docker Compose V2 [`#2644`](https://github.com/MarquezProject/marquez/pull/2644) [@merobi-hub](https://github.com/merobi-hub)
Expand Down
23 changes: 23 additions & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import marquez.api.models.SortDirection;
import marquez.common.models.RunId;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
Expand Down Expand Up @@ -130,6 +131,28 @@ public Response getLineageEvents(
return Response.ok(new Events(events, totalCount)).build();
}

/**
* Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
* -> the run that produced it
*
* @param runId the run to get upstream lineage from
* @param depth the maximum depth of the upstream lineage
* @return the upstream lineage for that run up to `detph` levels
*/
@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/runlineage/upstream")
public Response getRunLineageUpstream(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mind documenting this in openapi.spec

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@QueryParam("runId") @NotNull RunId runId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
throwIfNotExists(runId);
return Response.ok(lineageService.upstream(runId, depth)).build();
}

@Value
static class Events {
@NonNull
Expand Down
43 changes: 16 additions & 27 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ public static UUID uuidOrNull(final ResultSet results, final String column) thro
}

public static UUID uuidOrThrow(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getObject(column, UUID.class);
}

Expand All @@ -166,9 +164,7 @@ public static Instant timestampOrNull(final ResultSet results, final String colu

public static Instant timestampOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getTimestamp(column).toInstant();
}

Expand All @@ -182,9 +178,7 @@ public static String stringOrNull(final ResultSet results, final String column)

public static String stringOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getString(column);
}

Expand All @@ -199,40 +193,30 @@ public static boolean booleanOrDefault(

public static boolean booleanOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getBoolean(column);
}

public static int intOrThrow(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getInt(column);
}

public static PGInterval pgIntervalOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return new PGInterval(results.getString(column));
}

public static BigDecimal bigDecimalOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getBigDecimal(column);
}

public static List<UUID> uuidArrayOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return Arrays.asList((UUID[]) results.getArray(column).getArray());
}

Expand All @@ -246,9 +230,7 @@ public static List<UUID> uuidArrayOrEmpty(final ResultSet results, final String

public static List<String> stringArrayOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return Arrays.asList((String[]) results.getArray(column).getArray());
}

Expand Down Expand Up @@ -311,4 +293,11 @@ public static PGobject toPgObject(@NonNull final Object object) {
}
return jsonObject;
}

private static void checkNotNull(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException(column + " not found in result");
}
}
}
64 changes: 64 additions & 0 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@

package marquez.db;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import marquez.common.models.DatasetName;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.db.mappers.DatasetDataMapper;
import marquez.db.mappers.JobDataMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.mappers.UpstreamRunRowMapper;
import marquez.service.models.DatasetData;
import marquez.service.models.JobData;
import marquez.service.models.Run;
Expand All @@ -25,8 +32,18 @@
@RegisterRowMapper(JobDataMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
@RegisterRowMapper(UpstreamRunRowMapper.class)
public interface LineageDao {

public record JobSummary(NamespaceName namespace, JobName name, UUID version) {}

public record RunSummary(RunId id, Instant start, Instant end, String status) {}

public record DatasetSummary(
NamespaceName namespace, DatasetName name, UUID version, RunId producedByRunId) {}

public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary input) {}

/**
* Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the
* input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have
Expand Down Expand Up @@ -154,4 +171,51 @@ SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
ORDER BY r.job_name, r.namespace_name, created_at DESC""")
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);

@SqlQuery(
"""
WITH RECURSIVE
upstream_runs(
r_uuid, -- run uuid
dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name, -- input dataset version to the run
u_r_uuid, -- upstream run that produced that dataset version
depth -- current depth of traversal
) AS (

-- initial case: find the inputs of the initial runs
select r.uuid,
dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
dv.run_uuid,
0 AS depth -- starts at 0
FROM (SELECT :runId::uuid AS uuid) r -- initial run
LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid

UNION

-- recursion: find the inputs of the inputs found on the previous iteration and increase depth to know when to stop
SELECT
ur.u_r_uuid,
dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
dv2.run_uuid,
ur.depth + 1 AS depth -- increase depth to check end condition
FROM upstream_runs ur
LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
-- end condition of the recursion: no input or depth is over the maximum set
-- also avoid following cycles (ex: merge statement)
WHERE ur.u_r_uuid IS NOT NULL AND ur.u_r_uuid <> ur.r_uuid AND depth < :depth
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
)

-- present the result: use Distinct as we may have traversed the same edge multiple times if there are diamonds in the graph.
SELECT * FROM ( -- we need the extra statement to sort after the DISTINCT
SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, upstream_runs.u_r_uuid)
upstream_runs.*,
r.started_at, r.ended_at, r.current_run_state as state,
r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
FROM upstream_runs, runs r WHERE upstream_runs.r_uuid = r.uuid
) sub
ORDER BY depth ASC;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to ORDER BY depth ASC, r.job_name ASC;
Tests rely on the ordering of the query output.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

""")
List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
}
55 changes: 55 additions & 0 deletions api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrNull;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import lombok.NonNull;
import marquez.common.models.DatasetName;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.db.Columns;
import marquez.db.LineageDao.DatasetSummary;
import marquez.db.LineageDao.JobSummary;
import marquez.db.LineageDao.RunSummary;
import marquez.db.LineageDao.UpstreamRunRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

/** Maps the upstream query result set to a UpstreamRunRow */
public final class UpstreamRunRowMapper implements RowMapper<UpstreamRunRow> {
@Override
public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new UpstreamRunRow(
new JobSummary(
new NamespaceName(stringOrThrow(results, "job_namespace")),
new JobName(stringOrThrow(results, "job_name")),
Optional.ofNullable(stringOrNull(results, "job_version_uuid"))
.map(UUID::fromString)
.orElse(null)),
new RunSummary(
new RunId(uuidOrThrow(results, "r_uuid")),
timestampOrNull(results, Columns.STARTED_AT),
timestampOrNull(results, Columns.ENDED_AT),
stringOrThrow(results, Columns.STATE)),
results.getObject("dataset_name") == null
? null
: new DatasetSummary(
new NamespaceName(stringOrThrow(results, "dataset_namespace")),
new DatasetName(stringOrThrow(results, "dataset_name")),
UUID.fromString(stringOrThrow(results, "dataset_version_uuid")),
new RunId(UUID.fromString(stringOrThrow(results, "u_r_uuid")))));
}
}
43 changes: 43 additions & 0 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@

package marquez.service;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -21,14 +25,20 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.constraints.NotNull;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
import marquez.common.models.RunId;
import marquez.db.JobDao;
import marquez.db.LineageDao;
import marquez.db.LineageDao.DatasetSummary;
import marquez.db.LineageDao.JobSummary;
import marquez.db.LineageDao.RunSummary;
import marquez.db.models.JobRow;
import marquez.service.DelegatingDaos.DelegatingLineageDao;
import marquez.service.LineageService.UpstreamRunLineage;
import marquez.service.models.DatasetData;
import marquez.service.models.Edge;
import marquez.service.models.Graph;
Expand All @@ -41,6 +51,11 @@

@Slf4j
public class LineageService extends DelegatingLineageDao {

public record UpstreamRunLineage(List<UpstreamRun> runs) {}

public record UpstreamRun(JobSummary job, RunSummary run, List<DatasetSummary> inputs) {}

private final JobDao jobDao;

public LineageService(LineageDao delegate, JobDao jobDao) {
Expand Down Expand Up @@ -252,4 +267,32 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
}
}

/**
* Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
* -> the run that produced it
*
* @param runId the run to get upstream lineage from
* @param depth the maximum depth of the upstream lineage
* @return the upstream lineage for that run up to `detph` levels
*/
public UpstreamRunLineage upstream(@NotNull RunId runId, int depth) {
List<UpstreamRunRow> upstreamRuns = getUpstreamRuns(runId.getValue(), depth);
Map<RunId, List<UpstreamRunRow>> collect =
upstreamRuns.stream().collect(groupingBy(r -> r.run().id(), LinkedHashMap::new, toList()));
List<UpstreamRun> runs =
collect.entrySet().stream()
.map(
row -> {
UpstreamRunRow upstreamRunRow = row.getValue().get(0);
List<DatasetSummary> inputs =
row.getValue().stream()
.map(UpstreamRunRow::input)
.filter(i -> i != null)
.collect(toList());
return new UpstreamRun(upstreamRunRow.job(), upstreamRunRow.run(), inputs);
})
.collect(toList());
return new UpstreamRunLineage(runs);
}
}
Loading
Loading