feat: update ksql restore command to skip incompatible commands if flag set
stevenpyzhang committed Oct 26, 2020
1 parent df98ef4 commit 64a2ccc
Showing 6 changed files with 180 additions and 32 deletions.
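
A minimal usage sketch (not part of this commit) showing how the new flag is exercised; it mirrors the KsqlRestoreCommandTopic.main(...) invocation used in the integration test further down, and the config-file and backup-file paths are hypothetical placeholders:

import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic;

public final class RestoreSkipExample {
  public static void main(final String[] args) throws Exception {
    // --skip-incompatible-commands (alias -s) is the option added by this commit;
    // --yes answers the confirmation prompt so the tool runs non-interactively.
    KsqlRestoreCommandTopic.main(new String[]{
        "--yes",
        "--skip-incompatible-commands",
        "--config-file", "/etc/ksqldb/ksql-server.properties", // hypothetical path
        "/var/backups/backup_command_topic"                     // hypothetical backup file
    });
  }
}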
@@ -90,7 +90,6 @@ public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(final Duration ti
log.warn("Backup is out of sync with the current command topic. "
+ "Backups will not work until the previous command topic is "
+ "restored or all backup files are deleted.", e);
return records;
}
records.add(record);
}
@@ -117,7 +116,6 @@ public List<QueuedCommand> getRestoreCommands(final Duration duration) {
log.warn("Backup is out of sync with the current command topic. "
+ "Backups will not work until the previous command topic is "
+ "restored or all backup files are deleted.", e);
return restoreCommands;
}

if (record.value() == null) {
@@ -36,7 +36,7 @@
public class Command {

@VisibleForTesting
static final int VERSION = 1;
public static final int VERSION = 1;

private final String statement;
private final Map<String, Object> overwriteProperties;
@@ -80,7 +80,7 @@ public Command(
}

@VisibleForTesting
Command(
public Command(
final String statement,
final Map<String, Object> overwriteProperties,
final Map<String, String> originalProperties,
@@ -24,6 +24,7 @@
import io.confluent.ksql.rest.server.BackupReplayFile;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContextFactory;
@@ -37,6 +38,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -51,6 +53,7 @@
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
@@ -67,17 +70,26 @@ private static KsqlConfig loadServerConfig(final File configFile) {
return new KsqlConfig(serverProps);
}

public static List<Pair<byte[], byte[]>> loadBackup(final File file) throws IOException {
public static List<Pair<byte[], byte[]>> loadBackup(
final File file,
final RestoreOptions options
) throws IOException {
final BackupReplayFile commandTopicBackupFile = BackupReplayFile.readOnly(file);
List<Pair<byte[], byte[]>> records = commandTopicBackupFile.readRecords();

final List<Pair<byte[], byte[]>> records = commandTopicBackupFile.readRecords();
throwOnInvalidRecords(records);
if (options.isSkipIncompatibleCommands()) {
records = removeIncompatibleCommands(records);
}

return records;
}

private static void throwOnInvalidRecords(final List<Pair<byte[], byte[]>> records) {
private static List<Pair<byte[], byte[]>> removeIncompatibleCommands(
final List<Pair<byte[], byte[]>> records
) {
int n = 0;
int numFilteredCommands = 0;
final List<Pair<byte[], byte[]>> filteredRecords = new ArrayList<>();

for (final Pair<byte[], byte[]> record : records) {
n++;
@@ -87,21 +99,28 @@ private static void throwOnInvalidRecords(final List<Pair<byte[], byte[]>> recor
.deserialize(null, record.getLeft());
} catch (final Exception e) {
throw new KsqlException(String.format(
"Invalid CommandId string (line %d): %s",
n, new String(record.getLeft(), StandardCharsets.UTF_8), e
"Invalid CommandId string (line %d): %s (%s)",
n, new String(record.getLeft(), StandardCharsets.UTF_8), e.getMessage()
));
}

try {
InternalTopicSerdes.deserializer(Command.class)
.deserialize(null, record.getRight());
} catch (final SerializationException | IncomaptibleKsqlCommandVersionException e) {
numFilteredCommands++;
continue;
} catch (final Exception e) {
throw new KsqlException(String.format(
"Invalid Command string (line %d): %s",
n, new String(record.getRight(), StandardCharsets.UTF_8), e
"Invalid Command string (line %d): %s (%s)",
n, new String(record.getRight(), StandardCharsets.UTF_8), e.getMessage()
));
}
filteredRecords.add(record);
}
System.out.println(
String.format("%s incompatible command(s) skipped from backup file.", numFilteredCommands));
return filteredRecords;
}

private static void checkFileExists(final File file) throws Exception {
@@ -176,7 +195,7 @@ public static void main(final String[] args) throws Exception {

List<Pair<byte[], byte[]>> backupCommands = null;
try {
backupCommands = loadBackup(backupFile);
backupCommands = loadBackup(backupFile, restoreOptions);
} catch (final Exception e) {
System.err.println(String.format(
"Failed loading backup file.%nError = %s", e.getMessage()));
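
For context, a minimal sketch (assumptions flagged in comments) of what removeIncompatibleCommands treats as incompatible: a Command stamped with a version newer than this build's Command.VERSION fails deserialization, and with --skip-incompatible-commands the record is dropped and counted instead of aborting the restore. It reuses only calls that appear elsewhere in this diff:

import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import java.util.Collections;
import java.util.Optional;

public final class IncompatibleCommandSketch {
  public static void main(final String[] args) {
    // Build a Command carrying a newer version, exactly as the integration test below does.
    final Command tooNew = new Command(
        "statement",
        Collections.emptyMap(),
        Collections.emptyMap(),
        Optional.empty(),
        Optional.of(Command.VERSION + 1), // version written into the serialized record
        Command.VERSION + 1);

    final byte[] value = InternalTopicSerdes.serializer().serialize("", tooNew);

    try {
      InternalTopicSerdes.deserializer(Command.class).deserialize(null, value);
    } catch (final Exception e) {
      // Expected: SerializationException or IncomaptibleKsqlCommandVersionException,
      // the two exception types removeIncompatibleCommands() catches in order to skip the record.
      System.out.println("skipped incompatible command: " + e.getMessage());
    }
  }
}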
@@ -42,8 +42,16 @@ public class RestoreOptions {

@SuppressWarnings("unused") // Accessed via reflection
@Option(
name = {"--yes", "-y"},
description = "Automatic \"yes\" as answer to prompt and run non-interactively.")
name = {"--skip-incompatible-commands", "-s"},
description = "This restore command can restore command topic commands that "
+ "are of version (" + io.confluent.ksql.rest.server.computation.Command.VERSION + ") "
+ "or lower. If true, the restore command will skip all incompatible commands."
+ "If false, the restore command will restore the backup file as is.")
private boolean skipIncompatibleCommands = false;

@Option(
name = {"--yes", "-y"},
description = "Automatic \"yes\" as answer to prompt and run non-interactively.")
private boolean automaticYes = false;

@SuppressWarnings("unused") // Accessed via reflection
@@ -66,6 +74,10 @@ public boolean isAutomaticYes() {
return automaticYes;
}

public boolean isSkipIncompatibleCommands() {
return skipIncompatibleCommands;
}

public static RestoreOptions parse(final String...args) throws IOException {
return OptionsParser.parse(args, RestoreOptions.class);
}
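
A small sketch (not part of the diff) of how these options are consumed, using only the members shown above; the argument values are hypothetical, and the snippet is assumed to live in the same package as RestoreOptions so no import is needed:

import java.io.IOException;

public final class RestoreOptionsSketch {
  public static void main(final String[] args) throws IOException {
    // Parse hypothetical CLI arguments, then read back the two boolean flags.
    final RestoreOptions restoreOptions = RestoreOptions.parse(
        "--yes",
        "--skip-incompatible-commands",
        "backup_command_topic_file"); // hypothetical backup-file argument

    System.out.println("skip incompatible commands: " + restoreOptions.isSkipIncompatibleCommands());
    System.out.println("automatic yes: " + restoreOptions.isAutomaticYes());
  }
}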
@@ -18,18 +18,33 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.BackupReplayFile;
import io.confluent.ksql.rest.server.CommandTopicBackupImpl;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.ReservedInternalTopics;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -38,20 +53,29 @@
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION;
import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;


@@ -61,6 +85,7 @@
@Category({IntegrationTest.class})
public class RestoreCommandTopicIntegrationTest {
private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
private static final Serializer<byte[]> BYTES_SERIALIZER = new ByteArraySerializer();

@ClassRule
public static final RuleChain CHAIN = RuleChain
@@ -78,28 +103,35 @@ public class RestoreCommandTopicIntegrationTest {
private static Path PROPERTIES_FILE;

@BeforeClass
public static void setup() throws IOException {
public static void classSetUp() throws IOException {
BACKUP_LOCATION = TMP_FOLDER.newFolder();

REST_APP = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_METASTORE_BACKUP_LOCATION, BACKUP_LOCATION.getPath())
.build();
}

@Before
public void setup() throws IOException {
REST_APP.start();

KSQL_CONFIG = new KsqlConfig(REST_APP.getKsqlRestConfig().getKsqlConfigProperties());
COMMAND_TOPIC = ReservedInternalTopics.commandTopic(KSQL_CONFIG);
BACKUP_FILE = Files.list(BACKUP_LOCATION.toPath()).findFirst().get();
PROPERTIES_FILE = TMP_FOLDER.newFile().toPath();

writeServerProperties();
}

@AfterClass
public static void teardown() {
@After
public void teardown() throws IOException {
REST_APP.stop();
TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC));
new File(String.valueOf(BACKUP_FILE)).delete();
}

@After
public void teardownClass() {
TMP_FOLDER.delete();
}

@@ -132,7 +164,7 @@ public void shouldBackupAndRestoreCommandTopic() throws Exception {

// Delete the command topic and check the server is in degraded state
TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC));
assertThat("Server should be in degraded state", isDegradedState(), is(true));
assertThatEventually("Degraded State", this::isDegradedState, is(true));

// Restore the command topic
KsqlRestoreCommandTopic.main(
@@ -151,18 +183,81 @@
assertThat("Should have TOPIC1", streamsNames.contains("TOPIC1"), is(true));
assertThat("Should have TOPIC2", streamsNames.contains("TOPIC2"), is(true));
assertThat("Should have STREAM1", streamsNames.contains("STREAM1"), is(true));
assertThat("Should have STREAM1", streamsNames.contains("STREAM1"), is(true));
assertThat("Should have STREAM2", streamsNames.contains("STREAM2"), is(true));
assertThat("Server should NOT be in degraded state", isDegradedState(), is(false));
}

@Test
public void shouldSkipIncompatibleCommands() throws Exception {
// Given
TEST_HARNESS.ensureTopics("topic3", "topic4");

makeKsqlRequest("CREATE STREAM TOPIC3 (ID INT) "
+ "WITH (KAFKA_TOPIC='topic3', VALUE_FORMAT='JSON');");
makeKsqlRequest("CREATE STREAM TOPIC4 (ID INT) "
+ "WITH (KAFKA_TOPIC='topic4', VALUE_FORMAT='JSON');");
makeKsqlRequest("CREATE STREAM stream3 AS SELECT * FROM topic3;");

final CommandId commandId = new CommandId("TOPIC", "entity", "CREATE");
final Command command = new Command(
"statement",
Collections.emptyMap(),
Collections.emptyMap(),
Optional.empty(),
Optional.of(Command.VERSION + 1),
Command.VERSION + 1);

writeToBackupFile(commandId, command, BACKUP_FILE);

// Delete the command topic
TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC));
REST_APP.stop();

// Restore the command topic
KsqlRestoreCommandTopic.main(
new String[]{
"--yes",
"--config-file", PROPERTIES_FILE.toString(),
BACKUP_FILE.toString()
});

// Re-load the command topic
REST_APP.start();

// Server should be in degraded state since the backup file had incompatible command
assertThat("Server should be in degraded state", isDegradedState(), is(true));

// Delete the command topic again and restore with skip flag
TEST_HARNESS.deleteTopics(Collections.singletonList(COMMAND_TOPIC));
REST_APP.stop();
KsqlRestoreCommandTopic.main(
new String[]{
"--yes",
"-s",
"--config-file", PROPERTIES_FILE.toString(),
BACKUP_FILE.toString()
});

// Re-load the command topic
REST_APP.start();
final List<String> streamsNames = showStreams();
assertThat("Should have TOPIC3", streamsNames.contains("TOPIC3"), is(true));
assertThat("Should have TOPIC4", streamsNames.contains("TOPIC4"), is(true));
assertThat("Should have STREAM3", streamsNames.contains("STREAM3"), is(true));
assertThat("Server should not be in degraded state", isDegradedState(), is(false));
}

private boolean isDegradedState() {
// If in degraded state, then the following command will return a warning
final List<KsqlEntity> response = makeKsqlRequest(
"CREATE STREAM ANY (id INT) WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='JSON');");
"Show Streams;");

final List<KsqlWarning> warnings = response.get(0).getWarnings();
return warnings.size() > 0 && warnings.get(0).getMessage()
.contains("The server has detected corruption in the command topic");
return warnings.size() > 0 &&
(warnings.get(0).getMessage().contains(
DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE) ||
warnings.get(0).getMessage().contains(
DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_INCOMPATIBLE_COMMANDS_ERROR_MESSAGE));
}

private List<String> showStreams() {
Expand All @@ -173,4 +268,27 @@ private List<String> showStreams() {
private List<KsqlEntity> makeKsqlRequest(final String sql) {
return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql);
}

public static void writeToBackupFile(
final CommandId commandId,
final Command command,
final Path backUpFileLocation
) throws IOException {
FileOutputStream writer;
try {
writer = new FileOutputStream(new File(String.valueOf(backUpFileLocation)), true);
} catch (final FileNotFoundException e) {
throw new KsqlException(
String.format("Failed to open backup file: %s", backUpFileLocation), e);
}

final byte[] keyValueSeparator = ":".getBytes(StandardCharsets.UTF_8);
final byte[] newLine = "\n".getBytes(StandardCharsets.UTF_8);

writer.write(InternalTopicSerdes.serializer().serialize("", commandId));
writer.write(keyValueSeparator);
writer.write(InternalTopicSerdes.serializer().serialize("", command));
writer.write(newLine);
writer.flush();
}
}