fix(ingestion/spark): Platform instance and column level lineage fix #10843

Merged · 13 commits · Jul 9, 2024
@@ -33,6 +33,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private String hivePlatformAlias = "hive";
@Builder.Default private Map<String, String> urnAliases = new HashMap<>();
@Builder.Default private final boolean disableSymlinkResolution = false;
@Builder.Default private final boolean lowerCaseDatasetUrns = false;

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
@@ -4,6 +4,7 @@
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.Edge;
import com.linkedin.common.EdgeArray;
import com.linkedin.common.FabricType;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.Owner;
import com.linkedin.common.OwnerArray;
@@ -57,6 +58,8 @@
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.datahubproject.openlineage.dataset.HdfsPathDataset;
import io.datahubproject.openlineage.dataset.HdfsPlatform;
import io.datahubproject.openlineage.dataset.PathSpec;
import io.datahubproject.openlineage.utils.DatahubUtils;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import java.io.IOException;
@@ -151,6 +154,11 @@ public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
private static Optional<DatasetUrn> getDatasetUrnFromOlDataset(
String namespace, String datasetName, DatahubOpenlineageConfig mappingConfig) {
String platform;
if (mappingConfig.isLowerCaseDatasetUrns()) {
namespace = namespace.toLowerCase();
datasetName = datasetName.toLowerCase();
}

if (namespace.contains(SCHEME_SEPARATOR)) {
try {
URI datasetUri;
@@ -183,12 +191,45 @@ private static Optional<DatasetUrn> getDatasetUrnFromOlDataset(
platform = namespace;
}

if (mappingConfig.getCommonDatasetPlatformInstance() != null) {
datasetName = mappingConfig.getCommonDatasetPlatformInstance() + "." + datasetName;
String platformInstance = getPlatformInstance(mappingConfig, platform);
FabricType env = getEnv(mappingConfig, platform);
return Optional.of(DatahubUtils.createDatasetUrn(platform, platformInstance, datasetName, env));
}

private static FabricType getEnv(DatahubOpenlineageConfig mappingConfig, String platform) {
FabricType fabricType = mappingConfig.getFabricType();
if (mappingConfig.getPathSpecs() != null
&& mappingConfig.getPathSpecs().containsKey(platform)) {
List<PathSpec> path_specs = mappingConfig.getPathSpecs().get(platform);
for (PathSpec pathSpec : path_specs) {
if (pathSpec.getEnv().isPresent()) {
try {
fabricType = FabricType.valueOf(pathSpec.getEnv().get());
return fabricType;
} catch (IllegalArgumentException e) {
log.warn("Invalid environment value: {}", pathSpec.getEnv());
}
}
}
}
return fabricType;
}

return Optional.of(
new DatasetUrn(new DataPlatformUrn(platform), datasetName, mappingConfig.getFabricType()));
private static String getPlatformInstance(
DatahubOpenlineageConfig mappingConfig, String platform) {
    // Use the platform instance from the matching path spec if present; otherwise fall back to
    // the commonDatasetPlatformInstance setting.
String platformInstance = mappingConfig.getCommonDatasetPlatformInstance();
if (mappingConfig.getPathSpecs() != null
&& mappingConfig.getPathSpecs().containsKey(platform)) {
List<PathSpec> path_specs = mappingConfig.getPathSpecs().get(platform);
for (PathSpec pathSpec : path_specs) {
if (pathSpec.getPlatformInstance().isPresent()) {
return pathSpec.getPlatformInstance().get();
}
}
}
return platformInstance;
}

public static GlobalTags generateTags(List<String> tags) {
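To make the effect of the new helpers concrete, here is a minimal sketch of the URN that `DatahubUtils.createDatasetUrn` is expected to produce once a platform instance has been resolved. The expected string is taken from the tests added later in this PR; the import paths and the `main` wrapper are assumptions for illustration, not part of the change.

```java
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DatasetUrn;
import io.datahubproject.openlineage.utils.DatahubUtils;

// Sketch only: the platform instance chosen by getPlatformInstance() is folded into the
// dataset name portion of the URN, and the env returned by getEnv() becomes the fabric type.
public class UrnResolutionSketch {
  public static void main(String[] args) throws Exception {
    DatasetUrn urn =
        DatahubUtils.createDatasetUrn(
            "redshift",
            "my-platform-instance",
            "dev.public.spark_redshift_load_test",
            FabricType.DEV);
    System.out.println(urn);
    // Expected (per testProcessRedshiftOutputWithPlatformInstance below):
    // urn:li:dataset:(urn:li:dataPlatform:redshift,my-platform-instance.dev.public.spark_redshift_load_test,DEV)
  }
}
```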
@@ -280,11 +280,11 @@ private Pair<UrnArray, EdgeArray> processDownstreams(
for (Urn downstream :
Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
downstream,
upstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
upstream,
downstream,
null);
}
}
@@ -14,7 +14,7 @@
public class PathSpec {
final String alias;
final String platform;
@Builder.Default final String env = "PROD";
@Builder.Default final Optional<String> env = Optional.empty();
final List<String> pathSpecList;
@Builder.Default final Optional<String> platformInstance = Optional.empty();
}
12 changes: 10 additions & 2 deletions metadata-integration/java/spark-lineage-beta/README.md
@@ -182,8 +182,10 @@ information like tokens.
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks or Glue. Enable this on Databricks if you want coalesced runs. |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is disabled. |
| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default it is disabled. |
| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer not use dataset symlink (for example if you prefer to have the s3 location instead of the Hive table). By default it is disabled. |
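A minimal sketch of how these options might be wired into a Spark job. The listener class name (`datahub.spark.DatahubSparkListener`) and the `SparkConf` wrapper are assumptions taken from the surrounding README, not part of this diff; the option keys are the ones documented in the table above.

```java
import org.apache.spark.SparkConf;

// Hedged sketch: enabling the options documented above on a SparkConf.
public class DatahubSparkConfExample {
  public static void main(String[] args) {
    SparkConf conf =
        new SparkConf()
            .setAppName("lineage-example")
            // Assumed listener registration; not introduced by this PR.
            .set("spark.extraListeners", "datahub.spark.DatahubSparkListener")
            // Send lineage as a patch (append) instead of overwriting existing edges.
            .set("spark.datahub.patch.enabled", "true")
            // Lowercase dataset URNs (new in this PR).
            .set("spark.datahub.metadata.dataset.lowerCaseUrns", "true")
            // Keep the S3 location instead of resolving the Hive table symlink.
            .set("spark.datahub.disableSymlinkResolution", "true");
    // Build the SparkSession from `conf` as usual.
  }
}
```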

**Review comment (Contributor):**
Fix grammatical and punctuation errors.

There are minor grammatical and punctuation issues in the descriptions.

- By default it is disabled.
+ By default, it is disabled.

- Set this to true to lowercase dataset urns. By default it is disabled.
+ Set this to true to lowercase dataset URNs. By default, it is disabled.

- Set this to true if you prefer to have the s3 location instead of the Hive table. By default it is disabled.
+ Set this to true if you prefer using the S3 location instead of the Hive table. By default, it is disabled.
Suggested change
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is disabled. |
| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default it is disabled. |
| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer not use dataset symlink (for example if you prefer to have the s3 location instead of the Hive table). By default it is disabled. |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. |
| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset URNs. By default, it is disabled. |
| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the S3 location instead of the Hive table. By default, it is disabled. |

## What to Expect: The Metadata Model

@@ -343,3 +345,9 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b
```
## Known limitations

## Changelog
### Version 0.2.12
- Add option to lowercase dataset URNs
- Add option to set platform instance and/or env per platform with the `spark.datahub.platform.<platform_name>.env` and `spark.datahub.platform.<platform_name>.platform_instance` config parameters (see the sketch below)
- Fix platform instance setting for datasets when `spark.datahub.metadata.dataset.platformInstance` is set
- Fix column-level lineage support when patch is enabled
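A short, hedged sketch of the per-platform overrides described above, using the key shape named in this changelog entry with `<platform_name>` replaced by `redshift`. The instance and env values are purely illustrative; per the converter code in this PR, a platform-specific setting takes precedence over the common one.

```java
import org.apache.spark.SparkConf;

// Illustration only: per-platform env and platform instance overrides,
// following the spark.datahub.platform.<platform_name>.* key shape above.
public class PerPlatformOverridesExample {
  public static void main(String[] args) {
    SparkConf conf =
        new SparkConf()
            // Global fallback platform instance for all datasets.
            .set("spark.datahub.metadata.dataset.platformInstance", "my-platform-instance")
            // Redshift-specific overrides win over the global settings (hypothetical values).
            .set("spark.datahub.platform.redshift.env", "PROD")
            .set("spark.datahub.platform.redshift.platform_instance", "analytics-cluster");
  }
}
```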
@@ -42,6 +42,8 @@ public class SparkConfigParser {
public static final String DATAHUB_FLOW_NAME = "flow_name";
public static final String DATASET_ENV_KEY = "metadata.dataset.env";
public static final String DATASET_HIVE_PLATFORM_ALIAS = "metadata.dataset.hivePlatformAlias";
public static final String DATASET_LOWERCASE_URNS = "metadata.dataset.lowerCaseUrns";

public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize";
public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
public static final String DATASET_INCLUDE_SCHEMA_METADATA =
@@ -152,6 +154,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig));
builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig));
builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig));
builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig));
try {
String parentJob = SparkConfigParser.getParentJobKey(sparkConfig);
if (parentJob != null) {
@@ -246,15 +249,18 @@ public static Map<String, List<PathSpec>> getPathSpecListMap(Config datahubConfi
pathSpecBuilder.alias(pathSpecKey);
pathSpecBuilder.platform(key);
if (datahubConfig.hasPath(aliasKey + ".env")) {
pathSpecBuilder.env(datahubConfig.getString(aliasKey + ".env"));
pathSpecBuilder.env(Optional.ofNullable(datahubConfig.getString(aliasKey + ".env")));
}
if (datahubConfig.hasPath(aliasKey + ".platformInstance")) {
if (datahubConfig.hasPath(aliasKey + "." + PLATFORM_INSTANCE_KEY)) {
pathSpecBuilder.platformInstance(
Optional.ofNullable(datahubConfig.getString(aliasKey + ".platformInstance")));
Optional.ofNullable(
datahubConfig.getString(aliasKey + "." + PLATFORM_INSTANCE_KEY)));
}
if (datahubConfig.hasPath(aliasKey + "." + PATH_SPEC_LIST_KEY)) {
pathSpecBuilder.pathSpecList(
Arrays.asList(
datahubConfig.getString(aliasKey + "." + PATH_SPEC_LIST_KEY).split(",")));
}
pathSpecBuilder.pathSpecList(
Arrays.asList(datahubConfig.getString(aliasKey + "." + pathSpecKey).split(",")));

platformSpecs.add(pathSpecBuilder.build());
}
pathSpecMap.put(key, platformSpecs);
@@ -264,8 +270,8 @@ public static Map<String, List<PathSpec>> getPathSpecListMap(Config datahubConfi
}

public static String getPlatformInstance(Config pathSpecConfig) {
return pathSpecConfig.hasPath(PLATFORM_INSTANCE_KEY)
? pathSpecConfig.getString(PLATFORM_INSTANCE_KEY)
return pathSpecConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)
? pathSpecConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY)
: null;
}

@@ -341,4 +347,9 @@ public static boolean isEmitCoalescePeriodically(Config datahubConfig) {
return datahubConfig.hasPath(STAGE_METADATA_COALESCING)
&& datahubConfig.getBoolean(STAGE_METADATA_COALESCING);
}

public static boolean isLowerCaseDatasetUrns(Config datahubConfig) {
return datahubConfig.hasPath(DATASET_LOWERCASE_URNS)
&& datahubConfig.getBoolean(DATASET_LOWERCASE_URNS);
}
}
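For reference, a tiny sketch of how the new `isLowerCaseDatasetUrns` check behaves against a typesafe `Config`. The `ConfigFactory.parseMap` construction is only for illustration; how the `spark.datahub.*` properties have their prefix stripped and are turned into this `Config` is outside this diff and is assumed here.

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Collections;

// Illustration only: exercising the flag check added in this PR.
public class LowerCaseUrnsFlagSketch {
  public static void main(String[] args) {
    // Assumed: by the time SparkConfigParser sees the Config, the "spark.datahub."
    // prefix has already been removed from the property keys.
    Config datahubConfig =
        ConfigFactory.parseMap(
            Collections.singletonMap("metadata.dataset.lowerCaseUrns", "true"));

    boolean lowerCaseUrns =
        datahubConfig.hasPath("metadata.dataset.lowerCaseUrns")
            && datahubConfig.getBoolean("metadata.dataset.lowerCaseUrns");

    System.out.println(lowerCaseUrns); // prints: true
  }
}
```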
@@ -13,12 +13,14 @@
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.datahubproject.openlineage.dataset.DatahubDataset;
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.datahubproject.openlineage.dataset.PathSpec;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import junit.framework.TestCase;
@@ -598,4 +600,158 @@ public void testProcessRedshiftOutput() throws URISyntaxException, IOException {
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}

public void testProcessRedshiftOutputWithPlatformInstance()
throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.hivePlatformAlias("glue");
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.commonDatasetPlatformInstance("my-platform-instance");

String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"),
StandardCharsets.UTF_8);

OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());

assertNotNull(datahubJob);

for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-platform-instance.datahub.metadata_aspect_v2,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:redshift,my-platform-instance.dev.public.spark_redshift_load_test,DEV)",
dataset.getUrn().toString());
assertEquals(
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}

public void testProcessRedshiftOutputWithPlatformSpecificPlatformInstance()
throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.hivePlatformAlias("glue");
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.pathSpecs(
new HashMap<String, List<PathSpec>>() {
{
put(
"redshift",
List.of(
PathSpec.builder()
.platform("redshift")
.platformInstance(Optional.of("my-platform-instance"))
.build()));
}
});

String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"),
StandardCharsets.UTF_8);

OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());

assertNotNull(datahubJob);

for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:redshift,my-platform-instance.dev.public.spark_redshift_load_test,DEV)",
dataset.getUrn().toString());
assertEquals(
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}

public void testProcessRedshiftOutputWithPlatformSpecificEnv()
throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.hivePlatformAlias("glue");
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.pathSpecs(
new HashMap<String, List<PathSpec>>() {
{
put(
"redshift",
List.of(PathSpec.builder().platform("redshift").env(Optional.of("PROD")).build()));
}
});

String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"),
StandardCharsets.UTF_8);

OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());

assertNotNull(datahubJob);

for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,PROD)",
dataset.getUrn().toString());
assertEquals(
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}

public void testProcessRedshiftOutputLowercasedUrns() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.hivePlatformAlias("glue");
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.lowerCaseDatasetUrns(true);

String olEvent =
IOUtils.toString(
this.getClass()
.getResourceAsStream("/ol_events/redshift_mixed_case_lineage_spark.json"),
StandardCharsets.UTF_8);

OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());

assertNotNull(datahubJob);

for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,DEV)",
dataset.getUrn().toString());
assertEquals(
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}
}