From 0ab96dac6eac44064f411fdcb4144d158f81e561 Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Fri, 11 Nov 2022 17:47:06 +0800 Subject: [PATCH] Introduce ping connecion handler PingConnectionHandler will periodically send PING commands to Redis Server, and decide whether to reconnect based on whether it fails (the current strategy is to reconnect after three consecutive failures) This is essentially because KeepAlive that only relies on TCP is unreliable, for details, please refer to: #2082 --- .../java/io/lettuce/core/ClientOptions.java | 32 ++++- .../io/lettuce/core/ConnectionBuilder.java | 17 +++ .../lettuce/core/PingConnectionHandler.java | 119 ++++++++++++++++++ .../ConnectToRedisWithPingInterval.java | 30 +++++ 4 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/lettuce/core/PingConnectionHandler.java create mode 100644 src/test/java/io/lettuce/examples/ConnectToRedisWithPingInterval.java diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index 42dbf9c0a5..6f8ec3987c 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -61,6 +61,8 @@ public class ClientOptions implements Serializable { public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create(); + public static final int DEFAULT_PING_CONNECTION_INTERVAL = 0; + private final boolean autoReconnect; private final boolean cancelCommandsOnReconnectFailure; @@ -87,6 +89,8 @@ public class ClientOptions implements Serializable { private final TimeoutOptions timeoutOptions; + private final int pingConnectionInterval; + protected ClientOptions(Builder builder) { this.autoReconnect = builder.autoReconnect; @@ -102,6 +106,7 @@ protected ClientOptions(Builder builder) { this.sslOptions = builder.sslOptions; this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure; this.timeoutOptions = builder.timeoutOptions; + this.pingConnectionInterval = builder.pingConnectionInterval; } protected ClientOptions(ClientOptions original) { @@ -118,6 +123,7 @@ protected ClientOptions(ClientOptions original) { this.sslOptions = original.getSslOptions(); this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure(); this.timeoutOptions = original.getTimeoutOptions(); + this.pingConnectionInterval = original.getPingConnectionInterval(); } /** @@ -179,6 +185,8 @@ public static class Builder { private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS; + private int pingConnectionInterval = DEFAULT_PING_CONNECTION_INTERVAL; + protected Builder() { } @@ -389,6 +397,19 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) { return this; } + /** + * Set the period of sending PING command. see {@link PingConnectionHandler} + * @param pingConnectionInterval the period + * @return {@code this} + */ + public Builder pingConnectionInterval(int pingConnectionInterval) { + if (pingConnectionInterval <= 0) { + throw new IllegalArgumentException("PingConnectionInterval must be greater than 0, unit:ms"); + } + this.pingConnectionInterval = pingConnectionInterval; + return this; + } + /** * Create a new instance of {@link ClientOptions}. * @@ -418,7 +439,8 @@ public ClientOptions.Builder mutate() { .publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection()) .protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize()) .scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions()) - .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions()); + .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions()) + .pingConnectionInterval(getPingConnectionInterval()); return builder; } @@ -592,6 +614,14 @@ public TimeoutOptions getTimeoutOptions() { return timeoutOptions; } + /** + * Return the pingConnectionInterval + * @return the pingConnectionInterval + */ + public int getPingConnectionInterval() { + return pingConnectionInterval; + } + /** * Behavior of connections in disconnected state. */ diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java index 94c11fdd9b..a02d625adf 100644 --- a/src/main/java/io/lettuce/core/ConnectionBuilder.java +++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java @@ -87,6 +87,8 @@ public class ConnectionBuilder { private ConnectionWatchdog connectionWatchdog; + private PingConnectionHandler pingConnectionHandler; + private RedisURI redisURI; public static ConnectionBuilder connectionBuilder() { @@ -129,6 +131,10 @@ protected List buildHandlers() { handlers.add(createConnectionWatchdog()); } + if (clientOptions.getPingConnectionInterval() > 0) { + handlers.add(createPingConnectionHandler()); + } + return handlers; } @@ -155,6 +161,17 @@ protected ConnectionWatchdog createConnectionWatchdog() { return watchdog; } + protected PingConnectionHandler createPingConnectionHandler() { + + if (pingConnectionHandler != null) { + return pingConnectionHandler; + } + + PingConnectionHandler handler = new PingConnectionHandler(clientResources, clientOptions); + pingConnectionHandler = handler; + return handler; + } + public ChannelInitializer build(SocketAddress socketAddress) { return new PlainChannelInitializer(this::buildHandlers, clientResources); } diff --git a/src/main/java/io/lettuce/core/PingConnectionHandler.java b/src/main/java/io/lettuce/core/PingConnectionHandler.java new file mode 100644 index 0000000000..2d98d04c99 --- /dev/null +++ b/src/main/java/io/lettuce/core/PingConnectionHandler.java @@ -0,0 +1,119 @@ +package io.lettuce.core; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.protocol.AsyncCommand; +import io.lettuce.core.protocol.Command; +import io.lettuce.core.resource.ClientResources; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * A netty {@link ChannelHandler} responsible for monitoring (By periodically sending PING commands) Redis alive. + * + * @author Bodong Yang + * @date 2022/11/11 + */ +@ChannelHandler.Sharable +public class PingConnectionHandler extends ChannelInboundHandlerAdapter { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(PingConnectionHandler.class); + + private final RedisCommandBuilder commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8); + + private static final int MAX_PING_FAILED_TIMES = 3; + + private final AtomicInteger pingFailed = new AtomicInteger(0); + + private final ClientResources clientResources; + + private final ClientOptions clientOptions; + + public PingConnectionHandler(ClientResources clientResources, ClientOptions clientOptions) { + this.clientResources = clientResources; + this.clientOptions = clientOptions; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + sendPing(ctx); + ctx.fireChannelActive(); + } + + /** + * Periodically send the PING command, if it fails three times in a row, it will initiate a reconnection. + * Ignore exceptions returned by the following PING commands: {@link RedisLoadingException} + * {@link RedisBusyException} + * + * @param ctx the ctx + */ + private void sendPing(ChannelHandlerContext ctx) { + AsyncCommand dispatch = dispatch(ctx.channel(), commandBuilder.ping()); + + clientResources.timer().newTimeout(timeout -> { + if (ctx.isRemoved() || !ctx.channel().isActive()) { + return; + } + + if (dispatch.cancel(false) || cause(dispatch) != null) { + + Throwable cause = cause(dispatch); + + if (!(cause instanceof RedisLoadingException + || cause instanceof RedisBusyException)) { + if (!dispatch.isCancelled()) { + logger.error("Unable to send PING command over channel: " + ctx.channel(), cause); + } + + if (pingFailed.incrementAndGet() == MAX_PING_FAILED_TIMES) { + logger.error("channel: {} closed due to {} consecutive PING response timeout set in {} ms", + ctx.channel(), MAX_PING_FAILED_TIMES, clientOptions.getPingConnectionInterval()); + pingFailed.set(0); + ctx.channel().close(); + } else { + sendPing(ctx); + } + } else { + pingFailed.set(0); + sendPing(ctx); + } + } else { + pingFailed.set(0); + sendPing(ctx); + } + }, clientOptions.getPingConnectionInterval(), TimeUnit.MILLISECONDS); + } + + private AsyncCommand dispatch(Channel channel, Command command) { + + AsyncCommand future = new AsyncCommand<>(command); + + channel.writeAndFlush(future).addListener(writeFuture -> { + + if (!writeFuture.isSuccess()) { + future.completeExceptionally(writeFuture.cause()); + } + }); + + return future; + } + + protected Throwable cause(CompletableFuture future) { + try { + future.getNow(null); + return null; + } catch (CompletionException ex2) { + return ex2.getCause(); + } catch (CancellationException ex1) { + return ex1; + } + } +} diff --git a/src/test/java/io/lettuce/examples/ConnectToRedisWithPingInterval.java b/src/test/java/io/lettuce/examples/ConnectToRedisWithPingInterval.java new file mode 100644 index 0000000000..0102b9f762 --- /dev/null +++ b/src/test/java/io/lettuce/examples/ConnectToRedisWithPingInterval.java @@ -0,0 +1,30 @@ +package io.lettuce.examples; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; + +/** + * @author bodong.ybd + * @date 2022/11/11 + */ +public class ConnectToRedisWithPingInterval { + public static void main(String[] args) throws Exception { + RedisClient client = RedisClient.create(RedisURI.Builder.redis("localhost", 6379).build()); + client.setOptions(ClientOptions.builder().pingConnectionInterval(3000).build()); + StatefulRedisConnection connection = client.connect(); + + for (int i = 0; i < 1000; i++) { + try { + Thread.sleep(1000); + System.out.printf("%d:%s\n", i, connection.sync().set("" + i, "" + i)); + } catch (Exception e) { + e.printStackTrace(); + } + } + + connection.close(); + client.shutdown(); + } +}