From d5c26f543a6926892c647bff9ba2c479535fb7d3 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Fri, 29 Nov 2019 19:48:39 +0100
Subject: [PATCH 01/22] Use Cluster State to Track Repository Generation
Step on the road to #49060.
This commit adds the logic to keep track of a repository's generation
across repository operations.
---
.../elasticsearch/cluster/ClusterModule.java | 2 +
.../repositories/RepositoriesService.java | 15 +-
.../repositories/RepositoriesState.java | 211 ++++++++++++++++++
.../blobstore/BlobStoreRepository.java | 136 +++++++++--
.../repositories/RepositoriesStateTests.java | 55 +++++
.../blobstore/BlobStoreRepositoryTests.java | 6 +
.../DedicatedClusterSnapshotRestoreIT.java | 7 +-
.../SharedClusterSnapshotRestoreIT.java | 5 +-
.../MockEventuallyConsistentRepository.java | 10 +-
...ckEventuallyConsistentRepositoryTests.java | 5 +-
10 files changed, 417 insertions(+), 35 deletions(-)
create mode 100644 server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
create mode 100644 server/src/test/java/org/elasticsearch/repositories/RepositoriesStateTests.java
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
index e445615e0fc73..9fd92160beee6 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -70,6 +70,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksNodeService;
import org.elasticsearch.plugins.ClusterPlugin;
+import org.elasticsearch.repositories.RepositoriesState;
import org.elasticsearch.script.ScriptMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskResultsService;
@@ -114,6 +115,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List getNamedWriteables() {
List entries = new ArrayList<>();
// Cluster State
+ registerClusterCustom(entries, RepositoriesState.TYPE, RepositoriesState::new, RepositoriesState::readDiffFrom);
registerClusterCustom(entries, SnapshotsInProgress.TYPE, SnapshotsInProgress::new, SnapshotsInProgress::readDiffFrom);
registerClusterCustom(entries, RestoreInProgress.TYPE, RestoreInProgress::new, RestoreInProgress::readDiffFrom);
registerClusterCustom(entries, SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new,
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
index 03b283c4aafe5..0ab97a10b842b 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
@@ -207,6 +207,8 @@ public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
+ final RepositoriesState existingStates = currentState.custom(RepositoriesState.TYPE);
+ final RepositoriesState.Builder updatedStates = RepositoriesState.builder();
if (repositories != null && repositories.repositories().size() > 0) {
List repositoriesMetaData = new ArrayList<>(repositories.repositories().size());
boolean changed = false;
@@ -215,13 +217,21 @@ public ClusterState execute(ClusterState currentState) {
logger.info("delete repository [{}]", repositoryMetaData.name());
changed = true;
} else {
+ if (existingStates != null) {
+ final RepositoriesState.State repoState = existingStates.state(repositoryMetaData.name());
+ if (repoState != null) {
+ updatedStates.putState(
+ repositoryMetaData.name(), repoState.generation(), repoState.pendingGeneration());
+ }
+ }
repositoriesMetaData.add(repositoryMetaData);
}
}
if (changed) {
repositories = new RepositoriesMetaData(repositoriesMetaData);
mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories);
- return ClusterState.builder(currentState).metaData(mdBuilder).build();
+ return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updatedStates.build())
+ .metaData(mdBuilder).build();
}
}
if (Regex.isMatchAllPattern(request.name())) { // we use a wildcard so we don't barf if it's not present.
@@ -293,6 +303,9 @@ public void applyClusterState(ClusterChangedEvent event) {
// Check if repositories got changed
if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equals(newMetaData))) {
+ for (Repository repo : repositories.values()) {
+ repo.updateState(state);
+ }
return;
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
new file mode 100644
index 0000000000000..07b125e0e6106
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.AbstractNamedDiffable;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.NamedDiff;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Tracks the current generation at which the {@link RepositoryData} for each writable
+ * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} in the cluster can be found.
+ * See documentation for the package {@link org.elasticsearch.repositories.blobstore} for details.
+ */
+public final class RepositoriesState extends AbstractNamedDiffable implements ClusterState.Custom {
+
+ public static final Version REPO_GEN_IN_CS_VERSION = Version.V_8_0_0;
+
+ public static final String TYPE = "repositories_state";
+
+ private final Map states;
+
+ private RepositoriesState(Map states) {
+ this.states = states;
+ }
+
+ public RepositoriesState(StreamInput in) throws IOException {
+ this(in.readMap(StreamInput::readString, State::new));
+ }
+
+ public State state(String repoName) {
+ return states.get(repoName);
+ }
+
+ public static NamedDiff readDiffFrom(StreamInput in) throws IOException {
+ return readDiffFrom(ClusterState.Custom.class, TYPE, in);
+ }
+
+ @Override
+ public Version getMinimalSupportedVersion() {
+ return REPO_GEN_IN_CS_VERSION;
+ }
+
+ @Override
+ public String getWriteableName() {
+ return TYPE;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeMap(states, StreamOutput::writeString, (o, v) -> v.writeTo(o));
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ for (Map.Entry stringStateEntry : states.entrySet()) {
+ builder.field(stringStateEntry.getKey(), stringStateEntry.getValue());
+ }
+ return builder;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other instanceof RepositoriesState == false) {
+ return false;
+ }
+ final RepositoriesState that = (RepositoriesState) other;
+ return states.equals(that.states);
+ }
+
+ @Override
+ public int hashCode() {
+ return states.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(this);
+ }
+
+ public static final class State implements Writeable, ToXContent {
+
+ private final long generation;
+
+ private final long pendingGeneration;
+
+ private State(long generation, long pendingGeneration) {
+ assert generation <= pendingGeneration :
+ "Pending generation [" + pendingGeneration + "] smaller than generation [" + generation + "]";
+ this.generation = generation;
+ this.pendingGeneration = pendingGeneration;
+ }
+
+ private State(StreamInput in) throws IOException {
+ this(in.readLong(), in.readLong());
+ }
+
+ /**
+ * Returns the current repository generation for the repository.
+ *
+ * @return current repository generation that should be used for reads of the repository's {@link RepositoryData}
+ */
+ public long generation() {
+ return generation;
+ }
+
+ /**
+ * Latest repository generation that a write was attempted for.
+ *
+ * @return latest repository generation that a write was attempted for
+ */
+ public long pendingGeneration() {
+ return pendingGeneration;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeLong(generation);
+ out.writeLong(pendingGeneration);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(this);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ return builder.startObject().field("generation", generation).field("pending", pendingGeneration).endObject();
+ }
+
+ @Override
+ public boolean isFragment() {
+ return false;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other instanceof RepositoriesState.State == false) {
+ return false;
+ }
+ final RepositoriesState.State that = (RepositoriesState.State) other;
+ return this.generation == that.generation && this.pendingGeneration == that.pendingGeneration;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(generation, pendingGeneration);
+ }
+ }
+
+ public static final class Builder {
+
+ private final Map stateMap = new HashMap<>();
+
+ private Builder() {
+ }
+
+ public Builder putAll(RepositoriesState state) {
+ stateMap.putAll(state.states);
+ return this;
+ }
+
+ public Builder putState(String name, long generation, long pending) {
+ stateMap.put(name, new State(generation, pending));
+ return this;
+ }
+
+ public RepositoriesState build() {
+ return new RepositoriesState(Map.copyOf(stateMap));
+ }
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 1e3076b258104..acd21a5bc62a2 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -36,6 +36,7 @@
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
@@ -90,6 +91,7 @@
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoriesState;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
@@ -116,6 +118,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
@@ -124,6 +127,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import java.util.stream.Stream;
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
@@ -211,6 +215,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private final BlobPath basePath;
+ private final ClusterService clusterService;
+
/**
* Constructs new BlobStoreRepository
* @param metadata The metadata for this repository including name and settings
@@ -223,6 +229,7 @@ protected BlobStoreRepository(
final BlobPath basePath) {
this.metadata = metadata;
this.threadPool = clusterService.getClusterApplierService().threadPool();
+ this.clusterService = clusterService;
this.compress = COMPRESS_SETTING.get(metadata.settings());
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
@@ -293,7 +300,16 @@ public void updateState(ClusterState state) {
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
}
+ final RepositoriesState repositoriesState = state.custom(RepositoriesState.TYPE);
+ if (repositoriesState != null) {
+ final RepositoriesState.State repoState = repositoriesState.state(metadata.name());
+ if (repoState != null) {
+ bestGenerationFromCS = Math.max(bestGenerationFromCS, repoState.generation());
+ }
+ }
+
final long finalBestGen = bestGenerationFromCS;
+
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
}
@@ -1042,26 +1058,73 @@ public boolean isReadOnly() {
}
/**
+ * Writing a new index generation is a three step process.
+ * First, the {@link RepositoriesState.State} entry for this repository is set into {@code pending} state while its
+ * generation {@code N} remains unchanged.
+ * Second, the updated {@link RepositoryData} is written to generation {@code N + 1}.
+ * Lastly, the {@link RepositoriesState.State} entry for this repository is updated to the new generation {@code N + 1} and the pending
+ * flag set back to {@code false}.
+ *
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
* @param listener completion listener
*/
protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener listener) {
- ActionListener.completeWith(listener, () -> {
- assert isReadOnly() == false; // can not write to a read only repository
- final long currentGen = repositoryData.getGenId();
- if (currentGen != expectedGen) {
- // the index file was updated by a concurrent operation, so we were operating on stale
- // repository data
- throw new RepositoryException(metadata.name(),
- "concurrent modification of the index-N file, expected current generation [" + expectedGen +
- "], actual current generation [" + currentGen + "] - possibly due to simultaneous snapshot deletion requests");
+ assert isReadOnly() == false; // can not write to a read only repository
+ final long currentGen = repositoryData.getGenId();
+ if (currentGen != expectedGen) {
+ // the index file was updated by a concurrent operation, so we were operating on stale
+ // repository data
+ throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
+ expectedGen + "], actual current generation [" + currentGen +
+ "] - possibly due to simultaneous snapshot deletion requests");
+ }
+
+ // Step 1: Set repository generation state to the next possible pending generation
+ final StepListener setPendingStep = new StepListener<>();
+ clusterService.submitStateUpdateTask("set pending repository generation", new ClusterStateUpdateTask() {
+
+ private long newGen;
+
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ final RepositoriesState state = Optional.ofNullable(currentState.custom(RepositoriesState.TYPE))
+ .orElseGet(() -> RepositoriesState.builder().putState(metadata.name(), expectedGen, expectedGen).build());
+ final RepositoriesState.State repoState = Optional.ofNullable(state.state(metadata.name())).orElseGet(
+ () -> RepositoriesState.builder().putState(metadata.name(), expectedGen, expectedGen).build().state(metadata.name()));
+ if (repoState.pendingGeneration() != repoState.generation()) {
+ logger.warn("Trying to write new repository data of generation [{}] over unfinished write, repo is in state [{}]",
+ repoState.pendingGeneration(), repoState);
+ }
+ if (expectedGen != repoState.generation()) {
+ throw new IllegalStateException(
+ "Expected generation [" + expectedGen + "] does not match generation tracked in [" + repoState + "]");
+ }
+ newGen = repoState.pendingGeneration() + 1;
+ final RepositoriesState updated =
+ RepositoriesState.builder().putAll(state).putState(
+ metadata.name(), repoState.generation(), newGen).build();
+ return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updated).build();
+ }
+
+ @Override
+ public void onFailure(String source, Exception e) {
+ listener.onFailure(e);
}
- final long newGen = currentGen + 1;
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ setPendingStep.onResponse(newGen);
+ }
+ });
+
+ // Step 2: Write new index-N blob to repository and update index.latest
+ setPendingStep.whenComplete(newGen -> threadPool().generic().execute(ActionRunnable.wrap(listener, l -> {
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
- "Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
+ "Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
+ + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
@@ -1081,18 +1144,49 @@ protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, bo
genBytes = bStream.bytes();
}
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
+
+ // Step 3: Update CS to reflect new repository generation.
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
- // delete the N-2 index file if it exists, keep the previous one around as a backup
- if (newGen - 2 >= 0) {
- final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
- try {
- blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
- } catch (IOException e) {
- logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile);
+ clusterService.submitStateUpdateTask("update_repo_gen", new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ final RepositoriesState state = currentState.custom(RepositoriesState.TYPE);
+ final RepositoriesState.State repoState = state.state(metadata.name());
+ final long prevGeneration = repoState.generation();
+ if (prevGeneration != expectedGen) {
+ throw new IllegalStateException("Tried to update repo generation to [" + newGen
+ + "] but saw unexpected generation in state [" + repoState + "]");
+ }
+ if (repoState.pendingGeneration() == prevGeneration) {
+ throw new IllegalStateException(
+ "Tried to update non-pending repo state [" + repoState + "] after write to generation [" + newGen + "]");
+ }
+ final RepositoriesState updated =
+ RepositoriesState.builder().putAll(state).putState(metadata.name(), newGen, newGen).build();
+ return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updated).build();
}
- }
- return null;
- });
+
+ @Override
+ public void onFailure(String source, Exception e) {
+ l.onFailure(e);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ threadPool.generic().execute(ActionRunnable.run(l, () -> {
+ // delete all now outdated index files
+ final List oldIndexN = LongStream.range(Math.max(expectedGen, 0), newGen)
+ .mapToObj(gen -> INDEX_FILE_PREFIX + gen)
+ .collect(Collectors.toList());
+ try {
+ blobContainer().deleteBlobsIgnoringIfNotExists(oldIndexN);
+ } catch (IOException e) {
+ logger.warn("Failed to clean up old index blobs {}", oldIndexN);
+ }
+ }));
+ }
+ });
+ })), listener::onFailure);
}
/**
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesStateTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesStateTests.java
new file mode 100644
index 0000000000000..b69f82c29ee59
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesStateTests.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class RepositoriesStateTests extends ESTestCase {
+
+ public void testEqualsAndHashCode() {
+ final RepositoriesState repositoriesState1 = generateRandomRepositoriesState();
+ final RepositoriesState repositoriesState2 = RepositoriesState.builder().putAll(repositoriesState1).build();
+ assertEquals(repositoriesState1, repositoriesState2);
+ assertEquals(repositoriesState1.hashCode(), repositoriesState2.hashCode());
+ }
+
+ public void testSerializationRoundTrip() throws IOException {
+ final RepositoriesState repositoriesState1 = generateRandomRepositoriesState();
+ final BytesStreamOutput out = new BytesStreamOutput();
+ repositoriesState1.writeTo(out);
+ final RepositoriesState repositoriesState2 = new RepositoriesState(out.bytes().streamInput());
+ assertEquals(repositoriesState1, repositoriesState2);
+ assertEquals(repositoriesState1.hashCode(), repositoriesState2.hashCode());
+ }
+
+ private static RepositoriesState generateRandomRepositoriesState() {
+ final int repoCount = randomIntBetween(0, 10);
+ final RepositoriesState.Builder builder = RepositoriesState.builder();
+ for (int i = 0; i < repoCount; i++) {
+ final String repoName = randomAlphaOfLength(10);
+ final long gen = randomLongBetween(RepositoryData.EMPTY_REPO_GEN, Integer.MAX_VALUE);
+ builder.putState(repoName, gen, gen + randomLongBetween(0, Integer.MAX_VALUE));
+ }
+ return builder.build();
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index 6d6248c446df1..e7fe09c5a3a95 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -82,6 +82,12 @@ protected void assertSnapshotOrGenericThread() {
}
}
+ @Override
+ public void tearDown() throws Exception {
+ client().admin().cluster().prepareDeleteRepository("*").get();
+ super.tearDown();
+ }
+
public void testRetrieveSnapshots() throws Exception {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
index a11fb9d13bc04..ab5aebe853b9e 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
@@ -497,11 +497,10 @@ public void testSnapshotWithStuckNode() throws Exception {
logger.info("--> Go through a loop of creating and deleting a snapshot to trigger repository cleanup");
client().admin().cluster().prepareCleanupRepository("test-repo").get();
- // Subtract four files that will remain in the repository:
+ // Expect two files to remain in the repository:
// (1) index-(N+1)
- // (2) index-N (because we keep the previous version) and
- // (3) index-latest
- assertFileCount(repo, 3);
+ // (2) index-latest
+ assertFileCount(repo, 2);
logger.info("--> done");
}
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index 0ec69bdaa0424..e31ad37296093 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -1321,9 +1321,8 @@ public void testDeleteSnapshot() throws Exception {
logger.info("--> delete the last snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get();
- logger.info("--> make sure that number of files is back to what it was when the first snapshot was made, " +
- "plus one because one backup index-N file should remain");
- assertFileCount(repo, numberOfFiles[0] + 1);
+ logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
+ assertFileCount(repo, numberOfFiles[0]);
}
public void testGetSnapshotsNoRepos() {
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java
index 0931ceb494827..880008a96b90b 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java
@@ -282,13 +282,13 @@ public Map listBlobsByPrefix(String blobNamePrefix) {
.collect(Collectors.toList())));
}
- // Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen
- // overrides an inconsistent listing
+ // Randomly filter out the index-N blobs from a listing to test that tracking of it in latestKnownRepoGen and the cluster state
+ // ensures consistent repository operations
private Map maybeMissLatestIndexN(Map listing) {
- // Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state
- if (path.parent() == null && context.consistent == false && random.nextBoolean()) {
+ // Randomly filter out index-N blobs at the repo root to proof that we don't need them to be consistently listed
+ if (path.parent() == null && context.consistent == false) {
final Map filtered = new HashMap<>(listing);
- filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get());
+ filtered.keySet().removeIf(b -> b.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX) && random.nextBoolean());
return Map.copyOf(filtered);
}
return listing;
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java
index f1cf314e3158a..fd04b9297ba74 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java
@@ -21,6 +21,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
+import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Settings;
@@ -134,9 +135,11 @@ public void testOverwriteShardSnapBlobFails() throws IOException {
public void testOverwriteSnapshotInfoBlob() {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
+ final ClusterService clusterService = BlobStoreTestUtil.mockClusterService();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
- xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
+ xContentRegistry(), clusterService, blobStoreContext, random())) {
+ clusterService.addStateApplier(event -> repository.updateState(event.state()));
repository.start();
// We create a snap- blob for snapshot "foo" in the first generation
From ab530c5014c3a1ec7e45d92cd01ad04cc585d603 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Fri, 29 Nov 2019 20:21:21 +0100
Subject: [PATCH 02/22] simpler change
---
.../blobstore/BlobStoreRepository.java | 7 ++--
.../blobstore/BlobStoreRepositoryTests.java | 35 +++++++++++--------
2 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index acd21a5bc62a2..6f4f1604a7237 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -309,7 +309,6 @@ public void updateState(ClusterState state) {
}
final long finalBestGen = bestGenerationFromCS;
-
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
}
@@ -1097,14 +1096,16 @@ public ClusterState execute(ClusterState currentState) {
logger.warn("Trying to write new repository data of generation [{}] over unfinished write, repo is in state [{}]",
repoState.pendingGeneration(), repoState);
}
- if (expectedGen != repoState.generation()) {
+ if (expectedGen != RepositoryData.EMPTY_REPO_GEN && expectedGen != repoState.generation()) {
throw new IllegalStateException(
"Expected generation [" + expectedGen + "] does not match generation tracked in [" + repoState + "]");
}
newGen = repoState.pendingGeneration() + 1;
+ final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN ? RepositoryData.EMPTY_REPO_GEN
+ : repoState.generation();
final RepositoriesState updated =
RepositoriesState.builder().putAll(state).putState(
- metadata.name(), repoState.generation(), newGen).build();
+ metadata.name(), safeGeneration, newGen).build();
return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updated).build();
}
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index e7fe09c5a3a95..b59b2b4c92949 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -23,6 +23,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
@@ -33,6 +34,7 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.RepositoriesState;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
@@ -49,6 +51,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
@@ -82,12 +85,6 @@ protected void assertSnapshotOrGenericThread() {
}
}
- @Override
- public void tearDown() throws Exception {
- client().admin().cluster().prepareDeleteRepository("*").get();
- super.tearDown();
- }
-
public void testRetrieveSnapshots() throws Exception {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
@@ -143,7 +140,7 @@ public void testRetrieveSnapshots() throws Exception {
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
final BlobStoreRepository repository = setupRepo();
-
+ final long pendingGeneration = getPendingGeneration(repository);
// write to and read from a index file with no entries
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0));
final RepositoryData emptyData = RepositoryData.EMPTY;
@@ -152,7 +149,7 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
assertEquals(repoData, emptyData);
assertEquals(repoData.getIndices().size(), 0);
assertEquals(repoData.getSnapshotIds().size(), 0);
- assertEquals(0L, repoData.getGenId());
+ assertEquals(pendingGeneration + 1L, repoData.getGenId());
// write to and read from an index file with snapshots but no indices
repoData = addRandomSnapshotsToRepoData(repoData, false);
@@ -169,27 +166,30 @@ public void testIndexGenerationalFiles() throws Exception {
final BlobStoreRepository repository = setupRepo();
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), RepositoryData.EMPTY);
+ final long pendingGeneration = getPendingGeneration(repository);
+
// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
writeIndexGen(repository, repositoryData, RepositoryData.EMPTY_REPO_GEN);
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData));
- assertThat(repository.latestIndexBlobId(), equalTo(0L));
- assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
+ final long expectedGeneration = pendingGeneration + 1L;
+ assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration));
+ assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration));
// adding more and writing to a new index generational file
repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
- assertThat(repository.latestIndexBlobId(), equalTo(1L));
- assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
+ assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration + 1L));
+ assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration + 1L));
// removing a snapshot and writing to a new index generational file
repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot(
repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY);
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
- assertThat(repository.latestIndexBlobId(), equalTo(2L));
- assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
+ assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration + 2L));
+ assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration + 2L));
}
public void testRepositoryDataConcurrentModificationNotAllowed() {
@@ -227,6 +227,13 @@ private static void writeIndexGen(BlobStoreRepository repository, RepositoryData
future.actionGet();
}
+ private long getPendingGeneration(Repository repository) {
+ final ClusterState state = client().admin().cluster().prepareState().get().getState();
+ return Optional.ofNullable(state.custom(RepositoriesState.TYPE)).map(
+ repositoriesState -> repositoriesState.state(repository.getMetadata().name()))
+ .map(RepositoriesState.State::pendingGeneration).orElse(RepositoryData.EMPTY_REPO_GEN);
+ }
+
private BlobStoreRepository setupRepo() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
From a67bb904afbae65b750d5a4b3b744aee5b2b0864 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Fri, 29 Nov 2019 20:34:06 +0100
Subject: [PATCH 03/22] cleaner
---
.../repositories/blobstore/BlobStoreRepository.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 6f4f1604a7237..5b9513e3b18aa 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -991,7 +991,7 @@ public void endVerification(String seed) {
// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
// and concurrent modifications.
// Protected for use in MockEventuallyConsistentRepository
- protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
+ private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
@Override
public void getRepositoryData(ActionListener listener) {
From 7072b7e3efe9b04b00e658f2dd03f8b2004216ac Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Fri, 29 Nov 2019 21:26:40 +0100
Subject: [PATCH 04/22] add docs
---
.../repositories/blobstore/package-info.java | 46 +++++++++++++++----
1 file changed, 38 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
index d4f3329d354b4..3f2bda9025369 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
@@ -96,6 +96,9 @@
*
* - The blobstore repository stores the {@code RepositoryData} in blobs named with incrementing suffix {@code N} at {@code /index-N}
* directly under the repository's root.
+ * - For each {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} (that is not mounted in read-only mode) an entry
+ * of type {@link org.elasticsearch.repositories.RepositoriesState.State} exists in the cluster state. It tracks the current valid
+ * generation {@code N} as well as the latest generation that a write was attempted for.
* - The blobstore also stores the most recent {@code N} as a 64bit long in the blob {@code /index.latest} directly under the
* repository's root.
*
@@ -116,6 +119,38 @@
*
*
*
+ *
+ * Writing Updated RepositoryData to the Repository
+ *
+ * Writing an updated {@link org.elasticsearch.repositories.RepositoryData} to a blob store repository is an operation that uses
+ * the cluster state to ensure that a specific {@code index-N} blob is never accidentally overwritten in a master failover scenario.
+ * The specific steps to writing a new {@code index-N} blob and thus making changes from a snapshot-create or delete operation visible
+ * to read operations on the repository are as follows and all run on the master node:
+ *
+ *
+ * - Write an updated value of {@link org.elasticsearch.repositories.RepositoriesState.State} for the repository that has the same
+ * {@link org.elasticsearch.repositories.RepositoriesState.State#generation()} as the existing entry and has a value of
+ * {@link org.elasticsearch.repositories.RepositoriesState.State#pendingGeneration()} one greater than the {@code pendingGeneration} of the
+ * existing entry.
+ * - On the same master node, after the cluster state has been updated in the first step, write the new {@code index-N} blob and
+ * also update the contents of the {@code index.latest} blob. Note that updating the index.latest blob is done on a best effort
+ * basis and that there is a chance for a stuck master-node to overwrite the contents of the {@code index.latest} blob after a newer
+ * {@code index-N} has been written by another master node. This is acceptable since the contents of {@code index.latest} are not used
+ * during normal operation of the repository and must only be correct for purposes of mounting the contents of a
+ * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} as a read-only url repository.
+ * - After the write has finished, set the value of {@code RepositoriesState.State#generation} to the value used for
+ * {@code RepositoriesState.State#pendingGeneration} so that the new entry for the state of the repository has {@code generation} and
+ * {@code pendingGeneration} set to the same value to signalize a clean repository state with no potentially failed writes newer than the
+ * last valid {@code index-N} blob in the repository.
+ *
+ *
+ * If either of the last two steps in the above fails or master fails over to a new node at any point, then a subsequent operation
+ * trying to write a new {@code index-N} blob will never use the same value of {@code N} used by a previous attempt. It will always start
+ * over at the first of the above three steps, incrementing the {@code pendingGeneration} generation before attempting a write, thus
+ * ensuring no overwriting of a {@code index-N} blob ever to occur. The use of the cluster state to track the latest repository generation
+ * {@code N} and ensuring no overwriting of {@code index-N} blobs to ever occur allows the blob store repository to properly function even
+ * on blob stores with neither a consistent list operation nor an atomic "write but not overwrite" operation.
+ *
* Creating a Snapshot
*
* Creating a snapshot in the repository happens in the two steps described in detail below.
@@ -160,11 +195,7 @@
* {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}
* Write the {@link org.elasticsearch.snapshots.SnapshotInfo} blob for the given snapshot to the key {@code /snap-${snapshot-uuid}.dat}
* directly under the repository root.
- * Write an updated {@code RepositoryData} blob to the key {@code /index-${N+1}} using the {@code N} determined when initializing the
- * snapshot in the first step. When doing this, the implementation checks that the blob for generation {@code N + 1} has not yet been
- * written to prevent concurrent updates to the repository. If the blob for {@code N + 1} already exists the execution of finalization
- * stops under the assumption that a master failover occurred and the snapshot has already been finalized by the new master.
- * Write the updated {@code /index.latest} blob containing the new repository generation {@code N + 1}.
+ * Write an updated {@code RepositoryData} blob containing the new snapshot.
*
*
* Deleting a Snapshot
@@ -189,9 +220,8 @@
* blob so that it can be deleted at the end of the snapshot delete process.
*
*
- * Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the
- * repository root and the repository generations that were changed in the affected shards adjusted.
- * Write an updated {@code index.latest} blob containing {@code N + 1}.
+ * Write an updated {@code RepositoryData} blob with the deleted snapshot removed and containing the updated repository generations
+ * that changed for the shards affected by the delete.
* Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot
* as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.
* Delete all unreferenced blobs previously collected when updating the shard directories. Also, remove any index folders or blobs
From 3cd2e727d031dbbd45e82413beed8ca427575da5 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Fri, 29 Nov 2019 21:36:44 +0100
Subject: [PATCH 05/22] better wording
---
.../repositories/blobstore/BlobStoreRepository.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 5b9513e3b18aa..6181328307bdc 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1058,11 +1058,11 @@ public boolean isReadOnly() {
/**
* Writing a new index generation is a three step process.
- * First, the {@link RepositoriesState.State} entry for this repository is set into {@code pending} state while its
- * generation {@code N} remains unchanged.
- * Second, the updated {@link RepositoryData} is written to generation {@code N + 1}.
- * Lastly, the {@link RepositoriesState.State} entry for this repository is updated to the new generation {@code N + 1} and the pending
- * flag set back to {@code false}.
+ * First, the {@link RepositoriesState.State} entry for this repository is set into a pending state by incrementing its
+ * pending generation {@code P} while its safe generation {@code N} remains unchanged.
+ * Second, the updated {@link RepositoryData} is written to generation {@code P + 1}.
+ * Lastly, the {@link RepositoriesState.State} entry for this repository is updated to the new generation {@code P + 1} and thus
+ * pending and safe generation are set to the same value marking the end of the update of the repository data.
*
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
From afa7b67b2d685a7ed8bb8f692d4e64ad000d7b46 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Sat, 30 Nov 2019 20:23:48 +0100
Subject: [PATCH 06/22] nicer docs
---
.../repositories/RepositoriesService.java | 12 +++--
.../repositories/RepositoriesState.java | 20 +++++++++
.../blobstore/BlobStoreRepository.java | 45 ++++++++++++-------
.../blobstore/BlobStoreRepositoryTests.java | 3 +-
4 files changed, 54 insertions(+), 26 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
index 0ab97a10b842b..4a440ebefb48c 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
@@ -207,7 +207,7 @@ public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
- final RepositoriesState existingStates = currentState.custom(RepositoriesState.TYPE);
+ final RepositoriesState existingStates = RepositoriesState.getOrEmpty(currentState);
final RepositoriesState.Builder updatedStates = RepositoriesState.builder();
if (repositories != null && repositories.repositories().size() > 0) {
List repositoriesMetaData = new ArrayList<>(repositories.repositories().size());
@@ -217,12 +217,10 @@ public ClusterState execute(ClusterState currentState) {
logger.info("delete repository [{}]", repositoryMetaData.name());
changed = true;
} else {
- if (existingStates != null) {
- final RepositoriesState.State repoState = existingStates.state(repositoryMetaData.name());
- if (repoState != null) {
- updatedStates.putState(
- repositoryMetaData.name(), repoState.generation(), repoState.pendingGeneration());
- }
+ final RepositoriesState.State repoState = existingStates.state(repositoryMetaData.name());
+ if (repoState != null) {
+ updatedStates.putState(
+ repositoryMetaData.name(), repoState.generation(), repoState.pendingGeneration());
}
repositoriesMetaData.add(repositoryMetaData);
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
index 07b125e0e6106..126c557dbfce1 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
@@ -23,6 +23,7 @@
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
+import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -46,6 +47,8 @@ public final class RepositoriesState extends AbstractNamedDiffable states;
private RepositoriesState(Map states) {
@@ -56,6 +59,23 @@ public RepositoriesState(StreamInput in) throws IOException {
this(in.readMap(StreamInput::readString, State::new));
}
+ /**
+ * Gets repositories state from the given cluster state or an empty repositories state if none is found in the cluster state.
+ *
+ * @param state cluster state
+ * @return RepositoriesState
+ */
+ public static RepositoriesState getOrEmpty(ClusterState state) {
+ return Objects.requireNonNullElse(state.custom(RepositoriesState.TYPE), RepositoriesState.EMPTY);
+ }
+
+ /**
+ * Get {@link State} for a repository.
+ *
+ * @param repoName repository name
+ * @return State for the repository or {@code null} if none is found
+ */
+ @Nullable
public State state(String repoName) {
return states.get(repoName);
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 6181328307bdc..608b1943075fc 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -300,12 +300,9 @@ public void updateState(ClusterState state) {
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
}
- final RepositoriesState repositoriesState = state.custom(RepositoriesState.TYPE);
- if (repositoriesState != null) {
- final RepositoriesState.State repoState = repositoriesState.state(metadata.name());
- if (repoState != null) {
- bestGenerationFromCS = Math.max(bestGenerationFromCS, repoState.generation());
- }
+ final RepositoriesState.State repoState = RepositoriesState.getOrEmpty(state).state(metadata.name());
+ if (repoState != null) {
+ bestGenerationFromCS = Math.max(bestGenerationFromCS, repoState.generation());
}
final long finalBestGen = bestGenerationFromCS;
@@ -1088,10 +1085,10 @@ protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, bo
@Override
public ClusterState execute(ClusterState currentState) {
- final RepositoriesState state = Optional.ofNullable(currentState.custom(RepositoriesState.TYPE))
- .orElseGet(() -> RepositoriesState.builder().putState(metadata.name(), expectedGen, expectedGen).build());
- final RepositoriesState.State repoState = Optional.ofNullable(state.state(metadata.name())).orElseGet(
- () -> RepositoriesState.builder().putState(metadata.name(), expectedGen, expectedGen).build().state(metadata.name()));
+ final RepositoriesState state = RepositoriesState.getOrEmpty(currentState);
+ final String repoName = metadata.name();
+ final RepositoriesState.State repoState = Optional.ofNullable(state.state(repoName)).orElseGet(
+ () -> RepositoriesState.builder().putState(repoName, expectedGen, expectedGen).build().state(repoName));
if (repoState.pendingGeneration() != repoState.generation()) {
logger.warn("Trying to write new repository data of generation [{}] over unfinished write, repo is in state [{}]",
repoState.pendingGeneration(), repoState);
@@ -1100,13 +1097,18 @@ public ClusterState execute(ClusterState currentState) {
throw new IllegalStateException(
"Expected generation [" + expectedGen + "] does not match generation tracked in [" + repoState + "]");
}
- newGen = repoState.pendingGeneration() + 1;
+ // If we run into the empty repo generation for the expected gen, the repo has been is assumed to have been cleared of all
+ // contents by an external process so we reset the safe generation to the empty generation.
final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN ? RepositoryData.EMPTY_REPO_GEN
: repoState.generation();
- final RepositoriesState updated =
- RepositoriesState.builder().putAll(state).putState(
- metadata.name(), safeGeneration, newGen).build();
- return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updated).build();
+ // Regardless of whether or not the safe generation has been reset, the pending generation always increments so that even
+ // if a repository has been manually cleared of all contents we will never reuse the same repository generation.
+ // This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does not
+ // offer any consistency guarantees when it comes to overwriting the same blob name with different content.
+ newGen = repoState.pendingGeneration() + 1;
+ return ClusterState.builder(currentState).putCustom(
+ RepositoriesState.TYPE, RepositoriesState.builder().putAll(state).putState(repoName, safeGeneration, newGen).build())
+ .build();
}
@Override
@@ -1116,8 +1118,17 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ assert assertExpectedGeneration(newState);
setPendingStep.onResponse(newGen);
}
+
+ private boolean assertExpectedGeneration(ClusterState newState) {
+ final RepositoriesState repositoriesState = newState.custom(RepositoriesState.TYPE);
+ final RepositoriesState.State repoState = repositoriesState.state(metadata.name());
+ assert newGen == repoState.pendingGeneration()
+ : "State [" + repoState + "] did not contain assumed pending generation [" + newGen + "]";
+ return true;
+ }
});
// Step 2: Write new index-N blob to repository and update index.latest
@@ -1148,7 +1159,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
// Step 3: Update CS to reflect new repository generation.
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
- clusterService.submitStateUpdateTask("update_repo_gen", new ClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("set safe repository generation", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoriesState state = currentState.custom(RepositoriesState.TYPE);
@@ -1174,7 +1185,7 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
- threadPool.generic().execute(ActionRunnable.run(l, () -> {
+ threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
// delete all now outdated index files
final List oldIndexN = LongStream.range(Math.max(expectedGen, 0), newGen)
.mapToObj(gen -> INDEX_FILE_PREFIX + gen)
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index b59b2b4c92949..9f01ccd982d95 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -229,8 +229,7 @@ private static void writeIndexGen(BlobStoreRepository repository, RepositoryData
private long getPendingGeneration(Repository repository) {
final ClusterState state = client().admin().cluster().prepareState().get().getState();
- return Optional.ofNullable(state.custom(RepositoriesState.TYPE)).map(
- repositoriesState -> repositoriesState.state(repository.getMetadata().name()))
+ return Optional.ofNullable(RepositoriesState.getOrEmpty(state).state(repository.getMetadata().name()))
.map(RepositoriesState.State::pendingGeneration).orElse(RepositoryData.EMPTY_REPO_GEN);
}
From 59352a18f44e3b2fc0373790bc3b53654f17d3b1 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Mon, 2 Dec 2019 15:57:00 +0100
Subject: [PATCH 07/22] small changes
---
.../blobstore/BlobStoreRepository.java | 176 +++++++++---------
1 file changed, 89 insertions(+), 87 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 608b1943075fc..391601cbdcf42 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1072,67 +1072,67 @@ protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, bo
if (currentGen != expectedGen) {
// the index file was updated by a concurrent operation, so we were operating on stale
// repository data
- throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
- expectedGen + "], actual current generation [" + currentGen +
- "] - possibly due to simultaneous snapshot deletion requests");
+ listener.onFailure(new RepositoryException(metadata.name(),
+ "concurrent modification of the index-N file, expected current generation [" + expectedGen +
+ "], actual current generation [" + currentGen + "] - possibly due to simultaneous snapshot deletion requests"));
+ return;
}
// Step 1: Set repository generation state to the next possible pending generation
final StepListener setPendingStep = new StepListener<>();
- clusterService.submitStateUpdateTask("set pending repository generation", new ClusterStateUpdateTask() {
-
- private long newGen;
-
- @Override
- public ClusterState execute(ClusterState currentState) {
- final RepositoriesState state = RepositoriesState.getOrEmpty(currentState);
- final String repoName = metadata.name();
- final RepositoriesState.State repoState = Optional.ofNullable(state.state(repoName)).orElseGet(
- () -> RepositoriesState.builder().putState(repoName, expectedGen, expectedGen).build().state(repoName));
- if (repoState.pendingGeneration() != repoState.generation()) {
- logger.warn("Trying to write new repository data of generation [{}] over unfinished write, repo is in state [{}]",
- repoState.pendingGeneration(), repoState);
- }
- if (expectedGen != RepositoryData.EMPTY_REPO_GEN && expectedGen != repoState.generation()) {
- throw new IllegalStateException(
- "Expected generation [" + expectedGen + "] does not match generation tracked in [" + repoState + "]");
+ clusterService.submitStateUpdateTask("set pending repository generation [" + metadata.name() + "][" + expectedGen + "]",
+ new ClusterStateUpdateTask() {
+
+ private long newGen;
+
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ final RepositoriesState state = RepositoriesState.getOrEmpty(currentState);
+ final String repoName = metadata.name();
+ final RepositoriesState.State repoState = Optional.ofNullable(state.state(repoName)).orElseGet(
+ () -> RepositoriesState.builder().putState(repoName, expectedGen, expectedGen).build().state(repoName));
+ if (repoState.pendingGeneration() != repoState.generation()) {
+ logger.info("Trying to write new repository data over unfinished write, repo is in state [{}]", repoState);
+ }
+ assert expectedGen == RepositoryData.EMPTY_REPO_GEN || expectedGen == repoState.generation()
+ : "Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + repoState + "]";
+ // If we run into the empty repo generation for the expected gen, the repo has been is assumed to have been cleared of
+ // all contents by an external process so we reset the safe generation to the empty generation.
+ final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN ? RepositoryData.EMPTY_REPO_GEN
+ : repoState.generation();
+ // Regardless of whether or not the safe generation has been reset, the pending generation always increments so that
+ // even if a repository has been manually cleared of all contents we will never reuse the same repository generation.
+ // This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does
+ // not offer any consistency guarantees when it comes to overwriting the same blob name with different content.
+ newGen = repoState.pendingGeneration() + 1;
+ return ClusterState.builder(currentState).putCustom(
+ RepositoriesState.TYPE,
+ RepositoriesState.builder().putAll(state).putState(repoName, safeGeneration, newGen).build()).build();
}
- // If we run into the empty repo generation for the expected gen, the repo has been is assumed to have been cleared of all
- // contents by an external process so we reset the safe generation to the empty generation.
- final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN ? RepositoryData.EMPTY_REPO_GEN
- : repoState.generation();
- // Regardless of whether or not the safe generation has been reset, the pending generation always increments so that even
- // if a repository has been manually cleared of all contents we will never reuse the same repository generation.
- // This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does not
- // offer any consistency guarantees when it comes to overwriting the same blob name with different content.
- newGen = repoState.pendingGeneration() + 1;
- return ClusterState.builder(currentState).putCustom(
- RepositoriesState.TYPE, RepositoriesState.builder().putAll(state).putState(repoName, safeGeneration, newGen).build())
- .build();
- }
- @Override
- public void onFailure(String source, Exception e) {
- listener.onFailure(e);
- }
+ @Override
+ public void onFailure(String source, Exception e) {
+ listener.onFailure(
+ new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
+ }
- @Override
- public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
- assert assertExpectedGeneration(newState);
- setPendingStep.onResponse(newGen);
- }
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ assert assertExpectedGeneration(newState);
+ setPendingStep.onResponse(newGen);
+ }
- private boolean assertExpectedGeneration(ClusterState newState) {
- final RepositoriesState repositoriesState = newState.custom(RepositoriesState.TYPE);
- final RepositoriesState.State repoState = repositoriesState.state(metadata.name());
- assert newGen == repoState.pendingGeneration()
- : "State [" + repoState + "] did not contain assumed pending generation [" + newGen + "]";
- return true;
- }
- });
+ private boolean assertExpectedGeneration(ClusterState newState) {
+ final RepositoriesState repositoriesState = newState.custom(RepositoriesState.TYPE);
+ final RepositoriesState.State repoState = repositoriesState.state(metadata.name());
+ assert newGen == repoState.pendingGeneration()
+ : "State [" + repoState + "] did not contain assumed pending generation [" + newGen + "]";
+ return true;
+ }
+ });
// Step 2: Write new index-N blob to repository and update index.latest
- setPendingStep.whenComplete(newGen -> threadPool().generic().execute(ActionRunnable.wrap(listener, l -> {
+ setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
@@ -1159,45 +1159,47 @@ private boolean assertExpectedGeneration(ClusterState newState) {
// Step 3: Update CS to reflect new repository generation.
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
- clusterService.submitStateUpdateTask("set safe repository generation", new ClusterStateUpdateTask() {
- @Override
- public ClusterState execute(ClusterState currentState) {
- final RepositoriesState state = currentState.custom(RepositoriesState.TYPE);
- final RepositoriesState.State repoState = state.state(metadata.name());
- final long prevGeneration = repoState.generation();
- if (prevGeneration != expectedGen) {
- throw new IllegalStateException("Tried to update repo generation to [" + newGen
- + "] but saw unexpected generation in state [" + repoState + "]");
- }
- if (repoState.pendingGeneration() == prevGeneration) {
- throw new IllegalStateException(
- "Tried to update non-pending repo state [" + repoState + "] after write to generation [" + newGen + "]");
+ clusterService.submitStateUpdateTask("set safe repository generation [" + metadata.name() + "][" + newGen + "]",
+ new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ final RepositoriesState state = currentState.custom(RepositoriesState.TYPE);
+ final RepositoriesState.State repoState = state.state(metadata.name());
+ final long prevGeneration = repoState.generation();
+ if (prevGeneration != expectedGen) {
+ throw new IllegalStateException("Tried to update repo generation to [" + newGen
+ + "] but saw unexpected generation in state [" + repoState + "]");
+ }
+ if (repoState.pendingGeneration() == prevGeneration) {
+ throw new IllegalStateException(
+ "Tried to update non-pending repo state [" + repoState + "] after write to generation [" + newGen + "]");
+ }
+ final RepositoriesState updated =
+ RepositoriesState.builder().putAll(state).putState(metadata.name(), newGen, newGen).build();
+ return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updated).build();
}
- final RepositoriesState updated =
- RepositoriesState.builder().putAll(state).putState(metadata.name(), newGen, newGen).build();
- return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updated).build();
- }
- @Override
- public void onFailure(String source, Exception e) {
- l.onFailure(e);
- }
+ @Override
+ public void onFailure(String source, Exception e) {
+ l.onFailure(
+ new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
+ }
- @Override
- public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
- threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
- // delete all now outdated index files
- final List oldIndexN = LongStream.range(Math.max(expectedGen, 0), newGen)
- .mapToObj(gen -> INDEX_FILE_PREFIX + gen)
- .collect(Collectors.toList());
- try {
- blobContainer().deleteBlobsIgnoringIfNotExists(oldIndexN);
- } catch (IOException e) {
- logger.warn("Failed to clean up old index blobs {}", oldIndexN);
- }
- }));
- }
- });
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
+ // delete all now outdated index files
+ final List oldIndexN = LongStream.range(Math.max(expectedGen, 0), newGen)
+ .mapToObj(gen -> INDEX_FILE_PREFIX + gen)
+ .collect(Collectors.toList());
+ try {
+ blobContainer().deleteBlobsIgnoringIfNotExists(oldIndexN);
+ } catch (IOException e) {
+ logger.warn("Failed to clean up old index blobs {}", oldIndexN);
+ }
+ }));
+ }
+ });
})), listener::onFailure);
}
From 2b38e7bdcdfb8263ccbb6b67b525b099e453a414 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Mon, 2 Dec 2019 18:10:43 +0100
Subject: [PATCH 08/22] bck
---
.../cluster/metadata/RepositoryMetaData.java | 41 ++++++++++++--
.../repositories/RepositoriesService.java | 10 +---
.../repositories/RepositoriesState.java | 5 +-
.../repositories/RepositoryData.java | 6 ++
.../blobstore/BlobStoreRepository.java | 17 +++---
.../repositories/RepositoriesStateTests.java | 55 -------------------
6 files changed, 56 insertions(+), 78 deletions(-)
delete mode 100644 server/src/test/java/org/elasticsearch/repositories/RepositoriesStateTests.java
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
index 847db915b8bce..e633e042cbb3b 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
@@ -18,20 +18,30 @@
*/
package org.elasticsearch.cluster.metadata;
+import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.repositories.RepositoryData;
import java.io.IOException;
+import java.util.Objects;
/**
* Metadata about registered repository
*/
public class RepositoryMetaData {
+
+ public static final Version REPO_GEN_IN_CS_VERSION = Version.V_8_0_0;
+
private final String name;
private final String type;
private final Settings settings;
+ private final long generation;
+
+ private final long pendingGeneration;
+
/**
* Constructs new repository metadata
*
@@ -40,9 +50,15 @@ public class RepositoryMetaData {
* @param settings repository settings
*/
public RepositoryMetaData(String name, String type, Settings settings) {
+ this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.UNKNOWN_REPO_GEN);
+ }
+
+ private RepositoryMetaData(String name, String type, Settings settings, long generation, long pendingGeneration) {
this.name = name;
this.type = type;
this.settings = settings;
+ this.generation = generation;
+ this.pendingGeneration = pendingGeneration;
}
/**
@@ -72,11 +88,25 @@ public Settings settings() {
return this.settings;
}
+ public long generation() {
+ return generation;
+ }
+
+ public long pendingGeneration() {
+ return pendingGeneration;
+ }
public RepositoryMetaData(StreamInput in) throws IOException {
name = in.readString();
type = in.readString();
settings = Settings.readSettingsFromStream(in);
+ if (in.getVersion().onOrAfter(REPO_GEN_IN_CS_VERSION)) {
+ generation = in.readLong();
+ pendingGeneration = in.readLong();
+ } else {
+ generation = RepositoryData.UNKNOWN_REPO_GEN;
+ pendingGeneration = RepositoryData.UNKNOWN_REPO_GEN;
+ }
}
/**
@@ -88,6 +118,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(type);
Settings.writeSettingsToStream(settings, out);
+ if (out.getVersion().onOrAfter(REPO_GEN_IN_CS_VERSION)) {
+ out.writeLong(generation);
+ out.writeLong(pendingGeneration);
+ }
}
@Override
@@ -99,15 +133,14 @@ public boolean equals(Object o) {
if (!name.equals(that.name)) return false;
if (!type.equals(that.type)) return false;
+ if (generation != that.generation) return false;
+ if (pendingGeneration != that.pendingGeneration) return false;
return settings.equals(that.settings);
}
@Override
public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + type.hashCode();
- result = 31 * result + settings.hashCode();
- return result;
+ return Objects.hash(name, type, settings, generation, pendingGeneration);
}
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
index 4a440ebefb48c..a084bbab9e986 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
@@ -207,8 +207,6 @@ public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
- final RepositoriesState existingStates = RepositoriesState.getOrEmpty(currentState);
- final RepositoriesState.Builder updatedStates = RepositoriesState.builder();
if (repositories != null && repositories.repositories().size() > 0) {
List repositoriesMetaData = new ArrayList<>(repositories.repositories().size());
boolean changed = false;
@@ -217,19 +215,13 @@ public ClusterState execute(ClusterState currentState) {
logger.info("delete repository [{}]", repositoryMetaData.name());
changed = true;
} else {
- final RepositoriesState.State repoState = existingStates.state(repositoryMetaData.name());
- if (repoState != null) {
- updatedStates.putState(
- repositoryMetaData.name(), repoState.generation(), repoState.pendingGeneration());
- }
repositoriesMetaData.add(repositoryMetaData);
}
}
if (changed) {
repositories = new RepositoriesMetaData(repositoriesMetaData);
mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories);
- return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updatedStates.build())
- .metaData(mdBuilder).build();
+ return ClusterState.builder(currentState).metaData(mdBuilder).build();
}
}
if (Regex.isMatchAllPattern(request.name())) { // we use a wildcard so we don't barf if it's not present.
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
index 126c557dbfce1..99061ab0da452 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
@@ -23,6 +23,7 @@
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -43,8 +44,6 @@
*/
public final class RepositoriesState extends AbstractNamedDiffable implements ClusterState.Custom {
- public static final Version REPO_GEN_IN_CS_VERSION = Version.V_8_0_0;
-
public static final String TYPE = "repositories_state";
private static final RepositoriesState EMPTY = RepositoriesState.builder().build();
@@ -86,7 +85,7 @@ public static NamedDiff readDiffFrom(StreamInput in) throws
@Override
public Version getMinimalSupportedVersion() {
- return REPO_GEN_IN_CS_VERSION;
+ return RepositoryMetaData.REPO_GEN_IN_CS_VERSION;
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
index 20dcdc2371805..357268fa051e0 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
@@ -52,6 +52,12 @@ public final class RepositoryData {
* The generation value indicating the repository has no index generational files.
*/
public static final long EMPTY_REPO_GEN = -1L;
+
+ /**
+ * The generation value indicating that the repository generation is unknown.
+ */
+ public static final long UNKNOWN_REPO_GEN = -2L;
+
/**
* An instance initialized for an empty repository.
*/
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 391601cbdcf42..3666d2147e340 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -42,6 +42,7 @@
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
@@ -300,12 +301,7 @@ public void updateState(ClusterState state) {
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
}
- final RepositoriesState.State repoState = RepositoriesState.getOrEmpty(state).state(metadata.name());
- if (repoState != null) {
- bestGenerationFromCS = Math.max(bestGenerationFromCS, repoState.generation());
- }
-
- final long finalBestGen = bestGenerationFromCS;
+ final long finalBestGen = Math.max(bestGenerationFromCS, getRepoMetaData(state).generation());
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
}
@@ -1087,7 +1083,7 @@ protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, bo
@Override
public ClusterState execute(ClusterState currentState) {
- final RepositoriesState state = RepositoriesState.getOrEmpty(currentState);
+ final RepositoryMetaData state = RepositoriesState.getOrEmpty(currentState);
final String repoName = metadata.name();
final RepositoriesState.State repoState = Optional.ofNullable(state.state(repoName)).orElseGet(
() -> RepositoriesState.builder().putState(repoName, expectedGen, expectedGen).build().state(repoName));
@@ -1203,6 +1199,13 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
})), listener::onFailure);
}
+ private RepositoryMetaData getRepoMetaData(ClusterState state) {
+ final RepositoryMetaData metaData =
+ state.getMetaData().custom(RepositoriesMetaData.TYPE).repository(metadata.name());
+ assert metaData != null;
+ return metaData;
+ }
+
/**
* Get the latest snapshot index blob id. Snapshot index blobs are named index-N, where N is
* the next version number from when the index blob was written. Each individual index-N blob is
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesStateTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesStateTests.java
deleted file mode 100644
index b69f82c29ee59..0000000000000
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesStateTests.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.repositories;
-
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.test.ESTestCase;
-
-import java.io.IOException;
-
-public class RepositoriesStateTests extends ESTestCase {
-
- public void testEqualsAndHashCode() {
- final RepositoriesState repositoriesState1 = generateRandomRepositoriesState();
- final RepositoriesState repositoriesState2 = RepositoriesState.builder().putAll(repositoriesState1).build();
- assertEquals(repositoriesState1, repositoriesState2);
- assertEquals(repositoriesState1.hashCode(), repositoriesState2.hashCode());
- }
-
- public void testSerializationRoundTrip() throws IOException {
- final RepositoriesState repositoriesState1 = generateRandomRepositoriesState();
- final BytesStreamOutput out = new BytesStreamOutput();
- repositoriesState1.writeTo(out);
- final RepositoriesState repositoriesState2 = new RepositoriesState(out.bytes().streamInput());
- assertEquals(repositoriesState1, repositoriesState2);
- assertEquals(repositoriesState1.hashCode(), repositoriesState2.hashCode());
- }
-
- private static RepositoriesState generateRandomRepositoriesState() {
- final int repoCount = randomIntBetween(0, 10);
- final RepositoriesState.Builder builder = RepositoriesState.builder();
- for (int i = 0; i < repoCount; i++) {
- final String repoName = randomAlphaOfLength(10);
- final long gen = randomLongBetween(RepositoryData.EMPTY_REPO_GEN, Integer.MAX_VALUE);
- builder.putState(repoName, gen, gen + randomLongBetween(0, Integer.MAX_VALUE));
- }
- return builder.build();
- }
-}
From abad89a883687ce5d606444d4ff55932e91c4c47 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 10:02:37 +0100
Subject: [PATCH 09/22] move meta handling to repo meta
---
docs/reference/modules/snapshots.asciidoc | 20 +-
.../elasticsearch/cluster/ClusterModule.java | 2 -
.../metadata/RepositoriesMetaData.java | 70 +++++-
.../cluster/metadata/RepositoryMetaData.java | 50 +++-
.../repositories/RepositoriesService.java | 5 +-
.../repositories/RepositoriesState.java | 230 ------------------
.../blobstore/BlobStoreRepository.java | 71 +++---
.../repositories/blobstore/package-info.java | 10 +-
.../repositories/fs/FsRepository.java | 2 +-
.../BlobStoreRepositoryRestoreTests.java | 2 +-
.../blobstore/BlobStoreRepositoryTests.java | 13 +-
...epositoriesMetaDataSerializationTests.java | 5 +-
...ckEventuallyConsistentRepositoryTests.java | 8 +-
.../blobstore/BlobStoreTestUtil.java | 17 +-
.../snapshots/mockstore/MockRepository.java | 8 +
15 files changed, 208 insertions(+), 305 deletions(-)
delete mode 100644 server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
diff --git a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc
index 666b8b4495fe5..0e1da537cc594 100644
--- a/docs/reference/modules/snapshots.asciidoc
+++ b/docs/reference/modules/snapshots.asciidoc
@@ -122,7 +122,9 @@ which returns:
"type": "fs",
"settings": {
"location": "my_backup_location"
- }
+ },
+ "generation": -2,
+ "pending_generation": -1
}
}
-----------------------------------
@@ -188,7 +190,9 @@ PUT /_snapshot/my_fs_backup
"settings": {
"location": "/mount/backups/my_fs_backup_location",
"compress": true
- }
+ },
+ "generation": -2,
+ "pending_generation": -1
}
-----------------------------------
// TEST[skip:no access to absolute path]
@@ -204,7 +208,9 @@ PUT /_snapshot/my_fs_backup
"settings": {
"location": "my_fs_backup_location",
"compress": true
- }
+ },
+ "generation": -2,
+ "pending_generation": -1
}
-----------------------------------
// TEST[continued]
@@ -281,7 +287,9 @@ PUT _snapshot/my_src_only_repository
"settings": {
"delegate_type": "fs",
"location": "my_backup_location"
- }
+ },
+ "generation": -2,
+ "pending_generation": -1
}
-----------------------------------
// TEST[continued]
@@ -309,7 +317,9 @@ PUT /_snapshot/my_unverified_backup?verify=false
"type": "fs",
"settings": {
"location": "my_unverified_backup_location"
- }
+ },
+ "generation": -2,
+ "pending_generation": -1
}
-----------------------------------
// TEST[continued]
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
index 9fd92160beee6..e445615e0fc73 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -70,7 +70,6 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksNodeService;
import org.elasticsearch.plugins.ClusterPlugin;
-import org.elasticsearch.repositories.RepositoriesState;
import org.elasticsearch.script.ScriptMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskResultsService;
@@ -115,7 +114,6 @@ public ClusterModule(Settings settings, ClusterService clusterService, List getNamedWriteables() {
List entries = new ArrayList<>();
// Cluster State
- registerClusterCustom(entries, RepositoriesState.TYPE, RepositoriesState::new, RepositoriesState::readDiffFrom);
registerClusterCustom(entries, SnapshotsInProgress.TYPE, SnapshotsInProgress::new, SnapshotsInProgress::readDiffFrom);
registerClusterCustom(entries, RestoreInProgress.TYPE, RestoreInProgress::new, RestoreInProgress::readDiffFrom);
registerClusterCustom(entries, SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new,
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
index 4f182b6ca381e..33ef3383a7d97 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
@@ -24,12 +24,15 @@
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.repositories.RepositoryData;
import java.io.IOException;
import java.util.ArrayList;
@@ -55,6 +58,30 @@ public RepositoriesMetaData(List repositories) {
this.repositories = Collections.unmodifiableList(repositories);
}
+ /**
+ * Creates a new instance that has the given repository moved to the given {@code safeGeneration} and {@code pendingGeneration}.
+ *
+ * @param repoName repository name
+ * @param safeGeneration new safe generation
+ * @param pendingGeneration new pending generation
+ * @return new instance with updated generations
+ */
+ public RepositoriesMetaData withUpdatedGeneration(String repoName, long safeGeneration, long pendingGeneration) {
+ int indexOfRepo = -1;
+ for (int i = 0; i < repositories.size(); i++) {
+ if (repositories.get(i).name().equals(repoName)) {
+ indexOfRepo = i;
+ break;
+ }
+ }
+ if (indexOfRepo < 0) {
+ throw new IllegalArgumentException("Unknown repository [" + repoName + "]");
+ }
+ final List updatedRepos = new ArrayList<>(repositories);
+ updatedRepos.set(indexOfRepo, new RepositoryMetaData(repositories.get(indexOfRepo), safeGeneration, pendingGeneration));
+ return new RepositoriesMetaData(updatedRepos);
+ }
+
/**
* Returns list of currently registered repositories
*
@@ -87,7 +114,29 @@ public boolean equals(Object o) {
RepositoriesMetaData that = (RepositoriesMetaData) o;
return repositories.equals(that.repositories);
+ }
+ /**
+ * Checks if this instance and the given instance share the same repositories by checking that this instances' repositories and the
+ * repositories in {@code other} are equal or only differ in their values of {@link RepositoryMetaData#generation()} and
+ * {@link RepositoryMetaData#pendingGeneration()}.
+ *
+ * @param other other repositories metadata
+ * @return {@code true} iff both instances contain the same repositories apart from differences in generations
+ */
+ public boolean equalSettings(@Nullable RepositoriesMetaData other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.repositories.size() != repositories.size()) {
+ return false;
+ }
+ for (int i = 0; i < repositories.size(); i++) {
+ if (repositories.get(i).equalSettings(other.repositories.get(i)) == false) {
+ return false;
+ }
+ }
+ return true;
}
@Override
@@ -142,6 +191,8 @@ public static RepositoriesMetaData fromXContent(XContentParser parser) throws IO
}
String type = null;
Settings settings = Settings.EMPTY;
+ long generation = RepositoryData.UNKNOWN_REPO_GEN;
+ long pendingGeneration = RepositoryData.EMPTY_REPO_GEN;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
@@ -155,6 +206,16 @@ public static RepositoriesMetaData fromXContent(XContentParser parser) throws IO
throw new ElasticsearchParseException("failed to parse repository [{}], incompatible params", name);
}
settings = Settings.fromXContent(parser);
+ } else if ("generation".equals(currentFieldName)) {
+ if (parser.nextToken() != XContentParser.Token.VALUE_NUMBER) {
+ throw new ElasticsearchParseException("failed to parse repository [{}], unknown type", name);
+ }
+ generation = parser.longValue();
+ } else if ("pending_generation".equals(currentFieldName)) {
+ if (parser.nextToken() != XContentParser.Token.VALUE_NUMBER) {
+ throw new ElasticsearchParseException("failed to parse repository [{}], unknown type", name);
+ }
+ pendingGeneration = parser.longValue();
} else {
throw new ElasticsearchParseException("failed to parse repository [{}], unknown field [{}]",
name, currentFieldName);
@@ -166,7 +227,7 @@ public static RepositoriesMetaData fromXContent(XContentParser parser) throws IO
if (type == null) {
throw new ElasticsearchParseException("failed to parse repository [{}], missing repository type", name);
}
- repository.add(new RepositoryMetaData(name, type, settings));
+ repository.add(new RepositoryMetaData(name, type, settings, generation, pendingGeneration));
} else {
throw new ElasticsearchParseException("failed to parse repositories");
}
@@ -204,6 +265,13 @@ public static void toXContent(RepositoryMetaData repository, XContentBuilder bui
repository.settings().toXContent(builder, params);
builder.endObject();
+ builder.field("generation", repository.generation());
+ builder.field("pending_generation", repository.pendingGeneration());
builder.endObject();
}
+
+ @Override
+ public String toString() {
+ return Strings.toString(this);
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
index e633e042cbb3b..7b3c982536d0d 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
@@ -38,8 +38,14 @@ public class RepositoryMetaData {
private final String type;
private final Settings settings;
+ /**
+ * Safe repository generation.
+ */
private final long generation;
+ /**
+ * Pending repository generation.
+ */
private final long pendingGeneration;
/**
@@ -50,15 +56,21 @@ public class RepositoryMetaData {
* @param settings repository settings
*/
public RepositoryMetaData(String name, String type, Settings settings) {
- this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.UNKNOWN_REPO_GEN);
+ this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN);
+ }
+
+ public RepositoryMetaData(RepositoryMetaData metaData, long generation, long pendingGeneration) {
+ this(metaData.name, metaData.type, metaData.settings, generation, pendingGeneration);
}
- private RepositoryMetaData(String name, String type, Settings settings, long generation, long pendingGeneration) {
+ public RepositoryMetaData(String name, String type, Settings settings, long generation, long pendingGeneration) {
this.name = name;
this.type = type;
this.settings = settings;
this.generation = generation;
this.pendingGeneration = pendingGeneration;
+ assert generation <= pendingGeneration :
+ "Pending generation [" + pendingGeneration + "] must be greater or equal generation [" + generation + "]";
}
/**
@@ -88,10 +100,26 @@ public Settings settings() {
return this.settings;
}
+ /**
+ * Returns the safe repository generation. {@link RepositoryData} for this generation is assumed to exist in the repository.
+ * All operations on the repository must be based on the {@link RepositoryData} at this generation.
+ * See package level documentation for the blob store based repositories {@link org.elasticsearch.repositories.blobstore} for details
+ * on how hits value is used during snapshots.
+ * @return safe repository generation
+ */
public long generation() {
return generation;
}
+ /**
+ * Returns the pending repository generation. {@link RepositoryData} for this generation and all generations down to the safe
+ * generation {@link #generation} may exist in the repository and should not be reused for writing new {@link RepositoryData} to the
+ * repository.
+ * See package level documentation for the blob store based repositories {@link org.elasticsearch.repositories.blobstore} for details
+ * on how hits value is used during snapshots.
+ *
+ * @return highest pending repository generation
+ */
public long pendingGeneration() {
return pendingGeneration;
}
@@ -105,7 +133,7 @@ public RepositoryMetaData(StreamInput in) throws IOException {
pendingGeneration = in.readLong();
} else {
generation = RepositoryData.UNKNOWN_REPO_GEN;
- pendingGeneration = RepositoryData.UNKNOWN_REPO_GEN;
+ pendingGeneration = RepositoryData.EMPTY_REPO_GEN;
}
}
@@ -124,6 +152,16 @@ public void writeTo(StreamOutput out) throws IOException {
}
}
+ /**
+ * Checks if this instance is equal to the other instance in all fields other than {@link #generation} and {@link #pendingGeneration}.
+ *
+ * @param other other repository metadata
+ * @return {@code true} if both instances equal in all fields but the generation fields
+ */
+ public boolean equalSettings(RepositoryMetaData other) {
+ return name.equals(other.name) && type.equals(other.type()) && settings.equals(other.settings());
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -136,11 +174,15 @@ public boolean equals(Object o) {
if (generation != that.generation) return false;
if (pendingGeneration != that.pendingGeneration) return false;
return settings.equals(that.settings);
-
}
@Override
public int hashCode() {
return Objects.hash(name, type, settings, generation, pendingGeneration);
}
+
+ @Override
+ public String toString() {
+ return "RepositoryMetaData{" + name + "}{" + type + "}{" + settings + "}{" + generation + "}{" + pendingGeneration + "}";
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
index a084bbab9e986..621f5d330da14 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
@@ -150,7 +150,8 @@ public ClusterState execute(ClusterState currentState) {
for (RepositoryMetaData repositoryMetaData : repositories.repositories()) {
if (repositoryMetaData.name().equals(newRepositoryMetaData.name())) {
- if (newRepositoryMetaData.equals(repositoryMetaData)) {
+ if (newRepositoryMetaData.type().equals(repositoryMetaData.type())
+ && newRepositoryMetaData.settings().equals(repositoryMetaData.settings())) {
// Previous version is the same as this one no update is needed.
return currentState;
}
@@ -292,7 +293,7 @@ public void applyClusterState(ClusterChangedEvent event) {
RepositoriesMetaData newMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE);
// Check if repositories got changed
- if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equals(newMetaData))) {
+ if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equalSettings(newMetaData))) {
for (Repository repo : repositories.values()) {
repo.updateState(state);
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
deleted file mode 100644
index 99061ab0da452..0000000000000
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesState.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.repositories;
-
-import org.elasticsearch.Version;
-import org.elasticsearch.cluster.AbstractNamedDiffable;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.NamedDiff;
-import org.elasticsearch.cluster.metadata.RepositoryMetaData;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Tracks the current generation at which the {@link RepositoryData} for each writable
- * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} in the cluster can be found.
- * See documentation for the package {@link org.elasticsearch.repositories.blobstore} for details.
- */
-public final class RepositoriesState extends AbstractNamedDiffable implements ClusterState.Custom {
-
- public static final String TYPE = "repositories_state";
-
- private static final RepositoriesState EMPTY = RepositoriesState.builder().build();
-
- private final Map states;
-
- private RepositoriesState(Map states) {
- this.states = states;
- }
-
- public RepositoriesState(StreamInput in) throws IOException {
- this(in.readMap(StreamInput::readString, State::new));
- }
-
- /**
- * Gets repositories state from the given cluster state or an empty repositories state if none is found in the cluster state.
- *
- * @param state cluster state
- * @return RepositoriesState
- */
- public static RepositoriesState getOrEmpty(ClusterState state) {
- return Objects.requireNonNullElse(state.custom(RepositoriesState.TYPE), RepositoriesState.EMPTY);
- }
-
- /**
- * Get {@link State} for a repository.
- *
- * @param repoName repository name
- * @return State for the repository or {@code null} if none is found
- */
- @Nullable
- public State state(String repoName) {
- return states.get(repoName);
- }
-
- public static NamedDiff readDiffFrom(StreamInput in) throws IOException {
- return readDiffFrom(ClusterState.Custom.class, TYPE, in);
- }
-
- @Override
- public Version getMinimalSupportedVersion() {
- return RepositoryMetaData.REPO_GEN_IN_CS_VERSION;
- }
-
- @Override
- public String getWriteableName() {
- return TYPE;
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeMap(states, StreamOutput::writeString, (o, v) -> v.writeTo(o));
- }
-
- @Override
- public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- for (Map.Entry stringStateEntry : states.entrySet()) {
- builder.field(stringStateEntry.getKey(), stringStateEntry.getValue());
- }
- return builder;
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other instanceof RepositoriesState == false) {
- return false;
- }
- final RepositoriesState that = (RepositoriesState) other;
- return states.equals(that.states);
- }
-
- @Override
- public int hashCode() {
- return states.hashCode();
- }
-
- @Override
- public String toString() {
- return Strings.toString(this);
- }
-
- public static final class State implements Writeable, ToXContent {
-
- private final long generation;
-
- private final long pendingGeneration;
-
- private State(long generation, long pendingGeneration) {
- assert generation <= pendingGeneration :
- "Pending generation [" + pendingGeneration + "] smaller than generation [" + generation + "]";
- this.generation = generation;
- this.pendingGeneration = pendingGeneration;
- }
-
- private State(StreamInput in) throws IOException {
- this(in.readLong(), in.readLong());
- }
-
- /**
- * Returns the current repository generation for the repository.
- *
- * @return current repository generation that should be used for reads of the repository's {@link RepositoryData}
- */
- public long generation() {
- return generation;
- }
-
- /**
- * Latest repository generation that a write was attempted for.
- *
- * @return latest repository generation that a write was attempted for
- */
- public long pendingGeneration() {
- return pendingGeneration;
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeLong(generation);
- out.writeLong(pendingGeneration);
- }
-
- @Override
- public String toString() {
- return Strings.toString(this);
- }
-
- @Override
- public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- return builder.startObject().field("generation", generation).field("pending", pendingGeneration).endObject();
- }
-
- @Override
- public boolean isFragment() {
- return false;
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other instanceof RepositoriesState.State == false) {
- return false;
- }
- final RepositoriesState.State that = (RepositoriesState.State) other;
- return this.generation == that.generation && this.pendingGeneration == that.pendingGeneration;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(generation, pendingGeneration);
- }
- }
-
- public static final class Builder {
-
- private final Map stateMap = new HashMap<>();
-
- private Builder() {
- }
-
- public Builder putAll(RepositoriesState state) {
- stateMap.putAll(state.states);
- return this;
- }
-
- public Builder putState(String name, long generation, long pending) {
- stateMap.put(name, new State(generation, pending));
- return this;
- }
-
- public RepositoriesState build() {
- return new RepositoriesState(Map.copyOf(stateMap));
- }
- }
-}
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 3666d2147e340..c33c3b936a0d5 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -92,7 +92,6 @@
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoriesState;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
@@ -119,7 +118,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
@@ -145,7 +143,7 @@
public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
private static final Logger logger = LogManager.getLogger(BlobStoreRepository.class);
- protected final RepositoryMetaData metadata;
+ protected volatile RepositoryMetaData metadata;
protected final ThreadPool threadPool;
@@ -301,7 +299,8 @@ public void updateState(ClusterState state) {
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
}
- final long finalBestGen = Math.max(bestGenerationFromCS, getRepoMetaData(state).generation());
+ metadata = getRepoMetaData(state);
+ final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation());
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
}
@@ -1051,10 +1050,10 @@ public boolean isReadOnly() {
/**
* Writing a new index generation is a three step process.
- * First, the {@link RepositoriesState.State} entry for this repository is set into a pending state by incrementing its
+ * First, the {@link RepositoryMetaData} entry for this repository is set into a pending state by incrementing its
* pending generation {@code P} while its safe generation {@code N} remains unchanged.
* Second, the updated {@link RepositoryData} is written to generation {@code P + 1}.
- * Lastly, the {@link RepositoriesState.State} entry for this repository is updated to the new generation {@code P + 1} and thus
+ * Lastly, the {@link RepositoryMetaData} entry for this repository is updated to the new generation {@code P + 1} and thus
* pending and safe generation are set to the same value marking the end of the update of the repository data.
*
* @param repositoryData RepositoryData to write
@@ -1083,27 +1082,32 @@ protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, bo
@Override
public ClusterState execute(ClusterState currentState) {
- final RepositoryMetaData state = RepositoriesState.getOrEmpty(currentState);
+ final RepositoryMetaData meta = getRepoMetaData(currentState);
final String repoName = metadata.name();
- final RepositoriesState.State repoState = Optional.ofNullable(state.state(repoName)).orElseGet(
- () -> RepositoriesState.builder().putState(repoName, expectedGen, expectedGen).build().state(repoName));
- if (repoState.pendingGeneration() != repoState.generation()) {
- logger.info("Trying to write new repository data over unfinished write, repo is in state [{}]", repoState);
+ final long genInState = meta.generation();
+ // TODO: Remove all usages of this variable, instead initialize the generation when loading RepositoryData
+ final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN;
+ if (uninitializedMeta == false && meta.pendingGeneration() != genInState) {
+ logger.info("Trying to write new repository data over unfinished write, repo is in state [{}]", meta);
}
- assert expectedGen == RepositoryData.EMPTY_REPO_GEN || expectedGen == repoState.generation()
- : "Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + repoState + "]";
+ assert expectedGen == RepositoryData.EMPTY_REPO_GEN || RepositoryData.UNKNOWN_REPO_GEN == meta.generation()
+ || expectedGen == meta.generation() :
+ "Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + meta + "]";
// If we run into the empty repo generation for the expected gen, the repo has been is assumed to have been cleared of
// all contents by an external process so we reset the safe generation to the empty generation.
final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN ? RepositoryData.EMPTY_REPO_GEN
- : repoState.generation();
+ : (uninitializedMeta ? expectedGen : genInState);
// Regardless of whether or not the safe generation has been reset, the pending generation always increments so that
// even if a repository has been manually cleared of all contents we will never reuse the same repository generation.
// This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does
// not offer any consistency guarantees when it comes to overwriting the same blob name with different content.
- newGen = repoState.pendingGeneration() + 1;
- return ClusterState.builder(currentState).putCustom(
- RepositoriesState.TYPE,
- RepositoriesState.builder().putAll(state).putState(repoName, safeGeneration, newGen).build()).build();
+ newGen = uninitializedMeta ? expectedGen + 1: metadata.pendingGeneration() + 1;
+ assert newGen > latestKnownRepoGen.get() : "Attempted new generation [" + newGen +
+ "] must be larger than latest known generation [" + latestKnownRepoGen.get() + "]";
+ return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
+ .putCustom(RepositoriesMetaData.TYPE,
+ currentState.metaData().custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
+ repoName, safeGeneration, newGen)).build()).build();
}
@Override
@@ -1119,8 +1123,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
private boolean assertExpectedGeneration(ClusterState newState) {
- final RepositoriesState repositoriesState = newState.custom(RepositoriesState.TYPE);
- final RepositoriesState.State repoState = repositoriesState.state(metadata.name());
+ final RepositoriesMetaData repositoriesMetaData = newState.metaData().custom(RepositoriesMetaData.TYPE);
+ final RepositoryMetaData repoState = repositoriesMetaData.repository(metadata.name());
assert newGen == repoState.pendingGeneration()
: "State [" + repoState + "] did not contain assumed pending generation [" + newGen + "]";
return true;
@@ -1139,12 +1143,6 @@ private boolean assertExpectedGeneration(ClusterState newState) {
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
- final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
- if (newGen < latestKnownGen) {
- // Don't mess up the index.latest blob
- throw new IllegalStateException(
- "Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
- }
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -1153,26 +1151,27 @@ private boolean assertExpectedGeneration(ClusterState newState) {
}
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
- // Step 3: Update CS to reflect new repository generation.
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
+
+ // Step 3: Update CS to reflect new repository generation.
clusterService.submitStateUpdateTask("set safe repository generation [" + metadata.name() + "][" + newGen + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
- final RepositoriesState state = currentState.custom(RepositoriesState.TYPE);
- final RepositoriesState.State repoState = state.state(metadata.name());
- final long prevGeneration = repoState.generation();
+ final RepositoryMetaData meta = getRepoMetaData(currentState);
+ final long prevGeneration = meta.generation();
if (prevGeneration != expectedGen) {
throw new IllegalStateException("Tried to update repo generation to [" + newGen
- + "] but saw unexpected generation in state [" + repoState + "]");
+ + "] but saw unexpected generation in state [" + meta + "]");
}
- if (repoState.pendingGeneration() == prevGeneration) {
+ if (meta.pendingGeneration() == prevGeneration) {
throw new IllegalStateException(
- "Tried to update non-pending repo state [" + repoState + "] after write to generation [" + newGen + "]");
+ "Tried to update non-pending repo state [" + meta + "] after write to generation [" + newGen + "]");
}
- final RepositoriesState updated =
- RepositoriesState.builder().putAll(state).putState(metadata.name(), newGen, newGen).build();
- return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updated).build();
+ return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
+ .putCustom(RepositoriesMetaData.TYPE,
+ currentState.metaData().custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
+ metadata.name(), newGen, newGen)).build()).build();
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
index 3f2bda9025369..5e5fe84103250 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
@@ -96,8 +96,8 @@
*
* - The blobstore repository stores the {@code RepositoryData} in blobs named with incrementing suffix {@code N} at {@code /index-N}
* directly under the repository's root.
- * - For each {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} (that is not mounted in read-only mode) an entry
- * of type {@link org.elasticsearch.repositories.RepositoriesState.State} exists in the cluster state. It tracks the current valid
+ *
- For each {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} an entry of type
+ * {@link org.elasticsearch.cluster.metadata.RepositoryMetaData} exists in the cluster state. It tracks the current valid
* generation {@code N} as well as the latest generation that a write was attempted for.
* - The blobstore also stores the most recent {@code N} as a 64bit long in the blob {@code /index.latest} directly under the
* repository's root.
@@ -128,9 +128,9 @@
* to read operations on the repository are as follows and all run on the master node:
*
*
- * - Write an updated value of {@link org.elasticsearch.repositories.RepositoriesState.State} for the repository that has the same
- * {@link org.elasticsearch.repositories.RepositoriesState.State#generation()} as the existing entry and has a value of
- * {@link org.elasticsearch.repositories.RepositoriesState.State#pendingGeneration()} one greater than the {@code pendingGeneration} of the
+ *
- Write an updated value of {@link org.elasticsearch.cluster.metadata.RepositoryMetaData} for the repository that has the same
+ * {@link org.elasticsearch.cluster.metadata.RepositoryMetaData#generation()} as the existing entry and has a value of
+ * {@link org.elasticsearch.cluster.metadata.RepositoryMetaData#pendingGeneration()} one greater than the {@code pendingGeneration} of the
* existing entry.
* - On the same master node, after the cluster state has been updated in the first step, write the new {@code index-N} blob and
* also update the contents of the {@code index.latest} blob. Note that updating the index.latest blob is done on a best effort
diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java
index efe095eb9b6c2..9d69dea97f020 100644
--- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java
@@ -103,7 +103,7 @@ public FsRepository(RepositoryMetaData metadata, Environment environment, NamedX
@Override
protected BlobStore createBlobStore() throws Exception {
- final String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
+ final String location = REPOSITORIES_LOCATION_SETTING.get(getMetadata().settings());
final Path locationFile = environment.resolveRepoFile(location);
return new FsBlobStore(environment.settings(), locationFile, isReadOnly());
}
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
index b5d99db0a880f..432091b81e1ec 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
@@ -194,7 +194,7 @@ private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
- BlobStoreTestUtil.mockClusterService()) {
+ BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index 9f01ccd982d95..13102182cd7b0 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -23,7 +23,6 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
@@ -34,7 +33,6 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
-import org.elasticsearch.repositories.RepositoriesState;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
@@ -51,7 +49,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
@@ -140,7 +137,7 @@ public void testRetrieveSnapshots() throws Exception {
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
final BlobStoreRepository repository = setupRepo();
- final long pendingGeneration = getPendingGeneration(repository);
+ final long pendingGeneration = repository.metadata.pendingGeneration();
// write to and read from a index file with no entries
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0));
final RepositoryData emptyData = RepositoryData.EMPTY;
@@ -166,7 +163,7 @@ public void testIndexGenerationalFiles() throws Exception {
final BlobStoreRepository repository = setupRepo();
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), RepositoryData.EMPTY);
- final long pendingGeneration = getPendingGeneration(repository);
+ final long pendingGeneration = repository.metadata.pendingGeneration();
// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
@@ -227,12 +224,6 @@ private static void writeIndexGen(BlobStoreRepository repository, RepositoryData
future.actionGet();
}
- private long getPendingGeneration(Repository repository) {
- final ClusterState state = client().admin().cluster().prepareState().get().getState();
- return Optional.ofNullable(RepositoriesState.getOrEmpty(state).state(repository.getMetadata().name()))
- .map(RepositoriesState.State::pendingGeneration).orElse(RepositoryData.EMPTY_REPO_GEN);
- }
-
private BlobStoreRepository setupRepo() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetaDataSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetaDataSerializationTests.java
index 17ae1def2359c..c7c97077fe9b2 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetaDataSerializationTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetaDataSerializationTests.java
@@ -42,7 +42,10 @@ protected Custom createTestInstance() {
int numberOfRepositories = randomInt(10);
List entries = new ArrayList<>();
for (int i = 0; i < numberOfRepositories; i++) {
- entries.add(new RepositoryMetaData(randomAlphaOfLength(10), randomAlphaOfLength(10), randomSettings()));
+ // divide by 2 to not overflow when adding to this number for the pending generation below
+ final long generation = randomNonNegativeLong() / 2L;
+ entries.add(new RepositoryMetaData(randomAlphaOfLength(10), randomAlphaOfLength(10), randomSettings(), generation,
+ generation + randomLongBetween(0, generation)));
}
entries.sort(Comparator.comparing(RepositoryMetaData::name));
return new RepositoriesMetaData(entries);
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java
index fd04b9297ba74..e4e6d99c6e6f0 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java
@@ -135,10 +135,10 @@ public void testOverwriteShardSnapBlobFails() throws IOException {
public void testOverwriteSnapshotInfoBlob() {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
- final ClusterService clusterService = BlobStoreTestUtil.mockClusterService();
- try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
- new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
- xContentRegistry(), clusterService, blobStoreContext, random())) {
+ final RepositoryMetaData metaData = new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY);
+ final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(metaData);
+ try (BlobStoreRepository repository =
+ new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) {
clusterService.addStateApplier(event -> repository.updateState(event.state()));
repository.start();
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
index 66c49db542dab..246af6dea6189 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
@@ -26,6 +26,9 @@
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@@ -290,13 +293,23 @@ public static void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath
}
}
+ public static ClusterService mockClusterService() {
+ return mockClusterService(ClusterState.EMPTY_STATE);
+ }
+
+ public static ClusterService mockClusterService(RepositoryMetaData metaData) {
+ return mockClusterService(ClusterState.builder(ClusterState.EMPTY_STATE).metaData(
+ MetaData.builder().putCustom(RepositoriesMetaData.TYPE,
+ new RepositoriesMetaData(Collections.singletonList(metaData))).build()).build());
+ }
+
/**
* Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary
* functionality to make {@link BlobStoreRepository} work.
*
* @return Mock ClusterService
*/
- public static ClusterService mockClusterService() {
+ public static ClusterService mockClusterService(ClusterState initialState) {
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
@@ -305,7 +318,7 @@ public static ClusterService mockClusterService() {
final ClusterService clusterService = mock(ClusterService.class);
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
- final AtomicReference currentState = new AtomicReference<>(ClusterState.EMPTY_STATE);
+ final AtomicReference currentState = new AtomicReference<>(initialState);
when(clusterService.state()).then(invocationOnMock -> currentState.get());
final List appliers = new CopyOnWriteArrayList<>();
doAnswer(invocation -> {
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index 218c6f4eecac7..6c05cc625f5cb 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -100,6 +100,8 @@ public long getFailureCount() {
private final String randomPrefix;
+ private final Environment env;
+
private volatile boolean blockOnControlFiles;
private volatile boolean blockOnDataFiles;
@@ -125,9 +127,15 @@ public MockRepository(RepositoryMetaData metadata, Environment environment,
blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
randomPrefix = metadata.settings().get("random", "default");
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
+ env = environment;
logger.info("starting mock repository with random prefix {}", randomPrefix);
}
+ @Override
+ public RepositoryMetaData getMetadata() {
+ return overrideSettings(super.getMetadata(), env);
+ }
+
private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) {
// TODO: use another method of testing not being able to read the test file written by the master...
// this is super duper hacky
From 446873474c383531369560ef694dd1e551a0f586 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 11:02:07 +0100
Subject: [PATCH 10/22] fix test
---
.../elasticsearch/snapshots/SourceOnlySnapshotShardTests.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
index 03ca8d5cfff28..dba66e0b1b1db 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
@@ -352,7 +352,8 @@ private Environment createEnvironment() {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
- return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), BlobStoreTestUtil.mockClusterService());
+ return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
+ BlobStoreTestUtil.mockClusterService(repositoryMetaData));
}
private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
From 7b07f62f924b2297fb9c7943e4578a21812f46fc Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 11:54:38 +0100
Subject: [PATCH 11/22] nicer javadoc
---
.../blobstore/BlobStoreTestUtil.java | 22 +++++++++++++------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
index 246af6dea6189..12130a1dd330a 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
@@ -293,23 +293,31 @@ public static void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath
}
}
+ /**
+ * Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary
+ * functionality to make {@link BlobStoreRepository} work. Initializes the cluster state as {@link ClusterState#EMPTY_STATE}.
+ *
+ * @return Mock ClusterService
+ */
public static ClusterService mockClusterService() {
return mockClusterService(ClusterState.EMPTY_STATE);
}
+ /**
+ * Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary
+ * functionality to make {@link BlobStoreRepository} work. Initializes the cluster state with a {@link RepositoriesMetaData} instance
+ * that contains the given {@code metadata}.
+ *
+ * @param metaData RepositoryMetaData to initialize the cluster state with
+ * @return Mock ClusterService
+ */
public static ClusterService mockClusterService(RepositoryMetaData metaData) {
return mockClusterService(ClusterState.builder(ClusterState.EMPTY_STATE).metaData(
MetaData.builder().putCustom(RepositoriesMetaData.TYPE,
new RepositoriesMetaData(Collections.singletonList(metaData))).build()).build());
}
- /**
- * Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary
- * functionality to make {@link BlobStoreRepository} work.
- *
- * @return Mock ClusterService
- */
- public static ClusterService mockClusterService(ClusterState initialState) {
+ private static ClusterService mockClusterService(ClusterState initialState) {
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
From aa1bc78c187f8826d26d07f478d70a0e054cc5aa Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 15:58:58 +0100
Subject: [PATCH 12/22] CR: comments
---
.../cluster/metadata/RepositoriesMetaData.java | 4 ++--
.../cluster/metadata/RepositoryMetaData.java | 4 ++--
.../elasticsearch/repositories/RepositoriesService.java | 5 ++---
.../repositories/blobstore/BlobStoreRepository.java | 9 +++------
4 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
index 33ef3383a7d97..6fb4bb92858df 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
@@ -124,7 +124,7 @@ public boolean equals(Object o) {
* @param other other repositories metadata
* @return {@code true} iff both instances contain the same repositories apart from differences in generations
*/
- public boolean equalSettings(@Nullable RepositoriesMetaData other) {
+ public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetaData other) {
if (other == null) {
return false;
}
@@ -132,7 +132,7 @@ public boolean equalSettings(@Nullable RepositoriesMetaData other) {
return false;
}
for (int i = 0; i < repositories.size(); i++) {
- if (repositories.get(i).equalSettings(other.repositories.get(i)) == false) {
+ if (repositories.get(i).equalsIgnoreGenerations(other.repositories.get(i)) == false) {
return false;
}
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
index 7b3c982536d0d..235be92806cf9 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
@@ -70,7 +70,7 @@ public RepositoryMetaData(String name, String type, Settings settings, long gene
this.generation = generation;
this.pendingGeneration = pendingGeneration;
assert generation <= pendingGeneration :
- "Pending generation [" + pendingGeneration + "] must be greater or equal generation [" + generation + "]";
+ "Pending generation [" + pendingGeneration + "] must be greater or equal to generation [" + generation + "]";
}
/**
@@ -158,7 +158,7 @@ public void writeTo(StreamOutput out) throws IOException {
* @param other other repository metadata
* @return {@code true} if both instances equal in all fields but the generation fields
*/
- public boolean equalSettings(RepositoryMetaData other) {
+ public boolean equalsIgnoreGenerations(RepositoryMetaData other) {
return name.equals(other.name) && type.equals(other.type()) && settings.equals(other.settings());
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
index 621f5d330da14..c2796d27aa956 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
@@ -150,8 +150,7 @@ public ClusterState execute(ClusterState currentState) {
for (RepositoryMetaData repositoryMetaData : repositories.repositories()) {
if (repositoryMetaData.name().equals(newRepositoryMetaData.name())) {
- if (newRepositoryMetaData.type().equals(repositoryMetaData.type())
- && newRepositoryMetaData.settings().equals(repositoryMetaData.settings())) {
+ if (newRepositoryMetaData.equalsIgnoreGenerations(repositoryMetaData)) {
// Previous version is the same as this one no update is needed.
return currentState;
}
@@ -293,7 +292,7 @@ public void applyClusterState(ClusterChangedEvent event) {
RepositoriesMetaData newMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE);
// Check if repositories got changed
- if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equalSettings(newMetaData))) {
+ if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equalsIgnoreGenerations(newMetaData))) {
for (Repository repo : repositories.values()) {
repo.updateState(state);
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index c33c3b936a0d5..8a42deeed0c71 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -982,7 +982,6 @@ public void endVerification(String seed) {
// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
// and concurrent modifications.
- // Protected for use in MockEventuallyConsistentRepository
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
@Override
@@ -1123,8 +1122,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
private boolean assertExpectedGeneration(ClusterState newState) {
- final RepositoriesMetaData repositoriesMetaData = newState.metaData().custom(RepositoriesMetaData.TYPE);
- final RepositoryMetaData repoState = repositoriesMetaData.repository(metadata.name());
+ final RepositoryMetaData repoState = getRepoMetaData(newState);
assert newGen == repoState.pendingGeneration()
: "State [" + repoState + "] did not contain assumed pending generation [" + newGen + "]";
return true;
@@ -1159,12 +1157,11 @@ private boolean assertExpectedGeneration(ClusterState newState) {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryMetaData meta = getRepoMetaData(currentState);
- final long prevGeneration = meta.generation();
- if (prevGeneration != expectedGen) {
+ if (meta.generation() != expectedGen) {
throw new IllegalStateException("Tried to update repo generation to [" + newGen
+ "] but saw unexpected generation in state [" + meta + "]");
}
- if (meta.pendingGeneration() == prevGeneration) {
+ if (meta.pendingGeneration() == meta.generation()) {
throw new IllegalStateException(
"Tried to update non-pending repo state [" + meta + "] after write to generation [" + newGen + "]");
}
From 2e8cc1e7904b28402e90a93e592929ef6ef08c3c Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 16:07:12 +0100
Subject: [PATCH 13/22] CR: add breakout
---
.../repositories/blobstore/BlobStoreRepository.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 8a42deeed0c71..552f65078be7c 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1179,6 +1179,10 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ if (newGen == 0) {
+ // This was the first snapshot, no need to run any deletes since we don't have any old index-N in the repo
+ return;
+ }
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
// delete all now outdated index files
final List oldIndexN = LongStream.range(Math.max(expectedGen, 0), newGen)
From 87bdb8d8e32bcd033893b2a1f148e753b556cc03 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 16:17:40 +0100
Subject: [PATCH 14/22] fix test
---
.../repositories/blobstore/BlobStoreRepository.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 552f65078be7c..ca82915b5ffa7 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1181,6 +1181,7 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (newGen == 0) {
// This was the first snapshot, no need to run any deletes since we don't have any old index-N in the repo
+ l.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
From cd3071787e2988c20e8677de6ca884f70602206e Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 16:18:56 +0100
Subject: [PATCH 15/22] revert
---
.../repositories/blobstore/BlobStoreRepository.java | 5 -----
1 file changed, 5 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index ca82915b5ffa7..8a42deeed0c71 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1179,11 +1179,6 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
- if (newGen == 0) {
- // This was the first snapshot, no need to run any deletes since we don't have any old index-N in the repo
- l.onResponse(null);
- return;
- }
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
// delete all now outdated index files
final List oldIndexN = LongStream.range(Math.max(expectedGen, 0), newGen)
From 45a42c1ce9ce3ee801497eea1f87c8e0a47af1b3 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 19:37:11 +0100
Subject: [PATCH 16/22] fix typo
---
.../elasticsearch/cluster/metadata/RepositoryMetaData.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
index 235be92806cf9..c210c32a9a529 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java
@@ -104,7 +104,7 @@ public Settings settings() {
* Returns the safe repository generation. {@link RepositoryData} for this generation is assumed to exist in the repository.
* All operations on the repository must be based on the {@link RepositoryData} at this generation.
* See package level documentation for the blob store based repositories {@link org.elasticsearch.repositories.blobstore} for details
- * on how hits value is used during snapshots.
+ * on how this value is used during snapshots.
* @return safe repository generation
*/
public long generation() {
@@ -116,7 +116,7 @@ public long generation() {
* generation {@link #generation} may exist in the repository and should not be reused for writing new {@link RepositoryData} to the
* repository.
* See package level documentation for the blob store based repositories {@link org.elasticsearch.repositories.blobstore} for details
- * on how hits value is used during snapshots.
+ * on how this value is used during snapshots.
*
* @return highest pending repository generation
*/
From e7969fd682807975524476fe5279b9feaa5fcf11 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 19:55:07 +0100
Subject: [PATCH 17/22] nicer log message
---
.../repositories/blobstore/BlobStoreRepository.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 8a42deeed0c71..d196cda97ad29 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1087,7 +1087,8 @@ public ClusterState execute(ClusterState currentState) {
// TODO: Remove all usages of this variable, instead initialize the generation when loading RepositoryData
final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN;
if (uninitializedMeta == false && meta.pendingGeneration() != genInState) {
- logger.info("Trying to write new repository data over unfinished write, repo is in state [{}]", meta);
+ logger.info("Trying to write new repository data over unfinished write, repo [{}] is at " +
+ "safe generation [{}] and pending generation [{}]", meta.name(), genInState, meta.pendingGeneration());
}
assert expectedGen == RepositoryData.EMPTY_REPO_GEN || RepositoryData.UNKNOWN_REPO_GEN == meta.generation()
|| expectedGen == meta.generation() :
From f7997afe1554a8b768ef77e35a2f04adabda2124 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 19:55:35 +0100
Subject: [PATCH 18/22] fix typo
---
.../repositories/blobstore/BlobStoreRepository.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index d196cda97ad29..955c74280dc28 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1093,7 +1093,7 @@ public ClusterState execute(ClusterState currentState) {
assert expectedGen == RepositoryData.EMPTY_REPO_GEN || RepositoryData.UNKNOWN_REPO_GEN == meta.generation()
|| expectedGen == meta.generation() :
"Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + meta + "]";
- // If we run into the empty repo generation for the expected gen, the repo has been is assumed to have been cleared of
+ // If we run into the empty repo generation for the expected gen, the repo is assumed to have been cleared of
// all contents by an external process so we reset the safe generation to the empty generation.
final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN ? RepositoryData.EMPTY_REPO_GEN
: (uninitializedMeta ? expectedGen : genInState);
From 68746583529ef06b2063e2c20c0c6239b7571ccf Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 20:03:36 +0100
Subject: [PATCH 19/22] smarter check
---
.../repositories/blobstore/BlobStoreRepository.java | 13 +++----------
1 file changed, 3 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 955c74280dc28..0fc81471ff521 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1118,16 +1118,8 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
- assert assertExpectedGeneration(newState);
setPendingStep.onResponse(newGen);
}
-
- private boolean assertExpectedGeneration(ClusterState newState) {
- final RepositoryMetaData repoState = getRepoMetaData(newState);
- assert newGen == repoState.pendingGeneration()
- : "State [" + repoState + "] did not contain assumed pending generation [" + newGen + "]";
- return true;
- }
});
// Step 2: Write new index-N blob to repository and update index.latest
@@ -1162,9 +1154,10 @@ public ClusterState execute(ClusterState currentState) {
throw new IllegalStateException("Tried to update repo generation to [" + newGen
+ "] but saw unexpected generation in state [" + meta + "]");
}
- if (meta.pendingGeneration() == meta.generation()) {
+ if (meta.pendingGeneration() != newGen) {
throw new IllegalStateException(
- "Tried to update non-pending repo state [" + meta + "] after write to generation [" + newGen + "]");
+ "Tried to update from unexpected pending repo generation [" + meta.pendingGeneration() +
+ "] after write to generation [" + newGen + "]");
}
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
.putCustom(RepositoriesMetaData.TYPE,
From 5a202ba62baa3bf7918be1ce86a58b00e89eabcb Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 3 Dec 2019 22:28:11 +0100
Subject: [PATCH 20/22] CR: Comments
---
.../repositories/blobstore/BlobStoreRepository.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 0fc81471ff521..ff8b349cad631 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1174,8 +1174,12 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
- // delete all now outdated index files
- final List oldIndexN = LongStream.range(Math.max(expectedGen, 0), newGen)
+ // Delete all now outdated index files up to 1000 blobs back from the new generation.
+ // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
+ // Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
+ // two index-N blobs around.
+ final List oldIndexN = LongStream.range(
+ Math.max(Math.max(expectedGen - 1, 0), newGen - 1000), newGen)
.mapToObj(gen -> INDEX_FILE_PREFIX + gen)
.collect(Collectors.toList());
try {
From 8a6106361081cc722625969f2bf9b647ed04e3a2 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Wed, 4 Dec 2019 07:57:38 +0100
Subject: [PATCH 21/22] CR: hide generations
---
docs/reference/modules/snapshots.asciidoc | 20 +++++--------------
.../get/GetRepositoriesResponse.java | 3 ++-
.../metadata/RepositoriesMetaData.java | 12 +++++++++--
3 files changed, 17 insertions(+), 18 deletions(-)
diff --git a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc
index 0e1da537cc594..666b8b4495fe5 100644
--- a/docs/reference/modules/snapshots.asciidoc
+++ b/docs/reference/modules/snapshots.asciidoc
@@ -122,9 +122,7 @@ which returns:
"type": "fs",
"settings": {
"location": "my_backup_location"
- },
- "generation": -2,
- "pending_generation": -1
+ }
}
}
-----------------------------------
@@ -190,9 +188,7 @@ PUT /_snapshot/my_fs_backup
"settings": {
"location": "/mount/backups/my_fs_backup_location",
"compress": true
- },
- "generation": -2,
- "pending_generation": -1
+ }
}
-----------------------------------
// TEST[skip:no access to absolute path]
@@ -208,9 +204,7 @@ PUT /_snapshot/my_fs_backup
"settings": {
"location": "my_fs_backup_location",
"compress": true
- },
- "generation": -2,
- "pending_generation": -1
+ }
}
-----------------------------------
// TEST[continued]
@@ -287,9 +281,7 @@ PUT _snapshot/my_src_only_repository
"settings": {
"delegate_type": "fs",
"location": "my_backup_location"
- },
- "generation": -2,
- "pending_generation": -1
+ }
}
-----------------------------------
// TEST[continued]
@@ -317,9 +309,7 @@ PUT /_snapshot/my_unverified_backup?verify=false
"type": "fs",
"settings": {
"location": "my_unverified_backup_location"
- },
- "generation": -2,
- "pending_generation": -1
+ }
}
-----------------------------------
// TEST[continued]
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java
index 69ade6d8fe0b0..7512e5a4f3b3e 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
@@ -66,7 +67,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
- repositories.toXContent(builder, params);
+ repositories.toXContent(builder, new DelegatingMapParams(Map.of(RepositoriesMetaData.HIDE_GENERATIONS_PARAM, "true"), params));
builder.endObject();
return builder;
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
index 6fb4bb92858df..a28850a9b47ce 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
@@ -47,6 +47,12 @@ public class RepositoriesMetaData extends AbstractNamedDiffable implemen
public static final String TYPE = "repositories";
+ /**
+ * Serialization parameter used to hide the {@link RepositoryMetaData#generation()} and {@link RepositoryMetaData#pendingGeneration()}
+ * in {@link org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse}.
+ */
+ public static final String HIDE_GENERATIONS_PARAM = "hide_generations";
+
private final List repositories;
/**
@@ -265,8 +271,10 @@ public static void toXContent(RepositoryMetaData repository, XContentBuilder bui
repository.settings().toXContent(builder, params);
builder.endObject();
- builder.field("generation", repository.generation());
- builder.field("pending_generation", repository.pendingGeneration());
+ if (params.paramAsBoolean(HIDE_GENERATIONS_PARAM, false) == false) {
+ builder.field("generation", repository.generation());
+ builder.field("pending_generation", repository.pendingGeneration());
+ }
builder.endObject();
}
From c3c758cf1e71b0b55f4ca51bec55661a721c03b6 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Wed, 4 Dec 2019 12:03:35 +0100
Subject: [PATCH 22/22] CR: adjust ex. message
---
.../repositories/blobstore/BlobStoreRepository.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index ff8b349cad631..ba670c2d6f01d 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1068,7 +1068,7 @@ protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, bo
// repository data
listener.onFailure(new RepositoryException(metadata.name(),
"concurrent modification of the index-N file, expected current generation [" + expectedGen +
- "], actual current generation [" + currentGen + "] - possibly due to simultaneous snapshot deletion requests"));
+ "], actual current generation [" + currentGen + "]"));
return;
}