Skip to content

Commit

Permalink
Introduce ping connecion handler
Browse files Browse the repository at this point in the history
PingConnectionHandler will periodically send PING commands to Redis Server, and decide whether to reconnect based on whether it fails (the current strategy is to reconnect after three consecutive failures)

This is essentially because KeepAlive that only relies on TCP is unreliable, for details, please refer to: #2082
  • Loading branch information
yangbodong22011 committed Nov 17, 2022
1 parent afeb1e7 commit 0ab96da
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 1 deletion.
32 changes: 31 additions & 1 deletion src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class ClientOptions implements Serializable {

public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create();

public static final int DEFAULT_PING_CONNECTION_INTERVAL = 0;

private final boolean autoReconnect;

private final boolean cancelCommandsOnReconnectFailure;
Expand All @@ -87,6 +89,8 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;

private final int pingConnectionInterval;


protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
Expand All @@ -102,6 +106,7 @@ protected ClientOptions(Builder builder) {
this.sslOptions = builder.sslOptions;
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
this.timeoutOptions = builder.timeoutOptions;
this.pingConnectionInterval = builder.pingConnectionInterval;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -118,6 +123,7 @@ protected ClientOptions(ClientOptions original) {
this.sslOptions = original.getSslOptions();
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
this.timeoutOptions = original.getTimeoutOptions();
this.pingConnectionInterval = original.getPingConnectionInterval();
}

/**
Expand Down Expand Up @@ -179,6 +185,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private int pingConnectionInterval = DEFAULT_PING_CONNECTION_INTERVAL;

protected Builder() {
}

Expand Down Expand Up @@ -389,6 +397,19 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Set the period of sending PING command. see {@link PingConnectionHandler}
* @param pingConnectionInterval the period
* @return {@code this}
*/
public Builder pingConnectionInterval(int pingConnectionInterval) {
if (pingConnectionInterval <= 0) {
throw new IllegalArgumentException("PingConnectionInterval must be greater than 0, unit:ms");
}
this.pingConnectionInterval = pingConnectionInterval;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -418,7 +439,8 @@ public ClientOptions.Builder mutate() {
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions())
.pingConnectionInterval(getPingConnectionInterval());

return builder;
}
Expand Down Expand Up @@ -592,6 +614,14 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Return the pingConnectionInterval
* @return the pingConnectionInterval
*/
public int getPingConnectionInterval() {
return pingConnectionInterval;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class ConnectionBuilder {

private ConnectionWatchdog connectionWatchdog;

private PingConnectionHandler pingConnectionHandler;

private RedisURI redisURI;

public static ConnectionBuilder connectionBuilder() {
Expand Down Expand Up @@ -129,6 +131,10 @@ protected List<ChannelHandler> buildHandlers() {
handlers.add(createConnectionWatchdog());
}

if (clientOptions.getPingConnectionInterval() > 0) {
handlers.add(createPingConnectionHandler());
}

return handlers;
}

Expand All @@ -155,6 +161,17 @@ protected ConnectionWatchdog createConnectionWatchdog() {
return watchdog;
}

protected PingConnectionHandler createPingConnectionHandler() {

if (pingConnectionHandler != null) {
return pingConnectionHandler;
}

PingConnectionHandler handler = new PingConnectionHandler(clientResources, clientOptions);
pingConnectionHandler = handler;
return handler;
}

public ChannelInitializer<Channel> build(SocketAddress socketAddress) {
return new PlainChannelInitializer(this::buildHandlers, clientResources);
}
Expand Down
119 changes: 119 additions & 0 deletions src/main/java/io/lettuce/core/PingConnectionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.lettuce.core;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.resource.ClientResources;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* A netty {@link ChannelHandler} responsible for monitoring (By periodically sending PING commands) Redis alive.
*
* @author Bodong Yang
* @date 2022/11/11
*/
@ChannelHandler.Sharable
public class PingConnectionHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PingConnectionHandler.class);

private final RedisCommandBuilder<String, String> commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8);

private static final int MAX_PING_FAILED_TIMES = 3;

private final AtomicInteger pingFailed = new AtomicInteger(0);

private final ClientResources clientResources;

private final ClientOptions clientOptions;

public PingConnectionHandler(ClientResources clientResources, ClientOptions clientOptions) {
this.clientResources = clientResources;
this.clientOptions = clientOptions;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
sendPing(ctx);
ctx.fireChannelActive();
}

/**
* Periodically send the PING command, if it fails three times in a row, it will initiate a reconnection.
* Ignore exceptions returned by the following PING commands: {@link RedisLoadingException}
* {@link RedisBusyException}
*
* @param ctx the ctx
*/
private void sendPing(ChannelHandlerContext ctx) {
AsyncCommand<String, String, String> dispatch = dispatch(ctx.channel(), commandBuilder.ping());

clientResources.timer().newTimeout(timeout -> {
if (ctx.isRemoved() || !ctx.channel().isActive()) {
return;
}

if (dispatch.cancel(false) || cause(dispatch) != null) {

Throwable cause = cause(dispatch);

if (!(cause instanceof RedisLoadingException
|| cause instanceof RedisBusyException)) {
if (!dispatch.isCancelled()) {
logger.error("Unable to send PING command over channel: " + ctx.channel(), cause);
}

if (pingFailed.incrementAndGet() == MAX_PING_FAILED_TIMES) {
logger.error("channel: {} closed due to {} consecutive PING response timeout set in {} ms",
ctx.channel(), MAX_PING_FAILED_TIMES, clientOptions.getPingConnectionInterval());
pingFailed.set(0);
ctx.channel().close();
} else {
sendPing(ctx);
}
} else {
pingFailed.set(0);
sendPing(ctx);
}
} else {
pingFailed.set(0);
sendPing(ctx);
}
}, clientOptions.getPingConnectionInterval(), TimeUnit.MILLISECONDS);
}

private <T> AsyncCommand<String, String, T> dispatch(Channel channel, Command<String, String, T> command) {

AsyncCommand<String, String, T> future = new AsyncCommand<>(command);

channel.writeAndFlush(future).addListener(writeFuture -> {

if (!writeFuture.isSuccess()) {
future.completeExceptionally(writeFuture.cause());
}
});

return future;
}

protected Throwable cause(CompletableFuture<?> future) {
try {
future.getNow(null);
return null;
} catch (CompletionException ex2) {
return ex2.getCause();
} catch (CancellationException ex1) {
return ex1;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.lettuce.examples;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;

/**
* @author bodong.ybd
* @date 2022/11/11
*/
public class ConnectToRedisWithPingInterval {
public static void main(String[] args) throws Exception {
RedisClient client = RedisClient.create(RedisURI.Builder.redis("localhost", 6379).build());
client.setOptions(ClientOptions.builder().pingConnectionInterval(3000).build());
StatefulRedisConnection<String, String> connection = client.connect();

for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(1000);
System.out.printf("%d:%s\n", i, connection.sync().set("" + i, "" + i));
} catch (Exception e) {
e.printStackTrace();
}
}

connection.close();
client.shutdown();
}
}

0 comments on commit 0ab96da

Please sign in to comment.