From 1445238434c4a4edcec5385191400ee9d8cff777 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Tue, 22 Sep 2020 21:49:38 -0700 Subject: [PATCH] combine corrupted and command topic missing degraded reason --- .../server/computation/CommandRunner.java | 10 +++--- .../server/computation/CommandRunnerTest.java | 12 +++---- .../ksql/rest/DefaultErrorMessages.java | 31 +++++-------------- .../io/confluent/ksql/rest/ErrorMessages.java | 4 +-- .../java/io/confluent/ksql/rest/Errors.java | 8 ++--- 5 files changed, 20 insertions(+), 45 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 55f546aa5b97..0bb68edced0a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -96,10 +96,8 @@ public enum CommandRunnerStatus { public enum CommandRunnerDegradedReason { NONE(errors -> ""), - CORRUPTED(Errors::commandRunnerDegradedBackupCorruptedErrorMessage), - INCOMPATIBLE_COMMAND(Errors::commandRunnerDegradedIncompatibleCommandsErrorMessage), - COMMAND_TOPIC_DELETED_OR_MODIFIED( - Errors::commandRunnerDegradedCommandTopicDeletedModifiedErrorMessage); + CORRUPTED(Errors::commandRunnerDegradedCorruptedErrorMessage), + INCOMPATIBLE_COMMAND(Errors::commandRunnerDegradedIncompatibleCommandsErrorMessage); private final Function msgFactory; @@ -449,7 +447,7 @@ public void run() { LOG.warn("CommandRunner entering degraded state due to command topic deletion."); state = new Status( CommandRunnerStatus.DEGRADED, - CommandRunnerDegradedReason.COMMAND_TOPIC_DELETED_OR_MODIFIED + CommandRunnerDegradedReason.CORRUPTED ); closeEarly(); } else { @@ -465,7 +463,7 @@ public void run() { LOG.warn("The command topic offset was reset. CommandRunner thread exiting."); state = new Status( CommandRunnerStatus.DEGRADED, - CommandRunnerDegradedReason.COMMAND_TOPIC_DELETED_OR_MODIFIED + CommandRunnerDegradedReason.CORRUPTED ); closeEarly(); } finally { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index ebc0cf1301e7..7d3dc09f1851 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -72,9 +72,8 @@ @RunWith(MockitoJUnitRunner.class) public class CommandRunnerTest { private static final long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000; - private static final String BACKUP_CORRUPTED_ERROR_MESSAGE = "corrupted"; + private static final String CORRUPTED_ERROR_MESSAGE = "corrupted"; private static final String INCOMPATIBLE_COMMANDS_ERROR_MESSAGE = "incompatible"; - private static final String MISSING_COMMAND_TOPIC_ERROR_MESSAGE = "command topic missing"; @Mock private InteractiveStatementExecutor statementExecutor; @@ -134,8 +133,7 @@ public void setup() { when(commandTopicExists.get()).thenReturn(true); when(compactor.apply(any())).thenAnswer(inv -> inv.getArgument(0)); when(errorHandler.commandRunnerDegradedIncompatibleCommandsErrorMessage()).thenReturn(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE); - when(errorHandler.commandRunnerDegradedBackupCorruptedErrorMessage()).thenReturn(BACKUP_CORRUPTED_ERROR_MESSAGE); - when(errorHandler.commandRunnerDegradedCommandTopicDeletedModifiedErrorMessage()).thenReturn(MISSING_COMMAND_TOPIC_ERROR_MESSAGE); + when(errorHandler.commandRunnerDegradedCorruptedErrorMessage()).thenReturn(CORRUPTED_ERROR_MESSAGE); givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3); @@ -337,7 +335,7 @@ public void shouldNotProcessCommandTopicIfBackupCorrupted() throws InterruptedEx inOrder.verify(executor).awaitTermination(anyLong(), any()); inOrder.verify(commandStore).close(); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); - assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(BACKUP_CORRUPTED_ERROR_MESSAGE)); + assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(CORRUPTED_ERROR_MESSAGE)); assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.CORRUPTED)); } @@ -354,10 +352,10 @@ public void shouldEnterDegradedStateIfCommandTopicMissing() { threadTask.run(); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); - assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(MISSING_COMMAND_TOPIC_ERROR_MESSAGE)); + assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(CORRUPTED_ERROR_MESSAGE)); assertThat( commandRunner.getCommandRunnerDegradedReason(), - is(CommandRunner.CommandRunnerDegradedReason.COMMAND_TOPIC_DELETED_OR_MODIFIED)); + is(CommandRunner.CommandRunnerDegradedReason.CORRUPTED)); } @Test diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java index 60889b049909..1c7ec2bb41c1 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java @@ -25,25 +25,15 @@ public class DefaultErrorMessages implements ErrorMessages { + System.lineSeparator() + "This is most likely due to the service being rolled back to an earlier version."; - public static final String COMMAND_RUNNER_DEGRADED_BACKUP_CORRUPTED_ERROR_MESSAGE = - "The server has detected that the command topic may be corrupted. The backup of the " - + "command topic does not match the current contents of command topic." + public static final String COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE = + "The server has detected corruption in the command topic due to modifications performed on it. " + System.lineSeparator() - + "DDL statements will not be processed until either:" + + "DDL statements will not be processed any further." + System.lineSeparator() - + "1. The current command topic is deleted and the backup file is used " - + "to restore the command topic." + + "If a backup of the command topic is available, " + + "restore the command topic using the backup file." + System.lineSeparator() - + "2. The current backup file is deleted." - + System.lineSeparator() - + "The server must be restarted after performing either operation in order to resume " - + "normal functionality"; - - public static final String COMMAND_RUNNER_DEGRADED_COMMAND_TOPIC_DELETED_MODIFIED = - "The server has detected that the command topic has been modified or deleted." - + "DDL statements will not be processed." - + System.lineSeparator() - + "Restart the server to restore server functionality."; + + "A server restart is required to restore full functionality."; @Override @@ -76,12 +66,7 @@ public String commandRunnerDegradedIncompatibleCommandsErrorMessage() { } @Override - public String commandRunnerDegradedBackupCorruptedErrorMessage() { - return COMMAND_RUNNER_DEGRADED_BACKUP_CORRUPTED_ERROR_MESSAGE; - } - - @Override - public String commandRunnerDegradedCommandTopicDeletedModifiedErrorMessage() { - return COMMAND_RUNNER_DEGRADED_COMMAND_TOPIC_DELETED_MODIFIED; + public String commandRunnerDegradedCorruptedErrorMessage() { + return COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE; } } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java index 0d610b6791e2..269e0e9f1487 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java @@ -25,7 +25,5 @@ public interface ErrorMessages { String commandRunnerDegradedIncompatibleCommandsErrorMessage(); - String commandRunnerDegradedBackupCorruptedErrorMessage(); - - String commandRunnerDegradedCommandTopicDeletedModifiedErrorMessage(); + String commandRunnerDegradedCorruptedErrorMessage(); } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java index 604e2412533c..e929982cc83e 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java @@ -230,12 +230,8 @@ public String commandRunnerDegradedIncompatibleCommandsErrorMessage() { return errorMessages.commandRunnerDegradedIncompatibleCommandsErrorMessage(); } - public String commandRunnerDegradedBackupCorruptedErrorMessage() { - return errorMessages.commandRunnerDegradedBackupCorruptedErrorMessage(); - } - - public String commandRunnerDegradedCommandTopicDeletedModifiedErrorMessage() { - return errorMessages.commandRunnerDegradedCommandTopicDeletedModifiedErrorMessage(); + public String commandRunnerDegradedCorruptedErrorMessage() { + return errorMessages.commandRunnerDegradedCorruptedErrorMessage(); } public EndpointResponse generateResponse(