diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java index d86a313b1a76..17f471dacc3d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java @@ -90,7 +90,6 @@ public Iterable> getNewCommands(final Duration ti log.warn("Backup is out of sync with the current command topic. " + "Backups will not work until the previous command topic is " + "restored or all backup files are deleted.", e); - return records; } records.add(record); } @@ -117,7 +116,6 @@ public List getRestoreCommands(final Duration duration) { log.warn("Backup is out of sync with the current command topic. " + "Backups will not work until the previous command topic is " + "restored or all backup files are deleted.", e); - return restoreCommands; } if (record.value() == null) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java index 63656e085b80..02db3c3937f4 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java @@ -36,7 +36,7 @@ public class Command { @VisibleForTesting - static final int VERSION = 1; + public static final int VERSION = 1; private final String statement; private final Map overwriteProperties; @@ -80,7 +80,7 @@ public Command( } @VisibleForTesting - Command( + public Command( final String statement, final Map overwriteProperties, final Map originalProperties, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java index db4dd8a0dadc..e3ef6903d383 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java @@ -24,6 +24,7 @@ import io.confluent.ksql.rest.server.BackupReplayFile; import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.InternalTopicSerdes; +import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException; import io.confluent.ksql.rest.util.KsqlInternalTopicUtils; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContextFactory; @@ -37,6 +38,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -51,6 +53,7 @@ import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; @@ -67,17 +70,26 @@ private static KsqlConfig loadServerConfig(final File configFile) { return new KsqlConfig(serverProps); } - public static List> loadBackup(final File file) throws IOException { + public static List> loadBackup( + final File file, + final RestoreOptions options + ) throws IOException { final BackupReplayFile commandTopicBackupFile = BackupReplayFile.readOnly(file); + List> records = commandTopicBackupFile.readRecords(); - final List> records = commandTopicBackupFile.readRecords(); - throwOnInvalidRecords(records); + if (options.isSkipIncompatibleCommands()) { + records = removeIncompatibleCommands(records); + } return records; } - private static void throwOnInvalidRecords(final List> records) { + private static List> removeIncompatibleCommands( + final List> records + ) { int n = 0; + int numFilteredCommands = 0; + final List> filteredRecords = new ArrayList<>(); for (final Pair record : records) { n++; @@ -87,21 +99,28 @@ private static void throwOnInvalidRecords(final List> recor .deserialize(null, record.getLeft()); } catch (final Exception e) { throw new KsqlException(String.format( - "Invalid CommandId string (line %d): %s", - n, new String(record.getLeft(), StandardCharsets.UTF_8), e + "Invalid CommandId string (line %d): %s (%s)", + n, new String(record.getLeft(), StandardCharsets.UTF_8), e.getMessage() )); } try { InternalTopicSerdes.deserializer(Command.class) .deserialize(null, record.getRight()); + } catch (final SerializationException | IncomaptibleKsqlCommandVersionException e) { + numFilteredCommands++; + continue; } catch (final Exception e) { throw new KsqlException(String.format( - "Invalid Command string (line %d): %s", - n, new String(record.getRight(), StandardCharsets.UTF_8), e + "Invalid Command string (line %d): %s (%s)", + n, new String(record.getRight(), StandardCharsets.UTF_8), e.getMessage() )); } + filteredRecords.add(record); } + System.out.println( + String.format("%s incompatible command(s) skipped from backup file.", numFilteredCommands)); + return filteredRecords; } private static void checkFileExists(final File file) throws Exception { @@ -176,7 +195,7 @@ public static void main(final String[] args) throws Exception { List> backupCommands = null; try { - backupCommands = loadBackup(backupFile); + backupCommands = loadBackup(backupFile, restoreOptions); } catch (final Exception e) { System.err.println(String.format( "Failed loading backup file.%nError = %s", e.getMessage())); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java index 6fc5ea1a3f7d..4929f3f5f3a2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java @@ -42,8 +42,16 @@ public class RestoreOptions { @SuppressWarnings("unused") // Accessed via reflection @Option( - name = {"--yes", "-y"}, - description = "Automatic \"yes\" as answer to prompt and run non-interactively.") + name = {"--skip-incompatible-commands", "-s"}, + description = "This restore command can restore command topic commands that " + + "are of version (" + io.confluent.ksql.rest.server.computation.Command.VERSION + ") " + + "or lower. If true, the restore command will skip all incompatible commands." + + "If false, the restore command will restore the backup file as is.") + private boolean skipIncompatibleCommands = false; + + @Option( + name = {"--yes", "-y"}, + description = "Automatic \"yes\" as answer to prompt and run non-interactively.") private boolean automaticYes = false; @SuppressWarnings("unused") // Accessed via reflection @@ -66,6 +74,10 @@ public boolean isAutomaticYes() { return automaticYes; } + public boolean isSkipIncompatibleCommands() { + return skipIncompatibleCommands; + } + public static RestoreOptions parse(final String...args) throws IOException { return OptionsParser.parse(args, RestoreOptions.class); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java index ffc6d726ed08..24f6aa9f7076 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java @@ -18,18 +18,33 @@ import io.confluent.common.utils.IntegrationTest; import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; +import io.confluent.ksql.rest.DefaultErrorMessages; +import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlWarning; import io.confluent.ksql.rest.entity.SourceInfo; import io.confluent.ksql.rest.entity.StreamsList; import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; +import io.confluent.ksql.rest.server.BackupReplayFile; +import io.confluent.ksql.rest.server.CommandTopicBackupImpl; import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.rest.server.computation.InternalTopicSerdes; import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.ReservedInternalTopics; import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -38,20 +53,29 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION; +import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG; import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; @@ -61,6 +85,7 @@ @Category({IntegrationTest.class}) public class RestoreCommandTopicIntegrationTest { private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + private static final Serializer BYTES_SERIALIZER = new ByteArraySerializer(); @ClassRule public static final RuleChain CHAIN = RuleChain @@ -78,7 +103,7 @@ public class RestoreCommandTopicIntegrationTest { private static Path PROPERTIES_FILE; @BeforeClass - public static void setup() throws IOException { + public static void classSetUp() throws IOException { BACKUP_LOCATION = TMP_FOLDER.newFolder(); REST_APP = TestKsqlRestApp @@ -86,20 +111,27 @@ public static void setup() throws IOException { .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) .withProperty(KSQL_METASTORE_BACKUP_LOCATION, BACKUP_LOCATION.getPath()) .build(); + } + @Before + public void setup() throws IOException { REST_APP.start(); - KSQL_CONFIG = new KsqlConfig(REST_APP.getKsqlRestConfig().getKsqlConfigProperties()); COMMAND_TOPIC = ReservedInternalTopics.commandTopic(KSQL_CONFIG); BACKUP_FILE = Files.list(BACKUP_LOCATION.toPath()).findFirst().get(); PROPERTIES_FILE = TMP_FOLDER.newFile().toPath(); - writeServerProperties(); } - @AfterClass - public static void teardown() { + @After + public void teardown() throws IOException { REST_APP.stop(); + TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC)); + new File(String.valueOf(BACKUP_FILE)).delete(); + } + + @After + public void teardownClass() { TMP_FOLDER.delete(); } @@ -132,7 +164,7 @@ public void shouldBackupAndRestoreCommandTopic() throws Exception { // Delete the command topic and check the server is in degraded state TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC)); - assertThat("Server should be in degraded state", isDegradedState(), is(true)); + assertThatEventually("Degraded State", this::isDegradedState, is(true)); // Restore the command topic KsqlRestoreCommandTopic.main( @@ -151,18 +183,81 @@ public void shouldBackupAndRestoreCommandTopic() throws Exception { assertThat("Should have TOPIC1", streamsNames.contains("TOPIC1"), is(true)); assertThat("Should have TOPIC2", streamsNames.contains("TOPIC2"), is(true)); assertThat("Should have STREAM1", streamsNames.contains("STREAM1"), is(true)); - assertThat("Should have STREAM1", streamsNames.contains("STREAM1"), is(true)); + assertThat("Should have STREAM2", streamsNames.contains("STREAM2"), is(true)); assertThat("Server should NOT be in degraded state", isDegradedState(), is(false)); } + @Test + public void shouldSkipIncompatibleCommands() throws Exception { + // Given + TEST_HARNESS.ensureTopics("topic3", "topic4"); + + makeKsqlRequest("CREATE STREAM TOPIC3 (ID INT) " + + "WITH (KAFKA_TOPIC='topic3', VALUE_FORMAT='JSON');"); + makeKsqlRequest("CREATE STREAM TOPIC4 (ID INT) " + + "WITH (KAFKA_TOPIC='topic4', VALUE_FORMAT='JSON');"); + makeKsqlRequest("CREATE STREAM stream3 AS SELECT * FROM topic3;"); + + final CommandId commandId = new CommandId("TOPIC", "entity", "CREATE"); + final Command command = new Command( + "statement", + Collections.emptyMap(), + Collections.emptyMap(), + Optional.empty(), + Optional.of(Command.VERSION + 1), + Command.VERSION + 1); + + writeToBackupFile(commandId, command, BACKUP_FILE); + + // Delete the command topic + TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC)); + REST_APP.stop(); + + // Restore the command topic + KsqlRestoreCommandTopic.main( + new String[]{ + "--yes", + "--config-file", PROPERTIES_FILE.toString(), + BACKUP_FILE.toString() + }); + + // Re-load the command topic + REST_APP.start(); + + // Server should be in degraded state since the backup file had incompatible command + assertThat("Server should be in degraded state", isDegradedState(), is(true)); + + // Delete the command topic again and restore with skip flag + TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC)); + REST_APP.stop(); + KsqlRestoreCommandTopic.main( + new String[]{ + "--yes", + "-s", + "--config-file", PROPERTIES_FILE.toString(), + BACKUP_FILE.toString() + }); + + // Re-load the command topic + REST_APP.start(); + final List streamsNames = showStreams(); + assertThat("Should have TOPIC3", streamsNames.contains("TOPIC3"), is(true)); + assertThat("Should have TOPIC4", streamsNames.contains("TOPIC4"), is(true)); + assertThat("Should have STREAM3", streamsNames.contains("STREAM3"), is(true)); + assertThat("Server should not be in degraded state", isDegradedState(), is(false)); + } + private boolean isDegradedState() { // If in degraded state, then the following command will return a warning final List response = makeKsqlRequest( - "CREATE STREAM ANY (id INT) WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='JSON');"); + "Show Streams;"); final List warnings = response.get(0).getWarnings(); - return warnings.size() > 0 && warnings.get(0).getMessage() - .contains("The server has detected corruption in the command topic"); + return warnings.size() > 0 && + (warnings.get(0).getMessage().contains( + DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE) || + warnings.get(0).getMessage().contains( + DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); } private List showStreams() { @@ -173,4 +268,27 @@ private List showStreams() { private List makeKsqlRequest(final String sql) { return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); } + + public static void writeToBackupFile( + final CommandId commandId, + final Command command, + final Path backUpFileLocation + ) throws IOException { + FileOutputStream writer; + try { + writer = new FileOutputStream(new File(String.valueOf(backUpFileLocation)), true); + } catch (final FileNotFoundException e) { + throw new KsqlException( + String.format("Failed to open backup file: %s", backUpFileLocation), e); + } + + final byte[] keyValueSeparator = ":".getBytes(StandardCharsets.UTF_8); + final byte[] newLine = "/n".getBytes(StandardCharsets.UTF_8); + + writer.write(InternalTopicSerdes.serializer().serialize("", commandId)); + writer.write(keyValueSeparator); + writer.write(InternalTopicSerdes.serializer().serialize("", command)); + writer.write(newLine); + writer.flush(); + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java index a8b5f6010a9f..dd1bbf88e7cb 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java @@ -104,7 +104,7 @@ public void shouldAssignCorrectPartitionToConsumer() { } @Test - public void shouldGetCommandsThatDoNotCorruptBackup() { + public void shouldGetCommandsEvenWhenIssueBackingUp() { // Given: when(commandConsumer.poll(any(Duration.class))).thenReturn(consumerRecords); doNothing().doThrow(new KsqlServerException("error")).when(commandTopicBackup).writeRecord(any()); @@ -115,13 +115,12 @@ public void shouldGetCommandsThatDoNotCorruptBackup() { final List> newCommandsList = ImmutableList.copyOf(newCommands); // Then: - assertThat(newCommandsList.size(), is(1)); - assertThat(newCommandsList, equalTo(ImmutableList.of(record1))); - verify(commandTopicBackup, never()).writeRecord(record3); + assertThat(newCommandsList.size(), is(3)); + assertThat(newCommandsList, equalTo(ImmutableList.of(record1, record2, record3))); } @Test - public void shouldGetCommandsThatDoNotCorruptBackupInRestore() { + public void shouldGetCommandsEvenWhenIssueWithBackupInRestore() { // Given: when(commandConsumer.poll(any(Duration.class))) .thenReturn(someConsumerRecords( @@ -138,11 +137,13 @@ public void shouldGetCommandsThatDoNotCorruptBackupInRestore() { // Then: verify(commandConsumer).seekToBeginning(topicPartitionsCaptor.capture()); - verify(commandConsumer, times(1)).poll(any()); + verify(commandConsumer, times(3)).poll(any()); assertThat(topicPartitionsCaptor.getValue(), equalTo(Collections.singletonList(new TopicPartition(COMMAND_TOPIC_NAME, 0)))); assertThat(queuedCommandList, equalTo(ImmutableList.of( - new QueuedCommand(commandId1, command1, Optional.empty(), 0L)))); + new QueuedCommand(commandId1, command1, Optional.empty(), 0L), + new QueuedCommand(commandId2, command2, Optional.empty(), 1L), + new QueuedCommand(commandId3, command3, Optional.empty(), 2L)))); } @Test