diff --git a/airbyte-bootloader/src/main/resources/application.yml b/airbyte-bootloader/src/main/resources/application.yml index 9709b4b88e5f..38ed361e5e0c 100644 --- a/airbyte-bootloader/src/main/resources/application.yml +++ b/airbyte-bootloader/src/main/resources/application.yml @@ -22,7 +22,7 @@ airbyte: target: range: min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0} - max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:1.0.0} + max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0} secret: persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE} store: diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/MigrationContainer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/MigrationContainer.java index 6a317b03bd80..663f487a88c0 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/MigrationContainer.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/MigrationContainer.java @@ -16,7 +16,9 @@ public class MigrationContainer { private final List migrationsToRegister; private final SortedMap migrations = new TreeMap<>(); - private String mostRecentMajorVersion = ""; + + // mostRecentMajorVersion defaults to v0 as no migration is required + private String mostRecentMajorVersion = "0"; public MigrationContainer(final List migrations) { this.migrationsToRegister = migrations; diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/AirbyteMessageMigrationV1.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/AirbyteMessageMigrationV1.java index 5150e2ac0e48..13ebbddfd26e 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/AirbyteMessageMigrationV1.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/AirbyteMessageMigrationV1.java @@ -24,13 +24,13 @@ import 
io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.JsonSchemaReferenceTypes; import io.airbyte.validation.json.JsonSchemaValidator; -import jakarta.inject.Singleton; import java.util.Iterator; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -@Singleton +// Disable V1 Migration, uncomment to re-enable +// @Singleton public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration { private final JsonSchemaValidator validator; diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java index f5a1f7836872..e34df750d65a 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java @@ -118,4 +118,102 @@ private static boolean hasV0DataType(final JsonNode schema) { return false; } + /** + * Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected + * + * @param configuredAirbyteCatalog to migrate + */ + public static void downgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) { + if (containsV1DataTypes(configuredAirbyteCatalog)) { + downgradeSchema(configuredAirbyteCatalog); + } + } + + /** + * Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected + * + * @param airbyteCatalog to migrate + */ + public static void downgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) { + if (containsV1DataTypes(airbyteCatalog)) { + downgradeSchema(airbyteCatalog); + } + } + + /** + * Performs an in-place migration of the schema from v1 to v0 + * + * @param configuredAirbyteCatalog to migrate + */ + private static void downgradeSchema(final ConfiguredAirbyteCatalog 
configuredAirbyteCatalog) { + for (final var stream : configuredAirbyteCatalog.getStreams()) { + SchemaMigrationV1.downgradeSchema(stream.getStream().getJsonSchema()); + } + } + + /** + * Performs an in-place migration of the schema from v1 to v0 + * + * @param airbyteCatalog to migrate + */ + private static void downgradeSchema(final AirbyteCatalog airbyteCatalog) { + for (final var stream : airbyteCatalog.getStreams()) { + SchemaMigrationV1.downgradeSchema(stream.getJsonSchema()); + } + } + + /** + * Returns true if the catalog contains v1 data types (only the first stream is inspected) + */ + private static boolean containsV1DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) { + if (configuredAirbyteCatalog == null) { + return false; + } + + return configuredAirbyteCatalog + .getStreams() + .stream().findFirst() + .map(ConfiguredAirbyteStream::getStream) + .map(CatalogMigrationV1Helper::streamContainsV1DataTypes) + .orElse(false); + } + + /** + * Returns true if the catalog contains v1 data types (only the first stream is inspected) + */ + private static boolean containsV1DataTypes(final AirbyteCatalog airbyteCatalog) { + if (airbyteCatalog == null) { + return false; + } + + return airbyteCatalog + .getStreams() + .stream().findFirst() + .map(CatalogMigrationV1Helper::streamContainsV1DataTypes) + .orElse(false); + } + + private static boolean streamContainsV1DataTypes(final AirbyteStream airbyteStream) { + if (airbyteStream == null || airbyteStream.getJsonSchema() == null) { + return false; + } + return hasV1DataType(airbyteStream.getJsonSchema()); + } + + /** + * Performs a search for a v1 data type node, returns true at the first node found. 
+ */ + private static boolean hasV1DataType(final JsonNode schema) { + if (SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration(schema)) { + return true; + } + + for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) { + if (hasV1DataType(subSchema)) { + return true; + } + } + return false; + } + } diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/ConfiguredAirbyteCatalogMigrationV1.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/ConfiguredAirbyteCatalogMigrationV1.java index e400b31724b7..f6dc897a64d8 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/ConfiguredAirbyteCatalogMigrationV1.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/ConfiguredAirbyteCatalogMigrationV1.java @@ -11,9 +11,9 @@ import io.airbyte.commons.version.Version; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import jakarta.inject.Singleton; -@Singleton +// Disable V1 Migration, uncomment to re-enable +// @Singleton public class ConfiguredAirbyteCatalogMigrationV1 implements ConfiguredAirbyteCatalogMigration { diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java index 5a4e5fcbab00..286bce93c84c 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java @@ -65,7 +65,7 @@ static boolean isPrimitiveTypeDeclaration(final JsonNode schema) { * Detects any schema that looks like a reference type declaration, e.g.: { "$ref": * "WellKnownTypes.json...." 
} or { "oneOf": [{"$ref": "..."}, {"type": "object"}] } */ - private static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) { + static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) { if (!schema.isObject()) { // Non-object schemas (i.e. true/false) never need to be modified return false; diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Deserializer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Deserializer.java index f280c508e417..85272cef9cbe 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Deserializer.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Deserializer.java @@ -5,7 +5,7 @@ package io.airbyte.commons.protocol.serde; import io.airbyte.commons.version.AirbyteProtocolVersion; -import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage; import jakarta.inject.Singleton; @Singleton diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Serializer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Serializer.java index f68ce7be46a4..badfee63f298 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Serializer.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Serializer.java @@ -5,7 +5,7 @@ package io.airbyte.commons.protocol.serde; import io.airbyte.commons.version.AirbyteProtocolVersion; -import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage; import jakarta.inject.Singleton; @Singleton diff --git a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java 
b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java index 1c52ed7c8a45..281df4b898b7 100644 --- a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java +++ b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java @@ -22,7 +22,7 @@ class MigratorsMicronautTest { // This should contain the list of all the supported majors of the airbyte protocol except the most // recent one since the migrations themselves are keyed on the lower version. - final Set SUPPORTED_VERSIONS = Set.of("0"); + final Set SUPPORTED_VERSIONS = Set.of(); @Test void testAirbyteMessageMigrationInjection() { diff --git a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0SerDeTest.java b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0SerDeTest.java index 4f0a7965209e..c812d41f97e1 100644 --- a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0SerDeTest.java +++ b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0SerDeTest.java @@ -7,9 +7,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.ConnectorSpecification; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.ConnectorSpecification; import java.net.URI; import java.net.URISyntaxException; import org.junit.jupiter.api.Test; diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java index 9c887cfc8d27..0d6defce415e 
100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java @@ -12,8 +12,6 @@ import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.protocol.ConfiguredAirbyteCatalogMigrator; -import io.airbyte.commons.protocol.migrations.v1.AirbyteMessageMigrationV1; -import io.airbyte.commons.protocol.migrations.v1.ConfiguredAirbyteCatalogMigrationV1; import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer; import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer; import io.airbyte.commons.protocol.serde.AirbyteMessageV1Deserializer; @@ -45,10 +43,12 @@ void beforeEach() { List.of(new AirbyteMessageV0Serializer(), new AirbyteMessageV1Serializer()))); serDeProvider.initialize(); final AirbyteMessageMigrator airbyteMessageMigrator = new AirbyteMessageMigrator( - List.of(new AirbyteMessageMigrationV1())); + // TODO once data types v1 is re-enabled, this test should contain the migration + List.of(/* new AirbyteMessageMigrationV1() */)); airbyteMessageMigrator.initialize(); final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator = new ConfiguredAirbyteCatalogMigrator( - List.of(new ConfiguredAirbyteCatalogMigrationV1())); + // TODO once data types v1 is re-enabled, this test should contain the migration + List.of(/* new ConfiguredAirbyteCatalogMigrationV1() */)); configuredAirbyteCatalogMigrator.initialize(); migratorFactory = spy(new AirbyteProtocolVersionedMigratorFactory(airbyteMessageMigrator, configuredAirbyteCatalogMigrator)); } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index d1506ffa91df..c448270d6c7a 100644 --- 
a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -218,7 +218,7 @@ public class EnvConfigs implements Configs { private static final long DEFAULT_MAX_SYNC_WORKERS = 5; private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5; private static final String DEFAULT_NETWORK = "host"; - private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("1.0.0"); + private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("0.3.0"); private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MIN = new Version("0.0.0"); private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA"; private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index 23af41e4e717..55d162320888 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -98,7 +98,9 @@ public static StandardSync buildStandardSync(final Record record, final Listv1) - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(configuredAirbyteCatalog); return configuredAirbyteCatalog; } @@ -249,7 +251,9 @@ public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Rec public static AirbyteCatalog parseAirbyteCatalog(final String airbyteCatalogString) { final AirbyteCatalog airbyteCatalog = Jsons.deserialize(airbyteCatalogString, AirbyteCatalog.class); // On-the-fly migration of 
persisted data types related objects (protocol v0->v1) - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(airbyteCatalog); return airbyteCatalog; } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index 1de35c619a83..872020e46089 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -161,7 +161,7 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, configRepository.writeSourceConnectionNoSecrets(source); final AirbyteCatalog actorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING)); - final AirbyteCatalog expectedActorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING_V1)); + final AirbyteCatalog expectedActorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING)); configRepository.writeActorCatalogFetchEvent( actorCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH); @@ -201,7 +201,8 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, assertEquals(expectedActorCatalog, Jsons.object(catalogNewConfig.get().getCatalog(), AirbyteCatalog.class)); final int catalogDbEntry2 = database.query(ctx -> ctx.selectCount().from(ACTOR_CATALOG)).fetchOne().into(int.class); - assertEquals(2, catalogDbEntry2); + // TODO this should be 2 once we re-enable datatypes v1 + assertEquals(1, catalogDbEntry2); } @Test 
@@ -484,13 +485,16 @@ void testGetStandardSyncUsingOperation() throws IOException { } private List copyWithV1Types(final List syncs) { - return syncs.stream() - .map(standardSync -> { - final StandardSync copiedStandardSync = Jsons.deserialize(Jsons.serialize(standardSync), StandardSync.class); - copiedStandardSync.setCatalog(MockData.getConfiguredCatalogWithV1DataTypes()); - return copiedStandardSync; - }) - .toList(); + return syncs; + // TODO adjust with data types feature flag testing + // return syncs.stream() + // .map(standardSync -> { + // final StandardSync copiedStandardSync = Jsons.deserialize(Jsons.serialize(standardSync), + // StandardSync.class); + // copiedStandardSync.setCatalog(MockData.getConfiguredCatalogWithV1DataTypes()); + // return copiedStandardSync; + // }) + // .toList(); } private void assertSyncsMatch(final List expectedSyncs, final List actualSyncs) { diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index e3bbdc7362bd..3dbc9398e796 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -931,9 +931,13 @@ private static JobConfig parseJobConfigFromString(final String jobConfigString) final JobConfig jobConfig = Jsons.deserialize(jobConfigString, JobConfig.class); // On-the-fly migration of persisted data types related objects (protocol v0->v1) if (jobConfig.getConfigType() == ConfigType.SYNC && jobConfig.getSync() != null) { - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog()); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog()); + 
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog()); } else if (jobConfig.getConfigType() == ConfigType.RESET_CONNECTION && jobConfig.getResetConnection() != null) { - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog()); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog()); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog()); } return jobConfig; } @@ -960,9 +964,13 @@ private static JobOutput parseJobOutputFromString(final String jobOutputString) final JobOutput jobOutput = Jsons.deserialize(jobOutputString, JobOutput.class); // On-the-fly migration of persisted data types related objects (protocol v0->v1) if (jobOutput.getOutputType() == OutputType.DISCOVER_CATALOG && jobOutput.getDiscoverCatalog() != null) { - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog()); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog()); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog()); } else if (jobOutput.getOutputType() == OutputType.SYNC && jobOutput.getSync() != null) { - CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog()); + // TODO feature flag this for data types rollout + // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog()); + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog()); } return jobOutput; } diff --git a/airbyte-server/src/main/resources/application.yml b/airbyte-server/src/main/resources/application.yml index f8598499b43d..1c58a63ef546 100644 --- a/airbyte-server/src/main/resources/application.yml +++ 
b/airbyte-server/src/main/resources/application.yml @@ -81,7 +81,7 @@ airbyte: root: ${WORKSPACE_ROOT} protocol: min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0} - max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:1.0.0} + max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0} temporal: cloud: diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index 049bfaf7ee43..cb66fe314fa2 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -585,19 +585,6 @@ public List retrieveSourceRecords(final Database database, final Strin return database.query(context -> context.fetch(String.format("SELECT * FROM %s;", table))) .stream() .map(Record::intoMap) - .map(rec -> { - // The protocol requires converting numbers to strings. source-postgres does that internally, - // but we're querying the DB directly, so we have to do it manually. 
- final Map stringifiedNumbers = new HashMap<>(); - for (final String key : rec.keySet()) { - Object o = rec.get(key); - if (o instanceof Number) { - o = o.toString(); - } - stringifiedNumbers.put(key, o); - } - return stringifiedNumbers; - }) .map(Jsons::jsonNode) .collect(Collectors.toList()); } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index b9878502aff5..13f1cd13daf2 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -46,6 +46,7 @@ import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionState; import io.airbyte.api.client.model.generated.ConnectionStatus; +import io.airbyte.api.client.model.generated.DataType; import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.DestinationDefinitionIdWithWorkspaceId; import io.airbyte.api.client.model.generated.DestinationDefinitionRead; @@ -319,8 +320,8 @@ void testDiscoverSourceSchema() throws ApiException { final AirbyteCatalog actual = testHarness.discoverSourceSchema(sourceId); final Map> fields = ImmutableMap.of( - COLUMN_ID, ImmutableMap.of(REF, INTEGER_REFERENCE), - COLUMN_NAME, ImmutableMap.of(REF, STRING_REFERENCE)); + COLUMN_ID, ImmutableMap.of(TYPE, DataType.NUMBER.getValue(), "airbyte_type", "integer"), + COLUMN_NAME, ImmutableMap.of(TYPE, DataType.STRING.getValue())); final JsonNode jsonSchema = Jsons.jsonNode(ImmutableMap.builder() .put(TYPE, "object") .put("properties", fields) @@ -573,8 +574,8 @@ void testIncrementalDedupeSync() throws Exception { // add new records and run again. 
final Database source = testHarness.getSourceDatabase(); final List expectedRawRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME); - expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, "sherif").build())); - expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "7").put(COLUMN_NAME, "chris").build())); + expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "sherif").build())); + expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 7).put(COLUMN_NAME, "chris").build())); source.query(ctx -> ctx.execute("UPDATE id_and_name SET id=6 WHERE name='sherif'")); source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(7, 'chris')")); // retrieve latest snapshot of source records after modifications; the deduplicated table in @@ -627,7 +628,7 @@ void testIncrementalSync() throws Exception { final Database source = testHarness.getSourceDatabase(); // get contents of source before mutating records. final List expectedRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME); - expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, GERALT).build())); + expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, GERALT).build())); // add a new record source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')")); // mutate a record that was already synced with out updating its cursor value. if we are actually @@ -925,7 +926,7 @@ void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Except final Database sourceDatabase = testHarness.getSourceDatabase(); // get contents of source before mutating records. 
final List expectedRecords = testHarness.retrieveSourceRecords(sourceDatabase, STREAM_NAME); - expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, GERALT).build())); + expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, GERALT).build())); // add a new record sourceDatabase.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')")); // mutate a record that was already synced with out updating its cursor value. if we are actually @@ -1226,9 +1227,9 @@ void testIncrementalSyncMultipleStreams() throws Exception { testHarness.retrieveSourceRecords(source, STAGING_SCHEMA_NAME + "." + COOL_EMPLOYEES_TABLE_NAME); final List expectedRecordsAwesomePeople = testHarness.retrieveSourceRecords(source, STAGING_SCHEMA_NAME + "." + AWESOME_PEOPLE_TABLE_NAME); - expectedRecordsIdAndName.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, GERALT).build())); - expectedRecordsCoolEmployees.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, GERALT).build())); - expectedRecordsAwesomePeople.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "3").put(COLUMN_NAME, GERALT).build())); + expectedRecordsIdAndName.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, GERALT).build())); + expectedRecordsCoolEmployees.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, GERALT).build())); + expectedRecordsAwesomePeople.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 3).put(COLUMN_NAME, GERALT).build())); // add a new record to each table source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')")); source.query(ctx -> ctx.execute("INSERT INTO staging.cool_employees(id, name) VALUES(6, 'geralt')")); @@ -1463,8 +1464,8 @@ void testIncrementalDedupeSyncRemoveOneColumn() throws Exception { source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, 
name) VALUES(6, 'mike')")); source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(7, 'chris')")); // The expected new raw records should only have the ID column. - expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").build())); - expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "7").build())); + expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).build())); + expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 7).build())); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 0127c1ad0e30..807d6a7c73a0 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -194,11 +194,11 @@ void testIncrementalCdcSync(final TestInfo testInfo) throws Exception { // new value and an updated_at time corresponding to this update query source.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2")); expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( - Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, "geralt").build()), + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()), beforeFirstUpdate, Optional.empty())); expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( - Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "2").put(COLUMN_NAME, "yennefer").build()), + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_NAME, 
"yennefer").build()), beforeFirstUpdate, Optional.empty())); @@ -206,11 +206,11 @@ void testIncrementalCdcSync(final TestInfo testInfo) throws Exception { source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')")); source.query(ctx -> ctx.execute("UPDATE color_palette SET color='purple' WHERE id=2")); expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( - Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "4").put(COLUMN_COLOR, "yellow").build()), + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()), beforeFirstUpdate, Optional.empty())); expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( - Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "2").put(COLUMN_COLOR, "purple").build()), + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_COLOR, "purple").build()), beforeFirstUpdate, Optional.empty())); @@ -298,7 +298,7 @@ void testDeleteRecordCdcSync(final TestInfo testInfo) throws Exception { source.query(ctx -> ctx.execute("DELETE FROM id_and_name WHERE id=1")); final Map deletedRecordMap = new HashMap<>(); - deletedRecordMap.put(COLUMN_ID, "1"); + deletedRecordMap.put(COLUMN_ID, 1); deletedRecordMap.put(COLUMN_NAME, null); expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( Jsons.jsonNode(deletedRecordMap), @@ -431,13 +431,13 @@ void testPartialResetFromStreamSelection(final TestInfo testInfo) throws Excepti final Instant beforeInsert = Instant.now(); source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')")); expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( - Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, "geralt").build()), + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()), beforeInsert, Optional.empty())); source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')")); 
expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( - Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "4").put(COLUMN_COLOR, "yellow").build()), + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()), beforeInsert, Optional.empty())); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 54ab44f99edd..ddf1ca1c4ba0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -113,19 +113,19 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, // Check the version of normalization // We require at least version 0.3.0 to support data types v1. Using an older version would lead to - // all columns being typed as JSONB. We should fail before coercing the types into an unexpected - // form. + // all columns being typed as JSONB. If normalization is using an older version, fall back to using + // v0 data types. if (!normalizationSupportsV1DataTypes(destinationLauncherConfig)) { - throw new IllegalStateException("Normalization is too old, a version >=\"0.3.0\" is required but got \"" - + destinationLauncherConfig.getNormalizationDockerImage() + "\" instead"); - } + CatalogMigrationV1Helper.downgradeSchemaIfNeeded(fullInput.getCatalog()); + } else { - // This should only be useful for syncs that started before the release that contained v1 migration. - // However, we lack the effective way to detect those syncs so this code should remain until we - // phase v0 out. - // Performance impact should be low considering the nature of the check compared to the time to run - // normalization.
- CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog()); + // This should only be useful for syncs that started before the release that contained v1 migration. + // However, we lack the effective way to detect those syncs so this code should remain until we + // phase v0 out. + // Performance impact should be low considering the nature of the check compared to the time to run + // normalization. + CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog()); + } final Supplier inputSupplier = () -> { airbyteConfigValidator.ensureAsRuntime(ConfigSchema.NORMALIZATION_INPUT, Jsons.jsonNode(fullInput)); @@ -173,8 +173,8 @@ static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1); } catch (final IllegalArgumentException e) { // IllegalArgument here means that the version isn't in a semver format. - // The current behavior is to assume it supports v1 data types for dev purposes. - return true; + // The current behavior is to assume it only supports v0 data types for dev purposes.
+ return false; } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java index 9c589a456fe9..aff7afb1ab57 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java @@ -17,7 +17,7 @@ void checkNormalizationDataTypesSupportFromVersionString() { Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.3.0"))); Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.4.1"))); Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("dev"))); - Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("protocolv1"))); + Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("protocolv1"))); } private IntegrationLauncherConfig withNormalizationVersion(final String version) {