diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java index ac92262..a80193e 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java @@ -39,6 +39,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.SQLType; import java.sql.SQLWarning; import java.time.Duration; @@ -55,6 +56,7 @@ import java.util.function.Function; import java.util.stream.IntStream; +import static java.sql.Statement.CLOSE_ALL_RESULTS; import static java.sql.Statement.KEEP_CURRENT_RESULT; import static java.sql.Statement.RETURN_GENERATED_KEYS; import static java.util.Objects.requireNonNullElse; @@ -1158,26 +1160,56 @@ private OracleResultImpl getWarnings(OracleResultImpl result) { */ private Publisher deallocate(Collection results) { - // Close the statement after all results are consumed + // Set up a counter that is decremented as each result is consumed. AtomicInteger unconsumed = new AtomicInteger(results.size()); + + // Set up a publisher that decrements the counter, and closes the + // statement when it reaches zero Publisher closeStatement = adapter.getLock().run(() -> { if (unconsumed.decrementAndGet() == 0) - preparedStatement.close(); + closeStatement(); }); + // Tell each unconsumed result to decrement the unconsumed count, and then + // close the statement when the count reaches zero. for (OracleResultImpl result : results) { if (!result.onConsumed(closeStatement)) unconsumed.decrementAndGet(); } - // If all results have already been consumed, the returned - // publisher closes the statement + // If there are no results, or all results have already been consumed, + // then the returned publisher closes the statement. if (unconsumed.get() == 0) - addDeallocation(adapter.getLock().run(preparedStatement::close)); + addDeallocation(adapter.getLock().run(this::closeStatement)); return deallocators; } + /** + * Closes the JDBC {@link #preparedStatement}. This method should only be + * called while holding the + * {@linkplain ReactiveJdbcAdapter#getLock() connection lock} + * @throws SQLException If the statement fails to close. + */ + private void closeStatement() throws SQLException { + try { + // Workaround Oracle JDBC bug #34545179: ResultSet references are + // retained even when the statement is closed. Calling getMoreResults + // with the CLOSE_ALL_RESULTS argument forces the driver to + // de-reference them. + preparedStatement.getMoreResults(CLOSE_ALL_RESULTS); + } + catch (SQLException sqlException) { + // It may be the case that the JDBC connection was closed, and so the + // statement was closed with it. Check for this, and ignore the + // SQLException if so. + if (!jdbcConnection.isClosed()) + throw sqlException; + } + + preparedStatement.close(); + } + /** * Sets the {@code value} of a {@code preparedStatement} parameter at the * specified {@code index}. If a non-null {@code type} is provided, then it is @@ -1454,7 +1486,7 @@ private JdbcBatch( */ @Override protected Publisher bind() { - @SuppressWarnings({"unchecked","rawtypes"}) + @SuppressWarnings({"unchecked"}) Publisher[] bindPublishers = new Publisher[batchSize]; for (int i = 0; i < batchSize; i++) { bindPublishers[i] = Flux.concat(