Skip to content

Commit

Permalink
Merge branch 'main' into fix/runstatecolor
Browse files Browse the repository at this point in the history
  • Loading branch information
perttus authored Apr 17, 2023
2 parents 27ad949 + cf0ba3e commit 135553e
Show file tree
Hide file tree
Showing 31 changed files with 834 additions and 140 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.32.0...HEAD)

### Fixed

* UI: better handling of null job latestRun for Jobs page [#2467](https://github.com/MarquezProject/marquez/pull/2467) [@perttus](https://github.com/perttus)

### Added

* Support `inputFacets` and `outputFacets` from the OpenLineage specification [`#2417`](https://github.com/MarquezProject/marquez/pull/2417) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Adds the ability to store `inputFacets` / `outputFacets` which are sent within datasets.*
*Expose them through Marquez API as a member of `Run` resource.*

## [0.32.0](https://github.com/MarquezProject/marquez/compare/0.31.0...0.32.0) - 2023-03-20

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion COMMITTERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ They take responsibility for guiding new pull requests into the main branch.
| Michael Robinson | [@merobi-hub](https://github.com/merobi-hub) |
| Ross Turk | [@rossturk](https://github.com/rossturk) |
| Minkyu Park | [@fm100](https://github.com/fm100) |
| Pawel Leszczynski | [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) |
| Paweł Leszczyński | [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) |
## Emeritus

Expand Down
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/common/models/InputDatasetVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

/**
 * Immutable pairing of a {@link DatasetVersionId} with the `inputFacets` map for that input
 * dataset. In the OpenLineage spec these facets belong to the dataset, but the Marquez API exposes
 * them as part of {@link marquez.service.models.Run}.
 */
@EqualsAndHashCode
@ToString
@Getter
public class InputDatasetVersion {

  // Identifies the exact dataset version read as input.
  private final DatasetVersionId datasetVersionId;
  // Facet name -> facet value, deserialized from the OpenLineage event's `inputFacets`.
  private final ImmutableMap<String, Object> facets;

  /**
   * Jackson-bound constructor.
   *
   * @param datasetVersionId version of the input dataset; must be non-null (Lombok {@code
   *     @NonNull} throws {@link NullPointerException} otherwise)
   * @param facets `inputFacets` keyed by facet name; must be non-null
   */
  public InputDatasetVersion(
      @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId,
      @JsonProperty("facets") @NonNull ImmutableMap<String, Object> facets) {
    this.datasetVersionId = datasetVersionId;
    this.facets = facets;
  }
}
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/common/models/OutputDatasetVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

/**
 * Immutable pairing of a {@link DatasetVersionId} with the `outputFacets` map for that output
 * dataset. In the OpenLineage spec these facets belong to the dataset, but the Marquez API exposes
 * them as part of {@link marquez.service.models.Run}.
 */
@EqualsAndHashCode
@ToString
@Getter
public class OutputDatasetVersion {

  // Identifies the exact dataset version produced as output.
  private final DatasetVersionId datasetVersionId;
  // Facet name -> facet value, deserialized from the OpenLineage event's `outputFacets`.
  private final ImmutableMap<String, Object> facets;

  /**
   * Jackson-bound constructor.
   *
   * @param datasetVersionId version of the output dataset; must be non-null (Lombok {@code
   *     @NonNull} throws {@link NullPointerException} otherwise)
   * @param facets `outputFacets` keyed by facet name; must be non-null
   */
  public OutputDatasetVersion(
      @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId,
      @JsonProperty("facets") @NonNull ImmutableMap<String, Object> facets) {
    this.datasetVersionId = datasetVersionId;
    this.facets = facets;
  }
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private Columns() {}
public static final String NAMESPACE_NAME = "namespace_name";
public static final String DATASET_NAME = "dataset_name";
public static final String FACETS = "facets";
public static final String DATASET_FACETS = "dataset_facets";
public static final String TAGS = "tags";
public static final String IS_HIDDEN = "is_hidden";

Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ LEFT JOIN (
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
Expand Down Expand Up @@ -134,7 +134,7 @@ LEFT JOIN (
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
Expand Down
52 changes: 52 additions & 0 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,58 @@ default void insertDatasetFacetsFor(
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

/**
 * Stores every facet contained in {@code inputFacets} as an individual {@code INPUT}-typed facet
 * row bound to the given dataset version and run.
 *
 * @param datasetUuid dataset the facets belong to
 * @param datasetVersionUuid concrete dataset version the run consumed
 * @param runUuid run that reported the facets
 * @param lineageEventTime event time from the originating OpenLineage event
 * @param lineageEventType event type from the originating OpenLineage event
 * @param inputFacets facets to persist, one row per top-level field
 */
default void insertInputDatasetFacetsFor(
    @NonNull UUID datasetUuid,
    @NonNull UUID datasetVersionUuid,
    @NonNull UUID runUuid,
    @NonNull Instant lineageEventTime,
    @NonNull String lineageEventType,
    @NonNull LineageEvent.InputDatasetFacets inputFacets) {
  final Instant insertedAt = Instant.now();

  final JsonNode facetsJson = Utils.getMapper().valueToTree(inputFacets);
  // One row per top-level facet field, preserving the serialized field order.
  facetsJson
      .fieldNames()
      .forEachRemaining(
          facetName ->
              insertDatasetFacet(
                  insertedAt,
                  datasetUuid,
                  datasetVersionUuid,
                  runUuid,
                  lineageEventTime,
                  lineageEventType,
                  Type.INPUT,
                  facetName,
                  FacetUtils.toPgObject(facetName, facetsJson.get(facetName))));
}

/**
 * Stores every facet contained in {@code outputFacets} as an individual {@code OUTPUT}-typed facet
 * row bound to the given dataset version and run.
 *
 * @param datasetUuid dataset the facets belong to
 * @param datasetVersionUuid concrete dataset version the run produced
 * @param runUuid run that reported the facets
 * @param lineageEventTime event time from the originating OpenLineage event
 * @param lineageEventType event type from the originating OpenLineage event
 * @param outputFacets facets to persist, one row per top-level field
 */
default void insertOutputDatasetFacetsFor(
    @NonNull UUID datasetUuid,
    @NonNull UUID datasetVersionUuid,
    @NonNull UUID runUuid,
    @NonNull Instant lineageEventTime,
    @NonNull String lineageEventType,
    @NonNull LineageEvent.OutputDatasetFacets outputFacets) {
  final Instant insertedAt = Instant.now();

  final JsonNode facetsJson = Utils.getMapper().valueToTree(outputFacets);
  // One row per top-level facet field, preserving the serialized field order.
  facetsJson
      .fieldNames()
      .forEachRemaining(
          facetName ->
              insertDatasetFacet(
                  insertedAt,
                  datasetUuid,
                  datasetVersionUuid,
                  runUuid,
                  lineageEventTime,
                  lineageEventType,
                  Type.OUTPUT,
                  facetName,
                  FacetUtils.toPgObject(facetName, facetsJson.get(facetName))));
}

record DatasetFacetRow(
Instant createdAt,
UUID datasetUuid,
Expand Down
49 changes: 32 additions & 17 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,19 @@ default void updateDatasetVersionMetric(

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.version = :version
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM dataset_versions dv
FROM selected_dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -169,21 +178,28 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.version = :version
""")
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
Optional<DatasetVersion> findBy(UUID version);

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.uuid = :uuid
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM dataset_versions dv
FROM selected_dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -192,14 +208,12 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.uuid = :uuid
""")
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
Optional<DatasetVersion> findByUuid(UUID uuid);

default Optional<DatasetVersion> findByWithRun(UUID version) {
Expand Down Expand Up @@ -246,6 +260,7 @@ LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
WHERE (type ILIKE 'dataset' OR type ILIKE 'unknown')
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.namespace_name = :namespaceName
Expand Down
70 changes: 39 additions & 31 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,45 @@ public interface LineageDao {
*/
@SqlQuery(
"""
WITH RECURSIVE
job_io AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
ARRAY_AGG(DISTINCT j.uuid) AS ids,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM jobs j
LEFT JOIN jobs_view s On s.uuid=j.symlink_target_uuid
LEFT JOIN job_versions v on v.uuid=COALESCE(s.current_version_uuid, j.current_version_uuid)
LEFT JOIN job_versions_io_mapping io ON io.job_version_uuid=v.uuid
GROUP BY COALESCE(j.symlink_target_uuid, j.uuid)
),
lineage(job_uuid, inputs, outputs) AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs_view j
INNER JOIN job_io io ON j.uuid=ANY(io.ids)
WHERE io.ids && ARRAY[<jobIds>]::uuid[]
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid
LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid;
WITH RECURSIVE
-- Find the current version of a job or its symlink target if the target has no
-- current_version_uuid. This ensures that we don't lose lineage for a job after it is
-- symlinked to another job but before that target job has run successfully.
job_current_version AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
FROM jobs j
LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
WHERE s.current_version_uuid IS NULL
),
job_io AS (
SELECT j.job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
GROUP BY j.job_uuid
),
lineage(job_uuid, inputs, outputs) AS (
SELECT v.job_uuid AS job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs j
INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid
LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid;
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

Expand Down
24 changes: 24 additions & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
now,
event.getEventType(),
facets));

// InputFacets ...
Optional.ofNullable(dataset.getInputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertInputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
runUuid,
now,
event.getEventType(),
facets));
}
}
bag.setInputs(Optional.ofNullable(datasetInputs));
Expand Down Expand Up @@ -314,6 +326,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
now,
event.getEventType(),
facets));

// OutputFacets ...
Optional.ofNullable(dataset.getOutputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertOutputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
runUuid,
now,
event.getEventType(),
facets));
}
}

Expand Down
Loading

0 comments on commit 135553e

Please sign in to comment.