Skip to content

Commit

Permalink
test + docs
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Jan 16, 2020
1 parent d0ffb44 commit f9047d7
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -142,9 +143,13 @@ class S3Repository extends BlobStoreRepository {
/**
* Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
* backwards compatible snapshot format from before
* {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
* {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}).
* This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
* doing repository operations in rapid succession.
* doing repository operations in rapid succession on a repository in the old metadata format.
* This setting should not be adjusted in production when working with an AWS S3-backed repository. Doing so risks the repository
* becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
* {@link org.elasticsearch.Version#V_7_6_0} from the repository, which will trigger an upgrade of the repository metadata to the new
* format and disable the cooldown period.
*/
static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
"cooldown_period",
Expand Down Expand Up @@ -258,20 +263,31 @@ private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
return new ActionListener<>() {
@Override
public void onResponse(T response) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(() -> wrappedListener.onResponse(response), coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

@Override
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(() -> wrappedListener.onFailure(e), coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
}

/**
 * Logs, at info level, that a cooldown of {@code coolDown} is being applied after a modification of this repository.
 * The message names the repository and {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}, and tells the operator
 * how to move to the new repository metadata format so that the message no longer appears.
 */
private void logCooldownInfo() {
    // Placeholders, in order: cooldown duration, repository name, version threshold, version threshold (repeated).
    final String message =
        "Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]"
            + " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid "
            + "repository corruption. To get rid of this message and move to the new repository metadata format, either remove "
            + "all snapshots older than version [{}] from the repository or create a new repository at an empty location.";
    logger.info(
        message,
        coolDown,
        metadata.name(),
        SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
        SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
}

private static BlobPath buildBasePath(RepositoryMetaData metadata) {
final String basePath = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(basePath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,48 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L);

@Override
protected String repositoryType() {
return S3Repository.TYPE;
Expand Down Expand Up @@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret");

return Settings.builder()
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
// Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)
Expand All @@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

/**
 * Checks that the cooldown configured through {@link S3Repository#COOLDOWN_PERIOD} is enforced while the repository metadata
 * lists a snapshot older than {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}, and that removing that snapshot is fast.
 * <p>
 * The test takes a snapshot, rewrites the repository metadata so that this snapshot is recorded with an older version, and then
 * times three operations against {@link #TEST_COOLDOWN_PERIOD}: a snapshot create and a snapshot delete that must each take longer
 * than the cooldown, and finally the delete of the fake old snapshot, which must take less than the cooldown.
 *
 * @throws IOException declared for the serialization and blob write steps performed by the test
 */
public void testEnforcedCooldownPeriod() throws IOException {
// Create a repository that uses the test cooldown period on top of the default repository settings.
final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings())
.put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build());

// Take a first snapshot; it is turned into an "old version" snapshot below by editing the repository metadata.
final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old")
.setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId();
final RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
// Load the current repository metadata on the repository's generic thread pool and block until it is available.
final RepositoryData repositoryData =
PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f)));
// Record the first snapshot with a version that predates SHARD_GEN_IN_REPO_DATA_VERSION.
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
// NOTE(review): the 'false' argument presumably selects the backwards compatible serialization format; confirm against
// RepositoryData#snapshotsToXContent.
final BytesReference serialized =
BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
// Write the modified metadata to the index blob of the current generation at the repository's base path.
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
}
})));

// Creating a snapshot while the old snapshot is still listed must take longer than the cooldown period.
final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

// Deleting the new snapshot leaves the old one in place, so this delete must also take longer than the cooldown period.
final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

// Deleting the fake old snapshot must complete in less than the cooldown period.
final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}

/**
* S3RepositoryPlugin that allows disabling chunked encoding and setting a low threshold between single upload and multipart upload.
*/
Expand Down

0 comments on commit f9047d7

Please sign in to comment.