Skip to content

Commit

Permalink
fix downstream recursion
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski committed Oct 12, 2022
1 parent 472e88c commit a117772
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 9 deletions.
26 changes: 19 additions & 7 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,27 @@ void doUpsertColumnLineageRow(
@SqlQuery(
"""
WITH RECURSIVE
column_lineage_latest AS (
SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *
FROM column_lineage
ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at
),
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
),
column_lineage_recursive AS (
(
SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *, 0 as depth
FROM column_lineage
SELECT
*,
0 as depth,
false as is_cycle,
ARRAY[ROW(output_dataset_field_uuid, input_dataset_field_uuid)] as path -- path and is_cycle mechanism as describe here https://www.postgresql.org/docs/current/queries-with.html (CYCLE clause not available in postgresql 12)
FROM column_lineage_latest
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at
)
UNION
UNION ALL
SELECT
adjacent_node.output_dataset_version_uuid,
adjacent_node.output_dataset_field_uuid,
Expand All @@ -121,27 +129,31 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
adjacent_node.transformation_type,
adjacent_node.created_at,
adjacent_node.updated_at,
node.depth + 1 as depth
FROM column_lineage adjacent_node, column_lineage_recursive node
node.depth + 1 as depth,
ROW(adjacent_node.input_dataset_field_uuid, adjacent_node.output_dataset_field_uuid) = ANY(path) as is_cycle,
path || ROW(adjacent_node.input_dataset_field_uuid, adjacent_node.output_dataset_field_uuid) as path
FROM column_lineage_latest adjacent_node, column_lineage_recursive node
WHERE (
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage
OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage
)
AND node.depth < :depth
AND NOT is_cycle
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
WHERE NOT clr.is_cycle
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData) {
DatasetFieldId.of(i.getNamespace(), i.getDataset(), i.getField())))
.forEach(
inputNodeId -> {
graphNodes.put(inputNodeId, Node.datasetField().id(inputNodeId));
graphNodes.putIfAbsent(inputNodeId, Node.datasetField().id(inputNodeId));
Optional.ofNullable(outEdges.get(inputNodeId))
.ifPresentOrElse(
nodeEdges -> nodeEdges.add(nodeId),
Expand Down
2 changes: 1 addition & 1 deletion api/src/test/java/marquez/PostgresContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.testcontainers.utility.DockerImageName;

public final class PostgresContainer extends PostgreSQLContainer<PostgresContainer> {
private static final DockerImageName POSTGRES = DockerImageName.parse("postgres:11.8");
private static final DockerImageName POSTGRES = DockerImageName.parse("postgres:12.1");
private static final int JDBC = 5;

private static final Map<String, PostgresContainer> containers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,15 @@ public void testGetLineageWithDownstream() {
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d"))
.findAny())
.isPresent();

ColumnLineageNodeData nodeData_C =
(ColumnLineageNodeData)
lineage.getGraph().stream()
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_c"))
.findAny()
.get()
.getData();
assertThat(nodeData_C.getInputFields()).hasSize(2);
}

@Test
Expand Down

0 comments on commit a117772

Please sign in to comment.