-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update airbyte protocol migration #20745
Update airbyte protocol migration #20745
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, I only really reviewed the migration interface stuff (i.e. skipped over all the files that actually call into migrations). Did a quick merge into my upgrade/downgrade branches locally and they seem happy, so and I'll do the merges correctly + extract the catalog migration
…susnp/20695-update-airbyte-protocol-migration
Affected Connector ReportNOTE
|
Connector | Version | Changelog | Publish |
---|
- See "Actionable Items" below for how to resolve warnings and errors.
❌ Destinations (16)
Connector | Version | Changelog | Publish |
---|---|---|---|
destination-bigquery |
1.2.9 |
✅ | ✅ |
destination-bigquery-denormalized |
1.2.10 |
✅ | ✅ |
destination-clickhouse |
0.2.1 |
✅ | ✅ |
destination-clickhouse-strict-encrypt |
0.2.1 |
🔵 (ignored) |
🔵 (ignored) |
destination-jdbc |
0.3.14 |
🔵 (ignored) |
🔵 (ignored) |
destination-mssql |
0.1.22 |
✅ | ✅ |
destination-mssql-strict-encrypt |
0.1.22 |
🔵 (ignored) |
🔵 (ignored) |
destination-mysql |
0.1.20 |
✅ | ✅ |
destination-mysql-strict-encrypt |
❌ 0.1.21 (mismatch: 0.1.20 ) |
🔵 (ignored) |
🔵 (ignored) |
destination-oracle |
0.1.19 |
✅ | ✅ |
destination-oracle-strict-encrypt |
0.1.19 |
🔵 (ignored) |
🔵 (ignored) |
destination-postgres |
0.3.26 |
✅ | ✅ |
destination-postgres-strict-encrypt |
0.3.26 |
🔵 (ignored) |
🔵 (ignored) |
destination-redshift |
0.3.53 |
✅ | ✅ |
destination-snowflake |
0.4.41 |
✅ | ✅ |
destination-tidb |
0.1.0 |
✅ | ✅ |
- See "Actionable Items" below for how to resolve warnings and errors.
👀 Other Modules (1)
- base-normalization
Actionable Items
(click to expand)
Category | Status | Actionable Item |
---|---|---|
Version | ❌ mismatch |
The version of the connector is different from its normal variant. Please bump the version of the connector. |
⚠ doc not found |
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
|
Changelog | ⚠ doc not found |
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
❌ changelog missing |
There is no chnagelog for the current version of the connector. If you are the author of the current version, please add a changelog. | |
Publish | ⚠ not in seed |
The connector is not in the seed file (e.g. source_definitions.yaml ), so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that it is not a bug. |
❌ diff seed version |
The connector exists in the seed file, but the latest version is not listed there. This usually means that the latest version is not published. Please use the /publish command to publish the latest version. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with a small comment about creation a micronaut factory
* Factory to build AirbyteMessageVersionedMigrator | ||
*/ | ||
@Singleton | ||
public class AirbyteProtocolVersionedMigratorFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be an actual micronaut factory (the @Factory
annotation)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the current structure, we inject the factory so that the worker can get a version dependent instance. Because this is per request, I'd expect a bigger refactoring to be needed for this.
Trying to limit complexity of the changes at the moment.
} | ||
|
||
public <T> AirbyteMessageVersionedMigrator<T> getAirbyteMessageMigrator(final Version version) { | ||
return new AirbyteMessageVersionedMigrator<>(this.airbyteMessageMigrator, version); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this
qualifier necessary here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) { | ||
return migrationContainer.downgrade(message, target, ConfiguredAirbyteCatalogMigrator::applyDowngrade); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to validate that this version is indeed a downgrade?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, current design enforces a one-way look up from/to the current version. If the version provided isn't a valid downgrade or upgrade, we either fail because the version isn't supported (in the list of known migrations) or a mismatch in the type conversion.
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) { | ||
return migrationContainer.upgrade(message, source, ConfiguredAirbyteCatalogMigrator::applyUpgrade); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to validate that this version is indeed an upgrade?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
answered above.
private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration, | ||
final Object message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this method is generic, can you have final CurrentVersion message
instead of final Object message
?
} | ||
|
||
// Helper function to work around type casting | ||
private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this method is generic, can you have final PreviousVersion message
instead of final Object message
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was not able to make it work because of the MigrationContainer
.
More specifically, eventually, we apply a chain of transforms f(A -> B -> C)
which is falling appart when passing in final BiFunction<T, Object, Object> applyDowngrade
. Type end up being casted dynamically from object because of this.
public interface ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> extends Migration { | ||
|
||
/** | ||
* Downgrades a ConfiguredAirbyteCatalog to from the new version to the old version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo in this javadoc, the first to
doesn't apply
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
*/ | ||
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, | ||
final Version target, | ||
final BiFunction<T, Object, Object> applyDowngrade) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be BiFunction<T, PreviousVersion, CurrentVersion> applyDowngrade
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question applies to the next method as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar answer as a previous question, reposting here for context:
We apply a chain of transforms f(A -> B -> C) which is falling appart when passing in final BiFunction<T, Object, Object> applyDowngrade. Type end up being casted dynamically from object because of this.
return (PreviousVersion) message; | ||
} | ||
|
||
Object result = message; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be defined as PreviousVersion results
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No because of the chaining of transforms, if we end up having more than one migration, the type of results
will change.
} | ||
} | ||
|
||
public Set<String> getMigrationKeys() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be public
or @VisibleForTesting
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be public, once the bootloader is migrated to micronaut, we should use this to check that we actually have all the migrations required to support the range of protocol versions from the config.
public AirbyteMessageMigrator(final List<AirbyteMessageMigration<?, ?>> migrations) { | ||
migrationContainer = new MigrationContainer<>(migrations); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the MigrationContainer
be injected here instead of the List<>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, the intent is to inject the list of migrations. We have different migrations intended for the AirbyteMessageMigrator
or the ConfiguredAirbyteCatalogMigrator
.
The MigrationContainer<>
is a shared container for abstracting the logic of how we fetch the migrations. Injecting the MigrationContainer would involve an extra layer of subclassing which looks more like overhead than solving a problem at the moment.
* Add Airbyte Protocol V1 support. * Fix VersionedAirbyteStreamFactoryTest * Remove AirbyteMessageMigrationV0 example * Add Protocol Version constants * 🎉Updated normalization to handle new datatypes (#19721) * Updated normalization simple stream processing to handle new datatypes * Updated normalization nested stream processing to handle new datatypes * Updated normalization nested stream processing to handle new datatypes * Updated normalization drop_scd_catalog processing to handle new datatypes * Updated normalization ephemeral test processing to handle new datatypes * fixed more tests for normalization * fixed more tests for normalization * fixed more tests for normalization * fixed more tests for normalization * fixed more issues * fixed more issues (clickhouse) * fixed more issues * fixed more issues * fixed more issues * added binary type processing for some DBs * cleared commented code and moved some hardcodes to processing as macro * fixed codestyle and cleared commented code * minor refactor * minor refactor * minor refactor * fixed bool cast error * fixed dict->str cast error * fixed is_combining_node cast py check * removed commented code * removed commented code * committed autogenerated normalization_test_output files * committed autogenerated normalization_test_output files (new files) * refactored utils.py * Updated utils.py to use Callable functions and get rid of property_type in is_number and is_bool functions * committed autogenerated normalization_test_output files (new files) * fixed typo in TIMESTAMP_WITH_TIMEZONE_TYPE * updated stream_processor to handle string type first as a wider type * fixed arrays normalization by updating is_simple_property method as per new approaches * format Co-authored-by: Edward Gao <[email protected]> * Update airbyte protocol migration (#20745) * Extract MigrationContainer from AirbyteMessageMigrator * Add ConfiguredAirbyteCatalogMigrations * Add ConfiguredAirbyteCatalog to AirbyteMessageMigrations * Enable ConfiguredAirbyteCatalog migration * Fix tests * Remove extra this. * Add missing docs * Typo Co-authored-by: Edward Gao <[email protected]> * Data types update: Implement protocol message migrations (#19240) * Extract MigrationContainer from AirbyteMessageMigrator * Add ConfiguredAirbyteCatalogMigrations * Add ConfiguredAirbyteCatalog to AirbyteMessageMigrations * Enable ConfiguredAirbyteCatalog migration * set up scaffolding * [wip] more scaffolding, basic unit test * minimal green code * [wip] add failing test for other primitive types * correct version number * handle basic primitive type decls * add implicit cases * add recursive schema * formatting * comment * support not * fix indentation * handle all nested schema cases * handle boolean schemas * verify empty schema handling * cleanup * extract map * code organization * extract method * reformat * [wip] more tests, minor fix type array handling * corrected test * cleanup * reformat * switch to v1 * add support for multityped fields * missed test case * nested test class * basic record upgrade * implement record upgrades * slight refactor * comments+clarificationso * extract constants * (partly) correct model classes * add de/ser * formatting * extract constants * fix json reference * update docs * switch to v1 models * fix compile+test * add base64 handling * use vnull * Data types update: Implement protocol message downgrade path (#19909) * rough skeleton for passing catalog into migration * basic test * more scaffolding * basic implementation * add primitives test * add in other tests (nested fields currently failing) * add formats * impleent oneOf handling * formatting * oneOf handling * better tests * comments + organization * progress * basic test case * downgrade objects, ish * basic array implementation * handle numeric failure * test for new type * handle array items * empty schema handling * first pass at oneof handling * add more tests+handling * more tests * comments * add empty oneof test case * format + reorganize * more reorganize * fix name * also downgrade binary data * only import vnull * move migrations into v1 package * extract schema mutation code * comment * extract schema migration to new class * extract record downgrade logic for future use * format * fix build after rebase * rename private method for consistency * also implement configuredcatalog migrations >.> * quick and dirty tests * slight cleanup * fix tests * pmd * pmd test * null check on message objects * maybe fix acceptance tests? * fix name * extract constants * more fixes * tmp * meh * fix cdc acc tests * revert to master source-postgres * remove log messages * revert other misc hacks * integers are valid cursors * remove unrelated change * fix build * fix build more? * [MUST REVERT] use dev normalization * capture kube logs * also here? * no debug logs? * delete dup from merging * add final everywhere * revert test changes Co-authored-by: Jimmy Ma <[email protected]> * On-the-fly migrations of persisted catalogs (#21757) * On the fly catalog migration for normalization activity * On the fly catalog migration for job persistence * On the fly migration for standard sync persistence * On the fly migration for airbyte catalogs * Refactor code to share JsonSchema traversal * Add V0 Data type search function * PMD and Format * Fix getOrInsertActorCatalog and ConfigRepositoryE2E tests * Null-proofing CatalogMigrationV1Helper * More null checks * Fix test * Format * Add data type v1 support to the FE * Changes AC test check to check exited ps (#21672) some docker compose changes no longer show exited processes. this broke out test this change should fix master tested in a runner that failed * Move wellknown types mapping to the utility function * use protocolv1 normalization --------- Co-authored-by: Topher Lubaway <[email protected]> Co-authored-by: Edward Gao <[email protected]> * Update protocol support range (#21996) * bump normalization version to 0.3.0 * Add version check on normalization (#22048) * Add normalization min version check * Add visible for testing --------- Co-authored-by: Edward Gao <[email protected]> Co-authored-by: Eugene <[email protected]> Co-authored-by: Topher Lubaway <[email protected]>
What
Addresses a few gaps in the current protocol migration:
AirbyteMessage
migration need theConfiguredAirbyteCatalog
to be able to downgrade messagesConfiguredAirbyteCatalog
is a top level object that has a separate lifecycle from theAirbyteMessage
, add the necessary classes to support its migrationConfiguredAirbyteCatalog
is passed as part of the connector inputs that has a separate serialization, we need to add versioning support in that code path.Closes #20695
How
ProtocolSerializer
to wrap migrations that are not directly no AirbyteMessagesAirbyteMessageMIgrator
andConfiguredAirbyteCatologMigrator
. As a result,MigrationContainer
has been extractedRecommended reading order
🚨 User Impact 🚨
There should be no user impact.
This will enable the migration of the ConfiguredAirbyteCatalog.