Skip to content

Commit

Permalink
Separate connector upgrade from import (#5965)
Browse files Browse the repository at this point in the history
* Remove connector update in dump importer

* Remove seed persistence

* Update connector definition with loadData method

* Add override annotation

* Pass in seed persistence

* Remove import

* Restore parameter order

* Throw exception in FileSystemConfigPersistence#loadData
  • Loading branch information
tuliren authored Sep 10, 2021
1 parent 332687a commit b53d826
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ public interface ConfigPersistence {

Map<String, Stream<JsonNode>> dumpConfigs() throws IOException;

/**
 * Load or update the configs held by this persistence from the given seed persistence.
 * Implementations that cannot be seeded (e.g. read-only or file-based persistences in this
 * change) throw instead of loading.
 *
 * @param seedPersistence persistence to read the seed configs from
 * @throws IOException if reading the seed or writing the configs fails
 */
void loadData(ConfigPersistence seedPersistence) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,8 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
return persistence.dumpConfigs();
}

/**
 * Load or update configs from the given seed by delegating to the underlying persistence.
 *
 * @param seedPersistence source of the seed configs
 * @throws IOException if the underlying persistence fails to load the seed
 */
public void loadData(final ConfigPersistence seedPersistence) throws IOException {
  this.persistence.loadData(seedPersistence);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public DatabaseConfigPersistence(Database database) {
/**
* Load or update the configs from the seed.
*/
public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistence) throws IOException {
@Override
public void loadData(ConfigPersistence seedConfigPersistence) throws IOException {
database.transaction(ctx -> {
boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where());
if (isInitialized) {
Expand All @@ -86,7 +87,6 @@ public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistenc
}
return null;
});
return this;
}

public ValidatingConfigPersistence withValidation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
Expand Down Expand Up @@ -198,6 +199,11 @@ public void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dry
LOGGER.info("Deleted {}", oldConfigsDir);
}

/**
 * Seeding is not supported by the file-system persistence; seed loading is handled by the
 * database-backed persistence instead.
 *
 * @param seedPersistence ignored
 * @throws UnsupportedOperationException always — this implementation does not support seeding
 */
@Override
public void loadData(ConfigPersistence seedPersistence) throws IOException {
  // Fix: the original threw UnsupportedEncodingException, which is a charset-related checked
  // IOException and misleads callers into treating "unsupported" as a recoverable I/O error.
  // Use UnsupportedOperationException, consistent with SeedConfigPersistence#loadData.
  throw new UnsupportedOperationException("This method is not supported in this implementation");
}

private <T> T getConfigInternal(AirbyteConfig configType, String configId, Class<T> clazz)
throws ConfigNotFoundException, IOException {
// validate file with schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
return decoratedPersistence.dumpConfigs();
}

/**
 * Delegate seed loading straight to the wrapped persistence. No schema validation happens
 * here: loadData produces no config objects for this decorator to validate.
 *
 * @param seedPersistence source of the seed configs
 * @throws IOException if the decorated persistence fails to load the seed
 */
@Override
public void loadData(final ConfigPersistence seedPersistence) throws IOException {
  this.decoratedPersistence.loadData(seedPersistence);
}

private <T> void validateJson(T config, AirbyteConfig configType) throws JsonValidationException {
JsonNode schema = JsonSchemaValidator.getSchema(configType.getConfigSchemaFile());
schemaValidator.ensure(schema, Jsons.jsonNode(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,9 @@ public Map<String, Stream<JsonNode>> dumpConfigs() {
e -> e.getValue().values().stream()));
}

/**
 * The seed persistence is an immutable snapshot of the bundled seed configs, so it can never
 * itself be re-seeded from another persistence.
 *
 * @param seedPersistence ignored
 * @throws UnsupportedOperationException always — this persistence is read only
 */
@Override
public void loadData(final ConfigPersistence seedPersistence) throws IOException {
  throw new UnsupportedOperationException("The seed config persistence is read only.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,11 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
Expand Down Expand Up @@ -131,7 +128,7 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
// 2. dry run
try {
checkImport(targetVersion, sourceRoot);
importConfigsFromArchive(sourceRoot, seedPersistence, true);
importConfigsFromArchive(sourceRoot, true);
} catch (Exception e) {
LOGGER.error("Dry run failed.", e);
throw e;
Expand All @@ -140,8 +137,9 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
// 3. Import Postgres content
importDatabaseFromArchive(sourceRoot, targetVersion);

// 4. Import Configs
importConfigsFromArchive(sourceRoot, seedPersistence, false);
// 4. Import Configs and update connector definitions
importConfigsFromArchive(sourceRoot, false);
configRepository.loadData(seedPersistence);

// 5. Set DB version
LOGGER.info("Setting the DB Airbyte version to : " + targetVersion);
Expand All @@ -158,7 +156,7 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
configRepository.listStandardWorkspaces(true).forEach(workspace -> TrackingClientSingleton.get().identify(workspace.getWorkspaceId()));
}

private void checkImport(String targetVersion, Path tempFolder) throws IOException, JsonValidationException {
private void checkImport(String targetVersion, Path tempFolder) throws IOException {
final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME);
final String importVersion = Files.readString(versionFile, Charset.defaultCharset())
.replace("\n", "").strip();
Expand All @@ -179,21 +177,10 @@ private List<String> listDirectories(Path sourceRoot) throws IOException {
}
}

private <T> void importConfigsFromArchive(final Path sourceRoot, ConfigPersistence seedPersistence, final boolean dryRun)
throws IOException, JsonValidationException {
final Set<String> sourceDefinitionsInUse = new HashSet<>();
final Set<String> destinationDefinitionsInUse = new HashSet<>();
final boolean[] sourceProcessed = {false};
final boolean[] destinationProcessed = {false};
private void importConfigsFromArchive(final Path sourceRoot, final boolean dryRun) throws IOException {
final List<String> directories = listDirectories(sourceRoot);
// We sort the directories because we want to process SOURCE_CONNECTION before
// STANDARD_SOURCE_DEFINITION and DESTINATION_CONNECTION before STANDARD_DESTINATION_DEFINITION
// so that we can identify which definitions should not be upgraded to the latest version
Collections.sort(directories);
final Map<AirbyteConfig, Stream<?>> data = new LinkedHashMap<>();

final Map<ConfigSchema, Map<String, ?>> seeds = getSeeds(seedPersistence);

for (final String directory : directories) {
final Optional<ConfigSchema> configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class);

Expand All @@ -202,90 +189,11 @@ private <T> void importConfigsFromArchive(final Path sourceRoot, ConfigPersisten
}

final ConfigSchema configSchema = configSchemaOptional.get();
Stream<?> configs = readConfigsFromArchive(sourceRoot, configSchema);

// If there is no source or destination connection, mark them as processed respectively.
if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION && !data.containsKey(ConfigSchema.SOURCE_CONNECTION)) {
sourceProcessed[0] = true;
} else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION && !data.containsKey(ConfigSchema.DESTINATION_CONNECTION)) {
destinationProcessed[0] = true;
}

configs = streamWithAdditionalOperation(
sourceDefinitionsInUse,
destinationDefinitionsInUse,
sourceProcessed,
destinationProcessed,
configSchema,
configs,
seeds);
data.put(configSchema, configs);
data.put(configSchema, readConfigsFromArchive(sourceRoot, configSchema));
}
configRepository.replaceAllConfigs(data, dryRun);
}

/**
 * Convert config dumps from {@link ConfigPersistence#dumpConfigs} to the desired format:
 * one entry per config type, mapping config id to the deserialized config object, so that
 * id-based lookups are cheap when merging seeds with existing definitions.
 *
 * @param configSeedPersistence the seed persistence whose dump is converted
 * @return map from config schema to (config id -> config object); initial capacity 2 —
 *         presumably only source and destination definitions are dumped (TODO confirm)
 * @throws IOException if dumping the seed persistence fails
 */
@SuppressWarnings("unchecked")
private static Map<ConfigSchema, Map<String, ?>> getSeeds(ConfigPersistence configSeedPersistence) throws IOException {
  Map<ConfigSchema, Map<String, ?>> allData = new HashMap<>(2);
  for (Map.Entry<String, Stream<JsonNode>> configStream : configSeedPersistence.dumpConfigs().entrySet()) {
    // The dump keys are ConfigSchema enum names; valueOf throws on anything unexpected.
    ConfigSchema configSchema = ConfigSchema.valueOf(configStream.getKey());
    // Deserialize each JSON node into its schema's class and index it by config id.
    Map<String, ?> configSeeds = configStream.getValue()
        .map(node -> Jsons.object(node, configSchema.getClassName()))
        .collect(Collectors.toMap(
            configSchema::getId,
            object -> object));
    allData.put(configSchema, configSeeds);
  }
  return allData;
}

/**
 * Decorate the config stream for a given config type. For connection configs, attach a
 * side-effecting peek that records which connector definition ids are referenced; for
 * definition configs, replace the stream with one that merges the archived definitions with
 * the latest seeds (see getDefinitionStream).
 *
 * NOTE(review): the peek side effects only run when the returned stream is consumed, so the
 * caller must fully consume connection streams before definition streams — this is why the
 * directories are sorted upstream.
 *
 * @param sourceDefinitionsInUse accumulates source definition ids referenced by sources
 * @param destinationDefinitionsInUse accumulates destination definition ids referenced by destinations
 * @param sourceProcessed one-element flag: source connections have been (or will be) streamed
 * @param destinationProcessed one-element flag: destination connections have been (or will be) streamed
 * @param configSchema the config type of this stream
 * @param configs the configs read from the archive
 * @param latestSeeds latest definitions keyed by config type, then by definition id
 * @return the possibly-decorated or replaced stream
 */
private Stream<?> streamWithAdditionalOperation(Set<String> sourceDefinitionsInUse,
                                                Set<String> destinationDefinitionsInUse,
                                                boolean[] sourceProcessed,
                                                boolean[] destinationProcessed,
                                                ConfigSchema configSchema,
                                                Stream<?> configs,
                                                Map<ConfigSchema, Map<String, ?>> latestSeeds) {
  if (configSchema == ConfigSchema.SOURCE_CONNECTION) {
    sourceProcessed[0] = true;
    // Record every source definition id referenced by a source connection.
    configs = configs.peek(config -> sourceDefinitionsInUse.add(((SourceConnection) config).getSourceDefinitionId().toString()));
  } else if (configSchema == ConfigSchema.DESTINATION_CONNECTION) {
    destinationProcessed[0] = true;
    // Record every destination definition id referenced by a destination connection.
    configs = configs.peek(config -> destinationDefinitionsInUse.add(((DestinationConnection) config).getDestinationDefinitionId().toString()));
  } else if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
    Map<String, ?> sourceDefinitionSeeds = latestSeeds.get(configSchema);
    configs = getDefinitionStream(sourceDefinitionsInUse, sourceProcessed[0], configSchema, configs, sourceDefinitionSeeds);
  } else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
    Map<String, ?> destinationDefinitionSeeds = latestSeeds.get(configSchema);
    configs = getDefinitionStream(destinationDefinitionsInUse, destinationProcessed[0], configSchema, configs, destinationDefinitionSeeds);
  }
  return configs;
}

/**
 * This method combines the latest definitions with existing ones. If a connector is being used by
 * user, it will continue to be at the same version, otherwise it will be migrated to the latest
 * version.
 *
 * @param definitionsInUse definition ids referenced by at least one connection
 * @param definitionsPopulated whether the corresponding connections were already streamed;
 *        guards against consuming this stream before the in-use set is complete
 * @param configSchema definition config type, used for id extraction and error reporting
 * @param currentDefinitions definitions from the imported archive
 * @param latestDefinitions latest seed definitions keyed by definition id
 * @return archived definitions that are in use, plus seed definitions that are not
 */
private Stream<?> getDefinitionStream(Set<String> definitionsInUse,
                                      boolean definitionsPopulated,
                                      ConfigSchema configSchema,
                                      Stream<?> currentDefinitions,
                                      Map<String, ?> latestDefinitions) {
  if (!definitionsPopulated) {
    throw new RuntimeException("Trying to process " + configSchema + " without populating the definitions in use");
  }

  return Streams.concat(
      // Keep all the definitions in use
      currentDefinitions.filter(c -> definitionsInUse.contains(configSchema.getId(c))),
      // Upgrade all the definitions not in use
      latestDefinitions.entrySet().stream().filter(c -> !definitionsInUse.contains(c.getKey())).map(Entry::getValue));
}

private <T> Stream<T> readConfigsFromArchive(final Path storageRoot, final ConfigSchema schemaType)
throws IOException {

Expand Down
8 changes: 3 additions & 5 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.ConfigSeedProvider;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
Expand Down Expand Up @@ -179,10 +178,9 @@ public static ServerRunnable getServer(ServerFactory apiFactory) throws Exceptio
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getAndInitialize();
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase)
.loadData(ConfigSeedProvider.get(configs))
.withValidation();
final ConfigRepository configRepository = new ConfigRepository(configPersistence);
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
configPersistence.loadData(ConfigSeedProvider.get(configs));
final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation());

LOGGER.info("Creating Scheduler persistence...");
final Database jobDatabase = new JobsDatabaseInstance(
Expand Down

0 comments on commit b53d826

Please sign in to comment.