Skip to content

Commit

Permalink
Extract the common logic for reduce duplicate code
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 committed May 29, 2024
1 parent aea377b commit ad7d8ab
Showing 1 changed file with 10 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,67 +73,15 @@ public final <T> T broadcastCommand(CommandObject<T> commandObject) {

@Override
public final <T> T executeCommand(CommandObject<T> commandObject) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

JedisRedirectionException redirect = null;
int consecutiveConnectionFailures = 0;
Exception lastException = null;
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
Connection connection = null;
try {
if (redirect != null) {
connection = provider.getConnection(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.executeCommand(Protocol.Command.ASKING);
}
} else {
connection = provider.getConnection(commandObject.getArguments());
}

return execute(connection, commandObject);

} catch (JedisClusterOperationException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
lastException = jce;
++consecutiveConnectionFailures;
log.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
if (reset) {
consecutiveConnectionFailures = 0;
redirect = null;
}
} catch (JedisRedirectionException jre) {
// avoid updating lastException if it is a connection exception
if (lastException == null || lastException instanceof JedisRedirectionException) {
lastException = jre;
}
log.debug("Redirected by server to {}", jre.getTargetNode());
consecutiveConnectionFailures = 0;
redirect = jre;
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
provider.renewSlotCache(connection);
}
} finally {
IOUtils.closeQuietly(connection);
}
if (Instant.now().isAfter(deadline)) {
throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
}
}

JedisClusterOperationException maxAttemptsException
= new JedisClusterOperationException("No more cluster attempts left.");
maxAttemptsException.addSuppressed(lastException);
throw maxAttemptsException;
return doExecuteCommand(commandObject, false);
}

@Override
public <T> T executeCommandToReplica(CommandObject<T> commandObject) {
public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, true);
}

private <T> T doExecuteCommand(CommandObject<T> commandObject, boolean toReplica) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

JedisRedirectionException redirect = null;
Expand All @@ -145,10 +93,12 @@ public <T> T executeCommandToReplica(CommandObject<T> commandObject) {
if (redirect != null) {
connection = provider.getConnection(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.executeCommand(Protocol.Command.ASKING);
}
} else {
connection = provider.getReplicaConnection(commandObject.getArguments());
connection = toReplica ? provider.getReplicaConnection(commandObject.getArguments()) :
provider.getConnection(commandObject.getArguments());
}

return execute(connection, commandObject);
Expand Down Expand Up @@ -187,7 +137,7 @@ public <T> T executeCommandToReplica(CommandObject<T> commandObject) {
}

JedisClusterOperationException maxAttemptsException
= new JedisClusterOperationException("No more cluster attempts left.");
= new JedisClusterOperationException("No more cluster attempts left.");
maxAttemptsException.addSuppressed(lastException);
throw maxAttemptsException;
}
Expand Down

0 comments on commit ad7d8ab

Please sign in to comment.