From e9d54734a6827bf92e54e82697463128dcd22d48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Fri, 24 Jul 2020 15:31:52 -0500 Subject: [PATCH] fix: create the metastore backups directory if it does not exist --- .../rest/server/CommandTopicBackupImpl.java | 57 +++++++++++++++++-- .../server/CommandTopicBackupImplTest.java | 55 ++++++++++++++++-- 2 files changed, 102 insertions(+), 10 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java index 2a1e1aff9332..ad9d2df1ab2b 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java @@ -17,18 +17,24 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableSet; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.Pair; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermissions; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,10 +72,7 @@ public CommandTopicBackupImpl( final Ticker ticker ) { final File dir = new File(Objects.requireNonNull(location, "location")); - if (!dir.exists() || !dir.isDirectory()) { - throw new KsqlException(String.format( - "Backup location '%s' does not exist or it is not a directory.", location)); - } + ensureDirectoryExists(dir); this.backupLocation = dir; this.topicName = Objects.requireNonNull(topicName, "topicName"); @@ -222,4 +225,50 @@ private Optional latestReplayFile() { ? Optional.of(new BackupReplayFile(latestBakFile)) : Optional.empty(); } + + private void ensureDirectoryExists(final File backupsDir) { + if (!backupsDir.exists()) { + if (!backupsDir.mkdirs()) { + throw new KsqlServerException("Couldn't create the backups directory: " + + backupsDir.getPath() + + "\n Make sure the directory exists and is readable/writable for KSQL server " + + "\n or its parent directory is readable/writable by KSQL server" + + "\n or change it to a readable/writable directory by setting '" + + KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION + + "' config in the properties file." + ); + } + + try { + Files.setPosixFilePermissions(backupsDir.toPath(), + PosixFilePermissions.fromString("rwx------")); + } catch (final IOException e) { + throw new KsqlServerException(String.format( + "Couldn't set POSIX permissions on the backups directory: %s. Error = %s", + backupsDir.getPath(), e.getMessage())); + } + } + + if (!backupsDir.isDirectory()) { + throw new KsqlServerException(backupsDir.getPath() + + " is not a directory." + + "\n Make sure the directory exists and is readable/writable for KSQL server " + + "\n or its parent directory is readable/writable by KSQL server" + + "\n or change it to a readable/writable directory by setting '" + + KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION + + "' config in the properties file." + ); + } + + if (!backupsDir.canWrite() || !backupsDir.canRead() || !backupsDir.canExecute()) { + throw new KsqlServerException("The backups directory is not readable/writable " + + "for KSQL server: " + + backupsDir.getPath() + + "\n Make sure the directory exists and is readable/writable for KSQL server " + + "\n or change it to a readable/writable directory by setting '" + + KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION + + "' config in the properties file." + ); + } + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java index c163c4a733e1..65d44d0885e2 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java @@ -19,6 +19,7 @@ import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.Pair; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Before; @@ -31,6 +32,10 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermissions; import java.util.List; import java.util.Optional; @@ -81,31 +86,69 @@ public void shouldThrowWhenBackupLocationIsNotDirectory() throws IOException { // When final Exception e = assertThrows( - KsqlException.class, + KsqlServerException.class, () -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME) ); // Then assertThat(e.getMessage(), containsString(String.format( - "Backup location '%s' does not exist or it is not a directory.", + "%s is not a directory.", file.getAbsolutePath() ))); } @Test - public void shouldThrowWhenBackupLocationDoesNotExist() { + public void shouldThrowWhenBackupLocationIsNotWritable() throws IOException { + // Given + final File file = backupLocation.newFolder(); + Files.setPosixFilePermissions(file.toPath(), PosixFilePermissions.fromString("r-x------")); + // When final Exception e = assertThrows( - KsqlException.class, - () -> new CommandTopicBackupImpl("/not-existing-directory", COMMAND_TOPIC_NAME) + KsqlServerException.class, + () -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME) ); // Then assertThat(e.getMessage(), containsString(String.format( - "Backup location '/not-existing-directory' does not exist or it is not a directory." + "The backups directory is not readable/writable for KSQL server: %s", + file.getAbsolutePath() ))); } + @Test + public void shouldThrowWhenBackupLocationIsNotReadable() throws IOException { + // Given + final File dir = backupLocation.newFolder(); + Files.setPosixFilePermissions(dir.toPath(), PosixFilePermissions.fromString("-wx------")); + + // When + final Exception e = assertThrows( + KsqlServerException.class, + () -> new CommandTopicBackupImpl(dir.getAbsolutePath(), COMMAND_TOPIC_NAME) + ); + + // Then + assertThat(e.getMessage(), containsString(String.format( + "The backups directory is not readable/writable for KSQL server: %s", + dir.getAbsolutePath() + ))); + } + + + @Test + public void shouldCreateBackupLocationWhenDoesNotExist() throws IOException { + // Given + final Path dir = Paths.get(backupLocation.newFolder().getAbsolutePath(), "ksql-backups"); + assertThat(Files.exists(dir), is(false)); + + // When + new CommandTopicBackupImpl(dir.toString(), COMMAND_TOPIC_NAME); + + // Then + assertThat(Files.exists(dir), is(true)); + } + @Test public void shouldWriteRecordsToReplayFile() throws IOException { // Given