Skip to content

Commit

Permalink
feat: new ksql-restore-command-topic to restore backups
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Oct 15, 2020
1 parent 56f9712 commit cfbf1f5
Show file tree
Hide file tree
Showing 9 changed files with 897 additions and 33 deletions.
28 changes: 28 additions & 0 deletions bin/ksql-restore-command-topic
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
# (Copyright) [2020] Confluent, Inc.

base_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )/.." && pwd )

: "${KSQL_CONFIG_DIR:="$base_dir/config"}"

# logj4 settings
if [ -z "$KSQL_LOG4J_OPTS" ]; then
# Test for files from dev -> packages so this will work as expected in dev if you have packages
# installed
if [ -e "$base_dir/config/log4j.properties" ]; then # Dev environment
KSQL_CONFIG_DIR="$base_dir/config"
elif [ -e "$base_dir/etc/ksqldb/log4j.properties" ]; then # Simple zip file layout
KSQL_CONFIG_DIR="$base_dir/etc/ksqldb"
elif [ -e "/etc/ksqldb/log4j.properties" ]; then # Normal install layout
KSQL_CONFIG_DIR="/etc/ksqldb"
fi
fi

: "${KSQL_LOG4J_OPTS:=""}"

# Use file logging by default
if [ -z "$KSQL_LOG4J_OPTS" ]; then
export KSQL_LOG4J_OPTS="-Dlog4j.configuration=file:$KSQL_CONFIG_DIR/log4j-file.properties"
fi

exec "$base_dir"/bin/ksql-run-class io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic "$@"
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import io.confluent.ksql.util.Pair;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class BackupInputFile {
protected static final String KEY_VALUE_SEPARATOR_STR = ":";
protected static final String NEW_LINE_SEPARATOR_STR = "\n";

static final byte[] KEY_VALUE_SEPARATOR_BYTES =
KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);
static final byte[] NEW_LINE_SEPARATOR_BYTES =
NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);

private final File file;

public BackupInputFile(final File file) {
this.file = Objects.requireNonNull(file, "file");
}

public File getFile() {
return file;
}

public String getPath() {
return file.getAbsolutePath();
}

public List<Pair<byte[], byte[]>> readRecords() throws IOException {
final List<Pair<byte[], byte[]>> commands = new ArrayList<>();
for (final String line : Files.readAllLines(getFile().toPath(), StandardCharsets.UTF_8)) {
final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR_STR));
final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR_STR) + 1);

commands.add(new Pair<>(
commandId.getBytes(StandardCharsets.UTF_8),
command.getBytes(StandardCharsets.UTF_8)
));
}

return commands;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,21 @@
package io.confluent.ksql.rest.server;

import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* A file that is used by the backup service to replay command_topic commands.
*/
public final class BackupReplayFile implements Closeable {
private static final String KEY_VALUE_SEPARATOR_STR = ":";
private static final String NEW_LINE_SEPARATOR_STR = "\n";

private static final byte[] KEY_VALUE_SEPARATOR_BYTES =
KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);
private static final byte[] NEW_LINE_SEPARATOR_BYTES =
NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);

private final File file;
public final class BackupReplayFile extends BackupInputFile implements Closeable {
private final FileOutputStream writer;

public BackupReplayFile(final File file) {
this.file = Objects.requireNonNull(file, "file");
super(file);
this.writer = createWriter(file);
}

Expand All @@ -59,7 +44,7 @@ private static FileOutputStream createWriter(final File file) {
}

public String getPath() {
return file.getAbsolutePath();
return getFile().getAbsolutePath();
}

public void write(final ConsumerRecord<byte[], byte[]> record) throws IOException {
Expand All @@ -70,21 +55,6 @@ public void write(final ConsumerRecord<byte[], byte[]> record) throws IOExceptio
writer.flush();
}

public List<Pair<byte[], byte[]>> readRecords() throws IOException {
final List<Pair<byte[], byte[]>> commands = new ArrayList<>();
for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR_STR));
final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR_STR) + 1);

commands.add(new Pair<>(
commandId.getBytes(StandardCharsets.UTF_8),
command.getBytes(StandardCharsets.UTF_8)
));
}

return commands;
}

@Override
public void close() throws IOException {
writer.close();
Expand Down
Loading

0 comments on commit cfbf1f5

Please sign in to comment.