Skip to content

Commit

Permalink
pass feature flag client env variable to orchestrator (#22523)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfsiega-airbyte authored Feb 7, 2023
1 parent f45718d commit 823adfb
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
Expand Down Expand Up @@ -222,17 +223,20 @@ public Process write(final Path jobRoot,

private Map<String, String> getWorkerMetadata() {
final Configs configs = new EnvConfigs();
return Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName,
WorkerEnvConstants.WORKER_JOB_ID, jobId,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces(),
EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit(),
EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest(),
EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey());
return Maps.newHashMap(
ImmutableMap.<String, String>builder()
.put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName)
.put(WorkerEnvConstants.WORKER_JOB_ID, jobId)
.put(WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt))
.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()))
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()))
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()))
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces())
.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit())
.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest())
.put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey())
.put(EnvConfigs.FEATURE_FLAG_CLIENT, configs.getFeatureFlagClient())
.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.Configs;
Expand Down Expand Up @@ -55,17 +56,21 @@ class AirbyteIntegrationLauncherTest {
private static final FeatureFlags FEATURE_FLAGS = new EnvVariableFeatureFlags();
private static final Configs CONFIGS = new EnvConfigs();

private static final Map<String, String> JOB_METADATA = Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(FEATURE_FLAGS.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces(),
EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest(),
EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit(),
EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey());
private static final Map<String, String> JOB_METADATA =
Maps.newHashMap(
ImmutableMap.<String, String>builder()
.put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE)
.put(WorkerEnvConstants.WORKER_JOB_ID, JOB_ID)
.put(WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT))
.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(FEATURE_FLAGS.useStreamCapableState()))
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()))
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()))
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces())
.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit())
.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest())
.put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey())
.put(EnvConfigs.FEATURE_FLAG_CLIENT, CONFIGS.getFeatureFlagClient())
.build());

private WorkerConfigs workerConfigs;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,13 @@ public interface Configs {
*/
String getLaunchDarklyKey();

/**
* Get the type of feature flag client to use.
*
* @return
*/
String getFeatureFlagClient();

/**
* Defines a default map of environment variables to use for any launched job containers. The
* expected format is a JSON encoded String -> String map. Make sure to escape properly. Defaults to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ public class EnvConfigs implements Configs {
public static final int DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = 14;
public static final String LAUNCHDARKLY_KEY = "LAUNCHDARKLY_KEY";

public static final String FEATURE_FLAG_CLIENT = "FEATURE_FLAG_CLIENT";

private final Function<String, String> getEnv;
private final Supplier<Set<String>> getAllEnvKeys;
private final LogConfigs logConfigs;
Expand Down Expand Up @@ -854,6 +856,11 @@ public String getLaunchDarklyKey() {
return getEnvOrDefault(LAUNCHDARKLY_KEY, "");
}

@Override
public String getFeatureFlagClient() {
return getEnvOrDefault(FEATURE_FLAG_CLIENT, "");
}

/**
* There are two types of environment variables available to the job container:
* <ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_EMAIL_ENV_VAR, dataPlaneServiceAccountEmail);

final Configs configs = new EnvConfigs();
environmentVariables.put(EnvConfigs.FEATURE_FLAG_CLIENT, configs.getFeatureFlagClient());
environmentVariables.put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey());
environmentVariables.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit());
environmentVariables.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest());
Expand Down

0 comments on commit 823adfb

Please sign in to comment.