Column lineage point in time java client #2269

Merged (2 commits) on Dec 13, 2022
Changes from 1 commit
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@

* Column-lineage endpoint supports point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Enable requesting the `column-lineage` endpoint by a dataset version, job version, or dataset field of a specific dataset version.*
* Column lineage point in time java client [`#2269`](https://github.com/MarquezProject/marquez/pull/2269) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Java client methods to retrieve point-in-time `column-lineage`. Please note that the existing methods `getColumnLineageByDataset`, `getColumnLineageByDatasetField`, and `getColumnLineageByJob` were replaced by a single `getColumnLineage` method taking a `NodeId` as a parameter.*

### Fixed

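The changelog entry above summarizes the client-facing change. The sketch below shows how existing calls migrate to the single `getColumnLineage(NodeId)` entry point and how a point-in-time request looks; it is illustrative only, and the base URL, namespace, dataset, job, and version values are placeholders rather than anything defined in this PR.

import java.util.UUID;
import marquez.client.MarquezClient;
import marquez.client.models.DatasetId;
import marquez.client.models.DatasetVersionId;
import marquez.client.models.JobId;
import marquez.client.models.NodeId;

public class ColumnLineageClientSketch {
  public static void main(String[] args) {
    MarquezClient client =
        MarquezClient.builder().baseUrl("http://localhost:5000").build();

    // Previously: client.getColumnLineageByDataset("namespace", "dataset_b");
    MarquezClient.Lineage byDataset =
        client.getColumnLineage(NodeId.of(new DatasetId("namespace", "dataset_b")));

    // Previously: client.getColumnLineageByJob("namespace", "job1", 10, true);
    MarquezClient.Lineage byJob =
        client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 10, true);

    // New in this PR: point-in-time lineage for a specific dataset version
    // (served by the endpoint added in #2265).
    MarquezClient.Lineage atVersion =
        client.getColumnLineage(
            NodeId.of(
                DatasetVersionId.builder()
                    .namespace("namespace")
                    .name("dataset_b")
                    .version(UUID.fromString("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"))
                    .build()));

    System.out.println(byDataset.getGraph().size());
    System.out.println(byJob.getGraph().size());
    System.out.println(atVersion.getGraph().size());
  }
}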
17 changes: 12 additions & 5 deletions api/src/test/java/marquez/ColumnLineageIntegrationTest.java
@@ -14,7 +14,11 @@
import java.util.Optional;
import marquez.api.JdbiUtils;
import marquez.client.MarquezClient;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.JobId;
import marquez.client.models.Node;
import marquez.client.models.NodeId;
import marquez.db.LineageTestUtils;
import marquez.db.OpenLineageDao;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
@@ -64,7 +68,8 @@ public void tearDown(Jdbi jdbi) {

@Test
public void testColumnLineageEndpointByDataset() {
MarquezClient.Lineage lineage = client.getColumnLineageByDataset("namespace", "dataset_b");
MarquezClient.Lineage lineage =
client.getColumnLineage(NodeId.of(new DatasetId("namespace", "dataset_b")));

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
@@ -75,7 +80,7 @@ public void testColumnLineageEndpointByDataset() {
@Test
public void testColumnLineageEndpointByDatasetField() {
MarquezClient.Lineage lineage =
client.getColumnLineageByDataset("namespace", "dataset_b", "col_c");
client.getColumnLineage(NodeId.of(new DatasetFieldId("namespace", "dataset_b", "col_c")));

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
@@ -86,7 +91,8 @@ public void testColumnLineageEndpointByDatasetField() {
@Test
public void testColumnLineageEndpointWithDepthLimit() {
MarquezClient.Lineage lineage =
client.getColumnLineageByDatasetField("namespace", "dataset_c", "col_d", 1, false);
client.getColumnLineage(
NodeId.of(new DatasetFieldId("namespace", "dataset_c", "col_d")), 1, false);

assertThat(lineage.getGraph()).hasSize(2);
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
@@ -96,15 +102,16 @@ public void testColumnLineageEndpointWithDepthLimit() {
@Test
public void testColumnLineageEndpointWithDownstream() {
MarquezClient.Lineage lineage =
client.getColumnLineageByDatasetField("namespace", "dataset_b", "col_c", 10, true);
client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 10, true);

assertThat(lineage.getGraph()).hasSize(4);
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
}

@Test
public void testColumnLineageEndpointByJob() {
MarquezClient.Lineage lineage = client.getColumnLineageByJob("namespace", "job1");
MarquezClient.Lineage lineage =
client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 1, false);

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
22 changes: 14 additions & 8 deletions api/src/test/java/marquez/service/models/NodeIdTest.java
@@ -10,7 +10,9 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.UUID;
import marquez.common.models.DatasetFieldId;
import marquez.common.models.DatasetFieldVersionId;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.FieldName;
@@ -150,25 +152,29 @@ public void testDatasetField(String namespace, String dataset, String field) {
"gs://bucket$/path/to/data$col_A#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
},
delimiter = '$')
public void testDatasetFieldVersion(String namespace, String dataset, String field) {
public void testDatasetFieldVersion(String namespace, String dataset, String fieldWithVersion) {
String version = fieldWithVersion.split(VERSION_DELIM)[1];
String field = fieldWithVersion.split(VERSION_DELIM)[0];

NamespaceName namespaceName = NamespaceName.of(namespace);
FieldName fieldName = FieldName.of(field);
FieldName fieldName = FieldName.of(field.split(VERSION_DELIM)[0]);
DatasetName datasetName = DatasetName.of(dataset);
DatasetId dsId = new DatasetId(namespaceName, datasetName);
DatasetFieldId dsfId = new DatasetFieldId(dsId, fieldName);
DatasetFieldVersionId dsfId =
new DatasetFieldVersionId(dsId, fieldName, UUID.fromString(version));
NodeId nodeId = NodeId.of(dsfId);
assertFalse(nodeId.isRunType());
assertFalse(nodeId.isJobType());
assertFalse(nodeId.isDatasetType());
assertTrue(nodeId.hasVersion());
assertTrue(nodeId.isDatasetFieldVersionType());

assertEquals(dsfId, nodeId.asDatasetFieldId());
assertEquals(dsfId, nodeId.asDatasetFieldVersionId());
assertEquals(nodeId, NodeId.of(nodeId.getValue()));
assertEquals(namespace, nodeId.asDatasetFieldId().getDatasetId().getNamespace().getValue());
assertEquals(dataset, nodeId.asDatasetFieldId().getDatasetId().getName().getValue());
assertEquals(field, nodeId.asDatasetFieldId().getFieldName().getValue());
assertEquals(
    field.split(VERSION_DELIM)[1], nodeId.asDatasetFieldVersionId().getVersion().toString());
assertEquals(
    namespace, nodeId.asDatasetFieldVersionId().getDatasetId().getNamespace().getValue());
assertEquals(dataset, nodeId.asDatasetFieldVersionId().getDatasetId().getName().getValue());
assertEquals(field, nodeId.asDatasetFieldVersionId().getFieldName().getValue());
assertEquals(version, nodeId.asDatasetFieldVersionId().getVersion().toString());
}
}
47 changes: 5 additions & 42 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
@@ -44,6 +44,7 @@
import marquez.client.models.Namespace;
import marquez.client.models.NamespaceMeta;
import marquez.client.models.Node;
import marquez.client.models.NodeId;
import marquez.client.models.Run;
import marquez.client.models.RunMeta;
import marquez.client.models.RunState;
@@ -115,50 +116,12 @@ public enum SortDirection {
@Getter public final String value;
}

public Lineage getColumnLineageByDataset(
@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineageByDataset(
namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByDataset(
@NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) {
return getColumnLineageByDatasetField(
namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByDataset(
@NonNull String namespaceName,
@NonNull String datasetName,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(
url.toColumnLineageUrlByDataset(namespaceName, datasetName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
public Lineage getColumnLineage(NodeId nodeId) {
return getColumnLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByDatasetField(
@NonNull String namespaceName,
@NonNull String datasetName,
@NonNull String field,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(
url.toColumnLineageUrlByDatasetField(
namespaceName, datasetName, field, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineageByJob(@NonNull String namespaceName, @NonNull String jobName) {
return getColumnLineageByJob(namespaceName, jobName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByJob(
@NonNull String namespaceName, @NonNull String jobName, int depth, boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrlByJob(namespaceName, jobName, depth, withDownstream));
public Lineage getColumnLineage(NodeId nodeId, int depth, boolean withDownstream) {
final String bodyAsJson = http.get(url.toColumnLineageUrl(nodeId, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

25 changes: 2 additions & 23 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
@@ -42,9 +42,6 @@
import java.util.Map;
import javax.annotation.Nullable;
import lombok.NonNull;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.JobId;
import marquez.client.models.NodeId;
import marquez.client.models.RunState;
import marquez.client.models.SearchFilter;
@@ -211,27 +208,9 @@ URL toSearchUrl(
return from(searchPath(), queryParams.build());
}

URL toColumnLineageUrlByDatasetField(
String namespace, String dataset, String field, int depth, boolean withDownstream) {
URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrlByDataset(
String namespace, String dataset, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrlByJob(String namespace, String job, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new JobId(namespace, job)).getValue());
queryParams.put("nodeId", nodeId.getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
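For reference, a minimal sketch of the request that the new toColumnLineageUrl assembles. The path below is an assumption (Marquez serves column lineage under /api/v1/column-lineage; the real value comes from columnLineagePath(), which is outside this diff), and the node-id string assumes the "datasetField:" prefix and ":" joiner suggested by the NodeId code and test fixtures in this PR.

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ColumnLineageUrlSketch {
  public static void main(String[] args) {
    // Example NodeId#getValue() for a dataset field,
    // e.g. NodeId.of(new DatasetFieldId("namespace", "dataset_b", "col_c")).
    String nodeId = "datasetField:namespace:dataset_b:col_c";

    // The same three query parameters that toColumnLineageUrl(nodeId, depth, withDownstream) sets;
    // 20 and false are example values for depth and withDownstream.
    String query =
        "nodeId=" + URLEncoder.encode(nodeId, StandardCharsets.UTF_8)
            + "&depth=" + 20
            + "&withDownstream=" + false;

    // Assumed path; a full request would look like
    // http://localhost:5000/api/v1/column-lineage?nodeId=datasetField%3Anamespace%3Adataset_b%3Acol_c&depth=20&withDownstream=false
    System.out.println("/api/v1/column-lineage?" + query);
  }
}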
24 changes: 24 additions & 0 deletions clients/java/src/main/java/marquez/client/models/DatasetFieldVersionId.java
@@ -0,0 +1,24 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** ID for {@code DatasetField} with a version of {@code Dataset}. */
@EqualsAndHashCode
@AllArgsConstructor
@ToString
public class DatasetFieldVersionId {

@Getter private final String namespace;
@Getter private final String name;
@Getter private final String field;
@Getter private final UUID version;
}
22 changes: 22 additions & 0 deletions clients/java/src/main/java/marquez/client/models/DatasetVersionId.java
@@ -0,0 +1,22 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

/** Version ID for {@code Dataset}. */
@Value
@Builder
@AllArgsConstructor
public class DatasetVersionId {
@NonNull String namespace;
@NonNull String name;
@NonNull UUID version;
}
73 changes: 73 additions & 0 deletions clients/java/src/main/java/marquez/client/models/NodeId.java
@@ -12,8 +12,10 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.util.StdConverter;
import com.google.common.base.Joiner;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
@@ -74,6 +76,31 @@ public static NodeId of(@NonNull JobId jobId) {
return of(ID_JOINER.join(ID_PREFX_JOB, jobId.getNamespace(), jobId.getName()));
}

public static NodeId of(@NonNull DatasetFieldVersionId datasetFieldVersionId) {
return of(
appendVersionTo(
ID_JOINER.join(
ID_PREFX_DATASET_FIELD,
datasetFieldVersionId.getNamespace(),
datasetFieldVersionId.getName(),
datasetFieldVersionId.getField()),
datasetFieldVersionId.getVersion()));
}

public static NodeId of(@NonNull JobVersionId jobVersionId) {
return NodeId.of(
new JobId(
jobVersionId.getNamespace(),
appendVersionTo(jobVersionId.getName(), jobVersionId.getVersion())));
}

public static NodeId of(@NonNull DatasetVersionId versionId) {
return NodeId.of(
new DatasetId(
versionId.getNamespace(),
appendVersionTo(versionId.getName(), versionId.getVersion())));
}

@JsonIgnore
public boolean isDatasetFieldType() {
return value.startsWith(ID_PREFX_DATASET_FIELD);
@@ -89,6 +116,26 @@ public boolean isJobType() {
return value.startsWith(ID_PREFX_JOB);
}

@JsonIgnore
public boolean isDatasetFieldVersionType() {
return value.startsWith(ID_PREFX_DATASET_FIELD) && hasVersion();
}

@JsonIgnore
public boolean isDatasetVersionType() {
return value.startsWith(ID_PREFX_DATASET) && hasVersion();
}

@JsonIgnore
public boolean isJobVersionType() {
return value.startsWith(ID_PREFX_JOB) && hasVersion();
}

@JsonIgnore
public boolean hasVersion() {
return value.contains(VERSION_DELIM);
}

@JsonIgnore
private String[] parts(int expectedParts, String expectedType) {

@@ -139,6 +186,28 @@ public JobId asJobId() {
return new JobId(parts[1], parts[2]);
}

@JsonIgnore
public DatasetFieldVersionId asDatasetFieldVersionId() {
String[] parts = parts(4, ID_PREFX_DATASET_FIELD);
String[] nameAndVersion = parts[3].split(VERSION_DELIM);
return new DatasetFieldVersionId(
parts[1], parts[2], nameAndVersion[0], UUID.fromString(nameAndVersion[1]));
}

@JsonIgnore
public JobVersionId asJobVersionId() {
String[] parts = parts(3, ID_PREFX_JOB);
String[] nameAndVersion = parts[2].split(VERSION_DELIM);
return new JobVersionId(parts[1], nameAndVersion[0], UUID.fromString(nameAndVersion[1]));
}

@JsonIgnore
public DatasetVersionId asDatasetVersionId() {
String[] parts = parts(3, ID_PREFX_DATASET);
String[] nameAndVersion = parts[2].split(VERSION_DELIM);
return new DatasetVersionId(parts[1], nameAndVersion[0], UUID.fromString(nameAndVersion[1]));
}

public static class FromValue extends StdConverter<String, NodeId> {
@Override
public NodeId convert(@NonNull String value) {
@@ -157,4 +226,8 @@ public String convert(@NonNull NodeId id) {
public int compareTo(NodeId o) {
return value.compareTo(o.getValue());
}

private static String appendVersionTo(@NonNull final String value, @Nullable final UUID version) {
return (version == null) ? value : (value + VERSION_DELIM + version);
}
}
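To make the new version-aware encoding concrete, here is a hedged round-trip sketch. It assumes VERSION_DELIM is the "#" character and that node ids are colon-joined with "dataset"/"datasetField" prefixes, as the NodeIdTest fixtures above suggest; the exact constants live outside this diff, so the printed shapes are assumptions rather than guaranteed output.

import java.util.UUID;
import marquez.client.models.DatasetFieldVersionId;
import marquez.client.models.DatasetVersionId;
import marquez.client.models.NodeId;

public class VersionedNodeIdSketch {
  public static void main(String[] args) {
    UUID version = UUID.fromString("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee");

    // Dataset at a specific version: appendVersionTo() tacks "#<version>" onto the dataset name.
    NodeId datasetVersion =
        NodeId.of(new DatasetVersionId("namespace", "dataset_b", version));
    // Assumed shape: dataset:namespace:dataset_b#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
    System.out.println(datasetVersion.getValue());

    // Dataset field at a specific dataset version.
    NodeId fieldVersion =
        NodeId.of(new DatasetFieldVersionId("namespace", "dataset_b", "col_c", version));
    System.out.println(fieldVersion.isDatasetFieldVersionType()); // true
    System.out.println(fieldVersion.hasVersion());                // true

    // Round-trip the string form back into the typed id.
    DatasetFieldVersionId parsed = fieldVersion.asDatasetFieldVersionId();
    System.out.println(parsed.getVersion().equals(version));      // true
  }
}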