Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CoolDown Period to S3 Repository #51074

Merged
merged 10 commits into from
Jan 20, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand All @@ -29,11 +31,22 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -126,6 +139,19 @@ class S3Repository extends BlobStoreRepository {

static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());

/**
* Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
* backwards compatible snapshot format from before
* {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
* This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
* doing repository operations in rapid succession.
*/
static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
"cooldown_period",
new TimeValue(3, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic);

/**
* Specifies the path within bucket to repository data. Defaults to root directory.
*/
Expand All @@ -145,6 +171,12 @@ class S3Repository extends BlobStoreRepository {

private final String cannedACL;

/**
* Time period to delay repository operations by after finalizing or deleting a snapshot.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private final TimeValue coolDown;

/**
* Constructs an s3 backed repository
*/
Expand Down Expand Up @@ -176,6 +208,8 @@ class S3Repository extends BlobStoreRepository {
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());

coolDown = COOLDOWN_PERIOD.get(metadata.settings());

logger.debug(
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
bucket,
Expand All @@ -186,6 +220,58 @@ class S3Repository extends BlobStoreRepository {
storageClass);
}

/**
* Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be
* closed concurrently.
*/
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();

@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
ActionListener<SnapshotInfo> listener) {
if (writeShardGens == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
if (writeShardGens == false) {
listener = delayedListener(listener);
}
super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
}

/**
* Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
});
return new ActionListener<>() {
@Override
public void onResponse(T response) {
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch I went with doing it this way instead of just keeping track of a timestamp and then failing a new snapshot if it's started to close to the last timestamp. I'm afraid having random failures from concurrent snapshot exceptions when no running snapshot is visible to APIs could mess with Cloud orchestration (not necessarily breaking it but causing an unreasonable amount of _status requests).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to think a bit about this, and consulted @DaveCTurner as well. We both agree that this is the right path forward (simpler to explain to users and simpler for existing orchestration tools).
In short, this artificially extends the duration of the snapshot, i.e., taking or deleting a snapshot takes 3 minutes longer. Can we add a log message that details why we are doing this (and that we are in a repo with legacy snapshots)? Let's also document this somewhere (with the setting). This gives users the choice e.g. to move to a different repo.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also document this somewhere (with the setting)

Should we really document this? It seems to me that if you're on AWS S3 not having the cool down is a risk in 100% of cases. If we document it, those that this functionality is intended to protect might opt to turn it off to "speed things up"?
Maybe just document the waiting but not the setting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we must document how users can safely speed it up (i.e. by moving to a new repo or deleting all their legacy snapshots). I'm ok with not documenting the setting itself - we already have form for leaving dangerous settings undocumented (see MergePolicyConfig for instance). Let's add this reasoning to its Javadoc along with explicit instructions not to adjust it and instead to move to a new repo or delete all the legacy snapshots, to deal with the inevitable user who comes across it in the source code.

Can we also mention {@link Version#V_7_6_0} in the Javadoc so we get a reminder to remove this in v9?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, added docs to the setting, a link to 7.6, an explanatory log message and a test in f9047d7 :)

threadPool.schedule(() -> wrappedListener.onResponse(response), coolDown, ThreadPool.Names.SNAPSHOT));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if we are rejected from the snapshot threadpool? Let's force it onto the threadpool (use AbstractRunnable), and notify listener as welll on AbstractRunnable.onFaillure

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if we are rejected from the snapshot threadpool?

I think that's impossible unless the snapshot pool is shutting down (in which case it's kinda irrelevant what we do anyway I guess). No other action will go onto the snapshot pool until this listener is resolved (because the snapshot or delete in progress in the CS will prevent anything else from running + we specifically made it so that no steps in the snapshot operations runs on the SNAPSHOT pool before we checked the CS to avoid any deadlocks here).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't bet my life on that (think e.g. about master failover). Let's make this safe.

assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

@Override
public void onFailure(Exception e) {
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(() -> wrappedListener.onFailure(e), coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
}

private static BlobPath buildBasePath(RepositoryMetaData metadata) {
final String basePath = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(basePath)) {
Expand All @@ -210,4 +296,14 @@ protected BlobStore getBlobStore() {
protected ByteSizeValue chunkSize() {
return chunkSize;
}

@Override
protected void doClose() {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
if (cancellable != null) {
logger.warn("Repository closed during cooldown period");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to log this at warn level?

Copy link
Member Author

@original-brownbear original-brownbear Jan 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured this should be somewhat visible, it's not great if this happens because the next master will not start waiting again. This may be something we want to add but I figured it might not be worth the extra complication because a master failover won't be instant (since the current master must have worked fine to set safe and pending generation equal before getting to the wait) so that "wait' might be good enough?

... retracted see below

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the situation here is better than I described above ... since if we're running into this we're always re-running the last step of the delete or snapshot operation on the next master (and will fail there because the repository generation has already moved) which will trigger another wait period. So this isn't a bad spot at all :) => moving this to DEBUG.

cancellable.cancel();
}
super.doClose();
}
}