From c463ec9226220979d364313c3aa99359373b819d Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 1 Nov 2022 11:47:42 -0700 Subject: [PATCH 1/3] hide ConfigPersistence inside ConfigRepository as an implementation detail --- .../java/io/airbyte/bootloader/BootloaderApp.java | 11 ++--------- .../io/airbyte/bootloader/BootloaderAppTest.java | 4 +--- .../config/persistence/ConfigRepository.java | 4 ++-- .../ConfigRepositoryE2EReadWriteTest.java | 3 ++- .../config/persistence/ConfigRepositoryTest.java | 3 ++- .../config/persistence/StatePersistenceTest.java | 6 +++++- .../airbyte/cron/config/DatabaseBeanFactory.java | 14 ++------------ .../main/java/io/airbyte/server/ServerApp.java | 5 +---- .../workers/config/DatabaseBeanFactory.java | 15 ++------------- 9 files changed, 19 insertions(+), 46 deletions(-) diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java index 5d6d5067f983..9d3d8cfac7b2 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java @@ -18,9 +18,7 @@ import io.airbyte.config.init.DefinitionsProvider; import io.airbyte.config.init.LocalDefinitionsProvider; import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.SecretsRepositoryReader; import io.airbyte.config.persistence.SecretsRepositoryWriter; import io.airbyte.config.persistence.split_secrets.SecretPersistence; @@ -193,10 +191,6 @@ private static Database getConfigDatabase(final DSLContext dslContext) throws IO return new Database(dslContext); } - private static ConfigPersistence getConfigPersistence(final Database configDatabase) throws IOException { - return DatabaseConfigPersistence.createWithValidation(configDatabase); - } - private static DefinitionsProvider getLocalDefinitionsProvider() throws IOException { return new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); } @@ -212,7 +206,7 @@ private static JobPersistence getJobPersistence(final Database jobDatabase) thro private void initPersistences(final DSLContext configsDslContext, final DSLContext jobsDslContext) { try { configDatabase = getConfigDatabase(configsDslContext); - configRepository = new ConfigRepository(getConfigPersistence(configDatabase), configDatabase); + configRepository = new ConfigRepository(configDatabase); localDefinitionsProvider = getLocalDefinitionsProvider(); jobDatabase = getJobDatabase(jobsDslContext); jobPersistence = getJobPersistence(jobDatabase); @@ -236,8 +230,7 @@ public static void main(final String[] args) throws Exception { // TODO Will be converted to an injected singleton during DI migration final Database configDatabase = getConfigDatabase(configsDslContext); - final ConfigPersistence configPersistence = getConfigPersistence(configDatabase); - final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase); + final ConfigRepository configRepository = new ConfigRepository(configDatabase); final Database jobDatabase = getJobDatabase(jobsDslContext); final JobPersistence jobPersistence = getJobPersistence(jobDatabase); diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index ecd6cf516ade..c9d9b379357d 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -27,7 +27,6 @@ import io.airbyte.config.init.DefinitionsProvider; import io.airbyte.config.init.LocalDefinitionsProvider; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.SecretsRepositoryReader; import io.airbyte.config.persistence.SecretsRepositoryWriter; import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence; @@ -181,8 +180,7 @@ void testBootloaderAppRunSecretMigration() throws Exception { val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false); val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); - val configPersistence = new DatabaseConfigPersistence(configDatabase); - val configRepository = new ConfigRepository(configPersistence, configDatabase); + val configRepository = new ConfigRepository(configDatabase); val jobsPersistence = new DefaultJobPersistence(jobDatabase); val secretsPersistence = SecretPersistence.getLongLived(configsDslContext, mockedConfigs); diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index eca206641691..84a8b66300fc 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -93,8 +93,8 @@ public class ConfigRepository { private final ExceptionWrappingDatabase database; private final ActorDefinitionMigrator actorDefinitionMigrator; - public ConfigRepository(final ConfigPersistence persistence, final Database database) { - this(persistence, database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database))); + public ConfigRepository(final Database database) { + this(DatabaseConfigPersistence.createWithValidation(database), database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database))); } @VisibleForTesting diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index dac71a7f4a2d..c33dbb21ba3b 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -32,6 +32,7 @@ import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition; import io.airbyte.config.persistence.ConfigRepository.SourceAndDefinition; import io.airbyte.db.Database; +import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.FlywayFactory; import io.airbyte.db.init.DatabaseInitializationException; @@ -97,7 +98,7 @@ void setup() throws IOException, JsonValidationException, SQLException, Database ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false); configPersistence = spy(new DatabaseConfigPersistence(database)); - configRepository = spy(new ConfigRepository(configPersistence, database)); + configRepository = spy(new ConfigRepository(configPersistence, database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)))); final ConfigsDatabaseMigrator configsDatabaseMigrator = new ConfigsDatabaseMigrator(database, flyway); final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index d4507005049c..1893b7a193e3 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -29,6 +29,7 @@ import io.airbyte.config.StandardWorkspace; import io.airbyte.config.State; import io.airbyte.db.Database; +import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -62,7 +63,7 @@ class ConfigRepositoryTest { void setup() { configPersistence = mock(ConfigPersistence.class); database = mock(Database.class); - configRepository = spy(new ConfigRepository(configPersistence, database)); + configRepository = spy(new ConfigRepository(configPersistence, database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)))); } @AfterEach diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java index 88357923de47..a3a888f9530d 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java @@ -16,6 +16,7 @@ import io.airbyte.config.State; import io.airbyte.config.StateType; import io.airbyte.config.StateWrapper; +import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.FlywayFactory; import io.airbyte.db.init.DatabaseInitializationException; @@ -544,7 +545,10 @@ void afterEach() { } private void setupTestData() throws JsonValidationException, IOException { - configRepository = new ConfigRepository(new DatabaseConfigPersistence(database), database); + configRepository = new ConfigRepository( + new DatabaseConfigPersistence(database), + database, + new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database))); final StandardWorkspace workspace = MockData.standardWorkspaces().get(0); final StandardSourceDefinition sourceDefinition = MockData.publicSourceDefinition(); diff --git a/airbyte-cron/src/main/java/io/airbyte/cron/config/DatabaseBeanFactory.java b/airbyte-cron/src/main/java/io/airbyte/cron/config/DatabaseBeanFactory.java index 4def28600908..fc791b1c3e62 100644 --- a/airbyte-cron/src/main/java/io/airbyte/cron/config/DatabaseBeanFactory.java +++ b/airbyte-cron/src/main/java/io/airbyte/cron/config/DatabaseBeanFactory.java @@ -5,11 +5,8 @@ package io.airbyte.cron.config; import io.airbyte.commons.temporal.config.WorkerMode; -import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.StreamResetPersistence; -import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.db.Database; import io.airbyte.db.check.DatabaseMigrationCheck; import io.airbyte.db.factory.DatabaseCheckFactory; @@ -68,15 +65,8 @@ public Flyway configFlyway(@Named("config") final FlywayConfigurationProperties } @Singleton - public ConfigPersistence configPersistence(@Named("configDatabase") final Database configDatabase, - final JsonSecretsProcessor jsonSecretsProcessor) { - return DatabaseConfigPersistence.createWithValidation(configDatabase); - } - - @Singleton - public ConfigRepository configRepository(@Named("configPersistence") final ConfigPersistence configPersistence, - @Named("configDatabase") final Database configDatabase) { - return new ConfigRepository(configPersistence, configDatabase); + public ConfigRepository configRepository(@Named("configDatabase") final Database configDatabase) { + return new ConfigRepository(configDatabase); } @Singleton diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 902519c1d77f..06109721f2f7 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -22,9 +22,7 @@ import io.airbyte.config.StandardSync.Status; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.SecretsRepositoryReader; import io.airbyte.config.persistence.SecretsRepositoryWriter; import io.airbyte.config.persistence.StreamResetPersistence; @@ -189,11 +187,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, LOGGER.info("Creating config repository..."); final Database configsDatabase = new Database(configsDslContext); - final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configsDatabase); final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configsDslContext, configs); final Optional secretPersistence = SecretPersistence.getLongLived(configsDslContext, configs); final Optional ephemeralSecretPersistence = SecretPersistence.getEphemeral(configsDslContext, configs); - final ConfigRepository configRepository = new ConfigRepository(configPersistence, configsDatabase); + final ConfigRepository configRepository = new ConfigRepository(configsDatabase); final SecretsRepositoryReader secretsRepositoryReader = new SecretsRepositoryReader(configRepository, secretsHydrator); final SecretsRepositoryWriter secretsRepositoryWriter = new SecretsRepositoryWriter(configRepository, secretPersistence, ephemeralSecretPersistence); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/DatabaseBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/DatabaseBeanFactory.java index e3e240b23dea..d203f2b24ef0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/DatabaseBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/DatabaseBeanFactory.java @@ -5,12 +5,9 @@ package io.airbyte.workers.config; import io.airbyte.commons.temporal.config.WorkerMode; -import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.StatePersistence; import io.airbyte.config.persistence.StreamResetPersistence; -import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.db.Database; import io.airbyte.db.check.DatabaseMigrationCheck; import io.airbyte.db.check.impl.JobsDatabaseAvailabilityCheck; @@ -90,16 +87,8 @@ public Flyway jobsFlyway(@Named("jobs") final FlywayConfigurationProperties jobs @Singleton @Requires(env = WorkerMode.CONTROL_PLANE) - public ConfigPersistence configPersistence(@Named("configDatabase") final Database configDatabase, - final JsonSecretsProcessor jsonSecretsProcessor) { - return DatabaseConfigPersistence.createWithValidation(configDatabase); - } - - @Singleton - @Requires(env = WorkerMode.CONTROL_PLANE) - public ConfigRepository configRepository(@Named("configPersistence") final ConfigPersistence configPersistence, - @Named("configDatabase") final Database configDatabase) { - return new ConfigRepository(configPersistence, configDatabase); + public ConfigRepository configRepository(@Named("configDatabase") final Database configDatabase) { + return new ConfigRepository(configDatabase); } @Singleton From 64db7986dadd0493ebbb3fdaa8ba54dca1d75182 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 1 Nov 2022 14:20:23 -0700 Subject: [PATCH 2/3] clean up --- .../src/test/java/io/airbyte/bootloader/BootloaderAppTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index c9d9b379357d..c9697aa427ae 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -234,6 +234,7 @@ void testBootloaderAppRunSecretMigration() throws Exception { .withSourceId(sourceId) .withName("test source") .withWorkspaceId(workspaceId) + .withTombstone(false) .withConfiguration(mapper.readTree(sourceSpecs))); when(mockedFeatureFlags.forceSecretMigration()).thenReturn(false); From 101f877bcbb321c8f7b3f86c0fe753f521203e69 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 1 Nov 2022 15:22:01 -0700 Subject: [PATCH 3/3] add delete source / destination --- .../config/persistence/ConfigPersistence.java | 2 +- .../config/persistence/ConfigRepository.java | 20 +++++++++++++++++++ .../DatabaseConfigPersistence.java | 1 + .../ValidatingConfigPersistence.java | 1 + 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java index 9849c4ed7b65..abfdbe9b7136 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java @@ -14,7 +14,7 @@ /** * We are moving migrating away from this interface entirely. Use ConfigRepository instead. */ -@Deprecated +@Deprecated(forRemoval = true) public interface ConfigPersistence { T getConfig(AirbyteConfig configType, String configId, Class clazz) throws ConfigNotFoundException, JsonValidationException, IOException; diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 84a8b66300fc..24f2090cf866 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -579,6 +579,16 @@ public void writeSourceConnectionNoSecrets(final SourceConnection partialSource) persistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, partialSource.getSourceId().toString(), partialSource); } + public boolean deleteSource(final UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { + try { + getSourceConnection(sourceId); + persistence.deleteConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString()); + return true; + } catch (final ConfigNotFoundException e) { + return false; + } + } + /** * Returns all sources in the database. Does not contain secrets. To hydrate with secrets see * { @link SecretsRepositoryReader#listSourceConnectionWithSecrets() }. @@ -638,6 +648,16 @@ public void writeDestinationConnectionNoSecrets(final DestinationConnection part persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, partialDestination.getDestinationId().toString(), partialDestination); } + public boolean deleteDestination(final UUID destId) throws JsonValidationException, ConfigNotFoundException, IOException { + try { + getDestinationConnection(destId); + persistence.deleteConfig(ConfigSchema.DESTINATION_CONNECTION, destId.toString()); + return true; + } catch (final ConfigNotFoundException e) { + return false; + } + } + /** * Returns all destinations in the database. Does not contain secrets. To hydrate with secrets see * { @link SecretsRepositoryReader#listDestinationConnectionWithSecrets() }. diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 68b2469306d8..8313465964c7 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -79,6 +79,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Deprecated(forRemoval = true) @SuppressWarnings({"PMD.CyclomaticComplexity", "PMD.CognitiveComplexity", "PMD.NPathComplexity", "PMD.ExcessiveMethodLength", "PMD.AvoidThrowingRawExceptionTypes", "PMD.ShortVariable", "PMD.LongVariable", "PMD.ExcessiveClassLength", "PMD.AvoidLiteralsInIfCondition"}) public class DatabaseConfigPersistence implements ConfigPersistence { diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java index 0e2dcbac7f9b..c29e15dbb453 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java @@ -19,6 +19,7 @@ * Validates that json input and outputs for the ConfigPersistence against their schemas. */ @SuppressWarnings("PMD.AvoidThrowingRawExceptionTypes") +@Deprecated(forRemoval = true) public class ValidatingConfigPersistence implements ConfigPersistence { private final JsonSchemaValidator schemaValidator;