Skip to content

Commit

Permalink
java client: point-in-time for column-level lineage
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski committed Dec 12, 2022
1 parent 68fcb96 commit 4036507
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 185 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

* Column-lineage endpoints supports point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Enable requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.*
* Column lineage point in time java client [`#2269`](https://github.com/MarquezProject/marquez/pull/2269) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Java client methods to retrieve point in time `column-lineage`. Please note that existing methods `getColumnLineageByDataset`, `getColumnLineageByDataset` and `getColumnLineageByDatasetField` were replaced by a single `getColumnLineage` taking `NodeId` as a parameter.*

### Fixed

Expand Down
17 changes: 12 additions & 5 deletions api/src/test/java/marquez/ColumnLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
import java.util.Optional;
import marquez.api.JdbiUtils;
import marquez.client.MarquezClient;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.JobId;
import marquez.client.models.Node;
import marquez.client.models.NodeId;
import marquez.db.LineageTestUtils;
import marquez.db.OpenLineageDao;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
Expand Down Expand Up @@ -64,7 +68,8 @@ public void tearDown(Jdbi jdbi) {

@Test
public void testColumnLineageEndpointByDataset() {
MarquezClient.Lineage lineage = client.getColumnLineageByDataset("namespace", "dataset_b");
MarquezClient.Lineage lineage =
client.getColumnLineage(NodeId.of(new DatasetId("namespace", "dataset_b")));

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
Expand All @@ -75,7 +80,7 @@ public void testColumnLineageEndpointByDataset() {
@Test
public void testColumnLineageEndpointByDatasetField() {
MarquezClient.Lineage lineage =
client.getColumnLineageByDataset("namespace", "dataset_b", "col_c");
client.getColumnLineage(NodeId.of(new DatasetFieldId("namespace", "dataset_b", "col_c")));

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
Expand All @@ -86,7 +91,8 @@ public void testColumnLineageEndpointByDatasetField() {
@Test
public void testColumnLineageEndpointWithDepthLimit() {
MarquezClient.Lineage lineage =
client.getColumnLineageByDatasetField("namespace", "dataset_c", "col_d", 1, false);
client.getColumnLineage(
NodeId.of(new DatasetFieldId("namespace", "dataset_c", "col_d")), 1, false);

assertThat(lineage.getGraph()).hasSize(2);
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
Expand All @@ -96,15 +102,16 @@ public void testColumnLineageEndpointWithDepthLimit() {
@Test
public void testColumnLineageEndpointWithDownstream() {
MarquezClient.Lineage lineage =
client.getColumnLineageByDatasetField("namespace", "dataset_b", "col_c", 10, true);
client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 10, true);

assertThat(lineage.getGraph()).hasSize(4);
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
}

@Test
public void testColumnLineageEndpointByJob() {
MarquezClient.Lineage lineage = client.getColumnLineageByJob("namespace", "job1");
MarquezClient.Lineage lineage =
client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 1, false);

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
Expand Down
22 changes: 14 additions & 8 deletions api/src/test/java/marquez/service/models/NodeIdTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.UUID;
import marquez.common.models.DatasetFieldId;
import marquez.common.models.DatasetFieldVersionId;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.FieldName;
Expand Down Expand Up @@ -150,25 +152,29 @@ public void testDatasetField(String namespace, String dataset, String field) {
"gs://bucket$/path/to/data$col_A#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
},
delimiter = '$')
public void testDatasetFieldVersion(String namespace, String dataset, String field) {
public void testDatasetFieldVersion(String namespace, String dataset, String fieldWithVersion) {
String version = fieldWithVersion.split(VERSION_DELIM)[1];
String field = fieldWithVersion.split(VERSION_DELIM)[0];

NamespaceName namespaceName = NamespaceName.of(namespace);
FieldName fieldName = FieldName.of(field);
FieldName fieldName = FieldName.of(field.split(VERSION_DELIM)[0]);
DatasetName datasetName = DatasetName.of(dataset);
DatasetId dsId = new DatasetId(namespaceName, datasetName);
DatasetFieldId dsfId = new DatasetFieldId(dsId, fieldName);
DatasetFieldVersionId dsfId =
new DatasetFieldVersionId(dsId, fieldName, UUID.fromString(version));
NodeId nodeId = NodeId.of(dsfId);
assertFalse(nodeId.isRunType());
assertFalse(nodeId.isJobType());
assertFalse(nodeId.isDatasetType());
assertTrue(nodeId.hasVersion());
assertTrue(nodeId.isDatasetFieldVersionType());

assertEquals(dsfId, nodeId.asDatasetFieldId());
assertEquals(dsfId, nodeId.asDatasetFieldVersionId());
assertEquals(nodeId, NodeId.of(nodeId.getValue()));
assertEquals(namespace, nodeId.asDatasetFieldId().getDatasetId().getNamespace().getValue());
assertEquals(dataset, nodeId.asDatasetFieldId().getDatasetId().getName().getValue());
assertEquals(field, nodeId.asDatasetFieldId().getFieldName().getValue());
assertEquals(
field.split(VERSION_DELIM)[1], nodeId.asDatasetFieldVersionId().getVersion().toString());
namespace, nodeId.asDatasetFieldVersionId().getDatasetId().getNamespace().getValue());
assertEquals(dataset, nodeId.asDatasetFieldVersionId().getDatasetId().getName().getValue());
assertEquals(field, nodeId.asDatasetFieldVersionId().getFieldName().getValue());
assertEquals(version, nodeId.asDatasetFieldVersionId().getVersion().toString());
}
}
47 changes: 5 additions & 42 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import marquez.client.models.Namespace;
import marquez.client.models.NamespaceMeta;
import marquez.client.models.Node;
import marquez.client.models.NodeId;
import marquez.client.models.Run;
import marquez.client.models.RunMeta;
import marquez.client.models.RunState;
Expand Down Expand Up @@ -115,50 +116,12 @@ public enum SortDirection {
@Getter public final String value;
}

public Lineage getColumnLineageByDataset(
@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineageByDataset(
namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByDataset(
@NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) {
return getColumnLineageByDatasetField(
namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByDataset(
@NonNull String namespaceName,
@NonNull String datasetName,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(
url.toColumnLineageUrlByDataset(namespaceName, datasetName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
public Lineage getColumnLineage(NodeId nodeId) {
return getColumnLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByDatasetField(
@NonNull String namespaceName,
@NonNull String datasetName,
@NonNull String field,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(
url.toColumnLineageUrlByDatasetField(
namespaceName, datasetName, field, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineageByJob(@NonNull String namespaceName, @NonNull String jobName) {
return getColumnLineageByJob(namespaceName, jobName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByJob(
@NonNull String namespaceName, @NonNull String jobName, int depth, boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrlByJob(namespaceName, jobName, depth, withDownstream));
public Lineage getColumnLineage(NodeId nodeId, int depth, boolean withDownstream) {
final String bodyAsJson = http.get(url.toColumnLineageUrl(nodeId, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

Expand Down
25 changes: 2 additions & 23 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@
import java.util.Map;
import javax.annotation.Nullable;
import lombok.NonNull;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.JobId;
import marquez.client.models.NodeId;
import marquez.client.models.RunState;
import marquez.client.models.SearchFilter;
Expand Down Expand Up @@ -211,27 +208,9 @@ URL toSearchUrl(
return from(searchPath(), queryParams.build());
}

URL toColumnLineageUrlByDatasetField(
String namespace, String dataset, String field, int depth, boolean withDownstream) {
URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrlByDataset(
String namespace, String dataset, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrlByJob(String namespace, String job, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new JobId(namespace, job)).getValue());
queryParams.put("nodeId", nodeId.getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** ID for {@code DatasetField} with a version of {@code Dataset}. */
@EqualsAndHashCode
@AllArgsConstructor
@ToString
public class DatasetFieldVersionId {

@Getter private final String namespace;
@Getter private final String name;
@Getter private final String field;
@Getter private final UUID version;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

/** Version ID for {@code Dataset}. */
@Value
@Builder
@AllArgsConstructor
public class DatasetVersionId {
@NonNull String namespace;
@NonNull String name;
@NonNull UUID version;
}
73 changes: 73 additions & 0 deletions clients/java/src/main/java/marquez/client/models/NodeId.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.util.StdConverter;
import com.google.common.base.Joiner;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -74,6 +76,31 @@ public static NodeId of(@NonNull JobId jobId) {
return of(ID_JOINER.join(ID_PREFX_JOB, jobId.getNamespace(), jobId.getName()));
}

public static NodeId of(@NonNull DatasetFieldVersionId datasetFieldVersionId) {
return of(
appendVersionTo(
ID_JOINER.join(
ID_PREFX_DATASET_FIELD,
datasetFieldVersionId.getNamespace(),
datasetFieldVersionId.getName(),
datasetFieldVersionId.getField()),
datasetFieldVersionId.getVersion()));
}

public static NodeId of(@NonNull JobVersionId jobVersionId) {
return NodeId.of(
new JobId(
jobVersionId.getNamespace(),
appendVersionTo(jobVersionId.getName(), jobVersionId.getVersion())));
}

public static NodeId of(@NonNull DatasetVersionId versionId) {
return NodeId.of(
new DatasetId(
versionId.getNamespace(),
appendVersionTo(versionId.getName(), versionId.getVersion())));
}

@JsonIgnore
public boolean isDatasetFieldType() {
return value.startsWith(ID_PREFX_DATASET_FIELD);
Expand All @@ -89,6 +116,26 @@ public boolean isJobType() {
return value.startsWith(ID_PREFX_JOB);
}

@JsonIgnore
public boolean isDatasetFieldVersionType() {
return value.startsWith(ID_PREFX_DATASET_FIELD) && hasVersion();
}

@JsonIgnore
public boolean isDatasetVersionType() {
return value.startsWith(ID_PREFX_DATASET) && hasVersion();
}

@JsonIgnore
public boolean isJobVersionType() {
return value.startsWith(ID_PREFX_JOB) && hasVersion();
}

@JsonIgnore
public boolean hasVersion() {
return value.contains(VERSION_DELIM);
}

@JsonIgnore
private String[] parts(int expectedParts, String expectedType) {

Expand Down Expand Up @@ -139,6 +186,28 @@ public JobId asJobId() {
return new JobId(parts[1], parts[2]);
}

@JsonIgnore
public DatasetFieldVersionId asDatasetFieldVersionId() {
String[] parts = parts(4, ID_PREFX_DATASET_FIELD);
String[] nameAndVersion = parts[3].split(VERSION_DELIM);
return new DatasetFieldVersionId(
parts[1], parts[2], nameAndVersion[0], UUID.fromString(nameAndVersion[1]));
}

@JsonIgnore
public JobVersionId asJobVersionId() {
String[] parts = parts(3, ID_PREFX_JOB);
String[] nameAndVersion = parts[2].split(VERSION_DELIM);
return new JobVersionId(parts[1], nameAndVersion[0], UUID.fromString(nameAndVersion[1]));
}

@JsonIgnore
public DatasetVersionId asDatasetVersionId() {
String[] parts = parts(3, ID_PREFX_DATASET);
String[] nameAndVersion = parts[2].split(VERSION_DELIM);
return new DatasetVersionId(parts[1], nameAndVersion[0], UUID.fromString(nameAndVersion[1]));
}

public static class FromValue extends StdConverter<String, NodeId> {
@Override
public NodeId convert(@NonNull String value) {
Expand All @@ -157,4 +226,8 @@ public String convert(@NonNull NodeId id) {
public int compareTo(NodeId o) {
return value.compareTo(o.getValue());
}

private static String appendVersionTo(@NonNull final String value, @Nullable final UUID version) {
return (version == null) ? value : (value + VERSION_DELIM + version);
}
}
Loading

0 comments on commit 4036507

Please sign in to comment.