add column lineage graph endpoint
Signed-off-by: Pawel Leszczynski <[email protected]>
pawel-big-lebowski committed Sep 28, 2022
1 parent 21dac22 commit a8a5a41
Showing 25 changed files with 1,274 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
### Added
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/MarquezContext.java
@@ -22,6 +22,7 @@
import marquez.api.TagResource;
import marquez.api.exceptions.JdbiExceptionExceptionMapper;
import marquez.db.BaseDao;
import marquez.db.ColumnLineageDao;
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
@@ -39,6 +40,7 @@
import marquez.db.TagDao;
import marquez.graphql.GraphqlSchemaBuilder;
import marquez.graphql.MarquezGraphqlServletBuilder;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
@@ -70,6 +72,7 @@ public final class MarquezContext {
@Getter private final TagDao tagDao;
@Getter private final OpenLineageDao openLineageDao;
@Getter private final LineageDao lineageDao;
@Getter private final ColumnLineageDao columnLineageDao;
@Getter private final SearchDao searchDao;
@Getter private final List<RunTransitionListener> runTransitionListeners;

@@ -81,6 +84,7 @@ public final class MarquezContext {
@Getter private final RunService runService;
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;
@Getter private final ColumnLineageService columnLineageService;
@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
@@ -115,6 +119,7 @@ private MarquezContext(
this.tagDao = jdbi.onDemand(TagDao.class);
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
this.lineageDao = jdbi.onDemand(LineageDao.class);
this.columnLineageDao = jdbi.onDemand(ColumnLineageDao.class);
this.searchDao = jdbi.onDemand(SearchDao.class);
this.runTransitionListeners = runTransitionListeners;

@@ -128,6 +133,7 @@ private MarquezContext(
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
@@ -139,6 +145,7 @@ private MarquezContext(
.openLineageService(openLineageService)
.sourceService(sourceService)
.lineageService(lineageService)
.columnLineageService(columnLineageService)
.datasetFieldService(new DatasetFieldService(baseDao))
.datasetVersionService(new DatasetVersionService(baseDao))
.build();
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/api/BaseResource.java
@@ -25,6 +25,7 @@
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.SourceName;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
@@ -50,6 +51,7 @@ public class BaseResource {
protected DatasetVersionService datasetVersionService;
protected DatasetFieldService datasetFieldService;
protected LineageService lineageService;
protected ColumnLineageService columnLineageService;

public BaseResource(ServiceFactory serviceFactory) {
this.serviceFactory = serviceFactory;
@@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) {
this.datasetVersionService = serviceFactory.getDatasetVersionService();
this.datasetFieldService = serviceFactory.getDatasetFieldService();
this.lineageService = serviceFactory.getLineageService();
this.columnLineageService = serviceFactory.getColumnLineageService();
}

void throwIfNotExists(@NonNull NamespaceName namespaceName) {
50 changes: 50 additions & 0 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
@@ -0,0 +1,50 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.api;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.service.ServiceFactory;
import marquez.service.models.NodeId;

@Slf4j
@Path("/api/v1/column-lineage")
public class ColumnLineageResource extends BaseResource {

private static final String DEFAULT_DEPTH = "20";

public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
super(serviceFactory);
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
}
}
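Example (not part of this commit): a minimal client-side sketch of calling the new endpoint with Java's built-in HTTP client. The host and port, the dataset-field node ID encoding (datasetField:namespace:dataset:field), and the names used below are assumptions for illustration, not taken from this diff.

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class ColumnLineageRequestExample {
  public static void main(String[] args) throws Exception {
    // Assumed NodeId encoding for a dataset field; adjust to your deployment.
    String nodeId = "datasetField:food_delivery:public.delivery_7_days:order_id";
    String url =
        "http://localhost:8080/api/v1/column-lineage?nodeId="
            + URLEncoder.encode(nodeId, StandardCharsets.UTF_8)
            + "&depth=10"; // depth is optional and defaults to 20 (DEFAULT_DEPTH above)

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode()); // 200 on success
    System.out.println(response.body());       // column lineage graph serialized as JSON
  }
}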
27 changes: 27 additions & 0 deletions api/src/main/java/marquez/common/models/DatasetFieldId.java
@@ -0,0 +1,27 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** ID for {@code DatasetField}. */
@EqualsAndHashCode
@AllArgsConstructor
@ToString
public class DatasetFieldId {

@Getter private final DatasetId datasetId;
@Getter private final FieldName fieldName;

public static DatasetFieldId of(String namespace, String datasetName, String field) {
return new DatasetFieldId(
new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)),
FieldName.of(field));
}
}
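For illustration only (not part of the commit): building a DatasetFieldId with the new factory method. The namespace, dataset, and field names are placeholders.

import marquez.common.models.DatasetFieldId;

public class DatasetFieldIdExample {
  public static void main(String[] args) {
    // Placeholder names; any namespace/dataset/field combination works here.
    DatasetFieldId fieldId =
        DatasetFieldId.of("food_delivery", "public.delivery_7_days", "order_id");
    // Lombok's @ToString prints the nested DatasetId and FieldName.
    System.out.println(fieldId);
  }
}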
62 changes: 62 additions & 0 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
@@ -5,20 +5,27 @@

package marquez.db;

import static org.jdbi.v3.sqlobject.customizer.BindList.EmptyHandling.NULL_STRING;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.db.mappers.ColumnLineageNodeDataMapper;
import marquez.db.mappers.ColumnLineageRowMapper;
import marquez.db.models.ColumnLineageNodeData;
import marquez.db.models.ColumnLineageRow;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
import org.jdbi.v3.sqlobject.customizer.BindList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(ColumnLineageRowMapper.class)
@RegisterRowMapper(ColumnLineageNodeDataMapper.class)
public interface ColumnLineageDao extends BaseDao {

default List<ColumnLineageRow> upsertColumnLineageRow(
@@ -88,4 +95,59 @@ void doUpsertColumnLineageRow(
},
value = "values")
List<ColumnLineageRow> rows);

@SqlQuery(
"""
WITH RECURSIVE
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
),
column_lineage_recursive AS (
SELECT *, 0 as depth
FROM column_lineage
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
UNION
SELECT
upstream_node.output_dataset_version_uuid,
upstream_node.output_dataset_field_uuid,
upstream_node.input_dataset_version_uuid,
upstream_node.input_dataset_field_uuid,
upstream_node.transformation_description,
upstream_node.transformation_type,
upstream_node.created_at,
upstream_node.updated_at,
node.depth + 1 as depth
FROM column_lineage upstream_node, column_lineage_recursive node
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
AND node.depth < :depth
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
""")
Set<ColumnLineageNodeData> getLineage(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
Instant createdAtUntil);
}
19 changes: 19 additions & 0 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
@@ -93,6 +93,25 @@ default Dataset updateTags(
+ "WHERE dataset_uuid = :datasetUuid AND name = :name")
Optional<UUID> findUuid(UUID datasetUuid, String name);

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
""")
List<UUID> findDatasetFieldsUuids(String namespace, String datasetName);

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
WHERE df.name = :name
""")
Optional<UUID> findUuid(String namespace, String datasetName, String name);

@SqlQuery(
"SELECT f.*, "
+ "ARRAY(SELECT t.name "
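To see how these pieces fit together, here is a rough sketch (not the actual ColumnLineageService code from this commit) of composing the new DatasetFieldDao query with ColumnLineageDao.getLineage: resolve the field UUIDs for a dataset, then walk the column_lineage graph upstream up to the requested depth.

import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import marquez.db.ColumnLineageDao;
import marquez.db.DatasetFieldDao;
import marquez.db.models.ColumnLineageNodeData;

/** Sketch only; the real ColumnLineageService added in this commit may differ. */
class ColumnLineageLookupSketch {
  private final DatasetFieldDao datasetFieldDao;
  private final ColumnLineageDao columnLineageDao;

  ColumnLineageLookupSketch(DatasetFieldDao datasetFieldDao, ColumnLineageDao columnLineageDao) {
    this.datasetFieldDao = datasetFieldDao;
    this.columnLineageDao = columnLineageDao;
  }

  /** Upstream column lineage for every field of a dataset, up to {@code depth} hops. */
  Set<ColumnLineageNodeData> upstreamOfDataset(String namespace, String datasetName, int depth) {
    // 1. Resolve the dataset's field UUIDs (new DatasetFieldDao query above).
    List<UUID> fieldUuids = datasetFieldDao.findDatasetFieldsUuids(namespace, datasetName);
    // 2. Walk the column_lineage table upstream via the recursive CTE in ColumnLineageDao.
    return columnLineageDao.getLineage(depth, fieldUuids, Instant.now());
  }
}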
54 changes: 54 additions & 0 deletions api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java
@@ -0,0 +1,54 @@
package marquez.db.mappers;

import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrThrow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
import marquez.db.Columns;
import marquez.db.models.ColumnLineageNodeData;
import marquez.db.models.InputFieldNodeData;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.postgresql.jdbc.PgArray;

@Slf4j
public class ColumnLineageNodeDataMapper implements RowMapper<ColumnLineageNodeData> {

private static final ObjectMapper MAPPER = Utils.getMapper();

@Override
public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws SQLException {
return new ColumnLineageNodeData(
stringOrThrow(results, Columns.NAMESPACE_NAME),
stringOrThrow(results, Columns.DATASET_NAME),
stringOrThrow(results, Columns.FIELD_NAME),
stringOrThrow(results, Columns.TYPE),
stringOrThrow(results, TRANSFORMATION_DESCRIPTION),
stringOrThrow(results, TRANSFORMATION_TYPE),
toInputFields(results, "inputFields"));
}

public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results, String column)
throws SQLException {
if (results.getObject(column) == null) {
return ImmutableList.of();
}

PgArray pgArray = (PgArray) results.getObject(column);
Object[] deserializedArray = (Object[]) pgArray.getArray();

return ImmutableList.copyOf(
Arrays.asList(deserializedArray).stream()
.map(o -> (String[]) o)
.map(arr -> new InputFieldNodeData(arr[0], arr[1], arr[2]))
.collect(Collectors.toList()));
}
}
23 changes: 23 additions & 0 deletions api/src/main/java/marquez/db/models/ColumnLineageNodeData.java
@@ -0,0 +1,23 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;

@Getter
@AllArgsConstructor
public class ColumnLineageNodeData implements NodeData {
@NonNull String namespace;
@NonNull String name;
@NonNull String field;
@NonNull String dataType;
@NonNull String transformationDescription;
@NonNull String transformationType;
@NonNull List<InputFieldNodeData> inputFields;
}
18 changes: 18 additions & 0 deletions api/src/main/java/marquez/db/models/InputFieldNodeData.java
@@ -0,0 +1,18 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;

@Getter
@AllArgsConstructor
public class InputFieldNodeData {
@NonNull String namespace;
@NonNull String name;
@NonNull String field;
}
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/db/models/NodeData.java
@@ -14,6 +14,7 @@
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = DatasetData.class, name = "DATASET"),
@JsonSubTypes.Type(value = JobData.class, name = "JOB")
@JsonSubTypes.Type(value = JobData.class, name = "JOB"),
@JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD")
})
public interface NodeData {}
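As a rough illustration (not part of this commit) of what the new DATASET_FIELD subtype registration does: serializing a ColumnLineageNodeData through the NodeData interface adds the polymorphic type tag. The mapper setup and all field values below are assumptions for the example.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import marquez.db.models.ColumnLineageNodeData;
import marquez.db.models.NodeData;

public class NodeDataSerializationExample {
  public static void main(String[] args) throws Exception {
    // Placeholder values; inputFields left empty for brevity.
    NodeData data =
        new ColumnLineageNodeData(
            "food_delivery",
            "public.delivery_7_days",
            "order_id",
            "VARCHAR",
            "identical transformation",
            "IDENTITY",
            ImmutableList.of());

    // Serializing via the NodeData interface emits "type":"DATASET_FIELD" from @JsonSubTypes.
    String json = new ObjectMapper().writerFor(NodeData.class).writeValueAsString(data);
    System.out.println(json);
  }
}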