diff --git a/CHANGELOG.md b/CHANGELOG.md index c23deeca8b..aba9f74b12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ * Column-lineage endpoints supports point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) *Enable requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.* +* Column lineage point-in-time Java client [`#2269`](https://github.com/MarquezProject/marquez/pull/2269) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + *Java client methods to retrieve point-in-time `column-lineage`. Please note that existing methods `getColumnLineageByDataset`, `getColumnLineageByDatasetField` and `getColumnLineageByJob` were replaced by a single `getColumnLineage` taking `NodeId` as a parameter.* ### Fixed diff --git a/api/src/test/java/marquez/ColumnLineageIntegrationTest.java b/api/src/test/java/marquez/ColumnLineageIntegrationTest.java index 91d0c886fd..bbdc8e2ae5 100644 --- a/api/src/test/java/marquez/ColumnLineageIntegrationTest.java +++ b/api/src/test/java/marquez/ColumnLineageIntegrationTest.java @@ -14,7 +14,11 @@ import java.util.Optional; import marquez.api.JdbiUtils; import marquez.client.MarquezClient; +import marquez.client.models.DatasetFieldId; +import marquez.client.models.DatasetId; +import marquez.client.models.JobId; import marquez.client.models.Node; +import marquez.client.models.NodeId; import marquez.db.LineageTestUtils; import marquez.db.OpenLineageDao; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; @@ -64,7 +68,8 @@ public void tearDown(Jdbi jdbi) { @Test public void testColumnLineageEndpointByDataset() { - MarquezClient.Lineage lineage = client.getColumnLineageByDataset("namespace", "dataset_b"); + MarquezClient.Lineage lineage = + client.getColumnLineage(NodeId.of(new DatasetId("namespace", "dataset_b"))); assertThat(lineage.getGraph()).hasSize(3); 
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent(); @@ -75,7 +80,7 @@ public void testColumnLineageEndpointByDataset() { @Test public void testColumnLineageEndpointByDatasetField() { MarquezClient.Lineage lineage = - client.getColumnLineageByDataset("namespace", "dataset_b", "col_c"); + client.getColumnLineage(NodeId.of(new DatasetFieldId("namespace", "dataset_b", "col_c"))); assertThat(lineage.getGraph()).hasSize(3); assertThat(getNodeByFieldName(lineage, "col_a")).isPresent(); @@ -86,7 +91,8 @@ public void testColumnLineageEndpointByDatasetField() { @Test public void testColumnLineageEndpointWithDepthLimit() { MarquezClient.Lineage lineage = - client.getColumnLineageByDatasetField("namespace", "dataset_c", "col_d", 1, false); + client.getColumnLineage( + NodeId.of(new DatasetFieldId("namespace", "dataset_c", "col_d")), 1, false); assertThat(lineage.getGraph()).hasSize(2); assertThat(getNodeByFieldName(lineage, "col_c")).isPresent(); @@ -96,7 +102,7 @@ public void testColumnLineageEndpointWithDepthLimit() { @Test public void testColumnLineageEndpointWithDownstream() { MarquezClient.Lineage lineage = - client.getColumnLineageByDatasetField("namespace", "dataset_b", "col_c", 10, true); + client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 10, true); assertThat(lineage.getGraph()).hasSize(4); assertThat(getNodeByFieldName(lineage, "col_d")).isPresent(); @@ -104,7 +110,8 @@ public void testColumnLineageEndpointWithDownstream() { @Test public void testColumnLineageEndpointByJob() { - MarquezClient.Lineage lineage = client.getColumnLineageByJob("namespace", "job1"); + MarquezClient.Lineage lineage = + client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 1, false); assertThat(lineage.getGraph()).hasSize(3); assertThat(getNodeByFieldName(lineage, "col_a")).isPresent(); diff --git a/api/src/test/java/marquez/service/models/NodeIdTest.java b/api/src/test/java/marquez/service/models/NodeIdTest.java index a65b76dbd1..7162af0aad 100644 
--- a/api/src/test/java/marquez/service/models/NodeIdTest.java +++ b/api/src/test/java/marquez/service/models/NodeIdTest.java @@ -10,7 +10,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.UUID; import marquez.common.models.DatasetFieldId; +import marquez.common.models.DatasetFieldVersionId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.FieldName; @@ -150,12 +152,16 @@ public void testDatasetField(String namespace, String dataset, String field) { "gs://bucket$/path/to/data$col_A#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" }, delimiter = '$') - public void testDatasetFieldVersion(String namespace, String dataset, String field) { + public void testDatasetFieldVersion(String namespace, String dataset, String fieldWithVersion) { + String version = fieldWithVersion.split(VERSION_DELIM)[1]; + String field = fieldWithVersion.split(VERSION_DELIM)[0]; + NamespaceName namespaceName = NamespaceName.of(namespace); - FieldName fieldName = FieldName.of(field); + FieldName fieldName = FieldName.of(field.split(VERSION_DELIM)[0]); DatasetName datasetName = DatasetName.of(dataset); DatasetId dsId = new DatasetId(namespaceName, datasetName); - DatasetFieldId dsfId = new DatasetFieldId(dsId, fieldName); + DatasetFieldVersionId dsfId = + new DatasetFieldVersionId(dsId, fieldName, UUID.fromString(version)); NodeId nodeId = NodeId.of(dsfId); assertFalse(nodeId.isRunType()); assertFalse(nodeId.isJobType()); @@ -163,12 +169,12 @@ public void testDatasetFieldVersion(String namespace, String dataset, String fie assertTrue(nodeId.hasVersion()); assertTrue(nodeId.isDatasetFieldVersionType()); - assertEquals(dsfId, nodeId.asDatasetFieldId()); + assertEquals(dsfId, nodeId.asDatasetFieldVersionId()); assertEquals(nodeId, NodeId.of(nodeId.getValue())); - assertEquals(namespace, nodeId.asDatasetFieldId().getDatasetId().getNamespace().getValue()); - 
assertEquals(dataset, nodeId.asDatasetFieldId().getDatasetId().getName().getValue()); - assertEquals(field, nodeId.asDatasetFieldId().getFieldName().getValue()); assertEquals( - field.split(VERSION_DELIM)[1], nodeId.asDatasetFieldVersionId().getVersion().toString()); + namespace, nodeId.asDatasetFieldVersionId().getDatasetId().getNamespace().getValue()); + assertEquals(dataset, nodeId.asDatasetFieldVersionId().getDatasetId().getName().getValue()); + assertEquals(field, nodeId.asDatasetFieldVersionId().getFieldName().getValue()); + assertEquals(version, nodeId.asDatasetFieldVersionId().getVersion().toString()); } } diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index 3972f868ac..3af0ed433e 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -44,6 +44,7 @@ import marquez.client.models.Namespace; import marquez.client.models.NamespaceMeta; import marquez.client.models.Node; +import marquez.client.models.NodeId; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; @@ -115,50 +116,12 @@ public enum SortDirection { @Getter public final String value; } - public Lineage getColumnLineageByDataset( - @NonNull String namespaceName, @NonNull String datasetName) { - return getColumnLineageByDataset( - namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false); - } - - public Lineage getColumnLineageByDataset( - @NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) { - return getColumnLineageByDatasetField( - namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false); - } - - public Lineage getColumnLineageByDataset( - @NonNull String namespaceName, - @NonNull String datasetName, - int depth, - boolean withDownstream) { - final String bodyAsJson = - http.get( - 
url.toColumnLineageUrlByDataset(namespaceName, datasetName, depth, withDownstream)); - return Lineage.fromJson(bodyAsJson); + public Lineage getColumnLineage(NodeId nodeId) { + return getColumnLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH, false); } - public Lineage getColumnLineageByDatasetField( - @NonNull String namespaceName, - @NonNull String datasetName, - @NonNull String field, - int depth, - boolean withDownstream) { - final String bodyAsJson = - http.get( - url.toColumnLineageUrlByDatasetField( - namespaceName, datasetName, field, depth, withDownstream)); - return Lineage.fromJson(bodyAsJson); - } - - public Lineage getColumnLineageByJob(@NonNull String namespaceName, @NonNull String jobName) { - return getColumnLineageByJob(namespaceName, jobName, DEFAULT_LINEAGE_GRAPH_DEPTH, false); - } - - public Lineage getColumnLineageByJob( - @NonNull String namespaceName, @NonNull String jobName, int depth, boolean withDownstream) { - final String bodyAsJson = - http.get(url.toColumnLineageUrlByJob(namespaceName, jobName, depth, withDownstream)); + public Lineage getColumnLineage(NodeId nodeId, int depth, boolean withDownstream) { + final String bodyAsJson = http.get(url.toColumnLineageUrl(nodeId, depth, withDownstream)); return Lineage.fromJson(bodyAsJson); } diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java index 242d85e907..29a704c753 100644 --- a/clients/java/src/main/java/marquez/client/MarquezUrl.java +++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java @@ -42,9 +42,6 @@ import java.util.Map; import javax.annotation.Nullable; import lombok.NonNull; -import marquez.client.models.DatasetFieldId; -import marquez.client.models.DatasetId; -import marquez.client.models.JobId; import marquez.client.models.NodeId; import marquez.client.models.RunState; import marquez.client.models.SearchFilter; @@ -211,27 +208,9 @@ URL toSearchUrl( return from(searchPath(), queryParams.build()); 
} - URL toColumnLineageUrlByDatasetField( - String namespace, String dataset, String field, int depth, boolean withDownstream) { + URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) { final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); - queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue()); - queryParams.put("depth", String.valueOf(depth)); - queryParams.put("withDownstream", String.valueOf(withDownstream)); - return from(columnLineagePath(), queryParams.build()); - } - - URL toColumnLineageUrlByDataset( - String namespace, String dataset, int depth, boolean withDownstream) { - final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); - queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue()); - queryParams.put("depth", String.valueOf(depth)); - queryParams.put("withDownstream", String.valueOf(withDownstream)); - return from(columnLineagePath(), queryParams.build()); - } - - URL toColumnLineageUrlByJob(String namespace, String job, int depth, boolean withDownstream) { - final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); - queryParams.put("nodeId", NodeId.of(new JobId(namespace, job)).getValue()); + queryParams.put("nodeId", nodeId.getValue()); queryParams.put("depth", String.valueOf(depth)); queryParams.put("withDownstream", String.valueOf(withDownstream)); return from(columnLineagePath(), queryParams.build()); diff --git a/clients/java/src/main/java/marquez/client/models/DatasetFieldVersionId.java b/clients/java/src/main/java/marquez/client/models/DatasetFieldVersionId.java new file mode 100644 index 0000000000..8e6a744c82 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/DatasetFieldVersionId.java @@ -0,0 +1,24 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import java.util.UUID; +import 
lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +/** ID for {@code DatasetField} with a version of {@code Dataset}. */ +@EqualsAndHashCode +@AllArgsConstructor +@ToString +public class DatasetFieldVersionId { + + @Getter private final String namespace; + @Getter private final String name; + @Getter private final String field; + @Getter private final UUID version; +} diff --git a/clients/java/src/main/java/marquez/client/models/DatasetVersionId.java b/clients/java/src/main/java/marquez/client/models/DatasetVersionId.java new file mode 100644 index 0000000000..cc468efd4e --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/DatasetVersionId.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +/** Version ID for {@code Dataset}. 
*/ +@Value +@Builder +@AllArgsConstructor +public class DatasetVersionId { + @NonNull String namespace; + @NonNull String name; + @NonNull UUID version; +} diff --git a/clients/java/src/main/java/marquez/client/models/NodeId.java b/clients/java/src/main/java/marquez/client/models/NodeId.java index c7235ec667..c655903ff1 100644 --- a/clients/java/src/main/java/marquez/client/models/NodeId.java +++ b/clients/java/src/main/java/marquez/client/models/NodeId.java @@ -12,8 +12,10 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.util.StdConverter; import com.google.common.base.Joiner; +import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.Nullable; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; @@ -74,6 +76,31 @@ public static NodeId of(@NonNull JobId jobId) { return of(ID_JOINER.join(ID_PREFX_JOB, jobId.getNamespace(), jobId.getName())); } + public static NodeId of(@NonNull DatasetFieldVersionId datasetFieldVersionId) { + return of( + appendVersionTo( + ID_JOINER.join( + ID_PREFX_DATASET_FIELD, + datasetFieldVersionId.getNamespace(), + datasetFieldVersionId.getName(), + datasetFieldVersionId.getField()), + datasetFieldVersionId.getVersion())); + } + + public static NodeId of(@NonNull JobVersionId jobVersionId) { + return NodeId.of( + new JobId( + jobVersionId.getNamespace(), + appendVersionTo(jobVersionId.getName(), jobVersionId.getVersion()))); + } + + public static NodeId of(@NonNull DatasetVersionId versionId) { + return NodeId.of( + new DatasetId( + versionId.getNamespace(), + appendVersionTo(versionId.getName(), versionId.getVersion()))); + } + @JsonIgnore public boolean isDatasetFieldType() { return value.startsWith(ID_PREFX_DATASET_FIELD); @@ -89,6 +116,26 @@ public boolean isJobType() { return value.startsWith(ID_PREFX_JOB); } + @JsonIgnore + public boolean isDatasetFieldVersionType() { + return 
value.startsWith(ID_PREFX_DATASET_FIELD) && hasVersion(); + } + + @JsonIgnore + public boolean isDatasetVersionType() { + return value.startsWith(ID_PREFX_DATASET) && hasVersion(); + } + + @JsonIgnore + public boolean isJobVersionType() { + return value.startsWith(ID_PREFX_JOB) && hasVersion(); + } + + @JsonIgnore + public boolean hasVersion() { + return value.contains(VERSION_DELIM); + } + @JsonIgnore private String[] parts(int expectedParts, String expectedType) { @@ -139,6 +186,28 @@ public JobId asJobId() { return new JobId(parts[1], parts[2]); } + @JsonIgnore + public DatasetFieldVersionId asDatasetFieldVersionId() { + String[] parts = parts(4, ID_PREFX_DATASET_FIELD); + String[] nameAndVersion = parts[3].split(VERSION_DELIM); + return new DatasetFieldVersionId( + parts[1], parts[2], nameAndVersion[0], UUID.fromString(nameAndVersion[1])); + } + + @JsonIgnore + public JobVersionId asJobVersionId() { + String[] parts = parts(3, ID_PREFX_JOB); + String[] nameAndVersion = parts[2].split(VERSION_DELIM); + return new JobVersionId(parts[1], nameAndVersion[0], UUID.fromString(nameAndVersion[1])); + } + + @JsonIgnore + public DatasetVersionId asDatasetVersionId() { + String[] parts = parts(3, ID_PREFX_DATASET); + String[] nameAndVersion = parts[2].split(VERSION_DELIM); + return new DatasetVersionId(parts[1], nameAndVersion[0], UUID.fromString(nameAndVersion[1])); + } + public static class FromValue extends StdConverter { @Override public NodeId convert(@NonNull String value) { @@ -157,4 +226,8 @@ public String convert(@NonNull NodeId id) { public int compareTo(NodeId o) { return value.compareTo(o.getValue()); } + + private static String appendVersionTo(@NonNull final String value, @Nullable final UUID version) { + return (version == null) ? 
value : (value + VERSION_DELIM + version); + } } diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index a9ba5c54f5..a041ce310a 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -393,6 +393,34 @@ public class MarquezClientTest { private static final DatasetFieldId DATASET_FIELD_ID = new DatasetFieldId(NAMESPACE_NAME, DB_TABLE_NAME, FIELD_NAME); + private static final DatasetFieldId DATASET_FIELD_VERSION_ID = + new DatasetFieldId(NAMESPACE_NAME, DB_TABLE_NAME, FIELD_NAME); + + private static final Node LINEAGE_NODE = + new Node( + NodeId.of(DATASET_FIELD_ID), + NodeType.DATASET_FIELD, + new ColumnLineageNodeData( + NAMESPACE_NAME, + DB_TABLE_NAME, + FIELD_NAME, + "String", + Collections.singletonList( + new ColumnLineageInputField( + "namespace", + "inDataset", + "some-col1", + "transformationDescription", + "transformationType"))), + ImmutableSet.of( + Edge.of( + NodeId.of(DATASET_FIELD_ID), + NodeId.of(new DatasetFieldId("namespace", "inDataset", "some-col1")))), + ImmutableSet.of( + Edge.of( + NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")), + NodeId.of(DATASET_FIELD_ID)))); + private final MarquezUrl marquezUrl = MarquezUrl.create(DEFAULT_BASE_URL); @Mock private MarquezHttp http; private MarquezClient client; @@ -955,31 +983,7 @@ public void testCreateTag() throws Exception { @Test public void testGetColumnLineage() throws Exception { - Node node = - new Node( - NodeId.of(DATASET_FIELD_ID), - NodeType.DATASET_FIELD, - new ColumnLineageNodeData( - NAMESPACE_NAME, - DB_TABLE_NAME, - FIELD_NAME, - "String", - Collections.singletonList( - new ColumnLineageInputField( - "namespace", - "inDataset", - "some-col1", - "transformationDescription", - "transformationType"))), - ImmutableSet.of( - Edge.of( - NodeId.of(DATASET_FIELD_ID), - NodeId.of(new 
DatasetFieldId("namespace", "inDataset", "some-col1")))), - ImmutableSet.of( - Edge.of( - NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")), - NodeId.of(DATASET_FIELD_ID)))); - MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(node)); + MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(LINEAGE_NODE)); String lineageJson = lineage.toJson(); when(http.get( buildUrlFor( @@ -987,88 +991,13 @@ public void testGetColumnLineage() throws Exception { .thenReturn(lineageJson); Node retrievedNode = - client.getColumnLineageByDataset("namespace", "dataset").getGraph().stream() + client + .getColumnLineage(NodeId.of(new DatasetId("namespace", "dataset"))) + .getGraph() + .stream() .findAny() .get(); - assertThat(retrievedNode).isEqualTo(node); - } - - @Test - public void testGetColumnLineageByField() throws Exception { - Node node = - new Node( - NodeId.of(DATASET_FIELD_ID), - NodeType.DATASET_FIELD, - new ColumnLineageNodeData( - NAMESPACE_NAME, - DB_TABLE_NAME, - FIELD_NAME, - "String", - Collections.singletonList( - new ColumnLineageInputField( - "namespace", - "inDataset", - "some-col1", - "transformationDescription", - "transformationType"))), - ImmutableSet.of( - Edge.of( - NodeId.of(DATASET_FIELD_ID), - NodeId.of(new DatasetFieldId("namespace", "inDataset", "some-col1")))), - ImmutableSet.of( - Edge.of( - NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")), - NodeId.of(DATASET_FIELD_ID)))); - MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(node)); - String lineageJson = lineage.toJson(); - when(http.get( - buildUrlFor( - "/column-lineage?nodeId=datasetField%3Anamespace%3Adataset%3Asome-col1&depth=20&withDownstream=false"))) - .thenReturn(lineageJson); - - Node retrievedNode = - client.getColumnLineageByDataset("namespace", "dataset", "some-col1").getGraph().stream() - .findAny() - .get(); - assertThat(retrievedNode).isEqualTo(node); - } - - @Test - public void 
testGetColumnLineageByJob() throws Exception { - Node node = - new Node( - NodeId.of(DATASET_FIELD_ID), - NodeType.DATASET_FIELD, - new ColumnLineageNodeData( - NAMESPACE_NAME, - DB_TABLE_NAME, - FIELD_NAME, - "String", - Collections.singletonList( - new ColumnLineageInputField( - "namespace", - "inDataset", - "some-col1", - "transformationDescription", - "transformationType"))), - ImmutableSet.of( - Edge.of( - NodeId.of(DATASET_FIELD_ID), - NodeId.of(new DatasetFieldId("namespace", "inDataset", "some-col1")))), - ImmutableSet.of( - Edge.of( - NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")), - NodeId.of(DATASET_FIELD_ID)))); - MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(node)); - String lineageJson = lineage.toJson(); - when(http.get( - buildUrlFor( - "/column-lineage?nodeId=job%3Anamespace%3Ajob&depth=20&withDownstream=false"))) - .thenReturn(lineageJson); - - Node retrievedNode = - client.getColumnLineageByJob("namespace", "job").getGraph().stream().findAny().get(); - assertThat(retrievedNode).isEqualTo(node); + assertThat(retrievedNode).isEqualTo(LINEAGE_NODE); } private URL buildUrlFor(String pathTemplate) throws Exception { diff --git a/clients/java/src/test/java/marquez/client/MarquezUrlTest.java b/clients/java/src/test/java/marquez/client/MarquezUrlTest.java index 195f756981..20308e0c15 100644 --- a/clients/java/src/test/java/marquez/client/MarquezUrlTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezUrlTest.java @@ -7,6 +7,14 @@ import java.net.MalformedURLException; import java.net.URL; +import java.util.UUID; +import marquez.client.models.DatasetFieldId; +import marquez.client.models.DatasetFieldVersionId; +import marquez.client.models.DatasetId; +import marquez.client.models.DatasetVersionId; +import marquez.client.models.JobId; +import marquez.client.models.JobVersionId; +import marquez.client.models.NodeId; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; 
import org.junit.jupiter.api.Test; @@ -15,6 +23,7 @@ public class MarquezUrlTest { static String basePath = "http://marquez:5000"; static MarquezUrl marquezUrl; + static String version = UUID.randomUUID().toString(); @BeforeAll static void beforeAll() throws MalformedURLException { @@ -38,12 +47,54 @@ void testEncodedMarquezUrl() { void testToColumnLineageUrl() { Assertions.assertEquals( "http://marquez:5000/api/v1/column-lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20&withDownstream=true", - marquezUrl.toColumnLineageUrlByDataset("namespace", "dataset", 20, true).toString()); + marquezUrl + .toColumnLineageUrl(NodeId.of(new DatasetId("namespace", "dataset")), 20, true) + .toString()); Assertions.assertEquals( "http://marquez:5000/api/v1/column-lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield&depth=20&withDownstream=true", marquezUrl - .toColumnLineageUrlByDatasetField("namespace", "dataset", "field", 20, true) + .toColumnLineageUrl( + NodeId.of(new DatasetFieldId("namespace", "dataset", "field")), 20, true) + .toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/column-lineage?nodeId=job%3Anamespace%3Ajob&depth=20&withDownstream=true", + marquezUrl + .toColumnLineageUrl(NodeId.of(new JobId("namespace", "job")), 20, true) + .toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/column-lineage?nodeId=dataset%3Anamespace%3Adataset%23" + + version + + "&depth=20&withDownstream=true", + marquezUrl + .toColumnLineageUrl( + NodeId.of(new DatasetVersionId("namespace", "dataset", UUID.fromString(version))), + 20, + true) + .toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/column-lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield%23" + + version + + "&depth=20&withDownstream=true", + marquezUrl + .toColumnLineageUrl( + NodeId.of( + new DatasetFieldVersionId( + "namespace", "dataset", "field", UUID.fromString(version))), + 20, + true) + .toString()); + + Assertions.assertEquals( + 
"http://marquez:5000/api/v1/column-lineage?nodeId=job%3Anamespace%3Ajob%23" + + version + + "&depth=20&withDownstream=true", + marquezUrl + .toColumnLineageUrl( + NodeId.of(new JobVersionId("namespace", "job", UUID.fromString(version))), 20, true) .toString()); } } diff --git a/clients/java/src/test/java/marquez/client/models/NodeIdTest.java b/clients/java/src/test/java/marquez/client/models/NodeIdTest.java index 359c5e59a5..8a1ee55581 100644 --- a/clients/java/src/test/java/marquez/client/models/NodeIdTest.java +++ b/clients/java/src/test/java/marquez/client/models/NodeIdTest.java @@ -5,10 +5,12 @@ package marquez.client.models; +import static marquez.client.models.NodeId.VERSION_DELIM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.UUID; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -62,4 +64,76 @@ public void testJob(String namespace, String job) { assertEquals(namespace, nodeId.asJobId().getNamespace()); assertEquals(job, nodeId.asJobId().getName()); } + + @ParameterizedTest(name = "testJobWithVersion-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-job#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "org://team$my-job#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + }, + delimiter = '$') + public void testJobWithVersion(String namespace, String job) { + JobId jobId = new JobId(namespace, job); + NodeId nodeId = NodeId.of(jobId); + assertTrue(nodeId.isJobType()); + assertFalse(nodeId.isDatasetType()); + assertTrue(nodeId.hasVersion()); + assertEquals(jobId, nodeId.asJobId()); + assertEquals(nodeId, NodeId.of(nodeId.getValue())); + assertEquals(namespace, nodeId.asJobId().getNamespace()); + assertEquals(job, nodeId.asJobId().getName()); + assertEquals(job.split(VERSION_DELIM)[1], nodeId.asJobVersionId().getVersion().toString()); + } + + 
@ParameterizedTest(name = "testDatasetWithVersion-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-dataset#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "gs://bucket$/path/to/data#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "postgresql://hostname:5432/database$my_table#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "my-namespace$my_struct#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + }, + delimiter = '$') + public void testDatasetWithVersion(String namespace, String dataset) { + DatasetId dsId = new DatasetId(namespace, dataset); + NodeId nodeId = NodeId.of(dsId); + assertFalse(nodeId.isJobType()); + assertTrue(nodeId.isDatasetType()); + assertTrue(nodeId.isDatasetVersionType()); + assertTrue(nodeId.hasVersion()); + assertEquals(dsId, nodeId.asDatasetId()); + assertEquals(namespace, nodeId.asDatasetId().getNamespace()); + assertEquals(dataset, nodeId.asDatasetId().getName()); + assertEquals( + dataset.split(VERSION_DELIM)[1], nodeId.asDatasetVersionId().getVersion().toString()); + } + + @ParameterizedTest(name = "testDatasetFieldWithVersion-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-dataset$colA#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "gs://bucket$/path/to/data$colA#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "gs://bucket$/path/to/data$col_A#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + }, + delimiter = '$') + public void testDatasetFieldWithVersion( + String namespace, String dataset, String fieldWithVersion) { + String version = fieldWithVersion.split(VERSION_DELIM)[1]; + String field = fieldWithVersion.split(VERSION_DELIM)[0]; + + DatasetFieldVersionId dsfId = + new DatasetFieldVersionId(namespace, dataset, field, UUID.fromString(version)); + NodeId nodeId = NodeId.of(dsfId); + assertFalse(nodeId.isJobType()); + assertFalse(nodeId.isDatasetType()); + assertTrue(nodeId.hasVersion()); + assertTrue(nodeId.isDatasetFieldVersionType()); + + assertEquals(dsfId, nodeId.asDatasetFieldVersionId()); + assertEquals(nodeId, 
NodeId.of(nodeId.getValue())); + assertEquals(namespace, nodeId.asDatasetFieldVersionId().getNamespace()); + assertEquals(dataset, nodeId.asDatasetFieldVersionId().getName()); + assertEquals(field, nodeId.asDatasetFieldVersionId().getField()); + assertEquals(version, nodeId.asDatasetFieldVersionId().getVersion().toString()); + } }