diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index 42dbf9c0a5..6f8ec3987c 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -61,6 +61,8 @@ public class ClientOptions implements Serializable { public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create(); + public static final int DEFAULT_PING_CONNECTION_INTERVAL = 0; + private final boolean autoReconnect; private final boolean cancelCommandsOnReconnectFailure; @@ -87,6 +89,8 @@ public class ClientOptions implements Serializable { private final TimeoutOptions timeoutOptions; + private final int pingConnectionInterval; + protected ClientOptions(Builder builder) { this.autoReconnect = builder.autoReconnect; @@ -102,6 +106,7 @@ protected ClientOptions(Builder builder) { this.sslOptions = builder.sslOptions; this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure; this.timeoutOptions = builder.timeoutOptions; + this.pingConnectionInterval = builder.pingConnectionInterval; } protected ClientOptions(ClientOptions original) { @@ -118,6 +123,7 @@ protected ClientOptions(ClientOptions original) { this.sslOptions = original.getSslOptions(); this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure(); this.timeoutOptions = original.getTimeoutOptions(); + this.pingConnectionInterval = original.getPingConnectionInterval(); } /** @@ -179,6 +185,8 @@ public static class Builder { private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS; + private int pingConnectionInterval = DEFAULT_PING_CONNECTION_INTERVAL; + protected Builder() { } @@ -389,6 +397,19 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) { return this; } + /** + * Set the period of sending PING command. see {@link PingConnectionHandler} + * @param pingConnectionInterval the period + * @return {@code this} + */ + public Builder pingConnectionInterval(int pingConnectionInterval) { + if (pingConnectionInterval <= 0) { + throw new IllegalArgumentException("PingConnectionInterval must be greater than 0, unit:ms"); + } + this.pingConnectionInterval = pingConnectionInterval; + return this; + } + /** * Create a new instance of {@link ClientOptions}. * @@ -418,7 +439,8 @@ public ClientOptions.Builder mutate() { .publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection()) .protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize()) .scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions()) - .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions()); + .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions()) + .pingConnectionInterval(getPingConnectionInterval()); return builder; } @@ -592,6 +614,14 @@ public TimeoutOptions getTimeoutOptions() { return timeoutOptions; } + /** + * Return the pingConnectionInterval + * @return the pingConnectionInterval + */ + public int getPingConnectionInterval() { + return pingConnectionInterval; + } + /** * Behavior of connections in disconnected state. */ diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java index 94c11fdd9b..a02d625adf 100644 --- a/src/main/java/io/lettuce/core/ConnectionBuilder.java +++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java @@ -87,6 +87,8 @@ public class ConnectionBuilder { private ConnectionWatchdog connectionWatchdog; + private PingConnectionHandler pingConnectionHandler; + private RedisURI redisURI; public static ConnectionBuilder connectionBuilder() { @@ -129,6 +131,10 @@ protected List buildHandlers() { handlers.add(createConnectionWatchdog()); } + if (clientOptions.getPingConnectionInterval() > 0) { + handlers.add(createPingConnectionHandler()); + } + return handlers; } @@ -155,6 +161,17 @@ protected ConnectionWatchdog createConnectionWatchdog() { return watchdog; } + protected PingConnectionHandler createPingConnectionHandler() { + + if (pingConnectionHandler != null) { + return pingConnectionHandler; + } + + PingConnectionHandler handler = new PingConnectionHandler(clientResources, clientOptions); + pingConnectionHandler = handler; + return handler; + } + public ChannelInitializer build(SocketAddress socketAddress) { return new PlainChannelInitializer(this::buildHandlers, clientResources); } diff --git a/src/main/java/io/lettuce/core/PingConnectionHandler.java b/src/main/java/io/lettuce/core/PingConnectionHandler.java new file mode 100644 index 0000000000..2d98d04c99 --- /dev/null +++ b/src/main/java/io/lettuce/core/PingConnectionHandler.java @@ -0,0 +1,119 @@ +package io.lettuce.core; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.protocol.AsyncCommand; +import io.lettuce.core.protocol.Command; +import io.lettuce.core.resource.ClientResources; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * A netty {@link ChannelHandler} responsible for monitoring (By periodically sending PING commands) Redis alive. + * + * @author Bodong Yang + * @date 2022/11/11 + */ +@ChannelHandler.Sharable +public class PingConnectionHandler extends ChannelInboundHandlerAdapter { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(PingConnectionHandler.class); + + private final RedisCommandBuilder commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8); + + private static final int MAX_PING_FAILED_TIMES = 3; + + private final AtomicInteger pingFailed = new AtomicInteger(0); + + private final ClientResources clientResources; + + private final ClientOptions clientOptions; + + public PingConnectionHandler(ClientResources clientResources, ClientOptions clientOptions) { + this.clientResources = clientResources; + this.clientOptions = clientOptions; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + sendPing(ctx); + ctx.fireChannelActive(); + } + + /** + * Periodically send the PING command, if it fails three times in a row, it will initiate a reconnection. + * Ignore exceptions returned by the following PING commands: {@link RedisLoadingException} + * {@link RedisBusyException} + * + * @param ctx the ctx + */ + private void sendPing(ChannelHandlerContext ctx) { + AsyncCommand dispatch = dispatch(ctx.channel(), commandBuilder.ping()); + + clientResources.timer().newTimeout(timeout -> { + if (ctx.isRemoved() || !ctx.channel().isActive()) { + return; + } + + if (dispatch.cancel(false) || cause(dispatch) != null) { + + Throwable cause = cause(dispatch); + + if (!(cause instanceof RedisLoadingException + || cause instanceof RedisBusyException)) { + if (!dispatch.isCancelled()) { + logger.error("Unable to send PING command over channel: " + ctx.channel(), cause); + } + + if (pingFailed.incrementAndGet() == MAX_PING_FAILED_TIMES) { + logger.error("channel: {} closed due to {} consecutive PING response timeout set in {} ms", + ctx.channel(), MAX_PING_FAILED_TIMES, clientOptions.getPingConnectionInterval()); + pingFailed.set(0); + ctx.channel().close(); + } else { + sendPing(ctx); + } + } else { + pingFailed.set(0); + sendPing(ctx); + } + } else { + pingFailed.set(0); + sendPing(ctx); + } + }, clientOptions.getPingConnectionInterval(), TimeUnit.MILLISECONDS); + } + + private AsyncCommand dispatch(Channel channel, Command command) { + + AsyncCommand future = new AsyncCommand<>(command); + + channel.writeAndFlush(future).addListener(writeFuture -> { + + if (!writeFuture.isSuccess()) { + future.completeExceptionally(writeFuture.cause()); + } + }); + + return future; + } + + protected Throwable cause(CompletableFuture future) { + try { + future.getNow(null); + return null; + } catch (CompletionException ex2) { + return ex2.getCause(); + } catch (CancellationException ex1) { + return ex1; + } + } +} diff --git a/src/test/java/io/lettuce/examples/ConnectToRedisWithPingInterval.java b/src/test/java/io/lettuce/examples/ConnectToRedisWithPingInterval.java new file mode 100644 index 0000000000..0102b9f762 --- /dev/null +++ b/src/test/java/io/lettuce/examples/ConnectToRedisWithPingInterval.java @@ -0,0 +1,30 @@ +package io.lettuce.examples; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; + +/** + * @author bodong.ybd + * @date 2022/11/11 + */ +public class ConnectToRedisWithPingInterval { + public static void main(String[] args) throws Exception { + RedisClient client = RedisClient.create(RedisURI.Builder.redis("localhost", 6379).build()); + client.setOptions(ClientOptions.builder().pingConnectionInterval(3000).build()); + StatefulRedisConnection connection = client.connect(); + + for (int i = 0; i < 1000; i++) { + try { + Thread.sleep(1000); + System.out.printf("%d:%s\n", i, connection.sync().set("" + i, "" + i)); + } catch (Exception e) { + e.printStackTrace(); + } + } + + connection.close(); + client.shutdown(); + } +}