From 586fc817eafaa76368f40161fcb781c519eeefca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Mon, 5 Oct 2020 13:40:01 -0500 Subject: [PATCH 1/5] feat: new ksql-restore-command-topic to restore backups --- bin/ksql-restore-command-topic | 28 ++ .../ksql/rest/server/BackupInputFile.java | 64 ++++ .../ksql/rest/server/BackupReplayFile.java | 36 +-- .../restore/KsqlRestoreCommandTopic.java | 288 +++++++++++++++++ .../rest/server/restore/RestoreOptions.java | 72 +++++ .../ksql/rest/server/BackupInputFileTest.java | 100 ++++++ .../restore/KsqlRestoreCommandTopicTest.java | 291 ++++++++++++++++++ .../server/restore/RestoreOptionsTest.java | 48 +++ .../streams/StreamAggregateBuilderTest.java | 3 + 9 files changed, 897 insertions(+), 33 deletions(-) create mode 100755 bin/ksql-restore-command-topic create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupInputFile.java create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupInputFileTest.java create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/RestoreOptionsTest.java diff --git a/bin/ksql-restore-command-topic b/bin/ksql-restore-command-topic new file mode 100755 index 000000000000..b412014b5768 --- /dev/null +++ b/bin/ksql-restore-command-topic @@ -0,0 +1,28 @@ +#!/bin/bash +# (Copyright) [2020] Confluent, Inc. + +base_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )/.." && pwd ) + +: "${KSQL_CONFIG_DIR:="$base_dir/config"}" + +# logj4 settings +if [ -z "$KSQL_LOG4J_OPTS" ]; then + # Test for files from dev -> packages so this will work as expected in dev if you have packages + # installed + if [ -e "$base_dir/config/log4j.properties" ]; then # Dev environment + KSQL_CONFIG_DIR="$base_dir/config" + elif [ -e "$base_dir/etc/ksqldb/log4j.properties" ]; then # Simple zip file layout + KSQL_CONFIG_DIR="$base_dir/etc/ksqldb" + elif [ -e "/etc/ksqldb/log4j.properties" ]; then # Normal install layout + KSQL_CONFIG_DIR="/etc/ksqldb" + fi +fi + +: "${KSQL_LOG4J_OPTS:=""}" + +# Use file logging by default +if [ -z "$KSQL_LOG4J_OPTS" ]; then + export KSQL_LOG4J_OPTS="-Dlog4j.configuration=file:$KSQL_CONFIG_DIR/log4j-file.properties" +fi + +exec "$base_dir"/bin/ksql-run-class io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic "$@" \ No newline at end of file diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupInputFile.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupInputFile.java new file mode 100644 index 000000000000..0a4639161cc2 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupInputFile.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import io.confluent.ksql.util.Pair; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class BackupInputFile { + protected static final String KEY_VALUE_SEPARATOR_STR = ":"; + protected static final String NEW_LINE_SEPARATOR_STR = "\n"; + + static final byte[] KEY_VALUE_SEPARATOR_BYTES = + KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8); + static final byte[] NEW_LINE_SEPARATOR_BYTES = + NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8); + + private final File file; + + public BackupInputFile(final File file) { + this.file = Objects.requireNonNull(file, "file"); + } + + public File getFile() { + return file; + } + + public String getPath() { + return file.getAbsolutePath(); + } + + public List> readRecords() throws IOException { + final List> commands = new ArrayList<>(); + for (final String line : Files.readAllLines(getFile().toPath(), StandardCharsets.UTF_8)) { + final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR_STR)); + final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR_STR) + 1); + + commands.add(new Pair<>( + commandId.getBytes(StandardCharsets.UTF_8), + command.getBytes(StandardCharsets.UTF_8) + )); + } + + return commands; + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java index d4a3095af3b5..e5c7a30d9dc0 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java @@ -16,36 +16,21 @@ package io.confluent.ksql.rest.server; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.Pair; import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; import org.apache.kafka.clients.consumer.ConsumerRecord; /** * A file that is used by the backup service to replay command_topic commands. 
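+ * Read support lives in the {@link BackupInputFile} parent class; this class adds only
+ * the write path used while the server is running.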
*/ -public final class BackupReplayFile implements Closeable { - private static final String KEY_VALUE_SEPARATOR_STR = ":"; - private static final String NEW_LINE_SEPARATOR_STR = "\n"; - - private static final byte[] KEY_VALUE_SEPARATOR_BYTES = - KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8); - private static final byte[] NEW_LINE_SEPARATOR_BYTES = - NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8); - - private final File file; +public final class BackupReplayFile extends BackupInputFile implements Closeable { private final FileOutputStream writer; public BackupReplayFile(final File file) { - this.file = Objects.requireNonNull(file, "file"); + super(file); this.writer = createWriter(file); } @@ -59,7 +44,7 @@ private static FileOutputStream createWriter(final File file) { } public String getPath() { - return file.getAbsolutePath(); + return getFile().getAbsolutePath(); } public void write(final ConsumerRecord record) throws IOException { @@ -70,21 +55,6 @@ public void write(final ConsumerRecord record) throws IOExceptio writer.flush(); } - public List> readRecords() throws IOException { - final List> commands = new ArrayList<>(); - for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) { - final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR_STR)); - final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR_STR) + 1); - - commands.add(new Pair<>( - commandId.getBytes(StandardCharsets.UTF_8), - command.getBytes(StandardCharsets.UTF_8) - )); - } - - return commands; - } - @Override public void close() throws IOException { writer.close(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java new file mode 100644 index 000000000000..4d30fc657573 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java @@ -0,0 +1,288 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.rest.server.restore; + +import static java.util.Objects.requireNonNull; + +import io.confluent.ksql.properties.PropertiesUtil; +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.BackupInputFile; +import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.rest.server.computation.InternalTopicSerdes; +import io.confluent.ksql.rest.util.KsqlInternalTopicUtils; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.services.ServiceContextFactory; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.ReservedInternalTopics; + +import java.io.Console; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Main command to restore the KSQL command topic. + */ +public class KsqlRestoreCommandTopic { + private static final Serializer BYTES_SERIALIZER = new ByteArraySerializer(); + private static final int COMMAND_TOPIC_PARTITION = 0; + + private static KsqlConfig loadServerConfig(final File configFile) { + final Map serverProps = PropertiesUtil.loadProperties(configFile); + return new KsqlConfig(serverProps); + } + + public static List> loadBackup(final File file) throws IOException { + final BackupInputFile commandTopicBackupFile = new BackupInputFile(file); + + final List> records = commandTopicBackupFile.readRecords(); + throwOnInvalidRecords(records); + + return records; + } + + private static void throwOnInvalidRecords(final List> records) { + int n = 0; + + for (final Pair record : records) { + n++; + + try { + InternalTopicSerdes.deserializer(CommandId.class) + .deserialize(null, record.getLeft()); + } catch (final Exception e) { + throw new KsqlException(String.format( + "Invalid CommandId string (line %d): %s", + n, new String(record.getLeft(), StandardCharsets.UTF_8), e + )); + } + + try { + InternalTopicSerdes.deserializer(Command.class) + .deserialize(null, record.getRight()); + } catch (final Exception e) { + throw new KsqlException(String.format( + "Invalid Command string (line %d): %s", + n, new String(record.getRight(), StandardCharsets.UTF_8), e + )); + } + } + } + + private static void checkFileExists(final File file) throws Exception { + if (!file.exists()) { + throw new NoSuchFileException("File does not exist: " + file.getPath()); + } + + if (!file.isFile()) { + throw new NoSuchFileException("Invalid file: " + file.getPath()); + } + + if (!file.canRead()) { + throw new Exception("You don't have Read permissions on file: " + file.getPath()); + } + } + + private static long timer; + + private static void resetTimer() { + timer = System.currentTimeMillis(); + } + + private static long currentTimer() { + return System.currentTimeMillis() - timer; + } + + private static boolean promptQuestion() { + System.out.println("Restoring the command topic will DELETE your actual metadata."); + 
System.out.print("Continue [yes or no] (default: no)? "); + + final Console console = System.console(); + final String decision = console.readLine(); + + switch (decision.toLowerCase()) { + case "yes": + return true; + default: + return false; + } + } + + /** + * Main command to restore the KSQL command topic. + */ + public static void main(final String[] args) throws Exception { + final RestoreOptions restoreOptions = RestoreOptions.parse(args); + if (restoreOptions == null) { + System.exit(1); + } + + final File configFile = restoreOptions.getConfigFile(); + final File backupFile = restoreOptions.getBackupFile(); + + try { + checkFileExists(configFile); + checkFileExists(backupFile); + } catch (final Exception e) { + System.err.println(e.getMessage()); + System.exit(2); + } + + final KsqlConfig serverConfig = loadServerConfig(configFile); + final KsqlRestoreCommandTopic restoreMetadata = new KsqlRestoreCommandTopic(serverConfig); + + // Stop and ask the user to type 'yes' to continue to warn users about the restore process + if (!restoreOptions.isAutomaticYes() && !promptQuestion()) { + System.exit(0); + } + + System.out.println("Loading backup file ..."); + resetTimer(); + + List> backupCommands = null; + try { + backupCommands = loadBackup(backupFile); + } catch (final Exception e) { + System.err.println(String.format( + "Failed loading backup file.%nError = %s", e.getMessage())); + System.exit(1); + } + + System.out.println(String.format( + "Backup (%d records) loaded in memory in %s ms.", backupCommands.size(), currentTimer())); + System.out.println(); + + System.out.println("Restoring command topic ..."); + resetTimer(); + + try { + restoreMetadata.restore(backupCommands); + } catch (final Exception e) { + System.err.println(String.format( + "Failed restoring command topic.%nError = %s", e.getMessage())); + System.exit(1); + } + + System.out.println(String.format( + "Restore process completed in %d ms.", currentTimer())); + System.out.println(); + + System.out.println("You need to restart the ksqlDB server to re-load the command topic."); + } + + private final KsqlConfig serverConfig; + private final String commandTopicName; + private final KafkaTopicClient topicClient; + private final Producer kafkaProducer; + + KsqlRestoreCommandTopic(final KsqlConfig serverConfig) { + this( + serverConfig, + ReservedInternalTopics.commandTopic(serverConfig), + ServiceContextFactory.create(serverConfig, + () -> /* no ksql client */ null).getTopicClient(), + new KafkaProducer<>( + serverConfig.getProducerClientConfigProps(), + BYTES_SERIALIZER, + BYTES_SERIALIZER + ) + ); + } + + KsqlRestoreCommandTopic( + final KsqlConfig serverConfig, + final String commandTopicName, + final KafkaTopicClient topicClient, + final Producer kafkaProducer + ) { + this.serverConfig = requireNonNull(serverConfig, "serverConfig"); + this.commandTopicName = requireNonNull(commandTopicName, "commandTopicName"); + this.topicClient = requireNonNull(topicClient, "topicClient"); + this.kafkaProducer = requireNonNull(kafkaProducer, "kafkaProducer"); + } + + public void restore(final List> backupCommands) { + // Delete the command topic + deleteCommandTopicIfExists(); + + // Create the command topic + KsqlInternalTopicUtils.ensureTopic(commandTopicName, serverConfig, topicClient); + + // Restore the commands + restoreCommandTopic(backupCommands); + } + + private void deleteCommandTopicIfExists() { + if (topicClient.isTopicExists(commandTopicName)) { + topicClient.deleteTopics(Collections.singletonList(commandTopicName)); + try 
{ + // Wait a few seconds, otherwise the create topic does not work because it still sees + // the topic + Thread.sleep(1000); + } catch (final InterruptedException e) { + // Don't need to throw an exception in this case + } + } + } + + private void restoreCommandTopic(final List> commands) { + final List> futures = new ArrayList<>(commands.size()); + + for (final Pair command : commands) { + futures.add(enqueueCommand(command.getLeft(), command.getRight())); + } + + int i = 0; + for (final Future future : futures) { + try { + future.get(); + } catch (final InterruptedException e) { + throw new KsqlException("Restore process was interrupted.", e); + } catch (final Exception e) { + throw new KsqlException( + String.format("Failed restoring command (line %d): %s", + i + 1, new String(commands.get(i).getLeft(), StandardCharsets.UTF_8)), e); + } + + i++; + } + } + + public Future enqueueCommand(final byte[] commandId, final byte[] command) { + final ProducerRecord producerRecord = new ProducerRecord<>( + commandTopicName, + COMMAND_TOPIC_PARTITION, + commandId, + command); + + return kafkaProducer.send(producerRecord); + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java new file mode 100644 index 000000000000..6fc5ea1a3f7d --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java @@ -0,0 +1,72 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.restore; + +import com.github.rvesse.airline.HelpOption; +import com.github.rvesse.airline.annotations.Arguments; +import com.github.rvesse.airline.annotations.Command; +import com.github.rvesse.airline.annotations.Option; +import com.github.rvesse.airline.annotations.restrictions.Required; +import io.confluent.ksql.rest.util.OptionsParser; +import java.io.File; +import java.io.IOException; +import javax.inject.Inject; + +@Command(name = "ksql-restore-metadata", description = "KSQL Restore Metadata") +public class RestoreOptions { + // Only here so that the help message generated by Help.help() is accurate + @Inject + public HelpOption help; + + @SuppressWarnings("unused") // Accessed via reflection + @Required + @Option( + name = "--config-file", + description = "A file specifying configs for the KSQL Server, KSQL, " + + "and its underlying Kafka Streams instance(s). 
Refer to KSQL " + + "documentation for a list of available configs.") + private String configFile; + + @SuppressWarnings("unused") // Accessed via reflection + @Option( + name = {"--yes", "-y"}, + description = "Automatic \"yes\" as answer to prompt and run non-interactively.") + private boolean automaticYes = false; + + @SuppressWarnings("unused") // Accessed via reflection + @Required + @Arguments( + title = "backup-file", + description = "A file specifying the file that contains the metadata backup.") + private String backupFile; + + + public File getConfigFile() { + return new File(configFile); + } + + public File getBackupFile() { + return new File(backupFile); + } + + public boolean isAutomaticYes() { + return automaticYes; + } + + public static RestoreOptions parse(final String...args) throws IOException { + return OptionsParser.parse(args, RestoreOptions.class); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupInputFileTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupInputFileTest.java new file mode 100644 index 000000000000..95275e156370 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupInputFileTest.java @@ -0,0 +1,100 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.rest.server; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import io.confluent.ksql.util.Pair; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.List; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class BackupInputFileTest { + private static final String KEY_VALUE_SEPARATOR = ":"; + private static final String BACKUP_FILE_NAME = "backup_command_topic_1"; + + @Rule + public TemporaryFolder backupLocation = new TemporaryFolder(); + + private File internalReplayFile; + private BackupInputFile backupFile; + + @Before + public void setup() throws IOException { + internalReplayFile = backupLocation.newFile(BACKUP_FILE_NAME); + backupFile = new BackupInputFile(internalReplayFile); + } + + @Test + public void shouldGetFileAndPaths() { + // When/Then + assertThat(backupFile.getFile(), is(internalReplayFile)); + assertThat(backupFile.getPath(), is(internalReplayFile.getPath())); + } + + @Test + public void shouldBeEmptyWhenReadAllCommandsFromEmptyFile() throws IOException { + // When + final List commands = backupFile.readRecords(); + + // Then + assertThat(commands.size(), is(0)); + } + + @Test + public void shouldReadAndParseCommandsFromTheFile() throws IOException { + // Given + Files.write(internalReplayFile.toPath(), + String.format("%s%s%s%n%s%s%s", + "\"stream/stream1/create\"", + KEY_VALUE_SEPARATOR, + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"," + + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}", + "\"stream/stream2/create\"", + KEY_VALUE_SEPARATOR, + "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\"," + + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}" + ).getBytes(StandardCharsets.UTF_8)); + + // When + final List> commands = backupFile.readRecords(); + + // Then + assertThat(commands.size(), is(2)); + assertThat(new String(commands.get(0).left, StandardCharsets.UTF_8), + is("\"stream/stream1/create\"")); + assertThat(new String(commands.get(0).right, StandardCharsets.UTF_8), + is( + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"," + + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}")); + assertThat(new String(commands.get(1).left, StandardCharsets.UTF_8), + is("\"stream/stream2/create\"")); + assertThat(new String(commands.get(1).right, StandardCharsets.UTF_8), + is( + "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\"," + + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}")); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java new file mode 100644 index 000000000000..dbc07b346db1 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java @@ -0,0 +1,291 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.restore; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.Pair; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.TopicConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class KsqlRestoreCommandTopicTest { + private static final String COMMAND_TOPIC_NAME = "command_topic_name"; + + private static final int INTERNAL_TOPIC_PARTITION_COUNT = 1; + private static final short INTERNAL_TOPIC_REPLICAS_COUNT = 1; + + private static final ImmutableMap INTERNAL_TOPIC_CONFIG = ImmutableMap.of( + TopicConfig.RETENTION_MS_CONFIG, -1L, + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE, + TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false, + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, INTERNAL_TOPIC_REPLICAS_COUNT + ); + + private static final Pair COMMAND_1 = createStreamCommand("stream1"); + private static final Pair COMMAND_2 = createStreamCommand("stream2"); + private static final Pair COMMAND_3 = createStreamCommand("stream3"); + + private static final ProducerRecord RECORD_1 = newRecord(COMMAND_1); + private static final ProducerRecord RECORD_2 = newRecord(COMMAND_2); + private static final ProducerRecord RECORD_3 = newRecord(COMMAND_3); + + private static final List> BACKUP_COMMANDS = + Arrays.asList(COMMAND_1, COMMAND_2, COMMAND_3); + + private static Pair createStreamCommand(final String streamName) { + return Pair.of( + String.format("\"stream/%s/create\"", streamName).getBytes(StandardCharsets.UTF_8), + String.format("{\"statement\":\"CREATE STREAM %s (id INT) WITH (kafka_topic='%s')\"," + + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}", + streamName, streamName).getBytes(StandardCharsets.UTF_8) + ); + } + + private static ProducerRecord newRecord(final Pair command) { + return new ProducerRecord<>( + COMMAND_TOPIC_NAME, + 0, + command.getLeft(), + command.getRight()); + } + + @Mock + private KafkaTopicClient topicClient; + @Mock 
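+  // Mocked so tests can verify the exact records sent to the command topic
+  // without a real broker.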
+ private Producer kafkaProducer; + @Mock + private Future future1; + @Mock + private Future future2; + @Mock + private Future future3; + + private KsqlRestoreCommandTopic restoreCommandTopic; + + + @Before + public void setup() { + final KsqlConfig serverConfig = new KsqlConfig(ImmutableMap.of( + KsqlConfig.KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY, INTERNAL_TOPIC_REPLICAS_COUNT, + KsqlConfig.KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY, INTERNAL_TOPIC_REPLICAS_COUNT + )); + + restoreCommandTopic = new KsqlRestoreCommandTopic( + serverConfig, + COMMAND_TOPIC_NAME, + topicClient, + kafkaProducer + ); + + when(kafkaProducer.send(RECORD_1)).thenReturn(future1); + when(kafkaProducer.send(RECORD_2)).thenReturn(future2); + when(kafkaProducer.send(RECORD_3)).thenReturn(future3); + } + + @Test + public void shouldCreateAndRestoreCommandTopic() throws ExecutionException, InterruptedException { + // Given: + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(false); + + // When: + restoreCommandTopic.restore(BACKUP_COMMANDS); + + // Then: + verifyCreateTopic(COMMAND_TOPIC_NAME); + verify(kafkaProducer).send(RECORD_1); + verify(kafkaProducer).send(RECORD_2); + verify(kafkaProducer).send(RECORD_3); + verify(future1).get(); + verify(future2).get(); + verify(future3).get(); + verifyNoMoreInteractions(kafkaProducer, future1, future2, future3); + } + + @Test + public void shouldThrowWhenRestoreIsInterrupted() throws Exception { + // Given: + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(false); + doThrow(new InterruptedException("fail")).when(future2).get(); + + // When: + final Exception e = assertThrows( + KsqlException.class, + () -> restoreCommandTopic.restore(BACKUP_COMMANDS)); + + // Then: + assertThat(e.getMessage(), containsString("Restore process was interrupted.")); + verifyCreateTopic(COMMAND_TOPIC_NAME); + verify(kafkaProducer).send(RECORD_1); + verify(kafkaProducer).send(RECORD_2); + verify(kafkaProducer).send(RECORD_3); + verify(future1).get(); + verify(future2).get(); + verifyNoMoreInteractions(kafkaProducer, future1, future2); + verifyZeroInteractions(future3); + } + + @Test + public void shouldThrowWhenRestoreExecutionFails() throws Exception { + // Given: + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(false); + doThrow(new RuntimeException()).when(future2).get(); + + // When: + final Exception e = assertThrows( + KsqlException.class, + () -> restoreCommandTopic.restore(BACKUP_COMMANDS)); + + // Then: + assertThat(e.getMessage(), + containsString(String.format("Failed restoring command (line 2): %s", + new String(RECORD_2.key(), StandardCharsets.UTF_8)))); + + verifyCreateTopic(COMMAND_TOPIC_NAME); + verify(kafkaProducer).send(RECORD_1); + verify(kafkaProducer).send(RECORD_2); + verify(kafkaProducer).send(RECORD_3); + verify(future1).get(); + verify(future2).get(); + verifyNoMoreInteractions(kafkaProducer, future1, future2); + verifyZeroInteractions(future3); + } + + @Test + public void shouldRestoreCommandTopicWithEmptyCommands() { + // Given: + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(false); + + // When: + restoreCommandTopic.restore(Collections.emptyList()); + + // Then: + verifyCreateTopic(COMMAND_TOPIC_NAME); + verifyZeroInteractions(kafkaProducer, future1, future2, future3); + } + + @Test + public void shouldDeleteAndCreateCommandTopicOnRestore() throws Exception { + // Given: + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(true).thenReturn(false); + + // When: + 
restoreCommandTopic.restore(Collections.singletonList(BACKUP_COMMANDS.get(0))); + + // Then: + verifyDeleteTopic(COMMAND_TOPIC_NAME); + verifyCreateTopic(COMMAND_TOPIC_NAME); + verify(kafkaProducer).send(RECORD_1); + verify(future1).get(); + verifyNoMoreInteractions(kafkaProducer, future1); + } + + @SuppressWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") + @Test + public void shouldThrowIfCannotDescribeTopicExists() { + // Given: + doThrow(new RuntimeException("denied")).when(topicClient).isTopicExists(COMMAND_TOPIC_NAME); + + // When: + final Exception e = assertThrows( + RuntimeException.class, + () -> restoreCommandTopic.restore(Collections.singletonList(BACKUP_COMMANDS.get(0)))); + + // Then: + assertThat(e.getMessage(), containsString("denied")); + verifyZeroInteractions(kafkaProducer); + } + + @Test + public void shouldThrowIfCannotDeleteTopic() { + // Given: + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(true).thenReturn(true); + doThrow(new RuntimeException("denied")).when(topicClient) + .deleteTopics(Collections.singletonList(COMMAND_TOPIC_NAME)); + + // When: + final Exception e = assertThrows( + RuntimeException.class, + () -> restoreCommandTopic.restore(Collections.singletonList(BACKUP_COMMANDS.get(0)))); + + // Then: + assertThat(e.getMessage(), containsString("denied")); + verify(topicClient).isTopicExists(COMMAND_TOPIC_NAME); + verify(topicClient).deleteTopics(Collections.singletonList(COMMAND_TOPIC_NAME)); + verifyNoMoreInteractions(topicClient); + verifyZeroInteractions(kafkaProducer); + } + + @Test + public void shouldThrowIfCannotCreateTopic() { + // Given: + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(false); + doThrow(new RuntimeException("denied")).when(topicClient) + .createTopic(COMMAND_TOPIC_NAME, INTERNAL_TOPIC_PARTITION_COUNT, + INTERNAL_TOPIC_REPLICAS_COUNT, INTERNAL_TOPIC_CONFIG); + + // When: + final Exception e = assertThrows( + RuntimeException.class, + () -> restoreCommandTopic.restore(Collections.singletonList(BACKUP_COMMANDS.get(0)))); + + // Then: + assertThat(e.getMessage(), containsString("denied")); + verify(topicClient, times(2)).isTopicExists(COMMAND_TOPIC_NAME); + verifyCreateTopic(COMMAND_TOPIC_NAME); + verifyNoMoreInteractions(topicClient); + verifyZeroInteractions(kafkaProducer); + } + + private void verifyDeleteTopic(final String topicName) { + verify(topicClient).deleteTopics(Collections.singletonList(topicName)); + } + + private void verifyCreateTopic(final String topicName) { + verify(topicClient).createTopic( + topicName, + INTERNAL_TOPIC_PARTITION_COUNT, + INTERNAL_TOPIC_REPLICAS_COUNT, + INTERNAL_TOPIC_CONFIG); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/RestoreOptionsTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/RestoreOptionsTest.java new file mode 100644 index 000000000000..b46befc64c00 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/RestoreOptionsTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.restore; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import java.io.IOException; +import org.apache.commons.lang3.ArrayUtils; +import org.junit.Test; + +public class RestoreOptionsTest { + private static final String[] REQUIRED_ARGS = + new String[]{"--config-file", "config file", "backup file"}; + + @Test + public void shouldUseDefaultValuesOnNonRequiredParameters() throws IOException { + // When + RestoreOptions restoreOptions = RestoreOptions.parse(REQUIRED_ARGS); + + // Then + assertThat(restoreOptions.isAutomaticYes(), is(false)); + assertThat(restoreOptions.getBackupFile().getPath(), is("backup file")); + assertThat(restoreOptions.getConfigFile().getPath(), is("config file")); + } + + @Test + public void shouldSetAutomaticYesOptionIfSupplied() throws IOException { + // When + RestoreOptions restoreOptions = RestoreOptions.parse(ArrayUtils.addAll(REQUIRED_ARGS, "--yes")); + + // Then + assertThat(restoreOptions.isAutomaticYes(), is(true)); + } +} \ No newline at end of file diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index be60c8267f72..7a8fa330d165 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -431,6 +431,7 @@ public void shouldBuildAggregatorParamsCorrectlyForUnwindowedAggregate() { } @Test + @SuppressWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") public void shouldBuildTumblingWindowedAggregateCorrectly() { // Given: givenTumblingWindowedAggregate(); @@ -460,6 +461,7 @@ public void shouldBuildTumblingWindowedAggregateCorrectly() { } @Test + @SuppressWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") public void shouldBuildHoppingWindowedAggregateCorrectly() { // Given: givenHoppingWindowedAggregate(); @@ -490,6 +492,7 @@ public void shouldBuildHoppingWindowedAggregateCorrectly() { } @Test + @SuppressWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") public void shouldBuildSessionWindowedAggregateCorrectly() { // Given: givenSessionWindowedAggregate(); From 4a61d963f1f25e1e4ec8ada845a9709394d3702b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Thu, 15 Oct 2020 11:51:03 -0500 Subject: [PATCH 2/5] fix: use transactions to restore the commands --- .../restore/KsqlRestoreCommandTopic.java | 114 +++++++++++++----- .../restore/KsqlRestoreCommandTopicTest.java | 25 +++- .../streams/StreamAggregateBuilderTest.java | 1 - 3 files changed, 104 insertions(+), 36 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java index 4d30fc657573..95d8300fe3e1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java @@ -17,7 +17,9 @@ import static java.util.Objects.requireNonNull; +import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.properties.PropertiesUtil; +import 
io.confluent.ksql.rest.DefaultErrorMessages; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.server.BackupInputFile; import io.confluent.ksql.rest.server.computation.Command; @@ -35,16 +37,21 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; @@ -201,7 +208,30 @@ public static void main(final String[] args) throws Exception { private final KsqlConfig serverConfig; private final String commandTopicName; private final KafkaTopicClient topicClient; - private final Producer kafkaProducer; + private final Supplier> kafkaProducerSupplier; + + private static KafkaProducer transactionalProducer( + final KsqlConfig serverConfig + ) { + final Map transactionalProperties = + new HashMap<>(serverConfig.getProducerClientConfigProps()); + + transactionalProperties.put( + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + serverConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) + ); + + transactionalProperties.put( + ProducerConfig.ACKS_CONFIG, + "all" + ); + + return new KafkaProducer<>( + transactionalProperties, + BYTES_SERIALIZER, + BYTES_SERIALIZER + ); + } KsqlRestoreCommandTopic(final KsqlConfig serverConfig) { this( @@ -209,24 +239,21 @@ public static void main(final String[] args) throws Exception { ReservedInternalTopics.commandTopic(serverConfig), ServiceContextFactory.create(serverConfig, () -> /* no ksql client */ null).getTopicClient(), - new KafkaProducer<>( - serverConfig.getProducerClientConfigProps(), - BYTES_SERIALIZER, - BYTES_SERIALIZER - ) + () -> transactionalProducer(serverConfig) ); } + @VisibleForTesting KsqlRestoreCommandTopic( final KsqlConfig serverConfig, final String commandTopicName, final KafkaTopicClient topicClient, - final Producer kafkaProducer + final Supplier> kafkaProducerSupplier ) { this.serverConfig = requireNonNull(serverConfig, "serverConfig"); this.commandTopicName = requireNonNull(commandTopicName, "commandTopicName"); this.topicClient = requireNonNull(topicClient, "topicClient"); - this.kafkaProducer = requireNonNull(kafkaProducer, "kafkaProducer"); + this.kafkaProducerSupplier = requireNonNull(kafkaProducerSupplier, "kafkaProducerSupplier"); } public void restore(final List> backupCommands) { @@ -254,35 +281,60 @@ private void deleteCommandTopicIfExists() { } private void restoreCommandTopic(final List> commands) { - final List> futures = new ArrayList<>(commands.size()); - - for (final Pair command : commands) { - futures.add(enqueueCommand(command.getLeft(), command.getRight())); - } - - int i = 0; - for (final Future future : futures) { - try { - future.get(); - } catch (final 
InterruptedException e) { - throw new KsqlException("Restore process was interrupted.", e); - } catch (final Exception e) { - throw new KsqlException( - String.format("Failed restoring command (line %d): %s", - i + 1, new String(commands.get(i).getLeft(), StandardCharsets.UTF_8)), e); + try (Producer kafkaProducer = createTransactionalProducer()) { + for (int i = 0; i < commands.size(); i++) { + final Pair command = commands.get(i); + + try { + kafkaProducer.beginTransaction(); + enqueueCommand(kafkaProducer, command.getLeft(), command.getRight()); + kafkaProducer.commitTransaction(); + } catch (final ProducerFencedException + | OutOfOrderSequenceException + | AuthorizationException e + ) { + // We can't recover from these exceptions, so our only option is close producer and exit. + // This catch doesn't abortTransaction() since doing that would throw another exception. + throw new KsqlException( + String.format("Failed restoring command (line %d): %s", + i + 1, new String(commands.get(i).getLeft(), StandardCharsets.UTF_8)), e); + } catch (final InterruptedException e) { + kafkaProducer.abortTransaction(); + throw new KsqlException("Restore process was interrupted.", e); + } catch (final Exception e) { + kafkaProducer.abortTransaction(); + throw new KsqlException( + String.format("Failed restoring command (line %d): %s", + i + 1, new String(commands.get(i).getLeft(), StandardCharsets.UTF_8)), e); + } } - - i++; } } - public Future enqueueCommand(final byte[] commandId, final byte[] command) { + private void enqueueCommand( + final Producer kafkaProducer, + final byte[] commandId, + final byte[] command + ) throws ExecutionException, InterruptedException { final ProducerRecord producerRecord = new ProducerRecord<>( commandTopicName, COMMAND_TOPIC_PARTITION, commandId, command); - return kafkaProducer.send(producerRecord); + kafkaProducer.send(producerRecord).get(); + } + + private Producer createTransactionalProducer() { + try { + final Producer kafkaProducer = kafkaProducerSupplier.get(); + kafkaProducer.initTransactions(); + return kafkaProducer; + } catch (final TimeoutException e) { + final DefaultErrorMessages errorMessages = new DefaultErrorMessages(); + throw new KsqlException(errorMessages.transactionInitTimeoutErrorMessage(e), e); + } catch (final Exception e) { + throw new KsqlException("Failed to initialize topic transactions.", e); + } } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java index dbc07b346db1..40b0e12a0540 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java @@ -41,7 +41,6 @@ import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -115,7 +114,7 @@ public void setup() { serverConfig, COMMAND_TOPIC_NAME, topicClient, - kafkaProducer + () -> kafkaProducer ); when(kafkaProducer.send(RECORD_1)).thenReturn(future1); @@ -133,12 +132,16 @@ public void shouldCreateAndRestoreCommandTopic() throws ExecutionException, Inte // Then: verifyCreateTopic(COMMAND_TOPIC_NAME); + 
verify(kafkaProducer).initTransactions(); + verify(kafkaProducer, times(3)).beginTransaction(); verify(kafkaProducer).send(RECORD_1); verify(kafkaProducer).send(RECORD_2); verify(kafkaProducer).send(RECORD_3); verify(future1).get(); verify(future2).get(); verify(future3).get(); + verify(kafkaProducer, times(3)).commitTransaction(); + verify(kafkaProducer).close(); verifyNoMoreInteractions(kafkaProducer, future1, future2, future3); } @@ -156,11 +159,15 @@ public void shouldThrowWhenRestoreIsInterrupted() throws Exception { // Then: assertThat(e.getMessage(), containsString("Restore process was interrupted.")); verifyCreateTopic(COMMAND_TOPIC_NAME); + verify(kafkaProducer).initTransactions(); + verify(kafkaProducer, times(2)).beginTransaction(); verify(kafkaProducer).send(RECORD_1); verify(kafkaProducer).send(RECORD_2); - verify(kafkaProducer).send(RECORD_3); verify(future1).get(); verify(future2).get(); + verify(kafkaProducer).commitTransaction(); + verify(kafkaProducer).abortTransaction(); + verify(kafkaProducer).close(); verifyNoMoreInteractions(kafkaProducer, future1, future2); verifyZeroInteractions(future3); } @@ -182,11 +189,15 @@ public void shouldThrowWhenRestoreExecutionFails() throws Exception { new String(RECORD_2.key(), StandardCharsets.UTF_8)))); verifyCreateTopic(COMMAND_TOPIC_NAME); + verify(kafkaProducer).initTransactions(); + verify(kafkaProducer, times(2)).beginTransaction(); verify(kafkaProducer).send(RECORD_1); verify(kafkaProducer).send(RECORD_2); - verify(kafkaProducer).send(RECORD_3); verify(future1).get(); verify(future2).get(); + verify(kafkaProducer).commitTransaction(); + verify(kafkaProducer).abortTransaction(); + verify(kafkaProducer).close(); verifyNoMoreInteractions(kafkaProducer, future1, future2); verifyZeroInteractions(future3); } @@ -201,6 +212,8 @@ public void shouldRestoreCommandTopicWithEmptyCommands() { // Then: verifyCreateTopic(COMMAND_TOPIC_NAME); + verify(kafkaProducer).initTransactions(); + verify(kafkaProducer).close(); verifyZeroInteractions(kafkaProducer, future1, future2, future3); } @@ -215,8 +228,12 @@ public void shouldDeleteAndCreateCommandTopicOnRestore() throws Exception { // Then: verifyDeleteTopic(COMMAND_TOPIC_NAME); verifyCreateTopic(COMMAND_TOPIC_NAME); + verify(kafkaProducer).initTransactions(); + verify(kafkaProducer).beginTransaction(); verify(kafkaProducer).send(RECORD_1); verify(future1).get(); + verify(kafkaProducer).commitTransaction(); + verify(kafkaProducer).close(); verifyNoMoreInteractions(kafkaProducer, future1); } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index 7a8fa330d165..cfab1f8be191 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -431,7 +431,6 @@ public void shouldBuildAggregatorParamsCorrectlyForUnwindowedAggregate() { } @Test - @SuppressWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") public void shouldBuildTumblingWindowedAggregateCorrectly() { // Given: givenTumblingWindowedAggregate(); From 1a3c664e8423ee62c37f92fecf0c6c6dd8aa1212 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Mon, 19 Oct 2020 15:25:41 -0500 Subject: [PATCH 3/5] fix: address feedback - use inOrder on unit tests --- .../restore/KsqlRestoreCommandTopicTest.java | 86 +++++++++++-------- 1 file 
changed, 50 insertions(+), 36 deletions(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java index 40b0e12a0540..3eacbcc483b3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java @@ -27,6 +27,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -41,6 +42,7 @@ import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -132,16 +134,22 @@ public void shouldCreateAndRestoreCommandTopic() throws ExecutionException, Inte // Then: verifyCreateTopic(COMMAND_TOPIC_NAME); - verify(kafkaProducer).initTransactions(); - verify(kafkaProducer, times(3)).beginTransaction(); - verify(kafkaProducer).send(RECORD_1); - verify(kafkaProducer).send(RECORD_2); - verify(kafkaProducer).send(RECORD_3); - verify(future1).get(); - verify(future2).get(); - verify(future3).get(); - verify(kafkaProducer, times(3)).commitTransaction(); - verify(kafkaProducer).close(); + + final InOrder inOrder = inOrder(kafkaProducer, future1, future2, future3); + inOrder.verify(kafkaProducer).initTransactions(); + inOrder.verify(kafkaProducer).beginTransaction(); + inOrder.verify(kafkaProducer).send(RECORD_1); + inOrder.verify(future1).get(); + inOrder.verify(kafkaProducer).commitTransaction(); + inOrder.verify(kafkaProducer).beginTransaction(); + inOrder.verify(kafkaProducer).send(RECORD_2); + inOrder.verify(future2).get(); + inOrder.verify(kafkaProducer).commitTransaction(); + inOrder.verify(kafkaProducer).beginTransaction(); + inOrder.verify(kafkaProducer).send(RECORD_3); + inOrder.verify(future3).get(); + inOrder.verify(kafkaProducer).commitTransaction(); + inOrder.verify(kafkaProducer).close(); verifyNoMoreInteractions(kafkaProducer, future1, future2, future3); } @@ -159,15 +167,17 @@ public void shouldThrowWhenRestoreIsInterrupted() throws Exception { // Then: assertThat(e.getMessage(), containsString("Restore process was interrupted.")); verifyCreateTopic(COMMAND_TOPIC_NAME); - verify(kafkaProducer).initTransactions(); - verify(kafkaProducer, times(2)).beginTransaction(); - verify(kafkaProducer).send(RECORD_1); - verify(kafkaProducer).send(RECORD_2); - verify(future1).get(); - verify(future2).get(); - verify(kafkaProducer).commitTransaction(); - verify(kafkaProducer).abortTransaction(); - verify(kafkaProducer).close(); + final InOrder inOrder = inOrder(kafkaProducer, future1, future2); + inOrder.verify(kafkaProducer).initTransactions(); + inOrder.verify(kafkaProducer).beginTransaction(); + inOrder.verify(kafkaProducer).send(RECORD_1); + inOrder.verify(future1).get(); + inOrder.verify(kafkaProducer).commitTransaction(); + inOrder.verify(kafkaProducer).beginTransaction(); + inOrder.verify(kafkaProducer).send(RECORD_2); + inOrder.verify(future2).get(); + inOrder.verify(kafkaProducer).abortTransaction(); + inOrder.verify(kafkaProducer).close(); verifyNoMoreInteractions(kafkaProducer, future1, future2); 
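+    // future3 is never reached: the abort on RECORD_2 stops the restore loop.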
verifyZeroInteractions(future3); } @@ -189,15 +199,17 @@ public void shouldThrowWhenRestoreExecutionFails() throws Exception { new String(RECORD_2.key(), StandardCharsets.UTF_8)))); verifyCreateTopic(COMMAND_TOPIC_NAME); - verify(kafkaProducer).initTransactions(); - verify(kafkaProducer, times(2)).beginTransaction(); - verify(kafkaProducer).send(RECORD_1); - verify(kafkaProducer).send(RECORD_2); - verify(future1).get(); - verify(future2).get(); - verify(kafkaProducer).commitTransaction(); - verify(kafkaProducer).abortTransaction(); - verify(kafkaProducer).close(); + final InOrder inOrder = inOrder(kafkaProducer, future1, future2); + inOrder.verify(kafkaProducer).initTransactions(); + inOrder.verify(kafkaProducer).beginTransaction(); + inOrder.verify(kafkaProducer).send(RECORD_1); + inOrder.verify(future1).get(); + inOrder.verify(kafkaProducer).commitTransaction(); + inOrder.verify(kafkaProducer).beginTransaction(); + inOrder.verify(kafkaProducer).send(RECORD_2); + inOrder.verify(future2).get(); + inOrder.verify(kafkaProducer).abortTransaction(); + inOrder.verify(kafkaProducer).close(); verifyNoMoreInteractions(kafkaProducer, future1, future2); verifyZeroInteractions(future3); } @@ -212,8 +224,9 @@ public void shouldRestoreCommandTopicWithEmptyCommands() { // Then: verifyCreateTopic(COMMAND_TOPIC_NAME); - verify(kafkaProducer).initTransactions(); - verify(kafkaProducer).close(); + final InOrder inOrder = inOrder(kafkaProducer); + inOrder.verify(kafkaProducer).initTransactions(); + inOrder.verify(kafkaProducer).close(); verifyZeroInteractions(kafkaProducer, future1, future2, future3); } @@ -228,12 +241,13 @@ public void shouldDeleteAndCreateCommandTopicOnRestore() throws Exception { // Then: verifyDeleteTopic(COMMAND_TOPIC_NAME); verifyCreateTopic(COMMAND_TOPIC_NAME); - verify(kafkaProducer).initTransactions(); - verify(kafkaProducer).beginTransaction(); - verify(kafkaProducer).send(RECORD_1); - verify(future1).get(); - verify(kafkaProducer).commitTransaction(); - verify(kafkaProducer).close(); + final InOrder inOrder = inOrder(kafkaProducer, future1); + inOrder.verify(kafkaProducer).initTransactions(); + inOrder.verify(kafkaProducer).beginTransaction(); + inOrder.verify(kafkaProducer).send(RECORD_1); + inOrder.verify(future1).get(); + inOrder.verify(kafkaProducer).commitTransaction(); + inOrder.verify(kafkaProducer).close(); verifyNoMoreInteractions(kafkaProducer, future1); } From 8ca99e5c210349eba5a8a3281b7840e739a1650f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Mon, 19 Oct 2020 18:31:09 -0500 Subject: [PATCH 4/5] test: add integration test --- .../RestoreCommandTopicIntegrationTest.java | 183 ++++++++++++++++++ .../ksql/rest/server/TestKsqlRestApp.java | 12 +- 2 files changed, 191 insertions(+), 4 deletions(-) create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java new file mode 100644 index 000000000000..32f204a79fb6 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.integration; + +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.integration.Retry; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.KsqlWarning; +import io.confluent.ksql.rest.entity.SourceInfo; +import io.confluent.ksql.rest.entity.StreamsList; +import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; +import io.confluent.ksql.rest.server.BackupInputFile; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.ReservedInternalTopics; +import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.streams.StreamsConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.is; + + +/** + * Tests covering integration tests for backup/restore the command topic. 
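+ * <p>The test writes statements through the REST API, deletes the command topic to force
+ * the degraded state, runs the restore tool against the backup file, and restarts the
+ * server to verify the streams come back.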
+ */
+@Category({IntegrationTest.class})
+public class RestoreCommandTopicIntegrationTest {
+  private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
+
+  @ClassRule
+  public static final RuleChain CHAIN = RuleChain
+      .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS))
+      .around(TEST_HARNESS);
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+  private static KsqlConfig KSQL_CONFIG;
+  private static File BACKUP_LOCATION;
+  private static TestKsqlRestApp REST_APP;
+  private static String COMMAND_TOPIC;
+  private static Path BACKUP_FILE;
+  private static Path PROPERTIES_FILE;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    BACKUP_LOCATION = TMP_FOLDER.newFolder();
+
+    REST_APP = TestKsqlRestApp
+        .builder(TEST_HARNESS::kafkaBootstrapServers)
+        .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
+        .withProperty(KSQL_METASTORE_BACKUP_LOCATION, BACKUP_LOCATION.getPath())
+        .build();
+
+    REST_APP.start();
+
+    KSQL_CONFIG = new KsqlConfig(REST_APP.getKsqlRestConfig().getKsqlConfigProperties());
+    COMMAND_TOPIC = ReservedInternalTopics.commandTopic(KSQL_CONFIG);
+    BACKUP_FILE = Files.list(BACKUP_LOCATION.toPath()).findFirst().get();
+    PROPERTIES_FILE = TMP_FOLDER.newFile().toPath();
+
+    writeServerProperties();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    REST_APP.stop();
+    TMP_FOLDER.delete();
+  }
+
+  private static void writeServerProperties() throws IOException {
+    final Map<String, Object> map = REST_APP.getKsqlRestConfig().getKsqlConfigProperties();
+
+    Files.write(
+        PROPERTIES_FILE,
+        map.keySet().stream()
+            .map((key -> key + "=" + map.get(key)))
+            .collect(Collectors.joining("\n"))
+            .getBytes(StandardCharsets.UTF_8),
+        StandardOpenOption.CREATE
+    );
+  }
+
+  @Test
+  public void shouldBackupAndRestoreCommandTopic() throws Exception {
+    // Given
+    TEST_HARNESS.ensureTopics("topic1", "topic2");
+
+    makeKsqlRequest("CREATE STREAM TOPIC1 (ID INT) "
+        + "WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='JSON');");
+    makeKsqlRequest("CREATE STREAM TOPIC2 (ID INT) "
+        + "WITH (KAFKA_TOPIC='topic2', VALUE_FORMAT='JSON');");
+    makeKsqlRequest("CREATE STREAM stream1 AS SELECT * FROM topic1;");
+    makeKsqlRequest("CREATE STREAM stream2 AS SELECT * FROM topic2;");
+
+    // When
+
+    // Delete the command topic and check the server is in degraded state
+    TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC));
+    assertThat("Server should be in degraded state", isDegradedState(), is(true));
+
+    // Restore the command topic
+    KsqlRestoreCommandTopic.main(
+        new String[]{
+            "--yes",
+            "--config-file", PROPERTIES_FILE.toString(),
+            BACKUP_FILE.toString()
+        });
+
+    // Re-load the command topic
+    REST_APP.stop();
+    REST_APP.start();
+
+    // Then
+    final List<String> streamsNames = showStreams();
+    assertThat("Should have TOPIC1", streamsNames.contains("TOPIC1"), is(true));
+    assertThat("Should have TOPIC2", streamsNames.contains("TOPIC2"), is(true));
+    assertThat("Should have STREAM1", streamsNames.contains("STREAM1"), is(true));
+    assertThat("Should have STREAM2", streamsNames.contains("STREAM2"), is(true));
+    assertThat("Server should NOT be in degraded state", isDegradedState(), is(false));
+  }
+
+  private boolean isDegradedState() {
+    // If in degraded state, then the following command will return a warning
+    final List<KsqlEntity> response = makeKsqlRequest(
+        "CREATE STREAM ANY (id INT) WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='JSON');");
+
+    final List<KsqlWarning> warnings = response.get(0).getWarnings();
+    return warnings.size() > 0 && warnings.get(0).getMessage()
+        .contains("The server has detected corruption in the command topic");
+  }
+
+  private List<String> showStreams() {
+    return ((StreamsList)makeKsqlRequest("SHOW STREAMS;").get(0))
+        .getStreams().stream().map(SourceInfo::getName).collect(Collectors.toList());
+  }
+
+  private List<KsqlEntity> makeKsqlRequest(final String sql) {
+    return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql);
+  }
+}
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java
index 50668a052220..af27edfb13eb 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java
@@ -19,7 +19,6 @@
 import static org.easymock.EasyMock.niceMock;
 
 import com.google.common.collect.ImmutableMap;
-import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
 import io.confluent.ksql.KsqlExecutionContext;
 import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.rest.client.BasicCredentials;
@@ -93,6 +92,7 @@ public class TestKsqlRestApp extends ExternalResource {
   protected final Optional<BasicCredentials> credentials;
   protected Optional<URL> internalListener;
   protected KsqlExecutionContext ksqlEngine;
+  protected KsqlRestConfig ksqlRestConfig;
   protected KsqlRestApplication ksqlRestApplication;
   protected long lastCommandSequenceNumber = -1L;
 
@@ -246,6 +246,10 @@ public ServiceContext getServiceContext() {
     return serviceContext.get();
   }
 
+  public KsqlRestConfig getKsqlRestConfig() {
+    return ksqlRestConfig;
+  }
+
   @Override
   protected void before() {
     initialize();
@@ -282,20 +286,20 @@ protected void initialize() {
       after();
     }
 
-    final KsqlRestConfig config = buildConfig(bootstrapServers, baseConfig);
+    ksqlRestConfig = buildConfig(bootstrapServers, baseConfig);
 
     try {
       Vertx vertx = Vertx.vertx();
       ksqlRestApplication = KsqlRestApplication.buildApplication(
           metricsPrefix,
-          config,
+          ksqlRestConfig,
          (booleanSupplier) -> niceMock(VersionCheckerAgent.class),
          3,
          serviceContext.get(),
          () -> serviceContext.get().getSchemaRegistryClient(),
          vertx,
          InternalKsqlClientFactory.createInternalClient(
-              KsqlRestApplication.toClientProps(config.originals()),
+              KsqlRestApplication.toClientProps(ksqlRestConfig.originals()),
              SocketAddress::inetSocketAddress,
              vertx));
 
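Patch 5 below folds the backup read path back into BackupReplayFile. The on-disk format that readRecords() parses is one record per line, with the serialized command ID and command separated by the first ':' only, since the command JSON itself contains colons. A rough standalone sketch of that parse step under those assumptions; the class and entry types here are illustrative, and malformed lines are left unhandled just as in the patch's reader:

import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

public final class BackupLineParserSketch {

  // Splits each "commandId:command" backup line on the first ':' only;
  // everything after it, colons included, belongs to the command JSON.
  public static List<Entry<byte[], byte[]>> parse(final List<String> lines) {
    final List<Entry<byte[], byte[]>> records = new ArrayList<>();
    for (final String line : lines) {
      final int separator = line.indexOf(':');
      records.add(new SimpleEntry<>(
          line.substring(0, separator).getBytes(StandardCharsets.UTF_8),
          line.substring(separator + 1).getBytes(StandardCharsets.UTF_8)));
    }
    return records;
  }
}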
From 4db06da3cc5d2a1e38ec949430655b10dd09f289 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sergio=20Pe=C3=B1a?=
Date: Mon, 19 Oct 2020 20:40:51 -0500
Subject: [PATCH 5/5] refactor: revert BackupInputFile split

---
 .../ksql/rest/server/BackupInputFile.java     |  64 -----------
 .../ksql/rest/server/BackupReplayFile.java    |  65 ++++++++++--
 .../rest/server/CommandTopicBackupImpl.java   |   4 +-
 .../restore/KsqlRestoreCommandTopic.java      |   4 +-
 .../RestoreCommandTopicIntegrationTest.java   |   7 --
 .../ksql/rest/server/BackupInputFileTest.java | 100 ------------------
 .../rest/server/BackupReplayFileTest.java     |   6 +-
 .../restore/KsqlRestoreCommandTopicTest.java  |   5 +-
 8 files changed, 70 insertions(+), 185 deletions(-)
 delete mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupInputFile.java
 delete mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupInputFileTest.java

diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupInputFile.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupInputFile.java
deleted file mode 100644
index 0a4639161cc2..000000000000
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupInputFile.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2020 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.ksql.rest.server;
-
-import io.confluent.ksql.util.Pair;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class BackupInputFile {
-  protected static final String KEY_VALUE_SEPARATOR_STR = ":";
-  protected static final String NEW_LINE_SEPARATOR_STR = "\n";
-
-  static final byte[] KEY_VALUE_SEPARATOR_BYTES =
-      KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);
-  static final byte[] NEW_LINE_SEPARATOR_BYTES =
-      NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);
-
-  private final File file;
-
-  public BackupInputFile(final File file) {
-    this.file = Objects.requireNonNull(file, "file");
-  }
-
-  public File getFile() {
-    return file;
-  }
-
-  public String getPath() {
-    return file.getAbsolutePath();
-  }
-
-  public List<Pair<byte[], byte[]>> readRecords() throws IOException {
-    final List<Pair<byte[], byte[]>> commands = new ArrayList<>();
-    for (final String line : Files.readAllLines(getFile().toPath(), StandardCharsets.UTF_8)) {
-      final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR_STR));
-      final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR_STR) + 1);
-
-      commands.add(new Pair<>(
-          commandId.getBytes(StandardCharsets.UTF_8),
-          command.getBytes(StandardCharsets.UTF_8)
-      ));
-    }
-
-    return commands;
-  }
-}
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java
index e5c7a30d9dc0..e525835cf2fc 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java
@@ -16,22 +16,50 @@
 package io.confluent.ksql.rest.server;
 
 import io.confluent.ksql.util.KsqlException;
+import io.confluent.ksql.util.Pair;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
  * A file that is used by the backup service to replay command_topic commands.
  */
-public final class BackupReplayFile extends BackupInputFile implements Closeable {
+public final class BackupReplayFile implements Closeable {
+  private static final String KEY_VALUE_SEPARATOR_STR = ":";
+  private static final String NEW_LINE_SEPARATOR_STR = "\n";
+
+  private static final byte[] KEY_VALUE_SEPARATOR_BYTES =
+      KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);
+  private static final byte[] NEW_LINE_SEPARATOR_BYTES =
+      NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);
+
+  private final File file;
   private final FileOutputStream writer;
 
-  public BackupReplayFile(final File file) {
-    super(file);
-    this.writer = createWriter(file);
+  public static BackupReplayFile readOnly(final File file) {
+    return new BackupReplayFile(file, false);
+  }
+
+  public static BackupReplayFile writable(final File file) {
+    return new BackupReplayFile(file, true);
+  }
+
+  private BackupReplayFile(final File file, final boolean write) {
+    this.file = Objects.requireNonNull(file, "file");
+
+    if (write) {
+      this.writer = createWriter(file);
+    } else {
+      this.writer = null;
+    }
   }
 
   private static FileOutputStream createWriter(final File file) {
@@ -43,11 +71,19 @@ private static FileOutputStream createWriter(final File file) {
     }
   }
 
+  public File getFile() {
+    return file;
+  }
+
   public String getPath() {
-    return getFile().getAbsolutePath();
+    return file.getAbsolutePath();
   }
 
   public void write(final ConsumerRecord<byte[], byte[]> record) throws IOException {
+    if (writer == null) {
+      throw new IOException("Write permission denied.");
+    }
+
     writer.write(record.key());
     writer.write(KEY_VALUE_SEPARATOR_BYTES);
     writer.write(record.value());
@@ -55,8 +91,25 @@ public void write(final ConsumerRecord<byte[], byte[]> record) throws IOExceptio
     writer.flush();
   }
 
+  public List<Pair<byte[], byte[]>> readRecords() throws IOException {
+    final List<Pair<byte[], byte[]>> commands = new ArrayList<>();
+    for (final String line : Files.readAllLines(getFile().toPath(), StandardCharsets.UTF_8)) {
+      final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR_STR));
+      final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR_STR) + 1);
+
+      commands.add(new Pair<>(
+          commandId.getBytes(StandardCharsets.UTF_8),
+          command.getBytes(StandardCharsets.UTF_8)
+      ));
+    }
+
+    return commands;
+  }
+
   @Override
   public void close() throws IOException {
-    writer.close();
+    if (writer != null) {
+      writer.close();
+    }
   }
 }
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java
index 492842e3041c..34cd32359618 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java
@@ -202,7 +202,7 @@ BackupReplayFile openOrCreateReplayFile() {
   }
 
   private BackupReplayFile newReplayFile() {
-    return new BackupReplayFile(Paths.get(
+    return BackupReplayFile.writable(Paths.get(
         backupLocation.getAbsolutePath(),
         String.format("%s%s_%s", PREFIX, topicName, ticker.read())
     ).toFile());
@@ -236,7 +236,7 @@ private Optional<BackupReplayFile> latestReplayFile() {
     }
 
     return (latestBakFile != null)
-        ? Optional.of(new BackupReplayFile(latestBakFile))
+        ? Optional.of(BackupReplayFile.writable(latestBakFile))
         : Optional.empty();
   }
 
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java
index 95d8300fe3e1..db4dd8a0dadc 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java
@@ -21,7 +21,7 @@
 import io.confluent.ksql.properties.PropertiesUtil;
 import io.confluent.ksql.rest.DefaultErrorMessages;
 import io.confluent.ksql.rest.entity.CommandId;
-import io.confluent.ksql.rest.server.BackupInputFile;
+import io.confluent.ksql.rest.server.BackupReplayFile;
 import io.confluent.ksql.rest.server.computation.Command;
 import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
 import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
@@ -68,7 +68,7 @@ private static KsqlConfig loadServerConfig(final File configFile) {
   }
 
   public static List<Pair<byte[], byte[]>> loadBackup(final File file) throws IOException {
-    final BackupInputFile commandTopicBackupFile = new BackupInputFile(file);
+    final BackupReplayFile commandTopicBackupFile = BackupReplayFile.readOnly(file);
 
     final List<Pair<byte[], byte[]>> records = commandTopicBackupFile.readRecords();
     throwOnInvalidRecords(records);
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java
index 32f204a79fb6..ffc6d726ed08 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java
@@ -23,11 +23,9 @@
 import io.confluent.ksql.rest.entity.SourceInfo;
 import io.confluent.ksql.rest.entity.StreamsList;
 import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
-import io.confluent.ksql.rest.server.BackupInputFile;
 import io.confluent.ksql.rest.server.TestKsqlRestApp;
 import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic;
 import io.confluent.ksql.util.KsqlConfig;
-import io.confluent.ksql.util.Pair;
 import io.confluent.ksql.util.ReservedInternalTopics;
 import kafka.zookeeper.ZooKeeperClientException;
 import org.apache.kafka.streams.StreamsConfig;
@@ -39,16 +37,12 @@
 import org.junit.rules.RuleChain;
 import org.junit.rules.TemporaryFolder;
 
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsString;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +52,6 @@
 import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION;
 import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.is;
 
 
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupInputFileTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupInputFileTest.java
deleted file mode 100644
index 95275e156370..000000000000
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupInputFileTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright 2020 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.ksql.rest.server;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-
-import io.confluent.ksql.util.Pair;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.List;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class BackupInputFileTest {
-  private static final String KEY_VALUE_SEPARATOR = ":";
-  private static final String BACKUP_FILE_NAME = "backup_command_topic_1";
-
-  @Rule
-  public TemporaryFolder backupLocation = new TemporaryFolder();
-
-  private File internalReplayFile;
-  private BackupInputFile backupFile;
-
-  @Before
-  public void setup() throws IOException {
-    internalReplayFile = backupLocation.newFile(BACKUP_FILE_NAME);
-    backupFile = new BackupInputFile(internalReplayFile);
-  }
-
-  @Test
-  public void shouldGetFileAndPaths() {
-    // When/Then
-    assertThat(backupFile.getFile(), is(internalReplayFile));
-    assertThat(backupFile.getPath(), is(internalReplayFile.getPath()));
-  }
-
-  @Test
-  public void shouldBeEmptyWhenReadAllCommandsFromEmptyFile() throws IOException {
-    // When
-    final List<Pair<byte[], byte[]>> commands = backupFile.readRecords();
-
-    // Then
-    assertThat(commands.size(), is(0));
-  }
-
-  @Test
-  public void shouldReadAndParseCommandsFromTheFile() throws IOException {
-    // Given
-    Files.write(internalReplayFile.toPath(),
-        String.format("%s%s%s%n%s%s%s",
-            "\"stream/stream1/create\"",
-            KEY_VALUE_SEPARATOR,
-            "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\","
-                + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}",
-            "\"stream/stream2/create\"",
-            KEY_VALUE_SEPARATOR,
-            "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\","
-                + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}"
-        ).getBytes(StandardCharsets.UTF_8));
-
-    // When
-    final List<Pair<byte[], byte[]>> commands = backupFile.readRecords();
-
-    // Then
-    assertThat(commands.size(), is(2));
-    assertThat(new String(commands.get(0).left, StandardCharsets.UTF_8),
-        is("\"stream/stream1/create\""));
-    assertThat(new String(commands.get(0).right, StandardCharsets.UTF_8),
-        is(
-            "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\","
-                + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}"));
-    assertThat(new String(commands.get(1).left, StandardCharsets.UTF_8),
-        is("\"stream/stream2/create\""));
-    assertThat(new String(commands.get(1).right, StandardCharsets.UTF_8),
-        is(
-            "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\","
-                + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}"));
"\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}")); - } -} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java index 49ee95bccf20..7f3656af0198 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java @@ -48,7 +48,7 @@ public class BackupReplayFileTest { @Before public void setup() throws IOException { internalReplayFile = backupLocation.newFile(REPLAY_FILE_NAME); - replayFile = new BackupReplayFile(internalReplayFile); + replayFile = BackupReplayFile.writable(internalReplayFile); } @Test @@ -123,7 +123,7 @@ public void shouldReadCommands() throws IOException { KEY_VALUE_SEPARATOR, "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\"," + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}" - ).getBytes(StandardCharsets.UTF_8)); + ).getBytes(StandardCharsets.UTF_8)); // When final List> commands = replayFile.readRecords(); @@ -159,4 +159,4 @@ private String buildValue(final String streamName) { + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}", streamName, streamName); } -} +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java index 3eacbcc483b3..d35c0c613d02 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopicTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server.restore; import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -251,7 +252,7 @@ public void shouldDeleteAndCreateCommandTopicOnRestore() throws Exception { verifyNoMoreInteractions(kafkaProducer, future1); } - @SuppressWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") @Test public void shouldThrowIfCannotDescribeTopicExists() { // Given: @@ -267,6 +268,7 @@ public void shouldThrowIfCannotDescribeTopicExists() { verifyZeroInteractions(kafkaProducer); } + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") @Test public void shouldThrowIfCannotDeleteTopic() { // Given: @@ -287,6 +289,7 @@ public void shouldThrowIfCannotDeleteTopic() { verifyZeroInteractions(kafkaProducer); } + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") @Test public void shouldThrowIfCannotCreateTopic() { // Given: