🎉 JDBC source: adjust streaming query fetch size dynamically #12400

Merged · 16 commits · Apr 29, 2022
7 changes: 4 additions & 3 deletions airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java
@@ -10,13 +10,14 @@
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcSourceOperations;
-import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
+import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig;
import io.airbyte.db.mongodb.MongoDatabase;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
+import java.util.function.Supplier;
import lombok.val;
import org.apache.commons.dbcp2.BasicDataSource;
import org.jooq.SQLDialect;
@@ -188,13 +189,13 @@ public static JdbcDatabase createStreamingJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
-final JdbcStreamingQueryConfiguration jdbcStreamingQuery,
+final Supplier<JdbcStreamingQueryConfig> streamingQueryConfigProvider,
final Map<String, String> connectionProperties,
final JdbcCompatibleSourceOperations<?> sourceOperations) {
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, connectionProperties);

-return new StreamingJdbcDatabase(connectionPool, sourceOperations, jdbcStreamingQuery);
+return new StreamingJdbcDatabase(connectionPool, sourceOperations, streamingQueryConfigProvider);
}

private static BasicDataSource createBasicDataSource(final String username,
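For context, a hedged call-site sketch of the new signature (not part of this diff): passing a Supplier means every query obtains a fresh JdbcStreamingQueryConfig, and with it a fresh fetch-size estimator. The credentials, URL, and driver below are placeholders, and JdbcUtils.getDefaultSourceOperations() is assumed to be the source-operations helper.

final JdbcDatabase streamingDb = Databases.createStreamingJdbcDatabase(
    "airbyte",                               // username (placeholder)
    "secret",                                // password (placeholder)
    "jdbc:postgresql://localhost:5432/demo", // JDBC URL (placeholder)
    "org.postgresql.Driver",                 // driver class (placeholder)
    AdaptiveStreamingQueryConfig::new,       // fresh config per query
    Map.of(),                                // no extra connection properties
    JdbcUtils.getDefaultSourceOperations()); // assumed helper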

5 files deleted.

airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java
@@ -4,39 +4,46 @@

package io.airbyte.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.errorprone.annotations.MustBeClosed;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.sql.DataSource;

/**
- * This database allows a developer to specify a {@link JdbcStreamingQueryConfiguration}. This
- * allows the developer to specify the correct configuration in order for a
- * {@link PreparedStatement} to execute as in a streaming / chunked manner.
+ * This database allows a developer to specify a {@link JdbcStreamingQueryConfig}. This allows the
+ * developer to specify the correct configuration in order for a {@link PreparedStatement} to
+ * execute in a streaming / chunked manner.
 */
public class StreamingJdbcDatabase extends DefaultJdbcDatabase {

-private final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration;
+private final Supplier<JdbcStreamingQueryConfig> streamingQueryConfigProvider;

public StreamingJdbcDatabase(final DataSource dataSource,
final JdbcCompatibleSourceOperations<?> sourceOperations,
-final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration) {
+final Supplier<JdbcStreamingQueryConfig> streamingQueryConfigProvider) {
super(dataSource, sourceOperations);
-this.jdbcStreamingQueryConfiguration = jdbcStreamingQueryConfiguration;
+this.streamingQueryConfigProvider = streamingQueryConfigProvider;
}

/**
- * Assuming that the {@link JdbcStreamingQueryConfiguration} is configured correctly for the JDBC
- * driver being used, this method will return data in streaming / chunked fashion. Review the
- * provided {@link JdbcStreamingQueryConfiguration} to understand the size of these chunks. If the
- * entire stream is consumed the database connection will be closed automatically and the caller
- * need not call close on the returned stream. This query (and the first chunk) are fetched
- * immediately. Subsequent chunks will not be pulled until the first chunk is consumed.
+ * Assuming that the {@link JdbcStreamingQueryConfig} is configured correctly for the JDBC driver
+ * being used, this method will return data in streaming / chunked fashion. Review the provided
+ * {@link JdbcStreamingQueryConfig} to understand the size of these chunks. If the entire stream is
+ * consumed the database connection will be closed automatically and the caller need not call close
+ * on the returned stream. This query (and the first chunk) are fetched immediately. Subsequent
+ * chunks will not be pulled until the first chunk is consumed.
*
* @param statementCreator create a {@link PreparedStatement} from a {@link Connection}.
* @param recordTransform transform each record of that result set into the desired type. do NOT
@@ -53,10 +60,10 @@ public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedState
throws SQLException {
try {
final Connection connection = dataSource.getConnection();
-final PreparedStatement ps = statementCreator.apply(connection);
-// allow configuration of connection and prepared statement to make streaming possible.
-jdbcStreamingQueryConfiguration.accept(connection, ps);
-return toUnsafeStream(ps.executeQuery(), recordTransform)
+final PreparedStatement statement = statementCreator.apply(connection);
+final JdbcStreamingQueryConfig streamingConfig = streamingQueryConfigProvider.get();
+// configure the connection and statement for streaming, starting at the sampling fetch size
+streamingConfig.initialize(connection, statement);
+return toUnsafeStream(statement.executeQuery(), recordTransform, streamingConfig)
.onClose(() -> {
try {
connection.setAutoCommit(true);
Expand All @@ -70,4 +77,46 @@ public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedState
}
}

@MustBeClosed
@Override
public Stream<JsonNode> unsafeQuery(final String sql, final String... params) throws SQLException {
return unsafeQuery(connection -> {
final PreparedStatement statement = connection.prepareStatement(sql);
int i = 1;
for (final String param : params) {
statement.setString(i, param);
++i;
}
return statement;
}, sourceOperations::rowToJson);
}

/**
* This method differs from {@link DefaultJdbcDatabase#toUnsafeStream} in that it takes a streaming
* config that adjusts the fetch size dynamically according to sampled row size.
*/
protected static <T> Stream<T> toUnsafeStream(final ResultSet resultSet,
final CheckedFunction<ResultSet, T, SQLException> mapper,
final JdbcStreamingQueryConfig streamingConfig) {
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {

@Override
public boolean tryAdvance(final Consumer<? super T> action) {
try {
if (!resultSet.next()) {
resultSet.close();
return false;
}
final T dataRow = mapper.apply(resultSet);
streamingConfig.accept(resultSet, dataRow);
action.accept(dataRow);
return true;
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}

}, false);
}

}
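A minimal consumption sketch (not from the PR), reusing the hypothetical streamingDb built in the earlier sketch; the table and column names are made up. The returned stream closes the connection automatically once fully consumed, but try-with-resources also covers early termination.

try (final Stream<JsonNode> rows =
    streamingDb.unsafeQuery("SELECT id, payload FROM events WHERE id > ?", "1000")) {
  // rows are fetched lazily; the fetch size is re-estimated as rows are sampled
  rows.forEach(row -> System.out.println(row.get("id")));
}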
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfig.java
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc.streaming;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Optional;

public class AdaptiveStreamingQueryConfig implements JdbcStreamingQueryConfig {

private final FetchSizeEstimator fetchSizeEstimator;

public AdaptiveStreamingQueryConfig() {
this.fetchSizeEstimator = TwoStageSizeEstimator.getInstance();
}

@Override
public void initialize(final Connection connection, final Statement preparedStatement) throws SQLException {
connection.setAutoCommit(false);
preparedStatement.setFetchSize(FetchSizeConstants.INITIAL_SAMPLE_SIZE);
}

@Override
public void accept(final ResultSet resultSet, final Object rowData) throws SQLException {
fetchSizeEstimator.accept(rowData);
final Optional<Integer> newFetchSize = fetchSizeEstimator.getFetchSize();
if (newFetchSize.isPresent()) {
resultSet.setFetchSize(newFetchSize.get());
}
}

}
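A hypothetical Mockito-based sanity check (not part of the PR, and assuming it lives in the io.airbyte.db.jdbc.streaming package) illustrating the initialize() contract above: auto-commit is switched off and the statement starts at the 10-row sampling fetch size.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class AdaptiveStreamingQueryConfigSketch {

  void initializeStartsAtSamplingFetchSize() throws SQLException {
    final Connection connection = mock(Connection.class);
    final PreparedStatement statement = mock(PreparedStatement.class);

    new AdaptiveStreamingQueryConfig().initialize(connection, statement);

    // most JDBC drivers (e.g. Postgres) only stream when auto-commit is off
    verify(connection).setAutoCommit(false);
    verify(statement).setFetchSize(FetchSizeConstants.INITIAL_SAMPLE_SIZE);
  }

}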
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimator.java
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc.streaming;

import io.airbyte.commons.json.Jsons;

public abstract class BaseSizeEstimator implements FetchSizeEstimator {

private final long bufferByteSize;
private final int minFetchSize;
private final int defaultFetchSize;
private final int maxFetchSize;

protected double meanByteSize = 0.0;

protected BaseSizeEstimator(final long bufferByteSize,
final int minFetchSize,
final int defaultFetchSize,
final int maxFetchSize) {
this.bufferByteSize = bufferByteSize;
this.minFetchSize = minFetchSize;
this.defaultFetchSize = defaultFetchSize;
this.maxFetchSize = maxFetchSize;
}

/**
* Use serialized string size as an estimation of the byte size.
*/
static long getEstimatedByteSize(final Object rowData) {
if (rowData == null) {
return 0L;
}
// the serialized string length is multiplied by 4 as a conservative upper
// bound, since a character can take up to 4 bytes in UTF-8
return Jsons.serialize(rowData).length() * 4L;
}

protected int getBoundedFetchSize() {
if (meanByteSize <= 0L) {
return defaultFetchSize;
}
final double rawFetchSize = bufferByteSize / meanByteSize;
return Math.max(minFetchSize, Math.min(maxFetchSize, Double.valueOf(rawFetchSize).intValue()));
}

long getMeanRowByteSize() {
return Double.valueOf(meanByteSize).longValue();
}

}
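To make the bounding arithmetic concrete, a worked example with made-up row sizes and the constants from FetchSizeConstants (next file):

// bufferByteSize = 200 MB = 209_715_200 bytes
// mean row size 500 bytes:  raw = 209_715_200 / 500 ≈ 419_430 rows -> clamped to MAX_FETCH_SIZE (100_000)
// mean row size 4096 bytes: raw = 209_715_200 / 4096 = 51_200 rows -> within [10, 100_000], used as-is
final double raw = 209_715_200L / 500.0;                        // 419430.4
final int bounded = Math.max(10, Math.min(100_000, (int) raw)); // 100_000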
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeConstants.java
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc.streaming;

public final class FetchSizeConstants {

public static final long BUFFER_BYTE_SIZE = 200L * 1024L * 1024L; // 200 MB
Contributor:

Can we make this configurable, so that if a single row exceeds 200 MB we can reconfigure the pod to have more memory and reconfigure the connector to have a larger buffer?

Contributor Author:

The 200 MB buffer size is not enforced. It is only used to calculate the fetch size. Currently, each connector has much more than 200 MB of heap, so the max row size the connector can handle is actually limited by the heap size.

I'd prefer not to expose this as a connector parameter. Users should not have to worry about this kind of low-level detail; it would make the setup confusing. For example, we currently let people configure the part size in the blob storage connector. People don't always understand what it means, and sometimes they set a wrong value, resulting in failed connections. We are in the process of removing it.

If a row is larger than 200 MB, the user should store the data in blob storage or something else. I don't think we need to support such an edge case. No matter how large the buffer is, there can always be some use case that breaks it.

public static final int INITIAL_SAMPLE_SIZE = 10;
public static final int SAMPLE_FREQUENCY = 100;
public static final int MIN_FETCH_SIZE = 10;
public static final int DEFAULT_FETCH_SIZE = 1000;
public static final int MAX_FETCH_SIZE = 100_000;

private FetchSizeConstants() {}

}
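To illustrate the author's point above that the buffer is advisory rather than enforced: a pathological mean row size only clamps the fetch size to MIN_FETCH_SIZE, and whether such rows actually fit is decided by the connector's heap (made-up numbers):

final double meanByteSize = 300.0 * 1024 * 1024;                       // hypothetical 300 MB rows
final double raw = FetchSizeConstants.BUFFER_BYTE_SIZE / meanByteSize; // ≈ 0.67 rows
final int bounded = Math.max(FetchSizeConstants.MIN_FETCH_SIZE,
    Math.min(FetchSizeConstants.MAX_FETCH_SIZE, (int) raw));           // -> 10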
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeEstimator.java
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc.streaming;

import java.util.Optional;
import java.util.function.Consumer;

public interface FetchSizeEstimator extends Consumer<Object> {

/**
* @return the estimated fetch size when the estimation is ready
*/
Optional<Integer> getFetchSize();

}
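TwoStageSizeEstimator, referenced by AdaptiveStreamingQueryConfig above, is not included in this excerpt. As a purely hypothetical illustration of the FetchSizeEstimator contract, here is a naive running-mean estimator built on BaseSizeEstimator; the class name and sampling policy are mine, not the PR's.

package io.airbyte.db.jdbc.streaming;

import java.util.Optional;

public class RunningMeanSizeEstimator extends BaseSizeEstimator {

  private long rowCount = 0;

  public RunningMeanSizeEstimator() {
    super(FetchSizeConstants.BUFFER_BYTE_SIZE,
        FetchSizeConstants.MIN_FETCH_SIZE,
        FetchSizeConstants.DEFAULT_FETCH_SIZE,
        FetchSizeConstants.MAX_FETCH_SIZE);
  }

  @Override
  public void accept(final Object rowData) {
    final long byteSize = getEstimatedByteSize(rowData);
    rowCount++;
    // incremental mean: m_n = m_{n-1} + (x_n - m_{n-1}) / n
    meanByteSize += (byteSize - meanByteSize) / rowCount;
  }

  @Override
  public Optional<Integer> getFetchSize() {
    // stay silent until the initial sample is complete
    if (rowCount < FetchSizeConstants.INITIAL_SAMPLE_SIZE) {
      return Optional.empty();
    }
    return Optional.of(getBoundedFetchSize());
  }

}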