diff --git a/build.gradle b/build.gradle index 22217f2149c5e..f8a911b1e4d31 100644 --- a/build.gradle +++ b/build.gradle @@ -114,6 +114,7 @@ project.ext.externalDependency = [ 'jsonSchemaAvro': 'com.github.fge:json-schema-avro:0.1.4', 'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1', 'jsonSmart': 'net.minidev:json-smart:2.4.6', + 'json': 'org.json:json:20090211', 'junitJupiterApi': "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion", 'junitJupiterParams': "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion", 'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion", diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java index ccec5f4b6760a..1f67856e69f9d 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java @@ -22,6 +22,7 @@ import com.linkedin.metadata.key.ExecutionRequestKey; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.metadata.utils.IngestionUtils; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; @@ -105,7 +106,10 @@ public CompletableFuture get(final DataFetchingEnvironment environment) execInput.setRequestedAt(System.currentTimeMillis()); Map arguments = new HashMap<>(); - arguments.put(RECIPE_ARG_NAME, injectRunId(ingestionSourceInfo.getConfig().getRecipe(), executionRequestUrn.toString())); + String recipe = ingestionSourceInfo.getConfig().getRecipe(); + recipe = injectRunId(recipe, executionRequestUrn.toString()); + recipe = IngestionUtils.injectPipelineName(recipe, executionRequestUrn.toString()); + arguments.put(RECIPE_ARG_NAME, recipe); arguments.put(VERSION_ARG_NAME, ingestionSourceInfo.getConfig().hasVersion() ? ingestionSourceInfo.getConfig().getVersion() : _ingestionConfiguration.getDefaultCliVersion() diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java index 8e101be7accdd..41e786927c01a 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java @@ -21,8 +21,6 @@ import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; import lombok.extern.slf4j.Slf4j; -import org.json.JSONException; -import org.json.JSONObject; import java.net.URISyntaxException; import java.util.Optional; @@ -56,7 +54,6 @@ public CompletableFuture get(final DataFetchingEnvironment environment) final UpdateIngestionSourceInput input = bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class); final MetadataChangeProposal proposal = new MetadataChangeProposal(); - String ingestionSourceUrnString; if (ingestionSourceUrn.isPresent()) { // Update existing ingestion source @@ -67,7 +64,6 @@ public CompletableFuture get(final DataFetchingEnvironment environment) String.format("Malformed urn %s provided.", ingestionSourceUrn.get()), DataHubGraphQLErrorCode.BAD_REQUEST); } - ingestionSourceUrnString = ingestionSourceUrn.get(); } else { // Create new ingestion source // Since we are creating a new Ingestion Source, we need to generate a unique UUID. @@ -78,11 +74,10 @@ public CompletableFuture get(final DataFetchingEnvironment environment) final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey(); key.setId(uuidStr); proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key)); - ingestionSourceUrnString = String.format("urn:li:dataHubIngestionSource:%s", uuidStr); } // Create the policy info. - final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input, ingestionSourceUrnString); + final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input); proposal.setEntityType(Constants.INGESTION_SOURCE_ENTITY_NAME); proposal.setAspectName(Constants.INGESTION_INFO_ASPECT_NAME); proposal.setAspect(GenericRecordUtils.serializeAspect(info)); @@ -98,23 +93,20 @@ public CompletableFuture get(final DataFetchingEnvironment environment) }); } - private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input, final String ingestionSourceUrn) { + private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input) { final DataHubIngestionSourceInfo result = new DataHubIngestionSourceInfo(); result.setType(input.getType()); result.setName(input.getName()); - result.setConfig(mapConfig(input.getConfig(), ingestionSourceUrn)); + result.setConfig(mapConfig(input.getConfig())); if (input.getSchedule() != null) { result.setSchedule(mapSchedule(input.getSchedule())); } return result; } - private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input, final String ingestionSourceUrn) { + private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input) { final DataHubIngestionSourceConfig result = new DataHubIngestionSourceConfig(); String recipe = input.getRecipe(); - if (recipe != null) { - recipe = optionallySetPipelineName(recipe, ingestionSourceUrn); - } result.setRecipe(recipe); if (input.getVersion() != null) { result.setVersion(input.getVersion()); @@ -134,19 +126,4 @@ private DataHubIngestionSourceSchedule mapSchedule(final UpdateIngestionSourceSc result.setTimezone(input.getTimezone()); return result; } - - private String optionallySetPipelineName(String recipe, String ingestionSourceUrn) { - try { - JSONObject jsonRecipe = new JSONObject(recipe); - boolean hasPipelineName = jsonRecipe.has("pipeline_name") && jsonRecipe.get("pipeline_name") != null && !jsonRecipe.get("pipeline_name").equals(""); - - if (!hasPipelineName) { - jsonRecipe.put("pipeline_name", ingestionSourceUrn); - recipe = jsonRecipe.toString(); - } - } catch (JSONException e) { - log.warn("Error parsing ingestion recipe in JSON form", e); - } - return recipe; - } } diff --git a/ingestion-scheduler/src/main/java/com/datahub/metadata/ingestion/IngestionScheduler.java b/ingestion-scheduler/src/main/java/com/datahub/metadata/ingestion/IngestionScheduler.java index 968cb86c4495b..5d50c0a326054 100644 --- a/ingestion-scheduler/src/main/java/com/datahub/metadata/ingestion/IngestionScheduler.java +++ b/ingestion-scheduler/src/main/java/com/datahub/metadata/ingestion/IngestionScheduler.java @@ -21,6 +21,7 @@ import com.linkedin.metadata.key.ExecutionRequestKey; import com.linkedin.metadata.query.ListResult; import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.metadata.utils.IngestionUtils; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.r2.RemoteInvocationException; import java.util.ArrayList; @@ -347,7 +348,8 @@ public void run() { input.setRequestedAt(System.currentTimeMillis()); Map arguments = new HashMap<>(); - arguments.put(RECIPE_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().getRecipe()); + String recipe = IngestionUtils.injectPipelineName(_ingestionSourceInfo.getConfig().getRecipe(), _ingestionSourceUrn.toString()); + arguments.put(RECIPE_ARGUMENT_NAME, recipe); arguments.put(VERSION_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().hasVersion() ? _ingestionSourceInfo.getConfig().getVersion() : _ingestionConfiguration.getDefaultCliVersion()); diff --git a/metadata-utils/build.gradle b/metadata-utils/build.gradle index 3ab83a97d27df..9124480146aba 100644 --- a/metadata-utils/build.gradle +++ b/metadata-utils/build.gradle @@ -8,6 +8,7 @@ dependencies { compile externalDependency.elasticSearchRest compile externalDependency.httpClient compile externalDependency.neo4jJavaDriver + compile externalDependency.json compile spec.product.pegasus.restliClient compile spec.product.pegasus.restliCommon diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/IngestionUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/IngestionUtils.java new file mode 100644 index 0000000000000..d923005c8c023 --- /dev/null +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/IngestionUtils.java @@ -0,0 +1,37 @@ +package com.linkedin.metadata.utils; + +import org.json.JSONException; +import org.json.JSONObject; + +import javax.annotation.Nonnull; + + +public class IngestionUtils { + + private static final String PIPELINE_NAME = "pipeline_name"; + + private IngestionUtils() { + } + + /** + * Injects a pipeline_name into a recipe if there isn't a pipeline_name already there. + * The pipeline_name will be the urn of the ingestion source. + * + * @param pipelineName the new pipeline name in the recipe. + * @return a modified recipe JSON string + */ + public static String injectPipelineName(@Nonnull String originalJson, @Nonnull final String pipelineName) { + try { + final JSONObject jsonRecipe = new JSONObject(originalJson); + boolean hasPipelineName = jsonRecipe.has(PIPELINE_NAME) && jsonRecipe.get(PIPELINE_NAME) != null && !jsonRecipe.get(PIPELINE_NAME).equals(""); + + if (!hasPipelineName) { + jsonRecipe.put(PIPELINE_NAME, pipelineName); + return jsonRecipe.toString(); + } + } catch (JSONException e) { + throw new IllegalArgumentException("Failed to create execution request: Invalid recipe json provided.", e); + } + return originalJson; + } +} diff --git a/metadata-utils/src/test/java/com/linkedin/metadata/utils/IngestionUtilsTest.java b/metadata-utils/src/test/java/com/linkedin/metadata/utils/IngestionUtilsTest.java new file mode 100644 index 0000000000000..8b2078c7b9533 --- /dev/null +++ b/metadata-utils/src/test/java/com/linkedin/metadata/utils/IngestionUtilsTest.java @@ -0,0 +1,29 @@ +package com.linkedin.metadata.utils; + +import org.testng.annotations.Test; + + +import static org.testng.Assert.assertEquals; + +public class IngestionUtilsTest { + + private final String ingestionSourceUrn = "urn:li:ingestionSource:12345"; + + @Test + public void injectPipelineNameWhenThere() { + String recipe = "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}},\"pipeline_name\":\"test\"}"; + + assertEquals(recipe, IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn)); + } + + @Test + public void injectPipelineNameWhenNotThere() { + String recipe = "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}}}"; + recipe = IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn); + + assertEquals( + recipe, + "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}},\"pipeline_name\":\"urn:li:ingestionSource:12345\"}" + ); + } +}