Skip to content

Commit

Permalink
Temp commit: Add support for failover
Browse files Browse the repository at this point in the history
  • Loading branch information
mirromutth committed Apr 5, 2024
1 parent e5bf1ac commit 7c9144a
Show file tree
Hide file tree
Showing 11 changed files with 464 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
import io.netty.channel.ChannelOption;
import io.netty.resolver.AddressResolver;
Expand All @@ -26,13 +27,17 @@
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.time.ZoneId;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
Expand All @@ -49,7 +54,7 @@ interface ConnectionStrategy {
*
* @return a logged-in {@link Client} object.
*/
Mono<Client> connect();
Mono<? extends Client> connect();

/**
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
Expand Down Expand Up @@ -87,7 +92,7 @@ static TcpClient createTcpClient(SocketClientConfiguration configuration, boolea
* @param configuration a configuration that affects login behavior.
* @return a logged-in {@link Client} object.
*/
static Mono<Client> connectWithInit(
static Mono<ReactorNettyClient> connectWithInit(
TcpClient tcpClient,
Credential credential,
MySqlConnectionConfiguration configuration
Expand All @@ -110,7 +115,7 @@ static Mono<Client> connectWithInit(
configuration.isPreserveInstants(),
connectionTimeZone
);
}).flatMap(context -> Client.connect(tcpClient, configuration.getSsl(), context)).flatMap(client -> {
}).flatMap(ctx -> ReactorNettyClient.connect(tcpClient, configuration.getSsl(), ctx)).flatMap(client -> {
// Lazy init database after handshake/login
MySqlSslConfiguration ssl = configuration.getSsl();
String loginDb = configuration.isCreateDatabaseIfNotExist() ? "" : configuration.getDatabase();
Expand All @@ -126,30 +131,88 @@ static Mono<Client> connectWithInit(
).then(Mono.just(client)).onErrorResume(e -> client.forceClose().then(Mono.error(e)));
});
}
}

/**
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
*
* @since 1.2.0
*/
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {

BalancedResolverGroup() {
/**
* Creates an exception that indicates a retry failure.
*
* @param message the message of the exception.
* @param cause the last exception that caused the retry.
* @return a retry failure exception.
*/
static R2dbcNonTransientResourceException retryFail(String message, @Nullable Throwable cause) {
return new R2dbcNonTransientResourceException(
message,
"H1000",
9000,
cause
);
}

public static final BalancedResolverGroup INSTANCE;
/**
* Connect and login to a MySQL server with a specific TCP socket address.
*
* @since 1.2.0
*/
final class InetConnectFunction implements Function<Supplier<InetSocketAddress>, Mono<ReactorNettyClient>> {

private final boolean balancedDns;

private final boolean tcpKeepAlive;

private final boolean tcpNoDelay;

private final Credential credential;

static {
INSTANCE = new BalancedResolverGroup();
Runtime.getRuntime().addShutdownHook(new Thread(
INSTANCE::close,
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
));
private final MySqlConnectionConfiguration configuration;

InetConnectFunction(
boolean balancedDns,
boolean tcpKeepAlive,
boolean tcpNoDelay,
Credential credential,
MySqlConnectionConfiguration configuration
) {
this.balancedDns = balancedDns;
this.tcpKeepAlive = tcpKeepAlive;
this.tcpNoDelay = tcpNoDelay;
this.credential = credential;
this.configuration = configuration;
}

@Override
public Mono<ReactorNettyClient> apply(Supplier<InetSocketAddress> address) {
TcpClient client = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns)
.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
.option(ChannelOption.TCP_NODELAY, tcpNoDelay)
.remoteAddress(address);

return ConnectionStrategy.connectWithInit(client, credential, configuration);
}
}

@Override
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
/**
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
*
* @since 1.2.0
*/
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {

BalancedResolverGroup() {
}

public static final BalancedResolverGroup INSTANCE;

static {
INSTANCE = new BalancedResolverGroup();
Runtime.getRuntime().addShutdownHook(new Thread(
INSTANCE::close,
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
));
}

@Override
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
}
}
}
13 changes: 7 additions & 6 deletions r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/InitFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.asyncer.r2dbc.mysql.cache.PrepareCache;
import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.client.FluxExchangeable;
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
import io.asyncer.r2dbc.mysql.codec.Codecs;
import io.asyncer.r2dbc.mysql.codec.CodecsBuilder;
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
Expand Down Expand Up @@ -75,7 +76,7 @@
import java.util.function.Function;

/**
* A message flow utility that can initializes the session of {@link Client}.
* A message flow utility that can initializes the session of {@link ReactorNettyClient}.
* <p>
* It should not use server-side prepared statements, because {@link PrepareCache} will be initialized after the session
* is initialized.
Expand Down Expand Up @@ -117,9 +118,9 @@ final class InitFlow {
};

/**
* Initializes handshake and login a {@link Client}.
* Initializes handshake and login a {@link ReactorNettyClient}.
*
* @param client the {@link Client} to exchange messages with.
* @param client the {@link ReactorNettyClient} to exchange messages with.
* @param sslMode the {@link SslMode} defines SSL capability and behavior.
* @param database the database that will be connected.
* @param user the user that will be login.
Expand All @@ -128,7 +129,7 @@ final class InitFlow {
* @param zstdCompressionLevel the zstd compression level.
* @return a {@link Mono} that indicates the initialization is done, or an error if the initialization failed.
*/
static Mono<Void> initHandshake(Client client, SslMode sslMode, String database, String user,
static Mono<Void> initHandshake(ReactorNettyClient client, SslMode sslMode, String database, String user,
@Nullable CharSequence password, Set<CompressionAlgorithm> compressionAlgorithms, int zstdCompressionLevel) {
return client.exchange(new HandshakeExchangeable(
client,
Expand Down Expand Up @@ -488,7 +489,7 @@ final class HandshakeExchangeable extends FluxExchangeable<Void> {
private final Sinks.Many<SubsequenceClientMessage> requests = Sinks.many().unicast()
.onBackpressureBuffer(Queues.<SubsequenceClientMessage>one().get());

private final Client client;
private final ReactorNettyClient client;

private final SslMode sslMode;

Expand All @@ -511,7 +512,7 @@ final class HandshakeExchangeable extends FluxExchangeable<Void> {

private boolean sslCompleted;

HandshakeExchangeable(Client client, SslMode sslMode, String database, String user,
HandshakeExchangeable(ReactorNettyClient client, SslMode sslMode, String database, String user,
@Nullable CharSequence password, Set<CompressionAlgorithm> compressions,
int zstdCompressionLevel) {
this.client = client;
Expand Down
Loading

0 comments on commit 7c9144a

Please sign in to comment.