🎉 JDBC source: adjust streaming query fetch size dynamically #12400

Merged: 16 commits, Apr 29, 2022
7 changes: 4 additions & 3 deletions airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java
@@ -10,13 +10,14 @@
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcSourceOperations;
-import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
+import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig;
import io.airbyte.db.mongodb.MongoDatabase;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
+import java.util.function.Supplier;
import lombok.val;
import org.apache.commons.dbcp2.BasicDataSource;
import org.jooq.SQLDialect;
@@ -188,13 +189,13 @@ public static JdbcDatabase createStreamingJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
-final JdbcStreamingQueryConfiguration jdbcStreamingQuery,
+final Supplier<JdbcStreamingQueryConfig> streamingQueryConfigProvider,
final Map<String, String> connectionProperties,
final JdbcCompatibleSourceOperations<?> sourceOperations) {
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, connectionProperties);

-return new StreamingJdbcDatabase(connectionPool, sourceOperations, jdbcStreamingQuery);
+return new StreamingJdbcDatabase(connectionPool, sourceOperations, streamingQueryConfigProvider);
}

private static BasicDataSource createBasicDataSource(final String username,
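For context, a minimal wiring sketch of the new supplier-based signature. The credentials, connection string, and driver are hypothetical, and it assumes JdbcSourceOperations has a public no-arg constructor; passing AdaptiveStreamingQueryConfig::new gives every streaming query a fresh, stateful config instance:

import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import java.util.Map;

final JdbcDatabase database = Databases.createStreamingJdbcDatabase(
    "username",                               // hypothetical credentials
    "password",
    "jdbc:postgresql://localhost:5432/mydb",  // hypothetical connection string
    "org.postgresql.Driver",
    AdaptiveStreamingQueryConfig::new,        // fresh config per streaming query
    Map.of(),
    new JdbcSourceOperations());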

5 files deleted.
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java
@@ -7,36 +7,42 @@
import com.google.errorprone.annotations.MustBeClosed;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
+import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import javax.sql.DataSource;

/**
-* This database allows a developer to specify a {@link JdbcStreamingQueryConfiguration}. This
-* allows the developer to specify the correct configuration in order for a
-* {@link PreparedStatement} to execute as in a streaming / chunked manner.
+* This database allows a developer to specify a {@link JdbcStreamingQueryConfig}. This allows the
+* developer to specify the correct configuration in order for a {@link PreparedStatement} to
+* execute in a streaming / chunked manner.
*/
public class StreamingJdbcDatabase extends DefaultJdbcDatabase {

-private final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration;
+private final Supplier<JdbcStreamingQueryConfig> streamingQueryConfigProvider;

public StreamingJdbcDatabase(final DataSource dataSource,
final JdbcCompatibleSourceOperations<?> sourceOperations,
-final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration) {
+final Supplier<JdbcStreamingQueryConfig> streamingQueryConfigProvider) {
super(dataSource, sourceOperations);
-this.jdbcStreamingQueryConfiguration = jdbcStreamingQueryConfiguration;
+this.streamingQueryConfigProvider = streamingQueryConfigProvider;
}

/**
-* Assuming that the {@link JdbcStreamingQueryConfiguration} is configured correctly for the JDBC
-* driver being used, this method will return data in streaming / chunked fashion. Review the
-* provided {@link JdbcStreamingQueryConfiguration} to understand the size of these chunks. If the
-* entire stream is consumed the database connection will be closed automatically and the caller
-* need not call close on the returned stream. This query (and the first chunk) are fetched
-* immediately. Subsequent chunks will not be pulled until the first chunk is consumed.
+* Assuming that the {@link JdbcStreamingQueryConfig} is configured correctly for the JDBC driver
+* being used, this method will return data in streaming / chunked fashion. Review the provided
+* {@link JdbcStreamingQueryConfig} to understand the size of these chunks. If the entire stream is
+* consumed, the database connection is closed automatically and the caller need not call close on
+* the returned stream. The query (and the first chunk) are fetched immediately. Subsequent chunks
+* will not be pulled until the first chunk is consumed.
*
* @param statementCreator create a {@link PreparedStatement} from a {@link Connection}.
* @param recordTransform transform each record of that result set into the desired type. do NOT
@@ -53,10 +59,10 @@ public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedState
throws SQLException {
try {
final Connection connection = dataSource.getConnection();
-final PreparedStatement ps = statementCreator.apply(connection);
-// allow configuration of connection and prepared statement to make streaming possible.
-jdbcStreamingQueryConfiguration.accept(connection, ps);
-return toUnsafeStream(ps.executeQuery(), recordTransform)
+final PreparedStatement statement = statementCreator.apply(connection);
+final JdbcStreamingQueryConfig streamingConfig = streamingQueryConfigProvider.get();
+streamingConfig.initialize(connection, statement);
+return toUnsafeStream(statement.executeQuery(), recordTransform, streamingConfig)
.onClose(() -> {
try {
connection.setAutoCommit(true);
@@ -70,4 +76,32 @@ public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedState
}
}

/**
* This method differs from {@link DefaultJdbcDatabase#toUnsafeStream} in that it takes a streaming
* config that adjusts the fetch size dynamically according to sampled row size.
*/
protected static <T> Stream<T> toUnsafeStream(final ResultSet resultSet,
final CheckedFunction<ResultSet, T, SQLException> mapper,
final JdbcStreamingQueryConfig streamingConfig) {
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {

@Override
public boolean tryAdvance(final Consumer<? super T> action) {
try {
if (!resultSet.next()) {
resultSet.close();
return false;
}
final T dataRow = mapper.apply(resultSet);
streamingConfig.accept(resultSet, dataRow);
action.accept(dataRow);
return true;
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}

}, false);
}

}
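A consumption sketch (the table name is hypothetical, and it assumes the rowToJson(ResultSet) transform on JdbcSourceOperations). Since unsafeQuery is @MustBeClosed and the stream's onClose handler restores auto-commit, try-with-resources is the natural way to consume it:

import com.fasterxml.jackson.databind.JsonNode;
import java.util.stream.Stream;

final JdbcSourceOperations sourceOperations = new JdbcSourceOperations();
try (final Stream<JsonNode> records = database.unsafeQuery(
    connection -> connection.prepareStatement("SELECT * FROM large_table"),
    sourceOperations::rowToJson)) {
  // Rows are pulled lazily; the fetch size adapts as the config samples rows.
  records.forEach(System.out::println);
}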
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfig.java (new file)
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc.streaming;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdaptiveStreamingQueryConfig implements JdbcStreamingQueryConfig {

private static final Logger LOGGER = LoggerFactory.getLogger(AdaptiveStreamingQueryConfig.class);

private final FetchSizeEstimator fetchSizeEstimator;
private int currentFetchSize;

public AdaptiveStreamingQueryConfig() {
this.fetchSizeEstimator = TwoStageSizeEstimator.getInstance();
this.currentFetchSize = FetchSizeConstants.INITIAL_SAMPLE_SIZE;
}

@Override
public void initialize(final Connection connection, final Statement preparedStatement) throws SQLException {
connection.setAutoCommit(false);
preparedStatement.setFetchSize(FetchSizeConstants.INITIAL_SAMPLE_SIZE);
currentFetchSize = FetchSizeConstants.INITIAL_SAMPLE_SIZE;
LOGGER.info("Set initial fetch size: {} rows", preparedStatement.getFetchSize());
}

@Override
public void accept(final ResultSet resultSet, final Object rowData) throws SQLException {
fetchSizeEstimator.accept(rowData);
final Optional<Integer> newFetchSize = fetchSizeEstimator.getFetchSize();
if (newFetchSize.isPresent() && currentFetchSize != newFetchSize.get()) {
currentFetchSize = newFetchSize.get();
resultSet.setFetchSize(currentFetchSize);
LOGGER.info("Updated fetch size: {} rows", currentFetchSize);
}
}

}
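To make the lifecycle concrete, here is roughly how StreamingJdbcDatabase (above) drives this config; connection, statement, and mapRow are illustrative placeholders, not API from this PR:

final JdbcStreamingQueryConfig config = new AdaptiveStreamingQueryConfig();
config.initialize(connection, statement);  // auto-commit off, fetch size = INITIAL_SAMPLE_SIZE (10)
try (final ResultSet resultSet = statement.executeQuery()) {
  while (resultSet.next()) {
    final Object row = mapRow(resultSet);  // hypothetical row-mapping helper
    config.accept(resultSet, row);         // feeds the estimator; may raise the fetch size
    // ... hand the row to the downstream consumer ...
  }
}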
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimator.java (new file)
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc.streaming;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;

/**
* Fetch size (number of rows) = target buffer byte size / mean row byte size
*/
public abstract class BaseSizeEstimator implements FetchSizeEstimator {

// desired buffer size in memory
private final long targetBufferByteSize;
private final int minFetchSize;
private final int defaultFetchSize;
private final int maxFetchSize;

// mean byte size per row
protected double meanByteSize = 0.0;

protected BaseSizeEstimator(final long targetBufferByteSize,
final int minFetchSize,
final int defaultFetchSize,
final int maxFetchSize) {
this.targetBufferByteSize = targetBufferByteSize;
this.minFetchSize = minFetchSize;
this.defaultFetchSize = defaultFetchSize;
this.maxFetchSize = maxFetchSize;
}

/**
* What we really want is to know how much memory each {@code rowData} takes. However, there is no
* easy way to measure that. So we use the byte size of the serialized row to approximate that.
*/
@VisibleForTesting
public static long getEstimatedByteSize(final Object rowData) {
if (rowData == null) {
return 0L;
}
// The string length is multiplied by 4 assuming each character is a
// full UTF-8 character. In reality, a UTF-8 character is encoded as
// 1 to 4 bytes. So this is an overestimation. This is alright, because
// the whole method only provides an estimation. Please never convert
// the string to byte[] to get the exact length. That conversion is known
// to introduce a lot of memory overhead.
return Jsons.serialize(rowData).length() * 4L;
}

/**
* This method ensures that the fetch size is between {@code minFetchSize} and {@code maxFetchSize},
* inclusive.
*/
protected int getBoundedFetchSize() {
if (meanByteSize <= 0.0) {
return defaultFetchSize;
}
final long rawFetchSize = Math.round(targetBufferByteSize / meanByteSize);
if (rawFetchSize > Integer.MAX_VALUE) {
return maxFetchSize;
}
return Math.max(minFetchSize, Math.min(maxFetchSize, (int) rawFetchSize));
Reviewer comment (Contributor):

Can this ever return 0? What happens when the estimator estimates that even a single row is larger than the available buffer?

Reply (tuliren, author):

No, it will always return a value in the range of [minFetchSize, maxFetchSize]. As I mentioned in the above comment, as long as the row can fit in the total heap, the connector can still handle it.

}

double getMeanRowByteSize() {
return meanByteSize;
}

}
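A worked example of the bounding arithmetic, using the constants defined in FetchSizeConstants below and an assumed mean row size of 500 bytes:

final long targetBufferByteSize = 200L * 1024L * 1024L;                     // TARGET_BUFFER_BYTE_SIZE, 209,715,200 bytes
final double meanByteSize = 500.0;                                          // assumed mean row size
final long rawFetchSize = Math.round(targetBufferByteSize / meanByteSize);  // 419,430 rows
final int fetchSize = Math.max(1, Math.min(100_000, (int) rawFetchSize));   // clamped to MAX_FETCH_SIZE = 100,000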
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeConstants.java (new file)
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc.streaming;

public final class FetchSizeConstants {

// The desired buffer size in memory to store the fetched rows.
// This size is not enforced. It is only used to calculate a proper
// fetch size. The max row size the connector can handle is actually
// limited by the heap size.
public static final long TARGET_BUFFER_BYTE_SIZE = 200L * 1024L * 1024L; // 200 MB
// sample size for making the first estimation of the row size
public static final int INITIAL_SAMPLE_SIZE = 10;
// sample size for making the post-initial estimation of the row size
public static final int POST_INITIAL_SAMPLE_SIZE = 10;
// sample every N rows during the post-initial stage
public static final int SAMPLE_FREQUENCY = 100;

public static final int MIN_FETCH_SIZE = 1;
public static final int DEFAULT_FETCH_SIZE = 1000;
public static final int MAX_FETCH_SIZE = 100_000;

private FetchSizeConstants() {}

}
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeEstimator.java (new file)
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc.streaming;

import java.util.Optional;
import java.util.function.Consumer;

public interface FetchSizeEstimator extends Consumer<Object> {

/**
* @return the estimated fetch size when the estimation is ready
*/
Optional<Integer> getFetchSize();

}
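For illustration, a minimal estimator satisfying this interface: it keeps a running mean of serialized row sizes and defers to BaseSizeEstimator for the bounded fetch size. This is only a sketch, not the TwoStageSizeEstimator that AdaptiveStreamingQueryConfig actually uses:

import java.util.Optional;

public class RunningMeanSizeEstimator extends BaseSizeEstimator {

  private long rowCount = 0;

  public RunningMeanSizeEstimator() {
    super(FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE,
        FetchSizeConstants.MIN_FETCH_SIZE,
        FetchSizeConstants.DEFAULT_FETCH_SIZE,
        FetchSizeConstants.MAX_FETCH_SIZE);
  }

  @Override
  public void accept(final Object rowData) {
    rowCount++;
    // Incrementally update the mean serialized row byte size.
    meanByteSize += (getEstimatedByteSize(rowData) - meanByteSize) / rowCount;
  }

  @Override
  public Optional<Integer> getFetchSize() {
    // No estimate until at least one row has been sampled.
    return rowCount == 0 ? Optional.empty() : Optional.of(getBoundedFetchSize());
  }

}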