diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 12c2e35fe0..7d0d85f49d 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -7,6 +7,7 @@ BitPosParams BuilderFactory CFCommands CMSCommands +CallNotPermittedException CircuitBreaker ClientKillParams ClusterNode @@ -40,6 +41,7 @@ Jaeger Javadocs Jedis JedisCluster +JedisConnectionException JedisPool JedisPooled JedisShardInfo @@ -88,6 +90,7 @@ StatusCode StreamEntryID TCP TOPKCommands +Throwable TimeSeriesCommands URI UnblockType diff --git a/docs/failover.md b/docs/failover.md index 8414e41376..38fdc8c97b 100644 --- a/docs/failover.md +++ b/docs/failover.md @@ -99,8 +99,8 @@ Jedis uses the following retry settings: | Max retry attempts | 3 | Maximum number of retry attempts (including the initial call) | | Retry wait duration | 500 ms | Number of milliseconds to wait between retry attempts | | Wait duration backoff multiplier | 2 | Exponential backoff factor multiplied against wait duration between retries. For example, with a wait duration of 1 second and a multiplier of 2, the retries would occur after 1s, 2s, 4s, 8s, 16s, and so on. | -| Retry included exception list | `JedisConnectionException` | A list of `Throwable` classes that count as failures and should be retried. | -| Retry ignored exception list | Empty list | A list of `Throwable` classes to explicitly ignore for the purposes of retry. | +| Retry included exception list | [JedisConnectionException] | A list of Throwable classes that count as failures and should be retried. | +| Retry ignored exception list | null | A list of Throwable classes to explicitly ignore for the purposes of retry. | To disable retry, set `maxRetryAttempts` to 1. @@ -116,8 +116,16 @@ Jedis uses the following circuit breaker settings: | Failure rate threshold | `50.0f` | Percentage of calls within the sliding window that must fail before the circuit breaker transitions to the `OPEN` state. | | Slow call duration threshold | 60000 ms | Duration threshold above which calls are classified as slow and added to the sliding window. | | Slow call rate threshold | `100.0f` | Percentage of calls within the sliding window that exceed the slow call duration threshold before circuit breaker transitions to the `OPEN` state. | -| Circuit breaker included exception list | `JedisConnectionException` | A list of `Throwable` classes that count as failures and add to the failure rate. | -| Circuit breaker ignored exception list | Empty list | A list of `Throwable` classes to explicitly ignore for failure rate calculations. | | +| Circuit breaker included exception list | [JedisConnectionException] | A list of Throwable classes that count as failures and add to the failure rate. | +| Circuit breaker ignored exception list | null | A list of Throwable classes to explicitly ignore for failure rate calculations. | | + +### Fallback configuration + +Jedis uses the following fallback settings: + +| Setting | Default value | Description | +|-------------------------|-------------------------------------------------------|----------------------------------------------------| +| Fallback exception list | [CallNotPermittedException, JedisConnectionException] | A list of Throwable classes that trigger fallback. | ### Failover callbacks diff --git a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java index c39efae7d4..980bbb91f9 100644 --- a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java +++ b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java @@ -1,13 +1,15 @@ package redis.clients.jedis; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType; -import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.exceptions.JedisValidationException; import java.time.Duration; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisValidationException; + /** * @author Allen Terleto (aterleto) @@ -22,12 +24,13 @@ * not passed through to Jedis users. *

*/ +// TODO: move public final class MultiClusterClientConfig { private static final int RETRY_MAX_ATTEMPTS_DEFAULT = 3; private static final int RETRY_WAIT_DURATION_DEFAULT = 500; // measured in milliseconds private static final int RETRY_WAIT_DURATION_EXPONENTIAL_BACKOFF_MULTIPLIER_DEFAULT = 2; - private static final Class RETRY_INCLUDED_EXCEPTIONS_DEFAULT = JedisConnectionException.class; + private static final List RETRY_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class); private static final float CIRCUIT_BREAKER_FAILURE_RATE_THRESHOLD_DEFAULT = 50.0f; // measured as percentage private static final int CIRCUIT_BREAKER_SLIDING_WINDOW_MIN_CALLS_DEFAULT = 100; @@ -35,7 +38,10 @@ public final class MultiClusterClientConfig { private static final int CIRCUIT_BREAKER_SLIDING_WINDOW_SIZE_DEFAULT = 100; private static final int CIRCUIT_BREAKER_SLOW_CALL_DURATION_THRESHOLD_DEFAULT = 60000; // measured in milliseconds private static final float CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT = 100.0f; // measured as percentage - private static final Class CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = JedisConnectionException.class; + private static final List CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class); + + private static final List> FALLBACK_EXCEPTIONS_DEFAULT = + Arrays.asList(CallNotPermittedException.class, JedisConnectionException.class); private final ClusterConfig[] clusterConfigs; @@ -99,6 +105,7 @@ public final class MultiClusterClientConfig { * failure nor success, even if the exceptions is part of recordExceptions */ private List circuitBreakerIgnoreExceptionList; + private List> fallbackExceptionList; public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) { this.clusterConfigs = clusterConfigs; @@ -160,6 +167,10 @@ public SlidingWindowType getCircuitBreakerSlidingWindowType() { return circuitBreakerSlidingWindowType; } + public List> getFallbackExceptionList() { + return fallbackExceptionList; + } + public static class ClusterConfig { private int priority; @@ -195,8 +206,8 @@ public static class Builder { private int retryMaxAttempts = RETRY_MAX_ATTEMPTS_DEFAULT; private int retryWaitDuration = RETRY_WAIT_DURATION_DEFAULT; private int retryWaitDurationExponentialBackoffMultiplier = RETRY_WAIT_DURATION_EXPONENTIAL_BACKOFF_MULTIPLIER_DEFAULT; - private List retryIncludedExceptionList; - private List retryIgnoreExceptionList; + private List retryIncludedExceptionList = RETRY_INCLUDED_EXCEPTIONS_DEFAULT; + private List retryIgnoreExceptionList = null; private float circuitBreakerFailureRateThreshold = CIRCUIT_BREAKER_FAILURE_RATE_THRESHOLD_DEFAULT; private int circuitBreakerSlidingWindowMinCalls = CIRCUIT_BREAKER_SLIDING_WINDOW_MIN_CALLS_DEFAULT; @@ -204,9 +215,9 @@ public static class Builder { private int circuitBreakerSlidingWindowSize = CIRCUIT_BREAKER_SLIDING_WINDOW_SIZE_DEFAULT; private int circuitBreakerSlowCallDurationThreshold = CIRCUIT_BREAKER_SLOW_CALL_DURATION_THRESHOLD_DEFAULT; private float circuitBreakerSlowCallRateThreshold = CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT; - private List circuitBreakerIncludedExceptionList; - private List circuitBreakerIgnoreExceptionList; - private List> circuitBreakerFallbackExceptionList; + private List circuitBreakerIncludedExceptionList = CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT; + private List circuitBreakerIgnoreExceptionList = null; + private List> fallbackExceptionList = FALLBACK_EXCEPTIONS_DEFAULT; public Builder(ClusterConfig[] clusterConfigs) { @@ -219,6 +230,10 @@ public Builder(ClusterConfig[] clusterConfigs) { this.clusterConfigs = clusterConfigs; } + public Builder(List clusterConfigs) { + this(clusterConfigs.toArray(new ClusterConfig[0])); + } + public Builder retryMaxAttempts(int retryMaxAttempts) { this.retryMaxAttempts = retryMaxAttempts; return this; @@ -284,8 +299,16 @@ public Builder circuitBreakerIgnoreExceptionList(List circuitBreakerIgnor return this; } + /** + * @deprecated Use {@link #fallbackExceptionList(java.util.List)}. + */ + @Deprecated public Builder circuitBreakerFallbackExceptionList(List> circuitBreakerFallbackExceptionList) { - this.circuitBreakerFallbackExceptionList = circuitBreakerFallbackExceptionList; + return fallbackExceptionList(circuitBreakerFallbackExceptionList); + } + + public Builder fallbackExceptionList(List> fallbackExceptionList) { + this.fallbackExceptionList = fallbackExceptionList; return this; } @@ -296,16 +319,9 @@ public MultiClusterClientConfig build() { config.retryWaitDuration = Duration.ofMillis(this.retryWaitDuration); config.retryWaitDurationExponentialBackoffMultiplier = this.retryWaitDurationExponentialBackoffMultiplier; - if (this.retryIncludedExceptionList != null && !retryIncludedExceptionList.isEmpty()) - config.retryIncludedExceptionList = this.retryIncludedExceptionList; - - else { - config.retryIncludedExceptionList = new ArrayList<>(); - config.retryIncludedExceptionList.add(RETRY_INCLUDED_EXCEPTIONS_DEFAULT); - } + config.retryIncludedExceptionList = this.retryIncludedExceptionList; - if (this.retryIgnoreExceptionList != null && !retryIgnoreExceptionList.isEmpty()) - config.retryIgnoreExceptionList = this.retryIgnoreExceptionList; + config.retryIgnoreExceptionList = this.retryIgnoreExceptionList; config.circuitBreakerFailureRateThreshold = this.circuitBreakerFailureRateThreshold; config.circuitBreakerSlidingWindowMinCalls = this.circuitBreakerSlidingWindowMinCalls; @@ -314,16 +330,11 @@ public MultiClusterClientConfig build() { config.circuitBreakerSlowCallDurationThreshold = Duration.ofMillis(this.circuitBreakerSlowCallDurationThreshold); config.circuitBreakerSlowCallRateThreshold = this.circuitBreakerSlowCallRateThreshold; - if (this.circuitBreakerIncludedExceptionList != null && !circuitBreakerIncludedExceptionList.isEmpty()) - config.circuitBreakerIncludedExceptionList = this.circuitBreakerIncludedExceptionList; + config.circuitBreakerIncludedExceptionList = this.circuitBreakerIncludedExceptionList; - else { - config.circuitBreakerIncludedExceptionList = new ArrayList<>(); - config.circuitBreakerIncludedExceptionList.add(CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT); - } + config.circuitBreakerIgnoreExceptionList = this.circuitBreakerIgnoreExceptionList; - if (this.circuitBreakerIgnoreExceptionList != null && !circuitBreakerIgnoreExceptionList.isEmpty()) - config.circuitBreakerIgnoreExceptionList = this.circuitBreakerIgnoreExceptionList; + config.fallbackExceptionList = this.fallbackExceptionList; return config; } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 234b73bda9..cba60115b3 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -54,6 +54,7 @@ public final class Protocol { private static final String CLUSTERDOWN_PREFIX = "CLUSTERDOWN "; private static final String BUSY_PREFIX = "BUSY "; private static final String NOSCRIPT_PREFIX = "NOSCRIPT "; + private static final String NOAUTH_PREFIX = "NOAUTH"; private static final String WRONGPASS_PREFIX = "WRONGPASS"; private static final String NOPERM_PREFIX = "NOPERM"; @@ -97,9 +98,9 @@ private static void processError(final RedisInputStream is) { throw new JedisBusyException(message); } else if (message.startsWith(NOSCRIPT_PREFIX)) { throw new JedisNoScriptException(message); - } else if (message.startsWith(WRONGPASS_PREFIX)) { - throw new JedisAccessControlException(message); - } else if (message.startsWith(NOPERM_PREFIX)) { + } else if (message.startsWith(NOAUTH_PREFIX) + || message.startsWith(WRONGPASS_PREFIX) + || message.startsWith(NOPERM_PREFIX)) { throw new JedisAccessControlException(message); } throw new JedisDataException(message); diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index 7a857174cb..38b32bbad0 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -32,8 +32,8 @@ public T executeCommand(CommandObject commandObject) { supplier.withRetry(cluster.getRetry()); supplier.withCircuitBreaker(cluster.getCircuitBreaker()); - supplier.withFallback(defaultCircuitBreakerFallbackException, - e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker())); + supplier.withFallback(provider.getFallbackExceptionList(), + e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker())); return supplier.decorate().get(); } diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java index 2228233849..b06d7b9604 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java @@ -1,11 +1,6 @@ package redis.clients.jedis.mcf; -import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; - -import java.util.Arrays; -import java.util.List; - import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.util.IOUtils; @@ -21,9 +16,6 @@ */ public class CircuitBreakerFailoverBase implements AutoCloseable { - protected final static List> defaultCircuitBreakerFallbackException = - Arrays.asList(CallNotPermittedException.class); - protected final MultiClusterPooledConnectionProvider provider; public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) { diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java index dad2c751c8..10a0823973 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java @@ -26,8 +26,8 @@ public Connection getConnection() { supplier.withRetry(cluster.getRetry()); supplier.withCircuitBreaker(cluster.getCircuitBreaker()); - supplier.withFallback(defaultCircuitBreakerFallbackException, - e -> this.handleClusterFailover(cluster.getCircuitBreaker())); + supplier.withFallback(provider.getFallbackExceptionList(), + e -> this.handleClusterFailover(cluster.getCircuitBreaker())); return supplier.decorate().get(); } diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java index 94f686c2d7..d4052dae7b 100644 --- a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java @@ -22,12 +22,13 @@ public class MultiClusterPipeline extends PipelineBase implements Closeable { public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider) { super(new CommandObjects()); - try (Connection connection = pooledProvider.getConnection()) { // we don't need a healthy connection now + + this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider); + + try (Connection connection = failoverProvider.getConnection()) { RedisProtocol proto = connection.getRedisProtocol(); if (proto != null) this.commandObjects.setProtocol(proto); } - - this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider); } @Override diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java index 5ce9ecd9b5..540911f2d6 100644 --- a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java +++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java @@ -26,7 +26,7 @@ public class MultiClusterTransaction extends TransactionBase { private static final Builder NO_OP_BUILDER = BuilderFactory.RAW_OBJECT; - private final CircuitBreakerFailoverConnectionProvider provider; + private final CircuitBreakerFailoverConnectionProvider failoverProvider; private final AtomicInteger extraCommandCount = new AtomicInteger(); private final Queue>> commands = new LinkedList<>(); @@ -50,13 +50,13 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) { * @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI */ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti) { - try (Connection connection = provider.getConnection()) { // we don't need a healthy connection now + this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider); + + try (Connection connection = failoverProvider.getConnection()) { RedisProtocol proto = connection.getRedisProtocol(); if (proto != null) this.commandObjects.setProtocol(proto); } - this.provider = new CircuitBreakerFailoverConnectionProvider(provider); - if (doMulti) multi(); } @@ -129,7 +129,7 @@ public final List exec() { throw new IllegalStateException("EXEC without MULTI"); } - try (Connection connection = provider.getConnection()) { + try (Connection connection = failoverProvider.getConnection()) { commands.forEach((command) -> connection.sendCommand(command.getKey())); // following connection.getMany(int) flushes anyway, so no flush here. @@ -174,7 +174,7 @@ public final String discard() { throw new IllegalStateException("DISCARD without MULTI"); } - try (Connection connection = provider.getConnection()) { + try (Connection connection = failoverProvider.getConnection()) { commands.forEach((command) -> connection.sendCommand(command.getKey())); // following connection.getMany(int) flushes anyway, so no flush here. diff --git a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java index abe5515b97..e6013a2c58 100644 --- a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java @@ -8,18 +8,21 @@ import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; import io.github.resilience4j.retry.RetryRegistry; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import redis.clients.jedis.*; import redis.clients.jedis.MultiClusterClientConfig.ClusterConfig; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisValidationException; import redis.clients.jedis.util.Pool; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; - /** * @author Allen Terleto (aterleto) @@ -31,6 +34,7 @@ * Support for manual failback is provided by way of {@link #setActiveMultiClusterIndex(int)} *

*/ +// TODO: move? public class MultiClusterPooledConnectionProvider implements ConnectionProvider { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -62,6 +66,7 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider */ private Consumer clusterFailoverPostProcessor; + private List> fallbackExceptionList; public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiClusterClientConfig) { @@ -78,7 +83,7 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste retryConfigBuilder.retryExceptions(multiClusterClientConfig.getRetryIncludedExceptionList().stream().toArray(Class[]::new)); List retryIgnoreExceptionList = multiClusterClientConfig.getRetryIgnoreExceptionList(); - if (retryIgnoreExceptionList != null && !retryIgnoreExceptionList.isEmpty()) + if (retryIgnoreExceptionList != null) retryConfigBuilder.ignoreExceptions(retryIgnoreExceptionList.stream().toArray(Class[]::new)); RetryConfig retryConfig = retryConfigBuilder.build(); @@ -96,7 +101,7 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste circuitBreakerConfigBuilder.automaticTransitionFromOpenToHalfOpenEnabled(false); // State transitions are forced. No half open states are used List circuitBreakerIgnoreExceptionList = multiClusterClientConfig.getCircuitBreakerIgnoreExceptionList(); - if (circuitBreakerIgnoreExceptionList != null && !circuitBreakerIgnoreExceptionList.isEmpty()) + if (circuitBreakerIgnoreExceptionList != null) circuitBreakerConfigBuilder.ignoreExceptions(circuitBreakerIgnoreExceptionList.stream().toArray(Class[]::new)); CircuitBreakerConfig circuitBreakerConfig = circuitBreakerConfigBuilder.build(); @@ -123,10 +128,14 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste circuitBreakerEventPublisher.onSlowCallRateExceeded(event -> log.error(String.valueOf(event))); circuitBreakerEventPublisher.onStateTransition(event -> log.warn(String.valueOf(event))); - multiClusterMap.put(config.getPriority(), new Cluster(new ConnectionPool(config.getHostAndPort(), - config.getJedisClientConfig()), - retry, circuitBreaker)); + multiClusterMap.put(config.getPriority(), + new Cluster(new ConnectionPool(config.getHostAndPort(), + config.getJedisClientConfig()), retry, circuitBreaker)); } + + /// --- /// + + this.fallbackExceptionList = multiClusterClientConfig.getFallbackExceptionList(); } /** @@ -289,6 +298,10 @@ public void setClusterFailoverPostProcessor(Consumer clusterFailoverPost this.clusterFailoverPostProcessor = clusterFailoverPostProcessor; } + public List> getFallbackExceptionList() { + return fallbackExceptionList; + } + public static class Cluster { private final ConnectionPool connectionPool; diff --git a/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java b/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java index 4f8f896afc..dc03c52f39 100644 --- a/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java +++ b/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java @@ -1,11 +1,20 @@ package redis.clients.jedis.misc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.AbstractPipeline; import redis.clients.jedis.AbstractTransaction; @@ -16,51 +25,50 @@ import redis.clients.jedis.JedisClientConfig; import redis.clients.jedis.MultiClusterClientConfig; import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.exceptions.JedisAccessControlException; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; +import redis.clients.jedis.util.IOUtils; public class AutomaticFailoverTest { - private final HostAndPort hostAndPort1 = HostAndPorts.getRedisServers().get(0); - private final HostAndPort hostAndPort2 = HostAndPorts.getRedisServers().get(1); + private static final Logger log = LoggerFactory.getLogger(AutomaticFailoverTest.class); + + private final HostAndPort hostPort_1 = new HostAndPort(HostAndPorts.getRedisServers().get(0).getHost(), 6378); + private final HostAndPort hostPort_1_2 = HostAndPorts.getRedisServers().get(0); + private final HostAndPort hostPort_2 = HostAndPorts.getRedisServers().get(7); - private final JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().password("foobared").build(); + private final JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().build(); - private Jedis jedis1; private Jedis jedis2; - private MultiClusterPooledConnectionProvider provider; + private List getClusterConfigs( + JedisClientConfig clientConfig, HostAndPort... hostPorts) { + return Arrays.stream(hostPorts) + .map(hp -> new MultiClusterClientConfig.ClusterConfig(hp, clientConfig)) + .collect(Collectors.toList()); + } @Before public void setUp() { - - MultiClusterClientConfig.ClusterConfig[] clusterConfigs = new MultiClusterClientConfig.ClusterConfig[2]; - clusterConfigs[0] = new MultiClusterClientConfig.ClusterConfig(hostAndPort1, clientConfig); - clusterConfigs[1] = new MultiClusterClientConfig.ClusterConfig(hostAndPort2, clientConfig); - - provider = new MultiClusterPooledConnectionProvider(new MultiClusterClientConfig.Builder(clusterConfigs).build()); - - jedis1 = new Jedis(hostAndPort1, clientConfig); - jedis1.flushAll(); - jedis2 = new Jedis(hostAndPort2, clientConfig); + jedis2 = new Jedis(hostPort_2, clientConfig); jedis2.flushAll(); } @After public void cleanUp() { - - provider.close(); - - jedis1.close(); - jedis2.close(); + IOUtils.closeQuietly(jedis2); } @Test public void pipelineWithSwitch() { + MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider( + new MultiClusterClientConfig.Builder(getClusterConfigs(clientConfig, hostPort_1, hostPort_2)).build()); + try (UnifiedJedis client = new UnifiedJedis(provider)) { AbstractPipeline pipe = client.pipelined(); pipe.set("pstr", "foobar"); pipe.hset("phash", "foo", "bar"); - provider.incrementActiveMultiClusterIndex(); + //provider.incrementActiveMultiClusterIndex(); pipe.sync(); } @@ -70,15 +78,116 @@ public void pipelineWithSwitch() { @Test public void transactionWithSwitch() { + MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider( + new MultiClusterClientConfig.Builder(getClusterConfigs(clientConfig, hostPort_1, hostPort_2)).build()); + try (UnifiedJedis client = new UnifiedJedis(provider)) { AbstractTransaction tx = client.multi(); tx.set("tstr", "foobar"); tx.hset("thash", "foo", "bar"); - provider.incrementActiveMultiClusterIndex(); + //provider.incrementActiveMultiClusterIndex(); assertEquals(Arrays.asList("OK", Long.valueOf(1L)), tx.exec()); } assertEquals("foobar", jedis2.get("tstr")); assertEquals("bar", jedis2.hget("thash", "foo")); } + + @Test + public void commandFailover() { + int slidingWindowMinCalls = 10; + int slidingWindowSize = 10; + + MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, hostPort_1, hostPort_2)) + .circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls) + .circuitBreakerSlidingWindowSize(slidingWindowSize); + + RedisFailoverReporter failoverReporter = new RedisFailoverReporter(); + MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build()); + cacheProvider.setClusterFailoverPostProcessor(failoverReporter); + + UnifiedJedis jedis = new UnifiedJedis(cacheProvider); + + assertFalse(failoverReporter.failedOver); + log.info("Starting calls to Redis"); + String key = "hash-" + System.nanoTime(); + jedis.hset(key, "f1", "v1"); + assertTrue(failoverReporter.failedOver); + + assertEquals(Collections.singletonMap("f1", "v1"), jedis.hgetAll(key)); + jedis.flushAll(); + + jedis.close(); + } + + @Test + public void pipelineFailover() { + int slidingWindowMinCalls = 10; + int slidingWindowSize = 10; + + MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, hostPort_1, hostPort_2)) + .circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls) + .circuitBreakerSlidingWindowSize(slidingWindowSize); + + RedisFailoverReporter failoverReporter = new RedisFailoverReporter(); + MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build()); + cacheProvider.setClusterFailoverPostProcessor(failoverReporter); + + UnifiedJedis jedis = new UnifiedJedis(cacheProvider); + + assertFalse(failoverReporter.failedOver); + log.info("Starting calls to Redis"); + AbstractPipeline pipe = jedis.pipelined(); + String key = "hash-" + System.nanoTime(); + pipe.hset(key, "f1", "v1"); + pipe.sync(); + assertTrue(failoverReporter.failedOver); + + assertEquals(Collections.singletonMap("f1", "v1"), jedis.hgetAll(key)); + jedis.flushAll(); + + jedis.close(); + } + + @Test + public void failoverFromAuthError() { + int slidingWindowMinCalls = 10; + int slidingWindowSize = 10; + + MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, hostPort_1_2, hostPort_2)) + .circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls) + .circuitBreakerSlidingWindowSize(slidingWindowSize) + .fallbackExceptionList(Arrays.asList(JedisAccessControlException.class)); + + RedisFailoverReporter failoverReporter = new RedisFailoverReporter(); + MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build()); + cacheProvider.setClusterFailoverPostProcessor(failoverReporter); + + UnifiedJedis jedis = new UnifiedJedis(cacheProvider); + + assertFalse(failoverReporter.failedOver); + log.info("Starting calls to Redis"); + String key = "hash-" + System.nanoTime(); + jedis.hset(key, "f1", "v1"); + assertTrue(failoverReporter.failedOver); + + assertEquals(Collections.singletonMap("f1", "v1"), jedis.hgetAll(key)); + jedis.flushAll(); + + jedis.close(); + } + + class RedisFailoverReporter implements Consumer { + + boolean failedOver = false; + + @Override + public void accept(String clusterName) { + log.info("Jedis fail over to cluster: " + clusterName); + failedOver = true; + } + } }