From 97df0045aa3c0b941ecf75883c36d6517852c30e Mon Sep 17 00:00:00 2001 From: shamonhashmi Date: Tue, 16 Jan 2024 00:26:07 +0530 Subject: [PATCH 1/7] Implemented `Connection#setStatementTimeout` Motivation: Previously, the `Connection#setStatementTimeout` method did not perform any operation (NO-OP). Modification: Successfully implemented the functionality for `Connection#setStatementTimeout`. Result: The `Connection#setStatementTimeout` method is now fully operational and functional. Resolves #193 Co-authored-by: jchrys <45776091+jchrys@users.noreply.github.com> --- .../asyncer/r2dbc/mysql/MySqlConnection.java | 28 +++++++++++++++++-- .../mysql/message/server/ErrorMessage.java | 4 +++ .../r2dbc/mysql/IntegrationTestSupport.java | 22 +++++++++++++++ .../mysql/QueryIntegrationTestSupport.java | 12 +++++++- 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 8a8f51bd8..10fbd8f87 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -33,6 +33,7 @@ import io.r2dbc.spi.Connection; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.Lifecycle; +import io.r2dbc.spi.R2dbcNonTransientResourceException; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; import org.jetbrains.annotations.Nullable; @@ -74,6 +75,10 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS private static final ServerVersion MYSQL_8 = ServerVersion.create(8, 0, 0); + private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4); + + private static final ServerVersion MARIA_10_2_0 = ServerVersion.create(10, 2, 0, true); + private static final BiConsumer> PING = (message, sink) -> { if (message instanceof ErrorMessage) { ErrorMessage msg = (ErrorMessage) message; @@ -410,9 +415,28 @@ public Mono setLockWaitTimeout(Duration timeout) { @Override public Mono setStatementTimeout(Duration timeout) { requireNonNull(timeout, "timeout must not be null"); + final boolean isMariaDb = context.getCapability().isMariaDb(); + final ServerVersion serverVersion = context.getServerVersion(); + final long timeoutMs = timeout.toMillis(); + final String sql = isMariaDb ? "SET max_statement_time=" + timeoutMs / 1000.0 + : "SET SESSION MAX_EXECUTION_TIME=" + timeoutMs; + + // mariadb: https://mariadb.com/kb/en/aborting-statements/ + // mysql: https://dev.mysql.com/blog-archive/server-side-select-statement-timeouts/ + // ref: https://github.com/mariadb-corporation/mariadb-connector-r2dbc + if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_2_0) + || !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4)) { + return QueryFlow.executeVoid(client, sql); + } - // TODO: implement me - return Mono.empty(); + return Mono.error( + new R2dbcNonTransientResourceException( + "Statement timeout is not supported by server version " + serverVersion, + "HY000", + -1, + sql + ) + ); } boolean isSessionAutoCommit() { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java index 9dee369ee..a7ef07fb8 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java @@ -74,6 +74,8 @@ public R2dbcException toException() { } public R2dbcException toException(@Nullable String sql) { + // mysql: https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html + // mariadb: https://mariadb.com/kb/en/mariadb-error-code-reference/ // Should keep looking more error codes switch (code) { case 1044: // Database access denied @@ -93,6 +95,8 @@ public R2dbcException toException(@Nullable String sql) { return new R2dbcTransientResourceException(message, sqlState, code); case 1205: // Wait lock timeout case 1907: // Statement executing timeout + case 3024: // Query execution was interrupted, maximum statement execution time exceeded + case 1969: // Query execution was interrupted return new R2dbcTimeoutException(message, sqlState, code); case 1613: // Transaction rollback because of took too long return new R2dbcRollbackException(message, sqlState, code); diff --git a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java index c8d887f64..97963231d 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java @@ -17,6 +17,7 @@ package io.asyncer.r2dbc.mysql; import io.r2dbc.spi.R2dbcBadGrammarException; +import io.r2dbc.spi.R2dbcTimeoutException; import io.r2dbc.spi.Result; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; @@ -50,6 +51,10 @@ void badGrammar(Function> runner) { process(runner).verifyError(R2dbcBadGrammarException.class); } + void timeout(Function> runner) { + process(runner).verifyError(R2dbcTimeoutException.class); + } + void illegalArgument(Function> runner) { process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3)); } @@ -148,4 +153,21 @@ static boolean envIsMariaDb10_5_1() { return ver.isGreaterThanOrEqualTo(ServerVersion.create(10, 5, 1)); } + + boolean envIsLessThanMySql574OrMariaDb102() { + String version = System.getProperty("test.mysql.version"); + + if (version == null || version.isEmpty()) { + return true; + } + + ServerVersion ver = ServerVersion.parse(version); + String type = System.getProperty("test.db.type"); + + if ("mariadb".equalsIgnoreCase(type)) { + return ver.isLessThan(ServerVersion.create(10, 2, 0)); + } + + return ver.isLessThan(ServerVersion.create(5, 7, 4)); + } } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java index 09ed4ac8e..424574546 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java @@ -17,7 +17,6 @@ package io.asyncer.r2dbc.mysql; import com.fasterxml.jackson.core.type.TypeReference; -import io.netty.util.ReferenceCountUtil; import io.r2dbc.spi.Connection; import io.r2dbc.spi.Result; import org.jetbrains.annotations.Nullable; @@ -611,6 +610,17 @@ void testUnionQueryWithJsonColumnDecodedAsString() { ); } + @Test + @DisabledIf("envIsLessThanMySql574OrMariaDb102") + void setStatementTimeoutTest() { + final String sql = "SELECT 1 WHERE SLEEP(1) > 1"; + timeout(connection -> connection.setStatementTimeout(Duration.ofMillis(500)) + .then(Mono.from(connection.createStatement(sql).execute())) + .flatMapMany(result -> Mono.from(result.map((row, metadata) -> row.get(0, String.class)))) + .collectList() + ); + } + private static JsonNode parseJson(String json) { ObjectMapper mapper = new ObjectMapper(); try { From 8aea49fef4801aa8a36064c7645e42206b299867 Mon Sep 17 00:00:00 2001 From: jchrys Date: Mon, 22 Jan 2024 12:53:07 +0900 Subject: [PATCH 2/7] Update src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java Co-authored-by: Mirro Mutth Signed-off-by: jchrys --- src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 10fbd8f87..6f64ddeb9 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -77,7 +77,7 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4); - private static final ServerVersion MARIA_10_2_0 = ServerVersion.create(10, 2, 0, true); + private static final ServerVersion MARIA_10_1_1 = ServerVersion.create(10, 1, 1, true); private static final BiConsumer> PING = (message, sink) -> { if (message instanceof ErrorMessage) { From c8138cfe39a37eac72251e909883a9ec7f7915fe Mon Sep 17 00:00:00 2001 From: jchrys Date: Mon, 22 Jan 2024 12:53:30 +0900 Subject: [PATCH 3/7] Update src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java Co-authored-by: Mirro Mutth Signed-off-by: jchrys --- src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 6f64ddeb9..7cf53c40d 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -415,7 +415,7 @@ public Mono setLockWaitTimeout(Duration timeout) { @Override public Mono setStatementTimeout(Duration timeout) { requireNonNull(timeout, "timeout must not be null"); - final boolean isMariaDb = context.getCapability().isMariaDb(); + final boolean isMariaDb = context.isMariaDb(); final ServerVersion serverVersion = context.getServerVersion(); final long timeoutMs = timeout.toMillis(); final String sql = isMariaDb ? "SET max_statement_time=" + timeoutMs / 1000.0 From 8bff5c12219f6d17f9c5d42940285b203b14f81a Mon Sep 17 00:00:00 2001 From: jchrys Date: Mon, 22 Jan 2024 12:53:39 +0900 Subject: [PATCH 4/7] Update src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java Co-authored-by: Mirro Mutth Signed-off-by: jchrys --- .../java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java index 97963231d..365caa58e 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java @@ -165,7 +165,7 @@ boolean envIsLessThanMySql574OrMariaDb102() { String type = System.getProperty("test.db.type"); if ("mariadb".equalsIgnoreCase(type)) { - return ver.isLessThan(ServerVersion.create(10, 2, 0)); + return ver.isLessThan(ServerVersion.create(10, 1, 1)); } return ver.isLessThan(ServerVersion.create(5, 7, 4)); From d6517ea171dfd2d94aa2f6e7a03793b352a6d44e Mon Sep 17 00:00:00 2001 From: jchrys Date: Mon, 22 Jan 2024 12:54:53 +0900 Subject: [PATCH 5/7] Update src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java Signed-off-by: jchrys --- .../java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java index 365caa58e..48e1ec67b 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java @@ -154,7 +154,7 @@ static boolean envIsMariaDb10_5_1() { return ver.isGreaterThanOrEqualTo(ServerVersion.create(10, 5, 1)); } - boolean envIsLessThanMySql574OrMariaDb102() { + boolean envIsLessThanMySql574OrMariaDb1011() { String version = System.getProperty("test.mysql.version"); if (version == null || version.isEmpty()) { From 3a5faf708e60df7acd73ffee8f66f6fe5d51fb05 Mon Sep 17 00:00:00 2001 From: jchrys Date: Mon, 22 Jan 2024 12:54:58 +0900 Subject: [PATCH 6/7] Update src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java Signed-off-by: jchrys --- .../io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java index 424574546..ff44d5e35 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java @@ -611,7 +611,7 @@ void testUnionQueryWithJsonColumnDecodedAsString() { } @Test - @DisabledIf("envIsLessThanMySql574OrMariaDb102") + @DisabledIf("envIsLessThanMySql574OrMariaDb1011") void setStatementTimeoutTest() { final String sql = "SELECT 1 WHERE SLEEP(1) > 1"; timeout(connection -> connection.setStatementTimeout(Duration.ofMillis(500)) From 5f5f10accb27f83a67d01a7edea0363af62cea39 Mon Sep 17 00:00:00 2001 From: jchrys Date: Mon, 22 Jan 2024 12:55:07 +0900 Subject: [PATCH 7/7] Update src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java Co-authored-by: Mirro Mutth Signed-off-by: jchrys --- src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 7cf53c40d..371ebb4b0 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -424,7 +424,7 @@ public Mono setStatementTimeout(Duration timeout) { // mariadb: https://mariadb.com/kb/en/aborting-statements/ // mysql: https://dev.mysql.com/blog-archive/server-side-select-statement-timeouts/ // ref: https://github.com/mariadb-corporation/mariadb-connector-r2dbc - if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_2_0) + if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_1_1) || !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4)) { return QueryFlow.executeVoid(client, sql); }