From e2146b9457544198ba20337fa6ee56c95bf92397 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 20 Dec 2022 11:35:02 -0800 Subject: [PATCH 1/8] Extract MigrationContainer from AirbyteMessageMigrator --- .../protocol/AirbyteMessageMigrator.java | 77 ++----------- .../commons/protocol/MigrationContainer.java | 102 ++++++++++++++++++ .../protocol/AirbyteMessageMigratorTest.java | 11 +- 3 files changed, 120 insertions(+), 70 deletions(-) create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/MigrationContainer.java diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java index 1906e143c33a..ef7e7eebe743 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java @@ -9,12 +9,8 @@ import io.airbyte.commons.version.Version; import jakarta.annotation.PostConstruct; import jakarta.inject.Singleton; -import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; /** * AirbyteProtocol Message Migrator @@ -25,21 +21,15 @@ @Singleton public class AirbyteMessageMigrator { - private final List> migrationsToRegister; - private final SortedMap> migrations = new TreeMap<>(); - private String mostRecentMajorVersion = ""; + private final MigrationContainer migrationContainer; public AirbyteMessageMigrator(List> migrations) { - migrationsToRegister = migrations; - } - - public AirbyteMessageMigrator() { - this(Collections.emptyList()); + migrationContainer = new MigrationContainer(migrations); } @PostConstruct public void initialize() { - migrationsToRegister.forEach(this::registerMigration); + migrationContainer.initialize(); } /** @@ -47,16 +37,7 @@ public void initialize() { * required migrations */ public PreviousVersion downgrade(final CurrentVersion message, final Version target) { - if (target.getMajorVersion().equals(mostRecentMajorVersion)) { - return (PreviousVersion) message; - } - - Object result = message; - Object[] selectedMigrations = selectMigrations(target).toArray(); - for (int i = selectedMigrations.length; i > 0; --i) { - result = applyDowngrade((AirbyteMessageMigration) selectedMigrations[i - 1], result); - } - return (PreviousVersion) result; + return migrationContainer.downgrade(message, target, AirbyteMessageMigrator::applyDowngrade); } /** @@ -64,65 +45,29 @@ public PreviousVersion downgrade(final Current * migrations */ public CurrentVersion upgrade(final PreviousVersion message, final Version source) { - if (source.getMajorVersion().equals(mostRecentMajorVersion)) { - return (CurrentVersion) message; - } - - Object result = message; - for (var migration : selectMigrations(source)) { - result = applyUpgrade(migration, result); - } - return (CurrentVersion) result; + return migrationContainer.upgrade(message, source, AirbyteMessageMigrator::applyUpgrade); } public Version getMostRecentVersion() { - return new Version(mostRecentMajorVersion, "0", "0"); - } - - private Collection> selectMigrations(final Version version) { - final Collection> results = migrations.tailMap(version.getMajorVersion()).values(); - if (results.isEmpty()) { - throw new RuntimeException("Unsupported migration version " + version.serialize()); - } - return results; + return migrationContainer.getMostRecentVersion(); } // Helper function to work around type casting - private PreviousVersion applyDowngrade(final AirbyteMessageMigration migration, - final Object message) { + private static PreviousVersion applyDowngrade(final AirbyteMessageMigration migration, + final Object message) { return migration.downgrade((CurrentVersion) message); } // Helper function to work around type casting - private CurrentVersion applyUpgrade(final AirbyteMessageMigration migration, - final Object message) { + private static CurrentVersion applyUpgrade(final AirbyteMessageMigration migration, + final Object message) { return migration.upgrade((PreviousVersion) message); } - /** - * Store migration in a sorted map key by the major of the lower version of the migration. - * - * The goal is to be able to retrieve the list of migrations to apply to get to/from a given - * version. We are only keying on the lower version because the right side (most recent version of - * the migration range) is always current version. - */ - @VisibleForTesting - void registerMigration(final AirbyteMessageMigration migration) { - final String key = migration.getPreviousVersion().getMajorVersion(); - if (!migrations.containsKey(key)) { - migrations.put(key, migration); - if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentMajorVersion) > 0) { - mostRecentMajorVersion = migration.getCurrentVersion().getMajorVersion(); - } - } else { - throw new RuntimeException("Trying to register a duplicated migration " + migration.getClass().getName()); - } - } - // Used for inspection of the injection @VisibleForTesting Set getMigrationKeys() { - return migrations.keySet(); + return migrationContainer.getMigrationKeys(); } } diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/MigrationContainer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/MigrationContainer.java new file mode 100644 index 000000000000..63fe3edbd5c9 --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/MigrationContainer.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol; + +import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration; +import io.airbyte.commons.version.Version; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.BiFunction; + +public class MigrationContainer { + + private final List> migrationsToRegister; + private final SortedMap> migrations = new TreeMap<>(); + private String mostRecentMajorVersion = ""; + + public MigrationContainer(final List> migrations) { + this.migrationsToRegister = migrations; + } + + public void initialize() { + migrationsToRegister.forEach(this::registerMigration); + } + + public Version getMostRecentVersion() { + return new Version(mostRecentMajorVersion, "0", "0"); + } + + /** + * Downgrade a message from the most recent version to the target version by chaining all the + * required migrations + */ + public PreviousVersion downgrade(final CurrentVersion message, + final Version target, + final BiFunction, Object, Object> applyDowngrade) { + if (target.getMajorVersion().equals(mostRecentMajorVersion)) { + return (PreviousVersion) message; + } + + Object result = message; + Object[] selectedMigrations = selectMigrations(target).toArray(); + for (int i = selectedMigrations.length; i > 0; --i) { + result = applyDowngrade.apply((AirbyteMessageMigration) selectedMigrations[i - 1], result); + } + return (PreviousVersion) result; + } + + /** + * Upgrade a message from the source version to the most recent version by chaining all the required + * migrations + */ + public CurrentVersion upgrade(final PreviousVersion message, + final Version source, + final BiFunction, Object, Object> applyUpgrade) { + if (source.getMajorVersion().equals(mostRecentMajorVersion)) { + return (CurrentVersion) message; + } + + Object result = message; + for (var migration : selectMigrations(source)) { + result = applyUpgrade.apply(migration, result); + } + return (CurrentVersion) result; + } + + public Collection> selectMigrations(final Version version) { + final Collection> results = migrations.tailMap(version.getMajorVersion()).values(); + if (results.isEmpty()) { + throw new RuntimeException("Unsupported migration version " + version.serialize()); + } + return results; + } + + /** + * Store migration in a sorted map key by the major of the lower version of the migration. + * + * The goal is to be able to retrieve the list of migrations to apply to get to/from a given + * version. We are only keying on the lower version because the right side (most recent version of + * the migration range) is always current version. + */ + private void registerMigration(final AirbyteMessageMigration migration) { + final String key = migration.getPreviousVersion().getMajorVersion(); + if (!migrations.containsKey(key)) { + migrations.put(key, migration); + if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentMajorVersion) > 0) { + mostRecentMajorVersion = migration.getCurrentVersion().getMajorVersion(); + } + } else { + throw new RuntimeException("Trying to register a duplicated migration " + migration.getClass().getName()); + } + } + + public Set getMigrationKeys() { + return migrations.keySet(); + } + +} diff --git a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorTest.java b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorTest.java index c609b2f27b51..5e703af1186d 100644 --- a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorTest.java +++ b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorTest.java @@ -9,6 +9,7 @@ import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration; import io.airbyte.commons.version.Version; +import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -76,9 +77,9 @@ public Version getCurrentVersion() { @BeforeEach void beforeEach() { - migrator = new AirbyteMessageMigrator(); - migrator.registerMigration(new Migrate0to1()); - migrator.registerMigration(new Migrate1to2()); + migrator = new AirbyteMessageMigrator( + List.of(new Migrate0to1(), new Migrate1to2())); + migrator.initialize(); } @Test @@ -127,7 +128,9 @@ void testUnsupportedUpgradeShouldFailExplicitly() { @Test void testRegisterCollisionsShouldFail() { assertThrows(RuntimeException.class, () -> { - migrator.registerMigration(new Migrate0to1()); + migrator = new AirbyteMessageMigrator( + List.of(new Migrate0to1(), new Migrate1to2(), new Migrate0to1())); + migrator.initialize(); }); } From caa169252afed8753541390a4f2490caccb9fea3 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 20 Dec 2022 14:18:55 -0800 Subject: [PATCH 2/8] Add ConfiguredAirbyteCatalogMigrations --- .../protocol/AirbyteMessageMigrator.java | 7 +- .../ConfiguredAirbyteCatalogMigrator.java | 68 +++++++++++++++++++ .../migrations/AirbyteMessageMigration.java | 14 +--- .../ConfiguredAirbyteCatalogMigration.java | 25 +++++++ .../ConfiguredAirbyteCatalogMigrationV1.java | 37 ++++++++++ .../protocol/migrations/Migration.java | 21 ++++++ .../{ => migrations}/MigrationContainer.java | 23 +++---- .../AirbyteMessageMigratorMicronautTest.java | 28 -------- .../protocol/MigratorsMicronautTest.java | 37 ++++++++++ 9 files changed, 204 insertions(+), 56 deletions(-) create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ConfiguredAirbyteCatalogMigrator.java create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigration.java create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigrationV1.java create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/Migration.java rename airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/{ => migrations}/MigrationContainer.java (75%) delete mode 100644 airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorMicronautTest.java create mode 100644 airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java index ef7e7eebe743..d661e1dafe7a 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java @@ -6,6 +6,7 @@ import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration; +import io.airbyte.commons.protocol.migrations.MigrationContainer; import io.airbyte.commons.version.Version; import jakarta.annotation.PostConstruct; import jakarta.inject.Singleton; @@ -21,10 +22,10 @@ @Singleton public class AirbyteMessageMigrator { - private final MigrationContainer migrationContainer; + private final MigrationContainer> migrationContainer; - public AirbyteMessageMigrator(List> migrations) { - migrationContainer = new MigrationContainer(migrations); + public AirbyteMessageMigrator(final List> migrations) { + migrationContainer = new MigrationContainer<>(migrations); } @PostConstruct diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ConfiguredAirbyteCatalogMigrator.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ConfiguredAirbyteCatalogMigrator.java new file mode 100644 index 000000000000..7c2c2111445d --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ConfiguredAirbyteCatalogMigrator.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.protocol.migrations.ConfiguredAirbyteCatalogMigration; +import io.airbyte.commons.protocol.migrations.MigrationContainer; +import io.airbyte.commons.version.Version; +import jakarta.annotation.PostConstruct; +import jakarta.inject.Singleton; +import java.util.List; +import java.util.Set; + +@Singleton +public class ConfiguredAirbyteCatalogMigrator { + + private final MigrationContainer> migrationContainer; + + public ConfiguredAirbyteCatalogMigrator(final List> migrations) { + migrationContainer = new MigrationContainer<>(migrations); + } + + @PostConstruct + public void initialize() { + migrationContainer.initialize(); + } + + /** + * Downgrade a message from the most recent version to the target version by chaining all the + * required migrations + */ + public PreviousVersion downgrade(final CurrentVersion message, final Version target) { + return migrationContainer.downgrade(message, target, ConfiguredAirbyteCatalogMigrator::applyDowngrade); + } + + /** + * Upgrade a message from the source version to the most recent version by chaining all the required + * migrations + */ + public CurrentVersion upgrade(final PreviousVersion message, final Version source) { + return migrationContainer.upgrade(message, source, ConfiguredAirbyteCatalogMigrator::applyUpgrade); + } + + public Version getMostRecentVersion() { + return migrationContainer.getMostRecentVersion(); + } + + // Helper function to work around type casting + private static PreviousVersion applyDowngrade(final ConfiguredAirbyteCatalogMigration migration, + final Object message) { + return migration.downgrade((CurrentVersion) message); + } + + // Helper function to work around type casting + private static CurrentVersion applyUpgrade(final ConfiguredAirbyteCatalogMigration migration, + final Object message) { + return migration.upgrade((PreviousVersion) message); + } + + // Used for inspection of the injection + @VisibleForTesting + Set getMigrationKeys() { + return migrationContainer.getMigrationKeys(); + } + +} diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigration.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigration.java index ce3746198bbe..42e2b30990e8 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigration.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigration.java @@ -4,15 +4,13 @@ package io.airbyte.commons.protocol.migrations; -import io.airbyte.commons.version.Version; - /** * AirbyteProtocol message migration interface * * @param The Old AirbyteMessage type * @param The New AirbyteMessage type */ -public interface AirbyteMessageMigration { +public interface AirbyteMessageMigration extends Migration { /** * Downgrades a message to from the new version to the old version @@ -30,14 +28,4 @@ public interface AirbyteMessageMigration { */ CurrentVersion upgrade(final PreviousVersion message); - /** - * The Old version, note that due to semver, the important piece of information is the Major. - */ - Version getPreviousVersion(); - - /** - * The New version, note that due to semver, the important piece of information is the Major. - */ - Version getCurrentVersion(); - } diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigration.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigration.java new file mode 100644 index 000000000000..49b99dfef7c5 --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigration.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol.migrations; + +public interface ConfiguredAirbyteCatalogMigration extends Migration { + + /** + * Downgrades a ConfiguredAirbyteCatalog to from the new version to the old version + * + * @param message: the ConfiguredAirbyteCatalog to downgrade + * @return the downgraded ConfiguredAirbyteCatalog + */ + PreviousVersion downgrade(final CurrentVersion message); + + /** + * Upgrades a ConfiguredAirbyteCatalog from the old version to the new version + * + * @param message: the ConfiguredAirbyteCatalog to upgrade + * @return the upgraded ConfiguredAirbyteCatalog + */ + CurrentVersion upgrade(final PreviousVersion message); + +} diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigrationV1.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigrationV1.java new file mode 100644 index 000000000000..c2293d740da3 --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigrationV1.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol.migrations; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.version.AirbyteProtocolVersion; +import io.airbyte.commons.version.Version; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import jakarta.inject.Singleton; + +@Singleton +public class ConfiguredAirbyteCatalogMigrationV1 + implements ConfiguredAirbyteCatalogMigration { + + @Override + public ConfiguredAirbyteCatalog downgrade(io.airbyte.protocol.models.ConfiguredAirbyteCatalog message) { + return Jsons.object(Jsons.jsonNode(message), ConfiguredAirbyteCatalog.class); + } + + @Override + public io.airbyte.protocol.models.ConfiguredAirbyteCatalog upgrade(ConfiguredAirbyteCatalog message) { + return Jsons.object(Jsons.jsonNode(message), io.airbyte.protocol.models.ConfiguredAirbyteCatalog.class); + } + + @Override + public Version getPreviousVersion() { + return AirbyteProtocolVersion.V0; + } + + @Override + public Version getCurrentVersion() { + return AirbyteProtocolVersion.V1; + } + +} diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/Migration.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/Migration.java new file mode 100644 index 000000000000..8a21f8a9ab2f --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/Migration.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol.migrations; + +import io.airbyte.commons.version.Version; + +public interface Migration { + + /** + * The Old version, note that due to semver, the important piece of information is the Major. + */ + Version getPreviousVersion(); + + /** + * The New version, note that due to semver, the important piece of information is the Major. + */ + Version getCurrentVersion(); + +} diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/MigrationContainer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/MigrationContainer.java similarity index 75% rename from airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/MigrationContainer.java rename to airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/MigrationContainer.java index 63fe3edbd5c9..6a317b03bd80 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/MigrationContainer.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/MigrationContainer.java @@ -2,9 +2,8 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.protocol; +package io.airbyte.commons.protocol.migrations; -import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration; import io.airbyte.commons.version.Version; import java.util.Collection; import java.util.List; @@ -13,13 +12,13 @@ import java.util.TreeMap; import java.util.function.BiFunction; -public class MigrationContainer { +public class MigrationContainer { - private final List> migrationsToRegister; - private final SortedMap> migrations = new TreeMap<>(); + private final List migrationsToRegister; + private final SortedMap migrations = new TreeMap<>(); private String mostRecentMajorVersion = ""; - public MigrationContainer(final List> migrations) { + public MigrationContainer(final List migrations) { this.migrationsToRegister = migrations; } @@ -37,7 +36,7 @@ public Version getMostRecentVersion() { */ public PreviousVersion downgrade(final CurrentVersion message, final Version target, - final BiFunction, Object, Object> applyDowngrade) { + final BiFunction applyDowngrade) { if (target.getMajorVersion().equals(mostRecentMajorVersion)) { return (PreviousVersion) message; } @@ -45,7 +44,7 @@ public PreviousVersion downgrade(final Current Object result = message; Object[] selectedMigrations = selectMigrations(target).toArray(); for (int i = selectedMigrations.length; i > 0; --i) { - result = applyDowngrade.apply((AirbyteMessageMigration) selectedMigrations[i - 1], result); + result = applyDowngrade.apply((T) selectedMigrations[i - 1], result); } return (PreviousVersion) result; } @@ -56,7 +55,7 @@ public PreviousVersion downgrade(final Current */ public CurrentVersion upgrade(final PreviousVersion message, final Version source, - final BiFunction, Object, Object> applyUpgrade) { + final BiFunction applyUpgrade) { if (source.getMajorVersion().equals(mostRecentMajorVersion)) { return (CurrentVersion) message; } @@ -68,8 +67,8 @@ public CurrentVersion upgrade(final PreviousVe return (CurrentVersion) result; } - public Collection> selectMigrations(final Version version) { - final Collection> results = migrations.tailMap(version.getMajorVersion()).values(); + public Collection selectMigrations(final Version version) { + final Collection results = migrations.tailMap(version.getMajorVersion()).values(); if (results.isEmpty()) { throw new RuntimeException("Unsupported migration version " + version.serialize()); } @@ -83,7 +82,7 @@ public CurrentVersion upgrade(final PreviousVe * version. We are only keying on the lower version because the right side (most recent version of * the migration range) is always current version. */ - private void registerMigration(final AirbyteMessageMigration migration) { + private void registerMigration(final T migration) { final String key = migration.getPreviousVersion().getMajorVersion(); if (!migrations.containsKey(key)) { migrations.put(key, migration); diff --git a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorMicronautTest.java b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorMicronautTest.java deleted file mode 100644 index a2d5556d99cd..000000000000 --- a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorMicronautTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.commons.protocol; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import io.micronaut.test.extensions.junit5.annotation.MicronautTest; -import jakarta.inject.Inject; -import java.util.HashSet; -import java.util.List; -import org.junit.jupiter.api.Test; - -@MicronautTest -class AirbyteMessageMigratorMicronautTest { - - @Inject - AirbyteMessageMigrator messageMigrator; - - @Test - void testMigrationInjection() { - // This should contain the list of all the supported majors of the airbyte protocol except the most - // recent one since the migrations themselves are keyed on the lower version. - assertEquals(new HashSet<>(List.of("0")), messageMigrator.getMigrationKeys()); - } - -} diff --git a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java new file mode 100644 index 000000000000..1c52ed7c8a45 --- /dev/null +++ b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import java.util.Set; +import org.junit.jupiter.api.Test; + +@MicronautTest +class MigratorsMicronautTest { + + @Inject + AirbyteMessageMigrator messageMigrator; + + @Inject + ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator; + + // This should contain the list of all the supported majors of the airbyte protocol except the most + // recent one since the migrations themselves are keyed on the lower version. + final Set SUPPORTED_VERSIONS = Set.of("0"); + + @Test + void testAirbyteMessageMigrationInjection() { + assertEquals(SUPPORTED_VERSIONS, messageMigrator.getMigrationKeys()); + } + + @Test + void testConfiguredAirbyteCatalogMigrationInjection() { + assertEquals(SUPPORTED_VERSIONS, configuredAirbyteCatalogMigrator.getMigrationKeys()); + } + +} From a374a62d462f0dfb86881184d3b95fb7fe2ba0f1 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 20 Dec 2022 15:24:49 -0800 Subject: [PATCH 3/8] Add ConfiguredAirbyteCatalog to AirbyteMessageMigrations --- .../protocol/AirbyteMessageMigrator.java | 24 +++++++++++------ .../AirbyteMessageVersionedMigrator.java | 10 ++++--- .../migrations/AirbyteMessageMigration.java | 9 +++++-- .../migrations/AirbyteMessageMigrationV1.java | 8 ++++-- .../protocol/AirbyteMessageMigratorTest.java | 26 ++++++++++--------- ...VersionedAirbyteMessageBufferedWriter.java | 9 +++++-- ...edAirbyteMessageBufferedWriterFactory.java | 10 +++++-- .../VersionedAirbyteStreamFactory.java | 11 +++++--- .../VersionedAirbyteStreamFactoryTest.java | 19 +++++++++----- .../ReplicationJobOrchestrator.java | 17 +++++++----- .../CheckConnectionActivityImpl.java | 3 ++- .../catalog/DiscoverCatalogActivityImpl.java | 3 ++- .../temporal/spec/SpecActivityImpl.java | 3 ++- .../sync/ReplicationActivityImpl.java | 5 +++- 14 files changed, 105 insertions(+), 52 deletions(-) diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java index d661e1dafe7a..c171a559b822 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java @@ -8,9 +8,11 @@ import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration; import io.airbyte.commons.protocol.migrations.MigrationContainer; import io.airbyte.commons.version.Version; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import jakarta.annotation.PostConstruct; import jakarta.inject.Singleton; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -37,16 +39,20 @@ public void initialize() { * Downgrade a message from the most recent version to the target version by chaining all the * required migrations */ - public PreviousVersion downgrade(final CurrentVersion message, final Version target) { - return migrationContainer.downgrade(message, target, AirbyteMessageMigrator::applyDowngrade); + public PreviousVersion downgrade(final CurrentVersion message, + final Version target, + final Optional configuredAirbyteCatalog) { + return migrationContainer.downgrade(message, target, (migration, msg) -> applyDowngrade(migration, msg, configuredAirbyteCatalog)); } /** * Upgrade a message from the source version to the most recent version by chaining all the required * migrations */ - public CurrentVersion upgrade(final PreviousVersion message, final Version source) { - return migrationContainer.upgrade(message, source, AirbyteMessageMigrator::applyUpgrade); + public CurrentVersion upgrade(final PreviousVersion message, + final Version source, + final Optional configuredAirbyteCatalog) { + return migrationContainer.upgrade(message, source, (migration, msg) -> applyUpgrade(migration, msg, configuredAirbyteCatalog)); } public Version getMostRecentVersion() { @@ -55,14 +61,16 @@ public Version getMostRecentVersion() { // Helper function to work around type casting private static PreviousVersion applyDowngrade(final AirbyteMessageMigration migration, - final Object message) { - return migration.downgrade((CurrentVersion) message); + final Object message, + final Optional configuredAirbyteCatalog) { + return migration.downgrade((CurrentVersion) message, configuredAirbyteCatalog); } // Helper function to work around type casting private static CurrentVersion applyUpgrade(final AirbyteMessageMigration migration, - final Object message) { - return migration.upgrade((PreviousVersion) message); + final Object message, + final Optional configuredAirbyteCatalog) { + return migration.upgrade((PreviousVersion) message, configuredAirbyteCatalog); } // Used for inspection of the injection diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigrator.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigrator.java index c421777c03eb..a0474d7663c7 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigrator.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigrator.java @@ -6,6 +6,8 @@ import io.airbyte.commons.version.Version; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.Optional; /** * Wraps message migration from a fixed version to the most recent version @@ -20,12 +22,12 @@ public AirbyteMessageVersionedMigrator(final AirbyteMessageMigrator migrator, fi this.version = version; } - public OriginalMessageType downgrade(final AirbyteMessage message) { - return migrator.downgrade(message, version); + public OriginalMessageType downgrade(final AirbyteMessage message, final Optional configuredAirbyteCatalog) { + return migrator.downgrade(message, version, configuredAirbyteCatalog); } - public AirbyteMessage upgrade(final OriginalMessageType message) { - return migrator.upgrade(message, version); + public AirbyteMessage upgrade(final OriginalMessageType message, final Optional configuredAirbyteCatalog) { + return migrator.upgrade(message, version, configuredAirbyteCatalog); } public Version getVersion() { diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigration.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigration.java index 42e2b30990e8..f193414da705 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigration.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigration.java @@ -4,6 +4,9 @@ package io.airbyte.commons.protocol.migrations; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.Optional; + /** * AirbyteProtocol message migration interface * @@ -16,16 +19,18 @@ public interface AirbyteMessageMigration extend * Downgrades a message to from the new version to the old version * * @param message: the message to downgrade + * @param configuredAirbyteCatalog: the ConfiguredAirbyteCatalog of the connection when applicable * @return the downgraded message */ - PreviousVersion downgrade(final CurrentVersion message); + PreviousVersion downgrade(final CurrentVersion message, final Optional configuredAirbyteCatalog); /** * Upgrades a message from the old version to the new version * * @param message: the message to upgrade + * @param configuredAirbyteCatalog: the ConfiguredAirbyteCatalog of the connection when applicable * @return the upgrade message */ - CurrentVersion upgrade(final PreviousVersion message); + CurrentVersion upgrade(final PreviousVersion message, final Optional configuredAirbyteCatalog); } diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1.java index 501fca10c464..b48a373f9329 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1.java @@ -7,8 +7,10 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.commons.version.Version; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteMessage; import jakarta.inject.Singleton; +import java.util.Optional; /** * Placeholder AirbyteMessage Migration from v0 to v1 @@ -17,12 +19,14 @@ public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration { @Override - public AirbyteMessage downgrade(io.airbyte.protocol.models.AirbyteMessage message) { + public AirbyteMessage downgrade(final io.airbyte.protocol.models.AirbyteMessage message, + final Optional configuredAirbyteCatalog) { return Jsons.object(Jsons.jsonNode(message), AirbyteMessage.class); } @Override - public io.airbyte.protocol.models.AirbyteMessage upgrade(AirbyteMessage message) { + public io.airbyte.protocol.models.AirbyteMessage upgrade(final AirbyteMessage message, + final Optional configuredAirbyteCatalog) { return Jsons.object(Jsons.jsonNode(message), io.airbyte.protocol.models.AirbyteMessage.class); } diff --git a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorTest.java b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorTest.java index 5e703af1186d..c11a33f66cc0 100644 --- a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorTest.java +++ b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorTest.java @@ -9,7 +9,9 @@ import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration; import io.airbyte.commons.version.Version; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.List; +import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,12 +30,12 @@ record ObjectV2(String name2) {} static class Migrate0to1 implements AirbyteMessageMigration { @Override - public ObjectV0 downgrade(ObjectV1 message) { + public ObjectV0 downgrade(ObjectV1 message, Optional configuredAirbyteCatalog) { return new ObjectV0(message.name1); } @Override - public ObjectV1 upgrade(ObjectV0 message) { + public ObjectV1 upgrade(ObjectV0 message, Optional configuredAirbyteCatalog) { return new ObjectV1(message.name0); } @@ -52,12 +54,12 @@ public Version getCurrentVersion() { static class Migrate1to2 implements AirbyteMessageMigration { @Override - public ObjectV1 downgrade(ObjectV2 message) { + public ObjectV1 downgrade(ObjectV2 message, Optional configuredAirbyteCatalog) { return new ObjectV1(message.name2); } @Override - public ObjectV2 upgrade(ObjectV1 message) { + public ObjectV2 upgrade(ObjectV1 message, Optional configuredAirbyteCatalog) { return new ObjectV2(message.name1); } @@ -86,42 +88,42 @@ void beforeEach() { void testDowngrade() { final ObjectV2 obj = new ObjectV2("my name"); - final ObjectV0 objDowngradedTo0 = migrator.downgrade(obj, v0); + final ObjectV0 objDowngradedTo0 = migrator.downgrade(obj, v0, Optional.empty()); assertEquals(obj.name2, objDowngradedTo0.name0); - final ObjectV1 objDowngradedTo1 = migrator.downgrade(obj, v1); + final ObjectV1 objDowngradedTo1 = migrator.downgrade(obj, v1, Optional.empty()); assertEquals(obj.name2, objDowngradedTo1.name1); - final ObjectV2 objDowngradedTo2 = migrator.downgrade(obj, v2); + final ObjectV2 objDowngradedTo2 = migrator.downgrade(obj, v2, Optional.empty()); assertEquals(obj.name2, objDowngradedTo2.name2); } @Test void testUpgrade() { final ObjectV0 obj0 = new ObjectV0("my name 0"); - final ObjectV2 objUpgradedFrom0 = migrator.upgrade(obj0, v0); + final ObjectV2 objUpgradedFrom0 = migrator.upgrade(obj0, v0, Optional.empty()); assertEquals(obj0.name0, objUpgradedFrom0.name2); final ObjectV1 obj1 = new ObjectV1("my name 1"); - final ObjectV2 objUpgradedFrom1 = migrator.upgrade(obj1, v1); + final ObjectV2 objUpgradedFrom1 = migrator.upgrade(obj1, v1, Optional.empty()); assertEquals(obj1.name1, objUpgradedFrom1.name2); final ObjectV2 obj2 = new ObjectV2("my name 2"); - final ObjectV2 objUpgradedFrom2 = migrator.upgrade(obj2, v2); + final ObjectV2 objUpgradedFrom2 = migrator.upgrade(obj2, v2, Optional.empty()); assertEquals(obj2.name2, objUpgradedFrom2.name2); } @Test void testUnsupportedDowngradeShouldFailExplicitly() { assertThrows(RuntimeException.class, () -> { - migrator.downgrade(new ObjectV2("woot"), new Version("5.0.0")); + migrator.downgrade(new ObjectV2("woot"), new Version("5.0.0"), Optional.empty()); }); } @Test void testUnsupportedUpgradeShouldFailExplicitly() { assertThrows(RuntimeException.class, () -> { - migrator.upgrade(new ObjectV0("woot"), new Version("4.0.0")); + migrator.upgrade(new ObjectV0("woot"), new Version("4.0.0"), Optional.empty()); }); } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriter.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriter.java index e1b9b25a4b92..533274e8913a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriter.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriter.java @@ -7,25 +7,30 @@ import io.airbyte.commons.protocol.AirbyteMessageVersionedMigrator; import io.airbyte.commons.protocol.serde.AirbyteMessageSerializer; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.BufferedWriter; import java.io.IOException; +import java.util.Optional; public class VersionedAirbyteMessageBufferedWriter extends DefaultAirbyteMessageBufferedWriter { private final AirbyteMessageSerializer serializer; private final AirbyteMessageVersionedMigrator migrator; + private final Optional configuredAirbyteCatalog; public VersionedAirbyteMessageBufferedWriter(final BufferedWriter writer, final AirbyteMessageSerializer serializer, - final AirbyteMessageVersionedMigrator migrator) { + final AirbyteMessageVersionedMigrator migrator, + final Optional configuredAirbyteCatalog) { super(writer); this.serializer = serializer; this.migrator = migrator; + this.configuredAirbyteCatalog = configuredAirbyteCatalog; } @Override public void write(final AirbyteMessage message) throws IOException { - final T downgradedMessage = migrator.downgrade(message); + final T downgradedMessage = migrator.downgrade(message, configuredAirbyteCatalog); writer.write(serializer.serialize(downgradedMessage)); writer.newLine(); } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java index 3b2a2c8f0a56..be002f4517ad 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java @@ -7,7 +7,9 @@ import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; import io.airbyte.commons.version.Version; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.BufferedWriter; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,13 +20,16 @@ public class VersionedAirbyteMessageBufferedWriterFactory implements AirbyteMess private final AirbyteMessageSerDeProvider serDeProvider; private final AirbyteMessageVersionedMigratorFactory migratorFactory; private final Version protocolVersion; + private final Optional configuredAirbyteCatalog; public VersionedAirbyteMessageBufferedWriterFactory(final AirbyteMessageSerDeProvider serDeProvider, final AirbyteMessageVersionedMigratorFactory migratorFactory, - final Version protocolVersion) { + final Version protocolVersion, + final Optional configuredAirbyteCatalog) { this.serDeProvider = serDeProvider; this.migratorFactory = migratorFactory; this.protocolVersion = protocolVersion; + this.configuredAirbyteCatalog = configuredAirbyteCatalog; } @Override @@ -37,7 +42,8 @@ public AirbyteMessageBufferedWriter createWriter(BufferedWriter bufferedWriter) return new VersionedAirbyteMessageBufferedWriter<>( bufferedWriter, serDeProvider.getSerializer(protocolVersion).orElseThrow(), - migratorFactory.getVersionedMigrator(protocolVersion)); + migratorFactory.getVersionedMigrator(protocolVersion), + configuredAirbyteCatalog); } } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index 2c829415cccc..1828e44da7b4 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -17,6 +17,7 @@ import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer; import io.airbyte.commons.version.Version; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.BufferedReader; import java.io.IOException; import java.util.Optional; @@ -47,6 +48,7 @@ public class VersionedAirbyteStreamFactory extends DefaultAirbyteStreamFactor private final AirbyteMessageSerDeProvider serDeProvider; private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final Optional configuredAirbyteCatalog; private AirbyteMessageDeserializer deserializer; private AirbyteMessageVersionedMigrator migrator; private Version protocolVersion; @@ -55,19 +57,22 @@ public class VersionedAirbyteStreamFactory extends DefaultAirbyteStreamFactor public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider, final AirbyteMessageVersionedMigratorFactory migratorFactory, - final Version protocolVersion) { - this(serDeProvider, migratorFactory, protocolVersion, MdcScope.DEFAULT_BUILDER); + final Version protocolVersion, + final Optional configuredAirbyteCatalog) { + this(serDeProvider, migratorFactory, protocolVersion, configuredAirbyteCatalog, MdcScope.DEFAULT_BUILDER); } public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider, final AirbyteMessageVersionedMigratorFactory migratorFactory, final Version protocolVersion, + final Optional configuredAirbyteCatalog, final MdcScope.Builder containerLogMdcBuilder) { // TODO AirbyteProtocolPredicate needs to be updated to be protocol version aware super(new AirbyteProtocolPredicate(), LOGGER, containerLogMdcBuilder); Preconditions.checkNotNull(protocolVersion); this.serDeProvider = serDeProvider; this.migratorFactory = migratorFactory; + this.configuredAirbyteCatalog = configuredAirbyteCatalog; this.initializeForProtocolVersion(protocolVersion); } @@ -164,7 +169,7 @@ final protected void initializeForProtocolVersion(final Version protocolVersion) @Override protected Stream toAirbyteMessage(final JsonNode json) { try { - final AirbyteMessage message = migrator.upgrade(deserializer.deserialize(json)); + final AirbyteMessage message = migrator.upgrade(deserializer.deserialize(json), configuredAirbyteCatalog); return Stream.of(message); } catch (final RuntimeException e) { logger.warn("Failed to upgrade a message from version {}: {}", protocolVersion, Jsons.serialize(json), e); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java index b692938e84e0..14840ac50af9 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java @@ -23,6 +23,7 @@ import java.io.StringReader; import java.nio.charset.Charset; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -50,7 +51,8 @@ void beforeEach() { @Test void testCreate() { final Version initialVersion = new Version("0.1.2"); - final VersionedAirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion); + final VersionedAirbyteStreamFactory streamFactory = + new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion, Optional.empty()); final BufferedReader bufferedReader = new BufferedReader(new StringReader("")); streamFactory.create(bufferedReader); @@ -62,8 +64,9 @@ void testCreate() { @Test void testCreateWithVersionDetection() { final Version initialVersion = new Version("0.0.0"); - final VersionedAirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) - .withDetectVersion(true); + final VersionedAirbyteStreamFactory streamFactory = + new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion, Optional.empty()) + .withDetectVersion(true); final BufferedReader bufferedReader = getBuffereredReader("version-detection/logs-with-version.jsonl"); @@ -78,8 +81,9 @@ void testCreateWithVersionDetection() { @Test void testCreateWithVersionDetectionFallback() { final Version initialVersion = new Version("0.0.6"); - final VersionedAirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) - .withDetectVersion(true); + final VersionedAirbyteStreamFactory streamFactory = + new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion, Optional.empty()) + .withDetectVersion(true); final BufferedReader bufferedReader = getBuffereredReader("version-detection/logs-without-version.jsonl"); @@ -94,8 +98,9 @@ void testCreateWithVersionDetectionFallback() { @Test void testCreateWithVersionDetectionWithoutSpecMessage() { final Version initialVersion = new Version("0.0.1"); - final VersionedAirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) - .withDetectVersion(true); + final VersionedAirbyteStreamFactory streamFactory = + new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion, Optional.empty()) + .withDetectVersion(true); final BufferedReader bufferedReader = getBuffereredReader("version-detection/logs-without-spec-message.jsonl"); diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index b4d73416e743..bd005a772b4c 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -25,6 +25,7 @@ import io.airbyte.metrics.lib.MetricEmittingApps; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.RecordSchemaValidator; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.WorkerMetricReporter; @@ -130,7 +131,7 @@ public Optional runJob() throws Exception { WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage()) ? new EmptyAirbyteSource( featureFlags.useStreamCapableState()) : new DefaultAirbyteSource(sourceLauncher, - getStreamFactory(sourceLauncherConfig.getProtocolVersion(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); + getStreamFactory(sourceLauncherConfig.getProtocolVersion(), syncInput.getCatalog(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); MetricClientFactory.initialize(MetricEmittingApps.WORKER); final var metricClient = MetricClientFactory.getMetricClient(); @@ -143,9 +144,11 @@ public Optional runJob() throws Exception { Math.toIntExact(jobRunConfig.getAttemptId()), airbyteSource, new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()), - new DefaultAirbyteDestination(destinationLauncher, getStreamFactory(destinationLauncherConfig.getProtocolVersion(), - DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER), - new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), + new DefaultAirbyteDestination(destinationLauncher, + getStreamFactory(destinationLauncherConfig.getProtocolVersion(), syncInput.getCatalog(), + DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER), + new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(), + Optional.of(syncInput.getCatalog()))), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), metricReporter, featureFlags.applyFieldSelection()); @@ -159,9 +162,11 @@ public Optional runJob() throws Exception { return Optional.of(Jsons.serialize(replicationOutput)); } - private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, final MdcScope.Builder mdcScope) { + private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, + final ConfiguredAirbyteCatalog configuredAirbyteCatalog, + final MdcScope.Builder mdcScope) { return protocolVersion != null - ? new VersionedAirbyteStreamFactory(serDeProvider, migratorFactory, protocolVersion, mdcScope) + ? new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, protocolVersion, Optional.of(configuredAirbyteCatalog), mdcScope) : new DefaultAirbyteStreamFactory(mdcScope); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index ba21f2af375a..f0a4ce72e373 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -44,6 +44,7 @@ import jakarta.inject.Singleton; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; @Singleton @Requires(env = WorkerMode.CONTROL_PLANE) @@ -133,7 +134,7 @@ private CheckedSupplier workerConfigs.getResourceRequirements(), launcherConfig.getIsCustomConnector()); final AirbyteStreamFactory streamFactory = launcherConfig.getProtocolVersion() != null - ? new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, launcherConfig.getProtocolVersion()) + ? new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, launcherConfig.getProtocolVersion(), Optional.empty()) : new DefaultAirbyteStreamFactory(); return new DefaultCheckConnectionWorker(integrationLauncher, streamFactory); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 5a36c144fc43..89f98984a3fa 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -43,6 +43,7 @@ import jakarta.inject.Singleton; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @Singleton @@ -126,7 +127,7 @@ private CheckedSupplier new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), processFactory, workerConfigs.getResourceRequirements(), launcherConfig.getIsCustomConnector()); final AirbyteStreamFactory streamFactory = - new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, launcherConfig.getProtocolVersion()); + new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, launcherConfig.getProtocolVersion(), Optional.empty()); return new DefaultDiscoverCatalogWorker(configRepository, integrationLauncher, streamFactory); }; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index 280196052ddb..d718e8cb4cd0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -41,6 +41,7 @@ import jakarta.inject.Singleton; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; @Singleton @@ -123,7 +124,7 @@ private AirbyteStreamFactory getStreamFactory(final IntegrationLauncherConfig la final Version protocolVersion = launcherConfig.getProtocolVersion() != null ? launcherConfig.getProtocolVersion() : migratorFactory.getMostRecentVersion(); // Try to detect version from the stream - return new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, protocolVersion).withDetectVersion(true); + return new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, protocolVersion, Optional.empty()).withDetectVersion(true); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 17a501ff25c9..c422e1e2bac0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -288,6 +288,7 @@ private CheckedSupplier, Exception> ? new EmptyAirbyteSource(featureFlags.useStreamCapableState()) : new DefaultAirbyteSource(sourceLauncher, new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion(), + Optional.of(syncInput.getCatalog()), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); MetricClientFactory.initialize(MetricEmittingApps.WORKER); final MetricClient metricClient = MetricClientFactory.getMetricClient(); @@ -300,8 +301,10 @@ private CheckedSupplier, Exception> new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()), new DefaultAirbyteDestination(destinationLauncher, new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(), + Optional.of(syncInput.getCatalog()), DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER), - new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), + new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(), + Optional.of(syncInput.getCatalog()))), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), metricReporter, false); From 7adbe361e18e12c8850afad0f9e4dc0d8934c906 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 20 Dec 2022 17:27:37 -0800 Subject: [PATCH 4/8] Enable ConfiguredAirbyteCatalog migration --- ...irbyteMessageVersionedMigratorFactory.java | 30 --------------- ...rbyteProtocolVersionedMigratorFactory.java | 37 +++++++++++++++++++ .../protocol/DefaultProtocolSerializer.java | 17 +++++++++ .../commons/protocol/ProtocolSerializer.java | 13 +++++++ .../protocol/VersionedProtocolSerializer.java | 26 +++++++++++++ .../internal/DefaultAirbyteDestination.java | 12 ++++-- .../internal/DefaultAirbyteSource.java | 16 ++++++-- ...edAirbyteMessageBufferedWriterFactory.java | 8 ++-- .../VersionedAirbyteStreamFactory.java | 10 ++--- .../DefaultAirbyteDestinationTest.java | 9 ++++- .../internal/DefaultAirbyteSourceTest.java | 12 +++--- .../VersionedAirbyteStreamFactoryTest.java | 17 ++++++--- .../config/ContainerOrchestratorFactory.java | 4 +- .../ReplicationJobOrchestrator.java | 12 +++--- .../ContainerOrchestratorFactoryTest.java | 14 +++---- .../CheckConnectionActivityImpl.java | 6 +-- .../catalog/DiscoverCatalogActivityImpl.java | 6 +-- .../temporal/spec/SpecActivityImpl.java | 6 +-- .../sync/ReplicationActivityImpl.java | 12 +++--- 19 files changed, 180 insertions(+), 87 deletions(-) delete mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteProtocolVersionedMigratorFactory.java create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/DefaultProtocolSerializer.java create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ProtocolSerializer.java create mode 100644 airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java deleted file mode 100644 index dec45297880e..000000000000 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.commons.protocol; - -import io.airbyte.commons.version.Version; -import jakarta.inject.Singleton; - -/** - * Factory to build AirbyteMessageVersionedMigrator - */ -@Singleton -public class AirbyteMessageVersionedMigratorFactory { - - private final AirbyteMessageMigrator migrator; - - public AirbyteMessageVersionedMigratorFactory(final AirbyteMessageMigrator migrator) { - this.migrator = migrator; - } - - public AirbyteMessageVersionedMigrator getVersionedMigrator(final Version version) { - return new AirbyteMessageVersionedMigrator<>(this.migrator, version); - } - - public Version getMostRecentVersion() { - return migrator.getMostRecentVersion(); - } - -} diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteProtocolVersionedMigratorFactory.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteProtocolVersionedMigratorFactory.java new file mode 100644 index 000000000000..594dad0f7687 --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteProtocolVersionedMigratorFactory.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol; + +import io.airbyte.commons.version.Version; +import jakarta.inject.Singleton; + +/** + * Factory to build AirbyteMessageVersionedMigrator + */ +@Singleton +public class AirbyteProtocolVersionedMigratorFactory { + + private final AirbyteMessageMigrator airbyteMessageMigrator; + private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator; + + public AirbyteProtocolVersionedMigratorFactory(final AirbyteMessageMigrator airbyteMessageMigrator, + final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator) { + this.airbyteMessageMigrator = airbyteMessageMigrator; + this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator; + } + + public AirbyteMessageVersionedMigrator getAirbyteMessageMigrator(final Version version) { + return new AirbyteMessageVersionedMigrator<>(this.airbyteMessageMigrator, version); + } + + public final VersionedProtocolSerializer getProtocolSerializer(final Version version) { + return new VersionedProtocolSerializer(configuredAirbyteCatalogMigrator, version); + } + + public Version getMostRecentVersion() { + return airbyteMessageMigrator.getMostRecentVersion(); + } + +} diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/DefaultProtocolSerializer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/DefaultProtocolSerializer.java new file mode 100644 index 000000000000..0e6ee93f5bbd --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/DefaultProtocolSerializer.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; + +public class DefaultProtocolSerializer implements ProtocolSerializer { + + @Override + public String serialize(ConfiguredAirbyteCatalog configuredAirbyteCatalog) { + return Jsons.serialize(configuredAirbyteCatalog); + } + +} diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ProtocolSerializer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ProtocolSerializer.java new file mode 100644 index 000000000000..527697ebd1ca --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ProtocolSerializer.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol; + +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; + +public interface ProtocolSerializer { + + String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog); + +} diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java new file mode 100644 index 000000000000..c595f3ca8f1a --- /dev/null +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.protocol; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.version.Version; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; + +public class VersionedProtocolSerializer implements ProtocolSerializer { + + private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator; + private final Version protocolVersion; + + public VersionedProtocolSerializer(final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator, final Version protocolVersion) { + this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator; + this.protocolVersion = protocolVersion; + } + + @Override + public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) { + return Jsons.serialize(configuredAirbyteCatalogMigrator.downgrade(configuredAirbyteCatalog, protocolVersion)); + } + +} diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java index d776dfba1a95..1af997d1869e 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java @@ -15,6 +15,8 @@ import io.airbyte.commons.logging.LoggingHelper.Color; import io.airbyte.commons.logging.MdcScope; import io.airbyte.commons.logging.MdcScope.Builder; +import io.airbyte.commons.protocol.DefaultProtocolSerializer; +import io.airbyte.commons.protocol.ProtocolSerializer; import io.airbyte.config.WorkerDestinationConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -43,6 +45,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination { private final IntegrationLauncher integrationLauncher; private final AirbyteStreamFactory streamFactory; private final AirbyteMessageBufferedWriterFactory messageWriterFactory; + private final ProtocolSerializer protocolSerializer; private final AtomicBoolean inputHasEnded = new AtomicBoolean(false); @@ -52,16 +55,19 @@ public class DefaultAirbyteDestination implements AirbyteDestination { private Integer exitValue = null; public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher) { - this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), new DefaultAirbyteMessageBufferedWriterFactory()); + this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), new DefaultAirbyteMessageBufferedWriterFactory(), + new DefaultProtocolSerializer()); } public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher, final AirbyteStreamFactory streamFactory, - final AirbyteMessageBufferedWriterFactory messageWriterFactory) { + final AirbyteMessageBufferedWriterFactory messageWriterFactory, + final ProtocolSerializer protocolSerializer) { this.integrationLauncher = integrationLauncher; this.streamFactory = streamFactory; this.messageWriterFactory = messageWriterFactory; + this.protocolSerializer = protocolSerializer; } @Trace(operationName = WORKER_OPERATION_NAME) @@ -75,7 +81,7 @@ public void start(final WorkerDestinationConfig destinationConfig, final Path jo WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(destinationConfig.getDestinationConnectionConfiguration()), WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, - Jsons.serialize(destinationConfig.getCatalog())); + protocolSerializer.serialize(destinationConfig.getCatalog())); // stdout logs are logged elsewhere since stdout also contains data LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination", CONTAINER_LOG_MDC_BUILDER); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java index 62e0ac5792b8..68ed52707c16 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java @@ -16,6 +16,8 @@ import io.airbyte.commons.logging.LoggingHelper.Color; import io.airbyte.commons.logging.MdcScope; import io.airbyte.commons.logging.MdcScope.Builder; +import io.airbyte.commons.protocol.DefaultProtocolSerializer; +import io.airbyte.commons.protocol.ProtocolSerializer; import io.airbyte.config.WorkerSourceConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -45,6 +47,7 @@ public class DefaultAirbyteSource implements AirbyteSource { private final IntegrationLauncher integrationLauncher; private final AirbyteStreamFactory streamFactory; + private final ProtocolSerializer protocolSerializer; private final HeartbeatMonitor heartbeatMonitor; private Process sourceProcess = null; @@ -53,19 +56,23 @@ public class DefaultAirbyteSource implements AirbyteSource { private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages(); public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) { - this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER)); + this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), new DefaultProtocolSerializer()); } - public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final AirbyteStreamFactory streamFactory) { - this(integrationLauncher, streamFactory, new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION)); + public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, + final AirbyteStreamFactory streamFactory, + final ProtocolSerializer protocolSerializer) { + this(integrationLauncher, streamFactory, protocolSerializer, new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION)); } @VisibleForTesting DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final AirbyteStreamFactory streamFactory, + final ProtocolSerializer protocolSerializer, final HeartbeatMonitor heartbeatMonitor) { this.integrationLauncher = integrationLauncher; this.streamFactory = streamFactory; + this.protocolSerializer = protocolSerializer; this.heartbeatMonitor = heartbeatMonitor; } @@ -78,8 +85,9 @@ public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) thr WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(sourceConfig.getSourceConnectionConfiguration()), WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, - Jsons.serialize(sourceConfig.getCatalog()), + protocolSerializer.serialize(sourceConfig.getCatalog()), sourceConfig.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME, + // TODO We should be passing a typed state here and use the protocolSerializer sourceConfig.getState() == null ? null : Jsons.serialize(sourceConfig.getState().getState())); // stdout logs are logged elsewhere since stdout also contains data LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source", CONTAINER_LOG_MDC_BUILDER); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java index be002f4517ad..c10a0dc4e45a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java @@ -5,7 +5,7 @@ package io.airbyte.workers.internal; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.version.Version; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.BufferedWriter; @@ -18,12 +18,12 @@ public class VersionedAirbyteMessageBufferedWriterFactory implements AirbyteMess private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteMessageBufferedWriterFactory.class); private final AirbyteMessageSerDeProvider serDeProvider; - private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final AirbyteProtocolVersionedMigratorFactory migratorFactory; private final Version protocolVersion; private final Optional configuredAirbyteCatalog; public VersionedAirbyteMessageBufferedWriterFactory(final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory, + final AirbyteProtocolVersionedMigratorFactory migratorFactory, final Version protocolVersion, final Optional configuredAirbyteCatalog) { this.serDeProvider = serDeProvider; @@ -42,7 +42,7 @@ public AirbyteMessageBufferedWriter createWriter(BufferedWriter bufferedWriter) return new VersionedAirbyteMessageBufferedWriter<>( bufferedWriter, serDeProvider.getSerializer(protocolVersion).orElseThrow(), - migratorFactory.getVersionedMigrator(protocolVersion), + migratorFactory.getAirbyteMessageMigrator(protocolVersion), configuredAirbyteCatalog); } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index 1828e44da7b4..eaeef960958e 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -13,7 +13,7 @@ import io.airbyte.commons.logging.MdcScope; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; import io.airbyte.commons.protocol.AirbyteMessageVersionedMigrator; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer; import io.airbyte.commons.version.Version; import io.airbyte.protocol.models.AirbyteMessage; @@ -47,7 +47,7 @@ public class VersionedAirbyteStreamFactory extends DefaultAirbyteStreamFactor private static final String TYPE_FIELD_NAME = "type"; private final AirbyteMessageSerDeProvider serDeProvider; - private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final AirbyteProtocolVersionedMigratorFactory migratorFactory; private final Optional configuredAirbyteCatalog; private AirbyteMessageDeserializer deserializer; private AirbyteMessageVersionedMigrator migrator; @@ -56,14 +56,14 @@ public class VersionedAirbyteStreamFactory extends DefaultAirbyteStreamFactor private boolean shouldDetectVersion = false; public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory, + final AirbyteProtocolVersionedMigratorFactory migratorFactory, final Version protocolVersion, final Optional configuredAirbyteCatalog) { this(serDeProvider, migratorFactory, protocolVersion, configuredAirbyteCatalog, MdcScope.DEFAULT_BUILDER); } public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory, + final AirbyteProtocolVersionedMigratorFactory migratorFactory, final Version protocolVersion, final Optional configuredAirbyteCatalog, final MdcScope.Builder containerLogMdcBuilder) { @@ -162,7 +162,7 @@ public VersionedAirbyteStreamFactory withDetectVersion(final boolean detectVe final protected void initializeForProtocolVersion(final Version protocolVersion) { this.deserializer = (AirbyteMessageDeserializer) serDeProvider.getDeserializer(protocolVersion).orElseThrow(); - this.migrator = migratorFactory.getVersionedMigrator(protocolVersion); + this.migrator = migratorFactory.getAirbyteMessageMigrator(protocolVersion); this.protocolVersion = protocolVersion; } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteDestinationTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteDestinationTest.java index cf599ad972a7..7028eff2fa74 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteDestinationTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteDestinationTest.java @@ -20,6 +20,8 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.LoggingHelper.Color; +import io.airbyte.commons.protocol.DefaultProtocolSerializer; +import io.airbyte.commons.protocol.ProtocolSerializer; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.WorkerDestinationConfig; import io.airbyte.config.helpers.LogClientSingleton; @@ -80,6 +82,7 @@ class DefaultAirbyteDestinationTest { private Process process; private AirbyteStreamFactory streamFactory; private AirbyteMessageBufferedWriterFactory messageWriterFactory; + private final ProtocolSerializer protocolSerializer = new DefaultProtocolSerializer(); private ByteArrayOutputStream outputStream; @BeforeEach @@ -122,7 +125,8 @@ void tearDown() throws IOException { @SuppressWarnings("BusyWait") @Test void testSuccessfulLifecycle() throws Exception { - final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher, streamFactory, messageWriterFactory); + final AirbyteDestination destination = + new DefaultAirbyteDestination(integrationLauncher, streamFactory, messageWriterFactory, protocolSerializer); destination.start(DESTINATION_CONFIG, jobRoot); final AirbyteMessage recordMessage = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"); @@ -161,7 +165,8 @@ void testSuccessfulLifecycle() throws Exception { @Test void testTaggedLogs() throws Exception { - final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher, streamFactory, messageWriterFactory); + final AirbyteDestination destination = + new DefaultAirbyteDestination(integrationLauncher, streamFactory, messageWriterFactory, protocolSerializer); destination.start(DESTINATION_CONFIG, jobRoot); final AirbyteMessage recordMessage = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java index 0aadd7aa265a..d2e8121bed30 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java @@ -20,6 +20,8 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.LoggingHelper.Color; +import io.airbyte.commons.protocol.DefaultProtocolSerializer; +import io.airbyte.commons.protocol.ProtocolSerializer; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.State; import io.airbyte.config.WorkerSourceConfig; @@ -94,6 +96,7 @@ class DefaultAirbyteSourceTest { private Process process; private AirbyteStreamFactory streamFactory; private HeartbeatMonitor heartbeatMonitor; + private final ProtocolSerializer protocolSerializer = new DefaultProtocolSerializer(); @BeforeEach void setup() throws IOException, WorkerException { @@ -137,7 +140,7 @@ void testSuccessfulLifecycle() throws Exception { when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false); - final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, protocolSerializer, heartbeatMonitor); source.start(SOURCE_CONFIG, jobRoot); final List messages = Lists.newArrayList(); @@ -172,8 +175,7 @@ void testTaggedLogs() throws Exception { when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false); - final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, - heartbeatMonitor); + final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, protocolSerializer, heartbeatMonitor); source.start(SOURCE_CONFIG, jobRoot); final List messages = Lists.newArrayList(); @@ -198,7 +200,7 @@ void testTaggedLogs() throws Exception { @Test void testNonzeroExitCodeThrows() throws Exception { - final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, protocolSerializer, heartbeatMonitor); tap.start(SOURCE_CONFIG, jobRoot); when(process.exitValue()).thenReturn(1); @@ -208,7 +210,7 @@ void testNonzeroExitCodeThrows() throws Exception { @Test void testGetExitValue() throws Exception { - final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, protocolSerializer, heartbeatMonitor); source.start(SOURCE_CONFIG, jobRoot); when(process.isAlive()).thenReturn(false); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java index 14840ac50af9..b6b275548f24 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java @@ -10,8 +10,10 @@ import io.airbyte.commons.protocol.AirbyteMessageMigrator; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; +import io.airbyte.commons.protocol.ConfiguredAirbyteCatalogMigrator; import io.airbyte.commons.protocol.migrations.AirbyteMessageMigrationV1; +import io.airbyte.commons.protocol.migrations.ConfiguredAirbyteCatalogMigrationV1; import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer; import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer; import io.airbyte.commons.protocol.serde.AirbyteMessageV1Deserializer; @@ -32,7 +34,7 @@ class VersionedAirbyteStreamFactoryTest { AirbyteMessageSerDeProvider serDeProvider; - AirbyteMessageVersionedMigratorFactory migratorFactory; + AirbyteProtocolVersionedMigratorFactory migratorFactory; final static Version defaultVersion = new Version("0.2.0"); @@ -42,10 +44,13 @@ void beforeEach() { List.of(new AirbyteMessageV0Deserializer(), new AirbyteMessageV1Deserializer()), List.of(new AirbyteMessageV0Serializer(), new AirbyteMessageV1Serializer()))); serDeProvider.initialize(); - final AirbyteMessageMigrator migrator = new AirbyteMessageMigrator( + final AirbyteMessageMigrator airbyteMessageMigrator = new AirbyteMessageMigrator( List.of(new AirbyteMessageMigrationV1())); - migrator.initialize(); - migratorFactory = spy(new AirbyteMessageVersionedMigratorFactory(migrator)); + airbyteMessageMigrator.initialize(); + final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator = new ConfiguredAirbyteCatalogMigrator( + List.of(new ConfiguredAirbyteCatalogMigrationV1())); + configuredAirbyteCatalogMigrator.initialize(); + migratorFactory = spy(new AirbyteProtocolVersionedMigratorFactory(airbyteMessageMigrator, configuredAirbyteCatalogMigrator)); } @Test @@ -58,7 +63,7 @@ void testCreate() { streamFactory.create(bufferedReader); verify(serDeProvider).getDeserializer(initialVersion); - verify(migratorFactory).getVersionedMigrator(initialVersion); + verify(migratorFactory).getAirbyteMessageMigrator(initialVersion); } @Test diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java index 69d8cd74b836..94da832257f0 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java @@ -7,7 +7,7 @@ import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.temporal.sync.OrchestratorConstants; import io.airbyte.config.EnvConfigs; import io.airbyte.container_orchestrator.orchestrator.DbtJobOrchestrator; @@ -99,7 +99,7 @@ JobOrchestrator jobOrchestrator( final FeatureFlags featureFlags, final WorkerConfigs workerConfigs, final AirbyteMessageSerDeProvider serdeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory, + final AirbyteProtocolVersionedMigratorFactory migratorFactory, final JobRunConfig jobRunConfig) { return switch (application) { case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(envConfigs, processFactory, featureFlags, serdeProvider, diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index bd005a772b4c..71dfa1f1036d 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -14,7 +14,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.MdcScope; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.commons.version.Version; import io.airbyte.config.Configs; @@ -58,14 +58,14 @@ public class ReplicationJobOrchestrator implements JobOrchestrator runJob() throws Exception { WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage()) ? new EmptyAirbyteSource( featureFlags.useStreamCapableState()) : new DefaultAirbyteSource(sourceLauncher, - getStreamFactory(sourceLauncherConfig.getProtocolVersion(), syncInput.getCatalog(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); + getStreamFactory(sourceLauncherConfig.getProtocolVersion(), syncInput.getCatalog(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER), + migratorFactory.getProtocolSerializer(sourceLauncherConfig.getProtocolVersion())); MetricClientFactory.initialize(MetricEmittingApps.WORKER); final var metricClient = MetricClientFactory.getMetricClient(); @@ -148,7 +149,8 @@ public Optional runJob() throws Exception { getStreamFactory(destinationLauncherConfig.getProtocolVersion(), syncInput.getCatalog(), DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER), new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(), - Optional.of(syncInput.getCatalog()))), + Optional.of(syncInput.getCatalog())), + migratorFactory.getProtocolSerializer(destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), metricReporter, featureFlags.applyFieldSelection()); diff --git a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactoryTest.java b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactoryTest.java index d99a521fae99..d69bccc0ec69 100644 --- a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactoryTest.java +++ b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactoryTest.java @@ -11,7 +11,7 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.config.EnvConfigs; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConfigs; @@ -44,7 +44,7 @@ class ContainerOrchestratorFactoryTest { AirbyteMessageSerDeProvider airbyteMessageSerDeProvider; @Inject - AirbyteMessageVersionedMigratorFactory airbyteMessageVersionedMigratorFactory; + AirbyteProtocolVersionedMigratorFactory airbyteProtocolVersionedMigratorFactory; @Inject JobRunConfig jobRunConfig; @@ -87,29 +87,29 @@ void jobOrchestrator() { final var repl = factory.jobOrchestrator( ReplicationLauncherWorker.REPLICATION, envConfigs, processFactory, featureFlags, workerConfigs, - airbyteMessageSerDeProvider, airbyteMessageVersionedMigratorFactory, jobRunConfig); + airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig); assertEquals("Replication", repl.getOrchestratorName()); final var norm = factory.jobOrchestrator( NormalizationLauncherWorker.NORMALIZATION, envConfigs, processFactory, featureFlags, workerConfigs, - airbyteMessageSerDeProvider, airbyteMessageVersionedMigratorFactory, jobRunConfig); + airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig); assertEquals("Normalization", norm.getOrchestratorName()); final var dbt = factory.jobOrchestrator( DbtLauncherWorker.DBT, envConfigs, processFactory, featureFlags, workerConfigs, - airbyteMessageSerDeProvider, airbyteMessageVersionedMigratorFactory, jobRunConfig); + airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig); assertEquals("DBT Transformation", dbt.getOrchestratorName()); final var noop = factory.jobOrchestrator( AsyncOrchestratorPodProcess.NO_OP, envConfigs, processFactory, featureFlags, workerConfigs, - airbyteMessageSerDeProvider, airbyteMessageVersionedMigratorFactory, jobRunConfig); + airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig); assertEquals("NO_OP", noop.getOrchestratorName()); var caught = false; try { factory.jobOrchestrator( "does not exist", envConfigs, processFactory, featureFlags, workerConfigs, - airbyteMessageSerDeProvider, airbyteMessageVersionedMigratorFactory, jobRunConfig); + airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig); } catch (final Exception e) { caught = true; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index f0a4ce72e373..1b799568948e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -14,7 +14,7 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.config.Configs.WorkerEnvironment; @@ -59,7 +59,7 @@ public class CheckConnectionActivityImpl implements CheckConnectionActivity { private final AirbyteApiClient airbyteApiClient; private final String airbyteVersion; private final AirbyteMessageSerDeProvider serDeProvider; - private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final AirbyteProtocolVersionedMigratorFactory migratorFactory; public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConfigs workerConfigs, @Named("checkProcessFactory") final ProcessFactory processFactory, @@ -70,7 +70,7 @@ public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConf final AirbyteApiClient airbyteApiClient, @Value("${airbyte.version}") final String airbyteVersion, final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory) { + final AirbyteProtocolVersionedMigratorFactory migratorFactory) { this.workerConfigs = workerConfigs; this.processFactory = processFactory; this.workspaceRoot = workspaceRoot; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 89f98984a3fa..39f83675244e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -14,7 +14,7 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.config.Configs.WorkerEnvironment; @@ -62,7 +62,7 @@ public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity { private final ConfigRepository configRepository; private final AirbyteMessageSerDeProvider serDeProvider; - private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final AirbyteProtocolVersionedMigratorFactory migratorFactory; public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs, @Named("discoverProcessFactory") final ProcessFactory processFactory, @@ -74,7 +74,7 @@ public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerC final AirbyteApiClient airbyteApiClient, @Value("${airbyte.version}") final String airbyteVersion, final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory) { + final AirbyteProtocolVersionedMigratorFactory migratorFactory) { this.configRepository = configRepository; this.workerConfigs = workerConfigs; this.processFactory = processFactory; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index d718e8cb4cd0..d07cc94c9e39 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -13,7 +13,7 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.version.Version; @@ -56,7 +56,7 @@ public class SpecActivityImpl implements SpecActivity { private final AirbyteApiClient airbyteApiClient; private final String airbyteVersion; private final AirbyteMessageSerDeProvider serDeProvider; - private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final AirbyteProtocolVersionedMigratorFactory migratorFactory; public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerConfigs, @Named("specProcessFactory") final ProcessFactory processFactory, @@ -66,7 +66,7 @@ public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerCo final AirbyteApiClient airbyteApiClient, @Value("${airbyte.version}") final String airbyteVersion, final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory) { + final AirbyteProtocolVersionedMigratorFactory migratorFactory) { this.workerConfigs = workerConfigs; this.processFactory = processFactory; this.workspaceRoot = workspaceRoot; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index c422e1e2bac0..80788f34a281 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -22,7 +22,7 @@ import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; -import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory; import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.AirbyteConfigValidator; @@ -99,7 +99,7 @@ public class ReplicationActivityImpl implements ReplicationActivity { private final TemporalUtils temporalUtils; private final AirbyteApiClient airbyteApiClient; private final AirbyteMessageSerDeProvider serDeProvider; - private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final AirbyteProtocolVersionedMigratorFactory migratorFactory; private final WorkerConfigs workerConfigs; public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, @@ -115,7 +115,7 @@ public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optio final TemporalUtils temporalUtils, final AirbyteApiClient airbyteApiClient, final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory, + final AirbyteProtocolVersionedMigratorFactory migratorFactory, @Named("replicationWorkerConfigs") final WorkerConfigs workerConfigs) { this.containerOrchestratorConfig = containerOrchestratorConfig; this.processFactory = processFactory; @@ -289,7 +289,8 @@ private CheckedSupplier, Exception> : new DefaultAirbyteSource(sourceLauncher, new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion(), Optional.of(syncInput.getCatalog()), - DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); + DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER), + migratorFactory.getProtocolSerializer(sourceLauncherConfig.getProtocolVersion())); MetricClientFactory.initialize(MetricEmittingApps.WORKER); final MetricClient metricClient = MetricClientFactory.getMetricClient(); final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage()); @@ -304,7 +305,8 @@ private CheckedSupplier, Exception> Optional.of(syncInput.getCatalog()), DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER), new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(), - Optional.of(syncInput.getCatalog()))), + Optional.of(syncInput.getCatalog())), + migratorFactory.getProtocolSerializer(destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), metricReporter, false); From dce73623e004e9a8039f12b77cf5e0e16531dfba Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Mon, 9 Jan 2023 15:56:24 -0800 Subject: [PATCH 5/8] Fix tests --- .../io/airbyte/workers/internal/DefaultAirbyteSourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java index 6494a17bb0e0..13b1bed01246 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java @@ -210,7 +210,7 @@ void testNonzeroExitCodeThrows() throws Exception { @Test void testIgnoredExitCodes() throws Exception { - final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, protocolSerializer, heartbeatMonitor); tap.start(SOURCE_CONFIG, jobRoot); when(process.isAlive()).thenReturn(false); From c8275d2eaf8684e06c70a719e3a007186be2eccf Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Mon, 9 Jan 2023 17:09:40 -0800 Subject: [PATCH 6/8] Remove extra this. --- .../protocol/AirbyteProtocolVersionedMigratorFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteProtocolVersionedMigratorFactory.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteProtocolVersionedMigratorFactory.java index 594dad0f7687..52af4e2233e2 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteProtocolVersionedMigratorFactory.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteProtocolVersionedMigratorFactory.java @@ -23,7 +23,7 @@ public AirbyteProtocolVersionedMigratorFactory(final AirbyteMessageMigrator airb } public AirbyteMessageVersionedMigrator getAirbyteMessageMigrator(final Version version) { - return new AirbyteMessageVersionedMigrator<>(this.airbyteMessageMigrator, version); + return new AirbyteMessageVersionedMigrator<>(airbyteMessageMigrator, version); } public final VersionedProtocolSerializer getProtocolSerializer(final Version version) { From f9717cd59bae7fccca4e7ef2c94ad0a01f55e7e7 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Mon, 9 Jan 2023 17:29:44 -0800 Subject: [PATCH 7/8] Add missing docs --- .../commons/protocol/VersionedProtocolSerializer.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java index c595f3ca8f1a..a53c068a5984 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java @@ -8,6 +8,12 @@ import io.airbyte.commons.version.Version; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +/** + * Serialize a ConfiguredAirbyteCatalog to the specified version + *

+ * This Serializer expects a ConfiguredAirbyteCatalog from the Current version of the platform, + * converts it to the target protocol version before serializing it. + */ public class VersionedProtocolSerializer implements ProtocolSerializer { private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator; From c083b64340de3ee3df25a7811de339cc509772d7 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Mon, 9 Jan 2023 17:31:11 -0800 Subject: [PATCH 8/8] Typo --- .../protocol/migrations/ConfiguredAirbyteCatalogMigration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigration.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigration.java index 49b99dfef7c5..fa3e32862d87 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigration.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/ConfiguredAirbyteCatalogMigration.java @@ -7,7 +7,7 @@ public interface ConfiguredAirbyteCatalogMigration extends Migration { /** - * Downgrades a ConfiguredAirbyteCatalog to from the new version to the old version + * Downgrades a ConfiguredAirbyteCatalog from the new version to the old version * * @param message: the ConfiguredAirbyteCatalog to downgrade * @return the downgraded ConfiguredAirbyteCatalog