diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java index b0be61947d..3de522efe9 100644 --- a/api/src/main/java/marquez/db/ColumnLineageDao.java +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -99,6 +99,11 @@ void doUpsertColumnLineageRow( @SqlQuery( """ WITH RECURSIVE + column_lineage_latest AS ( + SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) * + FROM column_lineage + ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at + ), dataset_fields_view AS ( SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid FROM dataset_fields df @@ -106,12 +111,15 @@ dataset_fields_view AS ( ), column_lineage_recursive AS ( ( - SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *, 0 as depth - FROM column_lineage + SELECT + *, + 0 as depth, + false as is_cycle, + ARRAY[ROW(output_dataset_field_uuid, input_dataset_field_uuid)] as path -- path and is_cycle mechanism as describe here https://www.postgresql.org/docs/current/queries-with.html (CYCLE clause not available in postgresql 12) + FROM column_lineage_latest WHERE output_dataset_field_uuid IN () AND created_at <= :createdAtUntil - ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at ) - UNION + UNION ALL SELECT adjacent_node.output_dataset_version_uuid, adjacent_node.output_dataset_field_uuid, @@ -121,20 +129,23 @@ WHERE output_dataset_field_uuid IN () AND created_at <= :crea adjacent_node.transformation_type, adjacent_node.created_at, adjacent_node.updated_at, - node.depth + 1 as depth - FROM column_lineage adjacent_node, column_lineage_recursive node + node.depth + 1 as depth, + ROW(adjacent_node.input_dataset_field_uuid, adjacent_node.output_dataset_field_uuid) = ANY(path) as is_cycle, + path || ROW(adjacent_node.input_dataset_field_uuid, adjacent_node.output_dataset_field_uuid) as path + FROM column_lineage_latest adjacent_node, column_lineage_recursive node WHERE ( (node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage ) AND node.depth < :depth + AND NOT is_cycle ) SELECT output_fields.namespace_name, output_fields.dataset_name, output_fields.field_name, output_fields.type, - ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields, + ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields, clr.transformation_description, clr.transformation_type, clr.created_at, @@ -142,6 +153,7 @@ WHERE output_dataset_field_uuid IN () AND created_at <= :crea FROM column_lineage_recursive clr INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid + WHERE NOT clr.is_cycle GROUP BY output_fields.namespace_name, output_fields.dataset_name, diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java index b29e9ee1e5..7a7a85a9be 100644 --- a/api/src/main/java/marquez/service/ColumnLineageService.java +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -71,7 +71,7 @@ private Lineage toLineage(Set lineageNodeData) { DatasetFieldId.of(i.getNamespace(), i.getDataset(), i.getField()))) .forEach( inputNodeId -> { - graphNodes.put(inputNodeId, Node.datasetField().id(inputNodeId)); + graphNodes.putIfAbsent(inputNodeId, Node.datasetField().id(inputNodeId)); Optional.ofNullable(outEdges.get(inputNodeId)) .ifPresentOrElse( nodeEdges -> nodeEdges.add(nodeId), diff --git a/api/src/test/java/marquez/PostgresContainer.java b/api/src/test/java/marquez/PostgresContainer.java index a74b4893a8..9690f72b41 100644 --- a/api/src/test/java/marquez/PostgresContainer.java +++ b/api/src/test/java/marquez/PostgresContainer.java @@ -11,7 +11,7 @@ import org.testcontainers.utility.DockerImageName; public final class PostgresContainer extends PostgreSQLContainer { - private static final DockerImageName POSTGRES = DockerImageName.parse("postgres:11.8"); + private static final DockerImageName POSTGRES = DockerImageName.parse("postgres:12.1"); private static final int JDBC = 5; private static final Map containers = new HashMap<>(); diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index 7ec8ac67c0..2b818a27ef 100644 --- a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -318,6 +318,15 @@ public void testGetLineageWithDownstream() { .filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d")) .findAny()) .isPresent(); + + ColumnLineageNodeData nodeData_C = + (ColumnLineageNodeData) + lineage.getGraph().stream() + .filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_c")) + .findAny() + .get() + .getData(); + assertThat(nodeData_C.getInputFields()).hasSize(2); } @Test