Skip to content

Commit

Permalink
Revert "Revert "Add normalization strict incremental feature flag (#2…
Browse files Browse the repository at this point in the history
…2514)" (#22612)" (#22619)

This reverts commit 8702cdf.
  • Loading branch information
benmoriceau authored Feb 9, 2023
1 parent 8702cdf commit b9609ab
Show file tree
Hide file tree
Showing 17 changed files with 167 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class OrchestratorConstants {
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA,
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION,
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES,
EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES,
EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG,
FEATURE_FLAG_CLIENT,
FEATURE_FLAG_PATH,
EnvConfigs.LAUNCHDARKLY_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ public Process write(final Path jobRoot,

private Map<String, String> getWorkerMetadata() {
final Configs configs = new EnvConfigs();
// We've managed to exceed the maximum number of parameters for Map.of(), so use a builder + convert
// back to hashmap
return Maps.newHashMap(
ImmutableMap.<String, String>builder()
.put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName)
Expand All @@ -232,6 +234,8 @@ private Map<String, String> getWorkerMetadata() {
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()))
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()))
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces())
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces())
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag())
.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit())
.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest())
.put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class AirbyteIntegrationLauncherTest {
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()))
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()))
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces())
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces())
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.strictComparisonNormalizationTag())
.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit())
.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest())
.put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public class EnvVariableFeatureFlags implements FeatureFlags {

public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES";
public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));
Expand Down Expand Up @@ -64,6 +67,16 @@ public String fieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg);
}

@Override
public String strictComparisonNormalizationWorkspaces() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg);
}

@Override
public String strictComparisonNormalizationTag() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison", (arg) -> arg);
}

// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
public <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
final String value = System.getenv(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,43 @@

package io.airbyte.commons.features;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FeatureFlagHelper {

public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) {
final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces();
return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, "field selection")
|| featureFlags.applyFieldSelection();
}

public static boolean isStrictComparisonNormalizationEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) {
return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::strictComparisonNormalizationWorkspaces, workspaceId,
"strict comparison in normalization");
}

@VisibleForTesting
static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags,
final Function<FeatureFlags, String> flagRetriever,
final UUID workspaceId,
final String context) {
final String workspaceIdsString = flagRetriever.apply(featureFlags);
final Set<UUID> workspaceIds = new HashSet<>();
if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) {
for (final String id : workspaceIdsString.split(",")) {
try {
workspaceIds.add(UUID.fromString(id));
} catch (final IllegalArgumentException e) {
log.warn("Malformed workspace id for field selection: {}", id);
log.warn("Malformed workspace id for {}: {}", context, id);
}
}
}
if (workspaceId != null && workspaceIds.contains(workspaceId)) {
return true;
}

return featureFlags.applyFieldSelection();
return workspaceId != null && workspaceIds.contains(workspaceId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,18 @@ public interface FeatureFlags {
*/
String fieldSelectionWorkspaces();

/**
* Get the workspaces allow-listed for strict incremental comparison in normalization. This takes
* precedence over the normalization version in destination_definitions.yaml.
*
* @return a comma-separated list of workspace ids where strict incremental comparison should be
* enabled in normalization.
*/
String strictComparisonNormalizationWorkspaces();

/**
* @return The Docker image tag representing the normalization version with strict-comparison
*/
String strictComparisonNormalizationTag();

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,47 @@ void beforeEach() {
void isFieldSelectionEnabledForWorkspaceWithEmptyString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn("");

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
}

@Test
void isFieldSelectionEnabledForNullWorkspaceWithEmptyString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn("");

assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, null, null));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSpaceString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" ");

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithNullString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null);

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() {
final UUID workspaceId = UUID.randomUUID();
final UUID randomId = UUID.randomUUID();
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId.toString() + "," + workspaceId.toString());
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId);

assertTrue(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() {
final UUID workspaceId = UUID.randomUUID();
final UUID randomId1 = UUID.randomUUID();
final UUID randomId2 = UUID.randomUUID();
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1.toString() + "," + randomId2.toString());
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1 + "," + randomId2);

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,10 @@ public interface Configs {

String getFieldSelectionWorkspaces();

String getStrictComparisonNormalizationWorkspaces();

String getStrictComparisonNormalizationTag();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ public class EnvConfigs implements Configs {
private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";
private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES";
private static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";

public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(),
AIRBYTE_ROLE, EnvConfigs::getAirbyteRole,
Expand Down Expand Up @@ -1152,6 +1155,16 @@ public String getFieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "");
}

@Override
public String getStrictComparisonNormalizationWorkspaces() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "");
}

@Override
public String getStrictComparisonNormalizationTag() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison");
}

@Override
public int getActivityNumberOfAttempt() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ properties:
type: object
description: optional resource requirements to run sync workers
existingJavaType: io.airbyte.config.ResourceRequirements
workspaceId:
description: The id of the workspace associated with this sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema()));
environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection()));
environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES,
featureFlags.strictComparisonNormalizationWorkspaces());
environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag());
environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts);
environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint);
environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.commons.features.FeatureFlagHelper;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.protocol.migrations.v1.CatalogMigrationV1Helper;
Expand Down Expand Up @@ -50,8 +52,10 @@
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;

@Singleton
@Slf4j
public class NormalizationActivityImpl implements NormalizationActivity {

private final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig;
Expand All @@ -62,13 +66,17 @@ public class NormalizationActivityImpl implements NormalizationActivity {
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final String airbyteVersion;
private final FeatureFlags featureFlags;
private final Integer serverPort;
private final AirbyteConfigValidator airbyteConfigValidator;
private final TemporalUtils temporalUtils;
private final ResourceRequirements normalizationResourceRequirements;
private final AirbyteApiClient airbyteApiClient;

private final static Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0");
// This constant is not currently in use. We'll need to bump it when we try releasing v1 again.
private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0");
private static final String V1_NORMALIZATION_MINOR_VERSION = "3";
private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25";

public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig,
@Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs,
Expand All @@ -78,6 +86,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
@Value("${airbyte.version}") final String airbyteVersion,
final FeatureFlags featureFlags,
@Value("${micronaut.server.port}") final Integer serverPort,
final AirbyteConfigValidator airbyteConfigValidator,
final TemporalUtils temporalUtils,
Expand All @@ -91,6 +100,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.airbyteVersion = airbyteVersion;
this.featureFlags = featureFlags;
this.serverPort = serverPort;
this.airbyteConfigValidator = airbyteConfigValidator;
this.temporalUtils = temporalUtils;
Expand All @@ -111,11 +121,17 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,
final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);

if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, input.getWorkspaceId())) {
log.info("Using strict comparison normalization");
replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag());
}

// Check the version of normalization
// We require at least version 0.3.0 to support data types v1. Using an older version would lead to
// all columns being typed as JSONB. If normalization is using an older version, fallback to using
// v0 data types.
if (!normalizationSupportsV1DataTypes(destinationLauncherConfig)) {
log.info("Using protocol v0");
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(fullInput.getCatalog());
} else {

Expand All @@ -124,6 +140,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,
// phase v0 out.
// Performance impact should be low considering the nature of the check compared to the time to run
// normalization.
log.info("Using protocol v1");
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog());
}

Expand All @@ -134,6 +151,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,

final CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Exception> workerFactory;

log.info("Using normalization: " + destinationLauncherConfig.getNormalizationDockerImage());
if (containerOrchestratorConfig.isPresent()) {
workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig,
() -> context);
Expand Down Expand Up @@ -162,22 +180,33 @@ public NormalizationInput generateNormalizationInput(final StandardSyncInput syn
return new NormalizationInput()
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
.withCatalog(syncOutput.getOutputCatalog())
.withResourceRequirements(normalizationResourceRequirements);
.withResourceRequirements(normalizationResourceRequirements)
.withWorkspaceId(syncInput.getWorkspaceId());
}

@VisibleForTesting
static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig destinationLauncherConfig) {
try {
final String[] normalizationImage = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2);
final Version normalizationVersion = new Version(normalizationImage[1]);
return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1);
final Version normalizationVersion = new Version(getNormalizationImageTag(destinationLauncherConfig));
return V1_NORMALIZATION_MINOR_VERSION.equals(normalizationVersion.getMinorVersion());
} catch (final IllegalArgumentException e) {
// IllegalArgument here means that the version isn't in a semver format.
// The current behavior is to assume it supports v0 data types for dev purposes.
return false;
}
}

private static String getNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig) {
return destinationLauncherConfig.getNormalizationDockerImage().split(":", 2)[1];
}

@VisibleForTesting
static void replaceNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig, final String newTag) {
final String[] imageComponents = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2);
imageComponents[1] = newTag;
destinationLauncherConfig.setNormalizationDockerImage(String.join(":", imageComponents));
}

private CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Exception> getLegacyWorkerFactory(
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig) {
Expand Down
Loading

0 comments on commit b9609ab

Please sign in to comment.