diff --git a/.github/workflows/startup/01_createUser.sql b/.github/workflows/startup/01_createUser.sql
index 3a26b85..f2b30cb 100644
--- a/.github/workflows/startup/01_createUser.sql
+++ b/.github/workflows/startup/01_createUser.sql
@@ -36,12 +36,13 @@
-- v$transaction (to verify if TransactionDefinitions are applied).
-- v$session (to verify if VSESSION_* Options are applied).
ALTER SESSION SET CONTAINER=xepdb1;
-CREATE ROLE r2dbc_test_user;
-GRANT SELECT ON v_$open_cursor TO r2dbc_test_user;
-GRANT SELECT ON v_$transaction TO r2dbc_test_user;
-GRANT SELECT ON v_$session TO r2dbc_test_user;
+CREATE ROLE r2dbc_test_role;
+GRANT SELECT ON v_$open_cursor TO r2dbc_test_role;
+GRANT SELECT ON v_$transaction TO r2dbc_test_role;
+GRANT SELECT ON v_$session TO r2dbc_test_role;
+GRANT CREATE VIEW TO r2dbc_test_role;
CREATE USER test IDENTIFIED BY test;
-GRANT connect, resource, unlimited tablespace, r2dbc_test_user TO test;
+GRANT connect, resource, unlimited tablespace, r2dbc_test_role TO test;
ALTER USER test DEFAULT TABLESPACE users;
ALTER USER test TEMPORARY TABLESPACE temp;
diff --git a/README.md b/README.md
index bc12741..fa9299a 100644
--- a/README.md
+++ b/README.md
@@ -344,12 +344,32 @@ Oracle R2DBC's implementation of Publishers that emit multiple items will
typically defer execution until a Subscriber signals demand, and not support
multiple subscribers.
-### Errors
+### Errors and Warnings
Oracle R2DBC creates R2dbcExceptions having the same ORA-XXXXX error codes
-used by Oracle Database and Oracle JDBC.
+used by Oracle Database and Oracle JDBC. The
+[Database Error Messages](https://docs.oracle.com/en/database/oracle/oracle-database/21/errmg/ORA-00000.html#GUID-27437B7F-F0C3-4F1F-9C6E-6780706FB0F6)
+document provides a reference for all ORA-XXXXX error codes.
-A reference for the ORA-XXXXX error codes can be found
-[here](https://docs.oracle.com/en/database/oracle/oracle-database/21/errmg/ORA-00000.html#GUID-27437B7F-F0C3-4F1F-9C6E-6780706FB0F6)
+Warning messages from Oracle Database are emitted as
+`oracle.r2dbc.OracleR2dbcWarning` segments. These segments may be consumed using
+`Result.flatMap(Function)`:
+```java
+result.flatMap(segment -> {
+ if (segment instanceof OracleR2dbcWarning) {
+ logWarning(((OracleR2dbcWarning)segment).getMessage());
+ return emptyPublisher();
+ }
+ else if (segment instanceof Result.Message){
+ ... handle an error ...
+ }
+ else {
+ ... handle other segment types ...
+ }
+})
+```
+Unlike the errors of standard `Result.Message` segments, if a warning is not
+consumed by `flatMap`, then it will be silently discarded when a `Result` is
+consumed using the `map` or `getRowsUpdated` methods.
### Transactions
Oracle R2DBC uses READ COMMITTED as the default transaction isolation level.
diff --git a/src/main/java/oracle/r2dbc/OracleR2dbcWarning.java b/src/main/java/oracle/r2dbc/OracleR2dbcWarning.java
new file mode 100644
index 0000000..381a230
--- /dev/null
+++ b/src/main/java/oracle/r2dbc/OracleR2dbcWarning.java
@@ -0,0 +1,57 @@
+package oracle.r2dbc;
+
+import io.r2dbc.spi.Result;
+
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ *
+ * A subtype of {@link Result.Message} that provides information on warnings
+ * raised by Oracle Database.
+ *
+ * When a SQL command results in a warning, Oracle R2DBC emits a {@link Result}
+ * with an {@code OracleR2dbcWarning} segment in addition to any other segments
+ * that resulted from the SQL command. For example, if a SQL {@code SELECT}
+ * command results in a warning, then an {@code OracleR2dbcWarning} segment is
+ * included with the result, along with any {@link Result.RowSegment}s returned
+ * by the {@code SELECT}.
+ *
+ * R2DBC drivers typically emit {@code onError} signals for {@code Message}
+ * segments that are not consumed by {@link Result#filter(Predicate)} or
+ * {@link Result#flatMap(Function)}. Oracle R2DBC does not apply this behavior
+ * for warning messages. If an {@code OracleR2dbcWarning}
+ * segment is not consumed by the {@code filter} or {@code flatMap} methods of
+ * a {@code Result}, then the warning is discarded and the result may be
+ * consumed as normal with with the {@code map} or {@code getRowsUpdated}
+ * methods.
+ *
+ * Warning messages may be consumed with {@link Result#flatMap(Function)}:
+ *
{@code
+ * result.flatMap(segment -> {
+ * if (segment instanceof OracleR2dbcWarning) {
+ * logWarning(((OracleR2dbcWarning)segment).getMessage());
+ * return emptyPublisher();
+ * }
+ * else {
+ * ... handle other segment types ...
+ * }
+ * })
+ * }
+ * A {@code flatMap} function may also be used to convert a warning into an
+ * {@code onError} signal:
+ *
{@code
+ * result.flatMap(segment -> {
+ * if (segment instanceof OracleR2dbcWarning) {
+ * return errorPublisher(((OracleR2dbcWarning)segment).warning());
+ * }
+ * else {
+ * ... handle other segment types ...
+ * }
+ * })
+ * }
+ * @since 1.1.0
+ */
+public interface OracleR2dbcWarning extends Result.Message {
+
+}
\ No newline at end of file
diff --git a/src/main/java/oracle/r2dbc/impl/OracleR2dbcExceptions.java b/src/main/java/oracle/r2dbc/impl/OracleR2dbcExceptions.java
index 772315a..f638ac9 100755
--- a/src/main/java/oracle/r2dbc/impl/OracleR2dbcExceptions.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleR2dbcExceptions.java
@@ -163,12 +163,26 @@ static void requireOpenConnection(java.sql.Connection jdbcConnection) {
* as the specified {@code sqlException}. Not null.
*/
static R2dbcException toR2dbcException(SQLException sqlException) {
+ return toR2dbcException(sqlException, getSql(sqlException));
+ }
+
+ /**
+ * Converts a {@link SQLException} into an {@link R2dbcException}, as
+ * specified by {@link #toR2dbcException(SQLException)}. This method accepts
+ * a SQL string argument. It should be used in cases where the SQL can not
+ * be extracted by {@link #getSql(SQLException)}.
+ * @param sqlException A {@code SQLException} to convert. Not null.
+ * @param sql SQL that caused the exception
+ * @return an {@code R2dbcException} that indicates the same error conditions
+ * as the specified {@code sqlException}. Not null.
+ */
+ static R2dbcException toR2dbcException(
+ SQLException sqlException, String sql) {
assert sqlException != null : "sqlException is null";
final String message = sqlException.getMessage();
final String sqlState = sqlException.getSQLState();
final int errorCode = sqlException.getErrorCode();
- final String sql = getSql(sqlException);
if (sqlException instanceof SQLNonTransientException) {
if (sqlException instanceof SQLSyntaxErrorException) {
diff --git a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
index 7218ef8..ed6ed47 100644
--- a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
@@ -27,6 +27,7 @@
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
+import oracle.r2dbc.OracleR2dbcWarning;
import oracle.r2dbc.impl.ReadablesMetadata.RowMetadataImpl;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -150,6 +151,8 @@ private Publisher publishSegments(
Flux.from(publishSegments(segment -> {
if (type.isInstance(segment))
return mappingFunction.apply(type.cast(segment));
+ else if (segment instanceof OracleR2dbcWarning)
+ return (U)FILTERED;
else if (segment instanceof Message)
throw ((Message)segment).exception();
else
@@ -398,14 +401,15 @@ static OracleResultImpl createErrorResult(R2dbcException r2dbcException) {
* Creates a {@code Result} that publishes a {@code warning} as a
* {@link Message} segment, followed by any {@code Segment}s of a
* {@code result}.
+ * @param sql The SQL that resulted in a waring. Not null.
* @param warning Warning to publish. Not null.
* @param result Result to publisher. Not null.
* @return A {@code Result} for a {@code Statement} execution that
* completed with a warning.
*/
static OracleResultImpl createWarningResult(
- SQLWarning warning, OracleResultImpl result) {
- return new WarningResult(warning, result);
+ String sql, SQLWarning warning, OracleResultImpl result) {
+ return new WarningResult(sql, warning, result);
}
/**
@@ -627,6 +631,9 @@ Publisher publishSegments(Function mappingFunction) {
*/
private static final class WarningResult extends OracleResultImpl {
+ /** The SQL that resulted in a warning */
+ private final String sql;
+
/** The warning of this result */
private final SQLWarning warning;
@@ -636,11 +643,13 @@ private static final class WarningResult extends OracleResultImpl {
/**
* Constructs a result that publishes a {@code warning} as a
* {@link Message}, and then publishes the segments of a {@code result}.
+ * @param sql The SQL that resulted in a warning
* @param warning Warning to publish. Not null.
* @param result Result of segments to publish after the warning. Not null.
*/
private WarningResult(
- SQLWarning warning, OracleResultImpl result) {
+ String sql, SQLWarning warning, OracleResultImpl result) {
+ this.sql = sql;
this.warning = warning;
this.result = result;
}
@@ -649,8 +658,11 @@ private WarningResult(
Publisher publishSegments(Function mappingFunction) {
return Flux.fromStream(Stream.iterate(
warning, Objects::nonNull, SQLWarning::getNextWarning)
- .map(OracleR2dbcExceptions::toR2dbcException)
- .map(MessageImpl::new))
+ .map(nextWarning ->
+ // It is noted that SQL can not be extracted from Oracle JDBC's
+ // SQLWarning objects, so it must be explicitly provided here.
+ OracleR2dbcExceptions.toR2dbcException(warning, sql))
+ .map(WarningImpl::new))
.map(mappingFunction)
// Invoke publishSegments(Class, Function) rather than
// publishSegments(Function) to update the state of the result; Namely,
@@ -774,7 +786,7 @@ public long value() {
/**
* Implementation of {@link Message}.
*/
- private static final class MessageImpl implements Message {
+ private static class MessageImpl implements Message {
private final R2dbcException exception;
@@ -801,6 +813,24 @@ public String sqlState() {
public String message() {
return exception.getMessage();
}
+
+ @Override
+ public String toString() {
+ return exception.toString();
+ }
+ }
+
+ /**
+ * Implementation of {@link OracleR2dbcWarning}.
+ */
+ private static final class WarningImpl
+ extends MessageImpl
+ implements OracleR2dbcWarning {
+
+ private WarningImpl(R2dbcException exception) {
+ super(exception);
+ }
+
}
/**
diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
index a80193e..8b76052 100755
--- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
@@ -1132,7 +1132,7 @@ private OracleResultImpl getWarnings(OracleResultImpl result) {
preparedStatement.clearWarnings();
return warning == null
? result
- : OracleResultImpl.createWarningResult(warning, result);
+ : OracleResultImpl.createWarningResult(sql, warning, result);
});
}
diff --git a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
index d2e84dc..e5727cf 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
@@ -22,12 +22,15 @@
package oracle.r2dbc.impl;
import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Result.Message;
import io.r2dbc.spi.Result.RowSegment;
import io.r2dbc.spi.Result.UpdateCount;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
+import io.r2dbc.spi.Statement;
+import oracle.r2dbc.OracleR2dbcWarning;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -51,10 +54,13 @@
import static oracle.r2dbc.util.Awaits.awaitMany;
import static oracle.r2dbc.util.Awaits.awaitNone;
import static oracle.r2dbc.util.Awaits.awaitOne;
+import static oracle.r2dbc.util.Awaits.awaitUpdate;
import static oracle.r2dbc.util.Awaits.consumeOne;
import static oracle.r2dbc.util.Awaits.tryAwaitExecution;
import static oracle.r2dbc.util.Awaits.tryAwaitNone;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -624,4 +630,122 @@ else if (index == 1) {
}
}
+ /**
+ * Verifies that a warnings are emitted as
+ * {@link oracle.r2dbc.OracleR2dbcWarning} segments.
+ */
+ @Test
+ public void testOracleR2dbcWarning() {
+ Connection connection = awaitOne(sharedConnection());
+ try {
+
+ // Expect a warning for forcing a view that references a non-existent
+ // table
+ String sql = "CREATE OR REPLACE FORCE VIEW testOracleR2dbcWarning AS" +
+ " SELECT x FROM thisdoesnotexist";
+ Statement warningStatement = connection.createStatement(sql);
+
+ // Collect the segments
+ List segments =
+ awaitMany(Flux.from(warningStatement.execute())
+ .flatMap(result -> result.flatMap(Mono::just)));
+
+ // Expect the warning segment first. Expect it to have the fixed message
+ // and error number used by Oracle JDBC for all warnings.
+ Result.Segment secondSegment = segments.get(0);
+ OracleR2dbcWarning warning =
+ assertInstanceOf(OracleR2dbcWarning.class, secondSegment);
+ assertEquals(
+ warning.message(), "Warning: execution completed with warning");
+ assertEquals(warning.errorCode(), 17110);
+ assertEquals("99999", warning.sqlState()); // Default SQL state
+ R2dbcException exception =
+ assertInstanceOf(R2dbcException.class, warning.exception());
+ assertEquals(warning.message(), exception.getMessage());
+ assertEquals(warning.errorCode(), exception.getErrorCode());
+ assertEquals(warning.sqlState(), exception.getSqlState());
+ assertEquals(sql, exception.getSql());
+
+
+ // Expect the update count segment last. Warnings are always emitted
+ // first.
+ Result.Segment firstSegment = segments.get(1);
+ assertEquals(0,
+ assertInstanceOf(UpdateCount.class, firstSegment).value());
+ assertFalse(firstSegment instanceof OracleR2dbcWarning);
+
+ // Verify that there are not any more segments
+ assertEquals(2, segments.size());
+ }
+ finally {
+ tryAwaitExecution(
+ connection.createStatement("DROP VIEW testOracleR2dbcWarning"));
+ tryAwaitNone(connection.close());
+ }
+ }
+
+ /**
+ * Verifies that a warnings are not emitted as onError signals
+ */
+ @Test
+ public void testOracleR2dbcWarningIgnored() {
+ Connection connection = awaitOne(sharedConnection());
+ try {
+
+ // Expect a warning for forcing a view that references a non-existent
+ // table
+ String sql =
+ "CREATE OR REPLACE FORCE VIEW testOracleR2dbcWarningIgnored AS" +
+ " SELECT x FROM thisdoesnotexist";
+ Statement warningStatement = connection.createStatement(sql);
+
+ // Verify that an update count of 0 is returned.
+ awaitUpdate(0, warningStatement);
+
+ // Verify that no rows are returned
+ awaitNone(
+ awaitOne(warningStatement.execute())
+ .map(row -> "UNEXPECTED ROW"));
+
+ // Verify that no rows are returned
+ awaitNone(
+ awaitOne(warningStatement.execute())
+ .map((row, metadata) -> "UNEXPECTED ROW WITH METADATA"));
+ }
+ finally {
+ tryAwaitExecution(
+ connection.createStatement("DROP VIEW testOracleR2dbcWarningIgnored"));
+ tryAwaitNone(connection.close());
+ }
+ }
+
+ /**
+ * Verifies that {@link Result#flatMap(Function)} may be used to convert
+ * warnings into onError signals
+ */
+ @Test
+ public void testOracleR2dbcWarningNotIgnored() {
+ Connection connection = awaitOne(sharedConnection());
+ try {
+
+ // Expect a warning for forcing a view that references a non-existent
+ // table
+ String sql =
+ "CREATE OR REPLACE FORCE VIEW testOracleR2dbcWarningIgnored AS" +
+ " SELECT x FROM thisdoesnotexist";
+ Statement warningStatement = connection.createStatement(sql);
+ awaitError(
+ R2dbcException.class,
+ awaitOne(warningStatement.execute())
+ .flatMap(segment ->
+ Mono.error(
+ assertInstanceOf(OracleR2dbcWarning.class, segment).exception())));
+ }
+ finally {
+ tryAwaitExecution(
+ connection.createStatement("DROP VIEW testOracleR2dbcWarningIgnored"));
+ tryAwaitNone(connection.close());
+ }
+ }
+
}
diff --git a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
index 09723b8..509cbb4 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
@@ -35,6 +35,7 @@
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.Type;
import oracle.r2dbc.OracleR2dbcOptions;
+import oracle.r2dbc.OracleR2dbcWarning;
import oracle.r2dbc.test.DatabaseConfig;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
@@ -1997,7 +1998,7 @@ public void testOutAndImplicitResult() {
/**
* Verifies that {@link OracleStatementImpl#execute()} emits a {@link Result}
- * with a {@link Message} segment when the execution results in a
+ * with a {@link OracleR2dbcWarning} segment when the execution results in a
* warning.
*/
@Test
@@ -2007,9 +2008,10 @@ public void testWarningMessage() {
try {
// Create a procedure using invalid syntax and expect the Result to
- // have a Message with an R2dbcException having a SQLWarning as it's
- // initial cause. Expect the Result to have an update count of zero as
- // well, indicating that the statement completed after the warning.
+ // have an OracleR2dbcWarning with an R2dbcException having a SQLWarning
+ // as it's initial cause. Expect the Result to have an update count of
+ // zero as well, indicating that the statement completed after the
+ // warning.
AtomicInteger segmentCount = new AtomicInteger(0);
R2dbcException r2dbcException =
awaitOne(Flux.from(connection.createStatement(
@@ -2020,9 +2022,9 @@ public void testWarningMessage() {
result.flatMap(segment -> {
int index = segmentCount.getAndIncrement();
if (index == 0) {
- assertTrue(segment instanceof Message,
+ assertTrue(segment instanceof OracleR2dbcWarning,
"Unexpected Segment: " + segment);
- return Mono.just(((Message)segment).exception());
+ return Mono.just(((OracleR2dbcWarning)segment).exception());
}
else if (index == 1) {
assertTrue(segment instanceof UpdateCount,