diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f5a2c5ad1..eb6fcb1bb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,15 @@ ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.32.0...HEAD) ### Fixed + * UI: better handling of null job latestRun for Jobs page [#2467](https://github.com/MarquezProject/marquez/pull/2467) [@perttus](https://github.com/perttus) + +### Added + +* Support `inputFacets` and `outputFacets` from OpenLineage specification [`#2417`](https://github.com/MarquezProject/marquez/pull/2417) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + *Adds the ability to store `inputFacets` / `outputFacets` which are sent within datasets.* + *Expose them through Marquez API as a member of `Run` resource.* + ## [0.32.0](https://github.com/MarquezProject/marquez/compare/0.31.0...0.32.0) - 2023-03-20 ### Fixed diff --git a/COMMITTERS.md b/COMMITTERS.md index 5f0c08eb0a..35b38266a9 100644 --- a/COMMITTERS.md +++ b/COMMITTERS.md @@ -24,7 +24,7 @@ They take responsibility for guiding new pull requests into the main branch. 
| Michael Robinson | [@merobi-hub](https://github.com/merobi-hub) | | Ross Turk | [@rossturk](https://github.com/rossturk) | | Minkyu Park | [@fm100](https://github.com/fm100) | -| Pawel Leszczynski | [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) | +| Paweł Leszczyński | [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) | ## Emeritus diff --git a/api/src/main/java/marquez/common/models/InputDatasetVersion.java b/api/src/main/java/marquez/common/models/InputDatasetVersion.java new file mode 100644 index 0000000000..f054db44c3 --- /dev/null +++ b/api/src/main/java/marquez/common/models/InputDatasetVersion.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +/** + * Class used to store dataset version and `inputFacets` which are assigned to datasets within + * OpenLineage spec, but are exposed within Marquez api as a part of {@link + * marquez.service.models.Run} + */ +@EqualsAndHashCode +@ToString +@Getter +public class InputDatasetVersion { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public InputDatasetVersion( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/api/src/main/java/marquez/common/models/OutputDatasetVersion.java b/api/src/main/java/marquez/common/models/OutputDatasetVersion.java new file mode 100644 index 0000000000..a31a004dff --- /dev/null +++ b/api/src/main/java/marquez/common/models/OutputDatasetVersion.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez 
project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +/** + * Class used to store dataset version and `outputFacets` which are assigned to datasets within + * OpenLineage spec, but are exposed within Marquez api as a part of {@link + * marquez.service.models.Run} + */ +@EqualsAndHashCode +@ToString +@Getter +public class OutputDatasetVersion { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public OutputDatasetVersion( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index fca22cff50..8abc5ab162 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -55,6 +55,7 @@ private Columns() {} public static final String NAMESPACE_NAME = "namespace_name"; public static final String DATASET_NAME = "dataset_name"; public static final String FACETS = "facets"; + public static final String DATASET_FACETS = "dataset_facets"; public static final String TAGS = "tags"; public static final String IS_HIDDEN = "is_hidden"; diff --git a/api/src/main/java/marquez/db/DatasetDao.java b/api/src/main/java/marquez/db/DatasetDao.java index 64da665b3f..04919cd8fd 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -86,7 +86,7 @@ LEFT JOIN ( df.dataset_version_uuid, JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets FROM dataset_facets_view AS df - WHERE df.facet IS NOT NULL + WHERE df.facet IS NOT NULL AND (df.type ILIKE 
'dataset' OR df.type ILIKE 'unknown') GROUP BY df.dataset_version_uuid ) f ON f.dataset_version_uuid = d.current_version_uuid WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) @@ -134,7 +134,7 @@ LEFT JOIN ( df.dataset_version_uuid, JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets FROM dataset_facets_view AS df - WHERE df.facet IS NOT NULL + WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') GROUP BY df.dataset_version_uuid ) f ON f.dataset_version_uuid = d.current_version_uuid WHERE d.namespace_name = :namespaceName diff --git a/api/src/main/java/marquez/db/DatasetFacetsDao.java b/api/src/main/java/marquez/db/DatasetFacetsDao.java index 28361aaa70..679a9bfaa3 100644 --- a/api/src/main/java/marquez/db/DatasetFacetsDao.java +++ b/api/src/main/java/marquez/db/DatasetFacetsDao.java @@ -149,6 +149,58 @@ default void insertDatasetFacetsFor( FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); } + default void insertInputDatasetFacetsFor( + @NonNull UUID datasetUuid, + @NonNull UUID datasetVersionUuid, + @NonNull UUID runUuid, + @NonNull Instant lineageEventTime, + @NonNull String lineageEventType, + @NonNull LineageEvent.InputDatasetFacets inputFacets) { + final Instant now = Instant.now(); + + JsonNode jsonNode = Utils.getMapper().valueToTree(inputFacets); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false) + .forEach( + fieldName -> + insertDatasetFacet( + now, + datasetUuid, + datasetVersionUuid, + runUuid, + lineageEventTime, + lineageEventType, + Type.INPUT, + fieldName, + FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); + } + + default void insertOutputDatasetFacetsFor( + @NonNull UUID datasetUuid, + @NonNull UUID datasetVersionUuid, + @NonNull UUID runUuid, + @NonNull Instant lineageEventTime, + @NonNull String lineageEventType, + @NonNull LineageEvent.OutputDatasetFacets outputFacets) { + final 
Instant now = Instant.now(); + + JsonNode jsonNode = Utils.getMapper().valueToTree(outputFacets); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false) + .forEach( + fieldName -> + insertDatasetFacet( + now, + datasetUuid, + datasetVersionUuid, + runUuid, + lineageEventTime, + lineageEventType, + Type.OUTPUT, + fieldName, + FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); + } + record DatasetFacetRow( Instant createdAt, UUID datasetUuid, diff --git a/api/src/main/java/marquez/db/DatasetVersionDao.java b/api/src/main/java/marquez/db/DatasetVersionDao.java index f08b2f3903..7b3b544a32 100644 --- a/api/src/main/java/marquez/db/DatasetVersionDao.java +++ b/api/src/main/java/marquez/db/DatasetVersionDao.java @@ -156,10 +156,19 @@ default void updateDatasetVersionMetric( @SqlQuery( """ + WITH selected_dataset_versions AS ( + SELECT dv.* + FROM dataset_versions dv + WHERE dv.version = :version + ), selected_dataset_version_facets AS ( + SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet + FROM selected_dataset_versions dv + LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid + ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, t.tags, f.facets - FROM dataset_versions dv + FROM selected_dataset_versions dv LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -169,21 +178,28 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid LEFT JOIN ( - SELECT dvf.dataset_version_uuid, - JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets - FROM dataset_facets_view dvf - GROUP BY dataset_version_uuid - ) f ON f.dataset_version_uuid = dv.uuid - 
WHERE dv.version = :version - """) + SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM selected_dataset_version_facets dvf + WHERE dvf.run_uuid = dvf.run_uuid + GROUP BY dvf.uuid + ) f ON f.dataset_uuid = dv.uuid""") Optional findBy(UUID version); @SqlQuery( """ + WITH selected_dataset_versions AS ( + SELECT dv.* + FROM dataset_versions dv + WHERE dv.uuid = :uuid + ), selected_dataset_version_facets AS ( + SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet + FROM selected_dataset_versions dv + LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') + ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, t.tags, f.facets - FROM dataset_versions dv + FROM selected_dataset_versions dv LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -192,14 +208,12 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid - LEFT JOIN ( - SELECT dvf.dataset_version_uuid, - JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets - FROM dataset_facets_view dvf - GROUP BY dataset_version_uuid - ) f ON f.dataset_version_uuid = dv.uuid - WHERE dv.uuid = :uuid - """) + LEFT JOIN ( + SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM selected_dataset_version_facets dvf + WHERE dvf.run_uuid = dvf.run_uuid + GROUP BY dvf.uuid + ) f ON f.dataset_uuid = dv.uuid""") Optional findByUuid(UUID uuid); default Optional findByWithRun(UUID version) { @@ -246,6 +260,7 @@ LEFT JOIN ( SELECT dvf.dataset_version_uuid, 
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets FROM dataset_facets_view dvf + WHERE (type ILIKE 'dataset' OR type ILIKE 'unknown') GROUP BY dataset_version_uuid ) f ON f.dataset_version_uuid = dv.uuid WHERE dv.namespace_name = :namespaceName diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index 04c86b7701..f71f24e562 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -38,37 +38,45 @@ public interface LineageDao { */ @SqlQuery( """ - WITH RECURSIVE - job_io AS ( - SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, - ARRAY_AGG(DISTINCT j.uuid) AS ids, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs - FROM jobs j - LEFT JOIN jobs_view s On s.uuid=j.symlink_target_uuid - LEFT JOIN job_versions v on v.uuid=COALESCE(s.current_version_uuid, j.current_version_uuid) - LEFT JOIN job_versions_io_mapping io ON io.job_version_uuid=v.uuid - GROUP BY COALESCE(j.symlink_target_uuid, j.uuid) - ), - lineage(job_uuid, inputs, outputs) AS ( - SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, - COALESCE(inputs, Array[]::uuid[]) AS inputs, - COALESCE(outputs, Array[]::uuid[]) AS outputs, - 0 AS depth - FROM jobs_view j - INNER JOIN job_io io ON j.uuid=ANY(io.ids) - WHERE io.ids && ARRAY[]::uuid[] - UNION - SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 - FROM job_io io, - lineage l - WHERE io.job_uuid != l.job_uuid AND - array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) - AND depth < :depth) - SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context - FROM lineage l2 - INNER JOIN jobs_view j ON j.uuid=l2.job_uuid - LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid; + WITH RECURSIVE + -- Find the current version of a job or its symlink target if 
the target has no + -- current_version_uuid. This ensures that we don't lose lineage for a job after it is + -- symlinked to another job but before that target job has run successfully. + job_current_version AS ( + SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, + COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid + FROM jobs j + LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid + WHERE s.current_version_uuid IS NULL + ), + job_io AS ( + SELECT j.job_uuid, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs + FROM job_versions_io_mapping io + INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid + GROUP BY j.job_uuid + ), + lineage(job_uuid, inputs, outputs) AS ( + SELECT v.job_uuid AS job_uuid, + COALESCE(inputs, Array[]::uuid[]) AS inputs, + COALESCE(outputs, Array[]::uuid[]) AS outputs, + 0 AS depth + FROM jobs j + INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid + LEFT JOIN job_io io ON io.job_uuid=v.job_uuid + WHERE j.uuid IN () OR j.symlink_target_uuid IN () + UNION + SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 + FROM job_io io, + lineage l + WHERE io.job_uuid != l.job_uuid AND + array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) + AND depth < :depth) + SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context + FROM lineage l2 + INNER JOIN jobs_view j ON j.uuid=l2.job_uuid + LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid; """) Set getLineage(@BindList Set jobIds, int depth); diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 7be42d0dca..5f1b30c545 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -279,6 
+279,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper now, event.getEventType(), facets)); + + // InputFacets ... + Optional.ofNullable(dataset.getInputFacets()) + .ifPresent( + facets -> + datasetFacetsDao.insertInputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); } } bag.setInputs(Optional.ofNullable(datasetInputs)); @@ -314,6 +326,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper now, event.getEventType(), facets)); + + // OutputFacets ... + Optional.ofNullable(dataset.getOutputFacets()) + .ifPresent( + facets -> + datasetFacetsDao.insertOutputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); } } diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 0437b9605a..57a708bd64 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -74,33 +74,51 @@ public interface RunDao extends BaseDao { void updateEndState(UUID rowUuid, Instant transitionedAt, UUID endRunStateUuid); String BASE_FIND_RUN_SQL = - "SELECT r.*, ra.args, f.facets,\n" - + "jv.version AS job_version,\n" - + "ri.input_versions, ro.output_versions\n" - + "FROM runs_view AS r\n" - + "LEFT OUTER JOIN\n" - + "(\n" - + " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n" - + " FROM run_facets_view rf\n" - + " GROUP BY rf.run_uuid\n" - + ") AS f ON r.uuid=f.run_uuid\n" - + "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n" - + "LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n" - + "LEFT OUTER JOIN (\n" - + " SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,\n" - + " 'name', dv.dataset_name,\n" - + " 'version', dv.version)) AS input_versions\n" - + " 
FROM runs_input_mapping im\n" - + " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n" - + " GROUP BY im.run_uuid\n" - + ") ri ON ri.run_uuid=r.uuid\n" - + "LEFT OUTER JOIN (\n" - + " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n" - + " 'name', dataset_name,\n" - + " 'version', version)) AS output_versions\n" - + " FROM dataset_versions\n" - + " GROUP BY run_uuid\n" - + ") ro ON ro.run_uuid=r.uuid\n"; + """ + SELECT r.*, ra.args, f.facets, + jv.version AS job_version, + ri.input_versions, ro.output_versions, df.dataset_facets + FROM runs_view AS r + LEFT OUTER JOIN + ( + SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets + FROM run_facets_view rf + GROUP BY rf.run_uuid + ) AS f ON r.uuid=f.run_uuid + LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid + LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid + LEFT OUTER JOIN ( + SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, + 'name', dv.dataset_name, + 'version', dv.version, + 'dataset_version_uuid', uuid)) AS input_versions + FROM runs_input_mapping im + INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid + GROUP BY im.run_uuid + ) ri ON ri.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, + 'name', dataset_name, + 'version', version, + 'dataset_version_uuid', uuid + )) AS output_versions + FROM dataset_versions + GROUP BY run_uuid + ) ro ON ro.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT + run_uuid, + JSON_AGG(json_build_object( + 'dataset_version_uuid', dataset_version_uuid, + 'name', name, + 'type', type, + 'facet', facet + ) ORDER BY created_at ASC) as dataset_facets + FROM dataset_facets_view + WHERE (type ILIKE 'output' OR type ILIKE 'input') + GROUP BY run_uuid + ) AS df ON r.uuid = df.run_uuid + """; @SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid") Optional findRunByUuid(UUID runUuid); @@ -123,7 
+141,7 @@ public interface RunDao extends BaseDao { """ SELECT r.*, ra.args, f.facets, j.namespace_name, j.name, jv.version AS job_version, - ri.input_versions, ro.output_versions + ri.input_versions, ro.output_versions, df.dataset_facets FROM runs_view AS r INNER JOIN jobs_view j ON r.job_uuid=j.uuid LEFT JOIN LATERAL @@ -138,7 +156,9 @@ SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS fac LEFT OUTER JOIN ( SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, 'name', dv.dataset_name, - 'version', dv.version)) AS input_versions + 'version', dv.version, + 'dataset_version_uuid', uuid + )) AS input_versions FROM runs_input_mapping im INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid GROUP BY im.run_uuid @@ -146,10 +166,25 @@ SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, LEFT OUTER JOIN ( SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, 'name', dataset_name, - 'version', version)) AS output_versions + 'version', version, + 'dataset_version_uuid', uuid + )) AS output_versions FROM dataset_versions GROUP BY run_uuid ) ro ON ro.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT + run_uuid, + JSON_AGG(json_build_object( + 'dataset_version_uuid', dataset_version_uuid, + 'name', name, + 'type', type, + 'facet', facet + ) ORDER BY created_at ASC) as dataset_facets + FROM dataset_facets_view + WHERE (type ILIKE 'output' OR type ILIKE 'input') + GROUP BY run_uuid + ) AS df ON r.uuid = df.run_uuid WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases)) ORDER BY STARTED_AT DESC NULLS LAST LIMIT :limit OFFSET :offset diff --git a/api/src/main/java/marquez/db/mappers/RunMapper.java b/api/src/main/java/marquez/db/mappers/RunMapper.java index 0dad020ade..9f9354fa4b 100644 --- a/api/src/main/java/marquez/db/mappers/RunMapper.java +++ b/api/src/main/java/marquez/db/mappers/RunMapper.java @@ -6,6 +6,7 @@ package marquez.db.mappers; 
import static java.time.temporal.ChronoUnit.MILLIS; +import static java.util.stream.Collectors.toList; import static marquez.common.models.RunState.NEW; import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.stringOrThrow; @@ -15,7 +16,10 @@ import static marquez.db.Columns.uuidOrThrow; import static marquez.db.mappers.MapperUtils.toFacetsOrNull; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.sql.ResultSet; @@ -26,19 +30,30 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import marquez.common.Utils; +import marquez.common.models.DatasetName; import marquez.common.models.DatasetVersionId; +import marquez.common.models.InputDatasetVersion; +import marquez.common.models.NamespaceName; +import marquez.common.models.OutputDatasetVersion; import marquez.common.models.RunId; import marquez.common.models.RunState; import marquez.db.Columns; import marquez.service.models.Run; import org.jdbi.v3.core.mapper.RowMapper; import org.jdbi.v3.core.statement.StatementContext; +import org.postgresql.util.PGobject; +@Slf4j public final class RunMapper implements RowMapper { private final String columnPrefix; + private static final ObjectMapper MAPPER = Utils.getMapper(); + public RunMapper() { this(""); } @@ -56,6 +71,14 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) Optional durationMs = Optional.ofNullable(timestampOrNull(results, columnPrefix + Columns.ENDED_AT)) .flatMap(endedAt -> startedAt.map(s -> s.until(endedAt, MILLIS))); + List inputDatasetVersions = + columnNames.contains(columnPrefix + 
Columns.INPUT_VERSIONS) + ? toQueryDatasetVersion(results, columnPrefix + Columns.INPUT_VERSIONS) + : ImmutableList.of(); + List outputDatasetVersions = + columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) + ? toQueryDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) + : ImmutableList.of(); return new Run( RunId.of(uuidOrThrow(results, columnPrefix + Columns.ROW_UUID)), timestampOrThrow(results, columnPrefix + Columns.CREATED_AT), @@ -77,21 +100,18 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) stringOrThrow(results, columnPrefix + Columns.JOB_NAME), uuidOrNull(results, columnPrefix + Columns.JOB_VERSION), stringOrNull(results, columnPrefix + Columns.LOCATION), - columnNames.contains(columnPrefix + Columns.INPUT_VERSIONS) - ? toDatasetVersion(results, columnPrefix + Columns.INPUT_VERSIONS) - : ImmutableList.of(), - columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) - ? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) - : ImmutableList.of(), + toInputDatasetVersions(results, inputDatasetVersions, true), + toOutputDatasetVersions(results, outputDatasetVersions, false), toFacetsOrNull(results, columnPrefix + Columns.FACETS)); } - private List toDatasetVersion(ResultSet rs, String column) throws SQLException { + private List toQueryDatasetVersion(ResultSet rs, String column) + throws SQLException { String dsString = rs.getString(column); if (dsString == null) { return Collections.emptyList(); } - return Utils.fromJson(dsString, new TypeReference>() {}); + return Utils.fromJson(dsString, new TypeReference>() {}); } private Map toArgsOrNull(ResultSet results, String argsColumn) @@ -105,4 +125,94 @@ private Map toArgsOrNull(ResultSet results, String argsColumn) } return Utils.fromJson(args, new TypeReference>() {}); } + + private List toInputDatasetVersions( + ResultSet rs, List datasetVersionIds, boolean input) + throws SQLException { + ImmutableList queryFacets = 
getQueryDatasetFacets(rs); + try { + return datasetVersionIds.stream() + .map( + version -> + new InputDatasetVersion( + version.toDatasetVersionId(), getFacetsMap(input, queryFacets, version))) + .collect(toList()); + } catch (IllegalStateException e) { + return Collections.emptyList(); + } + } + + private List toOutputDatasetVersions( + ResultSet rs, List datasetVersionIds, boolean input) + throws SQLException { + ImmutableList queryFacets = getQueryDatasetFacets(rs); + try { + return datasetVersionIds.stream() + .map( + version -> + new OutputDatasetVersion( + version.toDatasetVersionId(), getFacetsMap(input, queryFacets, version))) + .collect(toList()); + } catch (IllegalStateException e) { + return Collections.emptyList(); + } + } + + private ImmutableMap getFacetsMap( + boolean input, + ImmutableList queryDatasetFacets, + QueryDatasetVersion queryDatasetVersion) { + return ImmutableMap.copyOf( + queryDatasetFacets.stream() + .filter(rf -> rf.type.equalsIgnoreCase(input ? "input" : "output")) + .filter(rf -> rf.datasetVersionUUID.equals(queryDatasetVersion.datasetVersionUUID)) + .collect( + Collectors.toMap( + QueryDatasetFacet::name, + facet -> + Utils.getMapper() + .convertValue( + Utils.getMapper().valueToTree(facet.facet).get(facet.name), + Object.class), + (a1, a2) -> a2 // in case of duplicates, choose more recent + ))); + } + + private ImmutableList getQueryDatasetFacets(ResultSet resultSet) + throws SQLException { + String column = columnPrefix + Columns.DATASET_FACETS; + ImmutableList queryDatasetFacets = ImmutableList.of(); + if (Columns.exists(resultSet, column) && resultSet.getObject(column) != null) { + try { + queryDatasetFacets = + MAPPER.readValue( + ((PGobject) resultSet.getObject(column)).getValue(), + new TypeReference>() {}); + } catch (JsonProcessingException e) { + log.error(String.format("Could not read dataset from job row %s", column), e); + } + } + return queryDatasetFacets; + } + + record QueryDatasetFacet( + 
@JsonProperty("dataset_version_uuid") String datasetVersionUUID, + String name, + String type, + Object facet) {} + + record QueryDatasetVersion( + String namespace, + String name, + UUID version, + // field required to merge input versions with input dataset facets + @JsonProperty("dataset_version_uuid") String datasetVersionUUID) { + public DatasetVersionId toDatasetVersionId() { + return DatasetVersionId.builder() + .name(DatasetName.of(name)) + .namespace(NamespaceName.of(namespace)) + .version(version) + .build(); + } + } } diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 6ae0b2419c..9478a7bdbd 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -309,6 +309,22 @@ public static class Dataset extends BaseJsonModel { @NotNull private String namespace; @NotNull private String name; @Valid private DatasetFacets facets; + @Valid private InputDatasetFacets inputFacets; + @Valid private OutputDatasetFacets outputFacets; + + /** + * Constructor with three args added manually to support dozens of existing usages created + * before adding inputFacets and outputFacets, as Lombok does not provide SomeArgsConstructor. 
+ * + * @param namespace + * @param name + * @param facets + */ + public Dataset(String namespace, String name, DatasetFacets facets) { + this.namespace = namespace; + this.name = name; + this.facets = facets; + } } @Builder @@ -561,4 +577,48 @@ public static class ColumnLineageInputField extends BaseJsonModel { @NotNull private String name; @NotNull private String field; } + + @Builder + @AllArgsConstructor + @NoArgsConstructor + @Setter + @Getter + @Valid + @ToString + public static class InputDatasetFacets { + + @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); + + @JsonAnySetter + public void setInputFacet(String key, Object value) { + additional.put(key, value); + } + + @JsonAnyGetter + public Map getAdditionalFacets() { + return additional; + } + } + + @Builder + @AllArgsConstructor + @NoArgsConstructor + @Setter + @Getter + @Valid + @ToString + public static class OutputDatasetFacets { + + @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); + + @JsonAnySetter + public void setOutputFacet(String key, Object value) { + additional.put(key, value); + } + + @JsonAnyGetter + public Map getAdditionalFacets() { + return additional; + } + } } diff --git a/api/src/main/java/marquez/service/models/Run.java b/api/src/main/java/marquez/service/models/Run.java index 99231772f9..29e4f61b67 100644 --- a/api/src/main/java/marquez/service/models/Run.java +++ b/api/src/main/java/marquez/service/models/Run.java @@ -25,10 +25,11 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; import marquez.api.models.JobVersion; -import marquez.common.models.DatasetVersionId; +import marquez.common.models.InputDatasetVersion; import marquez.common.models.JobName; import marquez.common.models.JobVersionId; import marquez.common.models.NamespaceName; +import marquez.common.models.OutputDatasetVersion; import marquez.common.models.RunId; import marquez.common.models.RunState; @@ -58,9 +59,10 @@ public final class Run { private final 
String jobName; private final UUID jobVersion; private final String location; - @Getter private final List inputVersions; - @Getter private final List outputVersions; + @Getter private final List inputDatasetVersions; + @Getter private final List outputDatasetVersions; @Getter private final ImmutableMap facets; + ; public Run( @NonNull final RunId id, @@ -77,8 +79,8 @@ public Run( String jobName, UUID jobVersion, String location, - List inputVersions, - List outputVersions, + List inputDatasetVersions, + List outputDatasetFacets, @Nullable final ImmutableMap facets) { this.id = id; this.createdAt = createdAt; @@ -94,8 +96,8 @@ public Run( this.jobName = jobName; this.jobVersion = jobVersion; this.location = location; - this.inputVersions = inputVersions; - this.outputVersions = outputVersions; + this.inputDatasetVersions = inputDatasetVersions; + this.outputDatasetVersions = outputDatasetFacets; this.facets = (facets == null) ? ImmutableMap.of() : facets; } @@ -161,12 +163,16 @@ public static class Builder { private Map args; private JobVersionId jobVersion; private String location; - private List inputVersions; - private List outputVersions; @JsonInclude(JsonInclude.Include.NON_NULL) private ImmutableMap facets; + @JsonInclude(JsonInclude.Include.NON_NULL) + private List inputDatasetVersions; + + @JsonInclude(JsonInclude.Include.NON_NULL) + private List outputDatasetVersions; + public Run build() { return new Run( id, @@ -183,8 +189,8 @@ public Run build() { jobVersion.getName().getValue(), jobVersion.getVersion(), location, - inputVersions, - outputVersions, + inputDatasetVersions, + outputDatasetVersions, facets); } } diff --git a/api/src/test/java/marquez/FlowIntegrationTest.java b/api/src/test/java/marquez/FlowIntegrationTest.java index b4e7ff631d..5ea0d47850 100644 --- a/api/src/test/java/marquez/FlowIntegrationTest.java +++ b/api/src/test/java/marquez/FlowIntegrationTest.java @@ -109,29 +109,41 @@ public void 
testOutputVersionShouldBeOnlyOneCreatedViaJobAndDatasetApi() throws client.markRunAs(createdRun.getId(), RunState.COMPLETED); Map body = getRunResponse(createdRun); - assertThat(((List>) body.get("outputVersions"))).size().isEqualTo(1); + assertThat(((List>) body.get("outputDatasetVersions"))).size().isEqualTo(1); assertInputDatasetVersionDiffersFromOutput(body); } private void assertInputDatasetVersionDiffersFromOutput(Map body) throws IOException { - List> inputDatasetVersionIds = - ((List>) body.get("inputVersions")); - assertThat(inputDatasetVersionIds.stream().map(Map::entrySet).collect(Collectors.toList())) + List>> inputDatasetVersionIds = + ((List>>) body.get("inputDatasetVersions")); + assertThat( + inputDatasetVersionIds.stream() + .map(e -> e.get("datasetVersionId")) + .map(Map::entrySet) + .collect(Collectors.toList())) .allMatch(e -> e.contains(entry("namespace", NAMESPACE_NAME))) .allMatch(e -> e.contains(entry("name", DATASET_NAME))); - List> outputDatasetVersionIds = - ((List>) body.get("outputVersions")); - assertThat(outputDatasetVersionIds.stream().map(Map::entrySet).collect(Collectors.toList())) + List>> outputDatasetVersionIds = + ((List>>) body.get("outputDatasetVersions")); + assertThat( + outputDatasetVersionIds.stream() + .map(e -> e.get("datasetVersionId")) + .map(Map::entrySet) + .collect(Collectors.toList())) .allMatch(e -> e.contains(entry("namespace", NAMESPACE_NAME))) .allMatch(e -> e.contains(entry("name", DATASET_NAME))); List inputVersions = - inputDatasetVersionIds.stream().map(it -> it.get("version")).collect(Collectors.toList()); + inputDatasetVersionIds.stream() + .map(it -> it.get("datasetVersionId").get("version")) + .collect(Collectors.toList()); List outputVersions = - outputDatasetVersionIds.stream().map(it -> it.get("version")).collect(Collectors.toList()); + outputDatasetVersionIds.stream() + .map(it -> it.get("datasetVersionId").get("version")) + .collect(Collectors.toList()); 
assertThat(Collections.disjoint(inputVersions, outputVersions)).isTrue(); } diff --git a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java index fd408a2477..6a4e9e0ed3 100644 --- a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java @@ -9,6 +9,7 @@ import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.assertThat; +import com.google.common.collect.ImmutableMap; import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -18,6 +19,7 @@ import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.Dataset; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -304,6 +306,70 @@ public void testInsertDatasetFacetsForUnknownTypeFacet() { assertThat(facet.facet().toString()).isEqualTo("{\"custom-output\": \"{whatever}\"}"); } + @Test + public void testInsertOutputDatasetFacetsFor() { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Collections.emptyList(), + Arrays.asList( + new Dataset( + "namespace", + "dataset_output", + null, + null, + LineageEvent.OutputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")) + .build())), + null); + + assertThat(getDatasetFacet(lineageRow, "outputFacet1").facet().toString()) + .isEqualTo("{\"outputFacet1\": \"{some-facet1}\"}"); + assertThat(getDatasetFacet(lineageRow, "outputFacet2").facet().toString()) + .isEqualTo("{\"outputFacet2\": \"{some-facet2}\"}"); + } + + @Test + public 
void testInsertInputDatasetFacetsFor() { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Arrays.asList( + new Dataset( + "namespace", + "dataset_output", + null, + LineageEvent.InputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")) + .build(), + null)), + Collections.emptyList(), + null); + + assertThat(getDatasetFacet(lineageRow, "inputFacet1").facet().toString()) + .isEqualTo("{\"inputFacet1\": \"{some-facet1}\"}"); + assertThat(getDatasetFacet(lineageRow, "inputFacet2").facet().toString()) + .isEqualTo("{\"inputFacet2\": \"{some-facet2}\"}"); + } + private UpdateLineageRow createLineageRowWithInputDataset( LineageEvent.DatasetFacets.DatasetFacetsBuilder inputDatasetFacetsbuilder) { LineageEvent.JobFacet jobFacet = diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index e42b1667d5..354ab495dc 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -841,7 +841,7 @@ public void testGetCurrentRunsWithFacetsGetsLatestRun() { // assert that run_args, input/output versions, and run facets are fetched from the dao. 
for (Run run : currentRuns) { assertThat(run.getArgs()).hasSize(2); - assertThat(run.getOutputVersions()).hasSize(1); + assertThat(run.getOutputDatasetVersions()).hasSize(1); assertThat(run.getFacets()).hasSize(1); } } diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index fd68d1ec9a..d0ddf7253b 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -14,6 +14,9 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import marquez.common.models.DatasetName; +import marquez.common.models.DatasetVersionId; +import marquez.common.models.NamespaceName; import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; @@ -23,12 +26,14 @@ import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; +import marquez.service.models.Run; import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.groups.Tuple; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @ExtendWith(MarquezJdbiExternalPostgresExtension.class) class OpenLineageDaoTest { @@ -48,6 +53,7 @@ class OpenLineageDaoTest { private static DatasetSymlinkDao symlinkDao; private static NamespaceDao namespaceDao; private static DatasetFieldDao datasetFieldDao; + private static RunDao runDao; private final DatasetFacets datasetFacets = LineageTestUtils.newDatasetFacet( new SchemaField("name", "STRING", "my name"), new SchemaField("age", "INT", "my age")); @@ -58,6 +64,7 @@ public static void setUpOnce(Jdbi jdbi) { symlinkDao = 
jdbi.onDemand(DatasetSymlinkDao.class); namespaceDao = jdbi.onDemand(NamespaceDao.class); datasetFieldDao = jdbi.onDemand(DatasetFieldDao.class); + runDao = jdbi.onDemand(RunDao.class); } /** When reading a dataset, the version is assumed to be the version last written */ @@ -508,6 +515,69 @@ void testGetOpenLineageEvents() { .contains(LineageTestUtils.NAMESPACE, WRITE_JOB_NAME); } + @Test + void testInputOutputDatasetFacets() { + JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + dao, + WRITE_JOB_NAME, + "COMPLETE", + jobFacet, + Arrays.asList( + new Dataset( + "namespace", + "dataset_input", + null, + LineageEvent.InputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")) + .build(), + null)), + Arrays.asList( + new Dataset( + "namespace", + "dataset_output", + null, + null, + LineageEvent.OutputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")) + .build()))); + + Run run = runDao.findRunByUuid(lineageRow.getRun().getUuid()).get(); + + assertThat(run.getInputDatasetVersions()).hasSize(1); + assertThat(run.getInputDatasetVersions().get(0).getDatasetVersionId()) + .isEqualTo( + new DatasetVersionId( + NamespaceName.of("namespace"), + DatasetName.of("dataset_input"), + lineageRow.getInputs().get().get(0).getDatasetVersionRow().getVersion())); + assertThat(run.getInputDatasetVersions().get(0).getFacets()) + .containsAllEntriesOf( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")); + + assertThat(run.getOutputDatasetVersions()).hasSize(1); + assertThat(run.getOutputDatasetVersions().get(0).getDatasetVersionId()) + .isEqualTo( + new DatasetVersionId( + NamespaceName.of("namespace"), + DatasetName.of("dataset_output"), + 
lineageRow.getOutputs().get().get(0).getDatasetVersionRow().getVersion())); + assertThat(run.getOutputDatasetVersions().get(0).getFacets()) + .containsAllEntriesOf( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")); + } + private Dataset getInputDataset() { return new Dataset( INPUT_NAMESPACE, diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index 3095aebf0f..102c2c63ce 100644 --- a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -25,7 +25,9 @@ import marquez.api.JdbiUtils; import marquez.common.models.DatasetId; import marquez.common.models.DatasetVersionId; +import marquez.common.models.InputDatasetVersion; import marquez.common.models.NamespaceName; +import marquez.common.models.OutputDatasetVersion; import marquez.common.models.RunId; import marquez.common.models.RunState; import marquez.db.models.ExtendedRunRow; @@ -87,16 +89,21 @@ public void getRun() { assertThat(run) .isPresent() .get() - .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class)) .hasSize(jobMeta.getInputs().size()) + .map(InputDatasetVersion::getDatasetVersionId) .map(DatasetVersionId::getName) .containsAll( jobMeta.getInputs().stream().map(DatasetId::getName).collect(Collectors.toSet())); assertThat(run) .get() - .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getOutputDatasetVersions, + InstanceOfAssertFactories.list(OutputDatasetVersion.class)) .hasSize(jobMeta.getOutputs().size()) + .map(OutputDatasetVersion::getDatasetVersionId) .map(DatasetVersionId::getName) .containsAll( jobMeta.getOutputs().stream().map(DatasetId::getName).collect(Collectors.toSet())); diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java 
b/api/src/test/java/marquez/service/LineageServiceTest.java index b1a3bc8213..7f6828dfa0 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -16,10 +16,11 @@ import java.util.Optional; import marquez.api.JdbiUtils; import marquez.common.models.DatasetName; -import marquez.common.models.DatasetVersionId; +import marquez.common.models.InputDatasetVersion; import marquez.common.models.JobId; import marquez.common.models.JobName; import marquez.common.models.NamespaceName; +import marquez.common.models.OutputDatasetVersion; import marquez.db.DatasetDao; import marquez.db.JobDao; import marquez.db.LineageDao; @@ -158,10 +159,13 @@ public void testLineage() { .get(); runAssert.extracting(r -> r.getId().getValue()).isEqualTo(secondRun.getRun().getUuid()); runAssert - .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class)) .hasSize(0); runAssert - .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getOutputDatasetVersions, + InstanceOfAssertFactories.list(OutputDatasetVersion.class)) .hasSize(1); // check the output edges for the commonDataset node @@ -267,10 +271,13 @@ public void testLineageWithDeletedDataset() { .get(); runAssert.extracting(r -> r.getId().getValue()).isEqualTo(secondRun.getRun().getUuid()); runAssert - .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class)) .hasSize(0); runAssert - .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getOutputDatasetVersions, + InstanceOfAssertFactories.list(OutputDatasetVersion.class)) .hasSize(1); // check the output edges for the
commonDataset node diff --git a/clients/java/src/main/java/marquez/client/models/InputDatasetVersion.java b/clients/java/src/main/java/marquez/client/models/InputDatasetVersion.java new file mode 100644 index 0000000000..6aa0526949 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/InputDatasetVersion.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.NonNull; +import lombok.Value; + +/** Class to contain inputFacets. */ +@Value +public class InputDatasetVersion { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public InputDatasetVersion( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/clients/java/src/main/java/marquez/client/models/OutputDatasetVersion.java b/clients/java/src/main/java/marquez/client/models/OutputDatasetVersion.java new file mode 100644 index 0000000000..3c3f61c75a --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/OutputDatasetVersion.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.NonNull; +import lombok.Value; + +/** Class to contain outputFacets. 
*/ +@Value +public class OutputDatasetVersion { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public OutputDatasetVersion( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/clients/java/src/main/java/marquez/client/models/Run.java b/clients/java/src/main/java/marquez/client/models/Run.java index e2e8d43ec8..24c57936ad 100644 --- a/clients/java/src/main/java/marquez/client/models/Run.java +++ b/clients/java/src/main/java/marquez/client/models/Run.java @@ -8,6 +8,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableMap; import java.time.Instant; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; @@ -27,6 +29,8 @@ public final class Run extends RunMeta { @Nullable private final Long durationMs; @Nullable private final Instant endedAt; @Getter private final Map facets; + @Getter private final List inputDatasetVersions; + @Getter private final List outputDatasetVersions; public Run( @NonNull final String id, @@ -39,7 +43,9 @@ public Run( @Nullable final Instant endedAt, @Nullable final Long durationMs, @Nullable final Map args, - @Nullable final Map facets) { + @Nullable final Map facets, + @Nullable final List inputDatasetVersions, + @Nullable final List outputDatasetVersions) { super(id, nominalStartTime, nominalEndTime, args); this.createdAt = createdAt; this.updatedAt = updatedAt; @@ -48,6 +54,10 @@ public Run( this.durationMs = durationMs; this.endedAt = endedAt; this.facets = (facets == null) ? ImmutableMap.of() : ImmutableMap.copyOf(facets); + this.inputDatasetVersions = + (inputDatasetVersions == null) ? 
Collections.emptyList() : inputDatasetVersions; + this.outputDatasetVersions = + (outputDatasetVersions == null) ? Collections.emptyList() : outputDatasetVersions; } public Optional getStartedAt() { diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index ff95f3bdbd..a24c98315b 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -14,11 +14,13 @@ import static marquez.client.models.ModelGenerator.newDatasetPhysicalName; import static marquez.client.models.ModelGenerator.newDescription; import static marquez.client.models.ModelGenerator.newFields; +import static marquez.client.models.ModelGenerator.newInputDatasetVersion; import static marquez.client.models.ModelGenerator.newInputs; import static marquez.client.models.ModelGenerator.newJobIdWith; import static marquez.client.models.ModelGenerator.newJobType; import static marquez.client.models.ModelGenerator.newLocation; import static marquez.client.models.ModelGenerator.newNamespaceName; +import static marquez.client.models.ModelGenerator.newOutputDatasetVersion; import static marquez.client.models.ModelGenerator.newOutputs; import static marquez.client.models.ModelGenerator.newOwnerName; import static marquez.client.models.ModelGenerator.newRunArgs; @@ -78,6 +80,7 @@ import marquez.client.models.DbTableVersion; import marquez.client.models.Edge; import marquez.client.models.Field; +import marquez.client.models.InputDatasetVersion; import marquez.client.models.Job; import marquez.client.models.JobId; import marquez.client.models.JobMeta; @@ -89,6 +92,7 @@ import marquez.client.models.Node; import marquez.client.models.NodeId; import marquez.client.models.NodeType; +import marquez.client.models.OutputDatasetVersion; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; 
@@ -261,6 +265,13 @@ public class MarquezClientTest { private static final Instant ENDED_AT = START_AT.plusMillis(1000L); private static final long DURATION = START_AT.until(ENDED_AT, MILLIS); private static final Map RUN_ARGS = newRunArgs(); + + private static final List INPUT_RUN_DATASET_FACETS = + Collections.singletonList(newInputDatasetVersion()); + + private static final List OUTPUT_RUN_DATASET_FACETS = + Collections.singletonList(newOutputDatasetVersion()); + private static final Run NEW = new Run( newRunId(), @@ -273,7 +284,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final Run RUNNING = new Run( newRunId(), @@ -286,7 +299,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final Run COMPLETED = new Run( newRunId(), @@ -299,7 +314,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final Run ABORTED = new Run( newRunId(), @@ -312,7 +329,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final Run FAILED = new Run( newRunId(), @@ -325,7 +344,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final String RUN_ID = newRunId(); private static final Job JOB_WITH_LATEST_RUN = @@ -353,7 +374,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null), + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS), null, null); diff --git a/clients/java/src/test/java/marquez/client/models/JsonGenerator.java b/clients/java/src/test/java/marquez/client/models/JsonGenerator.java index ef1b7e1c27..8a248c7cd5 100644 --- 
a/clients/java/src/test/java/marquez/client/models/JsonGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/JsonGenerator.java @@ -310,12 +310,17 @@ private static ObjectNode toObj(final Run run) { .put("id", run.getId()) .put("createdAt", ISO_INSTANT.format(run.getCreatedAt())) .put("updatedAt", ISO_INSTANT.format(run.getUpdatedAt())); + final ArrayNode inputDatasetVersions = MAPPER.valueToTree(run.getInputDatasetVersions()); + final ArrayNode outputDatasetVersions = MAPPER.valueToTree(run.getOutputDatasetVersions()); + obj.put("nominalStartTime", run.getNominalStartTime().map(ISO_INSTANT::format).orElse(null)); obj.put("nominalEndTime", run.getNominalEndTime().map(ISO_INSTANT::format).orElse(null)); obj.put("state", run.getState().name()); obj.put("startedAt", run.getStartedAt().map(ISO_INSTANT::format).orElse(null)); obj.put("endedAt", run.getEndedAt().map(ISO_INSTANT::format).orElse(null)); obj.put("durationMs", run.getDurationMs().orElse(null)); + obj.putArray("inputDatasetVersions").addAll(inputDatasetVersions); + obj.putArray("outputDatasetVersions").addAll(outputDatasetVersions); final ObjectNode runArgs = MAPPER.createObjectNode(); run.getArgs().forEach(runArgs::put); diff --git a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java index 5a18683d3d..3bfa758374 100644 --- a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java @@ -238,7 +238,19 @@ public static List newRuns(final int limit) { public static Run newRun() { final Instant now = newTimestamp(); return new Run( - newRunId(), now, now, now, now, RunState.NEW, null, null, null, newRunArgs(), null); + newRunId(), + now, + now, + now, + now, + RunState.NEW, + null, + null, + null, + newRunArgs(), + null, + null, + null); } public static String newOwnerName() { @@ -394,4 +406,16 @@ public static Map.Entry 
newFacetProducer() { public static Map.Entry newFacetSchemaURL() { return new AbstractMap.SimpleImmutableEntry<>("_schemaURL", "test_schemaURL" + newId()); } + + public static InputDatasetVersion newInputDatasetVersion() { + return new InputDatasetVersion( + new DatasetVersionId(newNamespaceName(), newDatasetName(), UUID.randomUUID()), + ImmutableMap.of("datasetFacet", "{some-facet1}")); + } + + public static OutputDatasetVersion newOutputDatasetVersion() { + return new OutputDatasetVersion( + new DatasetVersionId(newNamespaceName(), newDatasetName(), UUID.randomUUID()), + ImmutableMap.of("datasetFacet", "{some-facet1}")); + } } diff --git a/dev/README.md b/dev/README.md index c6a4067957..7e9cb49207 100644 --- a/dev/README.md +++ b/dev/README.md @@ -4,10 +4,11 @@ The `get_changes.sh` script uses a fork of saadmk11/changelog-ci to get all merged changes between two specified releases. To get all changes since the latest release, set `END_RELEASE_VERSION` to the planned next release. -The changes will appear in this directory in a new file, CHANGES.md. +The changes will appear at the top of CHANGELOG.md. #### Requirements +Python 3.10 or newer is required. See the requirements.txt file for required dependencies. The script also requires that the following environment variables be set: @@ -18,6 +19,8 @@ The script also requires that the following environment variables be set: For example: `export END_RELEASE_VERSION=0.21.0`. +Use the planned next release for the end release version. + For instructions on creating a GitHub personal access token to use the GitHub API, see: https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token. 
diff --git a/dev/get_changes.sh b/dev/get_changes.sh index 1c79edcd7b..c75f7e7013 100755 --- a/dev/get_changes.sh +++ b/dev/get_changes.sh @@ -3,7 +3,7 @@ # Copyright 2018-2023 contributors to the Marquez project # SPDX-License-Identifier: Apache-2.0 -export INPUT_CHANGELOG_FILENAME=CHANGES.md +export INPUT_CHANGELOG_FILENAME=../CHANGELOG.md export GITHUB_REPOSITORY=MarquezProject/marquez git clone --branch add-testing-script --single-branch git@github.com:merobi-hub/changelog-ci.git diff --git a/spec/openapi.yml b/spec/openapi.yml index ffb0255774..ab8d0b225a 100644 --- a/spec/openapi.yml +++ b/spec/openapi.yml @@ -475,7 +475,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/IncompleteRun' + $ref: '#/components/schemas/Run' /jobs/runs/{id}/facets: parameters: @@ -1734,20 +1734,20 @@ components: Run: type: object - allOf: + anyOf: - $ref: '#/components/schemas/IncompleteRun' - type: object properties: jobVersion: $ref: '#/components/schemas/JobVersionId' - inputVersions: + inputDatasetVersions: type: array items: - $ref: '#/components/schemas/DatasetVersionId' - outputVersions: + $ref: '#/components/schemas/InputDatasetVersion' + outputDatasetVersions: type: array items: - $ref: '#/components/schemas/DatasetVersionId' + $ref: '#/components/schemas/OutputDatasetVersion' context: description: A key/value pair that must be of type `string`. A context can be used for getting additional details about the job. type: object @@ -1955,4 +1955,32 @@ components: description: The ID associated with the run modifying the table. type: string facets: - $ref: '#/components/schemas/CustomFacet' \ No newline at end of file + $ref: '#/components/schemas/CustomFacet' + + InputDatasetVersion: + type: object + properties: + datasetVersionId: + $ref: '#/components/schemas/DatasetVersionId' + facets: + type: object + additionalProperties: + type: string + description: Dataset facets in run context, like `inputFacets`. 
+ required: + - datasetVersionId + - facets + + OutputDatasetVersion: + type: object + properties: + datasetVersionId: + $ref: '#/components/schemas/DatasetVersionId' + facets: + type: object + additionalProperties: + type: string + description: Dataset facets in run context, like `outputFacets`. + required: + - datasetVersionId + - facets \ No newline at end of file diff --git a/web/src/components/lineage/components/edge/Edge.tsx b/web/src/components/lineage/components/edge/Edge.tsx index cb5e3b3114..983e0e2992 100644 --- a/web/src/components/lineage/components/edge/Edge.tsx +++ b/web/src/components/lineage/components/edge/Edge.tsx @@ -52,7 +52,7 @@ class Edge extends React.Component { data={edge.points} x={(d, index) => (index === 0 ? d.x + 20 : d.x - 25)} y={d => d.y} - stroke={edge.isSelected ? theme.palette.common.white : theme.palette.secondary.main} + stroke={edge.isSelected ? theme.palette.primary.main : theme.palette.secondary.main} strokeWidth={1} opacity={1} shapeRendering='geometricPrecision' @@ -66,7 +66,7 @@ class Edge extends React.Component { y={edge.y - ICON_SIZE / 2} width={ICON_SIZE} height={ICON_SIZE} - color={edge.isSelected ? theme.palette.common.white : theme.palette.secondary.main} + color={edge.isSelected ? theme.palette.primary.main : theme.palette.secondary.main} /> ))} diff --git a/web/src/components/lineage/components/node/Node.tsx b/web/src/components/lineage/components/node/Node.tsx index f76c3b494e..3c4c85a401 100644 --- a/web/src/components/lineage/components/node/Node.tsx +++ b/web/src/components/lineage/components/node/Node.tsx @@ -62,7 +62,7 @@ class Node extends React.Component { style={{ cursor: 'pointer' }} r={RADIUS} fill={isSelected ? theme.palette.secondary.main : theme.palette.common.white} - stroke={isSelected ? theme.palette.common.white : theme.palette.secondary.main} + stroke={isSelected ? 
 theme.palette.primary.main : theme.palette.secondary.main} strokeWidth={BORDER / 2} cx={node.x} cy={node.y} @@ -76,7 +76,7 @@ class Node extends React.Component { height={ICON_SIZE} x={node.x - ICON_SIZE / 2} y={node.y - ICON_SIZE / 2} - color={isSelected ? theme.palette.common.white : theme.palette.secondary.main} + color={isSelected ? theme.palette.primary.main : theme.palette.secondary.main} /> ) : ( @@ -86,7 +86,7 @@ class Node extends React.Component { x={node.x - RADIUS} y={node.y - RADIUS} fill={isSelected ? theme.palette.secondary.main : theme.palette.common.white} - stroke={isSelected ? theme.palette.common.white : theme.palette.secondary.main} + stroke={isSelected ? theme.palette.primary.main : theme.palette.secondary.main} strokeWidth={BORDER / 2} width={RADIUS * 2} height={RADIUS * 2} @@ -109,7 +109,7 @@ class Node extends React.Component { height={ICON_SIZE} x={node.x - ICON_SIZE / 2} y={node.y - ICON_SIZE / 2} - color={isSelected ? theme.palette.common.white : theme.palette.secondary.main} + color={isSelected ? theme.palette.primary.main : theme.palette.secondary.main} /> )}