Skip to content

Commit

Permalink
Add CoolDown Period to S3 Repository (elastic#51074)
Browse files Browse the repository at this point in the history
Add cool down period after snapshot finalization and delete to prevent eventually consistent AWS S3 from corrupting shard level metadata as long as the repository is using the old format metadata on the shard level.
  • Loading branch information
original-brownbear authored and SivagurunathanV committed Jan 21, 2020
1 parent 875daa7 commit 559617b
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand All @@ -29,11 +32,23 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -126,6 +141,23 @@ class S3Repository extends BlobStoreRepository {

static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());

/**
* Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
* backwards compatible snapshot format from before
* {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}).
* This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
* doing repository operations in rapid succession on a repository in the old metadata format.
* This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
* becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
* {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new
* format and disable the cooldown period.
*/
static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
"cooldown_period",
new TimeValue(3, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic);

/**
* Specifies the path within bucket to repository data. Defaults to root directory.
*/
Expand All @@ -145,6 +177,12 @@ class S3Repository extends BlobStoreRepository {

private final String cannedACL;

/**
* Time period to delay repository operations by after finalizing or deleting a snapshot.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private final TimeValue coolDown;

/**
* Constructs an s3 backed repository
*/
Expand Down Expand Up @@ -176,6 +214,8 @@ class S3Repository extends BlobStoreRepository {
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());

coolDown = COOLDOWN_PERIOD.get(metadata.settings());

logger.debug(
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
bucket,
Expand All @@ -186,6 +226,70 @@ class S3Repository extends BlobStoreRepository {
storageClass);
}

/**
* Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be
* closed concurrently.
*/
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();

@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
ActionListener<SnapshotInfo> listener) {
if (writeShardGens == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
if (writeShardGens == false) {
listener = delayedListener(listener);
}
super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
}

/**
* Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
});
return new ActionListener<>() {
@Override
public void onResponse(T response) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

@Override
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
}

private void logCooldownInfo() {
logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" +
" and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " +
"repository corruption. To get rid of this message and move to the new repository metadata format, either remove " +
"all snapshots older than version [{}] from the repository or create a new repository at an empty location.",
coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
}

private static BlobPath buildBasePath(RepositoryMetaData metadata) {
final String basePath = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(basePath)) {
Expand All @@ -210,4 +314,14 @@ protected BlobStore getBlobStore() {
protected ByteSizeValue chunkSize() {
return chunkSize;
}

@Override
protected void doClose() {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
if (cancellable != null) {
logger.debug("Repository [{}] closed during cool-down period", metadata.name());
cancellable.cancel();
}
super.doClose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,48 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L);

@Override
protected String repositoryType() {
return S3Repository.TYPE;
Expand Down Expand Up @@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret");

return Settings.builder()
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
// Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)
Expand All @@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

public void testEnforcedCooldownPeriod() throws IOException {
final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings())
.put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build());

final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old")
.setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId();
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final RepositoryData repositoryData =
PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f)));
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
final BytesReference serialized =
BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
}
})));

final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}

/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
Expand Down

0 comments on commit 559617b

Please sign in to comment.