Skip to content

Commit

Permalink
refactor: revert BackupInputFile split
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Oct 20, 2020
1 parent e7cb9ef commit d443f43
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 184 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,50 @@
package io.confluent.ksql.rest.server;

import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* A file that is used by the backup service to replay command_topic commands.
*/
public final class BackupReplayFile extends BackupInputFile implements Closeable {
public final class BackupReplayFile implements Closeable {
private static final String KEY_VALUE_SEPARATOR_STR = ":";
private static final String NEW_LINE_SEPARATOR_STR = "\n";

private static final byte[] KEY_VALUE_SEPARATOR_BYTES =
KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);
private static final byte[] NEW_LINE_SEPARATOR_BYTES =
NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);

// Backing file on disk; never null.
private final File file;
// Stream used to append records; null when this instance was opened read-only.
private final FileOutputStream writer;

/**
 * Creates a read-only view of the given backup file. Calls to {@code write}
 * on the returned instance fail with an {@code IOException}.
 *
 * @param file the backup file to read
 * @return a read-only {@code BackupReplayFile}
 */
public static BackupReplayFile readOnly(final File file) {
  return new BackupReplayFile(file, false);
}

/**
 * Creates a writable backup file, obtaining an output stream for the file
 * via {@code createWriter}.
 *
 * @param file the backup file to write
 * @return a writable {@code BackupReplayFile}
 */
public static BackupReplayFile writable(final File file) {
  return new BackupReplayFile(file, true);
}

private BackupReplayFile(final File file, final boolean write) {
  this.file = Objects.requireNonNull(file, "file");

  // Only writable instances hold an open stream; read-only instances keep
  // writer == null, which write() and close() check for.
  this.writer = write ? createWriter(file) : null;
}

private static FileOutputStream createWriter(final File file) {
Expand All @@ -43,20 +71,45 @@ private static FileOutputStream createWriter(final File file) {
}
}

/**
 * @return the underlying backup file
 */
public File getFile() {
  return file;
}

/**
 * @return the absolute path of the underlying backup file
 */
public String getPath() {
  return file.getAbsolutePath();
}

/**
 * Appends a single command-topic record to the backup file, encoded as
 * {@code key:value\n}, flushing immediately so the entry is durable.
 *
 * @param record the consumer record whose key and value are persisted
 * @throws IOException if this instance is read-only, or if writing fails
 */
public void write(final ConsumerRecord<byte[], byte[]> record) throws IOException {
  final FileOutputStream out = writer;
  if (out == null) {
    throw new IOException("Write permission denied.");
  }

  out.write(record.key());
  out.write(KEY_VALUE_SEPARATOR_BYTES);
  out.write(record.value());
  out.write(NEW_LINE_SEPARATOR_BYTES);
  out.flush();
}

/**
 * Reads back every record previously written to the backup file.
 *
 * <p>Each line is split at the first {@code :} into a (command id, command)
 * pair; both halves are returned as UTF-8 bytes.
 *
 * @return the recorded (key, value) pairs, in file order
 * @throws IOException if the file cannot be read
 */
public List<Pair<byte[], byte[]>> readRecords() throws IOException {
  final List<Pair<byte[], byte[]>> records = new ArrayList<>();

  for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
    final int separatorAt = line.indexOf(KEY_VALUE_SEPARATOR_STR);
    final String key = line.substring(0, separatorAt);
    final String value = line.substring(separatorAt + 1);

    records.add(new Pair<>(
        key.getBytes(StandardCharsets.UTF_8),
        value.getBytes(StandardCharsets.UTF_8)
    ));
  }

  return records;
}

/**
 * Closes the underlying output stream, if any. Read-only instances hold no
 * stream, so closing them is a no-op.
 *
 * @throws IOException if closing the stream fails
 */
@Override
public void close() throws IOException {
  if (writer != null) {
    writer.close();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ BackupReplayFile openOrCreateReplayFile() {
}

private BackupReplayFile newReplayFile() {
return new BackupReplayFile(Paths.get(
return BackupReplayFile.writable(Paths.get(
backupLocation.getAbsolutePath(),
String.format("%s%s_%s", PREFIX, topicName, ticker.read())
).toFile());
Expand Down Expand Up @@ -236,7 +236,7 @@ private Optional<BackupReplayFile> latestReplayFile() {
}

return (latestBakFile != null)
? Optional.of(new BackupReplayFile(latestBakFile))
? Optional.of(BackupReplayFile.writable(latestBakFile))
: Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.BackupInputFile;
import io.confluent.ksql.rest.server.BackupReplayFile;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
Expand Down Expand Up @@ -68,7 +68,7 @@ private static KsqlConfig loadServerConfig(final File configFile) {
}

public static List<Pair<byte[], byte[]>> loadBackup(final File file) throws IOException {
final BackupInputFile commandTopicBackupFile = new BackupInputFile(file);
final BackupReplayFile commandTopicBackupFile = BackupReplayFile.readOnly(file);

final List<Pair<byte[], byte[]>> records = commandTopicBackupFile.readRecords();
throwOnInvalidRecords(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.BackupInputFile;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.ReservedInternalTopics;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.streams.StreamsConfig;
Expand All @@ -39,16 +37,12 @@
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -58,7 +52,6 @@
import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.is;


Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class BackupReplayFileTest {
@Before
public void setup() throws IOException {
internalReplayFile = backupLocation.newFile(REPLAY_FILE_NAME);
replayFile = new BackupReplayFile(internalReplayFile);
replayFile = BackupReplayFile.writable(internalReplayFile);
}

@Test
Expand Down Expand Up @@ -123,7 +123,7 @@ public void shouldReadCommands() throws IOException {
KEY_VALUE_SEPARATOR,
"{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\","
+ "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}"
).getBytes(StandardCharsets.UTF_8));
).getBytes(StandardCharsets.UTF_8));

// When
final List<Pair<byte[], byte[]>> commands = replayFile.readRecords();
Expand Down Expand Up @@ -159,4 +159,4 @@ private String buildValue(final String streamName) {
+ "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}",
streamName, streamName);
}
}
}

0 comments on commit d443f43

Please sign in to comment.