Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Separate connector upgrade from import #5965

Merged
merged 9 commits into from
Sep 10, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ public interface ConfigPersistence {

Map<String, Stream<JsonNode>> dumpConfigs() throws IOException;

/**
 * Loads or updates configs from the given seed persistence into this persistence.
 * Implementations define their own merge semantics; some (e.g. read-only or
 * file-based persistences) may not support this operation.
 *
 * @param seedPersistence the persistence to read seed configs from
 * @throws IOException if reading the seed or writing the configs fails
 */
void loadData(ConfigPersistence seedPersistence) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,8 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
return persistence.dumpConfigs();
}

/**
 * Loads or updates configs from the given seed persistence by delegating to the
 * underlying {@code persistence} instance.
 *
 * @param seedPersistence the persistence to read seed configs from
 * @throws IOException if the underlying persistence fails to load the data
 */
public void loadData(ConfigPersistence seedPersistence) throws IOException {
  this.persistence.loadData(seedPersistence);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public DatabaseConfigPersistence(Database database) {
/**
* Load or update the configs from the seed.
*/
public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistence) throws IOException {
@Override
public void loadData(ConfigPersistence seedConfigPersistence) throws IOException {
database.transaction(ctx -> {
boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where());
if (isInitialized) {
Expand All @@ -86,7 +87,6 @@ public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistenc
}
return null;
});
return this;
}

public ValidatingConfigPersistence withValidation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
Expand Down Expand Up @@ -198,6 +199,11 @@ public void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dry
LOGGER.info("Deleted {}", oldConfigsDir);
}

/**
 * Loading seed data is not supported by the file-system-based persistence.
 *
 * @param seedPersistence ignored
 * @throws UnsupportedOperationException always; this implementation does not
 *         support loading seed data
 */
@Override
public void loadData(ConfigPersistence seedPersistence) throws IOException {
  // Bug fix: the original threw UnsupportedEncodingException — a checked
  // java.io exception about character encodings — which is unrelated to an
  // unsupported operation and would be silently caught by callers handling
  // IOException. UnsupportedOperationException (unchecked) is the correct
  // signal and matches the read-only seed persistence implementation.
  throw new UnsupportedOperationException("This method is not supported in this implementation");
}

private <T> T getConfigInternal(AirbyteConfig configType, String configId, Class<T> clazz)
throws ConfigNotFoundException, IOException {
// validate file with schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
return decoratedPersistence.dumpConfigs();
}

/**
 * Loads or updates configs from the given seed persistence. Validation is not
 * applied here; the call is forwarded directly to the decorated persistence,
 * which performs the actual load.
 *
 * @param seedPersistence the persistence to read seed configs from
 * @throws IOException if the decorated persistence fails to load the data
 */
@Override
public void loadData(ConfigPersistence seedPersistence) throws IOException {
  this.decoratedPersistence.loadData(seedPersistence);
}

private <T> void validateJson(T config, AirbyteConfig configType) throws JsonValidationException {
JsonNode schema = JsonSchemaValidator.getSchema(configType.getConfigSchemaFile());
schemaValidator.ensure(schema, Jsons.jsonNode(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,9 @@ public Map<String, Stream<JsonNode>> dumpConfigs() {
e -> e.getValue().values().stream()));
}

/**
 * This persistence is the seed itself and is read only, so loading data into it
 * is never allowed.
 *
 * @param seedPersistence ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void loadData(ConfigPersistence seedPersistence) throws IOException {
  final String message = "The seed config persistence is read only.";
  throw new UnsupportedOperationException(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,11 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
Expand Down Expand Up @@ -131,7 +128,7 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
// 2. dry run
try {
checkImport(targetVersion, sourceRoot);
importConfigsFromArchive(sourceRoot, seedPersistence, true);
importConfigsFromArchive(sourceRoot, true);
} catch (Exception e) {
LOGGER.error("Dry run failed.", e);
throw e;
Expand All @@ -140,8 +137,9 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
// 3. Import Postgres content
importDatabaseFromArchive(sourceRoot, targetVersion);

// 4. Import Configs
importConfigsFromArchive(sourceRoot, seedPersistence, false);
// 4. Import Configs and update connector definitions
importConfigsFromArchive(sourceRoot, false);
configRepository.loadData(seedPersistence);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but doesn't loadData need to handle the upgrade checks that we were handling in importConfigsFromArchive before to ensure we aren't overwriting source and destination definitions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, DatabaseConfigPersistence#loadData does avoid updating a connector if it is used. The FileSystemConfigPersistence#loadData overwrites everything, but it is not used. Let me change it to throw an exception.

These existing tests ensure that we don’t update a connector definition if it is in use:
https://github.com/airbytehq/airbyte/blob/master/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java
https://github.com/airbytehq/airbyte/blob/master/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java#L163


// 5. Set DB version
LOGGER.info("Setting the DB Airbyte version to : " + targetVersion);
Expand All @@ -158,7 +156,7 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
configRepository.listStandardWorkspaces(true).forEach(workspace -> TrackingClientSingleton.get().identify(workspace.getWorkspaceId()));
}

private void checkImport(String targetVersion, Path tempFolder) throws IOException, JsonValidationException {
private void checkImport(String targetVersion, Path tempFolder) throws IOException {
final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME);
final String importVersion = Files.readString(versionFile, Charset.defaultCharset())
.replace("\n", "").strip();
Expand All @@ -179,21 +177,10 @@ private List<String> listDirectories(Path sourceRoot) throws IOException {
}
}

private <T> void importConfigsFromArchive(final Path sourceRoot, ConfigPersistence seedPersistence, final boolean dryRun)
throws IOException, JsonValidationException {
final Set<String> sourceDefinitionsInUse = new HashSet<>();
final Set<String> destinationDefinitionsInUse = new HashSet<>();
final boolean[] sourceProcessed = {false};
final boolean[] destinationProcessed = {false};
private void importConfigsFromArchive(final Path sourceRoot, final boolean dryRun) throws IOException {
final List<String> directories = listDirectories(sourceRoot);
// We sort the directories because we want to process SOURCE_CONNECTION before
// STANDARD_SOURCE_DEFINITION and DESTINATION_CONNECTION before STANDARD_DESTINATION_DEFINITION
// so that we can identify which definitions should not be upgraded to the latest version
Collections.sort(directories);
final Map<AirbyteConfig, Stream<?>> data = new LinkedHashMap<>();

final Map<ConfigSchema, Map<String, ?>> seeds = getSeeds(seedPersistence);

for (final String directory : directories) {
final Optional<ConfigSchema> configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class);

Expand All @@ -202,90 +189,11 @@ private <T> void importConfigsFromArchive(final Path sourceRoot, ConfigPersisten
}

final ConfigSchema configSchema = configSchemaOptional.get();
Stream<?> configs = readConfigsFromArchive(sourceRoot, configSchema);

// If there is no source or destination connection, mark them as processed respectively.
if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION && !data.containsKey(ConfigSchema.SOURCE_CONNECTION)) {
sourceProcessed[0] = true;
} else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION && !data.containsKey(ConfigSchema.DESTINATION_CONNECTION)) {
destinationProcessed[0] = true;
}

configs = streamWithAdditionalOperation(
sourceDefinitionsInUse,
destinationDefinitionsInUse,
sourceProcessed,
destinationProcessed,
configSchema,
configs,
seeds);
data.put(configSchema, configs);
data.put(configSchema, readConfigsFromArchive(sourceRoot, configSchema));
}
configRepository.replaceAllConfigs(data, dryRun);
}

/**
* Convert config dumps from {@link ConfigPersistence#dumpConfigs} to the desired format.
*/
@SuppressWarnings("unchecked")
private static Map<ConfigSchema, Map<String, ?>> getSeeds(ConfigPersistence configSeedPersistence) throws IOException {
Map<ConfigSchema, Map<String, ?>> allData = new HashMap<>(2);
for (Map.Entry<String, Stream<JsonNode>> configStream : configSeedPersistence.dumpConfigs().entrySet()) {
ConfigSchema configSchema = ConfigSchema.valueOf(configStream.getKey());
Map<String, ?> configSeeds = configStream.getValue()
.map(node -> Jsons.object(node, configSchema.getClassName()))
.collect(Collectors.toMap(
configSchema::getId,
object -> object));
allData.put(configSchema, configSeeds);
}
return allData;
}

private Stream<?> streamWithAdditionalOperation(Set<String> sourceDefinitionsInUse,
Set<String> destinationDefinitionsInUse,
boolean[] sourceProcessed,
boolean[] destinationProcessed,
ConfigSchema configSchema,
Stream<?> configs,
Map<ConfigSchema, Map<String, ?>> latestSeeds) {
if (configSchema == ConfigSchema.SOURCE_CONNECTION) {
sourceProcessed[0] = true;
configs = configs.peek(config -> sourceDefinitionsInUse.add(((SourceConnection) config).getSourceDefinitionId().toString()));
} else if (configSchema == ConfigSchema.DESTINATION_CONNECTION) {
destinationProcessed[0] = true;
configs = configs.peek(config -> destinationDefinitionsInUse.add(((DestinationConnection) config).getDestinationDefinitionId().toString()));
} else if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
Map<String, ?> sourceDefinitionSeeds = latestSeeds.get(configSchema);
configs = getDefinitionStream(sourceDefinitionsInUse, sourceProcessed[0], configSchema, configs, sourceDefinitionSeeds);
} else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
Map<String, ?> destinationDefinitionSeeds = latestSeeds.get(configSchema);
configs = getDefinitionStream(destinationDefinitionsInUse, destinationProcessed[0], configSchema, configs, destinationDefinitionSeeds);
}
return configs;
}

/**
* This method combines the latest definitions with existing ones. If a connector is being used by
* user, it will continue to be at the same version, otherwise it will be migrated to the latest
* version
*/
private Stream<?> getDefinitionStream(Set<String> definitionsInUse,
boolean definitionsPopulated,
ConfigSchema configSchema,
Stream<?> currentDefinitions,
Map<String, ?> latestDefinitions) {
if (!definitionsPopulated) {
throw new RuntimeException("Trying to process " + configSchema + " without populating the definitions in use");
}

return Streams.concat(
// Keep all the definitions in use
currentDefinitions.filter(c -> definitionsInUse.contains(configSchema.getId(c))),
// Upgrade all the definitions not in use
latestDefinitions.entrySet().stream().filter(c -> !definitionsInUse.contains(c.getKey())).map(Entry::getValue));
}

private <T> Stream<T> readConfigsFromArchive(final Path storageRoot, final ConfigSchema schemaType)
throws IOException {

Expand Down
8 changes: 3 additions & 5 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.ConfigSeedProvider;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
Expand Down Expand Up @@ -179,10 +178,9 @@ public static ServerRunnable getServer(ServerFactory apiFactory) throws Exceptio
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getAndInitialize();
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase)
.loadData(ConfigSeedProvider.get(configs))
.withValidation();
final ConfigRepository configRepository = new ConfigRepository(configPersistence);
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
configPersistence.loadData(ConfigSeedProvider.get(configs));
final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation());

LOGGER.info("Creating Scheduler persistence...");
final Database jobDatabase = new JobsDatabaseInstance(
Expand Down