Skip to content

Commit

Permalink
fix: create the metastore backups directory if it does not exist
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Jul 24, 2020
1 parent 8892190 commit e9d5473
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,24 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,10 +72,7 @@ public CommandTopicBackupImpl(
final Ticker ticker
) {
final File dir = new File(Objects.requireNonNull(location, "location"));
if (!dir.exists() || !dir.isDirectory()) {
throw new KsqlException(String.format(
"Backup location '%s' does not exist or it is not a directory.", location));
}
ensureDirectoryExists(dir);

this.backupLocation = dir;
this.topicName = Objects.requireNonNull(topicName, "topicName");
Expand Down Expand Up @@ -222,4 +225,50 @@ private Optional<BackupReplayFile> latestReplayFile() {
? Optional.of(new BackupReplayFile(latestBakFile))
: Optional.empty();
}

private void ensureDirectoryExists(final File backupsDir) {
if (!backupsDir.exists()) {
if (!backupsDir.mkdirs()) {
throw new KsqlServerException("Couldn't create the backups directory: "
+ backupsDir.getPath()
+ "\n Make sure the directory exists and is readable/writable for KSQL server "
+ "\n or its parent directory is readable/writable by KSQL server"
+ "\n or change it to a readable/writable directory by setting '"
+ KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION
+ "' config in the properties file."
);
}

try {
Files.setPosixFilePermissions(backupsDir.toPath(),
PosixFilePermissions.fromString("rwx------"));
} catch (final IOException e) {
throw new KsqlServerException(String.format(
"Couldn't set POSIX permissions on the backups directory: %s. Error = %s",
backupsDir.getPath(), e.getMessage()));
}
}

if (!backupsDir.isDirectory()) {
throw new KsqlServerException(backupsDir.getPath()
+ " is not a directory."
+ "\n Make sure the directory exists and is readable/writable for KSQL server "
+ "\n or its parent directory is readable/writable by KSQL server"
+ "\n or change it to a readable/writable directory by setting '"
+ KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION
+ "' config in the properties file."
);
}

if (!backupsDir.canWrite() || !backupsDir.canRead() || !backupsDir.canExecute()) {
throw new KsqlServerException("The backups directory is not readable/writable "
+ "for KSQL server: "
+ backupsDir.getPath()
+ "\n Make sure the directory exists and is readable/writable for KSQL server "
+ "\n or change it to a readable/writable directory by setting '"
+ KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION
+ "' config in the properties file."
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Before;
Expand All @@ -31,6 +32,10 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -81,31 +86,69 @@ public void shouldThrowWhenBackupLocationIsNotDirectory() throws IOException {

// When
final Exception e = assertThrows(
KsqlException.class,
KsqlServerException.class,
() -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME)
);

// Then
assertThat(e.getMessage(), containsString(String.format(
"Backup location '%s' does not exist or it is not a directory.",
"%s is not a directory.",
file.getAbsolutePath()
)));
}

@Test
public void shouldThrowWhenBackupLocationDoesNotExist() {
public void shouldThrowWhenBackupLocationIsNotWritable() throws IOException {
// Given
final File file = backupLocation.newFolder();
Files.setPosixFilePermissions(file.toPath(), PosixFilePermissions.fromString("r-x------"));

// When
final Exception e = assertThrows(
KsqlException.class,
() -> new CommandTopicBackupImpl("/not-existing-directory", COMMAND_TOPIC_NAME)
KsqlServerException.class,
() -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME)
);

// Then
assertThat(e.getMessage(), containsString(String.format(
"Backup location '/not-existing-directory' does not exist or it is not a directory."
"The backups directory is not readable/writable for KSQL server: %s",
file.getAbsolutePath()
)));
}

@Test
public void shouldThrowWhenBackupLocationIsNotReadable() throws IOException {
// Given
final File dir = backupLocation.newFolder();
Files.setPosixFilePermissions(dir.toPath(), PosixFilePermissions.fromString("-wx------"));

// When
final Exception e = assertThrows(
KsqlServerException.class,
() -> new CommandTopicBackupImpl(dir.getAbsolutePath(), COMMAND_TOPIC_NAME)
);

// Then
assertThat(e.getMessage(), containsString(String.format(
"The backups directory is not readable/writable for KSQL server: %s",
dir.getAbsolutePath()
)));
}


@Test
public void shouldCreateBackupLocationWhenDoesNotExist() throws IOException {
// Given
final Path dir = Paths.get(backupLocation.newFolder().getAbsolutePath(), "ksql-backups");
assertThat(Files.exists(dir), is(false));

// When
new CommandTopicBackupImpl(dir.toString(), COMMAND_TOPIC_NAME);

// Then
assertThat(Files.exists(dir), is(true));
}

@Test
public void shouldWriteRecordsToReplayFile() throws IOException {
// Given
Expand Down

0 comments on commit e9d5473

Please sign in to comment.