-
Notifications
You must be signed in to change notification settings - Fork 4.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
🎉 JDBC source: adjust streaming query fetch size dynamically (#12400)
* Merge all streaming configs to one * Implement new streaming query config * Format code * Fix comparison * Use double for mean byte size * Update fetch size only when changed * Calculate mean size by sampling n rows * Add javadoc * Change min fetch size to 1 * Add comment by buffer size * Update java connector template * Perform division first * Add unit test for fetching large rows * Format code * Fix connector compilation error
- Loading branch information
Showing
48 changed files
with
862 additions
and
351 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19 changes: 0 additions & 19 deletions
19
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/Db2JdbcStreamingQueryConfiguration.java
This file was deleted.
Oops, something went wrong.
15 changes: 0 additions & 15 deletions
15
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcStreamingQueryConfiguration.java
This file was deleted.
Oops, something went wrong.
19 changes: 0 additions & 19 deletions
19
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/NoOpJdbcStreamingQueryConfiguration.java
This file was deleted.
Oops, something went wrong.
19 changes: 0 additions & 19 deletions
19
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/OracleJdbcStreamingQueryConfiguration.java
This file was deleted.
Oops, something went wrong.
19 changes: 0 additions & 19 deletions
19
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/PostgresJdbcStreamingQueryConfiguration.java
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
46 changes: 46 additions & 0 deletions
46
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.jdbc.streaming; | ||
|
||
import java.sql.Connection; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.sql.Statement; | ||
import java.util.Optional; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class AdaptiveStreamingQueryConfig implements JdbcStreamingQueryConfig { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(AdaptiveStreamingQueryConfig.class); | ||
|
||
private final FetchSizeEstimator fetchSizeEstimator; | ||
private int currentFetchSize; | ||
|
||
public AdaptiveStreamingQueryConfig() { | ||
this.fetchSizeEstimator = TwoStageSizeEstimator.getInstance(); | ||
this.currentFetchSize = FetchSizeConstants.INITIAL_SAMPLE_SIZE; | ||
} | ||
|
||
@Override | ||
public void initialize(final Connection connection, final Statement preparedStatement) throws SQLException { | ||
connection.setAutoCommit(false); | ||
preparedStatement.setFetchSize(FetchSizeConstants.INITIAL_SAMPLE_SIZE); | ||
currentFetchSize = FetchSizeConstants.INITIAL_SAMPLE_SIZE; | ||
LOGGER.info("Set initial fetch size: {} rows", preparedStatement.getFetchSize()); | ||
} | ||
|
||
@Override | ||
public void accept(final ResultSet resultSet, final Object rowData) throws SQLException { | ||
fetchSizeEstimator.accept(rowData); | ||
final Optional<Integer> newFetchSize = fetchSizeEstimator.getFetchSize(); | ||
if (newFetchSize.isPresent() && currentFetchSize != newFetchSize.get()) { | ||
currentFetchSize = newFetchSize.get(); | ||
resultSet.setFetchSize(currentFetchSize); | ||
LOGGER.info("Updated fetch size: {} rows", currentFetchSize); | ||
} | ||
} | ||
|
||
} |
71 changes: 71 additions & 0 deletions
71
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.jdbc.streaming; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import io.airbyte.commons.json.Jsons; | ||
|
||
/** | ||
* Fetch size (number of rows) = target buffer byte size / mean row byte size | ||
*/ | ||
public abstract class BaseSizeEstimator implements FetchSizeEstimator { | ||
|
||
// desired buffer size in memory | ||
private final long targetBufferByteSize; | ||
private final int minFetchSize; | ||
private final int defaultFetchSize; | ||
private final int maxFetchSize; | ||
|
||
// mean byte size per row | ||
protected double meanByteSize = 0.0; | ||
|
||
protected BaseSizeEstimator(final long targetBufferByteSize, | ||
final int minFetchSize, | ||
final int defaultFetchSize, | ||
final int maxFetchSize) { | ||
this.targetBufferByteSize = targetBufferByteSize; | ||
this.minFetchSize = minFetchSize; | ||
this.defaultFetchSize = defaultFetchSize; | ||
this.maxFetchSize = maxFetchSize; | ||
} | ||
|
||
/** | ||
* What we really want is to know how much memory each {@code rowData} takes. However, there is no | ||
* easy way to measure that. So we use the byte size of the serialized row to approximate that. | ||
*/ | ||
@VisibleForTesting | ||
public static long getEstimatedByteSize(final Object rowData) { | ||
if (rowData == null) { | ||
return 0L; | ||
} | ||
// The string length is multiplied by 4 assuming each character is a | ||
// full UTF-8 character. In reality, a UTF-8 character is encoded as | ||
// 1 to 4 bytes. So this is an overestimation. This is alright, because | ||
// the whole method only provides an estimation. Please never convert | ||
// the string to byte[] to get the exact length. That conversion is known | ||
// to introduce a lot of memory overhead. | ||
return Jsons.serialize(rowData).length() * 4L; | ||
} | ||
|
||
/** | ||
* This method ensures that the fetch size is between {@code minFetchSize} and {@code maxFetchSize}, | ||
* inclusively. | ||
*/ | ||
protected int getBoundedFetchSize() { | ||
if (meanByteSize <= 0.0) { | ||
return defaultFetchSize; | ||
} | ||
final long rawFetchSize = Math.round(targetBufferByteSize / meanByteSize); | ||
if (rawFetchSize > Integer.MAX_VALUE) { | ||
return maxFetchSize; | ||
} | ||
return Math.max(minFetchSize, Math.min(maxFetchSize, (int) rawFetchSize)); | ||
} | ||
|
||
double getMeanRowByteSize() { | ||
return meanByteSize; | ||
} | ||
|
||
} |
27 changes: 27 additions & 0 deletions
27
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.jdbc.streaming; | ||
|
||
public final class FetchSizeConstants { | ||
|
||
// The desired buffer size in memory to store the fetched rows. | ||
// This size is not enforced. It is only used to calculate a proper | ||
// fetch size. The max row size the connector can handle is actually | ||
// limited by the heap size. | ||
public static final long TARGET_BUFFER_BYTE_SIZE = 200L * 1024L * 1024L; // 200 MB | ||
// sample size for making the first estimation of the row size | ||
public static final int INITIAL_SAMPLE_SIZE = 10; | ||
// sample size for making the post-initial estimation of the row size | ||
public static final int POST_INITIAL_SAMPLE_SIZE = 10; | ||
// sample every N rows during the post-initial stage | ||
public static final int SAMPLE_FREQUENCY = 100; | ||
|
||
public static final int MIN_FETCH_SIZE = 1; | ||
public static final int DEFAULT_FETCH_SIZE = 1000; | ||
public static final int MAX_FETCH_SIZE = 100_000; | ||
|
||
private FetchSizeConstants() {} | ||
|
||
} |
17 changes: 17 additions & 0 deletions
17
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/FetchSizeEstimator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.jdbc.streaming; | ||
|
||
import java.util.Optional; | ||
import java.util.function.Consumer; | ||
|
||
public interface FetchSizeEstimator extends Consumer<Object> { | ||
|
||
/** | ||
* @return the estimated fetch size when the estimation is ready | ||
*/ | ||
Optional<Integer> getFetchSize(); | ||
|
||
} |
Oops, something went wrong.