Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hide config persistence #18803

Merged
merged 3 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import io.airbyte.config.init.DefinitionsProvider;
import io.airbyte.config.init.LocalDefinitionsProvider;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
Expand Down Expand Up @@ -193,10 +191,6 @@ private static Database getConfigDatabase(final DSLContext dslContext) throws IO
return new Database(dslContext);
}

private static ConfigPersistence getConfigPersistence(final Database configDatabase) throws IOException {
return DatabaseConfigPersistence.createWithValidation(configDatabase);
}

private static DefinitionsProvider getLocalDefinitionsProvider() throws IOException {
return new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
}
Expand All @@ -212,7 +206,7 @@ private static JobPersistence getJobPersistence(final Database jobDatabase) thro
private void initPersistences(final DSLContext configsDslContext, final DSLContext jobsDslContext) {
try {
configDatabase = getConfigDatabase(configsDslContext);
configRepository = new ConfigRepository(getConfigPersistence(configDatabase), configDatabase);
configRepository = new ConfigRepository(configDatabase);
localDefinitionsProvider = getLocalDefinitionsProvider();
jobDatabase = getJobDatabase(jobsDslContext);
jobPersistence = getJobPersistence(jobDatabase);
Expand All @@ -236,8 +230,7 @@ public static void main(final String[] args) throws Exception {

// TODO Will be converted to an injected singleton during DI migration
final Database configDatabase = getConfigDatabase(configsDslContext);
final ConfigPersistence configPersistence = getConfigPersistence(configDatabase);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase);
final ConfigRepository configRepository = new ConfigRepository(configDatabase);
final Database jobDatabase = getJobDatabase(jobsDslContext);
final JobPersistence jobPersistence = getJobPersistence(jobDatabase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.airbyte.config.init.DefinitionsProvider;
import io.airbyte.config.init.LocalDefinitionsProvider;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence;
Expand Down Expand Up @@ -181,8 +180,7 @@ void testBootloaderAppRunSecretMigration() throws Exception {
val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false);
val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);

val configPersistence = new DatabaseConfigPersistence(configDatabase);
val configRepository = new ConfigRepository(configPersistence, configDatabase);
val configRepository = new ConfigRepository(configDatabase);
val jobsPersistence = new DefaultJobPersistence(jobDatabase);

val secretsPersistence = SecretPersistence.getLongLived(configsDslContext, mockedConfigs);
Expand Down Expand Up @@ -236,6 +234,7 @@ void testBootloaderAppRunSecretMigration() throws Exception {
.withSourceId(sourceId)
.withName("test source")
.withWorkspaceId(workspaceId)
.withTombstone(false)
.withConfiguration(mapper.readTree(sourceSpecs)));

when(mockedFeatureFlags.forceSecretMigration()).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/**
* We are moving migrating away from this interface entirely. Use ConfigRepository instead.
*/
@Deprecated
@Deprecated(forRemoval = true)
public interface ConfigPersistence {

<T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz) throws ConfigNotFoundException, JsonValidationException, IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ public class ConfigRepository {
private final ExceptionWrappingDatabase database;
private final ActorDefinitionMigrator actorDefinitionMigrator;

public ConfigRepository(final ConfigPersistence persistence, final Database database) {
this(persistence, database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)));
public ConfigRepository(final Database database) {
this(DatabaseConfigPersistence.createWithValidation(database), database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)));
}

@VisibleForTesting
Expand Down Expand Up @@ -579,6 +579,16 @@ public void writeSourceConnectionNoSecrets(final SourceConnection partialSource)
persistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, partialSource.getSourceId().toString(), partialSource);
}

public boolean deleteSource(final UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException {
try {
getSourceConnection(sourceId);
persistence.deleteConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString());
return true;
} catch (final ConfigNotFoundException e) {
return false;
}
}

/**
* Returns all sources in the database. Does not contain secrets. To hydrate with secrets see
* { @link SecretsRepositoryReader#listSourceConnectionWithSecrets() }.
Expand Down Expand Up @@ -638,6 +648,16 @@ public void writeDestinationConnectionNoSecrets(final DestinationConnection part
persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, partialDestination.getDestinationId().toString(), partialDestination);
}

public boolean deleteDestination(final UUID destId) throws JsonValidationException, ConfigNotFoundException, IOException {
try {
getDestinationConnection(destId);
persistence.deleteConfig(ConfigSchema.DESTINATION_CONNECTION, destId.toString());
return true;
} catch (final ConfigNotFoundException e) {
return false;
}
}

/**
* Returns all destinations in the database. Does not contain secrets. To hydrate with secrets see
* { @link SecretsRepositoryReader#listDestinationConnectionWithSecrets() }.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated(forRemoval = true)
@SuppressWarnings({"PMD.CyclomaticComplexity", "PMD.CognitiveComplexity", "PMD.NPathComplexity", "PMD.ExcessiveMethodLength",
"PMD.AvoidThrowingRawExceptionTypes", "PMD.ShortVariable", "PMD.LongVariable", "PMD.ExcessiveClassLength", "PMD.AvoidLiteralsInIfCondition"})
public class DatabaseConfigPersistence implements ConfigPersistence {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Validates that json input and outputs for the ConfigPersistence against their schemas.
*/
@SuppressWarnings("PMD.AvoidThrowingRawExceptionTypes")
@Deprecated(forRemoval = true)
public class ValidatingConfigPersistence implements ConfigPersistence {

private final JsonSchemaValidator schemaValidator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition;
import io.airbyte.config.persistence.ConfigRepository.SourceAndDefinition;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.init.DatabaseInitializationException;
Expand Down Expand Up @@ -97,7 +98,7 @@ void setup() throws IOException, JsonValidationException, SQLException, Database
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false);
configPersistence = spy(new DatabaseConfigPersistence(database));
configRepository = spy(new ConfigRepository(configPersistence, database));
configRepository = spy(new ConfigRepository(configPersistence, database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database))));
final ConfigsDatabaseMigrator configsDatabaseMigrator =
new ConfigsDatabaseMigrator(database, flyway);
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -62,7 +63,7 @@ class ConfigRepositoryTest {
void setup() {
configPersistence = mock(ConfigPersistence.class);
database = mock(Database.class);
configRepository = spy(new ConfigRepository(configPersistence, database));
configRepository = spy(new ConfigRepository(configPersistence, database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database))));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.State;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.init.DatabaseInitializationException;
Expand Down Expand Up @@ -544,7 +545,10 @@ void afterEach() {
}

private void setupTestData() throws JsonValidationException, IOException {
configRepository = new ConfigRepository(new DatabaseConfigPersistence(database), database);
configRepository = new ConfigRepository(
new DatabaseConfigPersistence(database),
database,
new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)));

final StandardWorkspace workspace = MockData.standardWorkspaces().get(0);
final StandardSourceDefinition sourceDefinition = MockData.publicSourceDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
package io.airbyte.cron.config;

import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.db.Database;
import io.airbyte.db.check.DatabaseMigrationCheck;
import io.airbyte.db.factory.DatabaseCheckFactory;
Expand Down Expand Up @@ -68,15 +65,8 @@ public Flyway configFlyway(@Named("config") final FlywayConfigurationProperties
}

@Singleton
public ConfigPersistence configPersistence(@Named("configDatabase") final Database configDatabase,
final JsonSecretsProcessor jsonSecretsProcessor) {
return DatabaseConfigPersistence.createWithValidation(configDatabase);
}

@Singleton
public ConfigRepository configRepository(@Named("configPersistence") final ConfigPersistence configPersistence,
@Named("configDatabase") final Database configDatabase) {
return new ConfigRepository(configPersistence, configDatabase);
public ConfigRepository configRepository(@Named("configDatabase") final Database configDatabase) {
return new ConfigRepository(configDatabase);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.StreamResetPersistence;
Expand Down Expand Up @@ -189,11 +187,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,

LOGGER.info("Creating config repository...");
final Database configsDatabase = new Database(configsDslContext);
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configsDatabase);
final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configsDslContext, configs);
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configsDslContext, configs);
final Optional<SecretPersistence> ephemeralSecretPersistence = SecretPersistence.getEphemeral(configsDslContext, configs);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configsDatabase);
final ConfigRepository configRepository = new ConfigRepository(configsDatabase);
final SecretsRepositoryReader secretsRepositoryReader = new SecretsRepositoryReader(configRepository, secretsHydrator);
final SecretsRepositoryWriter secretsRepositoryWriter =
new SecretsRepositoryWriter(configRepository, secretPersistence, ephemeralSecretPersistence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
package io.airbyte.workers.config;

import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.db.Database;
import io.airbyte.db.check.DatabaseMigrationCheck;
import io.airbyte.db.check.impl.JobsDatabaseAvailabilityCheck;
Expand Down Expand Up @@ -90,16 +87,8 @@ public Flyway jobsFlyway(@Named("jobs") final FlywayConfigurationProperties jobs

@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public ConfigPersistence configPersistence(@Named("configDatabase") final Database configDatabase,
final JsonSecretsProcessor jsonSecretsProcessor) {
return DatabaseConfigPersistence.createWithValidation(configDatabase);
}

@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public ConfigRepository configRepository(@Named("configPersistence") final ConfigPersistence configPersistence,
@Named("configDatabase") final Database configDatabase) {
return new ConfigRepository(configPersistence, configDatabase);
public ConfigRepository configRepository(@Named("configDatabase") final Database configDatabase) {
return new ConfigRepository(configDatabase);
}

@Singleton
Expand Down