diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 879286dc0785..f7a2891b540e 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -164,98 +164,99 @@ MavenLocal debugging steps: ### Java CDK -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:-----------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB | -| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs | -| 0.21.2 | 2024-02-20 | [\#34978](https://github.com/airbytehq/airbyte/pull/34978) | Reduce log noise in NormalizationLogParser. | -| 0.21.1 | 2024-02-20 | [\#35199](https://github.com/airbytehq/airbyte/pull/35199) | Add thread names to the logs. | -| 0.21.0 | 2024-02-16 | [\#35314](https://github.com/airbytehq/airbyte/pull/35314) | Delete S3StreamCopier classes. These have been superseded by the async destinations framework. | -| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. | -| 0.20.8 | 2024-02-15 | [\#35285](https://github.com/airbytehq/airbyte/pull/35285) | Improve blobstore module structure. | -| 0.20.7 | 2024-02-13 | [\#35236](https://github.com/airbytehq/airbyte/pull/35236) | output logs to files in addition to stdout when running tests | -| 0.20.6 | 2024-02-12 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. | -| 0.20.5 | 2024-02-13 | [\#34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit final state in SourceStateIterator there is an underlying stream failure. | -| 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. | -| 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. | -| 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. | -| 0.20.1 | 2024-02-11 | [\#35111](https://github.com/airbytehq/airbyte/pull/35111) | Fix GlobalAsyncStateManager stats counting logic. | -| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. | -| 0.19.0 | 2024-02-01 | [\#34745](https://github.com/airbytehq/airbyte/pull/34745) | Reorganize CDK module structure. | -| 0.18.0 | 2024-02-08 | [\#33606](https://github.com/airbytehq/airbyte/pull/33606) | Add updated Initial and Incremental Stream State definitions for DB Sources. | -| 0.17.1 | 2024-02-08 | [\#35027](https://github.com/airbytehq/airbyte/pull/35027) | Make state handling thread safe in async destination framework. | -| 0.17.0 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Enable configuring async destination batch size. | -| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. | -| 0.16.5 | 2024-02-07 | [\#34948](https://github.com/airbytehq/airbyte/pull/34948) | Fix source state stats counting logic | -| 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest | -| 0.16.3 | 2024-01-30 | [\#34669](https://github.com/airbytehq/airbyte/pull/34669) | Fix org.apache.logging.log4j:log4j-slf4j-impl version conflicts. | -| 0.16.2 | 2024-01-29 | [\#34630](https://github.com/airbytehq/airbyte/pull/34630) | expose NamingTransformer to sub-classes in destinations JdbcSqlGenerator. | -| 0.16.1 | 2024-01-29 | [\#34533](https://github.com/airbytehq/airbyte/pull/34533) | Add a safe method to execute DatabaseMetadata's Resultset returning queries. | -| 0.16.0 | 2024-01-26 | [\#34573](https://github.com/airbytehq/airbyte/pull/34573) | Untangle Debezium harness dependencies. | -| 0.15.2 | 2024-01-25 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Improve airbyte-api build performance. | -| 0.15.1 | 2024-01-25 | [\#34451](https://github.com/airbytehq/airbyte/pull/34451) | Async destinations: Better logging when we fail to parse an AirbyteMessage | -| 0.15.0 | 2024-01-23 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Removed connector registry and micronaut dependencies. | -| 0.14.2 | 2024-01-24 | [\#34458](https://github.com/airbytehq/airbyte/pull/34458) | Handle case-sensitivity in sentry error grouping | -| 0.14.1 | 2024-01-24 | [\#34468](https://github.com/airbytehq/airbyte/pull/34468) | Add wait for process to be done before ending sync in destination BaseTDTest | -| 0.14.0 | 2024-01-23 | [\#34461](https://github.com/airbytehq/airbyte/pull/34461) | Revert non backward compatible signature changes from 0.13.1 | -| 0.13.3 | 2024-01-23 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Denote if destinations fully support Destinations V2 | -| 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector | -| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator | -| 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes | -| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method | -| 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 | -| 0.11.5 | 2024-01-10 | [\#34119](https://github.com/airbytehq/airbyte/pull/34119) | Remove wal2json support for postgres+debezium. | -| 0.11.4 | 2024-01-09 | [\#33305](https://github.com/airbytehq/airbyte/pull/33305) | Source stats in incremental syncs | -| 0.11.3 | 2023-01-09 | [\#33658](https://github.com/airbytehq/airbyte/pull/33658) | Always fail when debezium fails, even if it happened during the setup phase. | -| 0.11.2 | 2024-01-09 | [\#33969](https://github.com/airbytehq/airbyte/pull/33969) | Destination state stats implementation | -| 0.11.1 | 2024-01-04 | [\#33727](https://github.com/airbytehq/airbyte/pull/33727) | SSH bastion heartbeats for Destinations | -| 0.11.0 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | DV2 T+D uses Sql struct to represent transactions; other T+D-related changes | -| 0.10.4 | 2023-12-20 | [\#33071](https://github.com/airbytehq/airbyte/pull/33071) | Add the ability to parse JDBC parameters with another delimiter than '&' | -| 0.10.3 | 2024-01-03 | [\#33312](https://github.com/airbytehq/airbyte/pull/33312) | Send out count in AirbyteStateMessage | -| 0.10.1 | 2023-12-21 | [\#33723](https://github.com/airbytehq/airbyte/pull/33723) | Make memory-manager log message less scary | -| 0.10.0 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | JdbcDestinationHandler now properly implements `getInitialRawTableState`; reenable SqlGenerator test | -| 0.9.0 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation, exclude the T&D module from the CDK | -| 0.8.0 | 2023-12-18 | [\#33506](https://github.com/airbytehq/airbyte/pull/33506) | Improve async destination shutdown logic; more JDBC async migration work; improve DAT test schema handling | -| 0.7.9 | 2023-12-18 | [\#33549](https://github.com/airbytehq/airbyte/pull/33549) | Improve MongoDB logging. | -| 0.7.8 | 2023-12-18 | [\#33365](https://github.com/airbytehq/airbyte/pull/33365) | Emit stream statuses more consistently | -| 0.7.7 | 2023-12-18 | [\#33434](https://github.com/airbytehq/airbyte/pull/33307) | Remove LEGACY state | -| 0.7.6 | 2023-12-14 | [\#32328](https://github.com/airbytehq/airbyte/pull/33307) | Add schema less mode for mongodb CDC. Fixes for non standard mongodb id type. | -| 0.7.4 | 2023-12-13 | [\#33232](https://github.com/airbytehq/airbyte/pull/33232) | Track stream record count during sync; only run T+D if a stream had nonzero records or the previous sync left unprocessed records. | -| 0.7.3 | 2023-12-13 | [\#33369](https://github.com/airbytehq/airbyte/pull/33369) | Extract shared JDBC T+D code. | -| 0.7.2 | 2023-12-11 | [\#33307](https://github.com/airbytehq/airbyte/pull/33307) | Fix DV2 JDBC type mappings (code changes in [\#33307](https://github.com/airbytehq/airbyte/pull/33307)). | -| 0.7.1 | 2023-12-01 | [\#33027](https://github.com/airbytehq/airbyte/pull/33027) | Add the abstract DB source debugger. | -| 0.7.0 | 2023-12-07 | [\#32326](https://github.com/airbytehq/airbyte/pull/32326) | Destinations V2 changes for JDBC destinations | -| 0.6.4 | 2023-12-06 | [\#33082](https://github.com/airbytehq/airbyte/pull/33082) | Improvements to schema snapshot error handling + schema snapshot history scope (scoped to configured DB). | -| 0.6.2 | 2023-11-30 | [\#32573](https://github.com/airbytehq/airbyte/pull/32573) | Update MSSQLConverter to enforce 6-digit microsecond precision for timestamp fields | -| 0.6.1 | 2023-11-30 | [\#32610](https://github.com/airbytehq/airbyte/pull/32610) | Support DB initial sync using binary as primary key. | -| 0.6.0 | 2023-11-30 | [\#32888](https://github.com/airbytehq/airbyte/pull/32888) | JDBC destinations now use the async framework | -| 0.5.3 | 2023-11-28 | [\#32686](https://github.com/airbytehq/airbyte/pull/32686) | Better attribution of debezium engine shutdown due to heartbeat. | -| 0.5.1 | 2023-11-27 | [\#32662](https://github.com/airbytehq/airbyte/pull/32662) | Debezium initialization wait time will now read from initial setup time. | -| 0.5.0 | 2023-11-22 | [\#32656](https://github.com/airbytehq/airbyte/pull/32656) | Introduce TestDatabase test fixture, refactor database source test base classes. | -| 0.4.11 | 2023-11-14 | [\#32526](https://github.com/airbytehq/airbyte/pull/32526) | Clean up memory manager logs. | -| 0.4.10 | 2023-11-13 | [\#32285](https://github.com/airbytehq/airbyte/pull/32285) | Fix UUID codec ordering for MongoDB connector | -| 0.4.9 | 2023-11-13 | [\#32468](https://github.com/airbytehq/airbyte/pull/32468) | Further error grouping improvements for DV2 connectors | -| 0.4.8 | 2023-11-09 | [\#32377](https://github.com/airbytehq/airbyte/pull/32377) | source-postgres tests: skip dropping database | -| 0.4.7 | 2023-11-08 | [\#31856](https://github.com/airbytehq/airbyte/pull/31856) | source-postgres: support for infinity date and timestamps | -| 0.4.5 | 2023-11-07 | [\#32112](https://github.com/airbytehq/airbyte/pull/32112) | Async destinations framework: Allow configuring the queue flush threshold | -| 0.4.4 | 2023-11-06 | [\#32119](https://github.com/airbytehq/airbyte/pull/32119) | Add STANDARD UUID codec to MongoDB debezium handler | -| 0.4.2 | 2023-11-06 | [\#32190](https://github.com/airbytehq/airbyte/pull/32190) | Improve error deinterpolation | -| 0.4.1 | 2023-11-02 | [\#32192](https://github.com/airbytehq/airbyte/pull/32192) | Add 's3-destinations' CDK module. | -| 0.4.0 | 2023-11-02 | [\#32050](https://github.com/airbytehq/airbyte/pull/32050) | Fix compiler warnings. | -| 0.3.0 | 2023-11-02 | [\#31983](https://github.com/airbytehq/airbyte/pull/31983) | Add deinterpolation feature to AirbyteExceptionHandler. | -| 0.2.4 | 2023-10-31 | [\#31807](https://github.com/airbytehq/airbyte/pull/31807) | Handle case of debezium update and delete of records in mongodb. | -| 0.2.3 | 2023-10-31 | [\#32022](https://github.com/airbytehq/airbyte/pull/32022) | Update Debezium version from 2.20 -> 2.4.0. | -| 0.2.2 | 2023-10-31 | [\#31976](https://github.com/airbytehq/airbyte/pull/31976) | Debezium tweaks to make tests run faster. | -| 0.2.0 | 2023-10-30 | [\#31960](https://github.com/airbytehq/airbyte/pull/31960) | Hoist top-level gradle subprojects into CDK. | -| 0.1.12 | 2023-10-24 | [\#31674](https://github.com/airbytehq/airbyte/pull/31674) | Fail sync when Debezium does not shut down properly. | -| 0.1.11 | 2023-10-18 | [\#31486](https://github.com/airbytehq/airbyte/pull/31486) | Update constants in AdaptiveSourceRunner. | -| 0.1.9 | 2023-10-12 | [\#31309](https://github.com/airbytehq/airbyte/pull/31309) | Use toPlainString() when handling BigDecimals in PostgresConverter | -| 0.1.8 | 2023-10-11 | [\#31322](https://github.com/airbytehq/airbyte/pull/31322) | Cap log line length to 32KB to prevent loss of records | -| 0.1.7 | 2023-10-10 | [\#31194](https://github.com/airbytehq/airbyte/pull/31194) | Deallocate unused per stream buffer memory when empty | -| 0.1.6 | 2023-10-10 | [\#31083](https://github.com/airbytehq/airbyte/pull/31083) | Fix precision of numeric values in async destinations | -| 0.1.5 | 2023-10-09 | [\#31196](https://github.com/airbytehq/airbyte/pull/31196) | Update typo in CDK (CDN_LSN -> CDC_LSN) | -| 0.1.4 | 2023-10-06 | [\#31139](https://github.com/airbytehq/airbyte/pull/31139) | Reduce async buffer | -| 0.1.1 | 2023-09-28 | [\#30835](https://github.com/airbytehq/airbyte/pull/30835) | JDBC destinations now avoid staging area name collisions by using the raw table name as the stage name. (previously we used the stream name as the stage name) | -| 0.1.0 | 2023-09-27 | [\#30445](https://github.com/airbytehq/airbyte/pull/30445) | First launch, including shared classes for all connectors. | -| 0.0.2 | 2023-08-21 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Version bump only (no other changes). | -| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. | \ No newline at end of file +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.23.0 | 2024-02-22 | [\#35342](https://github.com/airbytehq/airbyte/pull/35342) | Consolidate and perform upfront gathering of DB metadata state | +| 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB | +| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs | +| 0.21.2 | 2024-02-20 | [\#34978](https://github.com/airbytehq/airbyte/pull/34978) | Reduce log noise in NormalizationLogParser. | +| 0.21.1 | 2024-02-20 | [\#35199](https://github.com/airbytehq/airbyte/pull/35199) | Add thread names to the logs. | +| 0.21.0 | 2024-02-16 | [\#35314](https://github.com/airbytehq/airbyte/pull/35314) | Delete S3StreamCopier classes. These have been superseded by the async destinations framework. | +| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. | +| 0.20.8 | 2024-02-15 | [\#35285](https://github.com/airbytehq/airbyte/pull/35285) | Improve blobstore module structure. | +| 0.20.7 | 2024-02-13 | [\#35236](https://github.com/airbytehq/airbyte/pull/35236) | output logs to files in addition to stdout when running tests | +| 0.20.6 | 2024-02-12 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. | +| 0.20.5 | 2024-02-13 | [\#34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit final state in SourceStateIterator there is an underlying stream failure. | +| 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. | +| 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. | +| 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. | +| 0.20.1 | 2024-02-11 | [\#35111](https://github.com/airbytehq/airbyte/pull/35111) | Fix GlobalAsyncStateManager stats counting logic. | +| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. | +| 0.19.0 | 2024-02-01 | [\#34745](https://github.com/airbytehq/airbyte/pull/34745) | Reorganize CDK module structure. | +| 0.18.0 | 2024-02-08 | [\#33606](https://github.com/airbytehq/airbyte/pull/33606) | Add updated Initial and Incremental Stream State definitions for DB Sources. | +| 0.17.1 | 2024-02-08 | [\#35027](https://github.com/airbytehq/airbyte/pull/35027) | Make state handling thread safe in async destination framework. | +| 0.17.0 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Enable configuring async destination batch size. | +| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. | +| 0.16.5 | 2024-02-07 | [\#34948](https://github.com/airbytehq/airbyte/pull/34948) | Fix source state stats counting logic | +| 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest | +| 0.16.3 | 2024-01-30 | [\#34669](https://github.com/airbytehq/airbyte/pull/34669) | Fix org.apache.logging.log4j:log4j-slf4j-impl version conflicts. | +| 0.16.2 | 2024-01-29 | [\#34630](https://github.com/airbytehq/airbyte/pull/34630) | expose NamingTransformer to sub-classes in destinations JdbcSqlGenerator. | +| 0.16.1 | 2024-01-29 | [\#34533](https://github.com/airbytehq/airbyte/pull/34533) | Add a safe method to execute DatabaseMetadata's Resultset returning queries. | +| 0.16.0 | 2024-01-26 | [\#34573](https://github.com/airbytehq/airbyte/pull/34573) | Untangle Debezium harness dependencies. | +| 0.15.2 | 2024-01-25 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Improve airbyte-api build performance. | +| 0.15.1 | 2024-01-25 | [\#34451](https://github.com/airbytehq/airbyte/pull/34451) | Async destinations: Better logging when we fail to parse an AirbyteMessage | +| 0.15.0 | 2024-01-23 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Removed connector registry and micronaut dependencies. | +| 0.14.2 | 2024-01-24 | [\#34458](https://github.com/airbytehq/airbyte/pull/34458) | Handle case-sensitivity in sentry error grouping | +| 0.14.1 | 2024-01-24 | [\#34468](https://github.com/airbytehq/airbyte/pull/34468) | Add wait for process to be done before ending sync in destination BaseTDTest | +| 0.14.0 | 2024-01-23 | [\#34461](https://github.com/airbytehq/airbyte/pull/34461) | Revert non backward compatible signature changes from 0.13.1 | +| 0.13.3 | 2024-01-23 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Denote if destinations fully support Destinations V2 | +| 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector | +| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator | +| 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes | +| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method | +| 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 | +| 0.11.5 | 2024-01-10 | [\#34119](https://github.com/airbytehq/airbyte/pull/34119) | Remove wal2json support for postgres+debezium. | +| 0.11.4 | 2024-01-09 | [\#33305](https://github.com/airbytehq/airbyte/pull/33305) | Source stats in incremental syncs | +| 0.11.3 | 2023-01-09 | [\#33658](https://github.com/airbytehq/airbyte/pull/33658) | Always fail when debezium fails, even if it happened during the setup phase. | +| 0.11.2 | 2024-01-09 | [\#33969](https://github.com/airbytehq/airbyte/pull/33969) | Destination state stats implementation | +| 0.11.1 | 2024-01-04 | [\#33727](https://github.com/airbytehq/airbyte/pull/33727) | SSH bastion heartbeats for Destinations | +| 0.11.0 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | DV2 T+D uses Sql struct to represent transactions; other T+D-related changes | +| 0.10.4 | 2023-12-20 | [\#33071](https://github.com/airbytehq/airbyte/pull/33071) | Add the ability to parse JDBC parameters with another delimiter than '&' | +| 0.10.3 | 2024-01-03 | [\#33312](https://github.com/airbytehq/airbyte/pull/33312) | Send out count in AirbyteStateMessage | +| 0.10.1 | 2023-12-21 | [\#33723](https://github.com/airbytehq/airbyte/pull/33723) | Make memory-manager log message less scary | +| 0.10.0 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | JdbcDestinationHandler now properly implements `getInitialRawTableState`; reenable SqlGenerator test | +| 0.9.0 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation, exclude the T&D module from the CDK | +| 0.8.0 | 2023-12-18 | [\#33506](https://github.com/airbytehq/airbyte/pull/33506) | Improve async destination shutdown logic; more JDBC async migration work; improve DAT test schema handling | +| 0.7.9 | 2023-12-18 | [\#33549](https://github.com/airbytehq/airbyte/pull/33549) | Improve MongoDB logging. | +| 0.7.8 | 2023-12-18 | [\#33365](https://github.com/airbytehq/airbyte/pull/33365) | Emit stream statuses more consistently | +| 0.7.7 | 2023-12-18 | [\#33434](https://github.com/airbytehq/airbyte/pull/33307) | Remove LEGACY state | +| 0.7.6 | 2023-12-14 | [\#32328](https://github.com/airbytehq/airbyte/pull/33307) | Add schema less mode for mongodb CDC. Fixes for non standard mongodb id type. | +| 0.7.4 | 2023-12-13 | [\#33232](https://github.com/airbytehq/airbyte/pull/33232) | Track stream record count during sync; only run T+D if a stream had nonzero records or the previous sync left unprocessed records. | +| 0.7.3 | 2023-12-13 | [\#33369](https://github.com/airbytehq/airbyte/pull/33369) | Extract shared JDBC T+D code. | +| 0.7.2 | 2023-12-11 | [\#33307](https://github.com/airbytehq/airbyte/pull/33307) | Fix DV2 JDBC type mappings (code changes in [\#33307](https://github.com/airbytehq/airbyte/pull/33307)). | +| 0.7.1 | 2023-12-01 | [\#33027](https://github.com/airbytehq/airbyte/pull/33027) | Add the abstract DB source debugger. | +| 0.7.0 | 2023-12-07 | [\#32326](https://github.com/airbytehq/airbyte/pull/32326) | Destinations V2 changes for JDBC destinations | +| 0.6.4 | 2023-12-06 | [\#33082](https://github.com/airbytehq/airbyte/pull/33082) | Improvements to schema snapshot error handling + schema snapshot history scope (scoped to configured DB). | +| 0.6.2 | 2023-11-30 | [\#32573](https://github.com/airbytehq/airbyte/pull/32573) | Update MSSQLConverter to enforce 6-digit microsecond precision for timestamp fields | +| 0.6.1 | 2023-11-30 | [\#32610](https://github.com/airbytehq/airbyte/pull/32610) | Support DB initial sync using binary as primary key. | +| 0.6.0 | 2023-11-30 | [\#32888](https://github.com/airbytehq/airbyte/pull/32888) | JDBC destinations now use the async framework | +| 0.5.3 | 2023-11-28 | [\#32686](https://github.com/airbytehq/airbyte/pull/32686) | Better attribution of debezium engine shutdown due to heartbeat. | +| 0.5.1 | 2023-11-27 | [\#32662](https://github.com/airbytehq/airbyte/pull/32662) | Debezium initialization wait time will now read from initial setup time. | +| 0.5.0 | 2023-11-22 | [\#32656](https://github.com/airbytehq/airbyte/pull/32656) | Introduce TestDatabase test fixture, refactor database source test base classes. | +| 0.4.11 | 2023-11-14 | [\#32526](https://github.com/airbytehq/airbyte/pull/32526) | Clean up memory manager logs. | +| 0.4.10 | 2023-11-13 | [\#32285](https://github.com/airbytehq/airbyte/pull/32285) | Fix UUID codec ordering for MongoDB connector | +| 0.4.9 | 2023-11-13 | [\#32468](https://github.com/airbytehq/airbyte/pull/32468) | Further error grouping improvements for DV2 connectors | +| 0.4.8 | 2023-11-09 | [\#32377](https://github.com/airbytehq/airbyte/pull/32377) | source-postgres tests: skip dropping database | +| 0.4.7 | 2023-11-08 | [\#31856](https://github.com/airbytehq/airbyte/pull/31856) | source-postgres: support for infinity date and timestamps | +| 0.4.5 | 2023-11-07 | [\#32112](https://github.com/airbytehq/airbyte/pull/32112) | Async destinations framework: Allow configuring the queue flush threshold | +| 0.4.4 | 2023-11-06 | [\#32119](https://github.com/airbytehq/airbyte/pull/32119) | Add STANDARD UUID codec to MongoDB debezium handler | +| 0.4.2 | 2023-11-06 | [\#32190](https://github.com/airbytehq/airbyte/pull/32190) | Improve error deinterpolation | +| 0.4.1 | 2023-11-02 | [\#32192](https://github.com/airbytehq/airbyte/pull/32192) | Add 's3-destinations' CDK module. | +| 0.4.0 | 2023-11-02 | [\#32050](https://github.com/airbytehq/airbyte/pull/32050) | Fix compiler warnings. | +| 0.3.0 | 2023-11-02 | [\#31983](https://github.com/airbytehq/airbyte/pull/31983) | Add deinterpolation feature to AirbyteExceptionHandler. | +| 0.2.4 | 2023-10-31 | [\#31807](https://github.com/airbytehq/airbyte/pull/31807) | Handle case of debezium update and delete of records in mongodb. | +| 0.2.3 | 2023-10-31 | [\#32022](https://github.com/airbytehq/airbyte/pull/32022) | Update Debezium version from 2.20 -> 2.4.0. | +| 0.2.2 | 2023-10-31 | [\#31976](https://github.com/airbytehq/airbyte/pull/31976) | Debezium tweaks to make tests run faster. | +| 0.2.0 | 2023-10-30 | [\#31960](https://github.com/airbytehq/airbyte/pull/31960) | Hoist top-level gradle subprojects into CDK. | +| 0.1.12 | 2023-10-24 | [\#31674](https://github.com/airbytehq/airbyte/pull/31674) | Fail sync when Debezium does not shut down properly. | +| 0.1.11 | 2023-10-18 | [\#31486](https://github.com/airbytehq/airbyte/pull/31486) | Update constants in AdaptiveSourceRunner. | +| 0.1.9 | 2023-10-12 | [\#31309](https://github.com/airbytehq/airbyte/pull/31309) | Use toPlainString() when handling BigDecimals in PostgresConverter | +| 0.1.8 | 2023-10-11 | [\#31322](https://github.com/airbytehq/airbyte/pull/31322) | Cap log line length to 32KB to prevent loss of records | +| 0.1.7 | 2023-10-10 | [\#31194](https://github.com/airbytehq/airbyte/pull/31194) | Deallocate unused per stream buffer memory when empty | +| 0.1.6 | 2023-10-10 | [\#31083](https://github.com/airbytehq/airbyte/pull/31083) | Fix precision of numeric values in async destinations | +| 0.1.5 | 2023-10-09 | [\#31196](https://github.com/airbytehq/airbyte/pull/31196) | Update typo in CDK (CDN_LSN -> CDC_LSN) | +| 0.1.4 | 2023-10-06 | [\#31139](https://github.com/airbytehq/airbyte/pull/31139) | Reduce async buffer | +| 0.1.1 | 2023-09-28 | [\#30835](https://github.com/airbytehq/airbyte/pull/30835) | JDBC destinations now avoid staging area name collisions by using the raw table name as the stage name. (previously we used the stream name as the stage name) | +| 0.1.0 | 2023-09-27 | [\#30445](https://github.com/airbytehq/airbyte/pull/30445) | First launch, including shared classes for all connectors. | +| 0.0.2 | 2023-08-21 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Version bump only (no other changes). | +| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. | \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.java index 9f4ae86cfe78..dc49697d3e99 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.java @@ -10,6 +10,7 @@ import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.exceptions.ConnectionErrorException; +import io.airbyte.commons.functional.Either; import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; import java.util.Collection; @@ -85,6 +86,17 @@ public static void logAllAndThrowFirst(final String initia } } + public static List getResultsOrLogAndThrowFirst(final String initialMessage, + final List> eithers) + throws T { + List throwables = eithers.stream().filter(Either::isLeft).map(Either::getLeft).toList(); + if (!throwables.isEmpty()) { + logAllAndThrowFirst(initialMessage, throwables); + } + // No need to filter on isRight since isLeft will throw before reaching this line. + return eithers.stream().map(Either::getRight).toList(); + } + private static boolean isConfigErrorException(Throwable e) { return e instanceof ConfigErrorException; } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 1df5d31312c9..4433e215f812 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.22.1 +version=0.23.0 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index f43e1abdea05..c060133a5546 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -252,9 +252,7 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map destinationHandler = getDestinationHandler(databaseName, database); + final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database); final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); final TyperDeduper typerDeduper; if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator); + typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator); } else { typerDeduper = - new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator); + new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator); } return typerDeduper; } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/ColumnDefinition.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/ColumnDefinition.java index 68e715d2cbb2..fe41101366c2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/ColumnDefinition.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/ColumnDefinition.java @@ -4,8 +4,13 @@ package io.airbyte.cdk.integrations.destination.jdbc; -import java.sql.SQLType; - -public record ColumnDefinition(String name, String type, SQLType sqlType, int columnSize) { +/** + * Jdbc destination column definition representation + * + * @param name + * @param type + * @param columnSize + */ +public record ColumnDefinition(String name, String type, int columnSize, boolean isNullable) { } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/CustomSqlType.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/CustomSqlType.java deleted file mode 100644 index dad853bb8e08..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/CustomSqlType.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.destination.jdbc; - -import java.sql.SQLType; - -/** - * Custom SqlType definition when there is no mapping in {@link java.sql.JDBCType} - * - * @param name - * @param vendor - * @param vendorTypeNumber - */ -public record CustomSqlType(String name, String vendor, Integer vendorTypeNumber) implements SQLType { - - @Override - public String getName() { - return name; - } - - @Override - public String getVendor() { - return vendor; - } - - @Override - public Integer getVendorTypeNumber() { - return vendorTypeNumber; - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.java index 353d6d03cb44..c8fc4f2e7ca8 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.java @@ -7,7 +7,7 @@ import java.util.LinkedHashMap; /** - * Jdbc destination table definition representation + * Jdbc destination table definition representation with a map of column names to column definitions * * @param columns */ diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableSchemaRecordSet.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableSchemaRecordSet.java deleted file mode 100644 index f87d57218c48..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableSchemaRecordSet.java +++ /dev/null @@ -1,9 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.destination.jdbc; - -public record TableSchemaRecordSet() { - -} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TypeInfoRecordSet.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TypeInfoRecordSet.java deleted file mode 100644 index 2ef35e795b24..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TypeInfoRecordSet.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.destination.jdbc; - -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.util.LinkedHashMap; - -/** - * A record representing the {@link java.sql.ResultSet} returned by calling - * {@link DatabaseMetaData#getTypeInfo()} - *

- * See that method for a better description of the parameters to this record - */ -public record TypeInfoRecordSet( - String typeName, - int dataType, - int precision, - String literalPrefix, - String literalSuffix, - String createParams, - short nullable, - boolean caseSensitive, - short searchable, - boolean unsignedAttribute, - boolean fixedPrecScale, - boolean autoIncrement, - String localTypeName, - short minimumScale, - short maximumScale, - - // Unused - int sqlDataType, - - // Unused - int sqlDatetimeSub, - int numPrecRadix) { - - public static LinkedHashMap getTypeInfoList(final DatabaseMetaData databaseMetaData) throws Exception { - final LinkedHashMap types = new LinkedHashMap<>(); - try (final ResultSet rs = databaseMetaData.getTypeInfo()) { - while (rs.next()) { - final var typeName = rs.getString("TYPE_NAME"); - types.put(typeName, - new TypeInfoRecordSet( - typeName, - rs.getInt("DATA_TYPE"), - rs.getInt("PRECISION"), - rs.getString("LITERAL_PREFIX"), - rs.getString("LITERAL_SUFFIX"), - rs.getString("CREATE_PARAMS"), - rs.getShort("NULLABLE"), - rs.getBoolean("CASE_SENSITIVE"), - rs.getShort("SEARCHABLE"), - rs.getBoolean("UNSIGNED_ATTRIBUTE"), - rs.getBoolean("FIXED_PREC_SCALE"), - rs.getBoolean("AUTO_INCREMENT"), - rs.getString("LOCAL_TYPE_NAME"), - rs.getShort("MINIMUM_SCALE"), - rs.getShort("MAXIMUM_SCALE"), - rs.getInt("SQL_DATA_TYPE"), - rs.getInt("SQL_DATETIME_SUB"), - rs.getInt("NUM_PREC_RADIX"))); - } - } - return types; - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java index 26a50e6586e5..1aa0b687f8c5 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java @@ -4,6 +4,10 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS; import static org.jooq.impl.DSL.exists; import static org.jooq.impl.DSL.field; import static org.jooq.impl.DSL.name; @@ -12,16 +16,23 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; -import io.airbyte.cdk.integrations.destination.jdbc.CustomSqlType; import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; +import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil; +import io.airbyte.commons.concurrency.CompletableFutures; import io.airbyte.commons.exceptions.SQLRuntimeException; +import io.airbyte.commons.functional.Either; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl; +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; -import java.sql.JDBCType; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.SQLType; import java.sql.Timestamp; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -30,6 +41,8 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.jooq.conf.ParamType; @@ -38,7 +51,7 @@ import org.slf4j.LoggerFactory; @Slf4j -public class JdbcDestinationHandler implements DestinationHandler { +public abstract class JdbcDestinationHandler implements DestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcDestinationHandler.class); @@ -51,18 +64,11 @@ public JdbcDestinationHandler(final String databaseName, this.jdbcDatabase = jdbcDatabase; } - @Override - public Optional findExistingTable(final StreamId id) throws Exception { + private Optional findExistingTable(final StreamId id) throws Exception { return findExistingTable(jdbcDatabase, databaseName, id.finalNamespace(), id.finalName()); } - @Override - public LinkedHashMap findExistingFinalTables(List streamIds) throws Exception { - return null; - } - - @Override - public boolean isFinalTableEmpty(final StreamId id) throws Exception { + private boolean isFinalTableEmpty(final StreamId id) throws Exception { return !jdbcDatabase.queryBoolean( select( field(exists( @@ -72,8 +78,7 @@ public boolean isFinalTableEmpty(final StreamId id) throws Exception { .getSQL(ParamType.INLINED)); } - @Override - public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { + private InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { boolean tableExists = jdbcDatabase.executeMetadataQuery(dbmetadata -> { LOGGER.info("Retrieving table from Db metadata: {} {} {}", databaseName, id.rawNamespace(), id.rawName()); try (final ResultSet table = dbmetadata.getTables(databaseName, id.rawNamespace(), id.rawName(), null)) { @@ -143,6 +148,39 @@ public void execute(final Sql sql) throws Exception { } } + @Override + public List gatherInitialState(List streamConfigs) throws Exception { + final List> initialStates = streamConfigs.stream() + .map(this::retrieveState) + .toList(); + final List> states = CompletableFutures.allOf(initialStates).toCompletableFuture().join(); + return ConnectorExceptionUtil.getResultsOrLogAndThrowFirst("Failed to retrieve initial state", states); + } + + private CompletionStage retrieveState(final StreamConfig streamConfig) { + return CompletableFuture.supplyAsync(() -> { + try { + final Optional finalTableDefinition = findExistingTable(streamConfig.id()); + final boolean isSchemaMismatch; + final boolean isFinalTableEmpty; + if (finalTableDefinition.isPresent()) { + isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, finalTableDefinition.get()); + isFinalTableEmpty = isFinalTableEmpty(streamConfig.id()); + } else { + // If the final table doesn't exist, then by definition it doesn't have a schema mismatch and has no + // records. + isSchemaMismatch = false; + isFinalTableEmpty = true; + } + final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id()); + return new DestinationInitialStateImpl(streamConfig, finalTableDefinition.isPresent(), initialRawTableState, + isSchemaMismatch, isFinalTableEmpty); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + public static Optional findExistingTable(final JdbcDatabase jdbcDatabase, final String databaseName, final String schemaName, @@ -159,16 +197,8 @@ public static Optional findExistingTable(final JdbcDatabase jdb final String columnName = columns.getString("COLUMN_NAME"); final String typeName = columns.getString("TYPE_NAME"); final int columnSize = columns.getInt("COLUMN_SIZE"); - final int datatype = columns.getInt("DATA_TYPE"); - SQLType sqlType; - try { - sqlType = JDBCType.valueOf(datatype); - } catch (final IllegalArgumentException e) { - // Unknown jdbcType convert to customSqlType - LOGGER.warn("Unrecognized JDBCType {}; falling back to UNKNOWN", datatype, e); - sqlType = new CustomSqlType("Unknown", "Unknown", datatype); - } - columnDefinitions.put(columnName, new ColumnDefinition(columnName, typeName, sqlType, columnSize)); + final String isNullable = columns.getString("IS_NULLABLE"); + columnDefinitions.put(columnName, new ColumnDefinition(columnName, typeName, columnSize, fromIsNullableIsoString(isNullable))); } } catch (final SQLException e) { LOGGER.error("Failed to retrieve column info for {}.{}.{}", databaseName, schemaName, tableName, e); @@ -184,4 +214,55 @@ public static Optional findExistingTable(final JdbcDatabase jdb return Optional.of(new TableDefinition(retrievedColumnDefns)); } + public static boolean fromIsNullableIsoString(final String isNullable) { + return "YES".equalsIgnoreCase(isNullable); + } + + private boolean isAirbyteRawIdColumnMatch(final TableDefinition existingTable) { + return existingTable.columns().containsKey(COLUMN_NAME_AB_RAW_ID) && + toJdbcTypeName(AirbyteProtocolType.STRING).equals(existingTable.columns().get(COLUMN_NAME_AB_RAW_ID).type()); + } + + private boolean isAirbyteExtractedAtColumnMatch(final TableDefinition existingTable) { + return existingTable.columns().containsKey(COLUMN_NAME_AB_EXTRACTED_AT) && + toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE).equals(existingTable.columns().get(COLUMN_NAME_AB_EXTRACTED_AT).type()); + } + + private boolean isAirbyteMetaColumnMatch(final TableDefinition existingTable) { + return existingTable.columns().containsKey(COLUMN_NAME_AB_META) && + toJdbcTypeName(new Struct(new LinkedHashMap<>())).equals(existingTable.columns().get(COLUMN_NAME_AB_META).type()); + } + + protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) { + // Check that the columns match, with special handling for the metadata columns. + if (!isAirbyteRawIdColumnMatch(existingTable) || + !isAirbyteExtractedAtColumnMatch(existingTable) || + !isAirbyteMetaColumnMatch(existingTable)) { + // Missing AB meta columns from final table, we need them to do proper T+D so trigger soft-reset + return false; + } + final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() + .collect(LinkedHashMap::new, + (map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())), + LinkedHashMap::putAll); + + // Filter out Meta columns since they don't exist in stream config. + final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() + .filter(column -> V2_FINAL_TABLE_METADATA_COLUMNS.stream() + .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) + .collect(LinkedHashMap::new, + (map, column) -> map.put(column.getKey(), column.getValue().type()), + LinkedHashMap::putAll); + + return actualColumns.equals(intendedColumns); + } + + /** + * Convert to the TYPE_NAME retrieved from {@link java.sql.DatabaseMetaData#getColumns} + * + * @param airbyteType + * @return + */ + protected abstract String toJdbcTypeName(final AirbyteType airbyteType); + } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java index ec6fab26772d..d32a84de478e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java @@ -29,7 +29,6 @@ import com.google.common.annotations.VisibleForTesting; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.Array; @@ -66,7 +65,7 @@ import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; -public abstract class JdbcSqlGenerator implements SqlGenerator { +public abstract class JdbcSqlGenerator implements SqlGenerator { protected static final String ROW_NUMBER_COLUMN_NAME = "row_number"; private static final String TYPING_CTE_ALIAS = "intermediate_data"; diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt index 17447dbf0833..5c9963f5bad8 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt @@ -56,11 +56,4 @@ class RawOnlySqlGenerator(private val namingTransformer: NamingConventionTransfo ): Field? { throw NotImplementedError("This Destination does not support final tables") } - - override fun existingSchemaMatchesStreamConfig( - stream: StreamConfig, - existingTable: TableDefinition - ): Boolean { - throw NotImplementedError("This Destination does not support final tables") - } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java index f532b8ba8766..a5a07903ad48 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java @@ -9,8 +9,10 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.destination.StandardNameTransformer; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; @@ -139,6 +141,11 @@ protected JdbcSqlGenerator getSqlGenerator() { return null; } + @Override + protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) { + return null; + } + } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java index a71711cf17c4..df4ca42e004b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest; @@ -37,7 +36,7 @@ import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; -public abstract class JdbcSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { +public abstract class JdbcSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { protected abstract JdbcDatabase getDatabase(); diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/java/io/airbyte/commons/concurrency/CompletableFutures.java b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/java/io/airbyte/commons/concurrency/CompletableFutures.java index 7e17307fa9d0..d7330332ca57 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/java/io/airbyte/commons/concurrency/CompletableFutures.java +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/java/io/airbyte/commons/concurrency/CompletableFutures.java @@ -4,25 +4,55 @@ package io.airbyte.commons.concurrency; +import io.airbyte.commons.functional.Either; +import java.lang.reflect.Array; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; public class CompletableFutures { /** - * Utility method which blocks until all futures are complete. Returns a list of the results of all - * futures. + * Non-blocking implementation which does not use join. and returns an aggregated future. The order + * of results is preserved from the original list of futures. * - * @param futures - * @return - * @param + * @param futures list of futures + * @param type of result + * @return a future that completes when all the input futures have completed */ - public static List allOf(final List> futures) { - // return CompletableFuture - // .allOf(futures.toArray(new CompletableFuture[0])) - // .thenApply(v -> futures.stream().map(CompletableFuture::join).toList()) - // .join(); - return null; + public static CompletionStage>> allOf(final List> futures) { + CompletableFuture>> result = new CompletableFuture<>(); + final int size = futures.size(); + final AtomicInteger counter = new AtomicInteger(); + @SuppressWarnings("unchecked") + final Either[] results = (Either[]) Array.newInstance(Either.class, size); + // attach a whenComplete to all futures + for (int i = 0; i < size; i++) { + final int currentIndex = i; + futures.get(i).whenComplete((value, exception) -> { + // if exception is null, then the future completed successfully + // maybe synchronization is unnecessary here, but it's better to be safe + synchronized (results) { + if (exception == null) { + results[currentIndex] = Either.right(value); + } else { + if (exception instanceof Exception) { + results[currentIndex] = Either.left((Exception) exception); + } else { + // this should never happen + throw new RuntimeException("Unexpected exception in a future completion.", exception); + } + } + } + int completedCount = counter.incrementAndGet(); + if (completedCount == size) { + result.complete(Arrays.asList(results)); + } + }); + } + return result; } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/java/io/airbyte/commons/functional/Either.java b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/java/io/airbyte/commons/functional/Either.java new file mode 100644 index 000000000000..187b109e42f2 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/java/io/airbyte/commons/functional/Either.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.functional; + +import java.util.Objects; + +/** + * A class that represents a value of one of two possible types (a disjoint union). An instance of + * Either is an instance of Left or Right. + * + * A common use of Either is for error handling in functional programming. By convention, Left is + * failure and Right is success. + * + * @param the type of the left value + * @param the type of the right value + */ +public class Either { + + private final Error left; + private final Result right; + + private Either(Error left, Result right) { + this.left = left; + this.right = right; + } + + public boolean isLeft() { + return left != null; + } + + public boolean isRight() { + return right != null; + } + + public Error getLeft() { + return left; + } + + public Result getRight() { + return right; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Either either = (Either) o; + return Objects.equals(left, either.left) && Objects.equals(right, either.right); + } + + @Override + public int hashCode() { + return Objects.hash(left, right); + } + + public static Either left(Error error) { + return new Either<>(error, null); + } + + public static Either right(Result result) { + return new Either<>(null, result); + } + +} diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/java/io/airbyte/commons/concurrency/CompletableFuturesTest.java b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/java/io/airbyte/commons/concurrency/CompletableFuturesTest.java new file mode 100644 index 000000000000..def67f8e5916 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/java/io/airbyte/commons/concurrency/CompletableFuturesTest.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.concurrency; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.commons.functional.Either; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import org.junit.jupiter.api.Test; + +class CompletableFuturesTest { + + @Test + public void testAllOf() { + // Complete in random order + final List> futures = Arrays.asList( + returnSuccessWithDelay(1, 2000), + returnSuccessWithDelay(2, 200), + returnSuccessWithDelay(3, 500), + returnSuccessWithDelay(4, 100), + returnFailureWithDelay("Fail 5", 2000), + returnFailureWithDelay("Fail 6", 300)); + + final CompletableFuture>> allOfResult = CompletableFutures.allOf(futures).toCompletableFuture(); + final List> result = allOfResult.join(); + List> success = result.stream().filter(Either::isRight).toList(); + assertEquals(success, Arrays.asList( + Either.right(1), + Either.right(2), + Either.right(3), + Either.right(4))); + // Extract wrapped CompletionException messages. + final List failureMessages = result.stream().filter(Either::isLeft).map(either -> either.getLeft().getCause().getMessage()).toList(); + assertEquals(failureMessages, Arrays.asList("Fail 5", "Fail 6")); + } + + private CompletableFuture returnSuccessWithDelay(final int value, final long delayMs) { + return CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return value; + }); + } + + private CompletableFuture returnFailureWithDelay(final String message, final long delayMs) { + return CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException(message); + }); + } + +} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java index a33c3b715630..95c5841241b7 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java @@ -13,14 +13,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class BaseDestinationV1V2Migrator implements DestinationV1V2Migrator { +public abstract class BaseDestinationV1V2Migrator implements DestinationV1V2Migrator { protected static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class); @Override public void migrateIfNecessary( - final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, + final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler, final StreamConfig streamConfig) throws Exception { LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName()); @@ -59,8 +59,8 @@ protected boolean shouldMigrate(final StreamConfig streamConfig) throws Exceptio * @param destinationHandler the class which executes the sql statements * @param streamConfig the stream to migrate the raw table of */ - public void migrate(final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, + public void migrate(final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler, final StreamConfig streamConfig) throws TableNotMigratedException { final var namespacedTableName = convertToV1RawName(streamConfig); diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java index 925d5037ea28..372f2999be64 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java @@ -23,14 +23,14 @@ public class CatalogParser { private static final Logger LOGGER = LoggerFactory.getLogger(CatalogParser.class); - private final SqlGenerator sqlGenerator; + private final SqlGenerator sqlGenerator; private final String rawNamespace; - public CatalogParser(final SqlGenerator sqlGenerator) { + public CatalogParser(final SqlGenerator sqlGenerator) { this(sqlGenerator, DEFAULT_AIRBYTE_INTERNAL_NAMESPACE); } - public CatalogParser(final SqlGenerator sqlGenerator, final String rawNamespace) { + public CatalogParser(final SqlGenerator sqlGenerator, final String rawNamespace) { this.sqlGenerator = sqlGenerator; this.rawNamespace = rawNamespace; } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java index 764f888c4e16..ec49be79cb57 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java @@ -5,19 +5,24 @@ package io.airbyte.integrations.base.destination.typing_deduping; import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME; -import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.getCountOfTypeAndDedupeThreads; +import static io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst; +import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.*; import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions; import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas; import static java.util.Collections.singleton; import io.airbyte.cdk.integrations.destination.StreamSyncSummary; +import io.airbyte.commons.concurrency.CompletableFutures; +import io.airbyte.commons.functional.Either; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -45,22 +50,22 @@ * Note that #prepareTables() initializes some internal state. The other methods will throw an * exception if that method was not called. */ -public class DefaultTyperDeduper implements TyperDeduper { +public class DefaultTyperDeduper implements TyperDeduper { private static final Logger LOGGER = LoggerFactory.getLogger(TyperDeduper.class); private static final String NO_SUFFIX = ""; private static final String TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"; - private final SqlGenerator sqlGenerator; - private final DestinationHandler destinationHandler; + private final SqlGenerator sqlGenerator; + private final DestinationHandler destinationHandler; - private final DestinationV1V2Migrator v1V2Migrator; + private final DestinationV1V2Migrator v1V2Migrator; private final V2TableMigrator v2TableMigrator; private final ParsedCatalog parsedCatalog; private Set overwriteStreamsWithTmpTable; private final Set> streamsWithSuccessfulSetup; - private final Map initialRawTableStateByStream; + private final Map initialRawTableStateByStream; // We only want to run a single instance of T+D per stream at a time. These objects are used for // synchronization per stream. // Use a read-write lock because we need the same semantics: @@ -74,10 +79,10 @@ public class DefaultTyperDeduper implements TyperDeduper private final ExecutorService executorService; - public DefaultTyperDeduper(final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, + public DefaultTyperDeduper(final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler, final ParsedCatalog parsedCatalog, - final DestinationV1V2Migrator v1V2Migrator, + final DestinationV1V2Migrator v1V2Migrator, final V2TableMigrator v2TableMigrator) { this.sqlGenerator = sqlGenerator; this.destinationHandler = destinationHandler; @@ -92,12 +97,10 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerat new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build()); } - public DefaultTyperDeduper( - final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, + public DefaultTyperDeduper(final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler, final ParsedCatalog parsedCatalog, - final DestinationV1V2Migrator v1V2Migrator, - final int defaultThreadCount) { + final DestinationV1V2Migrator v1V2Migrator) { this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator()); } @@ -113,31 +116,45 @@ public void prepareTables() throws Exception { overwriteStreamsWithTmpTable = ConcurrentHashMap.newKeySet(); LOGGER.info("Preparing tables"); + // This is intentionally not done in parallel to avoid rate limits in some destinations. prepareSchemas(parsedCatalog); - final Set>> prepareTablesTasks = new HashSet<>(); - for (final StreamConfig stream : parsedCatalog.streams()) { - prepareTablesTasks.add(prepareTablesFuture(stream)); - } - CompletableFuture.allOf(prepareTablesTasks.toArray(CompletableFuture[]::new)).join(); - reduceExceptions(prepareTablesTasks, "The following exceptions were thrown attempting to prepare tables:\n"); + + // TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables. + // unify the logic with current state of raw tables & final tables. This is done first before gather + // initial state to avoid recreating final tables later again. + final List> runMigrationsResult = + CompletableFutures.allOf(parsedCatalog.streams().stream().map(this::runMigrationsAsync).toList()).toCompletableFuture().join(); + getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", runMigrationsResult); + final List initialStates = destinationHandler.gatherInitialState(parsedCatalog.streams()); + final List> prepareTablesFutureResult = CompletableFutures.allOf( + initialStates.stream().map(this::prepareTablesFuture).toList()).toCompletableFuture().join(); + getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to prepare tables:\n", prepareTablesFutureResult); } - private CompletableFuture> prepareTablesFuture(final StreamConfig stream) { + private CompletionStage runMigrationsAsync(StreamConfig streamConfig) { + return CompletableFuture.runAsync(() -> { + try { + // Migrate the Raw Tables if this is the first v2 sync after a v1 sync + v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, streamConfig); + v2TableMigrator.migrateIfNecessary(streamConfig); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, this.executorService); + } + + private CompletionStage prepareTablesFuture(final DestinationInitialState initialState) { // For each stream, make sure that its corresponding final table exists. // Also, for OVERWRITE streams, decide if we're writing directly to the final table, or into an // _airbyte_tmp table. return CompletableFuture.supplyAsync(() -> { + final var stream = initialState.streamConfig(); try { - // Migrate the Raw Tables if this is the first v2 sync after a v1 sync - v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream); - v2TableMigrator.migrateIfNecessary(stream); - - final Optional existingTable = destinationHandler.findExistingTable(stream.id()); - if (existingTable.isPresent()) { + if (initialState.isFinalTablePresent()) { LOGGER.info("Final Table exists for stream {}", stream.id().finalName()); // The table already exists. Decide whether we're writing to it directly, or using a tmp table. if (stream.destinationSyncMode() == DestinationSyncMode.OVERWRITE) { - if (!destinationHandler.isFinalTableEmpty(stream.id()) || !sqlGenerator.existingSchemaMatchesStreamConfig(stream, existingTable.get())) { + if (!initialState.isFinalTableEmpty() || initialState.isSchemaMismatch()) { // We want to overwrite an existing table. Write into a tmp table. We'll overwrite the table at the // end of the sync. overwriteStreamsWithTmpTable.add(stream.id()); @@ -149,7 +166,7 @@ private CompletableFuture> prepareTablesFuture(final StreamC stream.id().finalName()); } - } else if (!sqlGenerator.existingSchemaMatchesStreamConfig(stream, existingTable.get())) { + } else if (initialState.isSchemaMismatch()) { // We're loading data directly into the existing table. Make sure it has the right schema. TypeAndDedupeTransaction.executeSoftReset(sqlGenerator, destinationHandler, stream); } @@ -158,8 +175,8 @@ private CompletableFuture> prepareTablesFuture(final StreamC // The table doesn't exist. Create it. Don't force. destinationHandler.execute(sqlGenerator.createTable(stream, NO_SUFFIX, false)); } - final DestinationHandler.InitialRawTableState initialRawTableState = destinationHandler.getInitialRawTableState(stream.id()); - initialRawTableStateByStream.put(stream.id(), initialRawTableState); + + initialRawTableStateByStream.put(stream.id(), initialState.initialRawTableState()); streamsWithSuccessfulSetup.add(Pair.of(stream.id().originalNamespace(), stream.id().originalName())); @@ -172,10 +189,10 @@ private CompletableFuture> prepareTablesFuture(final StreamC // immediately acquire the lock. internalTdLocks.put(stream.id(), new ReentrantLock()); - return Optional.empty(); + return null; } catch (final Exception e) { LOGGER.error("Exception occurred while preparing tables for stream " + stream.id().originalName(), e); - return Optional.of(e); + throw new RuntimeException(e); } }, this.executorService); } @@ -235,7 +252,7 @@ public CompletableFuture> typeAndDedupeTask(final StreamConf final Lock externalLock = tdLocks.get(streamConfig.id()).writeLock(); externalLock.lock(); try { - final DestinationHandler.InitialRawTableState initialRawTableState = initialRawTableStateByStream.get(streamConfig.id()); + final InitialRawTableState initialRawTableState = initialRawTableStateByStream.get(streamConfig.id()); TypeAndDedupeTransaction.executeTypeAndDedupe( sqlGenerator, destinationHandler, diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java index c2fb545f318d..f75f0fc9a040 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java @@ -4,37 +4,12 @@ package io.airbyte.integrations.base.destination.typing_deduping; -import java.time.Instant; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Optional; -public interface DestinationHandler { - - Optional findExistingTable(StreamId id) throws Exception; - - /** - * Given a list of stream ids, return a map of stream ids to existing tables. If the table is - * missing, the key should not be present in the map. - * - * @param streamIds - * @return - * @throws Exception - */ - LinkedHashMap findExistingFinalTables(List streamIds) throws Exception; - - boolean isFinalTableEmpty(StreamId id) throws Exception; - - /** - * Returns the highest timestamp such that all records with _airbyte_extracted equal to or earlier - * than that timestamp have non-null _airbyte_loaded_at. - *

- * If the raw table is empty or does not exist, return an empty optional. - */ - InitialRawTableState getInitialRawTableState(StreamId id) throws Exception; - - record InitialRawTableState(boolean hasUnprocessedRecords, Optional maxProcessedTimestamp) {} +public interface DestinationHandler { void execute(final Sql sql) throws Exception; + List gatherInitialState(List streamConfigs) throws Exception; + } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialState.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialState.java new file mode 100644 index 000000000000..31aa25770790 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialState.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +/** + * Interface representing the initial state of a destination table. + * + */ +public interface DestinationInitialState { + + StreamConfig streamConfig(); + + boolean isFinalTablePresent(); + + InitialRawTableState initialRawTableState(); + + boolean isSchemaMismatch(); + + boolean isFinalTableEmpty(); + +} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStateImpl.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStateImpl.java new file mode 100644 index 000000000000..e1fa315c703e --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStateImpl.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +public record DestinationInitialStateImpl(StreamConfig streamConfig, + boolean isFinalTablePresent, + InitialRawTableState initialRawTableState, + boolean isSchemaMismatch, + boolean isFinalTableEmpty) + implements DestinationInitialState { + +} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java index 7e28906673a6..5e1e26e804f1 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; -public interface DestinationV1V2Migrator { +public interface DestinationV1V2Migrator { /** * This is the primary entrypoint to this interface @@ -17,8 +17,8 @@ public interface DestinationV1V2Migrator { * @param streamConfig the stream to assess migration needs */ void migrateIfNecessary( - final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, + final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler, final StreamConfig streamConfig) throws TableNotMigratedException, UnexpectedSchemaException, Exception; diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableState.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableState.java new file mode 100644 index 000000000000..a037daebfa40 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableState.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +import java.time.Instant; +import java.util.Optional; + +public record InitialRawTableState(boolean hasUnprocessedRecords, Optional maxProcessedTimestamp) { + +} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java index d9e49257d0a7..f7f5b275768f 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java @@ -4,11 +4,11 @@ package io.airbyte.integrations.base.destination.typing_deduping; -public class NoOpDestinationV1V2Migrator implements DestinationV1V2Migrator { +public class NoOpDestinationV1V2Migrator implements DestinationV1V2Migrator { @Override - public void migrateIfNecessary(final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, + public void migrateIfNecessary(final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler, final StreamConfig streamConfig) throws TableNotMigratedException, UnexpectedSchemaException { // Do nothing diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java index 1fb3faf59def..f76bd2e07019 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java @@ -30,19 +30,19 @@ * json->string migrations in the raw tables. */ @Slf4j -public class NoOpTyperDeduperWithV1V2Migrations implements TyperDeduper { +public class NoOpTyperDeduperWithV1V2Migrations implements TyperDeduper { - private final DestinationV1V2Migrator v1V2Migrator; + private final DestinationV1V2Migrator v1V2Migrator; private final V2TableMigrator v2TableMigrator; private final ExecutorService executorService; private final ParsedCatalog parsedCatalog; - private final SqlGenerator sqlGenerator; - private final DestinationHandler destinationHandler; + private final SqlGenerator sqlGenerator; + private final DestinationHandler destinationHandler; - public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, + public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler, final ParsedCatalog parsedCatalog, - final DestinationV1V2Migrator v1V2Migrator, + final DestinationV1V2Migrator v1V2Migrator, final V2TableMigrator v2TableMigrator) { this.sqlGenerator = sqlGenerator; this.destinationHandler = destinationHandler; diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java index 568fd688e9bb..bb12237ebbf9 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java @@ -9,7 +9,7 @@ import java.time.Instant; import java.util.Optional; -public interface SqlGenerator { +public interface SqlGenerator { StreamId buildStreamId(String namespace, String name, String rawNamespaceOverride); @@ -23,9 +23,7 @@ default ColumnId buildColumnId(final String name) { * Generate a SQL statement to create a fresh table to match the given stream. *

* The generated SQL should throw an exception if the table already exists and {@code force} is - * false. Callers should use - * {@link #existingSchemaMatchesStreamConfig(StreamConfig, java.lang.Object)} if the table is known - * to exist, and potentially softReset + * false. * * @param suffix A suffix to add to the stream name. Useful for full refresh overwrite syncs, where * we write the entire sync to a temp table. @@ -43,15 +41,6 @@ default ColumnId buildColumnId(final String name) { */ Sql createSchema(final String schema); - /** - * Check the final table's schema and compare it to what the stream config would generate. - * - * @param stream the stream/stable in question - * @param existingTable the existing table mapped to the stream - * @return whether the existing table matches the expected schema - */ - boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final DialectTableDefinition existingTable); - /** * Generate a SQL statement to copy new data from the raw table into the final table. *

@@ -95,8 +84,8 @@ default ColumnId buildColumnId(final String name) { * reset. * * @param streamId the stream to migrate - * @param namespace - * @param tableName + * @param namespace the namespace of the v1 raw table + * @param tableName name of the v2 raw table * @return a string containing the necessary sql to migrate */ Sql migrateFromV1toV2(StreamId streamId, String namespace, String tableName); diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java index a1c1f8cc1684..f350c83e76ca 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java @@ -26,11 +26,11 @@ public class TypeAndDedupeTransaction { * @param suffix table suffix for temporary tables * @throws Exception if the safe query fails */ - public static void executeTypeAndDedupe(final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, - final StreamConfig streamConfig, - final Optional minExtractedAt, - final String suffix) + public static void executeTypeAndDedupe(final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler, + final StreamConfig streamConfig, + final Optional minExtractedAt, + final String suffix) throws Exception { try { LOGGER.info("Attempting typing and deduping for {}.{} with suffix {}", streamConfig.id().originalNamespace(), streamConfig.id().originalName(), @@ -62,7 +62,9 @@ public static void executeTypeAndDedupe(final SqlGenerator prepareAllSchemas(parsedCatalog: ParsedCatalog, sqlGenerator: SqlGenerator, destinationHandler: DestinationHandler) { +fun prepareAllSchemas(parsedCatalog: ParsedCatalog, sqlGenerator: SqlGenerator, destinationHandler: DestinationHandler) { val rawSchema = parsedCatalog.streams.mapNotNull { it.id.rawNamespace } val finalSchema = parsedCatalog.streams.mapNotNull { it.id.finalNamespace } val createAllSchemasSql = rawSchema.union(finalSchema) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.java index 3922f8ebe4bf..b0237657058b 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.java @@ -19,7 +19,7 @@ class CatalogParserTest { - private SqlGenerator sqlGenerator; + private SqlGenerator sqlGenerator; private CatalogParser parser; @BeforeEach diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java index c81629611501..916c0235722d 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java @@ -7,6 +7,7 @@ import static io.airbyte.integrations.base.destination.typing_deduping.Sql.separately; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.ignoreStubs; @@ -31,42 +32,58 @@ public class DefaultTyperDeduperTest { private MockSqlGenerator sqlGenerator; - private DestinationHandler destinationHandler; + private DestinationHandler destinationHandler; - private DestinationV1V2Migrator migrator; + private List initialStates; + + private DestinationV1V2Migrator migrator; private TyperDeduper typerDeduper; @BeforeEach void setup() throws Exception { sqlGenerator = spy(new MockSqlGenerator()); destinationHandler = mock(DestinationHandler.class); - when(destinationHandler.getInitialRawTableState(any())).thenReturn(new DestinationHandler.InitialRawTableState(true, Optional.empty())); - migrator = new NoOpDestinationV1V2Migrator<>(); + DestinationInitialState overwriteNsState = mock(DestinationInitialState.class); + DestinationInitialState appendNsState = mock(DestinationInitialState.class); + DestinationInitialState dedupeNsState = mock(DestinationInitialState.class); + StreamConfig overwriteStreamConfig = new StreamConfig( + new StreamId("overwrite_ns", "overwrite_stream", null, null, "overwrite_ns", "overwrite_stream"), + null, + DestinationSyncMode.OVERWRITE, + null, + null, + null); + StreamConfig appendStreamConfig = new StreamConfig( + new StreamId("append_ns", "append_stream", null, null, "append_ns", "append_stream"), + null, + DestinationSyncMode.APPEND, + null, + null, + null); + StreamConfig dedupeStreamConfig = new StreamConfig( + new StreamId("dedup_ns", "dedup_stream", null, null, "dedup_ns", "dedup_stream"), + null, + DestinationSyncMode.APPEND_DEDUP, + null, + null, + null); + when(overwriteNsState.streamConfig()).thenReturn(overwriteStreamConfig); + when(appendNsState.streamConfig()).thenReturn(appendStreamConfig); + when(dedupeNsState.streamConfig()).thenReturn(dedupeStreamConfig); + + initialStates = List.of(overwriteNsState, appendNsState, dedupeNsState); + when(destinationHandler.gatherInitialState(anyList())) + .thenReturn(initialStates); + initialStates.forEach(initialState -> when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(true, Optional.empty()))); + + migrator = new NoOpDestinationV1V2Migrator(); final ParsedCatalog parsedCatalog = new ParsedCatalog(List.of( - new StreamConfig( - new StreamId("overwrite_ns", "overwrite_stream", null, null, "overwrite_ns", "overwrite_stream"), - null, - DestinationSyncMode.OVERWRITE, - null, - null, - null), - new StreamConfig( - new StreamId("append_ns", "append_stream", null, null, "append_ns", "append_stream"), - null, - DestinationSyncMode.APPEND, - null, - null, - null), - new StreamConfig( - new StreamId("dedup_ns", "dedup_stream", null, null, "dedup_ns", "dedup_stream"), - null, - DestinationSyncMode.APPEND_DEDUP, - null, - null, - null))); - - typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, 1); + overwriteStreamConfig, + appendStreamConfig, + dedupeStreamConfig)); + + typerDeduper = new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, migrator); } /** @@ -74,7 +91,8 @@ void setup() throws Exception { */ @Test void emptyDestination() throws Exception { - when(destinationHandler.findExistingTable(any())).thenReturn(Optional.empty()); + initialStates.forEach(initialState -> when(initialState.isFinalTablePresent()).thenReturn(false)); + // when(destinationHandler.findExistingTable(any())).thenReturn(Optional.empty()); typerDeduper.prepareTables(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); @@ -103,9 +121,11 @@ void emptyDestination() throws Exception { */ @Test void existingEmptyTable() throws Exception { - when(destinationHandler.findExistingTable(any())).thenReturn(Optional.of("foo")); - when(destinationHandler.isFinalTableEmpty(any())).thenReturn(true); - when(sqlGenerator.existingSchemaMatchesStreamConfig(any(), any())).thenReturn(false); + initialStates.forEach(initialState -> { + when(initialState.isFinalTablePresent()).thenReturn(true); + when(initialState.isFinalTableEmpty()).thenReturn(true); + when(initialState.isSchemaMismatch()).thenReturn(true); + }); typerDeduper.prepareTables(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); verify(destinationHandler).execute(Sql.of("CREATE TABLE overwrite_ns.overwrite_stream_airbyte_tmp")); @@ -138,9 +158,11 @@ void existingEmptyTable() throws Exception { */ @Test void existingEmptyTableMatchingSchema() throws Exception { - when(destinationHandler.findExistingTable(any())).thenReturn(Optional.of("foo")); - when(destinationHandler.isFinalTableEmpty(any())).thenReturn(true); - when(sqlGenerator.existingSchemaMatchesStreamConfig(any(), any())).thenReturn(true); + initialStates.forEach(initialState -> { + when(initialState.isFinalTablePresent()).thenReturn(true); + when(initialState.isFinalTableEmpty()).thenReturn(true); + when(initialState.isSchemaMismatch()).thenReturn(true); + }); typerDeduper.prepareTables(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); @@ -154,10 +176,12 @@ void existingEmptyTableMatchingSchema() throws Exception { */ @Test void existingNonemptyTable() throws Exception { - when(destinationHandler.getInitialRawTableState(any())) - .thenReturn(new DestinationHandler.InitialRawTableState(true, Optional.of(Instant.parse("2023-01-01T12:34:56Z")))); - when(destinationHandler.findExistingTable(any())).thenReturn(Optional.of("foo")); - when(destinationHandler.isFinalTableEmpty(any())).thenReturn(false); + initialStates.forEach(initialState -> { + when(initialState.isFinalTablePresent()).thenReturn(true); + when(initialState.isFinalTableEmpty()).thenReturn(false); + when(initialState.isSchemaMismatch()).thenReturn(true); + when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(true, Optional.of(Instant.parse("2023-01-01T12:34:56Z")))); + }); typerDeduper.prepareTables(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); @@ -197,10 +221,12 @@ void existingNonemptyTable() throws Exception { */ @Test void existingNonemptyTableMatchingSchema() throws Exception { - when(destinationHandler.getInitialRawTableState(any())).thenReturn(new DestinationHandler.InitialRawTableState(true, Optional.of(Instant.now()))); - when(destinationHandler.findExistingTable(any())).thenReturn(Optional.of("foo")); - when(destinationHandler.isFinalTableEmpty(any())).thenReturn(false); - when(sqlGenerator.existingSchemaMatchesStreamConfig(any(), any())).thenReturn(true); + initialStates.forEach(initialState -> { + when(initialState.isFinalTablePresent()).thenReturn(true); + when(initialState.isFinalTableEmpty()).thenReturn(false); + when(initialState.isSchemaMismatch()).thenReturn(false); + when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(true, Optional.of(Instant.now()))); + }); typerDeduper.prepareTables(); // NB: We only create one tmp table here. @@ -236,7 +262,7 @@ void failedSetup() throws Exception { */ @Test void noUnprocessedRecords() throws Exception { - when(destinationHandler.getInitialRawTableState(any())).thenReturn(new DestinationHandler.InitialRawTableState(false, Optional.empty())); + initialStates.forEach(initialState -> when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(false, Optional.empty()))); typerDeduper.prepareTables(); clearInvocations(destinationHandler); @@ -258,8 +284,8 @@ void noUnprocessedRecords() throws Exception { */ @Test void unprocessedRecords() throws Exception { - when(destinationHandler.getInitialRawTableState(any())) - .thenReturn(new DestinationHandler.InitialRawTableState(true, Optional.of(Instant.parse("2023-01-23T12:34:56Z")))); + initialStates.forEach(initialState -> when(initialState.initialRawTableState()) + .thenReturn(new InitialRawTableState(true, Optional.of(Instant.parse("2023-01-23T12:34:56Z"))))); typerDeduper.prepareTables(); clearInvocations(destinationHandler); diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java index 86893c442ef9..0e4c80321055 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java @@ -74,7 +74,7 @@ public void testMismatchedSchemaThrowsException() throws Exception { public void testMigrate() throws Exception { final var sqlGenerator = new MockSqlGenerator(); final StreamConfig stream = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null); - final DestinationHandler handler = Mockito.mock(DestinationHandler.class); + final DestinationHandler handler = Mockito.mock(DestinationHandler.class); final var sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table"); // All is well final var migrator = noIssuesMigrator(); diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java index c9fed75526a1..3ef59aa91e21 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java @@ -10,7 +10,7 @@ /** * Basic SqlGenerator mock. See {@link DefaultTyperDeduperTest} for example usage. */ -class MockSqlGenerator implements SqlGenerator { +class MockSqlGenerator implements SqlGenerator { @Override public StreamId buildStreamId(final String namespace, final String name, final String rawNamespaceOverride) { @@ -32,11 +32,6 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo return Sql.of("CREATE TABLE " + stream.id().finalTableId("", suffix)); } - @Override - public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final String existingTable) throws TableNotMigratedException { - return false; - } - @Override public Sql updateTable(final StreamConfig stream, final String finalSuffix, diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index cfc7eae3fa8a..93e62670a99d 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -12,7 +12,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Streams; @@ -51,7 +50,7 @@ * {@link #getDestinationHandler()} in a {@link org.junit.jupiter.api.BeforeEach} method. */ @Execution(ExecutionMode.CONCURRENT) -public abstract class BaseSqlGeneratorIntegrationTest { +public abstract class BaseSqlGeneratorIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(BaseSqlGeneratorIntegrationTest.class); /** @@ -104,8 +103,8 @@ public abstract class BaseSqlGeneratorIntegrationTest { */ protected StreamConfig cdcIncrementalAppendStream; - protected SqlGenerator generator; - protected DestinationHandler destinationHandler; + protected SqlGenerator generator; + protected DestinationHandler destinationHandler; protected String namespace; protected StreamId streamId; @@ -113,9 +112,9 @@ public abstract class BaseSqlGeneratorIntegrationTest { private ColumnId cursor; private LinkedHashMap COLUMNS; - protected abstract SqlGenerator getSqlGenerator(); + protected abstract SqlGenerator getSqlGenerator(); - protected abstract DestinationHandler getDestinationHandler(); + protected abstract DestinationHandler getDestinationHandler(); /** * Subclasses should override this method if they need to make changes to the stream ID. For @@ -273,6 +272,14 @@ public void teardown() throws Exception { teardownNamespace(namespace); } + private DestinationInitialState getDestinationInitialState(StreamConfig streamConfig) throws Exception { + final List initialState = + destinationHandler.gatherInitialState(List.of(streamConfig)); + assertEquals(1, initialState.size(), "gatherInitialState returned the wrong number of futures"); + assertTrue(initialState.getFirst().isFinalTablePresent(), "Destination handler could not find existing table"); + return initialState.getFirst(); + } + /** * Create a table and verify that we correctly recognize it as identical to itself. */ @@ -280,14 +287,9 @@ public void teardown() throws Exception { public void detectNoSchemaChange() throws Exception { final Sql createTable = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(createTable); - - final Optional existingTable = destinationHandler.findExistingTable(streamId); - if (!existingTable.isPresent()) { - fail("Destination handler could not find existing table"); - } - - assertTrue( - generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTable.get()), + final DestinationInitialState destinationInitialState = getDestinationInitialState(incrementalDedupStream); + assertFalse( + destinationInitialState.isSchemaMismatch(), "Unchanged schema was incorrectly detected as a schema change."); } @@ -298,18 +300,12 @@ public void detectNoSchemaChange() throws Exception { public void detectColumnAdded() throws Exception { final Sql createTable = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(createTable); - - final Optional existingTable = destinationHandler.findExistingTable(streamId); - if (!existingTable.isPresent()) { - fail("Destination handler could not find existing table"); - } - incrementalDedupStream.columns().put( generator.buildColumnId("new_column"), AirbyteProtocolType.STRING); - - assertFalse( - generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTable.get()), + final DestinationInitialState destinationInitialState = getDestinationInitialState(incrementalDedupStream); + assertTrue( + destinationInitialState.isSchemaMismatch(), "Adding a new column was not detected as a schema change."); } @@ -320,16 +316,10 @@ public void detectColumnAdded() throws Exception { public void detectColumnRemoved() throws Exception { final Sql createTable = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(createTable); - - final Optional existingTable = destinationHandler.findExistingTable(streamId); - if (!existingTable.isPresent()) { - fail("Destination handler could not find existing table"); - } - incrementalDedupStream.columns().remove(generator.buildColumnId("string")); - - assertFalse( - generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTable.get()), + final DestinationInitialState destinationInitialState = getDestinationInitialState(incrementalDedupStream); + assertTrue( + destinationInitialState.isSchemaMismatch(), "Removing a column was not detected as a schema change."); } @@ -340,18 +330,12 @@ public void detectColumnRemoved() throws Exception { public void detectColumnChanged() throws Exception { final Sql createTable = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(createTable); - - final Optional existingTable = destinationHandler.findExistingTable(streamId); - if (!existingTable.isPresent()) { - fail("Destination handler could not find existing table"); - } - incrementalDedupStream.columns().put( generator.buildColumnId("string"), AirbyteProtocolType.INTEGER); - - assertFalse( - generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTable.get()), + final DestinationInitialState destinationInitialState = getDestinationInitialState(incrementalDedupStream); + assertTrue( + destinationInitialState.isSchemaMismatch(), "Altering a column was not detected as a schema change."); } @@ -389,6 +373,11 @@ public void incrementalDedupSameNameNamespace() throws Exception { verifyRecordCounts(1, rawRecords, 1, finalRecords); } + private DestinationInitialState getOnly(final List initialStates) { + assertEquals(1, initialStates.size()); + return initialStates.getFirst(); + } + /** * Run a full T+D update for an incremental-dedup stream, writing to a final table with "_foo" * suffix, with values for all data types. Verifies all behaviors for all types: @@ -414,7 +403,8 @@ public void allTypes() throws Exception { streamId, BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl")); - assertTrue(destinationHandler.isFinalTableEmpty(streamId), "Final table should be empty before T+D"); + DestinationInitialState initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream))); + assertTrue(initialState.isFinalTableEmpty(), "Final table should be empty before T+D"); TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, incrementalDedupStream, Optional.empty(), ""); @@ -423,7 +413,8 @@ public void allTypes() throws Exception { dumpRawTableRecords(streamId), "sqlgenerator/alltypes_expectedrecords_final.jsonl", dumpFinalTableRecords(streamId, "")); - assertFalse(destinationHandler.isFinalTableEmpty(streamId), "Final table should not be empty after T+D"); + initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream))); + assertFalse(initialState.isFinalTableEmpty(), "Final table should not be empty after T+D"); } /** @@ -437,13 +428,22 @@ public void allTypesUnsafe() throws Exception { streamId, BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_unsafe_inputrecords.jsonl")); - assertTrue(destinationHandler.isFinalTableEmpty(streamId), "Final table should be empty before T+D"); + DestinationInitialState initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream))); + assertTrue(initialState.isFinalTableEmpty(), "Final table should be empty before T+D"); // Instead of using the full T+D transaction, explicitly run with useSafeCasting=false. final Sql unsafeSql = generator.updateTable(incrementalDedupStream, "", Optional.empty(), false); destinationHandler.execute(unsafeSql); - assertFalse(destinationHandler.isFinalTableEmpty(streamId), "Final table should not be empty after T+D"); + initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream))); + assertFalse(initialState.isFinalTableEmpty(), "Final table should not be empty after T+D"); + } + + private InitialRawTableState getInitialRawTableState(StreamConfig streamConfig) throws Exception { + List initialStates = + destinationHandler.gatherInitialState(List.of(streamConfig)); + assertEquals(1, initialStates.size()); + return initialStates.getFirst().initialRawTableState(); } /** @@ -453,11 +453,11 @@ public void allTypesUnsafe() throws Exception { @Test public void minTimestampBehavesCorrectly() throws Exception { // When the raw table doesn't exist, there are no unprocessed records and no timestamp - assertEquals(new DestinationHandler.InitialRawTableState(false, Optional.empty()), destinationHandler.getInitialRawTableState(streamId)); + assertEquals(new InitialRawTableState(false, Optional.empty()), getInitialRawTableState(incrementalAppendStream)); // When the raw table is empty, there are still no unprocessed records and no timestamp createRawTable(streamId); - assertEquals(new DestinationHandler.InitialRawTableState(false, Optional.empty()), destinationHandler.getInitialRawTableState(streamId)); + assertEquals(new InitialRawTableState(false, Optional.empty()), getInitialRawTableState(incrementalAppendStream)); // If we insert some raw records with null loaded_at, we should get the min extracted_at insertRawTableRecords( @@ -479,7 +479,7 @@ public void minTimestampBehavesCorrectly() throws Exception { "_airbyte_data": {} } """))); - DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(streamId); + InitialRawTableState tableState = getInitialRawTableState(incrementalAppendStream); assertTrue(tableState.hasUnprocessedRecords(), "When all raw records have null loaded_at, we should recognize that there are unprocessed records"); assertTrue( @@ -492,8 +492,8 @@ public void minTimestampBehavesCorrectly() throws Exception { TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, incrementalAppendStream, Optional.empty(), ""); assertEquals( - destinationHandler.getInitialRawTableState(streamId), - new DestinationHandler.InitialRawTableState(false, Optional.of(Instant.parse("2023-01-02T00:00:00Z"))), + getInitialRawTableState(incrementalAppendStream), + new InitialRawTableState(false, Optional.of(Instant.parse("2023-01-02T00:00:00Z"))), "When all raw records have non-null loaded_at, we should recognize that there are no unprocessed records, and the min timestamp should be equal to the latest extracted_at"); // If we insert another raw record with older extracted_at than the typed records, we should fetch a @@ -511,7 +511,7 @@ public void minTimestampBehavesCorrectly() throws Exception { "_airbyte_data": {} } """))); - tableState = destinationHandler.getInitialRawTableState(streamId); + tableState = getInitialRawTableState(incrementalAppendStream); // this is a pretty confusing pair of assertions. To explain them in more detail: There are three // records in the raw table: // * loaded_at not null, extracted_at = 2023-01-01 00:00Z @@ -549,7 +549,7 @@ public void handlePreexistingRecords() throws Exception { streamId, BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl")); - final DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(streamId); + final InitialRawTableState tableState = getInitialRawTableState(incrementalDedupStream); assertAll( () -> assertTrue(tableState.hasUnprocessedRecords(), "After writing some raw records, we should recognize that there are unprocessed records"), @@ -575,7 +575,7 @@ public void handleNoPreexistingRecords() throws Exception { generator.buildColumnId("IamACaseSensitiveColumnName"), AirbyteProtocolType.STRING); createRawTable(streamId); - final DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(streamId); + final InitialRawTableState tableState = getInitialRawTableState(incrementalDedupStream); assertAll( () -> assertFalse(tableState.hasUnprocessedRecords(), "With an empty raw table, we should recognize that there are no unprocessed records"), () -> assertEquals(Optional.empty(), tableState.maxProcessedTimestamp(), "With an empty raw table, the min timestamp should be empty")); @@ -900,7 +900,7 @@ public void testCdcOrdering_updateAfterDelete() throws Exception { streamId, BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl")); - final DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(cdcIncrementalDedupStream.id()); + final InitialRawTableState tableState = getInitialRawTableState(cdcIncrementalDedupStream); TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, cdcIncrementalDedupStream, tableState.maxProcessedTimestamp(), ""); verifyRecordCounts( @@ -937,7 +937,7 @@ public void testCdcOrdering_insertAfterDelete() throws Exception { "", BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl")); - final DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(cdcIncrementalAppendStream.id()); + final InitialRawTableState tableState = getInitialRawTableState(cdcIncrementalAppendStream); TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, cdcIncrementalDedupStream, tableState.maxProcessedTimestamp(), ""); verifyRecordCounts( 2, @@ -1241,8 +1241,8 @@ public void testCreateTableForce() throws Exception { assertThrows(Exception.class, () -> destinationHandler.execute(createTableNoForce)); // This should not throw an exception destinationHandler.execute(createTableForce); - - assertTrue(destinationHandler.findExistingTable(streamId).isPresent()); + // This method call ensures assertion than finalTable exists + getDestinationInitialState(incrementalDedupStream); } protected void createFinalTable(final StreamConfig stream, final String suffix) throws Exception { diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index 5c7cd00e8ae2..e52c669dc798 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -156,7 +156,7 @@ protected boolean checkTableExists(final String streamNamespace, final String st */ protected abstract void teardownStreamAndNamespace(String streamNamespace, String streamName) throws Exception; - protected abstract SqlGenerator getSqlGenerator(); + protected abstract SqlGenerator getSqlGenerator(); /** * Destinations which need to clean up resources after an entire test finishes should override this @@ -215,7 +215,7 @@ public void setup() throws Exception { streamName = "test_stream" + getUniqueSuffix(); streamsToTearDown = new ArrayList<>(); - final SqlGenerator generator = getSqlGenerator(); + final SqlGenerator generator = getSqlGenerator(); DIFFER = new RecordDiffer( getRawMetadataColumnNames(), getFinalMetadataColumnNames(),