diff --git a/README.md b/README.md index 984c562ee..d719ef864 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,7 @@ ConnectionFactoryOptions options = ConnectionFactoryOptions.builder() .option(Option.valueOf("sslContextBuilderCustomizer"), "com.example.demo.MyCustomizer") // optional, default is no-op customizer .option(Option.valueOf("zeroDate"), "use_null") // optional, default "use_null" .option(Option.valueOf("useServerPrepareStatement"), true) // optional, default false + .option(Option.valueOf("allowLoadLocalInfileInPath"), "/opt") // optional, default null, null means LOCAL INFILE not be allowed .option(Option.valueOf("tcpKeepAlive"), true) // optional, default false .option(Option.valueOf("tcpNoDelay"), true) // optional, default false .option(Option.valueOf("autodetectExtensions"), false) // optional, default false @@ -189,6 +190,7 @@ MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builde .sslContextBuilderCustomizer(MyCustomizer.INSTANCE) // optional, default is no-op customizer .zeroDateOption(ZeroDateOption.USE_NULL) // optional, default ZeroDateOption.USE_NULL .useServerPrepareStatement() // Use server-preparing statements, default use client-preparing statements + .allowLoadLocalInfileInPath("/opt") // optional, default null, null means LOCAL INFILE not be allowed .tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false .tcpNoDelay(true) // optional, controls TCP No Delay, default is false .autodetectExtensions(false) // optional, controls extension auto-detect, default is true @@ -242,6 +244,7 @@ Mono connectionMono = Mono.from(connectionFactory.create()); | zeroDateOption | Any value of `ZeroDateOption` | Optional, default `USE_NULL` | The option indicates "zero date" handling, see following notice | | autodetectExtensions | `true` or `false` | Optional, default is `true` | Controls auto-detect `Extension`s | | useServerPrepareStatement | `true`, `false` or `Predicate` | Optional, default is `false` | See following notice | +| allowLoadLocalInfileInPath | A path | Optional, default is `null` | The path that allows `LOAD DATA LOCAL INFILE` to load file data | | passwordPublisher | A `Publisher` | Optional, default is `null` | The password publisher, see following notice | - `SslMode` Considers security level and verification for SSL, make sure the database server supports SSL before you want change SSL mode to `REQUIRED` or higher. **The Unix Domain Socket only offers "DISABLED" available** @@ -584,6 +587,7 @@ If you want to raise an issue, please follow the recommendations below: - The MySQL may be not support well for searching rows by a binary field, like `BIT` and `JSON` - `BIT`: cannot select 'BIT(64)' with value greater than 'Long.MAX_VALUE' (or equivalent in binary) - `JSON`: different MySQL may have different serialization formats, e.g. MariaDB and MySQL +- MySQL 8.0+ disables `@@global.local_infile` by default, make sure `@@local_infile` is `ON` before enable `allowLoadLocalInfileInPath` of the driver. e.g. run `SET GLOBAL local_infile=ON`, or set it in `mysql.cnf`. ## License diff --git a/src/main/java/io/asyncer/r2dbc/mysql/Capability.java b/src/main/java/io/asyncer/r2dbc/mysql/Capability.java index 44ce5df47..133d52e48 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/Capability.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/Capability.java @@ -361,7 +361,7 @@ void disableCompression() { this.bitmap &= ~COMPRESS; } - void disableLoadDataInfile() { + void disableLoadDataLocalInfile() { this.bitmap &= ~LOCAL_FILES; } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java b/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java index 51c0930b8..3039ea19b 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java @@ -22,6 +22,7 @@ import io.asyncer.r2dbc.mysql.constant.ZeroDateOption; import org.jetbrains.annotations.Nullable; +import java.nio.file.Path; import java.time.ZoneId; import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; @@ -42,6 +43,11 @@ public final class ConnectionContext implements CodecContext { private final ZeroDateOption zeroDateOption; + @Nullable + private final Path localInfilePath; + + private final int localInfileBufferSize; + @Nullable private ZoneId serverZoneId; @@ -53,8 +59,11 @@ public final class ConnectionContext implements CodecContext { private volatile Capability capability = null; - ConnectionContext(ZeroDateOption zeroDateOption, @Nullable ZoneId serverZoneId) { + ConnectionContext(ZeroDateOption zeroDateOption, @Nullable Path localInfilePath, + int localInfileBufferSize, @Nullable ZoneId serverZoneId) { this.zeroDateOption = requireNonNull(zeroDateOption, "zeroDateOption must not be null"); + this.localInfilePath = localInfilePath; + this.localInfileBufferSize = localInfileBufferSize; this.serverZoneId = serverZoneId; } @@ -119,6 +128,25 @@ public ZeroDateOption getZeroDateOption() { return zeroDateOption; } + /** + * Gets the allowed local infile path. + * + * @return the path. + */ + @Nullable + public Path getLocalInfilePath() { + return localInfilePath; + } + + /** + * Gets the local infile buffer size. + * + * @return the buffer size. + */ + public int getLocalInfileBufferSize() { + return localInfileBufferSize; + } + /** * Get the bitmap of server statuses. * diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java index 8b8807b54..565d7d462 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java @@ -25,6 +25,8 @@ import javax.net.ssl.HostnameVerifier; import java.net.Socket; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.time.ZoneId; import java.util.ArrayList; @@ -89,6 +91,11 @@ public final class MySqlConnectionConfiguration { @Nullable private final Predicate preferPrepareStatement; + @Nullable + private final Path loadLocalInfilePath; + + private final int localInfileBufferSize; + private final int queryCacheSize; private final int prepareCacheSize; @@ -104,6 +111,7 @@ private MySqlConnectionConfiguration( @Nullable Duration socketTimeout, ZeroDateOption zeroDateOption, @Nullable ZoneId serverZoneId, String user, @Nullable CharSequence password, @Nullable String database, boolean createDatabaseIfNotExist, @Nullable Predicate preferPrepareStatement, + @Nullable Path loadLocalInfilePath, int localInfileBufferSize, int queryCacheSize, int prepareCacheSize, Extensions extensions, @Nullable Publisher passwordPublisher ) { @@ -122,6 +130,8 @@ private MySqlConnectionConfiguration( this.database = database == null || database.isEmpty() ? "" : database; this.createDatabaseIfNotExist = createDatabaseIfNotExist; this.preferPrepareStatement = preferPrepareStatement; + this.loadLocalInfilePath = loadLocalInfilePath; + this.localInfileBufferSize = localInfileBufferSize; this.queryCacheSize = queryCacheSize; this.prepareCacheSize = prepareCacheSize; this.extensions = extensions; @@ -207,6 +217,15 @@ Predicate getPreferPrepareStatement() { return preferPrepareStatement; } + @Nullable + Path getLoadLocalInfilePath() { + return loadLocalInfilePath; + } + + int getLocalInfileBufferSize() { + return localInfileBufferSize; + } + int getQueryCacheSize() { return queryCacheSize; } @@ -248,6 +267,8 @@ public boolean equals(Object o) { database.equals(that.database) && createDatabaseIfNotExist == that.createDatabaseIfNotExist && Objects.equals(preferPrepareStatement, that.preferPrepareStatement) && + Objects.equals(loadLocalInfilePath, that.loadLocalInfilePath) && + localInfileBufferSize == that.localInfileBufferSize && queryCacheSize == that.queryCacheSize && prepareCacheSize == that.prepareCacheSize && extensions.equals(that.extensions) && @@ -258,7 +279,8 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout, socketTimeout, serverZoneId, zeroDateOption, user, password, database, createDatabaseIfNotExist, - preferPrepareStatement, queryCacheSize, prepareCacheSize, extensions, passwordPublisher); + preferPrepareStatement, loadLocalInfilePath, localInfileBufferSize, queryCacheSize, + prepareCacheSize, extensions, passwordPublisher); } @Override @@ -269,16 +291,21 @@ public String toString() { connectTimeout + ", socketTimeout=" + socketTimeout + ", serverZoneId=" + serverZoneId + ", zeroDateOption=" + zeroDateOption + ", user='" + user + "', password=" + password + ", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist + - ", preferPrepareStatement=" + preferPrepareStatement + ", queryCacheSize=" + queryCacheSize + - ", prepareCacheSize=" + prepareCacheSize + ", extensions=" + extensions + - ", passwordPublisher=" + passwordPublisher + '}'; + ", preferPrepareStatement=" + preferPrepareStatement + + ", loadLocalInfilePath=" + loadLocalInfilePath + + ", localInfileBufferSize=" + localInfileBufferSize + + ", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize + + ", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}'; } return "MySqlConnectionConfiguration{unixSocket='" + domain + "', connectTimeout=" + connectTimeout + ", socketTimeout=" + socketTimeout + ", serverZoneId=" + serverZoneId + ", zeroDateOption=" + zeroDateOption + ", user='" + user + "', password=" + password + ", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist + - ", preferPrepareStatement=" + preferPrepareStatement + ", queryCacheSize=" + queryCacheSize + + ", preferPrepareStatement=" + preferPrepareStatement + + ", loadLocalInfilePath=" + loadLocalInfilePath + + ", localInfileBufferSize=" + localInfileBufferSize + + ", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize + ", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}'; } @@ -345,6 +372,11 @@ public static final class Builder { @Nullable private Predicate preferPrepareStatement; + @Nullable + private Path loadLocalInfilePath; + + private int localInfileBufferSize = 8192; + private int queryCacheSize = 0; private int prepareCacheSize = 256; @@ -379,7 +411,8 @@ public MySqlConnectionConfiguration build() { sslCa, sslKey, sslKeyPassword, sslCert, sslContextBuilderCustomizer); return new MySqlConnectionConfiguration(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout, socketTimeout, zeroDateOption, serverZoneId, user, password, database, - createDatabaseIfNotExist, preferPrepareStatement, queryCacheSize, prepareCacheSize, + createDatabaseIfNotExist, preferPrepareStatement, loadLocalInfilePath, + localInfileBufferSize, queryCacheSize, prepareCacheSize, Extensions.from(extensions, autodetectExtensions), passwordPublisher); } @@ -754,6 +787,38 @@ public Builder useServerPrepareStatement(Predicate preferPrepareStatemen return this; } + /** + * Configures to allow the {@code LOAD DATA LOCAL INFILE} statement in the given {@code path} or + * disallow the statement. Default to {@code null} which means not allow the statement. + * + * @param path which parent path are allowed to load file data, {@code null} means not be allowed. + * @return {@link Builder this}. + * @throws java.nio.file.InvalidPathException if the string cannot be converted to a {@link Path}. + * @since 1.1.0 + */ + public Builder allowLoadLocalInfileInPath(@Nullable String path) { + this.loadLocalInfilePath = path == null ? null : Paths.get(path); + + return this; + } + + /** + * Configures the buffer size for {@code LOAD DATA LOCAL INFILE} statement. Default to {@code 8192}. + *

+ * It is used only if {@link #allowLoadLocalInfileInPath(String)} is set. + * + * @param localInfileBufferSize the buffer size. + * @return {@link Builder this}. + * @throws IllegalArgumentException if {@code localInfileBufferSize} is not positive. + * @since 1.1.0 + */ + public Builder localInfileBufferSize(int localInfileBufferSize) { + require(localInfileBufferSize > 0, "localInfileBufferSize must be positive"); + + this.localInfileBufferSize = localInfileBufferSize; + return this; + } + /** * Configures the maximum size of the {@link Query} parsing cache. Usually it should be power of two. * Default to {@code 0}. Driver will use unbounded cache if size is less than {@code 0}. @@ -823,6 +888,7 @@ public Builder extendWith(Extension extension) { /** * Registers a password publisher function. + * * @param passwordPublisher function to retrieve password before making connection. * @return this {@link Builder}. */ diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java index 09b019094..29a2c7f98 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java @@ -90,8 +90,12 @@ public static MySqlConnectionFactory from(MySqlConnectionConfiguration configura String user = configuration.getUser(); CharSequence password = configuration.getPassword(); SslMode sslMode = ssl.getSslMode(); - ConnectionContext context = new ConnectionContext(configuration.getZeroDateOption(), - configuration.getServerZoneId()); + ConnectionContext context = new ConnectionContext( + configuration.getZeroDateOption(), + configuration.getLoadLocalInfilePath(), + configuration.getLocalInfileBufferSize(), + configuration.getServerZoneId() + ); Extensions extensions = configuration.getExtensions(); Predicate prepare = configuration.getPreferPrepareStatement(); int prepareCacheSize = configuration.getPrepareCacheSize(); diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java index c56fc74b0..1b001d9ba 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java @@ -191,6 +191,9 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr public static final Option USE_SERVER_PREPARE_STATEMENT = Option.valueOf("useServerPrepareStatement"); + public static final Option ALLOW_LOAD_LOCAL_INFILE_IN_PATH = + Option.valueOf("allowLoadLocalInfileInPath"); + /** * Option to set the maximum size of the {@link Query} parsing cache. Default to {@code 256}. * @@ -266,6 +269,8 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) { .to(builder::zeroDateOption); mapper.optional(USE_SERVER_PREPARE_STATEMENT).prepare(builder::useClientPrepareStatement, builder::useServerPrepareStatement, builder::useServerPrepareStatement); + mapper.optional(ALLOW_LOAD_LOCAL_INFILE_IN_PATH).asString() + .to(builder::allowLoadLocalInfileInPath); mapper.optional(QUERY_CACHE_SIZE).asInt() .to(builder::queryCacheSize); mapper.optional(PREPARE_CACHE_SIZE).asInt() diff --git a/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java b/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java index 2cc7f1142..26721b911 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java @@ -22,12 +22,12 @@ import io.asyncer.r2dbc.mysql.client.FluxExchangeable; import io.asyncer.r2dbc.mysql.constant.ServerStatuses; import io.asyncer.r2dbc.mysql.constant.SslMode; -import io.asyncer.r2dbc.mysql.internal.util.InternalArrays; import io.asyncer.r2dbc.mysql.internal.util.StringUtils; import io.asyncer.r2dbc.mysql.message.client.AuthResponse; import io.asyncer.r2dbc.mysql.message.client.ClientMessage; import io.asyncer.r2dbc.mysql.message.client.HandshakeResponse; -import io.asyncer.r2dbc.mysql.message.client.LoginClientMessage; +import io.asyncer.r2dbc.mysql.message.client.LocalInfileResponse; +import io.asyncer.r2dbc.mysql.message.client.SubsequenceClientMessage; import io.asyncer.r2dbc.mysql.message.client.PingMessage; import io.asyncer.r2dbc.mysql.message.client.PrepareQueryMessage; import io.asyncer.r2dbc.mysql.message.client.PreparedCloseMessage; @@ -44,6 +44,7 @@ import io.asyncer.r2dbc.mysql.message.server.ErrorMessage; import io.asyncer.r2dbc.mysql.message.server.HandshakeHeader; import io.asyncer.r2dbc.mysql.message.server.HandshakeRequest; +import io.asyncer.r2dbc.mysql.message.server.LocalInfileRequest; import io.asyncer.r2dbc.mysql.message.server.OkMessage; import io.asyncer.r2dbc.mysql.message.server.PreparedOkMessage; import io.asyncer.r2dbc.mysql.message.server.ServerMessage; @@ -74,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Predicate; @@ -209,36 +211,26 @@ static Mono login(Client client, SslMode sslMode, String database, Strin * terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. The {@link ErrorMessage} * will emit an exception. The exchange will be completed by {@link CompleteMessage} after receive the * last result for the last binding. + *

+ * Note: this method does not support {@code LOCAL INFILE} due to it should be used for excepted queries. * * @param client the {@link Client} to exchange messages with. * @param sql the query to execute, can be contains multi-statements. * @return receives complete signal. */ static Mono executeVoid(Client client, String sql) { - return Mono.defer(() -> execute0(client, sql).doOnNext(EXECUTE_VOID).then()); - } + return Mono.defer(() -> client.exchange(new TextQueryMessage(sql), (message, sink) -> { + if (message instanceof ErrorMessage) { + sink.next(((ErrorMessage) message).offendedBy(sql)); + sink.complete(); + } else { + sink.next(message); - /** - * Execute multiple simple queries with one-by-one and return a {@link Mono} for the complete signal or - * error. Query execution terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. The - * {@link ErrorMessage} will emit an exception and cancel subsequent statements execution. The exchange - * will be completed by {@link CompleteMessage} after receive the last result for the last binding. - * - * @param client the {@link Client} to exchange messages with. - * @param statements the queries to execute, each element can be contains multi-statements. - * @return receives complete signal. - */ - static Mono executeVoid(Client client, String... statements) { - switch (statements.length) { - case 0: - return Mono.empty(); - case 1: - return executeVoid(client, statements[0]); - default: - return client.exchange(new MultiQueryExchangeable(InternalArrays.asIterator(statements))) - .doOnNext(EXECUTE_VOID) - .then(); - } + if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) { + sink.complete(); + } + } + }).doOnSubscribe(ignored -> QueryLogger.log(sql)).doOnNext(EXECUTE_VOID).then()); } /** @@ -303,18 +295,7 @@ static Mono createSavepoint(Client client, ConnectionState state, String n * @return the messages received in response to this exchange. */ private static Flux execute0(Client client, String sql) { - return client.exchange(new TextQueryMessage(sql), (message, sink) -> { - if (message instanceof ErrorMessage) { - sink.next(((ErrorMessage) message).offendedBy(sql)); - sink.complete(); - } else { - sink.next(message); - - if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) { - sink.complete(); - } - } - }).doOnSubscribe(ignored -> QueryLogger.log(sql)); + return client.exchange(new SimpleQueryExchangeable(sql)); } private QueryFlow() { } @@ -339,6 +320,16 @@ public final void accept(ServerMessage message, SynchronousSink s if (message instanceof ErrorMessage) { sink.next(((ErrorMessage) message).offendedBy(offendingSql())); sink.complete(); + } else if (message instanceof LocalInfileRequest) { + LocalInfileRequest request = (LocalInfileRequest) message; + String path = request.getPath(); + + QueryLogger.logLocalInfile(path); + + requests.emitNext( + new LocalInfileResponse(request.getEnvelopeId() + 1, path, sink), + Sinks.EmitFailureHandler.FAIL_FAST + ); } else { sink.next(message); @@ -353,6 +344,59 @@ public final void accept(ServerMessage message, SynchronousSink s abstract protected String offendingSql(); } +final class SimpleQueryExchangeable extends BaseFluxExchangeable { + + private static final int INIT = 0; + + private static final int EXECUTE = 1; + + private static final int DISPOSE = 2; + + private final AtomicInteger state = new AtomicInteger(INIT); + + private final String sql; + + SimpleQueryExchangeable(String sql) { + this.sql = sql; + } + + @Override + public void dispose() { + if (state.getAndSet(DISPOSE) != DISPOSE) { + requests.tryEmitComplete(); + } + } + + @Override + public boolean isDisposed() { + return state.get() == DISPOSE; + } + + @Override + protected void tryNextOrComplete(@Nullable SynchronousSink sink) { + if (state.compareAndSet(INIT, EXECUTE)) { + QueryLogger.log(sql); + + Sinks.EmitResult result = requests.tryEmitNext(new TextQueryMessage(sql)); + + if (result == Sinks.EmitResult.OK) { + return; + } + + QueryFlow.logger.error("Emit request failed due to {}", result); + } + + if (sink != null) { + sink.complete(); + } + } + + @Override + protected String offendingSql() { + return sql; + } +} + /** * An implementation of {@link FluxExchangeable} that considers client-preparing requests. */ @@ -770,8 +814,8 @@ final class LoginExchangeable extends FluxExchangeable { private static final int HANDSHAKE_VERSION = 10; - private final Sinks.Many requests = Sinks.many().unicast() - .onBackpressureBuffer(Queues.one().get()); + private final Sinks.Many requests = Sinks.many().unicast() + .onBackpressureBuffer(Queues.one().get()); private final Client client; @@ -879,7 +923,7 @@ public void dispose() { this.requests.tryEmitComplete(); } - private void emitNext(LoginClientMessage message, SynchronousSink sink) { + private void emitNext(SubsequenceClientMessage message, SynchronousSink sink) { Sinks.EmitResult result = requests.tryEmitNext(message); if (result != Sinks.EmitResult.OK) { @@ -903,8 +947,6 @@ private Capability clientCapability(Capability serverCapability) { builder.disableDatabasePinned(); builder.disableCompression(); - // TODO: support LOAD DATA LOCAL INFILE - builder.disableLoadDataInfile(); builder.disableIgnoreAmbiguitySpace(); builder.disableInteractiveTimeout(); @@ -931,6 +973,10 @@ private Capability clientCapability(Capability serverCapability) { builder.disableConnectWithDatabase(); } + if (context.getLocalInfilePath() == null) { + builder.disableLoadDataLocalInfile(); + } + if (ATTRIBUTES.isEmpty()) { builder.disableConnectAttributes(); } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java b/src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java index cb7d954cd..76facdd50 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java @@ -16,7 +16,6 @@ package io.asyncer.r2dbc.mysql; - import io.asyncer.r2dbc.mysql.internal.util.StringUtils; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -47,5 +46,9 @@ static void log(int statementId, MySqlParameter[] values) { logger.debug("Executing prepared statement {} with {}", statementId, values); } + static void logLocalInfile(String path) { + logger.debug("Loading data from: {}", path); + } + private QueryLogger() { } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java b/src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java index 9d2c2c55b..e0fd475c6 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java @@ -33,7 +33,8 @@ final class TextParametrizedStatement extends ParametrizedStatementSupport { @Override protected Flux execute(List bindings) { - return Flux.defer(() -> QueryFlow.execute(client, query, returningIdentifiers(), bindings)) + return Flux.defer(() -> QueryFlow.execute(client, query, returningIdentifiers(), + bindings)) .map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages)); } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java b/src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java index e23873c07..04fd90001 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java @@ -34,7 +34,7 @@ final class TextSimpleStatement extends SimpleStatementSupport { public Flux execute() { return Flux.defer(() -> QueryFlow.execute( client, - StringUtils.extendReturning(sql, returningIdentifiers())) - ).map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages)); + StringUtils.extendReturning(sql, returningIdentifiers()) + ).map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages))); } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java b/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java index a467ae2ec..1641e480c 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java @@ -19,7 +19,7 @@ import io.asyncer.r2dbc.mysql.ConnectionContext; import io.asyncer.r2dbc.mysql.internal.util.OperatorUtils; import io.asyncer.r2dbc.mysql.message.client.ClientMessage; -import io.asyncer.r2dbc.mysql.message.client.LoginClientMessage; +import io.asyncer.r2dbc.mysql.message.client.SubsequenceClientMessage; import io.asyncer.r2dbc.mysql.message.client.PrepareQueryMessage; import io.asyncer.r2dbc.mysql.message.client.PreparedFetchMessage; import io.asyncer.r2dbc.mysql.message.client.SslRequest; @@ -86,22 +86,22 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg instanceof ClientMessage) { ByteBufAllocator allocator = ctx.alloc(); - Flux encoded; - int envelopeId; - if (msg instanceof LoginClientMessage) { - LoginClientMessage message = (LoginClientMessage) msg; + if (msg instanceof SubsequenceClientMessage) { + SubsequenceClientMessage message = (SubsequenceClientMessage) msg; encoded = Flux.from(message.encode(allocator, this.context)); - envelopeId = message.getEnvelopeId(); + int envelopeId = message.getEnvelopeId(); + + OperatorUtils.envelope(encoded, allocator, envelopeId, false) + .subscribe(new WriteSubscriber(ctx, promise)); } else { encoded = Flux.from(((ClientMessage) msg).encode(allocator, this.context)); - envelopeId = 0; - } - OperatorUtils.cumulateEnvelope(encoded, allocator, envelopeId) - .subscribe(new WriteSubscriber(ctx, promise)); + OperatorUtils.envelope(encoded, allocator, 0, true) + .subscribe(new WriteSubscriber(ctx, promise)); + } if (msg instanceof PrepareQueryMessage) { setDecodeContext(DecodeContext.prepareQuery()); diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java b/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java index e30191e8e..505ebf919 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java @@ -184,7 +184,8 @@ public Flux exchange(FluxExchangeable exchangeable) { .asFlux() .doOnSubscribe(ignored -> exchangeable.subscribe( this::emitNextRequest, - e -> requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST)) + e -> + requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST)) ) .handle(exchangeable) .doOnTerminate(() -> { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelope.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelope.java similarity index 73% rename from src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelope.java rename to src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelope.java index 9bc085be8..47aa878ed 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelope.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelope.java @@ -32,7 +32,7 @@ * An implementation of {@link Flux}{@code <}{@link ByteBuf}{@code >} that considers cumulate buffers as * envelopes of the MySQL socket protocol. */ -final class FluxCumulateEnvelope extends FluxOperator { +final class FluxEnvelope extends FluxOperator { private final ByteBufAllocator alloc; @@ -40,17 +40,127 @@ final class FluxCumulateEnvelope extends FluxOperator { private final int start; - FluxCumulateEnvelope(Flux source, ByteBufAllocator alloc, int size, int start) { + private final boolean cumulate; + + FluxEnvelope(Flux source, ByteBufAllocator alloc, int size, int start, + boolean cumulate) { super(source); this.alloc = alloc; this.size = size; this.start = start; + this.cumulate = cumulate; } @Override public void subscribe(CoreSubscriber actual) { - this.source.subscribe(new CumulateEnvelopeSubscriber(actual, alloc, size, start)); + if (cumulate) { + this.source.subscribe(new CumulateEnvelopeSubscriber(actual, alloc, size, start)); + } else { + this.source.subscribe(new DirectEnvelopeSubscriber(actual, alloc, start)); + } + } +} + +final class DirectEnvelopeSubscriber implements CoreSubscriber, Scannable, Subscription { + + private final CoreSubscriber actual; + + private final ByteBufAllocator alloc; + + private boolean done; + + private Subscription s; + + private int envelopeId; + + DirectEnvelopeSubscriber(CoreSubscriber actual, ByteBufAllocator alloc, int start) { + this.actual = actual; + this.alloc = alloc; + this.envelopeId = start; + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + this.actual.onSubscribe(this); + } + } + + @Override + public void onNext(ByteBuf buf) { + if (done) { + // Do not release the buffer, it should be handled by OperatorUtils.discardOnCancel() or Context. + Operators.onNextDropped(buf, actual.currentContext()); + return; + } + + try { + ByteBuf header = this.alloc.buffer(Envelopes.PART_HEADER_SIZE) + .writeMediumLE(buf.readableBytes()) + .writeByte(this.envelopeId++); + + this.actual.onNext(header); + this.actual.onNext(buf); + } catch (Throwable e) { + Throwable t = Operators.onNextError(buf, e, this.actual.currentContext(), this.s); + + if (t == null) { + s.request(1); + } else { + onError(t); + } + } + } + + @Override + public void onError(Throwable t) { + if (this.done) { + Operators.onErrorDropped(t, this.actual.currentContext()); + return; + } + + this.done = true; + this.actual.onError(t); + } + + @Override + public void onComplete() { + if (this.done) { + return; + } + + this.done = true; + this.actual.onComplete(); + } + + @Override + public void request(long n) { + this.s.request(n); + } + + @Override + public void cancel() { + this.s.cancel(); + } + + @Override + public Context currentContext() { + return this.actual.currentContext(); + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.PARENT) { + return this.s; + } else if (key == Attr.ACTUAL) { + return this.actual; + } else if (key == Attr.TERMINATED) { + return this.done; + } else { + return null; + } } } @@ -95,7 +205,7 @@ public void onNext(ByteBuf buf) { } if (!buf.isReadable()) { - // Ignore empty buffer, useless for MySQL protocol. + // Ignore empty buffer, useless for cumulated buffers. buf.release(); return; } @@ -207,7 +317,6 @@ public Context currentContext() { } @Override - @SuppressWarnings("rawtypes") public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) { return this.s; diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java index bd97b440c..9c719988d 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java @@ -18,12 +18,19 @@ import io.asyncer.r2dbc.mysql.message.FieldValue; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import reactor.core.publisher.Flux; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.List; +import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require; +import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; /** * An internal utility considers the use of safe release buffers (array or {@link List}). It uses standard @@ -31,6 +38,38 @@ */ public final class NettyBufferUtils { + /** + * Reads all bytes from a file asynchronously. + * + * @param path The path of the file want to be read. + * @param allocator The {@link ByteBufAllocator} used to allocate {@link ByteBuf}s. + * @param bufferSize The size of the buffer used to read the file. + * @return A {@link Flux} emits {@link ByteBuf}s read from the file. + */ + public static Flux readFile(Path path, ByteBufAllocator allocator, int bufferSize) { + requireNonNull(path, "path must not be null"); + requireNonNull(allocator, "allocator must not be null"); + require(bufferSize > 0, "bufferSize must be positive"); + + return Flux.create(sink -> { + ReadCompletionHandler handler; + + try { + // AsynchronousFileChannel can only be opened in blocking mode :( + @SuppressWarnings("BlockingMethodInNonBlockingContext") + AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ); + + handler = new ReadCompletionHandler(channel, allocator, bufferSize, sink); + } catch (Throwable e) { + sink.error(e); + return; + } + + sink.onCancel(handler::cancel); + sink.onRequest(handler::request); + }).doOnDiscard(ByteBuf.class, ReferenceCountUtil::safeRelease); + } + /** * Combine {@link ByteBuf}s through composite buffer. *

diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/OperatorUtils.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/OperatorUtils.java index 4306ff20f..21375c419 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/OperatorUtils.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/OperatorUtils.java @@ -55,13 +55,13 @@ public static Flux discardOnCancel(Flux source) { return new FluxDiscardOnCancel<>(source); } - public static Flux cumulateEnvelope(Flux source, ByteBufAllocator allocator, - int envelopeIdStart) { + public static Flux envelope(Flux source, ByteBufAllocator allocator, + int envelopeIdStart, boolean cumulate) { requireNonNull(source, "source must not be null"); requireNonNull(allocator, "allocator must not be null"); - return new FluxCumulateEnvelope(source, allocator, Envelopes.MAX_ENVELOPE_SIZE, - envelopeIdStart & 0xFF); + return new FluxEnvelope(source, allocator, Envelopes.MAX_ENVELOPE_SIZE, + envelopeIdStart & 0xFF, cumulate); } private OperatorUtils() { } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/ReadCompletionHandler.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/ReadCompletionHandler.java new file mode 100644 index 000000000..1c72a49df --- /dev/null +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/ReadCompletionHandler.java @@ -0,0 +1,142 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.internal.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import reactor.core.publisher.FluxSink; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An implementation of {@link CompletionHandler} that reads data from an asynchronous file channel and emits + * file data or I/O exception to a {@link FluxSink}. + */ +final class ReadCompletionHandler implements CompletionHandler { + + private final AsynchronousFileChannel channel; + + private final ByteBufAllocator allocator; + + private final int bufferSize; + + private final FluxSink sink; + + private final AtomicLong position; + + private final AtomicReference state = new AtomicReference<>(State.IDLE); + + ReadCompletionHandler( + AsynchronousFileChannel channel, + ByteBufAllocator allocator, + int bufferSize, + FluxSink sink + ) { + this.channel = channel; + this.allocator = allocator; + this.bufferSize = bufferSize; + this.sink = sink; + this.position = new AtomicLong(0); + } + + public void request(long ignored) { + tryRead(); + } + + public void cancel() { + this.state.getAndSet(State.DISPOSED); + + // According java.nio.channels.AsynchronousChannel "if an I/O operation is outstanding + // on the channel and the channel's close method is invoked, then the I/O operation + // fails with the exception AsynchronousCloseException". That should invoke the failed + // callback below and the current ByteBuf should be released. + + tryCloseChannel(); + } + + private void tryRead() { + if (this.sink.requestedFromDownstream() > 0 && this.state.compareAndSet(State.IDLE, State.READING)) { + read(); + } + } + + private void read() { + ByteBuf buf = this.allocator.buffer(this.bufferSize); + ByteBuffer byteBuffer = buf.nioBuffer(buf.writerIndex(), buf.writableBytes()); + + this.channel.read(byteBuffer, this.position.get(), buf, this); + } + + @Override + public void completed(Integer read, ByteBuf buf) { + if (State.DISPOSED.equals(this.state.get())) { + buf.release(); + tryCloseChannel(); + return; + } + + if (read == -1) { + buf.release(); + tryCloseChannel(); + this.state.set(State.DISPOSED); + this.sink.complete(); + return; + } + + this.position.addAndGet(read); + buf.writerIndex(read); + this.sink.next(buf); + + // Stay in READING mode if there is demand + if (this.sink.requestedFromDownstream() > 0) { + read(); + return; + } + + // Release READING mode and then try again in case of concurrent "request" + if (this.state.compareAndSet(State.READING, State.IDLE)) { + tryRead(); + } + } + + @Override + public void failed(Throwable exc, ByteBuf buf) { + buf.release(); + + tryCloseChannel(); + this.state.set(State.DISPOSED); + this.sink.error(exc); + } + + private enum State { + IDLE, READING, DISPOSED + } + + void tryCloseChannel() { + if (channel.isOpen()) { + try { + channel.close(); + } catch (IOException ignored) { + } + } + } +} diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/AuthResponse.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/AuthResponse.java index 0837febc3..4c6d2b1de 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/AuthResponse.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/AuthResponse.java @@ -26,7 +26,7 @@ * A message that contains only an authentication, used by full authentication or change authentication * response. */ -public final class AuthResponse extends SizedClientMessage implements LoginClientMessage { +public final class AuthResponse extends SizedClientMessage implements SubsequenceClientMessage { private final int envelopeId; diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/HandshakeResponse.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/HandshakeResponse.java index a1b0de9bd..ca05f9831 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/HandshakeResponse.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/HandshakeResponse.java @@ -25,9 +25,9 @@ import static io.asyncer.r2dbc.mysql.constant.Envelopes.TERMINAL; /** - * An abstraction of {@link LoginClientMessage} considers handshake response. + * An abstraction of {@link SubsequenceClientMessage} considers handshake response. */ -public interface HandshakeResponse extends LoginClientMessage { +public interface HandshakeResponse extends SubsequenceClientMessage { /** * Construct an instance of {@link HandshakeResponse}, it is implemented by the protocol version that is diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/LocalInfileResponse.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/LocalInfileResponse.java new file mode 100644 index 000000000..c9ca2419f --- /dev/null +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/LocalInfileResponse.java @@ -0,0 +1,125 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.message.client; + +import io.asyncer.r2dbc.mysql.ConnectionContext; +import io.asyncer.r2dbc.mysql.internal.util.NettyBufferUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.r2dbc.spi.R2dbcNonTransientResourceException; +import io.r2dbc.spi.R2dbcPermissionDeniedException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SynchronousSink; + +import java.nio.file.InvalidPathException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.atomic.AtomicReference; + +import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; + +/** + * A message considers as a chunk of a local in-file data. + */ +public final class LocalInfileResponse implements SubsequenceClientMessage { + + private final int envelopeId; + + private final String path; + + private final SynchronousSink errorSink; + + public LocalInfileResponse(int envelopeId, String path, SynchronousSink errorSink) { + requireNonNull(path, "path must not be null"); + + this.envelopeId = envelopeId; + this.path = path; + this.errorSink = errorSink; + } + + @Override + public Flux encode(ByteBufAllocator allocator, ConnectionContext context) { + return Flux.defer(() -> { + int bufferSize = context.getLocalInfileBufferSize(); + AtomicReference error = new AtomicReference<>(); + + return Mono.create(sink -> { + try { + Path safePath = context.getLocalInfilePath(); + Path file = Paths.get(this.path); + + if (safePath == null) { + String message = "Allowed local file path not set, but attempted to load '" + file + + '\''; + sink.error(new R2dbcPermissionDeniedException(message)); + } else if (file.startsWith(safePath)) { + sink.success(file); + } else { + String message = String.format("The file '%s' is not under the safe path '%s'", + file, safePath); + sink.error(new R2dbcPermissionDeniedException(message)); + } + } catch (InvalidPathException e) { + sink.error(new R2dbcNonTransientResourceException("Invalid path: " + this.path, e)); + } catch (Throwable e) { + sink.error(e); + } + }).flatMapMany(p -> NettyBufferUtils.readFile(p, allocator, bufferSize)).onErrorComplete(e -> { + // Server needs an empty buffer, so emit error to upstream instead of encoding stream. + error.set(e); + return true; + }).concatWith(Flux.just(allocator.buffer(0, 0))).doAfterTerminate(() -> { + Throwable e = error.getAndSet(null); + + if (e != null) { + errorSink.error(e); + } + }); + }); + } + + @Override + public int getEnvelopeId() { + return envelopeId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LocalInfileResponse)) { + return false; + } + + LocalInfileResponse that = (LocalInfileResponse) o; + + return envelopeId == that.envelopeId && path.equals(that.path); + } + + @Override + public int hashCode() { + return 31 * envelopeId + path.hashCode(); + } + + @Override + public String toString() { + return "LocalInfileResponse{envelopeId=" + envelopeId + + ", path='" + path + "'}"; + } +} diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/SslRequest.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/SslRequest.java index c1e1356cf..16420a412 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/SslRequest.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/SslRequest.java @@ -23,7 +23,7 @@ /** * An abstraction of {@link ClientMessage} that considers SSL request for handshake. */ -public interface SslRequest extends LoginClientMessage { +public interface SslRequest extends SubsequenceClientMessage { /** * Get current {@link Capability} of the connection. diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/LoginClientMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/SubsequenceClientMessage.java similarity index 69% rename from src/main/java/io/asyncer/r2dbc/mysql/message/client/LoginClientMessage.java rename to src/main/java/io/asyncer/r2dbc/mysql/message/client/SubsequenceClientMessage.java index 3bbca7e0b..091e7a57b 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/LoginClientMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/SubsequenceClientMessage.java @@ -17,12 +17,15 @@ package io.asyncer.r2dbc.mysql.message.client; /** - * An abstraction of {@link ClientMessage} considers login phase messages. + * An abstraction of {@link ClientMessage} that considers as a subsequence of responses from a request from + * the server. + *

+ * All encoded buffers will not be cumulated. */ -public interface LoginClientMessage extends ClientMessage { +public interface SubsequenceClientMessage extends ClientMessage { /** - * Get the current envelope ID used to serialize subsequent request messages. + * Gets the current envelope ID used to serialize subsequent request messages. * * @return the current envelope ID. */ diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/LocalInfileRequest.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/LocalInfileRequest.java new file mode 100644 index 000000000..e493e2bf8 --- /dev/null +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/LocalInfileRequest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.message.server; + +import io.asyncer.r2dbc.mysql.ConnectionContext; +import io.netty.buffer.ByteBuf; + +/** + * A message sent by the server to indicate that the client should send a file to the server using the + * {@code LOAD DATA LOCAL INFILE} command. + */ +public final class LocalInfileRequest implements ServerMessage { + + private final int envelopeId; + + private final String path; + + private LocalInfileRequest(int envelopeId, String path) { + this.envelopeId = envelopeId; + this.path = path; + } + + public int getEnvelopeId() { + return envelopeId; + } + + public String getPath() { + return path; + } + + static LocalInfileRequest decode(int envelopeId, ByteBuf buf, ConnectionContext context) { + buf.skipBytes(1); // Constant 0xFB + return new LocalInfileRequest(envelopeId, buf.toString(context.getClientCollation().getCharset())); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LocalInfileRequest)) { + return false; + } + + LocalInfileRequest that = (LocalInfileRequest) o; + + return envelopeId == that.envelopeId && path.equals(that.path); + } + + @Override + public int hashCode() { + return 31 * envelopeId + path.hashCode(); + } + + @Override + public String toString() { + return "LocalInfileRequest{envelopeId=" + envelopeId + + ", path='" + path + "'}"; + } +} diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java index 1f7408e7b..f81a06200 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java @@ -48,6 +48,11 @@ public final class ServerMessageDecoder { private static final short ERROR = 0xFF; + /** + * Note: it can be a column count message, so the packet size should be checked. + */ + private static final short LOCAL_INFILE = 0xFB; + private final List parts = new ArrayList<>(); /** @@ -99,7 +104,7 @@ private static ServerMessage decodeMessage(List buffers, int envelopeId try { if (decodeContext instanceof CommandDecodeContext) { - return decodeCommandMessage(combined, context); + return decodeCommandMessage(envelopeId, combined, context); } else if (decodeContext instanceof PreparedMetadataDecodeContext) { return decodePreparedMetadata(combined, context, (PreparedMetadataDecodeContext) decodeContext); @@ -189,7 +194,8 @@ private static ServerMessage decodePrepareQuery(ByteBuf buf) { " on prepare query phase"); } - private static ServerMessage decodeCommandMessage(ByteBuf buf, ConnectionContext context) { + private static ServerMessage decodeCommandMessage(int envelopeId, ByteBuf buf, + ConnectionContext context) { short header = buf.getUnsignedByte(buf.readerIndex()); switch (header) { case ERROR: @@ -213,6 +219,10 @@ private static ServerMessage decodeCommandMessage(ByteBuf buf, ConnectionContext } else if (EofMessage.isValidSize(byteSize)) { return EofMessage.decode(buf); } + case LOCAL_INFILE: + if (buf.readableBytes() > 1) { + return LocalInfileRequest.decode(envelopeId, buf, context); + } } if (VarIntUtils.checkNextVarInt(buf) == 0) { diff --git a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionContextTest.java b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionContextTest.java index 2c8c907bd..dce1b0ddb 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionContextTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionContextTest.java @@ -34,7 +34,9 @@ public class ConnectionContextTest { void getServerZoneId() { for (int i = -12; i <= 12; ++i) { String id = i < 0 ? "UTC" + i : "UTC+" + i; - ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, ZoneId.of(id)); + ConnectionContext context = new ConnectionContext( + ZeroDateOption.USE_NULL, null, + 8192, ZoneId.of(id)); assertThat(context.getServerZoneId()).isEqualTo(ZoneId.of(id)); } @@ -42,7 +44,8 @@ void getServerZoneId() { @Test void shouldSetServerZoneId() { - ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null); + ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null, + 8192, null); assertThat(context.shouldSetServerZoneId()).isTrue(); context.setServerZoneId(ZoneId.systemDefault()); assertThat(context.shouldSetServerZoneId()).isFalse(); @@ -50,20 +53,23 @@ void shouldSetServerZoneId() { @Test void shouldNotSetServerZoneId() { - ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, ZoneId.systemDefault()); + ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null, + 8192, ZoneId.systemDefault()); assertThat(context.shouldSetServerZoneId()).isFalse(); } @Test void setTwiceServerZoneId() { - ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null); + ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null, + 8192, null); context.setServerZoneId(ZoneId.systemDefault()); assertThatIllegalStateException().isThrownBy(() -> context.setServerZoneId(ZoneId.systemDefault())); } @Test void badSetServerZoneId() { - ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, ZoneId.systemDefault()); + ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null, + 8192, ZoneId.systemDefault()); assertThatIllegalStateException().isThrownBy(() -> context.setServerZoneId(ZoneId.systemDefault())); } @@ -76,7 +82,8 @@ public static ConnectionContext mock(boolean isMariaDB) { } public static ConnectionContext mock(boolean isMariaDB, ZoneId zoneId) { - ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, zoneId); + ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null, + 8192, zoneId); context.init(1, ServerVersion.parse(isMariaDB ? "11.2.22.MOCKED" : "8.0.11.MOCKED"), Capability.of(~(isMariaDB ? 1 : 0))); diff --git a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java index fcd5b03a7..257064291 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java @@ -16,15 +16,33 @@ package io.asyncer.r2dbc.mysql; +import io.r2dbc.spi.ColumnMetadata; +import io.r2dbc.spi.R2dbcPermissionDeniedException; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonProcessingException; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.ArrayNode; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.ObjectNode; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; - +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAccessor; import java.util.Arrays; import java.util.Collections; +import java.util.Objects; import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED; import static io.r2dbc.spi.IsolationLevel.READ_UNCOMMITTED; @@ -37,8 +55,25 @@ */ class ConnectionIntegrationTest extends IntegrationTestSupport { + private static final MySqlConnectionConfiguration config = configuration( + "r2dbc", false, false, null, null); + ConnectionIntegrationTest() { - super(configuration("r2dbc", false, false, null, null)); + super(config); + } + + @BeforeAll + static void initGlobalVariables() { + // TODO: move it to GitHub Actions instead of test code + MySqlConnectionFactory.from(config) + .create() + .flatMap(conn -> conn.createStatement("SET GLOBAL local_infile=ON") + .execute() + .flatMap(MySqlResult::getRowsUpdated) + .then(conn.close()) + .onErrorResume(e -> conn.close().then(Mono.error(e)))) + .as(StepVerifier::create) + .verifyComplete(); } @Test @@ -401,7 +436,87 @@ void beginTransactionShouldRespectQueuedMessages() { .flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class))) .doOnNext(count -> assertThat(count).isEqualTo(1L))) ); + } + + @Test + void loadDataLocalInfileRestricted() throws URISyntaxException { + URL safeUrl = Objects.requireNonNull(getClass().getResource("/local/")); + URL unsafeUrl = Objects.requireNonNull(getClass().getResource("/")); + Path safePath = Paths.get(safeUrl.toURI()); + Path path = Paths.get(unsafeUrl.toURI()).resolve("logback-test.xml"); + + process(connection -> Mono.from(connection.createStatement("CREATE TEMPORARY TABLE test" + + "(id INT NOT NULL PRIMARY KEY, name VARCHAR(50))").execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .thenMany(connection.createStatement("LOAD DATA LOCAL INFILE '" + path + + "' INTO TABLE test").execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated)) + .verifyErrorMatches(msg -> msg instanceof R2dbcPermissionDeniedException && + msg.getMessage().contains(path.toString()) && msg.getMessage().contains(safePath.toString())); + } + @ParameterizedTest + @ValueSource(strings = { "stations", "users" }) + void loadDataLocalInfile(String name) throws URISyntaxException, IOException { + URL tdlUrl = Objects.requireNonNull(getClass().getResource(String.format("/local/%s.sql", name))); + URL csvUrl = Objects.requireNonNull(getClass().getResource(String.format("/local/%s.csv", name))); + URL jsonUrl = Objects.requireNonNull(getClass().getResource(String.format("/local/%s.json", name))); + String tdl = new String(Files.readAllBytes(Paths.get(tdlUrl.toURI())), StandardCharsets.UTF_8); + String path = Paths.get(csvUrl.toURI()).toString(); + String loadData = String.format("LOAD DATA LOCAL INFILE '%s' INTO TABLE `%s` " + + "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"'", path, name); + String select = String.format("SELECT * FROM `%s` ORDER BY `id`", name); + ObjectMapper mapper = new ObjectMapper(); + ArrayNode arrayNode = (ArrayNode) mapper.readTree(jsonUrl); + String json = mapper.writeValueAsString(arrayNode); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + complete(conn -> conn.createStatement(tdl) + .execute() + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .thenMany(conn.createStatement(loadData).execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .doOnNext(it -> assertThat(it).isEqualTo(arrayNode.size())) + .thenMany(conn.createStatement(select).execute()) + .flatMap(result -> result.map((row, metadata) -> { + ObjectNode node = mapper.createObjectNode(); + + for (ColumnMetadata column : metadata.getColumnMetadatas()) { + String columnName = column.getName(); + Object value = row.get(columnName); + + if (value instanceof TemporalAccessor) { + node.set(columnName, node.textNode(formatter.format((TemporalAccessor) value))); + } else if (value instanceof Long) { + node.set(columnName, node.numberNode(((Long) value))); + } else if (value instanceof Integer) { + node.set(columnName, node.numberNode(((Integer) value))); + } else if (value instanceof String) { + node.set(columnName, node.textNode(((String) value))); + } else if (value == null) { + node.set(columnName, node.nullNode()); + } else { + throw new IllegalArgumentException("Unsupported type: " + value.getClass()); + } + } + + return node; + })) + .collectList() + .handle((list, sink) -> { + ArrayNode array = mapper.createArrayNode(); + + for (ObjectNode node : list) { + array.add(node); + } + + try { + sink.next(mapper.writeValueAsString(array)); + } catch (JsonProcessingException e) { + sink.error(e); + } + }) + .doOnNext(it -> assertThat(it).isEqualTo(json))); } @Test diff --git a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java index 48e1ec67b..2489d5732 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java @@ -25,8 +25,13 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.time.ZoneId; +import java.util.Objects; import java.util.function.Function; import java.util.function.Predicate; @@ -63,7 +68,7 @@ Mono create() { return connectionFactory.create(); } - private StepVerifier.FirstStep process(Function> runner) { + StepVerifier.FirstStep process(Function> runner) { return create() .flatMap(connection -> Flux.from(runner.apply(connection)) .onErrorResume(e -> connection.close().then(Mono.error(e))) @@ -86,6 +91,16 @@ static MySqlConnectionConfiguration configuration( .isNotNull() .isNotEmpty(); + String localInfilePath; + + try { + URL url = Objects.requireNonNull(IntegrationTestSupport.class.getResource("/local/")); + Path path = Paths.get(url.toURI()); + localInfilePath = path.toString(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + MySqlConnectionConfiguration.Builder builder = MySqlConnectionConfiguration.builder() .host("127.0.0.1") .connectTimeout(Duration.ofSeconds(3)) @@ -93,6 +108,7 @@ static MySqlConnectionConfiguration configuration( .password(password) .database(database) .createDatabaseIfNotExist(createDatabaseIfNotExist) + .allowLoadLocalInfileInPath(localInfilePath) .autodetectExtensions(autodetectExtensions); if (serverZoneId != null) { diff --git a/src/test/java/io/asyncer/r2dbc/mysql/StatementTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/StatementTestSupport.java index 3c97d55ab..6732b6ec4 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/StatementTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/StatementTestSupport.java @@ -17,7 +17,6 @@ package io.asyncer.r2dbc.mysql; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.provider.ValueSource; import java.util.NoSuchElementException; diff --git a/src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelopeTest.java b/src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelopeTest.java similarity index 98% rename from src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelopeTest.java rename to src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelopeTest.java index a9fa64817..02e076a03 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelopeTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelopeTest.java @@ -36,9 +36,9 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * Unit tests for {@link FluxCumulateEnvelope}. + * Unit tests for {@link FluxEnvelope}. */ -class FluxCumulateEnvelopeTest { +class FluxEnvelopeTest { private static final byte[] RD_PATTERN = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz" .getBytes(StandardCharsets.US_ASCII); @@ -262,7 +262,7 @@ void mergeIntegralWithLargeCrossIntegral() { } private Flux envelopes(Flux source, int envelopeSize) { - return new FluxCumulateEnvelope(source, allocator, envelopeSize, 0); + return new FluxEnvelope(source, allocator, envelopeSize, 0, true); } private Consumer> assertBuffers(String origin, int envelopeSize, int lastSize, diff --git a/src/test/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtilsTest.java b/src/test/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtilsTest.java new file mode 100644 index 000000000..a5f06f35c --- /dev/null +++ b/src/test/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtilsTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.internal.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; + +/** + * Unit tests for {@link NettyBufferUtils}. + */ +class NettyBufferUtilsTest { + + @ParameterizedTest + @ValueSource(strings = { "stations.csv", "users.csv" }) + void readFile(String name) throws IOException, URISyntaxException { + URL url = Objects.requireNonNull(getClass().getResource("/local/" + name)); + Path path = Paths.get(Paths.get(url.toURI()).toString()); + String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8); + + NettyBufferUtils.readFile(path, PooledByteBufAllocator.DEFAULT, 8192) + .map(NettyBufferUtilsTest::toStringAndRelease) + .as(it -> StepVerifier.create(it, Long.MAX_VALUE)) + .expectNext(content) + .verifyComplete(); + } + + private static String toStringAndRelease(ByteBuf buf) { + String s = buf.toString(StandardCharsets.UTF_8); + buf.release(); + return s; + } +} diff --git a/src/test/resources/local/stations.csv b/src/test/resources/local/stations.csv new file mode 100644 index 000000000..750030b95 --- /dev/null +++ b/src/test/resources/local/stations.csv @@ -0,0 +1,6 @@ +10000,Atlantic,2024-01-01 00:00:00 +10001,Baychester,2024-01-02 00:00:00 +10002,Cortelyou,2024-01-05 00:00:00 +10003,Dyckman,2024-01-08 00:00:00 +10004,Elmhurst,2024-01-06 00:00:00 +10005,Fordham,2024-01-03 00:00:00 diff --git a/src/test/resources/local/stations.json b/src/test/resources/local/stations.json new file mode 100644 index 000000000..92d19f2d6 --- /dev/null +++ b/src/test/resources/local/stations.json @@ -0,0 +1,32 @@ +[ + { + "id": 10000, + "name": "Atlantic", + "created_at": "2024-01-01 00:00:00" + }, + { + "id": 10001, + "name": "Baychester", + "created_at": "2024-01-02 00:00:00" + }, + { + "id": 10002, + "name": "Cortelyou", + "created_at": "2024-01-05 00:00:00" + }, + { + "id": 10003, + "name": "Dyckman", + "created_at": "2024-01-08 00:00:00" + }, + { + "id": 10004, + "name": "Elmhurst", + "created_at": "2024-01-06 00:00:00" + }, + { + "id": 10005, + "name": "Fordham", + "created_at": "2024-01-03 00:00:00" + } +] diff --git a/src/test/resources/local/stations.sql b/src/test/resources/local/stations.sql new file mode 100644 index 000000000..7ef113a93 --- /dev/null +++ b/src/test/resources/local/stations.sql @@ -0,0 +1,6 @@ +CREATE TEMPORARY TABLE `stations` +( + `id` INT NOT NULL PRIMARY KEY, + `name` VARCHAR(120) NOT NULL, + `created_at` DATETIME NOT NULL +) diff --git a/src/test/resources/local/users.csv b/src/test/resources/local/users.csv new file mode 100644 index 000000000..c63c4a5c3 --- /dev/null +++ b/src/test/resources/local/users.csv @@ -0,0 +1,5 @@ +mirromutth,Mirro Mutth,3000, +superman,Superman,\N,I'm a superhero +earth,earth,4543000000,"I'm a planet. +I have ""Humans"" on me" +goku,Goku,40,"I'm a saiyan, I'm hungry" diff --git a/src/test/resources/local/users.json b/src/test/resources/local/users.json new file mode 100644 index 000000000..33facc0f6 --- /dev/null +++ b/src/test/resources/local/users.json @@ -0,0 +1,26 @@ +[ + { + "id": "earth", + "name": "earth", + "age": 4543000000, + "bio": "I'm a planet.\nI have \"Humans\" on me" + }, + { + "id": "goku", + "name": "Goku", + "age": 40, + "bio": "I'm a saiyan, I'm hungry" + }, + { + "id": "mirromutth", + "name": "Mirro Mutth", + "age": 3000, + "bio": "" + }, + { + "id": "superman", + "name": "Superman", + "age": null, + "bio": "I'm a superhero" + } +] diff --git a/src/test/resources/local/users.sql b/src/test/resources/local/users.sql new file mode 100644 index 000000000..4afcfd30b --- /dev/null +++ b/src/test/resources/local/users.sql @@ -0,0 +1,7 @@ +CREATE TEMPORARY TABLE `users` +( + `id` VARCHAR(120) NOT NULL PRIMARY KEY, + `name` VARCHAR(120) NOT NULL, + `age` BIGINT NULL, + `bio` TEXT NULL +)