diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java index f5a1f7836872..e34df750d65a 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java @@ -118,4 +118,102 @@ private static boolean hasV0DataType(final JsonNode schema) { return false; } + /** + * Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected + * + * @param configuredAirbyteCatalog to migrate + */ + public static void downgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) { + if (containsV1DataTypes(configuredAirbyteCatalog)) { + downgradeSchema(configuredAirbyteCatalog); + } + } + + /** + * Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected + * + * @param airbyteCatalog to migrate + */ + public static void downgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) { + if (containsV1DataTypes(airbyteCatalog)) { + downgradeSchema(airbyteCatalog); + } + } + + /** + * Performs an in-place migration of the schema from v1 to v0 + * + * @param configuredAirbyteCatalog to migrate + */ + private static void downgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) { + for (final var stream : configuredAirbyteCatalog.getStreams()) { + SchemaMigrationV1.downgradeSchema(stream.getStream().getJsonSchema()); + } + } + + /** + * Performs an in-place migration of the schema from v1 to v0 + * + * @param airbyteCatalog to migrate + */ + private static void downgradeSchema(final AirbyteCatalog airbyteCatalog) { + for (final var stream : airbyteCatalog.getStreams()) { + SchemaMigrationV1.downgradeSchema(stream.getJsonSchema()); + } + } + + /** + * Returns true if catalog contains v1 data types + */ + private static boolean containsV1DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) { + if (configuredAirbyteCatalog == null) { + return false; + } + + return configuredAirbyteCatalog + .getStreams() + .stream().findFirst() + .map(ConfiguredAirbyteStream::getStream) + .map(CatalogMigrationV1Helper::streamContainsV1DataTypes) + .orElse(false); + } + + /** + * Returns true if catalog contains v1 data types + */ + private static boolean containsV1DataTypes(final AirbyteCatalog airbyteCatalog) { + if (airbyteCatalog == null) { + return false; + } + + return airbyteCatalog + .getStreams() + .stream().findFirst() + .map(CatalogMigrationV1Helper::streamContainsV1DataTypes) + .orElse(false); + } + + private static boolean streamContainsV1DataTypes(final AirbyteStream airbyteStream) { + if (airbyteStream == null || airbyteStream.getJsonSchema() == null) { + return false; + } + return hasV1DataType(airbyteStream.getJsonSchema()); + } + + /** + * Performs of search of a v0 data type node, returns true at the first node found. + */ + private static boolean hasV1DataType(final JsonNode schema) { + if (SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration(schema)) { + return true; + } + + for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) { + if (hasV1DataType(subSchema)) { + return true; + } + } + return false; + } + } diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java index 5a4e5fcbab00..286bce93c84c 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java @@ -65,7 +65,7 @@ static boolean isPrimitiveTypeDeclaration(final JsonNode schema) { * Detects any schema that looks like a reference type declaration, e.g.: { "$ref": * "WellKnownTypes.json...." } or { "oneOf": [{"$ref": "..."}, {"type": "object"}] } */ - private static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) { + static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) { if (!schema.isObject()) { // Non-object schemas (i.e. true/false) never need to be modified return false; diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index 23af41e4e717..55d162320888 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -98,7 +98,9 @@ public static StandardSync buildStandardSync(final Record record, final Listv1) - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(configuredAirbyteCatalog); return configuredAirbyteCatalog; } @@ -249,7 +251,9 @@ public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Rec public static AirbyteCatalog parseAirbyteCatalog(final String airbyteCatalogString) { final AirbyteCatalog airbyteCatalog = Jsons.deserialize(airbyteCatalogString, AirbyteCatalog.class); // On-the-fly migration of persisted data types related objects (protocol v0->v1) - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(airbyteCatalog); return airbyteCatalog; } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index e3bbdc7362bd..3dbc9398e796 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -931,9 +931,13 @@ private static JobConfig parseJobConfigFromString(final String jobConfigString) final JobConfig jobConfig = Jsons.deserialize(jobConfigString, JobConfig.class); // On-the-fly migration of persisted data types related objects (protocol v0->v1) if (jobConfig.getConfigType() == ConfigType.SYNC && jobConfig.getSync() != null) { - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog()); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog()); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog()); } else if (jobConfig.getConfigType() == ConfigType.RESET_CONNECTION && jobConfig.getResetConnection() != null) { - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog()); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog()); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog()); } return jobConfig; } @@ -960,9 +964,13 @@ private static JobOutput parseJobOutputFromString(final String jobOutputString) final JobOutput jobOutput = Jsons.deserialize(jobOutputString, JobOutput.class); // On-the-fly migration of persisted data types related objects (protocol v0->v1) if (jobOutput.getOutputType() == OutputType.DISCOVER_CATALOG && jobOutput.getDiscoverCatalog() != null) { - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog()); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog()); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog()); } else if (jobOutput.getOutputType() == OutputType.SYNC && jobOutput.getSync() != null) { - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog()); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog()); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog()); } return jobOutput; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 54ab44f99edd..ddf1ca1c4ba0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -113,19 +113,19 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, // Check the version of normalization // We require at least version 0.3.0 to support data types v1. Using an older version would lead to - // all columns being typed as JSONB. We should fail before coercing the types into an unexpected - // form. + // all columns being typed as JSONB. If normalization is using an older version, fallback to using + // v0 data types. if (!normalizationSupportsV1DataTypes(destinationLauncherConfig)) { - throw new IllegalStateException("Normalization is too old, a version >=\"0.3.0\" is required but got \"" - + destinationLauncherConfig.getNormalizationDockerImage() + "\" instead"); - } + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(fullInput.getCatalog()); + } else { - // This should only be useful for syncs that started before the release that contained v1 migration. - // However, we lack the effective way to detect those syncs so this code should remain until we - // phase v0 out. - // Performance impact should be low considering the nature of the check compared to the time to run - // normalization. - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog()); + // This should only be useful for syncs that started before the release that contained v1 migration. + // However, we lack the effective way to detect those syncs so this code should remain until we + // phase v0 out. + // Performance impact should be low considering the nature of the check compared to the time to run + // normalization. + CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog()); + } final Supplier inputSupplier = () -> { airbyteConfigValidator.ensureAsRuntime(ConfigSchema.NORMALIZATION_INPUT, Jsons.jsonNode(fullInput)); @@ -173,8 +173,8 @@ static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1); } catch (final IllegalArgumentException e) { // IllegalArgument here means that the version isn't in a semver format. - // The current behavior is to assume it supports v1 data types for dev purposes. - return true; + // The current behavior is to assume it supports v0 data types for dev purposes. + return false; } }