Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update airbyte protocol migration #20745

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.protocol.migrations.MigrationContainer;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* AirbyteProtocol Message Migrator
Expand All @@ -25,104 +24,59 @@
@Singleton
public class AirbyteMessageMigrator {

private final List<AirbyteMessageMigration<?, ?>> migrationsToRegister;
private final SortedMap<String, AirbyteMessageMigration<?, ?>> migrations = new TreeMap<>();
private String mostRecentMajorVersion = "";
private final MigrationContainer<AirbyteMessageMigration<?, ?>> migrationContainer;

public AirbyteMessageMigrator(List<AirbyteMessageMigration<?, ?>> migrations) {
migrationsToRegister = migrations;
}

public AirbyteMessageMigrator() {
this(Collections.emptyList());
public AirbyteMessageMigrator(final List<AirbyteMessageMigration<?, ?>> migrations) {
migrationContainer = new MigrationContainer<>(migrations);
Comment on lines +29 to +30
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the MigrationContainer be injected here instead of the List<>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, the intent is to inject the list of migrations. We have different migrations intended for the AirbyteMessageMigrator or the ConfiguredAirbyteCatalogMigrator.
The MigrationContainer<> is a shared container for abstracting the logic of how we fetch the migrations. Injecting the MigrationContainer would involve an extra layer of subclassing which looks more like overhead than solving a problem at the moment.

}

@PostConstruct
public void initialize() {
migrationsToRegister.forEach(this::registerMigration);
migrationContainer.initialize();
}

/**
* Downgrade a message from the most recent version to the target version by chaining all the
* required migrations
*/
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
if (target.getMajorVersion().equals(mostRecentMajorVersion)) {
return (PreviousVersion) message;
}

Object result = message;
Object[] selectedMigrations = selectMigrations(target).toArray();
for (int i = selectedMigrations.length; i > 0; --i) {
result = applyDowngrade((AirbyteMessageMigration<?, ?>) selectedMigrations[i - 1], result);
}
return (PreviousVersion) result;
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message,
final Version target,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrationContainer.downgrade(message, target, (migration, msg) -> applyDowngrade(migration, msg, configuredAirbyteCatalog));
}

/**
* Upgrade a message from the source version to the most recent version by chaining all the required
* migrations
*/
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
if (source.getMajorVersion().equals(mostRecentMajorVersion)) {
return (CurrentVersion) message;
}

Object result = message;
for (var migration : selectMigrations(source)) {
result = applyUpgrade(migration, result);
}
return (CurrentVersion) result;
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message,
final Version source,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrationContainer.upgrade(message, source, (migration, msg) -> applyUpgrade(migration, msg, configuredAirbyteCatalog));
}

public Version getMostRecentVersion() {
return new Version(mostRecentMajorVersion, "0", "0");
}

private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final Version version) {
final Collection<AirbyteMessageMigration<?, ?>> results = migrations.tailMap(version.getMajorVersion()).values();
if (results.isEmpty()) {
throw new RuntimeException("Unsupported migration version " + version.serialize());
}
return results;
return migrationContainer.getMostRecentVersion();
}

// Helper function to work around type casting
private <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
return migration.downgrade((CurrentVersion) message);
private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migration.downgrade((CurrentVersion) message, configuredAirbyteCatalog);
}

// Helper function to work around type casting
private <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
return migration.upgrade((PreviousVersion) message);
}

/**
* Store migration in a sorted map key by the major of the lower version of the migration.
*
* The goal is to be able to retrieve the list of migrations to apply to get to/from a given
* version. We are only keying on the lower version because the right side (most recent version of
* the migration range) is always current version.
*/
@VisibleForTesting
void registerMigration(final AirbyteMessageMigration<?, ?> migration) {
final String key = migration.getPreviousVersion().getMajorVersion();
if (!migrations.containsKey(key)) {
migrations.put(key, migration);
if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentMajorVersion) > 0) {
mostRecentMajorVersion = migration.getCurrentVersion().getMajorVersion();
}
} else {
throw new RuntimeException("Trying to register a duplicated migration " + migration.getClass().getName());
}
private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migration.upgrade((PreviousVersion) message, configuredAirbyteCatalog);
}

// Used for inspection of the injection
@VisibleForTesting
Set<String> getMigrationKeys() {
return migrations.keySet();
return migrationContainer.getMigrationKeys();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Optional;

/**
* Wraps message migration from a fixed version to the most recent version
Expand All @@ -20,12 +22,12 @@ public AirbyteMessageVersionedMigrator(final AirbyteMessageMigrator migrator, fi
this.version = version;
}

public OriginalMessageType downgrade(final AirbyteMessage message) {
return migrator.downgrade(message, version);
public OriginalMessageType downgrade(final AirbyteMessage message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrator.downgrade(message, version, configuredAirbyteCatalog);
}

public AirbyteMessage upgrade(final OriginalMessageType message) {
return migrator.upgrade(message, version);
public AirbyteMessage upgrade(final OriginalMessageType message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrator.upgrade(message, version, configuredAirbyteCatalog);
}

public Version getVersion() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.commons.version.Version;
import jakarta.inject.Singleton;

/**
* Factory to build AirbyteMessageVersionedMigrator
*/
@Singleton
public class AirbyteProtocolVersionedMigratorFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an actual micronaut factory (the @Factory annotation)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the current structure, we inject the factory so that the worker can get a version dependent instance. Because this is per request, I'd expect a bigger refactoring to be needed for this.
Trying to limit complexity of the changes at the moment.


private final AirbyteMessageMigrator airbyteMessageMigrator;
private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;

/**
* Creates the factory from the two migrators it hands to the objects it builds.
*
* @param airbyteMessageMigrator migrator passed to each AirbyteMessageVersionedMigrator created by
*        this factory; also the source of the value returned by getMostRecentVersion()
* @param configuredAirbyteCatalogMigrator migrator passed to each VersionedProtocolSerializer
*        created by this factory
*/
public AirbyteProtocolVersionedMigratorFactory(final AirbyteMessageMigrator airbyteMessageMigrator,
final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator) {
this.airbyteMessageMigrator = airbyteMessageMigrator;
this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
}

public <T> AirbyteMessageVersionedMigrator<T> getAirbyteMessageMigrator(final Version version) {
return new AirbyteMessageVersionedMigrator<>(this.airbyteMessageMigrator, version);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this qualifier necessary here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

}

/**
* Builds a serializer bound to the given protocol version.
*
* @param version protocol version handed to the new VersionedProtocolSerializer
* @return a new VersionedProtocolSerializer backed by this factory's
*         ConfiguredAirbyteCatalogMigrator
*/
public final VersionedProtocolSerializer getProtocolSerializer(final Version version) {
return new VersionedProtocolSerializer(configuredAirbyteCatalogMigrator, version);
}

/**
* Returns the most recent version as reported by the AirbyteMessageMigrator. The
* ConfiguredAirbyteCatalogMigrator is not consulted here.
*
* @return the value of airbyteMessageMigrator.getMostRecentVersion()
*/
public Version getMostRecentVersion() {
return airbyteMessageMigrator.getMostRecentVersion();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.migrations.ConfiguredAirbyteCatalogMigration;
import io.airbyte.commons.protocol.migrations.MigrationContainer;
import io.airbyte.commons.version.Version;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.Set;

@Singleton
public class ConfiguredAirbyteCatalogMigrator {

private final MigrationContainer<ConfiguredAirbyteCatalogMigration<?, ?>> migrationContainer;

/**
* Wraps the given migrations in a MigrationContainer. The container is only constructed here; its
* own initialize() is invoked separately, from initialize() on this class.
*
* @param migrations the ConfiguredAirbyteCatalogMigration instances handed to the container
*/
public ConfiguredAirbyteCatalogMigrator(final List<ConfiguredAirbyteCatalogMigration<?, ?>> migrations) {
migrationContainer = new MigrationContainer<>(migrations);
}

/**
* Initializes the underlying MigrationContainer. Annotated with {@code @PostConstruct}, so the
* dependency-injection framework is expected to call it after the instance has been constructed.
*
* NOTE(review): code that instantiates this class by hand presumably has to call this method
* itself before using the migrator -- confirm against MigrationContainer.
*/
@PostConstruct
public void initialize() {
migrationContainer.initialize();
}

/**
* Downgrade a message from the most recent version to the target version by chaining all the
* required migrations.
*
* The work is delegated to the MigrationContainer, which is given applyDowngrade as the function
* to run for each migration it selects.
*
* @param message the object to downgrade (VersionedProtocolSerializer passes a
*        ConfiguredAirbyteCatalog)
* @param target the version to downgrade to
* @return the result of the container's downgrade for {@code target}
*/
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
return migrationContainer.downgrade(message, target, ConfiguredAirbyteCatalogMigrator::applyDowngrade);
}
Comment on lines +34 to +36
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to validate that this version is indeed a downgrade?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, current design enforces a one-way look up from/to the current version. If the version provided isn't a valid downgrade or upgrade, we either fail because the version isn't supported (in the list of known migrations) or a mismatch in the type conversion.


/**
* Upgrade a message from the source version to the most recent version by chaining all the required
* migrations.
*
* The work is delegated to the MigrationContainer, which is given applyUpgrade as the function to
* run for each migration it selects.
*
* @param message the object to upgrade
* @param source the version the object is currently in
* @return the result of the container's upgrade from {@code source}
*/
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
return migrationContainer.upgrade(message, source, ConfiguredAirbyteCatalogMigrator::applyUpgrade);
}
Comment on lines +42 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to validate that this version is indeed an upgrade?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

answered above.


/**
* Returns the most recent version known to the underlying MigrationContainer.
*
* @return the value of migrationContainer.getMostRecentVersion()
*/
public Version getMostRecentVersion() {
return migrationContainer.getMostRecentVersion();
}

// Helper function to work around type casting
private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
Comment on lines +51 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this method is generic, can you have final CurrentVersion message instead of final Object message?

return migration.downgrade((CurrentVersion) message);
}

// Helper function to work around type casting
private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this method is generic, can you have final PreviousVersion message instead of final Object message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not able to make it work because of the MigrationContainer.
More specifically, we eventually apply a chain of transforms f(A -> B -> C), which falls apart when passing in final BiFunction<T, Object, Object> applyDowngrade. Types end up being cast dynamically from Object because of this.

final Object message) {
return migration.upgrade((PreviousVersion) message);
}

// Used for inspection of the injection
// Package-private and marked @VisibleForTesting; returns the key set exposed by the
// MigrationContainer without copying it here.
@VisibleForTesting
Set<String> getMigrationKeys() {
return migrationContainer.getMigrationKeys();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

/**
 * ProtocolSerializer that serializes the catalog exactly as given, without applying any protocol
 * version migration first.
 */
public class DefaultProtocolSerializer implements ProtocolSerializer {

  /**
   * Serializes the catalog with {@code Jsons.serialize}.
   *
   * @param configuredAirbyteCatalog the catalog to serialize
   * @return the string produced by {@code Jsons.serialize} for the catalog
   */
  @Override
  public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    return Jsons.serialize(configuredAirbyteCatalog);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

/**
* Turns a ConfiguredAirbyteCatalog into its String form.
*/
public interface ProtocolSerializer {

/**
* Serializes the given catalog.
*
* @param configuredAirbyteCatalog the catalog to serialize
* @return the serialized catalog
*/
String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

public class VersionedProtocolSerializer implements ProtocolSerializer {

private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;
private final Version protocolVersion;

/**
* Creates a serializer tied to one protocol version.
*
* @param configuredAirbyteCatalogMigrator migrator that serialize() uses to downgrade the catalog
*        before it is converted to JSON
* @param protocolVersion target version passed to that downgrade; presumably the protocol version
*        understood by whoever consumes the serialized catalog -- TODO confirm with callers
*/
public VersionedProtocolSerializer(final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator, final Version protocolVersion) {
this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
this.protocolVersion = protocolVersion;
}

@Override
public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
return Jsons.serialize(configuredAirbyteCatalogMigrator.downgrade(configuredAirbyteCatalog, protocolVersion));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this call downgrade? Probably worth throwing a comment into the code here explaining the logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, added some docs to the VersionedProtocolSerializer

}

}
Loading