Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Retry on connection closed #5515

Merged
merged 1 commit into from
Jun 1, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.CommandStatus;
Expand All @@ -47,12 +48,14 @@
import io.confluent.ksql.util.ParserUtil;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.vertx.core.Context;
import io.vertx.core.VertxException;
import java.io.Closeable;
import java.io.PrintWriter;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
Expand All @@ -67,6 +70,8 @@ public class Cli implements KsqlRequestExecutor, Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);

private static final int MAX_RETRIES = 10;

private static final ClassHandlerMap2<StatementContext, Cli, String> STATEMENT_HANDLERS =
HandlerMaps
.forClass(StatementContext.class)
Expand Down Expand Up @@ -137,19 +142,40 @@ private <R> RestResponse<R> makeKsqlRequest(
final Long commandSequenceNumberToWaitFor = remoteServerState.getRequestPipelining()
? null
: remoteServerState.getLastCommandSequenceNumber();
final RestResponse<R> response = requestIssuer.apply(ksql, commandSequenceNumberToWaitFor);

if (isSequenceNumberTimeout(response)) {
terminal.writer().printf(
"Error: command not executed since the server timed out "
+ "while waiting for prior commands to finish executing.%n"
+ "If you wish to execute new commands without waiting for "
+ "prior commands to finish, run the command '%s ON'.%n",
RequestPipeliningCommand.NAME);
} else if (isKsqlEntityList(response)) {
updateLastCommandSequenceNumber((KsqlEntityList)response.getResponse());

int retries = 0;
while (retries < MAX_RETRIES) {
try {
final RestResponse<R> response = requestIssuer.apply(ksql, commandSequenceNumberToWaitFor);

if (isSequenceNumberTimeout(response)) {
terminal.writer().printf(
"Error: command not executed since the server timed out "
+ "while waiting for prior commands to finish executing.%n"
+ "If you wish to execute new commands without waiting for "
+ "prior commands to finish, run the command '%s ON'.%n",
RequestPipeliningCommand.NAME);
} else if (isKsqlEntityList(response)) {
updateLastCommandSequenceNumber((KsqlEntityList) response.getResponse());
}
return response;
} catch (KsqlRestClientException e) {
if (e.getCause() instanceof ExecutionException && e.getCause()
.getCause() instanceof VertxException) {
// We close the connection asynchronously after a pull query is terminated at the terminal
// this means there is a chance that a subsequent query can grab the same connection
// and attempt to use it resulting in an exception. This is extremely unlikely to happen
// in real-usage, but can happen in the test suite, so we catch this case and retry
final VertxException ve = (VertxException) e.getCause().getCause();
if (ve.getMessage().equals("Connection was closed")) {
retries++;
continue;
}
}
throw e;
}
}
return response;
throw new KsqlRestClientException("Failed to execute request " + ksql);
}

public void runInteractively() {
Expand Down