Skip to content

Commit

Permalink
fix(ingestion) Inject pipeline_name into recipes at runtime (#6833)
Browse files Browse the repository at this point in the history
  • Loading branch information
chriscollins3456 authored Dec 22, 2022
1 parent 4cba09e commit 2ef2ad0
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 29 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ project.ext.externalDependency = [
'jsonSchemaAvro': 'com.github.fge:json-schema-avro:0.1.4',
'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1',
'jsonSmart': 'net.minidev:json-smart:2.4.6',
'json': 'org.json:json:20090211',
'junitJupiterApi': "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion",
'junitJupiterParams': "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion",
'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linkedin.metadata.key.ExecutionRequestKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.IngestionUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand Down Expand Up @@ -105,7 +106,10 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
execInput.setRequestedAt(System.currentTimeMillis());

Map<String, String> arguments = new HashMap<>();
arguments.put(RECIPE_ARG_NAME, injectRunId(ingestionSourceInfo.getConfig().getRecipe(), executionRequestUrn.toString()));
String recipe = ingestionSourceInfo.getConfig().getRecipe();
recipe = injectRunId(recipe, executionRequestUrn.toString());
recipe = IngestionUtils.injectPipelineName(recipe, executionRequestUrn.toString());
arguments.put(RECIPE_ARG_NAME, recipe);
arguments.put(VERSION_ARG_NAME, ingestionSourceInfo.getConfig().hasVersion()
? ingestionSourceInfo.getConfig().getVersion()
: _ingestionConfiguration.getDefaultCliVersion()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONException;
import org.json.JSONObject;

import java.net.URISyntaxException;
import java.util.Optional;
Expand Down Expand Up @@ -56,7 +54,6 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
final UpdateIngestionSourceInput input = bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);

final MetadataChangeProposal proposal = new MetadataChangeProposal();
String ingestionSourceUrnString;

if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
Expand All @@ -67,7 +64,6 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
ingestionSourceUrnString = ingestionSourceUrn.get();
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
Expand All @@ -78,11 +74,10 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key));
ingestionSourceUrnString = String.format("urn:li:dataHubIngestionSource:%s", uuidStr);
}

// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input, ingestionSourceUrnString);
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
proposal.setEntityType(Constants.INGESTION_SOURCE_ENTITY_NAME);
proposal.setAspectName(Constants.INGESTION_INFO_ASPECT_NAME);
proposal.setAspect(GenericRecordUtils.serializeAspect(info));
Expand All @@ -98,23 +93,20 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
});
}

private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input, final String ingestionSourceUrn) {
private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input) {
final DataHubIngestionSourceInfo result = new DataHubIngestionSourceInfo();
result.setType(input.getType());
result.setName(input.getName());
result.setConfig(mapConfig(input.getConfig(), ingestionSourceUrn));
result.setConfig(mapConfig(input.getConfig()));
if (input.getSchedule() != null) {
result.setSchedule(mapSchedule(input.getSchedule()));
}
return result;
}

private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input, final String ingestionSourceUrn) {
private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input) {
final DataHubIngestionSourceConfig result = new DataHubIngestionSourceConfig();
String recipe = input.getRecipe();
if (recipe != null) {
recipe = optionallySetPipelineName(recipe, ingestionSourceUrn);
}
result.setRecipe(recipe);
if (input.getVersion() != null) {
result.setVersion(input.getVersion());
Expand All @@ -134,19 +126,4 @@ private DataHubIngestionSourceSchedule mapSchedule(final UpdateIngestionSourceSc
result.setTimezone(input.getTimezone());
return result;
}

private String optionallySetPipelineName(String recipe, String ingestionSourceUrn) {
try {
JSONObject jsonRecipe = new JSONObject(recipe);
boolean hasPipelineName = jsonRecipe.has("pipeline_name") && jsonRecipe.get("pipeline_name") != null && !jsonRecipe.get("pipeline_name").equals("");

if (!hasPipelineName) {
jsonRecipe.put("pipeline_name", ingestionSourceUrn);
recipe = jsonRecipe.toString();
}
} catch (JSONException e) {
log.warn("Error parsing ingestion recipe in JSON form", e);
}
return recipe;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.key.ExecutionRequestKey;
import com.linkedin.metadata.query.ListResult;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.IngestionUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.r2.RemoteInvocationException;
import java.util.ArrayList;
Expand Down Expand Up @@ -347,7 +348,8 @@ public void run() {
input.setRequestedAt(System.currentTimeMillis());

Map<String, String> arguments = new HashMap<>();
arguments.put(RECIPE_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().getRecipe());
String recipe = IngestionUtils.injectPipelineName(_ingestionSourceInfo.getConfig().getRecipe(), _ingestionSourceUrn.toString());
arguments.put(RECIPE_ARGUMENT_NAME, recipe);
arguments.put(VERSION_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().hasVersion()
? _ingestionSourceInfo.getConfig().getVersion()
: _ingestionConfiguration.getDefaultCliVersion());
Expand Down
1 change: 1 addition & 0 deletions metadata-utils/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
compile externalDependency.elasticSearchRest
compile externalDependency.httpClient
compile externalDependency.neo4jJavaDriver
compile externalDependency.json

compile spec.product.pegasus.restliClient
compile spec.product.pegasus.restliCommon
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.linkedin.metadata.utils;

import org.json.JSONException;
import org.json.JSONObject;

import javax.annotation.Nonnull;


public class IngestionUtils {

private static final String PIPELINE_NAME = "pipeline_name";

private IngestionUtils() {
}

/**
* Injects a pipeline_name into a recipe if there isn't a pipeline_name already there.
* The pipeline_name will be the urn of the ingestion source.
*
* @param pipelineName the new pipeline name in the recipe.
* @return a modified recipe JSON string
*/
public static String injectPipelineName(@Nonnull String originalJson, @Nonnull final String pipelineName) {
try {
final JSONObject jsonRecipe = new JSONObject(originalJson);
boolean hasPipelineName = jsonRecipe.has(PIPELINE_NAME) && jsonRecipe.get(PIPELINE_NAME) != null && !jsonRecipe.get(PIPELINE_NAME).equals("");

if (!hasPipelineName) {
jsonRecipe.put(PIPELINE_NAME, pipelineName);
return jsonRecipe.toString();
}
} catch (JSONException e) {
throw new IllegalArgumentException("Failed to create execution request: Invalid recipe json provided.", e);
}
return originalJson;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.metadata.utils;

import org.testng.annotations.Test;


import static org.testng.Assert.assertEquals;

public class IngestionUtilsTest {

private final String ingestionSourceUrn = "urn:li:ingestionSource:12345";

@Test
public void injectPipelineNameWhenThere() {
String recipe = "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}},\"pipeline_name\":\"test\"}";

assertEquals(recipe, IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn));
}

@Test
public void injectPipelineNameWhenNotThere() {
String recipe = "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}}}";
recipe = IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn);

assertEquals(
recipe,
"{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}},\"pipeline_name\":\"urn:li:ingestionSource:12345\"}"
);
}
}

0 comments on commit 2ef2ad0

Please sign in to comment.