Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: surface error to user when command topic deleted while server running #6240

Merged
merged 12 commits into from
Sep 23, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,9 @@ static KsqlRestApplication buildApplication(
KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)),
metricsPrefix,
InternalTopicSerdes.deserializer(Command.class),
errorHandler
errorHandler,
serviceContext.getTopicClient(),
commandTopicName
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
Expand All @@ -40,7 +41,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -81,6 +84,8 @@ public class CommandRunner implements Closeable {
private final Consumer<QueuedCommand> incompatibleCommandChecker;
private final Errors errorHandler;
private boolean incompatibleCommandDetected;
private final Supplier<Boolean> commandTopicExists;
private boolean commandTopicDeleted;
private Status state = new Status(CommandRunnerStatus.RUNNING, CommandRunnerDegradedReason.NONE);

public enum CommandRunnerStatus {
Expand All @@ -91,8 +96,9 @@ public enum CommandRunnerStatus {

public enum CommandRunnerDegradedReason {
NONE(errors -> ""),
CORRUPTED(Errors:: commandRunnerDegradedBackupCorruptedErrorMessage),
INCOMPATIBLE_COMMAND(Errors:: commandRunnerDegradedIncompatibleCommandsErrorMessage);
CORRUPTED(Errors::commandRunnerDegradedBackupCorruptedErrorMessage),
INCOMPATIBLE_COMMAND(Errors::commandRunnerDegradedIncompatibleCommandsErrorMessage),
COMMAND_TOPIC_DELETED(Errors::commandRunnerDegradedCommandTopicDeletedErrorMessage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember that a discussion on the design doc indicated we might not want to differentiate between corrupted and deleted. What was the decision on that? (It would also be good to document it here for posterity.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main reason is that, right now, backups are optional, so if backups aren't enabled, it wouldn't really make sense for the degraded reason to be CORRUPTED.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After talking it over with Rohan a bit more, I've combined CORRUPTED and COMMAND_TOPIC_DELETED into CORRUPTED. The COMMAND_TOPIC_DELETED could show up as CORRUPTED after a restart and having different states for the same external action (deleting the command topic) is strange.

I've made the error message returned from the API generic enough that it addresses both the backup corruption case and the command topic deleted/modified case.

The server has detected corruption in the command topic due to modifications performed on it. DDL statements will not be processed any further.
If a backup of the command topic is available, restore the command topic using the backup file.
A server restart is required to restore full functionality


private final Function<Errors, String> msgFactory;

Expand All @@ -110,8 +116,8 @@ public static class Status {
private final CommandRunnerDegradedReason degradedReason;

public Status(
final CommandRunnerStatus status,
final CommandRunnerDegradedReason degradedReason
final CommandRunnerStatus status,
final CommandRunnerDegradedReason degradedReason
) {
this.status = status;
this.degradedReason = degradedReason;
Expand All @@ -137,7 +143,9 @@ public CommandRunner(
final Duration commandRunnerHealthTimeout,
final String metricsGroupPrefix,
final Deserializer<Command> commandDeserializer,
final Errors errorHandler
final Errors errorHandler,
final KafkaTopicClient kafkaTopicClient,
final String commandTopicName
) {
this(
statementExecutor,
Expand All @@ -156,7 +164,8 @@ public CommandRunner(
queuedCommand.getAndDeserializeCommand(commandDeserializer);
},
commandDeserializer,
errorHandler
errorHandler,
() -> kafkaTopicClient.isTopicExists(commandTopicName)
);
}

Expand All @@ -176,7 +185,8 @@ public CommandRunner(
final Function<List<QueuedCommand>, List<QueuedCommand>> compactor,
final Consumer<QueuedCommand> incompatibleCommandChecker,
final Deserializer<Command> commandDeserializer,
final Errors errorHandler
final Errors errorHandler,
final Supplier<Boolean> commandTopicExists
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
Expand All @@ -199,7 +209,10 @@ public CommandRunner(
Objects.requireNonNull(commandDeserializer, "commandDeserializer");
this.errorHandler =
Objects.requireNonNull(errorHandler, "errorHandler");
this.commandTopicExists =
Objects.requireNonNull(commandTopicExists, "commandTopicExists");
this.incompatibleCommandDetected = false;
this.commandTopicDeleted = false;
}

/**
Expand Down Expand Up @@ -289,6 +302,9 @@ void fetchAndRunCommands() {
lastPollTime.set(clock.instant());
final List<QueuedCommand> commands = commandStore.getNewCommands(NEW_CMDS_TIMEOUT);
if (commands.isEmpty()) {
if (!commandTopicExists.get()) {
commandTopicDeleted = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can save this for another PR, but I think it makes sense to have this method return some status rather than set a ton of flags and check them in the main loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll file a follow-up issue for that refactoring improvement.

}
return;
}

Expand Down Expand Up @@ -428,6 +444,13 @@ public void run() {
CommandRunnerDegradedReason.CORRUPTED
);
closeEarly();
} else if (commandTopicDeleted) {
LOG.warn("CommandRunner entering degraded state due to command topic deletion.");
state = new Status(
CommandRunnerStatus.DEGRADED,
CommandRunnerDegradedReason.COMMAND_TOPIC_DELETED
);
closeEarly();
} else {
LOG.trace("Polling for new writes to command topic");
fetchAndRunCommands();
Expand All @@ -437,6 +460,13 @@ public void run() {
if (!closed) {
throw wue;
}
} catch (final OffsetOutOfRangeException e) {
LOG.warn("The command topic offset was reset. CommandRunner thread exiting.");
state = new Status(
CommandRunnerStatus.DEGRADED,
CommandRunnerDegradedReason.COMMAND_TOPIC_DELETED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does offset reset always mean command topic deleted? it could mean the retention was configured incorrectly perhaps?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every time we start up the server, we check/override the command topic properties so that retention.ms=-1, although I guess it is possible for users themselves to override the retention config on the command topic, which could trigger this. I can enhance the error message so that it mentions that the degraded state is a result of the command topic being deleted or its configuration being modified.

The REASON can be updated to COMMAND_TOPIC_DELETED_OR_MODIFIED

);
closeEarly();
} finally {
commandStore.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Before;
Expand All @@ -72,6 +74,7 @@ public class CommandRunnerTest {
private static final long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000;
private static final String BACKUP_CORRUPTED_ERROR_MESSAGE = "corrupted";
private static final String INCOMPATIBLE_COMMANDS_ERROR_MESSAGE = "incompatible";
private static final String MISSING_COMMAND_TOPIC_ERROR_MESSAGE = "command topic missing";

@Mock
private InteractiveStatementExecutor statementExecutor;
Expand Down Expand Up @@ -104,6 +107,8 @@ public class CommandRunnerTest {
@Mock
private Deserializer<Command> commandDeserializer;
@Mock
private Supplier<Boolean> commandTopicExists;
@Mock
private Errors errorHandler;
@Captor
private ArgumentCaptor<Runnable> threadTaskCaptor;
Expand All @@ -126,10 +131,12 @@ public void setup() {
doNothing().when(incompatibleCommandChecker).accept(queuedCommand3);

when(commandStore.corruptionDetected()).thenReturn(false);
when(commandTopicExists.get()).thenReturn(true);
when(compactor.apply(any())).thenAnswer(inv -> inv.getArgument(0));
when(errorHandler.commandRunnerDegradedIncompatibleCommandsErrorMessage()).thenReturn(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE);
when(errorHandler.commandRunnerDegradedBackupCorruptedErrorMessage()).thenReturn(BACKUP_CORRUPTED_ERROR_MESSAGE);

when(errorHandler.commandRunnerDegradedCommandTopicDeletedErrorMessage()).thenReturn(MISSING_COMMAND_TOPIC_ERROR_MESSAGE);

givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);

commandRunner = new CommandRunner(
Expand All @@ -146,7 +153,8 @@ public void setup() {
compactor,
incompatibleCommandChecker,
commandDeserializer,
errorHandler
errorHandler,
commandTopicExists
);
}

Expand Down Expand Up @@ -333,6 +341,23 @@ public void shouldNotProcessCommandTopicIfBackupCorrupted() throws InterruptedEx
assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.CORRUPTED));
}

// Checks that the runner reports a DEGRADED status, with the command-topic-deleted reason and
// warning message, when the topic-existence supplier reports the command topic as missing.
@Test
public void shouldEnterDegradedStateIfCommandTopicMissing() {
// Given:
// NOTE(review): givenQueuedCommands() with no arguments presumably makes the command store
// return no commands - confirm against the helper.
givenQueuedCommands();
when(commandTopicExists.get()).thenReturn(false);

// When:
commandRunner.start();

final Runnable threadTask = getThreadTask();
threadTask.run();

// Then:
assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(MISSING_COMMAND_TOPIC_ERROR_MESSAGE));
assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.COMMAND_TOPIC_DELETED));
}

@Test
public void shouldPullAndRunStatements() {
// Given:
Expand Down Expand Up @@ -524,6 +549,25 @@ public void shouldCloseEarlyWhenSerializationExceptionInFetch() throws Exception
inOrder.verify(commandStore).close();
}

public void shouldCloseEarlyWhenOffsetOutOfRangeException() throws Exception {
stevenpyzhang marked this conversation as resolved.
Show resolved Hide resolved
// Given:
when(commandStore.getNewCommands(any()))
.thenReturn(Collections.singletonList(queuedCommand1))
.thenThrow(new OffsetOutOfRangeException(Collections.singletonMap(new TopicPartition("command_topic", 0), 0L)));

// When:
commandRunner.start();
verify(commandStore, never()).close();
final Runnable threadTask = getThreadTask();
threadTask.run();

// Then:
final InOrder inOrder = inOrder(executor, commandStore);
inOrder.verify(commandStore).wakeup();
inOrder.verify(executor).awaitTermination(anyLong(), any());
inOrder.verify(commandStore).close();
}

@Test
public void shouldCloseTheCommandRunnerCorrectly() throws Exception {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.CommandTopicBackup;
import io.confluent.ksql.rest.server.CommandTopicBackupNoOp;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
Expand All @@ -68,6 +67,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -231,7 +231,9 @@ private class KsqlServer {
Duration.ofMillis(2000),
"",
InternalTopicSerdes.deserializer(Command.class),
errorHandler
errorHandler,
topicClient,
"command_topic"
);

this.ksqlResource = new KsqlResource(
Expand Down Expand Up @@ -567,6 +569,7 @@ private void shouldRecover(final List<QueuedCommand> commands) {
@Before
public void setUp() {
topicClient.preconditionTopicExists("A");
topicClient.preconditionTopicExists("command_topic");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public class DefaultErrorMessages implements ErrorMessages {
+ System.lineSeparator()
+ "The server must be restarted after performing either operation in order to resume "
+ "normal functionality";

// Message text for the degraded state caused by deletion of the command topic; it tells the
// reader that DDL statements are no longer processed and that a server restart is needed.
public static final String COMMAND_RUNNER_DEGRADED_COMMAND_TOPIC_DELETED =
"The server is in a degraded state due to deletion of the command topic. "
+ "DDL statements will not be processed."
+ System.lineSeparator()
+ "Restart the server to restore server functionality.";


@Override
Expand Down Expand Up @@ -73,4 +79,9 @@ public String commandRunnerDegradedIncompatibleCommandsErrorMessage() {
public String commandRunnerDegradedBackupCorruptedErrorMessage() {
return COMMAND_RUNNER_DEGRADED_BACKUP_CORRUPTED_ERROR_MESSAGE;
}

// Returns the fixed COMMAND_RUNNER_DEGRADED_COMMAND_TOPIC_DELETED message.
@Override
public String commandRunnerDegradedCommandTopicDeletedErrorMessage() {
return COMMAND_RUNNER_DEGRADED_COMMAND_TOPIC_DELETED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ public interface ErrorMessages {
String commandRunnerDegradedIncompatibleCommandsErrorMessage();

String commandRunnerDegradedBackupCorruptedErrorMessage();

String commandRunnerDegradedCommandTopicDeletedErrorMessage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ public String commandRunnerDegradedBackupCorruptedErrorMessage() {
return errorMessages.commandRunnerDegradedBackupCorruptedErrorMessage();
}

// Delegates to errorMessages for the command-topic-deleted degraded-state message.
public String commandRunnerDegradedCommandTopicDeletedErrorMessage() {
return errorMessages.commandRunnerDegradedCommandTopicDeletedErrorMessage();
}

public EndpointResponse generateResponse(
final Exception e,
final EndpointResponse defaultResponse
Expand Down