Skip to content

Commit

Permalink
Perform S3 directory deletion with batch requests
Browse files Browse the repository at this point in the history
Delete all the objects which are found under the specified path
prefix in batches in order to improve the performance of
the operation.

This operation is customised for the needs of Trino and intentionally
avoids adding unnecessary complexity to deal with the various quirks
of S3-compatible object storage systems that are not relevant to
Trino's use cases.
  • Loading branch information
findinpath committed Dec 1, 2022
1 parent 3a9708c commit 9cabfc0
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
Expand All @@ -142,6 +143,7 @@
import static com.amazonaws.services.s3.model.StorageClass.Glacier;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Throwables.throwIfInstanceOf;
Expand Down Expand Up @@ -606,25 +608,81 @@ public boolean rename(Path src, Path dst)
/**
 * Deletes the S3 key derived from {@code path}; when {@code recursive} is set, every key listed
 * under that path is removed through {@code deletePrefix} first.
 * <p>
 * NOTE(review): this span is a rendered diff hunk whose +/- markers were lost. Statements of the
 * previous implementation (the {@code directory(path)} check and the {@code FileNotFoundException}
 * handler) appear to be interleaved with the new ones - confirm against the repository before
 * treating it as compilable code.
 *
 * @param path location whose key is computed with {@code keyFromPath}
 * @param recursive whether the keys listed under {@code path} are deleted as well
 * @return {@code false} when {@code deletePrefix} reports {@code DELETE_KEYS_FAILURE};
 *         {@code true} when the end of the method is reached
 * @throws IOException if {@code deletePrefix} throws an {@code AmazonClientException}, or if a
 *         non-recursive delete lists anything other than the single {@code key + PATH_SEPARATOR} entry
 */
public boolean delete(Path path, boolean recursive)
throws IOException
{
// NOTE(review): the next three lines look like leftovers of the pre-commit implementation - confirm
try {
if (!directory(path)) {
return deleteObject(keyFromPath(path));
String key = keyFromPath(path);
if (recursive) {
DeletePrefixResult deletePrefixResult;
// Batch-delete everything listed under the path; SDK client failures surface as IOException
try {
deletePrefixResult = deletePrefix(path);
}
catch (AmazonClientException e) {
throw new IOException("Failed to delete paths with the prefix path " + path, e);
}
if (deletePrefixResult == DeletePrefixResult.NO_KEYS_FOUND) {
// If the provided key is not a "directory" prefix, attempt to delete the object with the specified key
deleteObject(key);
}
else if (deletePrefixResult == DeletePrefixResult.DELETE_KEYS_FAILURE) {
// At least one batch request failed, so report the delete as unsuccessful
return false;
}
// Also remove the key carrying DIRECTORY_SUFFIX (presumably a directory placeholder object - confirm)
deleteObject(key + DIRECTORY_SUFFIX);
}
// NOTE(review): the next two lines look like leftovers of the pre-commit implementation - confirm
catch (FileNotFoundException e) {
return false;
else {
// OptionalInt.of(2) presumably caps the listing at two keys, which is enough to tell an
// empty directory from a non-empty one - confirm against listObjects
Iterator<ListObjectsV2Result> listingsIterator = listObjects(path, OptionalInt.of(2), true);
Iterator<String> objectKeysIterator = Iterators.concat(Iterators.transform(listingsIterator, TrinoS3FileSystem::keysFromRecursiveListing));
if (objectKeysIterator.hasNext()) {
String childKey = objectKeysIterator.next();
// Only a lone `key + PATH_SEPARATOR` entry is accepted; any other first key, or a second key, is rejected
if (!Objects.equals(childKey, key + PATH_SEPARATOR) || objectKeysIterator.hasNext()) {
throw new IOException("Directory " + path + " is not empty");
}
deleteObject(childKey);
}
else {
// Avoid deleting the bucket in case that the provided path points to the bucket root
if (!key.isEmpty()) {
deleteObject(key);
}
}
// Also remove the key carrying DIRECTORY_SUFFIX (presumably a directory placeholder object - confirm)
deleteObject(key + DIRECTORY_SUFFIX);
}
return true;
}

if (!recursive) {
throw new IOException("Directory " + path + " is not empty");
/**
 * Lists the keys under {@code prefix} and deletes them with {@code s3.deleteObjects}, one request
 * per batch of at most {@code DELETE_BATCH_SIZE} keys.
 * <p>
 * NOTE(review): this span is a rendered diff hunk whose +/- markers were lost. The
 * {@code listStatus} loop, the {@code deleteObject(keyFromPath(path) + DIRECTORY_SUFFIX)} call and
 * the bare {@code return true} appear to belong to the previous recursive delete - confirm against
 * the repository.
 *
 * @param prefix path whose listed keys are deleted
 * @return {@code NO_KEYS_FOUND} when the listing yields no keys, {@code ALL_KEYS_DELETED} when no
 *         batch request threw an {@code AmazonS3Exception}, otherwise {@code DELETE_KEYS_FAILURE}
 */
private DeletePrefixResult deletePrefix(Path prefix)
{
String bucketName = getBucketName(uri);
Iterator<ListObjectsV2Result> listings = listObjects(prefix, OptionalInt.empty(), true);
// Flatten the listing pages into one iterator of keys, then group the keys into batches
Iterator<String> objectKeys = Iterators.concat(Iterators.transform(listings, TrinoS3FileSystem::keysFromRecursiveListing));
Iterator<List<String>> objectKeysBatches = Iterators.partition(objectKeys, DELETE_BATCH_SIZE);
if (!objectKeysBatches.hasNext()) {
return DeletePrefixResult.NO_KEYS_FOUND;
}

// NOTE(review): the next two lines look like leftovers of the pre-commit implementation - confirm
for (FileStatus file : listStatus(path)) {
delete(file.getPath(), true);
boolean allKeysDeleted = true;
while (objectKeysBatches.hasNext()) {
String[] objectKeysBatch = objectKeysBatches.next().toArray(String[]::new);
try {
s3.deleteObjects(new DeleteObjectsRequest(bucketName)
.withKeys(objectKeysBatch)
.withRequesterPays(requesterPaysEnabled)
.withQuiet(true));
}
catch (AmazonS3Exception e) {
// A failed batch is logged at debug level and remembered; the remaining batches are still attempted
log.debug(e, "Failed to delete objects from the bucket %s under the prefix '%s'", bucketName, prefix);
allKeysDeleted = false;
}
}
// NOTE(review): the next line and the bare `return true` look like pre-commit leftovers - confirm
deleteObject(keyFromPath(path) + DIRECTORY_SUFFIX);

return true;
return allKeysDeleted ? DeletePrefixResult.ALL_KEYS_DELETED : DeletePrefixResult.DELETE_KEYS_FAILURE;
}

/**
 * Exposes the object keys of one listing page as an iterator.
 *
 * @param listing a page obtained by listing without a path delimiter
 * @return a lazily transformed iterator over the keys of the page's object summaries
 * @throws IllegalStateException if the page reports any common prefixes
 */
@VisibleForTesting
static Iterator<String> keysFromRecursiveListing(ListObjectsV2Result listing)
{
    // A listing made without a delimiter must not group keys under common prefixes
    List<String> commonPrefixes = listing.getCommonPrefixes();
    boolean withoutCommonPrefixes = commonPrefixes == null || commonPrefixes.isEmpty();
    checkState(withoutCommonPrefixes, "No common prefixes should be present when listing without a path delimiter");

    Iterator<S3ObjectSummary> summaries = listing.getObjectSummaries().iterator();
    return Iterators.transform(summaries, S3ObjectSummary::getKey);
}

private boolean directory(Path path)
Expand Down Expand Up @@ -1906,4 +1964,11 @@ private static String getMd5AsBase64(byte[] data, int offset, int length)
byte[] md5 = md5().hashBytes(data, offset, length).asBytes();
return Base64.getEncoder().encodeToString(md5);
}

/**
 * Outcome reported by {@code deletePrefix} and inspected by the recursive branch of {@code delete}.
 */
private enum DeletePrefixResult
{
// The listing under the prefix produced no keys, so no delete request was issued
NO_KEYS_FOUND,
// Every batch delete request completed without an AmazonS3Exception
ALL_KEYS_DELETED,
// At least one batch delete request threw an AmazonS3Exception
DELETE_KEYS_FAILURE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import static io.trino.testing.TestingNames.randomNameSuffix;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public abstract class BaseTestTrinoS3FileSystemObjectStorage
Expand All @@ -55,7 +54,9 @@ public void testDeleteRecursivelyMissingObjectPath()
String prefix = "test-delete-recursively-missing-object-" + randomNameSuffix();

try (TrinoS3FileSystem fs = createFileSystem()) {
assertFalse(fs.delete(new Path("s3://%s/%s".formatted(getBucketName(), prefix)), true));
// Follow Amazon S3 behavior if attempting to delete an object that does not exist
// and return a success message
assertTrue(fs.delete(new Path("s3://%s/%s".formatted(getBucketName(), prefix)), true));
}
}

Expand All @@ -66,7 +67,9 @@ public void testDeleteNonRecursivelyMissingObjectPath()
String prefix = "test-delete-non-recursively-missing-object-" + randomNameSuffix();

try (TrinoS3FileSystem fs = createFileSystem()) {
assertFalse(fs.delete(new Path("s3://%s/%s".formatted(getBucketName(), prefix)), false));
// Follow Amazon S3 behavior if attempting to delete an object that does not exist
// and return a success message
assertTrue(fs.delete(new Path("s3://%s/%s".formatted(getBucketName(), prefix)), false));
}
}

Expand Down Expand Up @@ -169,9 +172,7 @@ public void testDeleteNonRecursivelyDirectoryNamePrefixingAnotherDirectoryName()
assertTrue(fs.delete(new Path("s3://%s/%s/foo".formatted(getBucketName(), prefix)), true));

paths = listPaths(fs.getS3Client(), getBucketName(), prefix, true);
// TODO The directory `prefix + "/foo/"` should have been deleted
assertThat(paths).containsOnly(
"%s/foo/".formatted(prefix),
"%s/foobar/".formatted(prefix));
}
finally {
Expand All @@ -193,9 +194,9 @@ public void testDeleteNonRecursivelyEmptyDirectory()
List<String> paths = listPaths(fs.getS3Client(), getBucketName(), prefix, false);
assertThat(paths).containsOnly(prefix + PATH_SEPARATOR);

// TODO The directory should have been deleted because it is empty
assertThatThrownBy(() -> fs.delete(new Path(prefixPath), false))
.hasMessage("Directory %s is not empty".formatted(prefixPath));
assertTrue(fs.delete(new Path(prefixPath), false));

assertThat(listPaths(fs.getS3Client(), getBucketName(), prefix, true)).isEmpty();
}
finally {
fs.delete(new Path(prefixPath), true);
Expand All @@ -219,9 +220,9 @@ public void testDeleteNonRecursivelyEmptyDirectoryWithAdditionalDirectorySuffixP
directoryName + PATH_SEPARATOR,
directoryName + DIRECTORY_SUFFIX);

// TODO The directory should have been deleted because it is empty
assertThatThrownBy(() -> fs.delete(new Path(directoryPath), false))
.hasMessage("Directory %s is not empty".formatted(directoryPath));
assertTrue(fs.delete(new Path(directoryPath), false));

assertThat(listPaths(fs.getS3Client(), getBucketName(), directoryName, true)).isEmpty();
}
finally {
fs.delete(new Path(directoryPath), true);
Expand Down Expand Up @@ -276,10 +277,7 @@ public void testDeleteRecursivelyDirectoryNamePrefixingAnotherDirectoryName()
assertTrue(fs.delete(new Path("s3://%s/%s/foo".formatted(getBucketName(), prefix)), true));

paths = listPaths(fs.getS3Client(), getBucketName(), prefix, true);
// TODO The directory with the key `prefix + "/foo/"` should have been deleted
assertThat(paths).containsOnly(
"%s/foo/".formatted(prefix),
"%s/foobar/".formatted(prefix));
assertThat(paths).containsOnly("%s/foobar/".formatted(prefix));
}
finally {
fs.delete(new Path(prefixPath), true);
Expand Down Expand Up @@ -338,10 +336,7 @@ public void testDeleteRecursivelyDirectoryWithDeepHierarchy()

assertTrue(fs.delete(new Path(directoryPath), true));

// TODO All the content under `directoryPath` should have been deleted
assertThat(listPaths(fs.getS3Client(), getBucketName(), prefix, true)).containsOnly(
directoryKey + "/",
directoryKey + "/dir4/");
assertThat(listPaths(fs.getS3Client(), getBucketName(), prefix, true)).isEmpty();
}
finally {
fs.delete(new Path(prefixPath), true);
Expand All @@ -368,8 +363,7 @@ public void testDeleteRecursivelyEmptyDirectory()

assertTrue(fs.delete(new Path(prefixPath + "/directory"), true));

// TODO The directory with the key `directoryKey` should have been deleted
assertThat(listPaths(fs.getS3Client(), getBucketName(), prefix, true)).containsOnly(directoryKey + PATH_SEPARATOR);
assertThat(listPaths(fs.getS3Client(), getBucketName(), prefix, true)).isEmpty();
}
finally {
fs.delete(new Path(prefixPath), true);
Expand Down Expand Up @@ -403,11 +397,7 @@ public void testDeleteRecursivelyDirectoryWithObjectsAndDirectorySuffixPlacehold

assertTrue(fs.delete(new Path(directoryPath), true));

// TODO (https://github.com/trinodb/trino/issues/13017) The directory with the key `directoryKey + "/dir4"` should have been deleted
assertThat(listPaths(fs.getS3Client(), getBucketName(), prefix, true))
.containsOnly(
directoryKey + "/",
directoryKey + "/dir4/");
assertThat(listPaths(fs.getS3Client(), getBucketName(), prefix, true)).isEmpty();
}
finally {
fs.delete(new Path(prefixPath), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public void testDeleteNonRecursivelyEmptyBucketRoot()

assertThat(listPaths(s3, testBucketName, "", true)).isEmpty();

assertThatThrownBy(() -> fs.delete(new Path(testBucketPath), false))
.hasMessage("Directory %s is not empty".formatted(testBucketPath));
fs.delete(new Path(testBucketPath), false);

assertThat(listPaths(s3, testBucketName, "", true)).isEmpty();
}
Expand Down Expand Up @@ -160,8 +159,7 @@ public void testDeleteRecursivelyBucketRoot()

assertTrue(fs.delete(new Path(testBucketPath + Path.SEPARATOR), true));

// TODO the entire bucket content should have been deleted
assertThat(listPaths(s3, testBucketName, "", true)).containsOnly("directory2/");
assertThat(listPaths(s3, testBucketName, "", true)).isEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ protected String[] getExcludedTests()
// AWS Glue does not support table renames
"io.trino.tests.product.deltalake.TestHiveAndDeltaLakeRedirect.testDeltaToHiveAlterTable",
"io.trino.tests.product.deltalake.TestHiveAndDeltaLakeRedirect.testHiveToDeltaAlterTable",
// TODO https://github.com/trinodb/trino/issues/13017
"io.trino.tests.product.deltalake.TestDeltaLakeDropTableCompatibility.testCreateManagedTableInDeltaDropTableInTrino"
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static Object[][] engineConfigurations()
{TRINO, DELTA, true},
{TRINO, DELTA, false},
{DELTA, TRINO, true},
{DELTA, TRINO, false},
{DELTA, DELTA, true},
{DELTA, DELTA, false},
};
Expand All @@ -75,14 +76,6 @@ public void testDropTable(Engine creator, Engine dropper, boolean explicitLocati
testDropTableAccuracy(creator, dropper, explicitLocation);
}

/**
 * Runs {@code testDropTableAccuracy} for a table created by {@code DELTA} and dropped by
 * {@code TRINO}, with the {@code explicitLocation} argument set to {@code false}.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testCreateManagedTableInDeltaDropTableInTrino()
{
//TODO Integrate this method into `engineConfigurations()` data provider method after dealing with https://github.com/trinodb/trino/issues/13017
testDropTableAccuracy(DELTA, TRINO, false);
}

private void testDropTableAccuracy(Engine creator, Engine dropper, boolean explicitLocation)
{
String schemaName = "test_schema_with_location_" + randomNameSuffix();
Expand Down

0 comments on commit 9cabfc0

Please sign in to comment.