diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java b/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java index 28dd36cde362..1e193409e042 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java @@ -10,13 +10,14 @@ import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcSourceOperations; -import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig; import io.airbyte.db.mongodb.MongoDatabase; import java.io.IOException; import java.util.Map; import java.util.function.Function; +import java.util.function.Supplier; import lombok.val; import org.apache.commons.dbcp2.BasicDataSource; import org.jooq.SQLDialect; @@ -188,13 +189,13 @@ public static JdbcDatabase createStreamingJdbcDatabase(final String username, final String password, final String jdbcConnectionString, final String driverClassName, - final JdbcStreamingQueryConfiguration jdbcStreamingQuery, + final Supplier streamingQueryConfigProvider, final Map connectionProperties, final JdbcCompatibleSourceOperations sourceOperations) { final BasicDataSource connectionPool = createBasicDataSource(username, password, jdbcConnectionString, driverClassName, connectionProperties); - return new StreamingJdbcDatabase(connectionPool, sourceOperations, jdbcStreamingQuery); + return new StreamingJdbcDatabase(connectionPool, sourceOperations, streamingQueryConfigProvider); } private static BasicDataSource createBasicDataSource(final String username, diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/Db2JdbcStreamingQueryConfiguration.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/Db2JdbcStreamingQueryConfiguration.java deleted file mode 100644 index a3d5f7974f04..000000000000 --- 
a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/Db2JdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.db.jdbc; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class Db2JdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration { - - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException { - connection.setAutoCommit(false); - preparedStatement.setFetchSize(1000); - } - -} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcStreamingQueryConfiguration.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcStreamingQueryConfiguration.java deleted file mode 100644 index d9486fe33f16..000000000000 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.db.jdbc; - -import io.airbyte.commons.functional.CheckedBiConsumer; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -@FunctionalInterface -public interface JdbcStreamingQueryConfiguration extends CheckedBiConsumer { - -} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/NoOpJdbcStreamingQueryConfiguration.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/NoOpJdbcStreamingQueryConfiguration.java deleted file mode 100644 index 3330cdfd5b23..000000000000 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/NoOpJdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.db.jdbc; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class NoOpJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration { - - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) - throws SQLException { - - } - -} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/OracleJdbcStreamingQueryConfiguration.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/OracleJdbcStreamingQueryConfiguration.java deleted file mode 100644 index c6ab49b81b12..000000000000 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/OracleJdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.db.jdbc; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class OracleJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration { - - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException { - connection.setAutoCommit(false); - preparedStatement.setFetchSize(1000); - } - -} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/PostgresJdbcStreamingQueryConfiguration.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/PostgresJdbcStreamingQueryConfiguration.java deleted file mode 100644 index 08977016af16..000000000000 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/PostgresJdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.db.jdbc; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class PostgresJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration { - - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException { - connection.setAutoCommit(false); - preparedStatement.setFetchSize(1000); - } - -} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java index 338fec362d83..40ec3e908b96 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java @@ -7,36 +7,42 @@ import com.google.errorprone.annotations.MustBeClosed; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.db.JdbcCompatibleSourceOperations; +import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import javax.sql.DataSource; /** - * This database allows a developer to specify a {@link JdbcStreamingQueryConfiguration}. This - * allows the developer to specify the correct configuration in order for a - * {@link PreparedStatement} to execute as in a streaming / chunked manner. + * This database allows a developer to specify a {@link JdbcStreamingQueryConfig}. This allows the + * developer to specify the correct configuration in order for a {@link PreparedStatement} to + * execute as in a streaming / chunked manner. 
*/ public class StreamingJdbcDatabase extends DefaultJdbcDatabase { - private final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration; + private final Supplier streamingQueryConfigProvider; public StreamingJdbcDatabase(final DataSource dataSource, final JdbcCompatibleSourceOperations sourceOperations, - final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration) { + final Supplier streamingQueryConfigProvider) { super(dataSource, sourceOperations); - this.jdbcStreamingQueryConfiguration = jdbcStreamingQueryConfiguration; + this.streamingQueryConfigProvider = streamingQueryConfigProvider; } /** - * Assuming that the {@link JdbcStreamingQueryConfiguration} is configured correctly for the JDBC - * driver being used, this method will return data in streaming / chunked fashion. Review the - * provided {@link JdbcStreamingQueryConfiguration} to understand the size of these chunks. If the - * entire stream is consumed the database connection will be closed automatically and the caller - * need not call close on the returned stream. This query (and the first chunk) are fetched - * immediately. Subsequent chunks will not be pulled until the first chunk is consumed. + * Assuming that the {@link JdbcStreamingQueryConfig} is configured correctly for the JDBC driver + * being used, this method will return data in streaming / chunked fashion. Review the provided + * {@link JdbcStreamingQueryConfig} to understand the size of these chunks. If the entire stream is + * consumed the database connection will be closed automatically and the caller need not call close + * on the returned stream. This query (and the first chunk) are fetched immediately. Subsequent + * chunks will not be pulled until the first chunk is consumed. * * @param statementCreator create a {@link PreparedStatement} from a {@link Connection}. * @param recordTransform transform each record of that result set into the desired type. 
do NOT @@ -53,10 +59,10 @@ public Stream unsafeQuery(final CheckedFunction { try { connection.setAutoCommit(true); @@ -70,4 +76,32 @@ public Stream unsafeQuery(final CheckedFunction Stream toUnsafeStream(final ResultSet resultSet, + final CheckedFunction mapper, + final JdbcStreamingQueryConfig streamingConfig) { + return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) { + + @Override + public boolean tryAdvance(final Consumer action) { + try { + if (!resultSet.next()) { + resultSet.close(); + return false; + } + final T dataRow = mapper.apply(resultSet); + streamingConfig.accept(resultSet, dataRow); + action.accept(dataRow); + return true; + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + }, false); + } + } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfig.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfig.java new file mode 100644 index 000000000000..e2f09fa06c6a --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfig.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link JdbcStreamingQueryConfig} whose fetch size follows the rows it is shown. The statement
+ * starts with {@link FetchSizeConstants#INITIAL_SAMPLE_SIZE} rows per fetch; every row passed to
+ * {@link #accept(ResultSet, Object)} is forwarded to a {@link FetchSizeEstimator}, and whenever
+ * the estimator reports a fetch size different from the current one, that size is set on the
+ * result set.
+ */
+public class AdaptiveStreamingQueryConfig implements JdbcStreamingQueryConfig {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AdaptiveStreamingQueryConfig.class);
+
+  private final FetchSizeEstimator fetchSizeEstimator;
+  // fetch size most recently applied by this config
+  private int currentFetchSize;
+
+  public AdaptiveStreamingQueryConfig() {
+    this.fetchSizeEstimator = TwoStageSizeEstimator.getInstance();
+    this.currentFetchSize = FetchSizeConstants.INITIAL_SAMPLE_SIZE;
+  }
+
+  /**
+   * Disables auto-commit on the connection and sets the statement's fetch size to
+   * {@link FetchSizeConstants#INITIAL_SAMPLE_SIZE}.
+   *
+   * @param connection connection the query runs on
+   * @param statement statement whose fetch size is initialized
+   * @throws SQLException if the connection or the statement rejects the setting
+   */
+  @Override
+  public void initialize(final Connection connection, final Statement statement) throws SQLException {
+    connection.setAutoCommit(false);
+    statement.setFetchSize(FetchSizeConstants.INITIAL_SAMPLE_SIZE);
+    currentFetchSize = FetchSizeConstants.INITIAL_SAMPLE_SIZE;
+    // log the value the statement reports back, not the value that was requested
+    LOGGER.info("Set initial fetch size: {} rows", statement.getFetchSize());
+  }
+
+  /**
+   * Feeds one row to the estimator and, if the estimator now reports a fetch size that differs
+   * from the current one, sets it on the result set.
+   *
+   * @param resultSet result set whose fetch size may be updated
+   * @param rowData the row that was just read; only used for size estimation
+   * @throws SQLException if the result set rejects the new fetch size
+   */
+  @Override
+  public void accept(final ResultSet resultSet, final Object rowData) throws SQLException {
+    fetchSizeEstimator.accept(rowData);
+    final Optional<Integer> newFetchSize = fetchSizeEstimator.getFetchSize();
+    if (!newFetchSize.isPresent()) {
+      return;
+    }
+    final int fetchSize = newFetchSize.get();
+    if (fetchSize == currentFetchSize) {
+      return;
+    }
+    // Update the result set first: if it throws, currentFetchSize still
+    // matches the fetch size that is actually in effect.
+    resultSet.setFetchSize(fetchSize);
+    currentFetchSize = fetchSize;
+    LOGGER.info("Updated fetch size: {} rows", currentFetchSize);
+  }
+
+}
diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimator.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimator.java
new file mode 100644
index 000000000000..aa1a7dcb454b
--- /dev/null
+++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimator.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.airbyte.commons.json.Jsons;
+
+/**
+ * Common base for fetch size estimators. The fetch size, in rows, is the target buffer byte size
+ * divided by the mean row byte size, kept within the configured minimum and maximum.
+ */
+public abstract class BaseSizeEstimator implements FetchSizeEstimator {
+
+  // how many bytes of fetched rows we would like to hold in memory
+  private final long targetBufferByteSize;
+  private final int minFetchSize;
+  private final int defaultFetchSize;
+  private final int maxFetchSize;
+
+  // current estimate of the mean row size in bytes; written by subclasses
+  protected double meanByteSize = 0.0;
+
+  protected BaseSizeEstimator(final long targetBufferByteSize,
+                              final int minFetchSize,
+                              final int defaultFetchSize,
+                              final int maxFetchSize) {
+    this.targetBufferByteSize = targetBufferByteSize;
+    this.minFetchSize = minFetchSize;
+    this.defaultFetchSize = defaultFetchSize;
+    this.maxFetchSize = maxFetchSize;
+  }
+
+  /**
+   * Approximates the memory footprint of {@code rowData} by the length of its JSON serialization,
+   * since the real in-memory size of a row is not easy to measure.
+   *
+   * @param rowData the row to measure; may be {@code null}
+   * @return 0 for {@code null}, otherwise four times the serialized string length
+   */
+  @VisibleForTesting
+  public static long getEstimatedByteSize(final Object rowData) {
+    // Every character is counted as 4 bytes, the maximum width of a UTF-8
+    // character, so the result overestimates. That is acceptable for an
+    // estimate. Do not convert the string to byte[] to get an exact
+    // length; that conversion is known to cost a lot of memory.
+    return rowData == null ? 0L : Jsons.serialize(rowData).length() * 4L;
+  }
+
+  /**
+   * Computes the fetch size from the current mean row size.
+   *
+   * @return {@code defaultFetchSize} when no positive mean is available yet, otherwise the target
+   *         buffer size divided by the mean, limited to {@code minFetchSize} and
+   *         {@code maxFetchSize}, inclusively
+   */
+  protected int getBoundedFetchSize() {
+    if (meanByteSize <= 0.0) {
+      return defaultFetchSize;
+    }
+    final long unboundedFetchSize = Math.round(targetBufferByteSize / meanByteSize);
+    // guard the narrowing cast below
+    if (unboundedFetchSize > Integer.MAX_VALUE) {
+      return maxFetchSize;
+    }
+    final int cappedFetchSize = Math.min(maxFetchSize, (int) unboundedFetchSize);
+    return Math.max(minFetchSize, cappedFetchSize);
+  }
+
+  double getMeanRowByteSize() {
+    return meanByteSize;
+  }
+
+}
diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeConstants.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeConstants.java
new file mode 100644
index 000000000000..2e92ce66e476
--- /dev/null
+++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+public final class FetchSizeConstants {
+
+  // The desired buffer size in memory to store the fetched rows.
+  // This size is not enforced. It is only used to calculate a proper
+  // fetch size. The max row size the connector can handle is actually
+  // limited by the heap size.
+  public static final long TARGET_BUFFER_BYTE_SIZE = 200L * 1024L * 1024L; // 200 MB
+  // sample size for making the first estimation of the row size
+  public static final int INITIAL_SAMPLE_SIZE = 10;
+  // sample size for making the post-initial estimation of the row size
+  public static final int POST_INITIAL_SAMPLE_SIZE = 10;
+  // sample every N rows during the post-initial stage
+  public static final int SAMPLE_FREQUENCY = 100;
+
+  public static final int MIN_FETCH_SIZE = 1;
+  public static final int DEFAULT_FETCH_SIZE = 1000;
+  public static final int MAX_FETCH_SIZE = 100_000;
+
+  private FetchSizeConstants() {}
+
+}
diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeEstimator.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeEstimator.java
new file mode 100644
index 000000000000..12bff10708dc
--- /dev/null
+++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeEstimator.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Consumes rows one at a time through {@link #accept(Object)} and reports a fetch size, in rows,
+ * once it has one to offer.
+ */
+public interface FetchSizeEstimator extends Consumer<Object> {
+
+  /**
+   * @return the estimated fetch size when the estimation is ready, or an empty optional when
+   *         there is no estimation to report
+   */
+  Optional<Integer> getFetchSize();
+
+}
diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/InitialSizeEstimator.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/InitialSizeEstimator.java
new file mode 100644
index 000000000000..af3b3ed31124
--- /dev/null
+++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/InitialSizeEstimator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import java.util.Optional;
+
+/**
+ * This class estimates the mean row byte size by measuring the first consecutive
+ * {@code initialSampleSize} rows.
+ */
+public class InitialSizeEstimator extends BaseSizeEstimator implements FetchSizeEstimator {
+
+  // number of leading rows that make up the sample
+  private final int sampleSize;
+  // number of rows measured so far; never exceeds sampleSize
+  private int counter = 0;
+
+  public InitialSizeEstimator(final long bufferByteSize,
+                              final int initialSampleSize,
+                              final int minFetchSize,
+                              final int defaultFetchSize,
+                              final int maxFetchSize) {
+    super(bufferByteSize, minFetchSize, defaultFetchSize, maxFetchSize);
+    this.sampleSize = initialSampleSize;
+  }
+
+  /**
+   * Adds one row to the sample. Rows received after the sample is complete are ignored, so that
+   * {@code meanByteSize} stays the mean of exactly the first {@code sampleSize} rows instead of
+   * growing with every additional call.
+   *
+   * @param row the row to measure
+   */
+  @Override
+  public void accept(final Object row) {
+    if (counter >= sampleSize) {
+      return;
+    }
+    final long byteSize = getEstimatedByteSize(row);
+    // divide each byteSize by sampleSize to prevent overflow
+    meanByteSize += 1.0 * byteSize / sampleSize;
+    counter++;
+  }
+
+  /**
+   * @return an empty optional until {@code sampleSize} rows have been measured, then the bounded
+   *         fetch size derived from their mean size
+   */
+  @Override
+  public Optional<Integer> getFetchSize() {
+    if (counter < sampleSize) {
+      return Optional.empty();
+    }
+    return Optional.of(getBoundedFetchSize());
+  }
+
+}
diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/JdbcStreamingQueryConfig.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/JdbcStreamingQueryConfig.java
new file mode 100644
index 000000000000..3d92c0c92492
--- /dev/null
+++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/JdbcStreamingQueryConfig.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import io.airbyte.commons.functional.CheckedBiConsumer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Configuration hook for a streaming JDBC query. {@link #initialize(Connection, Statement)} is
+ * given a connection and a statement to configure; the {@code accept} method inherited from
+ * {@link CheckedBiConsumer} is given a {@link ResultSet} together with a row object.
+ */
+// NOTE(review): the type arguments of CheckedBiConsumer are not spelled out here. The
+// implementations declare accept(ResultSet, Object) throws SQLException; confirm and add them.
+public interface JdbcStreamingQueryConfig extends CheckedBiConsumer {
+
+  /**
+   * Prepares the connection and the statement before rows are read.
+   *
+   * @param connection connection the statement belongs to
+   * @param statement statement to configure
+   * @throws SQLException if the connection or the statement rejects the configuration
+   */
+  void initialize(final Connection connection, final Statement statement) throws SQLException;
+
+}
diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/NoOpStreamingQueryConfig.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/NoOpStreamingQueryConfig.java
new file mode 100644
index 000000000000..79fcf1db42c4
--- /dev/null
+++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/NoOpStreamingQueryConfig.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * A {@link JdbcStreamingQueryConfig} that leaves the connection, the statement and the result set
+ * untouched.
+ */
+public class NoOpStreamingQueryConfig implements JdbcStreamingQueryConfig {
+
+  /** Does nothing. */
+  @Override
+  public void initialize(final Connection connection, final Statement statement) throws SQLException {}
+
+  /** Does nothing. */
+  @Override
+  public void accept(final ResultSet resultSet, final Object rowData) throws SQLException {}
+
+}
diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/SamplingSizeEstimator.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/SamplingSizeEstimator.java
new file mode 100644
index 000000000000..b065140fa4d1
--- /dev/null
+++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/SamplingSizeEstimator.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import java.util.Optional;
+
+/**
+ * This class adjusts the mean row byte size by estimating one row out of every
+ */ +public class SamplingSizeEstimator extends BaseSizeEstimator implements FetchSizeEstimator { + + private final int sampleSize; + private final int sampleFrequency; + + private int counter = 0; + private boolean hasNewEstimation = false; + + public SamplingSizeEstimator(final long bufferByteSize, + final int sampleSize, + final int sampleFrequency, + final double initialRowByteSize, + final int minFetchSize, + final int defaultFetchSize, + final int maxFetchSize) { + super(bufferByteSize, minFetchSize, defaultFetchSize, maxFetchSize); + this.sampleSize = sampleSize; + this.sampleFrequency = sampleFrequency; + this.meanByteSize = initialRowByteSize; + } + + @Override + public void accept(final Object row) { + counter++; + if (counter < sampleFrequency) { + return; + } + + counter = 0; + final long rowByteSize = getEstimatedByteSize(row); + if (rowByteSize != Math.round(meanByteSize)) { + // This is equivalent to calculating the mean size + // based on the last N rows. The division is performed + // first to prevent overflow. + meanByteSize = meanByteSize / sampleSize * (sampleSize - 1) + 1.0 * rowByteSize / sampleSize; + hasNewEstimation = true; + } + } + + @Override + public Optional getFetchSize() { + if (!hasNewEstimation) { + return Optional.empty(); + } + + hasNewEstimation = false; + return Optional.of(getBoundedFetchSize()); + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/TwoStageSizeEstimator.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/TwoStageSizeEstimator.java new file mode 100644 index 000000000000..9e819f1ab3a1 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/TwoStageSizeEstimator.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+
+/**
+ * This estimator first uses the {@link InitialSizeEstimator} to calculate an initial fetch size by
+ * sampling the first N rows consecutively, and then switches to {@link SamplingSizeEstimator} to
+ * periodically adjust the fetch size by sampling every M rows.
+ */
+public class TwoStageSizeEstimator implements FetchSizeEstimator {
+
+  private final int initialSampleSize;
+  private BaseSizeEstimator delegate;
+  // rows received while the initial-stage delegate is active
+  private int counter = 0;
+
+  /**
+   * Creates an estimator configured from {@link FetchSizeConstants}. Despite the name, this is
+   * not a singleton accessor: every call returns a new instance.
+   */
+  public static TwoStageSizeEstimator getInstance() {
+    return new TwoStageSizeEstimator();
+  }
+
+  private TwoStageSizeEstimator() {
+    this.initialSampleSize = FetchSizeConstants.INITIAL_SAMPLE_SIZE;
+    this.delegate = new InitialSizeEstimator(
+        FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE,
+        initialSampleSize,
+        FetchSizeConstants.MIN_FETCH_SIZE,
+        FetchSizeConstants.DEFAULT_FETCH_SIZE,
+        FetchSizeConstants.MAX_FETCH_SIZE);
+  }
+
+  /**
+   * @return whatever the delegate of the current stage reports
+   */
+  @Override
+  public Optional<Integer> getFetchSize() {
+    return delegate.getFetchSize();
+  }
+
+  /**
+   * Forwards the row to the delegate of the current stage. The first {@code initialSampleSize}
+   * rows go to the {@link InitialSizeEstimator}; the row after that replaces the delegate with a
+   * {@link SamplingSizeEstimator} seeded with the initial mean, and is itself handed to the new
+   * delegate.
+   *
+   * @param rowData the row that was just read
+   */
+  @Override
+  public void accept(final Object rowData) {
+    // The counter only matters until the switch has happened, so it is
+    // advanced only while the initial-stage delegate is still in place.
+    if (delegate instanceof InitialSizeEstimator) {
+      counter++;
+      // switch to SamplingSizeEstimator after the initial N rows
+      if (counter > initialSampleSize) {
+        delegate = new SamplingSizeEstimator(
+            FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE,
+            FetchSizeConstants.POST_INITIAL_SAMPLE_SIZE,
+            FetchSizeConstants.SAMPLE_FREQUENCY,
+            delegate.getMeanRowByteSize(),
+            FetchSizeConstants.MIN_FETCH_SIZE,
+            FetchSizeConstants.DEFAULT_FETCH_SIZE,
+            FetchSizeConstants.MAX_FETCH_SIZE);
+      }
+    }
+
+    delegate.accept(rowData);
+  }
+
+  @VisibleForTesting
+  BaseSizeEstimator getDelegate() {
+    return delegate;
+  }
+
+}
diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java index
f1b60896081b..e5e8b3507551 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java @@ -5,9 +5,8 @@ package io.airbyte.db.jdbc; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; @@ -15,47 +14,50 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; +import io.airbyte.db.jdbc.streaming.FetchSizeConstants; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.dbcp2.BasicDataSource; +import org.elasticsearch.common.collect.Map; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class TestStreamingJdbcDatabase { - private static final List RECORDS_AS_JSON = Lists.newArrayList( - Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "picard")), - Jsons.jsonNode(ImmutableMap.of("id", 2, "name", "crusher")), - Jsons.jsonNode(ImmutableMap.of("id", 
3, "name", "vash"))); - private static PostgreSQLContainer PSQL_DB; - - private JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration; + private final JdbcSourceOperations sourceOperations = JdbcUtils.getDefaultSourceOperations(); private JdbcDatabase defaultJdbcDatabase; private JdbcDatabase streamingJdbcDatabase; - private final JdbcSourceOperations sourceOperations = JdbcUtils.getDefaultSourceOperations(); @BeforeAll static void init() { PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine"); PSQL_DB.start(); + } + @AfterAll + static void cleanUp() { + PSQL_DB.close(); } @BeforeEach - void setup() throws Exception { - jdbcStreamingQueryConfiguration = mock(JdbcStreamingQueryConfiguration.class); - + void setup() { final String dbName = Strings.addRandomSuffix("db", "_", 10); final JsonNode config = getConfig(PSQL_DB, dbName); @@ -74,37 +76,83 @@ void setup() throws Exception { config.get("database").asText())); defaultJdbcDatabase = spy(new DefaultJdbcDatabase(connectionPool)); - streamingJdbcDatabase = new StreamingJdbcDatabase(connectionPool, JdbcUtils.getDefaultSourceOperations(), jdbcStreamingQueryConfiguration); + streamingJdbcDatabase = new StreamingJdbcDatabase(connectionPool, JdbcUtils.getDefaultSourceOperations(), AdaptiveStreamingQueryConfig::new); + } + @Test + @Order(1) + void testQuery() throws SQLException { defaultJdbcDatabase.execute(connection -> { - connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); - connection.createStatement().execute("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + connection.createStatement().execute( + """ + DROP TABLE IF EXISTS id_and_name; + CREATE TABLE id_and_name (id INTEGER, name VARCHAR(200)); + INSERT INTO id_and_name (id, name) VALUES (1, 'picard'), (2, 'crusher'), (3, 'vash'); + """); }); - } - @AfterAll - static void cleanUp() { - PSQL_DB.close(); + // grab references to connection and prepared statement, so we 
can verify the streaming config is + // invoked. + final AtomicReference connection1 = new AtomicReference<>(); + final AtomicReference ps1 = new AtomicReference<>(); + try (final Stream actual = streamingJdbcDatabase.unsafeQuery( + connection -> { + connection1.set(connection); + final PreparedStatement ps = connection.prepareStatement("SELECT * FROM id_and_name;"); + ps1.set(ps); + return ps; + }, + sourceOperations::rowToJson)) { + final List expectedRecords = Lists.newArrayList( + Jsons.jsonNode(Map.of("id", 1, "name", "picard")), + Jsons.jsonNode(Map.of("id", 2, "name", "crusher")), + Jsons.jsonNode(Map.of("id", 3, "name", "vash"))); + assertEquals(expectedRecords, actual.toList()); + } } + /** + * Test stream querying a table with 20 rows. Each row is 10 MB large. The table in this test must + * contain more than {@code + * FetchSizeConstants.INITIAL_SAMPLE_SIZE} rows. Otherwise, all rows will be fetched in the first + * fetch, the fetch size won't be adjusted, and the test will fail. + */ + @Order(2) @Test - void testQuery() throws SQLException { - // grab references to connection and prepared statement so we can verify the streaming config is - // invoked. 
+ void testLargeRow() throws SQLException { + defaultJdbcDatabase.execute(connection -> connection.createStatement() + .execute( + """ + DROP TABLE IF EXISTS id_and_name; + CREATE TABLE id_and_name (id INTEGER, name TEXT); + INSERT INTO id_and_name SELECT id, repeat('a', 10485760) as name from generate_series(1, 20) as id; + """)); + final AtomicReference connection1 = new AtomicReference<>(); final AtomicReference ps1 = new AtomicReference<>(); - final Stream actual = streamingJdbcDatabase.unsafeQuery( + final Set fetchSizes = new HashSet<>(); + try (final Stream actual = streamingJdbcDatabase.unsafeQuery( connection -> { connection1.set(connection); final PreparedStatement ps = connection.prepareStatement("SELECT * FROM id_and_name;"); ps1.set(ps); return ps; }, - sourceOperations::rowToJson); - - assertEquals(RECORDS_AS_JSON, actual.collect(Collectors.toList())); - // verify that the query configuration is invoked. - verify(jdbcStreamingQueryConfiguration).accept(connection1.get(), ps1.get()); + resultSet -> { + fetchSizes.add(resultSet.getFetchSize()); + return sourceOperations.rowToJson(resultSet); + })) { + assertEquals(20, actual.count()); + + // Two fetch sizes should be set on the result set, one is the initial sample size, + // and the other is smaller than the initial value because of the large row. + // This check assumes that FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE = 200 MB. + // Update this check if the buffer size constant is changed. 
+      assertEquals(2, fetchSizes.size());
+      final List sortedSizes = fetchSizes.stream().sorted().toList();
+      assertTrue(sortedSizes.get(0) < FetchSizeConstants.INITIAL_SAMPLE_SIZE);
+      assertEquals(FetchSizeConstants.INITIAL_SAMPLE_SIZE, sortedSizes.get(1));
+    }
   }
   private JsonNode getConfig(final PostgreSQLContainer psqlDb, final String dbName) {
diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfigTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfigTest.java
new file mode 100644
index 000000000000..1a9fd943b504
--- /dev/null
+++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfigTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.jdbc.streaming;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import joptsimple.internal.Strings;
+import org.junit.jupiter.api.Test;
+
+class AdaptiveStreamingQueryConfigTest {
+
+  /**
+   * The fetch size must stay untouched while the initial sample is being collected, be set exactly
+   * once when the sample completes, and not be set again by the row that follows.
+   */
+  @Test
+  public void testFetchSizeUpdate() throws SQLException {
+    final AdaptiveStreamingQueryConfig queryConfig = new AdaptiveStreamingQueryConfig();
+    final ResultSet resultSet = mock(ResultSet.class);
+    for (int i = 0; i < FetchSizeConstants.INITIAL_SAMPLE_SIZE - 1; ++i) {
+      queryConfig.accept(resultSet, Strings.repeat(Character.forDigit(i, 10), i + 1));
+      verify(resultSet, never()).setFetchSize(anyInt());
+    }
+    queryConfig.accept(resultSet, "final sampling in the initial stage");
+    verify(resultSet, times(1)).setFetchSize(anyInt());
+    // The next row only moves the estimator to its sampling stage, which
+    // has no new estimation to report yet, so the count must still be one.
+    queryConfig.accept(resultSet, "abcd");
+    verify(resultSet, times(1)).setFetchSize(anyInt());
+  }
+
+}
diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimatorTest.java
b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimatorTest.java new file mode 100644 index 000000000000..eccf1be3b120 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimatorTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.jdbc.streaming; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.commons.json.Jsons; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +class BaseSizeEstimatorTest { + + @Test + public void testGetEstimatedByteSize() { + assertEquals(0L, BaseSizeEstimator.getEstimatedByteSize(null)); + assertEquals(28L, BaseSizeEstimator.getEstimatedByteSize("12345")); + assertEquals(60L, BaseSizeEstimator.getEstimatedByteSize(Jsons.jsonNode(Map.of("key", "value")))); + } + + public static class TestSizeEstimator extends BaseSizeEstimator { + + protected TestSizeEstimator(final long bufferByteSize, final int minFetchSize, final int defaultFetchSize, final int maxFetchSize) { + super(bufferByteSize, minFetchSize, defaultFetchSize, maxFetchSize); + } + + @Override + public Optional getFetchSize() { + return Optional.empty(); + } + + @Override + public void accept(final Object o) {} + + public void setMeanByteSize(final double meanByteSize) { + this.meanByteSize = meanByteSize; + } + + } + + @Test + public void testGetBoundedFetchSize() { + final long bufferByteSize = 120; + final int minFetchSize = 10; + final int defaultFetchSize = 20; + final int maxFetchSize = 40; + final TestSizeEstimator sizeEstimator = new TestSizeEstimator(bufferByteSize, minFetchSize, defaultFetchSize, maxFetchSize); + + sizeEstimator.setMeanByteSize(-1.0); + assertEquals(defaultFetchSize, sizeEstimator.getBoundedFetchSize()); + + sizeEstimator.setMeanByteSize(0.0); + assertEquals(defaultFetchSize, sizeEstimator.getBoundedFetchSize()); + + // fetch size = 5 < min fetch size + 
sizeEstimator.setMeanByteSize(bufferByteSize / 5.0); + assertEquals(minFetchSize, sizeEstimator.getBoundedFetchSize()); + + // fetch size = 10 within [min fetch size, max fetch size] + sizeEstimator.setMeanByteSize(bufferByteSize / 10.0); + assertEquals(10, sizeEstimator.getBoundedFetchSize()); + + // fetch size = 30 within [min fetch size, max fetch size] + sizeEstimator.setMeanByteSize(bufferByteSize / 30.0); + assertEquals(30, sizeEstimator.getBoundedFetchSize()); + + // fetch size = 40 within [min fetch size, max fetch size] + sizeEstimator.setMeanByteSize(bufferByteSize / 40.0); + assertEquals(40, sizeEstimator.getBoundedFetchSize()); + + // fetch size = 60 > max fetch size + sizeEstimator.setMeanByteSize(bufferByteSize / 60.0); + assertEquals(maxFetchSize, sizeEstimator.getBoundedFetchSize()); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/InitialSizeEstimatorTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/InitialSizeEstimatorTest.java new file mode 100644 index 000000000000..219d0e696273 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/InitialSizeEstimatorTest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.db.jdbc.streaming; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Optional; +import org.junit.jupiter.api.Test; + +class InitialSizeEstimatorTest { + + @Test + public void testIt() { + final long bufferByteSize = 120; + final int initialSampleSize = 5; + final int minFetchSize = 1; + final int defaultFetchSize = 20; + final int maxFetchSize = 120; + final InitialSizeEstimator sizeEstimator = new InitialSizeEstimator( + bufferByteSize, + initialSampleSize, + minFetchSize, + defaultFetchSize, + maxFetchSize); + + // size: 3 * 4 = 12 + sizeEstimator.accept("1"); + assertFalse(sizeEstimator.getFetchSize().isPresent()); + // size: 4 * 4 = 16 + sizeEstimator.accept("11"); + assertFalse(sizeEstimator.getFetchSize().isPresent()); + // size: 5 * 4 = 20 + sizeEstimator.accept("111"); + assertFalse(sizeEstimator.getFetchSize().isPresent()); + // size: 6 * 4 = 24 + sizeEstimator.accept("1111"); + assertFalse(sizeEstimator.getFetchSize().isPresent()); + // size: 7 * 4 = 28, fetch size is available + sizeEstimator.accept("11111"); + final Optional fetchSize = sizeEstimator.getFetchSize(); + assertTrue(fetchSize.isPresent()); + final long expectedMeanByteSize = 20L; + assertEquals(expectedMeanByteSize, Math.round(sizeEstimator.getMeanRowByteSize())); + assertEquals(bufferByteSize / expectedMeanByteSize, fetchSize.get().longValue()); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/SamplingSizeEstimatorTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/SamplingSizeEstimatorTest.java new file mode 100644 index 000000000000..c5a470891a70 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/SamplingSizeEstimatorTest.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.db.jdbc.streaming; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Optional; +import org.junit.jupiter.api.Test; + +class SamplingSizeEstimatorTest { + + @Test + public void testIt() { + final long bufferByteSize = 120; + final int sampleSize = 2; + final int sampleFrequency = 3; + final long initialByteSize = 30; + final int minFetchSize = 1; + final int defaultFetchSize = 20; + final int maxFetchSize = 120; + final SamplingSizeEstimator sizeEstimator = new SamplingSizeEstimator( + bufferByteSize, + sampleSize, + sampleFrequency, + initialByteSize, + minFetchSize, + defaultFetchSize, + maxFetchSize); + + double meanByteSize = initialByteSize; + + // size: 3 * 4 = 12, not sampled + sizeEstimator.accept("1"); + assertFalse(sizeEstimator.getFetchSize().isPresent()); + assertEquals(meanByteSize, sizeEstimator.getMeanRowByteSize()); + + // size: 4 * 4 = 16, not sampled + sizeEstimator.accept("11"); + assertFalse(sizeEstimator.getFetchSize().isPresent()); + assertEquals(meanByteSize, sizeEstimator.getMeanRowByteSize()); + + // size: 5 * 4 = 20, sampled, fetch size is ready + sizeEstimator.accept("111"); + final Optional fetchSize1 = sizeEstimator.getFetchSize(); + assertTrue(fetchSize1.isPresent()); + meanByteSize = (meanByteSize + 20) / 2.0; + assertDoubleEquals(meanByteSize, sizeEstimator.getMeanRowByteSize()); + assertDoubleEquals(bufferByteSize / meanByteSize, fetchSize1.get().doubleValue()); + + // size: 6 * 4 = 24, not sampled + sizeEstimator.accept("1111"); + assertFalse(sizeEstimator.getFetchSize().isPresent()); + assertDoubleEquals(meanByteSize, sizeEstimator.getMeanRowByteSize()); + + // size: 7 * 4 = 28, not sampled + sizeEstimator.accept("11111"); + assertFalse(sizeEstimator.getFetchSize().isPresent()); + assertDoubleEquals(meanByteSize, sizeEstimator.getMeanRowByteSize()); + + // size: 8 * 4 = 32, sampled, fetch size is ready + sizeEstimator.accept("111111"); + final Optional fetchSize2 = 
sizeEstimator.getFetchSize(); + assertTrue(fetchSize2.isPresent()); + meanByteSize = (meanByteSize + 32) / 2.0; + assertDoubleEquals(meanByteSize, sizeEstimator.getMeanRowByteSize()); + assertDoubleEquals(bufferByteSize / meanByteSize, fetchSize2.get().doubleValue()); + } + + private static void assertDoubleEquals(final double expected, final double actual) { + assertEquals(Math.round(expected), Math.round(actual)); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/TwoStageSizeEstimatorTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/TwoStageSizeEstimatorTest.java new file mode 100644 index 000000000000..a49bb8bd85dd --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/streaming/TwoStageSizeEstimatorTest.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.jdbc.streaming; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +class TwoStageSizeEstimatorTest { + + @Test + public void testDelegationSwitch() { + final TwoStageSizeEstimator sizeEstimator = TwoStageSizeEstimator.getInstance(); + for (int i = 0; i < FetchSizeConstants.INITIAL_SAMPLE_SIZE; ++i) { + sizeEstimator.accept("1"); + assertTrue(sizeEstimator.getDelegate() instanceof InitialSizeEstimator); + } + // delegation is changed after initial sampling + for (int i = 0; i < 3; ++i) { + sizeEstimator.accept("1"); + assertTrue(sizeEstimator.getDelegate() instanceof SamplingSizeEstimator); + } + } + +} diff --git a/airbyte-integrations/connector-templates/source-java-jdbc/src/main/java/io/airbyte/integrations/source/{{snakeCase name}}/{{pascalCase name}}Source.java.hbs b/airbyte-integrations/connector-templates/source-java-jdbc/src/main/java/io/airbyte/integrations/source/{{snakeCase name}}/{{pascalCase name}}Source.java.hbs index 7cb625a29983..d7babcc57203 100644 --- 
a/airbyte-integrations/connector-templates/source-java-jdbc/src/main/java/io/airbyte/integrations/source/{{snakeCase name}}/{{pascalCase name}}Source.java.hbs +++ b/airbyte-integrations/connector-templates/source-java-jdbc/src/main/java/io/airbyte/integrations/source/{{snakeCase name}}/{{pascalCase name}}Source.java.hbs @@ -6,7 +6,7 @@ package io.airbyte.integrations.source.{{snakeCase name}}; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.NoOpJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.NoOpStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -23,9 +23,9 @@ public class {{pascalCase name}}Source extends AbstractJdbcSource impl static final String DRIVER_CLASS = "driver_name_here"; public {{pascalCase name}}Source() { - // By default, NoOpJdbcStreamingQueryConfiguration class is used, but may be updated. See example - // MssqlJdbcStreamingQueryConfiguration - super(DRIVER_CLASS, new NoOpJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + // By default, NoOpStreamingQueryConfig class is used. If the JDBC supports custom + // fetch size, change it to AdaptiveStreamingQueryConfig for better performance. 
+ super(DRIVER_CLASS, NoOpStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } // TODO The config is based on spec.json, update according to your DB diff --git a/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java b/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java index aa255ccace20..3fd083e43879 100644 --- a/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java +++ b/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java @@ -9,7 +9,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.NoOpJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.NoOpStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -72,13 +72,13 @@ public static Source getWrappedSource() { } /** - * The reason we use NoOpJdbcStreamingQueryConfiguration(not setting auto commit to false and not - * setting fetch size to 1000) for ClickHouse is cause method + * The reason we use NoOpStreamingQueryConfig(not setting auto commit to false and not setting fetch + * size to 1000) for ClickHouse is cause method * {@link ru.yandex.clickhouse.ClickHouseConnectionImpl#setAutoCommit} is empty and method * {@link ru.yandex.clickhouse.ClickHouseStatementImpl#setFetchSize} is empty */ public ClickHouseSource() { - super(DRIVER_CLASS, new NoOpJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super(DRIVER_CLASS, NoOpStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } @Override @@ -93,7 +93,7 @@ public JsonNode 
toDatabaseConfig(final JsonNode config) { jdbcUrl.append("?").append(String.join("&", SSL_PARAMETERS)); } - ImmutableMap.Builder configBuilder = ImmutableMap.builder() + final ImmutableMap.Builder configBuilder = ImmutableMap.builder() .put("username", config.get("username").asText()) .put("jdbc_url", jdbcUrl.toString()); diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java index 7e6d63de5878..bd2ebe01a3b3 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java @@ -12,6 +12,7 @@ import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.jdbc.streaming.NoOpStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -40,7 +41,7 @@ public class CockroachDbSource extends AbstractJdbcSource { public static final List PORT_KEY = List.of("port"); public CockroachDbSource() { - super(DRIVER_CLASS, null, new CockroachJdbcSourceOperations()); + super(DRIVER_CLASS, NoOpStreamingQueryConfig::new, new CockroachJdbcSourceOperations()); } public static Source sshWrappedSource() { diff --git a/airbyte-integrations/connectors/source-db2-strict-encrypt/src/main/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcStreamingQueryConfiguration.java b/airbyte-integrations/connectors/source-db2-strict-encrypt/src/main/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcStreamingQueryConfiguration.java deleted file mode 100644 index 
71c7c3d00ca0..000000000000 --- a/airbyte-integrations/connectors/source-db2-strict-encrypt/src/main/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.db2_strict_encrypt; - -import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class Db2JdbcStreamingQueryConfiguration implements - JdbcStreamingQueryConfiguration { - - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) - throws SQLException { - connection.setAutoCommit(false); - preparedStatement.setFetchSize(1000); - } - -} diff --git a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java index 8ec8eab83d0f..74b897c7d93b 100644 --- a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java +++ b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java @@ -8,8 +8,8 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; -import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration; import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -35,13 +35,12 @@ public class Db2Source extends AbstractJdbcSource implements Source { public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver"; public static final String 
USERNAME = "username"; public static final String PASSWORD = "password"; - private static Db2SourceOperations operations; private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8); private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks"; public Db2Source() { - super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration(), new Db2SourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new Db2SourceOperations()); } public static void main(final String[] args) throws Exception { @@ -96,7 +95,7 @@ public Set getPrivilegesTableForCurrentUser(final JdbcDatabase } @Override - protected boolean isNotInternalSchema(JsonNode jsonNode, Set internalSchemas) { + protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set internalSchemas) { return false; } @@ -105,7 +104,7 @@ private CheckedFunction getPrivileg "SELECT DISTINCT OBJECTNAME, OBJECTSCHEMA FROM SYSIBMADM.PRIVILEGES WHERE OBJECTTYPE = 'TABLE' AND PRIVILEGE = 'SELECT' AND AUTHID = SESSION_USER"); } - private JdbcPrivilegeDto getPrivilegeDto(JsonNode jsonNode) { + private JdbcPrivilegeDto getPrivilegeDto(final JsonNode jsonNode) { return JdbcPrivilegeDto.builder() .schemaName(jsonNode.get("OBJECTSCHEMA").asText().trim()) .tableName(jsonNode.get("OBJECTNAME").asText()) diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 1d6c237a3659..3ca4791c2981 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -29,8 +29,8 @@ import io.airbyte.db.JdbcCompatibleSourceOperations; import io.airbyte.db.SqlDatabase; import io.airbyte.db.jdbc.JdbcDatabase; 
-import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource; @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -65,16 +66,16 @@ public abstract class AbstractJdbcSource extends AbstractRelationalDbS private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcSource.class); protected final String driverClass; - protected final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration; + protected final Supplier streamingQueryConfigProvider; protected final JdbcCompatibleSourceOperations sourceOperations; protected String quoteString; public AbstractJdbcSource(final String driverClass, - final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration, + final Supplier streamingQueryConfigProvider, final JdbcCompatibleSourceOperations sourceOperations) { this.driverClass = driverClass; - this.jdbcStreamingQueryConfiguration = jdbcStreamingQueryConfiguration; + this.streamingQueryConfigProvider = streamingQueryConfigProvider; this.sourceOperations = sourceOperations; } @@ -293,7 +294,7 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException { jdbcConfig.has("password") ? 
jdbcConfig.get("password").asText() : null, jdbcConfig.get("jdbc_url").asText(), driverClass, - jdbcStreamingQueryConfiguration, + streamingQueryConfigProvider, JdbcUtils.parseJdbcParameters(jdbcConfig, "connection_properties"), sourceOperations); diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcSource.java index 66cb15d0c971..5d6443b1c352 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcSource.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import java.sql.JDBCType; @@ -19,7 +19,7 @@ public class JdbcSource extends AbstractJdbcSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSource.class); public JdbcSource() { - super("org.postgresql.Driver", new PostgresJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super("org.postgresql.Driver", AdaptiveStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } // no-op for JdbcSource since the config it receives is designed to be use for JDBC. 
diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java index ba4e8a5e3304..d81b0a3d466e 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java @@ -10,7 +10,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; @@ -93,7 +93,7 @@ private static class PostgresTestSource extends AbstractJdbcSource imp static final String DRIVER_CLASS = "org.postgresql.Driver"; public PostgresTestSource() { - super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } @Override diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcStressTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcStressTest.java index 4d2ba5ad4274..06ca30f2b3ba 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcStressTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcStressTest.java @@ -10,7 
+10,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.test.JdbcStressTest; @@ -98,7 +98,7 @@ private static class PostgresTestSource extends AbstractJdbcSource imp static final String DRIVER_CLASS = "org.postgresql.Driver"; public PostgresTestSource() { - super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } @Override diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceStressTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceStressTest.java index bb542c0401df..192fdb38a1e4 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceStressTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceStressTest.java @@ -10,7 +10,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.test.JdbcStressTest; @@ -96,7 +96,7 @@ private static class PostgresTestSource extends AbstractJdbcSource imp static final String DRIVER_CLASS = "org.postgresql.Driver"; public PostgresTestSource() { - 
super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } @Override diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlJdbcStreamingQueryConfiguration.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlJdbcStreamingQueryConfiguration.java deleted file mode 100644 index 71c1001b542f..000000000000 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlJdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mssql; - -import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class MssqlJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration { - - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException { - connection.setAutoCommit(false); - preparedStatement.setFetchSize(1000); - } - -} diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 94a3f4bcf771..3af72f59dea6 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -17,6 +17,7 @@ import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import 
io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -63,20 +64,20 @@ public static Source sshWrappedSource() { } MssqlSource() { - super(DRIVER_CLASS, new MssqlJdbcStreamingQueryConfiguration(), new MssqlSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new MssqlSourceOperations()); } @Override - public AutoCloseableIterator queryTableFullRefresh(JdbcDatabase database, - List columnNames, - String schemaName, - String tableName) { + public AutoCloseableIterator queryTableFullRefresh(final JdbcDatabase database, + final List columnNames, + final String schemaName, + final String tableName) { LOGGER.info("Queueing query for table: {}", tableName); - List newIdentifiersList = getWrappedColumn(database, + final List newIdentifiersList = getWrappedColumn(database, columnNames, schemaName, tableName, "\""); - String preparedSqlQuery = String + final String preparedSqlQuery = String .format("SELECT %s FROM %s", String.join(",", newIdentifiersList), getFullTableName(schemaName, tableName)); @@ -85,13 +86,13 @@ public AutoCloseableIterator queryTableFullRefresh(JdbcDatabase databa } @Override - public AutoCloseableIterator queryTableIncremental(JdbcDatabase database, - List columnNames, - String schemaName, - String tableName, - String cursorField, - JDBCType cursorFieldType, - String cursor) { + public AutoCloseableIterator queryTableIncremental(final JdbcDatabase database, + final List columnNames, + final String schemaName, + final String tableName, + final String cursorField, + final JDBCType cursorFieldType, + final String cursor) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { @@ -101,7 +102,7 @@ public AutoCloseableIterator queryTableIncremental(JdbcDatabase databa 
final String identifierQuoteString = connection.getMetaData() .getIdentifierQuoteString(); - List newColumnNames = getWrappedColumn(database, + final List newColumnNames = getWrappedColumn(database, columnNames, schemaName, tableName, identifierQuoteString); final String sql = String.format("SELECT %s FROM %s WHERE %s > ?", @@ -133,14 +134,14 @@ public AutoCloseableIterator queryTableIncremental(JdbcDatabase databa * * @return the list with Column names updated to handle functions (if nay) properly */ - private List getWrappedColumn(JdbcDatabase database, - List columnNames, - String schemaName, - String tableName, - String enquoteSymbol) { - List hierarchyIdColumns = new ArrayList<>(); + private List getWrappedColumn(final JdbcDatabase database, + final List columnNames, + final String schemaName, + final String tableName, + final String enquoteSymbol) { + final List hierarchyIdColumns = new ArrayList<>(); try { - SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database + final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database .queryMetadata(String .format("SELECT TOP 1 %s FROM %s", // only first row is enough to get field's type enquoteIdentifierList(columnNames), @@ -155,7 +156,7 @@ private List getWrappedColumn(JdbcDatabase database, } } - } catch (SQLException e) { + } catch (final SQLException e) { LOGGER.error("Failed to fetch metadata to prepare a proper request.", e); } diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index 6438e29d9666..b2b063aa69f1 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ 
b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -27,6 +27,7 @@ import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.debezium.CdcSourceTest; import io.airbyte.integrations.debezium.CdcTargetPosition; @@ -316,7 +317,7 @@ protected CdcTargetPosition cdcLatestTargetPosition() { config.get("port").asInt(), dbName), DRIVER_CLASS, - new MssqlJdbcStreamingQueryConfiguration(), + AdaptiveStreamingQueryConfig::new, Maps.newHashMap(), new MssqlSourceOperations()); return MssqlCdcTargetPosition.getTargetPosition(jdbcDatabase, dbName); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlJdbcStreamingQueryConfiguration.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlJdbcStreamingQueryConfiguration.java deleted file mode 100644 index b8b46c1505e1..000000000000 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlJdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql; - -import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class MySqlJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration { - - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException { - // This is only respected if "useCursorFetch=true" is set in the connection. See the "resultset" - // section the MySql docs for more details. 
- // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html. - // When using this approach MySql creates a temporary table which may have some effect on db - // performance. - // e.g. conn = DriverManager.getConnection("jdbc:mysql://localhost/?useCursorFetch=true", "user", - // "s3cr3t"); - // We set userCursorFetch in MySqlSource. - connection.setAutoCommit(false); - preparedStatement.setFetchSize(1000); - // If for some reason, you cannot set useCursorFetch in the connection, fall back on this - // implementation below. It fetches records one at a time, which while inefficient, at least does - // not risk OOM. - // connection.setAutoCommit(false); - // preparedStatement.setFetchSize(Integer.MIN_VALUE); - } - -} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index aef618a469f1..9afbfc539269 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -17,6 +17,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -61,7 +62,7 @@ public static Source sshWrappedSource() { } public MySqlSource() { - super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration(), new MySqlSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new MySqlSourceOperations()); } private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream 
stream) { @@ -129,7 +130,10 @@ public JsonNode toDatabaseConfig(final JsonNode config) { config.get("port").asText(), config.get("database").asText())); - // see MySqlJdbcStreamingQueryConfiguration for more context on why useCursorFetch=true is needed. + // To fetch the result in batches, "useCursorFetch=true" must be set in the JDBC URL. + // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html. + // When using this approach MySql creates a temporary table which may have some effect on db + // performance. jdbcUrl.append("?useCursorFetch=true"); jdbcUrl.append("&zeroDateTimeBehavior=convertToNull"); // ensure the return tinyint(1) is boolean @@ -181,8 +185,8 @@ public List> getIncrementalIterators(final new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), MySqlCdcProperties.getDebeziumProperties(), catalog, true); - Optional cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); - MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)); + final Optional cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); + final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)); cdcState.ifPresent(cdc -> checkBinlog(cdc.getState(), database)); return handler.getIncrementalIterators(fetcher, new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt); } else { diff --git a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java index 764d85d690c5..94813f54fff3 100644 --- a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java +++
b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java @@ -9,7 +9,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.OracleJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -45,7 +45,7 @@ enum Protocol { } public OracleSource() { - super(DRIVER_CLASS, new OracleJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } public static Source sshWrappedSource() { diff --git a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleStressTest.java b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleStressTest.java index f1ab08a51157..cdca797efd00 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleStressTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleStressTest.java @@ -8,7 +8,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.OracleJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -93,7 +93,7 @@ private static class OracleTestSource extends AbstractJdbcSource imple static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver"; public 
OracleTestSource() { - super(DRIVER_CLASS, new OracleJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 88c30e5c5d7d..bbc397b0cd2b 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -17,7 +17,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -57,7 +57,7 @@ public static Source sshWrappedSource() { } PostgresSource() { - super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), new PostgresSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new PostgresSourceOperations()); } @Override @@ -129,15 +129,15 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception { } @Override - public List>> discoverInternal(JdbcDatabase database) throws Exception { + public List>> discoverInternal(final JdbcDatabase database) throws Exception { if (schemas != null && !schemas.isEmpty()) { // process explicitly selected (from UI) schemas final List>> internals = new ArrayList<>(); - for (String schema : schemas) { + for (final String schema : schemas) { 
LOGGER.debug("Discovering schema: {}", schema); internals.addAll(super.discoverInternal(database, schema)); } - for (TableInfo> info : internals) { + for (final TableInfo> info : internals) { LOGGER.debug("Found table (schema: {}): {}", info.getNameSpace(), info.getName()); } return internals; @@ -327,7 +327,7 @@ public Set getPrivilegesTableForCurrentUser(final JdbcDatabase } @Override - protected boolean isNotInternalSchema(JsonNode jsonNode, Set internalSchemas) { + protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set internalSchemas) { return false; } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresStressTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresStressTest.java index 0f30e4fa07f9..75773468b577 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresStressTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresStressTest.java @@ -10,7 +10,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -98,7 +98,7 @@ private static class PostgresTestSource extends AbstractJdbcSource imp static final String DRIVER_CLASS = "org.postgresql.Driver"; public PostgresTestSource() { - super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } @Override diff --git 
a/airbyte-integrations/connectors/source-redshift/src/main/java/io/airbyte/integrations/source/redshift/RedshiftJdbcStreamingQueryConfiguration.java b/airbyte-integrations/connectors/source-redshift/src/main/java/io/airbyte/integrations/source/redshift/RedshiftJdbcStreamingQueryConfiguration.java deleted file mode 100644 index b7a2ee716207..000000000000 --- a/airbyte-integrations/connectors/source-redshift/src/main/java/io/airbyte/integrations/source/redshift/RedshiftJdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.redshift; - -import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class RedshiftJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration { - - // aws docs on how setting up batching: - // https://docs.aws.amazon.com/redshift/latest/dg/queries-troubleshooting.html - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException { - connection.setAutoCommit(false); - preparedStatement.setFetchSize(1000); - } - -} diff --git a/airbyte-integrations/connectors/source-redshift/src/main/java/io/airbyte/integrations/source/redshift/RedshiftSource.java b/airbyte-integrations/connectors/source-redshift/src/main/java/io/airbyte/integrations/source/redshift/RedshiftSource.java index df025ced67ed..b81e77dd71a1 100644 --- a/airbyte-integrations/connectors/source-redshift/src/main/java/io/airbyte/integrations/source/redshift/RedshiftSource.java +++ b/airbyte-integrations/connectors/source-redshift/src/main/java/io/airbyte/integrations/source/redshift/RedshiftSource.java @@ -9,6 +9,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; +import 
io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -35,7 +36,7 @@ public class RedshiftSource extends AbstractJdbcSource implements Sour // todo (cgardens) - clean up passing the dialect as null versus explicitly adding the case to the // constructor. public RedshiftSource() { - super(DRIVER_CLASS, new RedshiftJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } @Override diff --git a/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/main/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcSource.java b/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/main/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcSource.java index 1ff87d55ef86..1f8e82d00496 100644 --- a/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/main/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcSource.java +++ b/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/main/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcSource.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.NoOpJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.NoOpStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -23,9 +23,9 @@ public class ScaffoldJavaJdbcSource extends AbstractJdbcSource impleme static final String DRIVER_CLASS = "driver_name_here"; public ScaffoldJavaJdbcSource() { - // By default, NoOpJdbcStreamingQueryConfiguration class is used, but may be updated. 
See example - // MssqlJdbcStreamingQueryConfiguration - super(DRIVER_CLASS, new NoOpJdbcStreamingQueryConfiguration(), JdbcUtils.getDefaultSourceOperations()); + // By default, NoOpStreamingQueryConfig class is used. If the JDBC driver supports custom + // fetch size, change it to AdaptiveStreamingQueryConfig for better performance. + super(DRIVER_CLASS, NoOpStreamingQueryConfig::new, JdbcUtils.getDefaultSourceOperations()); } // TODO The config is based on spec.json, update according to your DB diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeJdbcStreamingQueryConfiguration.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeJdbcStreamingQueryConfiguration.java deleted file mode 100644 index 6c7750979d75..000000000000 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeJdbcStreamingQueryConfiguration.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
- */ - -package io.airbyte.integrations.source.snowflake; - -import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class SnowflakeJdbcStreamingQueryConfiguration implements - JdbcStreamingQueryConfiguration { - - @Override - public void accept(final Connection connection, final PreparedStatement preparedStatement) - throws SQLException { - connection.setAutoCommit(false); - preparedStatement.setFetchSize(1000); - } - -} diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java index 33fe4f434671..a22abe03a10b 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java @@ -13,6 +13,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -33,8 +34,7 @@ public class SnowflakeSource extends AbstractJdbcSource implements Sou public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); public SnowflakeSource() { - super(DRIVER_CLASS, new SnowflakeJdbcStreamingQueryConfiguration(), - new SnowflakeSourceOperations()); + super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new SnowflakeSourceOperations()); } public static void main(final String[] args) throws Exception { @@ -46,10 +46,9 @@ public static 
void main(final String[] args) throws Exception { } @Override - public JdbcDatabase createDatabase(JsonNode config) throws SQLException { + public JdbcDatabase createDatabase(final JsonNode config) throws SQLException { final DataSource dataSource = SnowflakeDataSourceUtils.createDataSource(config); - var database = new StreamingJdbcDatabase(dataSource, new SnowflakeSourceOperations(), - new SnowflakeJdbcStreamingQueryConfiguration()); + final var database = new StreamingJdbcDatabase(dataSource, new SnowflakeSourceOperations(), AdaptiveStreamingQueryConfig::new); quoteString = database.getMetaData().getIdentifierQuoteString(); return database; } @@ -59,7 +58,7 @@ public JsonNode toDatabaseConfig(final JsonNode config) { final String jdbcUrl = SnowflakeDataSourceUtils.buildJDBCUrl(config); if (config.has("credentials")) { - JsonNode credentials = config.get("credentials"); + final JsonNode credentials = config.get("credentials"); final String authType = credentials.has("auth_type") ? credentials.get("auth_type").asText() : UNRECOGNIZED; return switch (authType) { @@ -79,14 +78,14 @@ public Set getExcludedInternalNameSpaces() { "INFORMATION_SCHEMA"); } - private JsonNode buildOAuthConfig(JsonNode config, String jdbcUrl) { + private JsonNode buildOAuthConfig(final JsonNode config, final String jdbcUrl) { final String accessToken; - var credentials = config.get("credentials"); + final var credentials = config.get("credentials"); try { accessToken = SnowflakeDataSourceUtils.getAccessTokenUsingRefreshToken( config.get("host").asText(), credentials.get("client_id").asText(), credentials.get("client_secret").asText(), credentials.get("refresh_token").asText()); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } final ImmutableMap.Builder configBuilder = ImmutableMap.builder() @@ -96,7 +95,7 @@ private JsonNode buildOAuthConfig(JsonNode config, String jdbcUrl) { return Jsons.jsonNode(configBuilder.build()); } - private JsonNode 
buildUsernamePasswordConfig(JsonNode config, String jdbcUrl) { + private JsonNode buildUsernamePasswordConfig(final JsonNode config, final String jdbcUrl) { final ImmutableMap.Builder configBuilder = ImmutableMap.builder() .put("username", config.get("username").asText()) .put("password", config.get("password").asText()) diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAuthAcceptanceTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAuthAcceptanceTest.java index bdcc57e9e08c..19265cf3529f 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAuthAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAuthAcceptanceTest.java @@ -11,8 +11,8 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils; -import io.airbyte.integrations.source.snowflake.SnowflakeJdbcStreamingQueryConfiguration; import java.io.IOException; import java.nio.file.Path; import java.util.Properties; @@ -26,12 +26,12 @@ protected JdbcDatabase setupDataBase() { final DataSource dataSource = createDataSource(getStaticConfig()); return new StreamingJdbcDatabase(dataSource, JdbcUtils.getDefaultSourceOperations(), - new SnowflakeJdbcStreamingQueryConfiguration()); + AdaptiveStreamingQueryConfig::new); } private HikariDataSource createDataSource(final JsonNode config) { - HikariDataSource dataSource = new HikariDataSource(); - Properties 
properties = new Properties(); + final HikariDataSource dataSource = new HikariDataSource(); + final Properties properties = new Properties(); final StringBuilder jdbcUrl = new StringBuilder( String.format("jdbc:snowflake://%s/?", config.get("host").asText())); @@ -49,18 +49,18 @@ private HikariDataSource createDataSource(final JsonNode config) { jdbcUrl.append(config.get("jdbc_url_params").asText()); } - var credentials = config.get("credentials"); + final var credentials = config.get("credentials"); try { properties.setProperty("client_id", credentials.get("client_id").asText()); properties.setProperty("client_secret", credentials.get("client_secret").asText()); properties.setProperty("refresh_token", credentials.get("refresh_token").asText()); properties.setProperty("host", config.get("host").asText()); - var accessToken = SnowflakeDataSourceUtils.getAccessTokenUsingRefreshToken( + final var accessToken = SnowflakeDataSourceUtils.getAccessTokenUsingRefreshToken( config.get("host").asText(), credentials.get("client_id").asText(), credentials.get("client_secret").asText(), credentials.get("refresh_token").asText()); properties.put("authenticator", "oauth"); properties.put("token", accessToken); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } @@ -91,4 +91,5 @@ JsonNode getStaticConfig() { public void testBackwardCompatibilityAfterAddingOAuth() throws Exception { // this test case is not valid for OAuth method } + } diff --git a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java index fea9e0c41bf1..370236b8d691 100644 --- a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java +++ b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java @@ 
-8,7 +8,7 @@ import com.google.common.collect.ImmutableMap; import com.mysql.cj.MysqlType; import io.airbyte.commons.json.Jsons; -import io.airbyte.db.jdbc.NoOpJdbcStreamingQueryConfiguration; +import io.airbyte.db.jdbc.streaming.NoOpStreamingQueryConfig; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -33,7 +33,7 @@ public static Source sshWrappedSource() { } public TiDBSource() { - super(DRIVER_CLASS, new NoOpJdbcStreamingQueryConfiguration(), new TiDBSourceOperations()); + super(DRIVER_CLASS, NoOpStreamingQueryConfig::new, new TiDBSourceOperations()); } @Override