Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add support for async deletion in S3BlobContainer (#15621) #16013

Merged
merged 1 commit into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574))
- Add path prefix support to hashed prefix snapshots ([#15664](https://github.com/opensearch-project/OpenSearch/pull/15664))
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
.put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false)
.put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT)
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.put(super.nodeSettings(nodeOrdinal))
.setSecureSettings(secureSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@

public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {

/**
 * Builds the node settings for this test: the parent's settings with the snapshot
 * async-deletion setting forced to {@code false}.
 */
@Override
protected Settings nodeSettings() {
    final Settings.Builder settingsBuilder = Settings.builder();
    // Parent settings go in first so the explicit override below wins.
    settingsBuilder.put(super.nodeSettings());
    settingsBuilder.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false);
    return settingsBuilder.build();
}

@Override
@Before
@SuppressForbidden(reason = "Need to set system property here for AWS SDK v2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.utils.CollectionUtils;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.S3AsyncDeleteHelper;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
Expand All @@ -109,6 +111,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
Expand Down Expand Up @@ -875,4 +880,123 @@

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();
ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

AtomicLong deletedBlobs = new AtomicLong();
AtomicLong deletedBytes = new AtomicLong();

CompletableFuture<Void> listingFuture = new CompletableFuture<>();

listPublisher.subscribe(new Subscriber<>() {
private Subscription subscription;
private final List<String> objectsToDelete = new ArrayList<>();
private CompletableFuture<Void> deletionChain = CompletableFuture.completedFuture(null);

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1);
}

@Override
public void onNext(ListObjectsV2Response response) {
response.contents().forEach(s3Object -> {
deletedBlobs.incrementAndGet();
deletedBytes.addAndGet(s3Object.size());
objectsToDelete.add(s3Object.key());
});

int bulkDeleteSize = blobStore.getBulkDeletesSize();
if (objectsToDelete.size() >= bulkDeleteSize) {
int fullBatchesCount = objectsToDelete.size() / bulkDeleteSize;
int itemsToDelete = fullBatchesCount * bulkDeleteSize;

List<String> batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete));
objectsToDelete.subList(0, itemsToDelete).clear();

deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
batchToDelete,
deletionChain,
() -> subscription.request(1)
);
} else {
subscription.request(1);
}
}

@Override
public void onError(Throwable t) {
listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t));
}

@Override
public void onComplete() {
if (!objectsToDelete.isEmpty()) {
deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
objectsToDelete,
deletionChain,
null
);
}
deletionChain.whenComplete((v, throwable) -> {
if (throwable != null) {
listingFuture.completeExceptionally(throwable);
} else {
listingFuture.complete(null);
}
});
}
});

listingFuture.whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(
throwable instanceof Exception
? (Exception) throwable
: new IOException("Unexpected error during async deletion", throwable)

Check warning on line 967 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L967

Added line #L967 was not covered by tests
);
} else {
completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get()));
}
});
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async deletion", e));
}
}

/**
 * Asynchronously deletes the named blobs of this container through {@link S3AsyncDeleteHelper}.
 * An empty list completes the listener immediately without contacting S3.
 *
 * @param blobNames          blob names relative to this container; each is turned into a full key via {@code buildKey}
 * @param completionListener notified with {@code null} on success, or with an {@link IOException} on failure
 */
@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
    if (blobNames.isEmpty()) {
        completionListener.onResponse(null);
        return;
    }

    try (AmazonAsyncS3Reference clientRef = blobStore.asyncClientReference()) {
        final S3AsyncClient client = clientRef.get().client();

        // Resolve every blob name to its full S3 key up front.
        final List<String> fullKeys = new ArrayList<>(blobNames.size());
        for (String blobName : blobNames) {
            fullKeys.add(buildKey(blobName));
        }

        final CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);
        final CompletableFuture<Void> deletion = S3AsyncDeleteHelper.executeDeleteChain(client, blobStore, fullKeys, alreadyDone, null);

        deletion.whenComplete((ignored, failure) -> {
            if (failure == null) {
                completionListener.onResponse(null);
                return;
            }
            completionListener.onFailure(new IOException("Failed to delete blobs " + blobNames, failure));
        });
    } catch (Exception e) {
        completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;

class S3BlobStore implements BlobStore {
public class S3BlobStore implements BlobStore {

private static final Logger logger = LogManager.getLogger(S3BlobStore.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public void publish(MetricCollection metricCollection) {
public void close() {}
};

/**
 * Accessor for the {@code deleteObjectsMetricPublisher} member.
 *
 * @return the publisher currently held in {@code deleteObjectsMetricPublisher}
 */
public MetricPublisher getDeleteObjectsMetricPublisher() {
    return this.deleteObjectsMetricPublisher;
}

public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3.async;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.repositories.s3.S3BlobStore;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class S3AsyncDeleteHelper {

Check warning on line 27 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java#L27

Added line #L27 was not covered by tests
private static final Logger logger = LogManager.getLogger(S3AsyncDeleteHelper.class);

public static CompletableFuture<Void> executeDeleteChain(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<String> objectsToDelete,
CompletableFuture<Void> currentChain,
Runnable afterDeleteAction
) {
List<List<String>> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize());
CompletableFuture<Void> newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, blobStore, batches));
if (afterDeleteAction != null) {
newChain = newChain.thenRun(afterDeleteAction);
}
return newChain;
}

static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteSize) {
List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < keys.size(); i += bulkDeleteSize) {
batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize)));
}
return batches;
}

static CompletableFuture<Void> executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<List<String>> batches) {
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch));
}

return allDeletesFuture;
}

static CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<String> batch) {
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse);
}

static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
if (!deleteObjectsResponse.errors().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
}
return null;
}

static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Delete.builder()
.objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList()))
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().getDeleteObjectsMetricPublisher()))
.build();
}
}
Loading
Loading