Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oracle Warning Segments #98

Merged
merged 5 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/main/java/oracle/r2dbc/OracleR2dbcWarning.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package oracle.r2dbc;

import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.Result;

import java.util.function.Function;
import java.util.function.Predicate;

/**
* <p>
* A subtype of the {@link Result.Segment} interface that provides information
* on warnings raised by Oracle Database.
* </p><p>
* When a SQL command results in a warning, Oracle R2DBC emits a {@link Result}
* with an {@code OracleR2dbcWarning} segment in addition to any other segments
* that resulted from the SQL command. For example, if a SQL {@code SELECT}
* command results in a warning, then an {@code OracleR2dbcWarning} segment is
* included with the result, along with any {@link Result.RowSegment}s returned
* by the {@code SELECT}.
* </p><p>
* R2DBC drivers typically emit {@code onError} signals for {@code Message}
* segments that are not consumed by {@link Result#filter(Predicate)} or
* {@link Result#flatMap(Function)}. Oracle R2DBC does not apply this behavior
* for warning messages. If an {@code OracleR2dbcWarning}
* segment is not consumed by the {@code filter} or {@code flatMap} methods of
* a {@code Result}, then the warning is discarded and the result may be
* consumed as normal with with the {@code map} or {@code getRowsUpdated}
* methods.
* </p><p>
* Warning messages may be consumed with {@link Result#flatMap(Function)}:
* </p><pre>{@code
* result.flatMap(segment -> {
* if (segment instanceof OracleR2dbcWarning) {
* logWarning(((OracleR2dbcWarning)segment).getMessage());
* return emptyPublisher();
* }
* else {
* ... handle other segment types ...
* }
* })
* }</pre><p>
* A {@code flatMap} function may also be used to convert a warning into an
* {@code onError} signal:
* </p><pre>{@code
* result.flatMap(segment -> {
* if (segment instanceof OracleR2dbcWarning) {
* return errorPublisher(((OracleR2dbcWarning)segment).warning());
* }
* else {
* ... handle other segment types ...
* }
* })
* }</pre>
* @since 1.1.0
*/
public interface OracleR2dbcWarning extends Result.Segment {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extending Result.Message would likely make more sense to pick up what's already provided by the spec.

Copy link
Member Author

@Michael-A-McMahon Michael-A-McMahon Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really appreciate you joining in on this!

I can see how a Message subtype is much easier to work with, and I'm happy to make that change.

Would you be open to adding an instanceof OracleR2dbcWarning check in Spring then? Users are telling me that they don't want warnings to be emitted as onError signals. So if Spring has an instanceof Message check that results in onError, we'd want to avoid taking that branch.

The main goal is to make warnings an "opt-in" type of result. That is, code which consumes a Result should not see a warning unless it explicitly checks for it. It is similar to JDBC's getWarnings methods, where code won't see the warning unless it checks. To this end, calls to Result.getRowsUpdated or Result.map methods would not emit onError for warnings, even if the segment type is a Result.Message subtype.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that emitting warnings as errors creates a lot of inconvenience in the first place.

Users are telling me that they don't want warnings to be emitted as onError signals.

I also understand the desire to inform users that something unexpected has happened that didn't immediately lead to failure. Most other drivers do not emit warnings as errors but rather allow consumption of warning (message) segments when users are interested in such detail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying on this; Knowing that other drivers handle it this way is very helpful!
I've made the suggested changes.


/**
* Returns the warning as an {@link R2dbcException}.
* @return The warning as an {@link R2dbcException}. Not null.
*/
R2dbcException exception();

/**
* Returns the error code of the warning.
* @return The error code of the warning.
*/
int errorCode();

/**
* Returns the SQLState of the warning.
* @return The SQLState of the warning. Not null.
*/
String sqlState();

/**
* Returns the text of the warning message.
* @return The text of the warning message. Not null.
*/
String message();
}
16 changes: 15 additions & 1 deletion src/main/java/oracle/r2dbc/impl/OracleR2dbcExceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,26 @@ static void requireOpenConnection(java.sql.Connection jdbcConnection) {
* as the specified {@code sqlException}. Not null.
*/
static R2dbcException toR2dbcException(SQLException sqlException) {
return toR2dbcException(sqlException, getSql(sqlException));
}

/**
* Converts a {@link SQLException} into an {@link R2dbcException}, as
* specified by {@link #toR2dbcException(SQLException)}. This method accepts
* a SQL string argument. It should be used in cases where the SQL can not
* be extracted by {@link #getSql(SQLException)}.
* @param sqlException A {@code SQLException} to convert. Not null.
* @param sql SQL that caused the exception
* @return an {@code R2dbcException} that indicates the same error conditions
* as the specified {@code sqlException}. Not null.
*/
static R2dbcException toR2dbcException(
SQLException sqlException, String sql) {
assert sqlException != null : "sqlException is null";

final String message = sqlException.getMessage();
final String sqlState = sqlException.getSQLState();
final int errorCode = sqlException.getErrorCode();
final String sql = getSql(sqlException);

if (sqlException instanceof SQLNonTransientException) {
if (sqlException instanceof SQLSyntaxErrorException) {
Expand Down
52 changes: 47 additions & 5 deletions src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import oracle.r2dbc.OracleR2dbcWarning;
import oracle.r2dbc.impl.ReadablesMetadata.RowMetadataImpl;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -398,14 +399,15 @@ static OracleResultImpl createErrorResult(R2dbcException r2dbcException) {
* Creates a {@code Result} that publishes a {@code warning} as a
* {@link Message} segment, followed by any {@code Segment}s of a
* {@code result}.
* @param sql The SQL that resulted in a waring. Not null.
* @param warning Warning to publish. Not null.
* @param result Result to publisher. Not null.
* @return A {@code Result} for a {@code Statement} execution that
* completed with a warning.
*/
static OracleResultImpl createWarningResult(
SQLWarning warning, OracleResultImpl result) {
return new WarningResult(warning, result);
String sql, SQLWarning warning, OracleResultImpl result) {
return new WarningResult(sql, warning, result);
}

/**
Expand Down Expand Up @@ -627,6 +629,9 @@ <T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
*/
private static final class WarningResult extends OracleResultImpl {

/** The SQL that resulted in a warning */
private final String sql;

/** The warning of this result */
private final SQLWarning warning;

Expand All @@ -636,11 +641,13 @@ private static final class WarningResult extends OracleResultImpl {
/**
* Constructs a result that publishes a {@code warning} as a
* {@link Message}, and then publishes the segments of a {@code result}.
* @param sql The SQL that resulted in a warning
* @param warning Warning to publish. Not null.
* @param result Result of segments to publish after the warning. Not null.
*/
private WarningResult(
SQLWarning warning, OracleResultImpl result) {
String sql, SQLWarning warning, OracleResultImpl result) {
this.sql = sql;
this.warning = warning;
this.result = result;
}
Expand All @@ -649,8 +656,11 @@ private WarningResult(
<T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
return Flux.fromStream(Stream.iterate(
warning, Objects::nonNull, SQLWarning::getNextWarning)
.map(OracleR2dbcExceptions::toR2dbcException)
.map(MessageImpl::new))
.map(nextWarning ->
// It is noted that SQL can not be extracted from Oracle JDBC's
// SQLWarning objects, so it must be explicitly provided here.
OracleR2dbcExceptions.toR2dbcException(warning, sql))
.map(WarningImpl::new))
.map(mappingFunction)
// Invoke publishSegments(Class, Function) rather than
// publishSegments(Function) to update the state of the result; Namely,
Expand Down Expand Up @@ -803,6 +813,38 @@ public String message() {
}
}

/**
* Implementation of {@link OracleR2dbcWarning}.
*/
private static final class WarningImpl implements OracleR2dbcWarning {

private final R2dbcException warning;

private WarningImpl(R2dbcException exception) {
this.warning = exception;
}

@Override
public R2dbcException exception() {
return warning;
}

@Override
public int errorCode() {
return warning.getErrorCode();
}

@Override
public String sqlState() {
return warning.getSqlState();
}

@Override
public String message() {
return warning.getMessage();
}
}

/**
* Returns a {@code Publisher} that emits the signals of a {@code publisher}
* to a single {@link org.reactivestreams.Subscriber}, and rejects additional
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ private OracleResultImpl getWarnings(OracleResultImpl result) {
preparedStatement.clearWarnings();
return warning == null
? result
: OracleResultImpl.createWarningResult(warning, result);
: OracleResultImpl.createWarningResult(sql, warning, result);
});
}

Expand Down
60 changes: 60 additions & 0 deletions src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
package oracle.r2dbc.impl;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Result.Message;
import io.r2dbc.spi.Result.RowSegment;
import io.r2dbc.spi.Result.UpdateCount;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.Statement;
import oracle.r2dbc.OracleR2dbcWarning;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -55,6 +58,9 @@
import static oracle.r2dbc.util.Awaits.tryAwaitExecution;
import static oracle.r2dbc.util.Awaits.tryAwaitNone;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -624,4 +630,58 @@ else if (index == 1) {
}
}

/**
* Verifies that a warnings are emitted as {@code Message} segments with an
* {@link oracle.r2dbc.OracleR2dbcWarning}.
*/
@Test
public void testOracleR2dbcWarning() {
Connection connection = awaitOne(sharedConnection());
try {

// Expect a warning for forcing a view that references a non-existent
// table
String sql = "CREATE OR REPLACE FORCE VIEW testOracleR2dbcWarning AS" +
" SELECT x FROM thisdoesnotexist";
Statement warningStatement = connection.createStatement(sql);

// Collect the segments
List<Result.Segment> segments =
awaitMany(Flux.from(warningStatement.execute())
.flatMap(result -> result.flatMap(Mono::just)));

// Expect the warning segment first. Expect it to have the fixed message
// and error number used by Oracle JDBC for all warnings.
Result.Segment secondSegment = segments.get(0);
OracleR2dbcWarning warning =
assertInstanceOf(OracleR2dbcWarning.class, secondSegment);
assertEquals(
warning.message(), "Warning: execution completed with warning");
assertEquals(warning.errorCode(), 17110);
assertEquals("99999", warning.sqlState()); // Default SQL state
R2dbcException exception =
assertInstanceOf(R2dbcException.class, warning.exception());
assertEquals(warning.message(), exception.getMessage());
assertEquals(warning.errorCode(), exception.getErrorCode());
assertEquals(warning.sqlState(), exception.getSqlState());
assertEquals(sql, exception.getSql());


// Expect the update count segment last. Warnings are always emitted
// first.
Result.Segment firstSegment = segments.get(1);
assertEquals(0,
assertInstanceOf(UpdateCount.class, firstSegment).value());
assertFalse(firstSegment instanceof OracleR2dbcWarning);

// Verify that there are not any more segments
assertEquals(2, segments.size());
}
finally {
tryAwaitExecution(
connection.createStatement("DROP VIEW testOracleR2dbcWarning"));
tryAwaitNone(connection.close());
}
}

}
14 changes: 8 additions & 6 deletions src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.Type;
import oracle.r2dbc.OracleR2dbcOptions;
import oracle.r2dbc.OracleR2dbcWarning;
import oracle.r2dbc.test.DatabaseConfig;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -1997,7 +1998,7 @@ public void testOutAndImplicitResult() {

/**
* Verifies that {@link OracleStatementImpl#execute()} emits a {@link Result}
* with a {@link Message} segment when the execution results in a
* with a {@link OracleR2dbcWarning} segment when the execution results in a
* warning.
*/
@Test
Expand All @@ -2007,9 +2008,10 @@ public void testWarningMessage() {
try {

// Create a procedure using invalid syntax and expect the Result to
// have a Message with an R2dbcException having a SQLWarning as it's
// initial cause. Expect the Result to have an update count of zero as
// well, indicating that the statement completed after the warning.
// have an OracleR2dbcWarning with an R2dbcException having a SQLWarning
// as it's initial cause. Expect the Result to have an update count of
// zero as well, indicating that the statement completed after the
// warning.
AtomicInteger segmentCount = new AtomicInteger(0);
R2dbcException r2dbcException =
awaitOne(Flux.from(connection.createStatement(
Expand All @@ -2020,9 +2022,9 @@ public void testWarningMessage() {
result.flatMap(segment -> {
int index = segmentCount.getAndIncrement();
if (index == 0) {
assertTrue(segment instanceof Message,
assertTrue(segment instanceof OracleR2dbcWarning,
"Unexpected Segment: " + segment);
return Mono.just(((Message)segment).exception());
return Mono.just(((OracleR2dbcWarning)segment).exception());
}
else if (index == 1) {
assertTrue(segment instanceof UpdateCount,
Expand Down