From 3e6d413dece4042ad3941af2d7d3b98763a8f4df Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 8 Aug 2019 21:43:08 +0100 Subject: [PATCH] Only retain reasonable history for peer recoveries (#45208) Today if a shard is not fully allocated we maintain a retention lease for a lost peer for up to 12 hours, retaining all operations that occur in that time period so that we can recover this replica using an operations-based recovery if it returns. However it is not always reasonable to perform an operations-based recovery on such a replica: if the replica is a very long way behind the rest of the replication group then it can be much quicker to perform a file-based recovery instead. This commit introduces a notion of "reasonable" recoveries. If an operations-based recovery would involve copying only a small number of operations, but the index is large, then an operations-based recovery is reasonable; on the other hand if there are many operations to copy across and the index itself is relatively small then it makes more sense to perform a file-based recovery. We measure the size of the index by computing its number of documents (including deleted documents) in all segments belonging to the current safe commit, and compare this to the number of operations a lease is retaining below the local checkpoint of the safe commit. We consider an operations-based recovery to be reasonable iff it would involve replaying at most 10% of the documents in the index. The mechanism for this feature is to expire peer-recovery retention leases early if they are retaining so much history that an operations-based recovery using that lease would be unreasonable. 
Relates #41536 --- .../elasticsearch/index/IndexSettings.java | 13 +++ .../index/engine/CombinedDeletionPolicy.java | 46 ++++++-- .../elasticsearch/index/engine/Engine.java | 5 + .../index/engine/InternalEngine.java | 5 + .../index/engine/ReadOnlyEngine.java | 7 ++ .../index/engine/SafeCommitInfo.java | 37 +++++++ .../index/seqno/ReplicationTracker.java | 38 ++++++- .../elasticsearch/index/shard/IndexShard.java | 9 +- .../engine/CombinedDeletionPolicyTests.java | 19 +++- ...PeerRecoveryRetentionLeaseExpiryTests.java | 57 +++++++++- ...ReplicationTrackerRetentionLeaseTests.java | 45 +++++--- .../seqno/ReplicationTrackerTestCase.java | 7 +- .../index/seqno/ReplicationTrackerTests.java | 8 +- .../indices/recovery/IndexRecoveryIT.java | 103 ++++++++++++++++++ .../index/engine/EngineTestCase.java | 3 +- .../test/InternalSettingsPlugin.java | 2 + 16 files changed, 363 insertions(+), 41 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index d4cc38f0b959c..ca8a24ea93d0b 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -301,6 +301,19 @@ public final class IndexSettings { public static final Setting INDEX_SEARCH_THROTTLED = Setting.boolSetting("index.search.throttled", false, Property.IndexScope, Property.PrivateIndex, Property.Dynamic); + /** + * Determines a balance between file-based and operations-based peer recoveries. 
The number of operations that will be used in an + * operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted + * documents) on the grounds that a file-based peer recovery may copy all of the documents in the shard over to the new peer, but is + * significantly faster than replaying the missing operations on the peer, so once a peer falls far enough behind the primary it makes + * more sense to copy all the data over again instead of replaying history. + * + * Defaults to retaining history for up to 10% of the documents in the shard. This can only be changed in tests, since this setting is + * intentionally unregistered. + */ + public static final Setting FILE_BASED_RECOVERY_THRESHOLD_SETTING + = Setting.doubleSetting("index.recovery.file_based_threshold", 0.1d, 0.0d, Setting.Property.IndexScope); + private final Index index; private final Version version; private final Logger logger; diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 313598e1d8ec7..8166a0d37d429 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; @@ -43,7 +44,7 @@ * In particular, this policy will delete index commits whose max sequence number is at most * the current global checkpoint except the index commit which has the highest max sequence number among those. 
*/ -public final class CombinedDeletionPolicy extends IndexDeletionPolicy { +public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; private final SoftDeletesPolicy softDeletesPolicy; @@ -51,6 +52,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. private volatile IndexCommit lastCommit; // the most recent commit point + private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY; CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, SoftDeletesPolicy softDeletesPolicy, LongSupplier globalCheckpointSupplier) { @@ -62,7 +64,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { } @Override - public synchronized void onInit(List commits) throws IOException { + public void onInit(List commits) throws IOException { assert commits.isEmpty() == false : "index is opened, but we have no commits"; onCommit(commits); if (safeCommit != commits.get(commits.size() - 1)) { @@ -74,16 +76,32 @@ public synchronized void onInit(List commits) throws IOEx } @Override - public synchronized void onCommit(List commits) throws IOException { - final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong()); - lastCommit = commits.get(commits.size() - 1); - safeCommit = commits.get(keptPosition); - for (int i = 0; i < keptPosition; i++) { - if (snapshottedCommits.containsKey(commits.get(i)) == false) { - deleteCommit(commits.get(i)); + public void onCommit(List commits) throws IOException { + final IndexCommit safeCommit; + synchronized (this) { + final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong()); + 
this.safeCommitInfo = SafeCommitInfo.EMPTY; + this.lastCommit = commits.get(commits.size() - 1); + this.safeCommit = commits.get(keptPosition); + for (int i = 0; i < keptPosition; i++) { + if (snapshottedCommits.containsKey(commits.get(i)) == false) { + deleteCommit(commits.get(i)); + } } + updateRetentionPolicy(); + safeCommit = this.safeCommit; } - updateRetentionPolicy(); + + assert Thread.holdsLock(this) == false : "should not block concurrent acquire or relesase"; + safeCommitInfo = new SafeCommitInfo(Long.parseLong( + safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), getDocCountOfCommit(safeCommit)); + + // This is protected from concurrent calls by a lock on the IndexWriter, but this assertion makes sure that we notice if that ceases + // to be true in future. It is not disastrous if safeCommitInfo refers to an older safeCommit, it just means that we might retain a + // bit more history and do a few more ops-based recoveries than we would otherwise. + final IndexCommit newSafeCommit = this.safeCommit; + assert safeCommit == newSafeCommit + : "onCommit called concurrently? " + safeCommit.getGeneration() + " vs " + newSafeCommit.getGeneration(); } private void deleteCommit(IndexCommit commit) throws IOException { @@ -109,6 +127,14 @@ private void updateRetentionPolicy() throws IOException { Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))); } + protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException { + return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc(); + } + + SafeCommitInfo getSafeCommitInfo() { + return safeCommitInfo; + } + /** * Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}. * Index files of the capturing commit point won't be released until the commit reference is closed. 
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7a50d3471a335..f26e5b8ad1ffe 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1122,6 +1122,11 @@ public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyE */ public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException; + /** + * @return a summary of the contents of the current safe commit + */ + public abstract SafeCommitInfo getSafeCommitInfo(); + /** * If the specified throwable contains a fatal error in the throwable graph, such a fatal error will be thrown. Callers should ensure * that there are no catch statements that would catch an error in the stack as the fatal error here should go uncaught and be handled diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5a8662845c482..b83c0a70178a0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2008,6 +2008,11 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException { } } + @Override + public SafeCommitInfo getSafeCommitInfo() { + return combinedDeletionPolicy.getSafeCommitInfo(); + } + private boolean failOnTragicEvent(AlreadyClosedException ex) { final boolean engineFailed; // if we are already closed due to some tragic exception diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 30b3d0221f36d..ded39c51b3712 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ 
-77,6 +77,7 @@ public class ReadOnlyEngine extends Engine { private final Lock indexWriterLock; private final DocsStats docsStats; private final RamAccountingRefreshListener refreshListener; + private final SafeCommitInfo safeCommitInfo; protected volatile TranslogStats translogStats; @@ -120,6 +121,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time"; this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos); this.indexWriterLock = indexWriterLock; + this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc()); success = true; } finally { if (success == false) { @@ -420,6 +422,11 @@ public IndexCommitRef acquireSafeIndexCommit() { return acquireLastIndexCommit(false); } + @Override + public SafeCommitInfo getSafeCommitInfo() { + return safeCommitInfo; + } + @Override public void activateThrottling() { } diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java b/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java new file mode 100644 index 0000000000000..37461177c93cf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.engine; + +import org.elasticsearch.index.seqno.SequenceNumbers; + +/** + * Information about the safe commit, for making decisions about recoveries. + */ +public class SafeCommitInfo { + + public final long localCheckpoint; + public final int docCount; + + public SafeCommitInfo(long localCheckpoint, int docCount) { + this.localCheckpoint = localCheckpoint; + this.docCount = docCount; + } + + public static final SafeCommitInfo EMPTY = new SafeCommitInfo(SequenceNumbers.NO_OPS_PERFORMED, 0); +} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 465abdd0e27b3..1ef7c27c51796 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; @@ -57,6 +58,7 @@ import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import 
java.util.stream.LongStream; @@ -210,6 +212,17 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ private boolean hasAllPeerRecoveryRetentionLeases; + /** + * Supplies information about the current safe commit which may be used to expire peer-recovery retention leases. + */ + private final Supplier safeCommitInfoSupplier; + + /** + * Threshold for expiring peer-recovery retention leases and falling back to file-based recovery. See + * {@link IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING}. + */ + private final double fileBasedRecoveryThreshold; + /** * Get all retention leases tracked on this shard. * @@ -237,6 +250,8 @@ public synchronized Tuple getRetentionLeases(final boo final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); final Set leaseIdsForCurrentPeers = routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()); + final boolean allShardsStarted = routingTable.allShardsStarted(); + final long minimumReasonableRetainedSeqNo = allShardsStarted ? 
0L : getMinimumReasonableRetainedSeqNo(); final Map> partitionByExpiration = retentionLeases .leases() .stream() @@ -245,7 +260,12 @@ public synchronized Tuple getRetentionLeases(final boo if (leaseIdsForCurrentPeers.contains(lease.id())) { return false; } - if (routingTable.allShardsStarted()) { + if (allShardsStarted) { + logger.trace("expiring unused [{}]", lease); + return true; + } + if (lease.retainingSequenceNumber() < minimumReasonableRetainedSeqNo) { + logger.trace("expiring unreasonable [{}] retaining history before [{}]", lease, minimumReasonableRetainedSeqNo); return true; } } @@ -264,6 +284,17 @@ public synchronized Tuple getRetentionLeases(final boo return Tuple.tuple(true, retentionLeases); } + private long getMinimumReasonableRetainedSeqNo() { + final SafeCommitInfo safeCommitInfo = safeCommitInfoSupplier.get(); + return safeCommitInfo.localCheckpoint + 1 - Math.round(Math.ceil(safeCommitInfo.docCount * fileBasedRecoveryThreshold)); + // NB safeCommitInfo.docCount is a very low-level count of the docs in the index, and in particular if this shard contains nested + // docs then safeCommitInfo.docCount counts every child doc separately from the parent doc. However every part of a nested document + // has the same seqno, so we may be overestimating the cost of a file-based recovery when compared to an ops-based recovery and + // therefore preferring ops-based recoveries inappropriately in this case. Correctly accounting for nested docs seems difficult to + // do cheaply, and the circumstances in which this matters should be relatively rare, so we use this naive calculation regardless. + // TODO improve this measure for when nested docs are in use + } + /** * Adds a new retention lease. 
* @@ -850,7 +881,8 @@ public ReplicationTracker( final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, - final BiConsumer> onSyncRetentionLeases) { + final BiConsumer> onSyncRetentionLeases, + final Supplier safeCommitInfoSupplier) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -867,6 +899,8 @@ public ReplicationTracker( this.routingTable = null; this.replicationGroup = null; this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0); + this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); + this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dafd379b1927d..225056e2edc31 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -92,6 +92,7 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; +import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; @@ -336,7 +337,8 @@ public IndexShard( UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, - (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener)); + (retentionLeases, listener) -> 
retentionLeaseSyncer.sync(shardId, retentionLeases, listener), + this::getSafeCommitInfo); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -2612,6 +2614,11 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener> delegates = new CopyOnWriteArrayList<>(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 110a27ff5510f..4e82a77ce43a7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -55,7 +55,7 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps, () -> RetentionLeases.EMPTY); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final LongArrayList maxSeqNoList = new LongArrayList(); final LongArrayList translogGenList = new LongArrayList(); @@ -102,7 +102,7 @@ public void testAcquireIndexCommit() throws Exception { new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); long 
lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); long lastTranslogGen = between(1, 20); @@ -182,7 +182,7 @@ public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final int invalidCommits = between(1, 10); final List commitList = new ArrayList<>(); @@ -217,7 +217,7 @@ public void testCheckUnreferencedCommits() throws Exception { final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); @@ -254,6 +254,17 @@ public void testCheckUnreferencedCommits() throws Exception { } } + private CombinedDeletionPolicy newCombinedDeletionPolicy(TranslogDeletionPolicy translogPolicy, SoftDeletesPolicy softDeletesPolicy, + AtomicLong globalCheckpoint) { + return new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get) + { + @Override + protected int getDocCountOfCommit(IndexCommit indexCommit) { + return between(0, 1000); + } + }; + } + 
IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { final Map userData = new HashMap<>(); userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java index 22d4f5e86f964..fe2d8f27aa308 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.IndexSettingsModule; import org.junit.Before; @@ -37,6 +38,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -48,6 +50,7 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes private ReplicationTracker replicationTracker; private AtomicLong currentTimeMillis; private Settings settings; + private SafeCommitInfo safeCommitInfo; @Before public void setUpReplicationTracker() throws InterruptedException { @@ -63,6 +66,8 @@ public void setUpReplicationTracker() throws InterruptedException { settings = Settings.EMPTY; } + safeCommitInfo = null; // must be set in each test + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); replicationTracker = new ReplicationTracker( new ShardId("test", "_na", 
0), @@ -72,7 +77,8 @@ public void setUpReplicationTracker() throws InterruptedException { UNASSIGNED_SEQ_NO, value -> { }, currentTimeMillis::get, - (leases, listener) -> { }); + (leases, listener) -> { }, + () -> safeCommitInfo); replicationTracker.updateFromMaster(1L, Collections.singleton(primaryAllocationId.getId()), routingTable(Collections.emptySet(), primaryAllocationId)); replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); @@ -109,6 +115,7 @@ public void testPeerRecoveryRetentionLeasesForAssignedCopiesDoNotEverExpire() { } currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get())); + safeCommitInfo = randomSafeCommitInfo(); final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); assertFalse(retentionLeases.v1()); @@ -121,11 +128,14 @@ public void testPeerRecoveryRetentionLeasesForAssignedCopiesDoNotEverExpire() { public void testPeerRecoveryRetentionLeasesForUnassignedCopiesDoNotExpireImmediatelyIfShardsNotAllStarted() { final String unknownNodeId = randomAlphaOfLength(10); - replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + final long globalCheckpoint = randomNonNegativeLong(); // not NO_OPS_PERFORMED since this always results in file-based recovery + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER); currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis())); + safeCommitInfo = randomSafeCommitInfoSuitableForOpsBasedRecovery(globalCheckpoint); + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); assertFalse("should not have expired anything", retentionLeases.v1()); @@ -142,12 +152,15 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireEventually() } final String unknownNodeId = randomAlphaOfLength(10); - 
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + final long globalCheckpoint = randomCheckpoint(); + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER); currentTimeMillis.set(randomLongBetween( currentTimeMillis.get() + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis() + 1, Long.MAX_VALUE)); + safeCommitInfo = randomSafeCommitInfoSuitableForOpsBasedRecovery(globalCheckpoint); + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); assertTrue("should have expired something", retentionLeases.v1()); @@ -167,6 +180,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireImmediatelyI (usually() ? randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis()) : randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()))); + safeCommitInfo = randomSafeCommitInfo(); final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); assertTrue(retentionLeases.v1()); @@ -176,4 +190,41 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireImmediatelyI assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); } + + public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireIfRetainingTooMuchHistory() { + if (randomBoolean()) { + startReplica(); + } + + final String unknownNodeId = randomAlphaOfLength(10); + final long globalCheckpoint = randomValueOtherThan(SequenceNumbers.NO_OPS_PERFORMED, this::randomCheckpoint); + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER); + + safeCommitInfo = randomSafeCommitInfoSuitableForFileBasedRecovery(globalCheckpoint); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertTrue("should have expired 
something", retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(2)); + assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); + } + + private SafeCommitInfo randomSafeCommitInfo() { + return randomBoolean() ? SafeCommitInfo.EMPTY : new SafeCommitInfo( + randomFrom(randomNonNegativeLong(), (long) randomIntBetween(0, Integer.MAX_VALUE)), + randomIntBetween(0, Integer.MAX_VALUE)); + } + + private SafeCommitInfo randomSafeCommitInfoSuitableForOpsBasedRecovery(long globalCheckpoint) { + // simulate a safe commit that is behind the given global checkpoint, so that no files need to be transferred + final long localCheckpoint = randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint); + return new SafeCommitInfo(localCheckpoint, between(0, Math.toIntExact(Math.min(localCheckpoint + 1, Integer.MAX_VALUE)))); + } + + private SafeCommitInfo randomSafeCommitInfoSuitableForFileBasedRecovery(long globalCheckpoint) { + // simulate a later safe commit containing no documents, which is always better to transfer than any ops + return new SafeCommitInfo(randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE), 0); + } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 7611fad5a7e43..bdf7acf478b91 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -70,7 +70,8 @@ public void testAddOrRenewRetentionLease() { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -111,7 +112,8 @@ public void testAddDuplicateRetentionLease() { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -139,7 +141,8 @@ public void testRenewNotFoundRetentionLease() { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -174,7 +177,8 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { .stream() .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)), equalTo(retainingSequenceNumbers)); - }); + }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); reference.set(replicationTracker); replicationTracker.updateFromMaster( randomNonNegativeLong(), @@ -210,7 +214,8 @@ public void testRemoveRetentionLease() { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -266,7 +271,8 @@ public void testCloneRetentionLease() { assertFalse(Thread.holdsLock(replicationTrackerRef.get())); assertTrue(synced.compareAndSet(false, true)); listener.onResponse(new ReplicationResponse()); - }); + }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTrackerRef.set(replicationTracker); replicationTracker.updateFromMaster( randomNonNegativeLong(), @@ -309,7 +315,8 @@ public void testCloneNonexistentRetentionLease() { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> { }); + (leases, listener) -> { }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); 
replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -331,7 +338,8 @@ public void testCloneDuplicateRetentionLease() { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> { }); + (leases, listener) -> { }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -357,7 +365,8 @@ public void testRemoveNotFound() { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -392,7 +401,8 @@ public void testRemoveRetentionLeaseCausesRetentionLeaseSync() { .stream() .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)), equalTo(retainingSequenceNumbers)); - }); + }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); reference.set(replicationTracker); replicationTracker.updateFromMaster( randomNonNegativeLong(), @@ -445,7 +455,8 @@ private void runExpirationTest(final boolean primaryMode) { UNASSIGNED_SEQ_NO, value -> {}, currentTimeMillis::get, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -519,7 +530,8 @@ public void testReplicaIgnoresOlderRetentionLeasesVersion() { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -572,7 +584,8 @@ public void testLoadAndPersistRetentionLeases() throws IOException { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); 
replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -605,7 +618,8 @@ public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -653,7 +667,8 @@ public void testPersistRetentionLeasesUnderConcurrency() throws IOException { UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java index 5f035a3604f41..cc32d5198c8b0 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -32,6 +33,7 @@ import java.util.Set; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -49,9 +51,12 @@ ReplicationTracker newTracker( UNASSIGNED_SEQ_NO, updatedGlobalCheckpoint, currentTimeMillisSupplier, - (leases, listener) -> {}); + (leases, listener) -> {}, + 
OPS_BASED_RECOVERY_ALWAYS_REASONABLE); } + static final Supplier<SafeCommitInfo> OPS_BASED_RECOVERY_ALWAYS_REASONABLE = () -> SafeCommitInfo.EMPTY; + static String nodeIdFromAllocationId(final AllocationId allocationId) { return "n-" + allocationId.getId().substring(0, 8); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index afbd560758cf1..e7d68baf26599 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -694,10 +694,10 @@ public void testPrimaryContextHandoff() throws IOException { final long globalCheckpoint = UNASSIGNED_SEQ_NO; final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onNewRetentionLease = (leases, listener) -> {}; - ReplicationTracker oldPrimary = new ReplicationTracker( - shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); - ReplicationTracker newPrimary = new ReplicationTracker( - shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); + ReplicationTracker oldPrimary = new ReplicationTracker(shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, + onUpdate, () -> 0L, onNewRetentionLease, OPS_BASED_RECOVERY_ALWAYS_REASONABLE); + ReplicationTracker newPrimary = new ReplicationTracker(shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, + onUpdate, () -> 0L, onNewRetentionLease, OPS_BASED_RECOVERY_ALWAYS_REASONABLE); Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index ad791702ffe89..77d47d6d24191 100644 --- 
a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -1206,6 +1206,109 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); } + public void testUsesFileBasedRecoveryIfOperationsBasedRecoveryWouldBeUnreasonable() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + + String indexName = "test-index"; + final Settings.Builder settings = Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "12h") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"); + + final double reasonableOperationsBasedRecoveryProportion; + if (randomBoolean()) { + reasonableOperationsBasedRecoveryProportion = randomDoubleBetween(0.05, 0.99, true); + settings.put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), + reasonableOperationsBasedRecoveryProportion); + } else { + reasonableOperationsBasedRecoveryProportion + = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(Settings.EMPTY); + } + logger.info("--> performing ops-based recoveries up to [{}%] of docs", reasonableOperationsBasedRecoveryProportion * 100.0); + + createIndex(indexName, settings.build()); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + flush(indexName); + // wait for all history to be discarded + assertBusy(() -> { + for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) { + final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); + 
assertTrue(shardStats.getRetentionLeaseStats().retentionLeases() + " should discard history up to " + maxSeqNo, + shardStats.getRetentionLeaseStats().retentionLeases().leases().stream().allMatch( + l -> l.retainingSequenceNumber() == maxSeqNo + 1)); + } + }); + flush(indexName); // ensure that all operations are in the safe commit + + final ShardStats shardStats = client().admin().indices().prepareStats(indexName).get().getShards()[0]; + final long docCount = shardStats.getStats().docs.getCount(); + assertThat(shardStats.getStats().docs.getDeleted(), equalTo(0L)); + assertThat(shardStats.getSeqNoStats().getMaxSeqNo() + 1, equalTo(docCount)); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final DiscoveryNodes discoveryNodes = clusterService().state().nodes(); + final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId); + + final ShardRouting replicaShardRouting = indexShardRoutingTable.replicaShards().get(0); + assertTrue("should have lease for " + replicaShardRouting, + client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats() + .retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting))); + internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertFalse(client().admin().cluster().prepareHealth() + .setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1)) + .setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + + final int newDocCount = Math.toIntExact(Math.round(Math.ceil( + (1 + Math.ceil(docCount * reasonableOperationsBasedRecoveryProportion)) + / (1 - reasonableOperationsBasedRecoveryProportion)))); + + /* + * newDocCount >= (ceil(docCount * p) + 1) / (1-p) + * + * ==> 0 <= newDocCount * (1-p) - ceil(docCount * p) - 
1 + * = newDocCount - (newDocCount * p + ceil(docCount * p) + 1) + * < newDocCount - (ceil(newDocCount * p) + ceil(docCount * p)) + * <= newDocCount - ceil(newDocCount * p + docCount * p) + * + * ==> docCount < newDocCount + docCount - ceil((newDocCount + docCount) * p) + * == localCheckpoint + 1 - ceil((newDocCount + docCount) * p) + * == firstReasonableSeqNo + * + * The replica has docCount docs, i.e. has operations with seqnos [0..docCount-1], so a seqno-based recovery will start + * from docCount < firstReasonableSeqNo + * + * ==> it is unreasonable to recover the replica using a seqno-based recovery + */ + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, newDocCount) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + flush(indexName); + + assertBusy(() -> assertFalse("should no longer have lease for " + replicaShardRouting, + client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats() + .retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting)))); + + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); + } + public void testDoesNotCopyOperationsInSafeCommit() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2c54189e20c16..9e1cc211008fc 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -674,7 +674,8 @@ public EngineConfig config( SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L, - (leases, listener) -> listener.onResponse(new ReplicationResponse())); + (leases, listener) -> listener.onResponse(new ReplicationResponse()), + () -> SafeCommitInfo.EMPTY); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; } else { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java index fdb623d1d1e91..246dac18ef8a2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; import java.util.Arrays; @@ -51,6 +52,7 @@ public List<Setting<?>> getSettings() { TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING, IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING, + IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING, IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING ); }