Skip to content

Commit

Permalink
🐛 Fixed connection leak in StreamingJdbcDatabase (#20888)
Browse files Browse the repository at this point in the history
* Fixed connection leak in StreamingJdbcDatabase

* fixed checkstyle
  • Loading branch information
VitaliiMaltsev authored Jan 22, 2023
1 parent 0548d5f commit 126ce36
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
public abstract class JdbcDatabase extends SqlDatabase {

protected final JdbcCompatibleSourceOperations<?> sourceOperations;
protected Exception streamException;
protected boolean isStreamFailed;

public JdbcDatabase(final JdbcCompatibleSourceOperations<?> sourceOperations) {
this.sourceOperations = sourceOperations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedState
try {
connection.setAutoCommit(true);
connection.close();
if (isStreamFailed) {
throw new RuntimeException(streamException);
}
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand All @@ -84,9 +87,9 @@ public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedState
* This method differs from {@link DefaultJdbcDatabase#toUnsafeStream} in that it takes a streaming
* config that adjusts the fetch size dynamically according to sampled row size.
*/
protected static <T> Stream<T> toUnsafeStream(final ResultSet resultSet,
final CheckedFunction<ResultSet, T, SQLException> mapper,
final JdbcStreamingQueryConfig streamingConfig) {
protected <T> Stream<T> toUnsafeStream(final ResultSet resultSet,
final CheckedFunction<ResultSet, T, SQLException> mapper,
final JdbcStreamingQueryConfig streamingConfig) {
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {

@Override
Expand All @@ -102,7 +105,11 @@ public boolean tryAdvance(final Consumer<? super T> action) {
return true;
} catch (final SQLException e) {
LOGGER.error("SQLState: {}, Message: {}", e.getSQLState(), e.getMessage());
throw new RuntimeException(e);
streamException = e;
isStreamFailed = true;
// throwing an exception in tryAdvance() method lead to the endless loop in Spliterator and stream
// will never close
return false;
}
}

Expand Down

0 comments on commit 126ce36

Please sign in to comment.