Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CoolDown Period to S3 Repository #51074

Merged
merged 10 commits into from
Jan 20, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand All @@ -29,11 +32,23 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -126,6 +141,23 @@ class S3Repository extends BlobStoreRepository {

static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());

/**
* Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
* backwards compatible snapshot format from before
* {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}).
* This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
* doing repository operations in rapid succession on a repository in the old metadata format.
* This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
* becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
* {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new
* format and disable the cooldown period.
*/
static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
"cooldown_period",
new TimeValue(3, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic);

/**
* Specifies the path within bucket to repository data. Defaults to root directory.
*/
Expand All @@ -145,6 +177,12 @@ class S3Repository extends BlobStoreRepository {

private final String cannedACL;

/**
* Time period to delay repository operations by after finalizing or deleting a snapshot.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private final TimeValue coolDown;

/**
* Constructs an s3 backed repository
*/
Expand Down Expand Up @@ -176,6 +214,8 @@ class S3Repository extends BlobStoreRepository {
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());

coolDown = COOLDOWN_PERIOD.get(metadata.settings());

logger.debug(
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
bucket,
Expand All @@ -186,6 +226,70 @@ class S3Repository extends BlobStoreRepository {
storageClass);
}

/**
* Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be
* closed concurrently.
*/
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();

@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
ActionListener<SnapshotInfo> listener) {
if (writeShardGens == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
if (writeShardGens == false) {
listener = delayedListener(listener);
}
super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
}

/**
* Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
});
return new ActionListener<>() {
@Override
public void onResponse(T response) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch I went with doing it this way instead of just keeping track of a timestamp and then failing a new snapshot if it's started to close to the last timestamp. I'm afraid having random failures from concurrent snapshot exceptions when no running snapshot is visible to APIs could mess with Cloud orchestration (not necessarily breaking it but causing an unreasonable amount of _status requests).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to think a bit about this, and consulted @DaveCTurner as well. We both agree that this is the right path forward (simpler to explain to users and simpler for existing orchestration tools).
In short, this artificially extends the duration of the snapshot, i.e., taking or deleting a snapshot takes 3 minutes longer. Can we add a log message that details why we are doing this (and that we are in a repo with legacy snapshots)? Let's also document this somewhere (with the setting). This gives users the choice e.g. to move to a different repo.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also document this somewhere (with the setting)

Should we really document this? It seems to me that if you're on AWS S3 not having the cool down is a risk in 100% of cases. If we document it, those that this functionality is intended to protect might opt to turn it off to "speed things up"?
Maybe just document the waiting but not the setting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we must document how users can safely speed it up (i.e. by moving to a new repo or deleting all their legacy snapshots). I'm ok with not documenting the setting itself - we already have form for leaving dangerous settings undocumented (see MergePolicyConfig for instance). Let's add this reasoning to its Javadoc along with explicit instructions not to adjust it and instead to move to a new repo or delete all the legacy snapshots, to deal with the inevitable user who comes across it in the source code.

Can we also mention {@link Version#V_7_6_0} in the Javadoc so we get a reminder to remove this in v9?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, added docs to the setting, a link to 7.6, an explanatory log message and a test in f9047d7 :)

threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

@Override
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
}

private void logCooldownInfo() {
logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" +
" and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " +
"repository corruption. To get rid of this message and move to the new repository metadata format, either remove " +
"all snapshots older than version [{}] from the repository or create a new repository at an empty location.",
coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
}

private static BlobPath buildBasePath(RepositoryMetaData metadata) {
final String basePath = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(basePath)) {
Expand All @@ -210,4 +314,14 @@ protected BlobStore getBlobStore() {
protected ByteSizeValue chunkSize() {
return chunkSize;
}

@Override
protected void doClose() {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
if (cancellable != null) {
logger.debug("Repository [{}] closed during cool-down period", metadata.name());
cancellable.cancel();
}
super.doClose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,48 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L);

@Override
protected String repositoryType() {
return S3Repository.TYPE;
Expand Down Expand Up @@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret");

return Settings.builder()
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
// Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)
Expand All @@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

public void testEnforcedCooldownPeriod() throws IOException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is admittedly quite the hacky test and it takes 2x 5s of hard waits to verify behaviour.

We could create a cleaner test by adding some BwC test infrastructure to the S3 plugin tests but I'm not sure it's worth the complexity. Also, running a real rest test to verify the timing here makes the test even more prone to run into random CI slowness and fail in the last step that verifies no waiting is happening when moving to a repo without any old version snapshot => this seemed like the least bad option to me.
We are using a O(5s) hard timeout in some other repo IO-timeout tests and so far that hasn't failed us due to CI running into a longer pause so I'm hopeful this will be stable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a SnapshotResiliencyTests with a mock repo that is eventually consistent on actions for X seconds, and then becomes consistent, and then use that one to verify all is going well? Would be a stronger test than this, which just verifies that some sleep is somewhere in place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not trivial but doable => On it :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh never mind ... then we'd have to move the cool down logic to BlobStoreRepository. We can't use the mock repository together with the S3 plugin. We also don't really have any mock infrastructure left for S3 so either we test this on the BlobStoreRepository or this requires some infrastructure that combines the mock S3 REST api infra + the snapshot resiliency test infrastructure.

Think this is worth it, given that this is a stop-gap solution?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bummer :/

final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings())
.put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build());

final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old")
.setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId();
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final RepositoryData repositoryData =
PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f)));
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
final BytesReference serialized =
BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
}
})));

final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there are situations where ThreadPool.schedule will return before the time value specified. This would then fail here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there are situations where ThreadPool.schedule will return before the time value specified.

I figured that's impossible since I turned off the timestamp cache? I think otherwise the underlying primitives in ThreadPoolExecutor are accurate (at least on Linux).

This would then fail here.

And rightfully so?

=> that said :) ... let me see about the suggested test via the resiliency tests

}

/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
Expand Down