diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java index 60e29644ef17..ce611221ee51 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java @@ -46,4 +46,6 @@ public interface ConfigPersistence { Map> dumpConfigs() throws IOException; + void loadData(ConfigPersistence seedPersistence) throws IOException; + } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index ae463693f10b..4b32d52784e6 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -248,4 +248,8 @@ public Map> dumpConfigs() throws IOException { return persistence.dumpConfigs(); } + public void loadData(ConfigPersistence seedPersistence) throws IOException { + persistence.loadData(seedPersistence); + } + } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 29e12e1cc107..137f3d3d0233 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -76,7 +76,8 @@ public DatabaseConfigPersistence(Database database) { /** * Load or update the configs from the seed. */ - public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistence) throws IOException { + @Override + public void loadData(ConfigPersistence seedConfigPersistence) throws IOException { database.transaction(ctx -> { boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where()); if (isInitialized) { @@ -86,7 +87,6 @@ public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistenc } return null; }); - return this; } public ValidatingConfigPersistence withValidation() { diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java index 309a0fd4d3a7..9c95b4f8da20 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java @@ -31,6 +31,7 @@ import io.airbyte.config.AirbyteConfig; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; @@ -198,6 +199,11 @@ public void replaceAllConfigs(Map> configs, boolean dry LOGGER.info("Deleted {}", oldConfigsDir); } + @Override + public void loadData(ConfigPersistence seedPersistence) throws IOException { + throw new UnsupportedEncodingException("This method is not supported in this implementation"); + } + private T getConfigInternal(AirbyteConfig configType, String configId, Class clazz) throws ConfigNotFoundException, IOException { // validate file with schema diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java index 785648744d52..145e417626a0 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java @@ -88,6 +88,11 @@ public Map> dumpConfigs() throws IOException { return decoratedPersistence.dumpConfigs(); } + @Override + public void loadData(ConfigPersistence seedPersistence) throws IOException { + decoratedPersistence.loadData(seedPersistence); + } + private void validateJson(T config, AirbyteConfig configType) throws JsonValidationException { JsonNode schema = JsonSchemaValidator.getSchema(configType.getConfigSchemaFile()); schemaValidator.ensure(schema, Jsons.jsonNode(config)); diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java index fdfe3fae749a..8bbff9c06215 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java @@ -130,4 +130,9 @@ public Map> dumpConfigs() { e -> e.getValue().values().stream())); } + @Override + public void loadData(ConfigPersistence seedPersistence) throws IOException { + throw new UnsupportedOperationException("The seed config persistence is read only."); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index f3dcbce2bc41..e6da55223c9e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -62,14 +62,11 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.Spliterator; @@ -131,7 +128,7 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist // 2. dry run try { checkImport(targetVersion, sourceRoot); - importConfigsFromArchive(sourceRoot, seedPersistence, true); + importConfigsFromArchive(sourceRoot, true); } catch (Exception e) { LOGGER.error("Dry run failed.", e); throw e; @@ -140,8 +137,9 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist // 3. Import Postgres content importDatabaseFromArchive(sourceRoot, targetVersion); - // 4. Import Configs - importConfigsFromArchive(sourceRoot, seedPersistence, false); + // 4. Import Configs and update connector definitions + importConfigsFromArchive(sourceRoot, false); + configRepository.loadData(seedPersistence); // 5. Set DB version LOGGER.info("Setting the DB Airbyte version to : " + targetVersion); @@ -158,7 +156,7 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist configRepository.listStandardWorkspaces(true).forEach(workspace -> TrackingClientSingleton.get().identify(workspace.getWorkspaceId())); } - private void checkImport(String targetVersion, Path tempFolder) throws IOException, JsonValidationException { + private void checkImport(String targetVersion, Path tempFolder) throws IOException { final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME); final String importVersion = Files.readString(versionFile, Charset.defaultCharset()) .replace("\n", "").strip(); @@ -179,21 +177,10 @@ private List listDirectories(Path sourceRoot) throws IOException { } } - private void importConfigsFromArchive(final Path sourceRoot, ConfigPersistence seedPersistence, final boolean dryRun) - throws IOException, JsonValidationException { - final Set sourceDefinitionsInUse = new HashSet<>(); - final Set destinationDefinitionsInUse = new HashSet<>(); - final boolean[] sourceProcessed = {false}; - final boolean[] destinationProcessed = {false}; + private void importConfigsFromArchive(final Path sourceRoot, final boolean dryRun) throws IOException { final List directories = listDirectories(sourceRoot); - // We sort the directories because we want to process SOURCE_CONNECTION before - // STANDARD_SOURCE_DEFINITION and DESTINATION_CONNECTION before STANDARD_DESTINATION_DEFINITION - // so that we can identify which definitions should not be upgraded to the latest version - Collections.sort(directories); final Map> data = new LinkedHashMap<>(); - final Map> seeds = getSeeds(seedPersistence); - for (final String directory : directories) { final Optional configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class); @@ -202,90 +189,11 @@ private void importConfigsFromArchive(final Path sourceRoot, ConfigPersisten } final ConfigSchema configSchema = configSchemaOptional.get(); - Stream configs = readConfigsFromArchive(sourceRoot, configSchema); - - // If there is no source or destination connection, mark them as processed respectively. - if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION && !data.containsKey(ConfigSchema.SOURCE_CONNECTION)) { - sourceProcessed[0] = true; - } else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION && !data.containsKey(ConfigSchema.DESTINATION_CONNECTION)) { - destinationProcessed[0] = true; - } - - configs = streamWithAdditionalOperation( - sourceDefinitionsInUse, - destinationDefinitionsInUse, - sourceProcessed, - destinationProcessed, - configSchema, - configs, - seeds); - data.put(configSchema, configs); + data.put(configSchema, readConfigsFromArchive(sourceRoot, configSchema)); } configRepository.replaceAllConfigs(data, dryRun); } - /** - * Convert config dumps from {@link ConfigPersistence#dumpConfigs} to the desired format. - */ - @SuppressWarnings("unchecked") - private static Map> getSeeds(ConfigPersistence configSeedPersistence) throws IOException { - Map> allData = new HashMap<>(2); - for (Map.Entry> configStream : configSeedPersistence.dumpConfigs().entrySet()) { - ConfigSchema configSchema = ConfigSchema.valueOf(configStream.getKey()); - Map configSeeds = configStream.getValue() - .map(node -> Jsons.object(node, configSchema.getClassName())) - .collect(Collectors.toMap( - configSchema::getId, - object -> object)); - allData.put(configSchema, configSeeds); - } - return allData; - } - - private Stream streamWithAdditionalOperation(Set sourceDefinitionsInUse, - Set destinationDefinitionsInUse, - boolean[] sourceProcessed, - boolean[] destinationProcessed, - ConfigSchema configSchema, - Stream configs, - Map> latestSeeds) { - if (configSchema == ConfigSchema.SOURCE_CONNECTION) { - sourceProcessed[0] = true; - configs = configs.peek(config -> sourceDefinitionsInUse.add(((SourceConnection) config).getSourceDefinitionId().toString())); - } else if (configSchema == ConfigSchema.DESTINATION_CONNECTION) { - destinationProcessed[0] = true; - configs = configs.peek(config -> destinationDefinitionsInUse.add(((DestinationConnection) config).getDestinationDefinitionId().toString())); - } else if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION) { - Map sourceDefinitionSeeds = latestSeeds.get(configSchema); - configs = getDefinitionStream(sourceDefinitionsInUse, sourceProcessed[0], configSchema, configs, sourceDefinitionSeeds); - } else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION) { - Map destinationDefinitionSeeds = latestSeeds.get(configSchema); - configs = getDefinitionStream(destinationDefinitionsInUse, destinationProcessed[0], configSchema, configs, destinationDefinitionSeeds); - } - return configs; - } - - /** - * This method combines the latest definitions with existing ones. If a connector is being used by - * user, it will continue to be at the same version, otherwise it will be migrated to the latest - * version - */ - private Stream getDefinitionStream(Set definitionsInUse, - boolean definitionsPopulated, - ConfigSchema configSchema, - Stream currentDefinitions, - Map latestDefinitions) { - if (!definitionsPopulated) { - throw new RuntimeException("Trying to process " + configSchema + " without populating the definitions in use"); - } - - return Streams.concat( - // Keep all the definitions in use - currentDefinitions.filter(c -> definitionsInUse.contains(configSchema.getId(c))), - // Upgrade all the definitions not in use - latestDefinitions.entrySet().stream().filter(c -> !definitionsInUse.contains(c.getKey())).map(Entry::getValue)); - } - private Stream readConfigsFromArchive(final Path storageRoot, final ConfigSchema schemaType) throws IOException { diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 48a776415bbe..e4dc92e004af 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -33,7 +33,6 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.LogClientSingleton; -import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.ConfigSeedProvider; import io.airbyte.config.persistence.DatabaseConfigPersistence; @@ -179,10 +178,9 @@ public static ServerRunnable getServer(ServerFactory apiFactory) throws Exceptio configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) .getAndInitialize(); - final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase) - .loadData(ConfigSeedProvider.get(configs)) - .withValidation(); - final ConfigRepository configRepository = new ConfigRepository(configPersistence); + final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase); + configPersistence.loadData(ConfigSeedProvider.get(configs)); + final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation()); LOGGER.info("Creating Scheduler persistence..."); final Database jobDatabase = new JobsDatabaseInstance(