Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ping connecion handler #2253

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class ClientOptions implements Serializable {

public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create();

public static final int DEFAULT_PING_CONNECTION_INTERVAL = 0;

private final boolean autoReconnect;

private final boolean cancelCommandsOnReconnectFailure;
Expand All @@ -87,6 +89,8 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;

private final int pingConnectionInterval;


protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
Expand All @@ -102,6 +106,7 @@ protected ClientOptions(Builder builder) {
this.sslOptions = builder.sslOptions;
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
this.timeoutOptions = builder.timeoutOptions;
this.pingConnectionInterval = builder.pingConnectionInterval;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -118,6 +123,7 @@ protected ClientOptions(ClientOptions original) {
this.sslOptions = original.getSslOptions();
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
this.timeoutOptions = original.getTimeoutOptions();
this.pingConnectionInterval = original.getPingConnectionInterval();
}

/**
Expand Down Expand Up @@ -179,6 +185,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private int pingConnectionInterval = DEFAULT_PING_CONNECTION_INTERVAL;

protected Builder() {
}

Expand Down Expand Up @@ -389,6 +397,19 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Set the period of sending PING command. see {@link PingConnectionHandler}
* @param pingConnectionInterval the period
* @return {@code this}
*/
public Builder pingConnectionInterval(int pingConnectionInterval) {
if (pingConnectionInterval <= 0) {
throw new IllegalArgumentException("PingConnectionInterval must be greater than 0, unit:ms");
}
this.pingConnectionInterval = pingConnectionInterval;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -418,7 +439,8 @@ public ClientOptions.Builder mutate() {
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions())
.pingConnectionInterval(getPingConnectionInterval());

return builder;
}
Expand Down Expand Up @@ -592,6 +614,14 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Return the pingConnectionInterval
* @return the pingConnectionInterval
*/
public int getPingConnectionInterval() {
return pingConnectionInterval;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class ConnectionBuilder {

private ConnectionWatchdog connectionWatchdog;

private PingConnectionHandler pingConnectionHandler;

private RedisURI redisURI;

public static ConnectionBuilder connectionBuilder() {
Expand Down Expand Up @@ -129,6 +131,10 @@ protected List<ChannelHandler> buildHandlers() {
handlers.add(createConnectionWatchdog());
}

if (clientOptions.getPingConnectionInterval() > 0) {
handlers.add(createPingConnectionHandler());
}

return handlers;
}

Expand All @@ -155,6 +161,17 @@ protected ConnectionWatchdog createConnectionWatchdog() {
return watchdog;
}

protected PingConnectionHandler createPingConnectionHandler() {

if (pingConnectionHandler != null) {
return pingConnectionHandler;
}

PingConnectionHandler handler = new PingConnectionHandler(clientResources, clientOptions);
pingConnectionHandler = handler;
return handler;
}

public ChannelInitializer<Channel> build(SocketAddress socketAddress) {
return new PlainChannelInitializer(this::buildHandlers, clientResources);
}
Expand Down
119 changes: 119 additions & 0 deletions src/main/java/io/lettuce/core/PingConnectionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.lettuce.core;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.resource.ClientResources;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* A netty {@link ChannelHandler} responsible for monitoring (By periodically sending PING commands) Redis alive.
*
* @author Bodong Yang
* @date 2022/11/11
*/
@ChannelHandler.Sharable
public class PingConnectionHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PingConnectionHandler.class);

private final RedisCommandBuilder<String, String> commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8);

private static final int MAX_PING_FAILED_TIMES = 3;

private final AtomicInteger pingFailed = new AtomicInteger(0);

private final ClientResources clientResources;

private final ClientOptions clientOptions;

public PingConnectionHandler(ClientResources clientResources, ClientOptions clientOptions) {
this.clientResources = clientResources;
this.clientOptions = clientOptions;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
sendPing(ctx);
ctx.fireChannelActive();
}

/**
* Periodically send the PING command, if it fails three times in a row, it will initiate a reconnection.
* Ignore exceptions returned by the following PING commands: {@link RedisLoadingException}
* {@link RedisBusyException}
*
* @param ctx the ctx
*/
private void sendPing(ChannelHandlerContext ctx) {
AsyncCommand<String, String, String> dispatch = dispatch(ctx.channel(), commandBuilder.ping());

clientResources.timer().newTimeout(timeout -> {
if (ctx.isRemoved() || !ctx.channel().isActive()) {
return;
}

if (dispatch.cancel(false) || cause(dispatch) != null) {

Throwable cause = cause(dispatch);

if (!(cause instanceof RedisLoadingException
|| cause instanceof RedisBusyException)) {
if (!dispatch.isCancelled()) {
logger.error("Unable to send PING command over channel: " + ctx.channel(), cause);
}

if (pingFailed.incrementAndGet() == MAX_PING_FAILED_TIMES) {
logger.error("channel: {} closed due to {} consecutive PING response timeout set in {} ms",
ctx.channel(), MAX_PING_FAILED_TIMES, clientOptions.getPingConnectionInterval());
pingFailed.set(0);
ctx.channel().close();
} else {
sendPing(ctx);
}
} else {
pingFailed.set(0);
sendPing(ctx);
}
} else {
pingFailed.set(0);
sendPing(ctx);
}
}, clientOptions.getPingConnectionInterval(), TimeUnit.MILLISECONDS);
}

private <T> AsyncCommand<String, String, T> dispatch(Channel channel, Command<String, String, T> command) {

AsyncCommand<String, String, T> future = new AsyncCommand<>(command);

channel.writeAndFlush(future).addListener(writeFuture -> {

if (!writeFuture.isSuccess()) {
future.completeExceptionally(writeFuture.cause());
}
});

return future;
}

protected Throwable cause(CompletableFuture<?> future) {
try {
future.getNow(null);
return null;
} catch (CompletionException ex2) {
return ex2.getCause();
} catch (CancellationException ex1) {
return ex1;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.lettuce.examples;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;

/**
* @author bodong.ybd
* @date 2022/11/11
*/
public class ConnectToRedisWithPingInterval {
public static void main(String[] args) throws Exception {
RedisClient client = RedisClient.create(RedisURI.Builder.redis("localhost", 6379).build());
client.setOptions(ClientOptions.builder().pingConnectionInterval(3000).build());
StatefulRedisConnection<String, String> connection = client.connect();

for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(1000);
System.out.printf("%d:%s\n", i, connection.sync().set("" + i, "" + i));
} catch (Exception e) {
e.printStackTrace();
}
}

connection.close();
client.shutdown();
}
}