From 4d34335761466ef765efc41b64fbf58562fbb583 Mon Sep 17 00:00:00 2001 From: evantahler Date: Fri, 20 Jan 2023 11:38:51 -0800 Subject: [PATCH 1/7] Pass allowed hosts to Process Factories and log --- .../workers/general/DbtTransformationRunner.java | 1 + .../workers/helper/EntrypointEnvChecker.java | 1 + .../normalization/DefaultNormalizationRunner.java | 1 + .../process/AirbyteIntegrationLauncher.java | 9 +++++++++ .../workers/process/DockerProcessFactory.java | 4 +++- .../workers/process/KubeProcessFactory.java | 5 ++++- .../io/airbyte/workers/process/ProcessFactory.java | 2 ++ .../DefaultNormalizationRunnerTest.java | 1 + .../process/AirbyteIntegrationLauncherTest.java | 8 ++++++-- .../workers/process/DockerProcessFactoryTest.java | 4 +++- .../io/airbyte/config/persistence/DbConverter.java | 11 +++++++++-- .../main/resources/seed/source_definitions.yaml | 3 +++ .../orchestrator/ReplicationJobOrchestrator.java | 2 ++ .../destination/DestinationAcceptanceTest.java | 8 ++++---- .../source/AbstractSourceConnectorTest.java | 14 +++++++------- airbyte-worker-models/build.gradle | 4 ++++ .../workers_models/IntegrationLauncherConfig.yaml | 3 +++ .../connection/CheckConnectionActivityImpl.java | 1 + .../catalog/DiscoverCatalogActivityImpl.java | 3 ++- .../activities/GenerateInputActivityImpl.java | 10 ++++++++-- .../workers/temporal/spec/SpecActivityImpl.java | 1 + .../temporal/sync/ReplicationActivityImpl.java | 2 ++ .../process/KubePodProcessIntegrationTest.java | 2 +- 23 files changed, 78 insertions(+), 22 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationRunner.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationRunner.java index 7abd7bc7b672..1a42902b86a1 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationRunner.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationRunner.java @@ -107,6 +107,7 @@ public boolean transform(final String jobId, files, "/bin/bash", resourceRequirements, + null, Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, CUSTOM_STEP), Collections.emptyMap(), Collections.emptyMap(), diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/EntrypointEnvChecker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/EntrypointEnvChecker.java index a88ae8a62f51..09a0cd58d068 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/EntrypointEnvChecker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/EntrypointEnvChecker.java @@ -45,6 +45,7 @@ public static String getEntrypointEnvVariable(final ProcessFactory processFactor Collections.emptyMap(), "printenv", null, + null, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index 8c2813804069..ce6177f34b34 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -134,6 +134,7 @@ private boolean runProcess(final String jobId, false, files, null, resourceRequirements, + null, Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP), Collections.emptyMap(), Collections.emptyMap(), diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 6b8c9ac347cd..17a7853f51f5 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -23,6 +23,7 @@ import datadog.trace.api.Trace; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.config.AllowedHosts; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.WorkerEnvConstants; import io.airbyte.metrics.lib.ApmTraceUtils; @@ -50,12 +51,14 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher { * At this moment, we put custom connector jobs into an isolated pool. */ private final boolean useIsolatedPool; + private final AllowedHosts allowedHosts; public AirbyteIntegrationLauncher(final String jobId, final int attempt, final String imageName, final ProcessFactory processFactory, final ResourceRequirements resourceRequirement, + final AllowedHosts allowedHosts, final boolean useIsolatedPool, final FeatureFlags featureFlags) { this.jobId = jobId; @@ -63,6 +66,7 @@ public AirbyteIntegrationLauncher(final String jobId, this.imageName = imageName; this.processFactory = processFactory; this.resourceRequirement = resourceRequirement; + this.allowedHosts = allowedHosts; this.featureFlags = featureFlags; this.useIsolatedPool = useIsolatedPool; } @@ -82,6 +86,7 @@ public Process spec(final Path jobRoot) throws WorkerException { Collections.emptyMap(), null, resourceRequirement, + allowedHosts, Map.of(JOB_TYPE_KEY, SPEC_JOB), getWorkerMetadata(), Collections.emptyMap(), @@ -103,6 +108,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri ImmutableMap.of(configFilename, configContents), null, resourceRequirement, + allowedHosts, Map.of(JOB_TYPE_KEY, CHECK_JOB), getWorkerMetadata(), Collections.emptyMap(), @@ -125,6 +131,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S ImmutableMap.of(configFilename, configContents), null, resourceRequirement, + allowedHosts, Map.of(JOB_TYPE_KEY, DISCOVER_JOB), getWorkerMetadata(), Collections.emptyMap(), @@ -171,6 +178,7 @@ public Process read(final Path jobRoot, files, null, resourceRequirement, + allowedHosts, Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP), getWorkerMetadata(), Collections.emptyMap(), @@ -201,6 +209,7 @@ public Process write(final Path jobRoot, files, null, resourceRequirement, + allowedHosts, Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP), getWorkerMetadata(), Collections.emptyMap(), diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java index 5ed26798030b..29bb78af1262 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java @@ -12,6 +12,7 @@ import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.map.MoreMaps; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.AllowedHosts; import io.airbyte.config.ResourceRequirements; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerUtils; @@ -88,6 +89,7 @@ public Process create(final String jobType, final Map files, final String entrypoint, final ResourceRequirements resourceRequirements, + final AllowedHosts allowedHosts, final Map labels, final Map jobMetadata, final Map internalToExternalPorts, @@ -117,7 +119,7 @@ public Process create(final String jobType, "--log-driver", "none"); final String containerName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, DOCKER_NAME_LEN_LIMIT); - LOGGER.info("Creating docker container = {} with resources {}", containerName, resourceRequirements); + LOGGER.info("Creating docker container = {} with resources {} and allowed hosts {}", containerName, resourceRequirements, allowedHosts); cmd.add("--name"); cmd.add(containerName); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 8643ba99219e..a8d7cc5e411d 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -7,6 +7,7 @@ import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.map.MoreMaps; +import io.airbyte.config.AllowedHosts; import io.airbyte.config.ResourceRequirements; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.exception.WorkerException; @@ -85,6 +86,7 @@ public Process create( final Map files, final String entrypoint, final ResourceRequirements resourceRequirements, + final AllowedHosts allowedHosts, final Map customLabels, final Map jobMetadata, final Map internalToExternalPorts, @@ -93,7 +95,8 @@ public Process create( try { // used to differentiate source and destination processes with the same id and attempt final String podName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, KUBE_NAME_LEN_LIMIT); - LOGGER.info("Attempting to start pod = {} for {} with resources {}", podName, imageName, resourceRequirements); + LOGGER.info("Attempting to start pod = {} for {} with resources {} and allowed hosts {}", podName, imageName, resourceRequirements, + allowedHosts); final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take(); LOGGER.info("{} stdoutLocalPort = {}", podName, stdoutLocalPort); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/ProcessFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/ProcessFactory.java index d9cf45084a06..5c4bd09e18c8 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/ProcessFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/ProcessFactory.java @@ -4,6 +4,7 @@ package io.airbyte.workers.process; +import io.airbyte.config.AllowedHosts; import io.airbyte.config.ResourceRequirements; import io.airbyte.workers.exception.WorkerException; import java.nio.file.Path; @@ -49,6 +50,7 @@ Process create(String jobType, final Map files, final String entrypoint, final ResourceRequirements resourceRequirements, + final AllowedHosts allowedHosts, final Map labels, final Map jobMetadata, final Map portMapping, diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java index db3bbc57671e..7fb14d19948b 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java @@ -91,6 +91,7 @@ void setup() throws IOException, WorkerException { when(processFactory.create(NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null, workerConfigs.getResourceRequirements(), + null, Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP), Map.of(), Map.of(), diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 458885c0d940..c6ec9ed2f49c 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -69,7 +69,7 @@ class AirbyteIntegrationLauncherTest { @BeforeEach void setUp() { workerConfigs = new WorkerConfigs(new EnvConfigs()); - launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), false, + launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), null, false, featureFlags); } @@ -78,7 +78,7 @@ void spec() throws WorkerException { launcher.spec(JOB_ROOT); Mockito.verify(processFactory).create(SPEC_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, Collections.emptyMap(), null, - workerConfigs.getResourceRequirements(), Map.of(JOB_TYPE_KEY, SPEC_JOB), JOB_METADATA, + workerConfigs.getResourceRequirements(), null, Map.of(JOB_TYPE_KEY, SPEC_JOB), JOB_METADATA, Map.of(), "spec"); } @@ -89,6 +89,7 @@ void check() throws WorkerException { Mockito.verify(processFactory).create(CHECK_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null, workerConfigs.getResourceRequirements(), + null, Map.of(JOB_TYPE_KEY, CHECK_JOB), JOB_METADATA, Map.of(), @@ -102,6 +103,7 @@ void discover() throws WorkerException { Mockito.verify(processFactory).create(DISCOVER_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null, workerConfigs.getResourceRequirements(), + null, Map.of(JOB_TYPE_KEY, DISCOVER_JOB), JOB_METADATA, Map.of(), @@ -115,6 +117,7 @@ void read() throws WorkerException { Mockito.verify(processFactory).create(READ_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_CATALOG_STATE_FILES, null, workerConfigs.getResourceRequirements(), + null, Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP), JOB_METADATA, Map.of(), @@ -131,6 +134,7 @@ void write() throws WorkerException { Mockito.verify(processFactory).create(WRITE_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, true, CONFIG_CATALOG_FILES, null, workerConfigs.getResourceRequirements(), + null, Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP), JOB_METADATA, Map.of(), diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/DockerProcessFactoryTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/DockerProcessFactoryTest.java index 215fb0bb2015..a4983c384227 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/DockerProcessFactoryTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/DockerProcessFactoryTest.java @@ -86,7 +86,7 @@ void testFileWriting() throws IOException, WorkerException { final DockerProcessFactory processFactory = new DockerProcessFactory(new WorkerConfigs(new EnvConfigs()), workspaceRoot, null, null, null); processFactory.create("tester", "job_id", 0, jobRoot, BUSYBOX, false, false, ImmutableMap.of("config.json", "{\"data\": 2}"), "echo hi", - new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of(), Map.of(), Map.of()); + new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), null, Map.of(), Map.of(), Map.of()); assertEquals( Jsons.jsonNode(ImmutableMap.of("data", 2)), @@ -125,6 +125,7 @@ void testEnvMapSet() throws IOException, WorkerException, InterruptedException { Map.of(), "/bin/sh", workerConfigs.getResourceRequirements(), + null, Map.of(), Map.of(), Map.of(), @@ -158,6 +159,7 @@ private void waitForDockerToInitialize(final ProcessFactory processFactory, fina Map.of(), "/bin/sh", workerConfigs.getResourceRequirements(), + null, Map.of(), Map.of(), Map.of(), diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index 41f1822bbd45..58f4b32496d2 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -19,6 +19,7 @@ import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.ActorCatalogWithUpdatedAt; import io.airbyte.config.ActorDefinitionResourceRequirements; +import io.airbyte.config.AllowedHosts; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; import io.airbyte.config.FieldSelectionData; @@ -160,7 +161,10 @@ public static StandardSourceDefinition buildStandardSourceDefinition(final Recor : record.get(ACTOR_DEFINITION.RELEASE_DATE).toString()) .withResourceRequirements(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS) == null ? null - : Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class)); + : Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class)) + .withAllowedHosts(record.get(ACTOR_DEFINITION.ALLOWED_HOSTS) == null + ? null + : Jsons.deserialize(record.get(ACTOR_DEFINITION.ALLOWED_HOSTS).data(), AllowedHosts.class)); } public static StandardDestinationDefinition buildStandardDestinationDefinition(final Record record) { @@ -193,7 +197,10 @@ public static StandardDestinationDefinition buildStandardDestinationDefinition(f : null) .withResourceRequirements(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS) == null ? null - : Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class)); + : Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class)) + .withAllowedHosts(record.get(ACTOR_DEFINITION.ALLOWED_HOSTS) == null + ? null + : Jsons.deserialize(record.get(ACTOR_DEFINITION.ALLOWED_HOSTS).data(), AllowedHosts.class)); } public static DestinationOAuthParameter buildDestinationOAuthParameter(final Record record) { diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index e7dfad1c3c12..43f0e84ee34c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1343,6 +1343,9 @@ icon: postgresql.svg sourceType: database releaseStage: generally_available + allowedHosts: + hosts: + - "{{config.host}}" - name: Postmark App sourceDefinitionId: cde75ca1-1e28-4a0f-85bb-90c546de9f1f dockerRepository: airbyte/source-postmarkapp diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index 4e2496f3735e..c73f06f4e0cb 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -123,6 +123,7 @@ public Optional runJob() throws Exception { sourceLauncherConfig.getDockerImage(), processFactory, syncInput.getSourceResourceRequirements(), + sourceLauncherConfig.getAllowedHosts(), useIsolatedPool, featureFlags); @@ -133,6 +134,7 @@ public Optional runJob() throws Exception { destinationLauncherConfig.getDockerImage(), processFactory, syncInput.getDestinationResourceRequirements(), + destinationLauncherConfig.getAllowedHosts(), useIsolatedPool, featureFlags); diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index cff00930528a..1fce78c5bd17 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -1250,14 +1250,14 @@ protected void assertNamespaceNormalization(final String testCaseId, private ConnectorSpecification runSpec() throws WorkerException { return convertProtocolObject( new DefaultGetSpecWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags())) + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false, new EnvVariableFeatureFlags())) .run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot).getSpec(), ConnectorSpecification.class); } protected StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException { return new DefaultCheckConnectionWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()), + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false, new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot) .getCheckConnection(); @@ -1267,7 +1267,7 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException( final JsonNode config) { try { final StandardCheckConnectionOutput standardCheckConnectionOutput = new DefaultCheckConnectionWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()), + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false, new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot) .getCheckConnection(); @@ -1280,7 +1280,7 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException( protected AirbyteDestination getDestination() { return new DefaultAirbyteDestination( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags())); + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false, new EnvVariableFeatureFlags())); } protected void runSyncAndVerifyStateOutput(final JsonNode config, diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java index f3d1ae3598bc..58b3f2b3baf0 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java @@ -158,7 +158,7 @@ public void tearDownInternal() throws Exception { protected ConnectorSpecification runSpec() throws WorkerException { final io.airbyte.protocol.models.ConnectorSpecification spec = new DefaultGetSpecWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), null, false, new EnvVariableFeatureFlags())) .run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot).getSpec(); return convertProtocolObject(spec, ConnectorSpecification.class); @@ -166,7 +166,7 @@ protected ConnectorSpecification runSpec() throws WorkerException { protected StandardCheckConnectionOutput runCheck() throws Exception { return new DefaultCheckConnectionWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), null, false, new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardCheckConnectionInput().withConnectionConfiguration(getConfig()), jobRoot).getCheckConnection(); @@ -174,7 +174,7 @@ protected StandardCheckConnectionOutput runCheck() throws Exception { protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exception { return new DefaultCheckConnectionWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), null, false, new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot).getCheckConnection().getStatus().toString(); @@ -183,7 +183,7 @@ protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exce protected UUID runDiscover() throws Exception { final UUID toReturn = new DefaultDiscoverCatalogWorker( mConfigRepository, - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), null, false, new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardDiscoverCatalogInput().withSourceId(SOURCE_ID.toString()).withConnectionConfiguration(getConfig()), jobRoot) @@ -218,7 +218,7 @@ protected List runRead(final ConfiguredAirbyteCatalog catalog, f final var featureFlags = new EnvVariableFeatureFlags(); final AirbyteSource source = new DefaultAirbyteSource( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), null, false, featureFlags), featureFlags); final List messages = new ArrayList<>(); @@ -271,9 +271,9 @@ private AirbyteSource prepareAirbyteSource(final ResourceRequirements resourceRe final var workerConfigs = new WorkerConfigs(new EnvConfigs()); final var featureFlags = new EnvVariableFeatureFlags(); final var integrationLauncher = resourceRequirements == null - ? new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + ? new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), null, false, featureFlags) - : new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, resourceRequirements, false, featureFlags); + : new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, resourceRequirements, null, false, featureFlags); return new DefaultAirbyteSource(integrationLauncher, featureFlags); } diff --git a/airbyte-worker-models/build.gradle b/airbyte-worker-models/build.gradle index 57796fd0246f..d6aee586fe39 100644 --- a/airbyte-worker-models/build.gradle +++ b/airbyte-worker-models/build.gradle @@ -5,6 +5,10 @@ plugins { id 'com.github.eirnym.js2p' version '1.0' } +dependencies { + implementation project(path: ':airbyte-config:config-models') +} + jsonSchema2Pojo { sourceType = SourceType.YAMLSCHEMA source = files("${sourceSets.main.output.resourcesDir}/workers_models") diff --git a/airbyte-worker-models/src/main/resources/workers_models/IntegrationLauncherConfig.yaml b/airbyte-worker-models/src/main/resources/workers_models/IntegrationLauncherConfig.yaml index e8035d05b6ab..59582b993e36 100644 --- a/airbyte-worker-models/src/main/resources/workers_models/IntegrationLauncherConfig.yaml +++ b/airbyte-worker-models/src/main/resources/workers_models/IntegrationLauncherConfig.yaml @@ -28,3 +28,6 @@ properties: existingJavaType: io.airbyte.commons.version.Version isCustomConnector: type: boolean + allowedHosts: + type: object + existingJavaType: io.airbyte.config.AllowedHosts diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 0d752d189b83..f95af4a371d8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -140,6 +140,7 @@ private CheckedSupplier launcherConfig.getDockerImage(), processFactory, workerConfigs.getResourceRequirements(), + launcherConfig.getAllowedHosts(), launcherConfig.getIsCustomConnector(), featureFlags); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 5d5e5837acff..d21cc2cfe4fb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -130,7 +130,8 @@ private CheckedSupplier return () -> { final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), - processFactory, workerConfigs.getResourceRequirements(), launcherConfig.getIsCustomConnector(), featureFlags); + processFactory, workerConfigs.getResourceRequirements(), launcherConfig.getAllowedHosts(), launcherConfig.getIsCustomConnector(), + featureFlags); final AirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, launcherConfig.getProtocolVersion(), Optional.empty()); final ConnectorConfigUpdater connectorConfigUpdater = diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 91ad3cd162eb..692b3ad290c2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -19,6 +19,7 @@ import io.airbyte.config.JobSyncConfig; import io.airbyte.config.ResetSourceConfiguration; import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.persistence.ConfigRepository; @@ -94,6 +95,9 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { final UUID connectionId = UUID.fromString(job.getScope()); final StandardSync standardSync = configRepository.getStandardSync(connectionId); + final StandardSourceDefinition sourceDefinition = + configRepository.getStandardSourceDefinition(standardSync.getSourceId()); + final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromDestination(standardSync.getDestinationId()); final String destinationNormalizationDockerImage = destinationDefinition.getNormalizationConfig() != null @@ -109,7 +113,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withAttemptId((long) attempt) .withDockerImage(config.getSourceDockerImage()) .withProtocolVersion(config.getSourceProtocolVersion()) - .withIsCustomConnector(config.getIsSourceCustomConnector()); + .withIsCustomConnector(config.getIsSourceCustomConnector()) + .withAllowedHosts(sourceDefinition.getAllowedHosts()); final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig() .withJobId(String.valueOf(jobId)) @@ -119,7 +124,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withIsCustomConnector(config.getIsDestinationCustomConnector()) .withNormalizationDockerImage(destinationNormalizationDockerImage) .withSupportsDbt(destinationDefinition.getSupportsDbt()) - .withNormalizationIntegrationType(normalizationIntegrationType); + .withNormalizationIntegrationType(normalizationIntegrationType) + .withAllowedHosts(destinationDefinition.getAllowedHosts()); final StandardSyncInput syncInput = new StandardSyncInput() .withNamespaceDefinition(config.getNamespaceDefinition()) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index a57fa6fa2833..c078e5672ad4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -118,6 +118,7 @@ private CheckedSupplier, Exception> launcherConfig.getDockerImage(), processFactory, workerConfigs.getResourceRequirements(), + launcherConfig.getAllowedHosts(), launcherConfig.getIsCustomConnector(), featureFlags); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index ffd8c744e702..e3eeec37dea6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -279,6 +279,7 @@ private CheckedSupplier, Exception> sourceLauncherConfig.getDockerImage(), processFactory, syncInput.getSourceResourceRequirements(), + sourceLauncherConfig.getAllowedHosts(), sourceLauncherConfig.getIsCustomConnector(), featureFlags); final IntegrationLauncher destinationLauncher = new AirbyteIntegrationLauncher( @@ -287,6 +288,7 @@ private CheckedSupplier, Exception> destinationLauncherConfig.getDockerImage(), processFactory, syncInput.getDestinationResourceRequirements(), + destinationLauncherConfig.getAllowedHosts(), destinationLauncherConfig.getIsCustomConnector(), featureFlags); diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java index 582004aef227..10a9be316f34 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java @@ -431,7 +431,7 @@ private Process getProcess(final String entrypoint, final Map fi private Process getProcess(final Map customLabels, final String entrypoint, final Map files) throws WorkerException { return processFactory.create("tester", "some-id", 0, Path.of("/tmp/job-root"), "busybox:latest", false, false, files, entrypoint, - DEFAULT_RESOURCE_REQUIREMENTS, customLabels, Collections.emptyMap(), Collections.emptyMap()); + DEFAULT_RESOURCE_REQUIREMENTS, null, customLabels, Collections.emptyMap(), Collections.emptyMap()); } private static Set getOpenPorts(final int count) { From de7cb40076f74363791bedecadcd2350cdd6b982 Mon Sep 17 00:00:00 2001 From: evantahler Date: Fri, 20 Jan 2023 12:06:04 -0800 Subject: [PATCH 2/7] fix loading method --- .../scheduling/activities/GenerateInputActivityImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 692b3ad290c2..697c2cd1a725 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -96,7 +96,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { final StandardSync standardSync = configRepository.getStandardSync(connectionId); final StandardSourceDefinition sourceDefinition = - configRepository.getStandardSourceDefinition(standardSync.getSourceId()); + configRepository.getSourceDefinitionFromSource(standardSync.getSourceId()); final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromDestination(standardSync.getDestinationId()); From 378517830438207a426ba83370d50bff8f438ea8 Mon Sep 17 00:00:00 2001 From: evantahler Date: Fri, 20 Jan 2023 12:06:14 -0800 Subject: [PATCH 3/7] github allowed hosts --- .../init/src/main/resources/seed/source_definitions.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 43f0e84ee34c..a2d714e6284a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -602,6 +602,9 @@ icon: github.svg sourceType: api releaseStage: generally_available + allowedHosts: + hosts: + - api.github.com - name: Gitlab sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80 dockerRepository: airbyte/source-gitlab From b40404cab226f470f820f5c091463c0c0038e2fc Mon Sep 17 00:00:00 2001 From: evantahler Date: Fri, 20 Jan 2023 15:35:24 -0800 Subject: [PATCH 4/7] StringSubstitutor --- .../resources/seed/source_definitions.yaml | 2 +- .../activities/GenerateInputActivityImpl.java | 46 ++++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index a2d714e6284a..ca09c155dc20 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1348,7 +1348,7 @@ releaseStage: generally_available allowedHosts: hosts: - - "{{config.host}}" + - "${host}" - name: Postmark App sourceDefinitionId: cde75ca1-1e28-4a0f-85bb-90c546de9f1f dockerRepository: airbyte/source-postmarkapp diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 697c2cd1a725..d5dbfaead5f4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -8,12 +8,16 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; +import io.airbyte.config.AllowedHosts; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; @@ -31,9 +35,13 @@ import io.airbyte.workers.WorkerConstants; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.commons.text.StringSubstitutor; @Singleton @Requires(env = WorkerMode.CONTROL_PLANE) @@ -114,7 +122,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withDockerImage(config.getSourceDockerImage()) .withProtocolVersion(config.getSourceProtocolVersion()) .withIsCustomConnector(config.getIsSourceCustomConnector()) - .withAllowedHosts(sourceDefinition.getAllowedHosts()); + .withAllowedHosts(replaceHostsWithConfigValues(sourceDefinition.getAllowedHosts(), config.getSourceConfiguration())); final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig() .withJobId(String.valueOf(jobId)) @@ -125,7 +133,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withNormalizationDockerImage(destinationNormalizationDockerImage) .withSupportsDbt(destinationDefinition.getSupportsDbt()) .withNormalizationIntegrationType(normalizationIntegrationType) - .withAllowedHosts(destinationDefinition.getAllowedHosts()); + .withAllowedHosts(replaceHostsWithConfigValues(destinationDefinition.getAllowedHosts(), config.getDestinationConfiguration())); final StandardSyncInput syncInput = new StandardSyncInput() .withNamespaceDefinition(config.getNamespaceDefinition()) @@ -160,4 +168,38 @@ public GeneratedJobInput getSyncWorkflowInputWithAttemptNumber(final SyncInputWi input.getJobId())); } + /* + * Note: This method does not interact with the secret manager. It is currently expected that all + * replacement values are not secret (e.g. host vs password). This also assumed that the JSON config + * for a connector has a single depth. + */ + private AllowedHosts replaceHostsWithConfigValues(AllowedHosts allowedHosts, JsonNode config) throws IOException { + if (allowedHosts == null || allowedHosts.getHosts() == null) { + return null; + } + + final List resolvedHosts = new ArrayList<>(); + final Map valuesMap = new HashMap<>(); + final JsonParser jsonParser = config.traverse(); + while (!jsonParser.isClosed()) { + if (jsonParser.nextToken() == JsonToken.FIELD_NAME) { + final String key = jsonParser.getCurrentName(); + if (config.get(key) != null) { + valuesMap.put(key, config.get(key).textValue()); + } + } + } + + // I substitute strings with ${} access, e.g. "The ${animal} jumped over the ${target}" with {animal: fox, target: fence} + final StringSubstitutor sub = new StringSubstitutor(valuesMap); + final List hosts = allowedHosts.getHosts(); + for (String host : hosts) { + resolvedHosts.add(sub.replace(host)); + } + + final AllowedHosts resolvedAllowedHosts = new AllowedHosts(); + resolvedAllowedHosts.setHosts(resolvedHosts); + return resolvedAllowedHosts; + } + } From 59a7d14707c3b1e10d319aa6006ed2ba03061e77 Mon Sep 17 00:00:00 2001 From: evantahler Date: Fri, 20 Jan 2023 15:43:37 -0800 Subject: [PATCH 5/7] lint --- .../scheduling/activities/GenerateInputActivityImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index d5dbfaead5f4..2440dc25ff9d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -190,7 +190,8 @@ private AllowedHosts replaceHostsWithConfigValues(AllowedHosts allowedHosts, Jso } } - // I substitute strings with ${} access, e.g. "The ${animal} jumped over the ${target}" with {animal: fox, target: fence} + // I substitute strings with ${} access, e.g. "The ${animal} jumped over the ${target}" with + // {animal: fox, target: fence} final StringSubstitutor sub = new StringSubstitutor(valuesMap); final List hosts = allowedHosts.getHosts(); for (String host : hosts) { From 8d2dc359ece0365749e747ce41d4ba498525bae8 Mon Sep 17 00:00:00 2001 From: evantahler Date: Mon, 23 Jan 2023 10:27:49 -0800 Subject: [PATCH 6/7] ConfigReplacerTest class + tests --- .../activities/GenerateInputActivityImpl.java | 50 ++------------- .../airbyte/workers/utils/ConfigReplacer.java | 63 ++++++++++++++++++ .../workers/utils/ConfigReplacerTest.java | 64 +++++++++++++++++++ 3 files changed, 132 insertions(+), 45 deletions(-) create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/utils/ConfigReplacer.java create mode 100644 airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 2440dc25ff9d..80f0c489242a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -8,16 +8,12 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; -import io.airbyte.config.AllowedHosts; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; @@ -33,15 +29,12 @@ import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.utils.ConfigReplacer; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.commons.text.StringSubstitutor; @Singleton @Requires(env = WorkerMode.CONTROL_PLANE) @@ -59,6 +52,8 @@ public GenerateInputActivityImpl(final JobPersistence jobPersistence, @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { + final ConfigReplacer configReplacer = new ConfigReplacer(); + try { ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId())); final long jobId = input.getJobId(); @@ -122,7 +117,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withDockerImage(config.getSourceDockerImage()) .withProtocolVersion(config.getSourceProtocolVersion()) .withIsCustomConnector(config.getIsSourceCustomConnector()) - .withAllowedHosts(replaceHostsWithConfigValues(sourceDefinition.getAllowedHosts(), config.getSourceConfiguration())); + .withAllowedHosts(configReplacer.getAllowedHosts(sourceDefinition.getAllowedHosts(), config.getSourceConfiguration())); final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig() .withJobId(String.valueOf(jobId)) @@ -133,7 +128,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withNormalizationDockerImage(destinationNormalizationDockerImage) .withSupportsDbt(destinationDefinition.getSupportsDbt()) .withNormalizationIntegrationType(normalizationIntegrationType) - .withAllowedHosts(replaceHostsWithConfigValues(destinationDefinition.getAllowedHosts(), config.getDestinationConfiguration())); + .withAllowedHosts(configReplacer.getAllowedHosts(destinationDefinition.getAllowedHosts(), config.getDestinationConfiguration())); final StandardSyncInput syncInput = new StandardSyncInput() .withNamespaceDefinition(config.getNamespaceDefinition()) @@ -168,39 +163,4 @@ public GeneratedJobInput getSyncWorkflowInputWithAttemptNumber(final SyncInputWi input.getJobId())); } - /* - * Note: This method does not interact with the secret manager. It is currently expected that all - * replacement values are not secret (e.g. host vs password). This also assumed that the JSON config - * for a connector has a single depth. - */ - private AllowedHosts replaceHostsWithConfigValues(AllowedHosts allowedHosts, JsonNode config) throws IOException { - if (allowedHosts == null || allowedHosts.getHosts() == null) { - return null; - } - - final List resolvedHosts = new ArrayList<>(); - final Map valuesMap = new HashMap<>(); - final JsonParser jsonParser = config.traverse(); - while (!jsonParser.isClosed()) { - if (jsonParser.nextToken() == JsonToken.FIELD_NAME) { - final String key = jsonParser.getCurrentName(); - if (config.get(key) != null) { - valuesMap.put(key, config.get(key).textValue()); - } - } - } - - // I substitute strings with ${} access, e.g. "The ${animal} jumped over the ${target}" with - // {animal: fox, target: fence} - final StringSubstitutor sub = new StringSubstitutor(valuesMap); - final List hosts = allowedHosts.getHosts(); - for (String host : hosts) { - resolvedHosts.add(sub.replace(host)); - } - - final AllowedHosts resolvedAllowedHosts = new AllowedHosts(); - resolvedAllowedHosts.setHosts(resolvedHosts); - return resolvedAllowedHosts; - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/utils/ConfigReplacer.java b/airbyte-workers/src/main/java/io/airbyte/workers/utils/ConfigReplacer.java new file mode 100644 index 000000000000..356ad5fadec8 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/utils/ConfigReplacer.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.utils; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.config.AllowedHosts; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.text.StringSubstitutor; + +/** + * This class takes values from a connector's configuration and uses it to fill in template-string + * values. It substitutes strings with ${} access, e.g. "The ${animal} jumped over the ${target}" + * with {animal: fox, target: fence} + */ +public class ConfigReplacer { + + /** + * Note: This method does not interact with the secret manager. It is currently expected that all + * replacement values are not secret (e.g. host vs password). This also assumed that the JSON config + * for a connector has a single depth. + */ + public AllowedHosts getAllowedHosts(AllowedHosts allowedHosts, JsonNode config) throws IOException { + if (allowedHosts == null || allowedHosts.getHosts() == null) { + return null; + } + + final List resolvedHosts = new ArrayList<>(); + final Map valuesMap = new HashMap<>(); + final JsonParser jsonParser = config.traverse(); + while (!jsonParser.isClosed()) { + if (jsonParser.nextToken() == JsonToken.FIELD_NAME) { + final String key = jsonParser.getCurrentName(); + if (config.get(key) != null) { + valuesMap.put(key, config.get(key).textValue()); + } + } + } + + final StringSubstitutor sub = new StringSubstitutor(valuesMap); + final List hosts = allowedHosts.getHosts(); + for (String host : hosts) { + final String replacedString = sub.replace(host); + if (replacedString.contains("${")) { + throw new IOException( + "The allowed host value, '" + host + "', is expecting an interpolation value from the connector's configuration, but none is present"); + } + resolvedHosts.add(replacedString); + } + + final AllowedHosts resolvedAllowedHosts = new AllowedHosts(); + resolvedAllowedHosts.setHosts(resolvedHosts); + return resolvedAllowedHosts; + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java new file mode 100644 index 000000000000..225f0ab6925f --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.utils; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.config.AllowedHosts; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +public class ConfigReplacerTest { + + final ConfigReplacer replacer = new ConfigReplacer(); + final ObjectMapper mapper = new ObjectMapper(); + + @Test + void getAllowedHostsGeneralTest() throws IOException { + final AllowedHosts allowedHosts = new AllowedHosts(); + final List hosts = new ArrayList(); + hosts.add("localhost"); + hosts.add("static-site.com"); + hosts.add("${host}"); + hosts.add("${subdomain}.vendor.com"); + allowedHosts.setHosts(hosts); + + final List expected = new ArrayList<>(); + expected.add("localhost"); + expected.add("static-site.com"); + expected.add("foo.com"); + expected.add("account.vendor.com"); + + final String configJson = "{\"host\": \"foo.com\", \"subdomain\": \"account\", \"password\": \"abc123\"}"; + final JsonNode config = mapper.readValue(configJson, JsonNode.class); + final AllowedHosts response = replacer.getAllowedHosts(allowedHosts, config); + + assertThat(response.getHosts()).isEqualTo(expected); + } + + @Test() + void getAllowedHostsMissingValue() throws IOException { + final AllowedHosts allowedHosts = new AllowedHosts(); + final List hosts = new ArrayList(); + hosts.add("${subdomain}.vendor.com"); + allowedHosts.setHosts(hosts); + + final String configJson = "{\"password\": \"abc123\"}"; + final JsonNode config = mapper.readValue(configJson, JsonNode.class); + + try { + replacer.getAllowedHosts(allowedHosts, config); + throw new RuntimeException("should not get here"); + } catch (Exception e) { + assertThat(e).hasMessage( + "The allowed host value, '${subdomain}.vendor.com', is expecting an interpolation value from the connector's configuration, but none is present"); + } + } + +} From 1c11f9ce50633bb5d8147beb003cc8bc9158f1aa Mon Sep 17 00:00:00 2001 From: evantahler Date: Mon, 23 Jan 2023 10:34:45 -0800 Subject: [PATCH 7/7] tests are private --- .../test/java/io/airbyte/workers/utils/ConfigReplacerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java index 225f0ab6925f..4ef50ebb48a3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/utils/ConfigReplacerTest.java @@ -14,7 +14,7 @@ import java.util.List; import org.junit.jupiter.api.Test; -public class ConfigReplacerTest { +class ConfigReplacerTest { final ConfigReplacer replacer = new ConfigReplacer(); final ObjectMapper mapper = new ObjectMapper();