diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 1a67eb55e0576..a33b791a666b0 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -24,6 +24,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -217,10 +218,22 @@ public synchronized Tuple getRetentionLeases(final boo // the primary calculates the non-expired retention leases and syncs them to replicas final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); + final Set leaseIdsForCurrentPeers + = routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()); final Map> partitionByExpiration = retentionLeases .leases() .stream() - .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis)); + .collect(Collectors.groupingBy(lease -> { + if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) { + if (leaseIdsForCurrentPeers.contains(lease.id())) { + return false; + } + if (routingTable.allShardsStarted()) { + return true; + } + } + return currentTimeMillis - lease.timestamp() > retentionLeaseMillis; + })); final Collection expiredLeases = partitionByExpiration.get(true); if (expiredLeases == null) { // early out as no retention leases have expired @@ -242,7 +255,7 @@ public synchronized Tuple getRetentionLeases(final boo * @param source the source of the retention lease * @param listener the callback when the retention lease is successfully added and synced to replicas * @return the new retention lease - * @throws IllegalArgumentException if the specified retention lease already exists + * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists */ public RetentionLease addRetentionLease( final String id, @@ -253,22 +266,38 @@ public RetentionLease addRetentionLease( final RetentionLease retentionLease; final RetentionLeases currentRetentionLeases; synchronized (this) { - assert primaryMode; - if (retentionLeases.contains(id)) { - throw new RetentionLeaseAlreadyExistsException(id); - } - retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source); - logger.debug("adding new retention lease [{}] to current retention leases [{}]", retentionLease, retentionLeases); - retentionLeases = new RetentionLeases( - operationPrimaryTerm, - retentionLeases.version() + 1, - Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())); + retentionLease = innerAddRetentionLease(id, retainingSequenceNumber, source); currentRetentionLeases = retentionLeases; } onSyncRetentionLeases.accept(currentRetentionLeases, listener); return retentionLease; } + /** + * Adds a new retention lease, but does not synchronise it with the rest of the replication group. 
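+     * Callers must arrange any necessary sync themselves; for instance, {@link ReplicationTracker#addRetentionLease} calls this
+     * method under the monitor and then triggers the sync outside it:
+     * <pre>
+     * synchronized (this) {
+     *     retentionLease = innerAddRetentionLease(id, retainingSequenceNumber, source);
+     *     currentRetentionLeases = retentionLeases;
+     * }
+     * onSyncRetentionLeases.accept(currentRetentionLeases, listener);
+     * </pre>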
+ * + * @param id the identifier of the retention lease + * @param retainingSequenceNumber the retaining sequence number + * @param source the source of the retention lease + * @return the new retention lease + * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists + */ + private RetentionLease innerAddRetentionLease(String id, long retainingSequenceNumber, String source) { + assert Thread.holdsLock(this); + assert primaryMode : id + "/" + retainingSequenceNumber + "/" + source; + if (retentionLeases.contains(id)) { + throw new RetentionLeaseAlreadyExistsException(id); + } + final RetentionLease retentionLease + = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source); + logger.debug("adding new retention lease [{}] to current retention leases [{}]", retentionLease, retentionLeases); + retentionLeases = new RetentionLeases( + operationPrimaryTerm, + retentionLeases.version() + 1, + Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())); + return retentionLease; + } + /** * Renews an existing retention lease. * @@ -276,7 +305,7 @@ public RetentionLease addRetentionLease( * @param retainingSequenceNumber the retaining sequence number * @param source the source of the retention lease * @return the renewed retention lease - * @throws IllegalArgumentException if the specified retention lease does not exist + * @throws RetentionLeaseNotFoundException if the specified retention lease does not exist */ public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert primaryMode; @@ -390,6 +419,45 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio return true; } + + /** + * Retention leases for peer recovery have source {@link ReplicationTracker#PEER_RECOVERY_RETENTION_LEASE_SOURCE}, a lease ID + * containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations + * with sequence numbers strictly greater than the given global checkpoint. + */ + public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener listener) { + addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener); + } + + /** + * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}. + */ + public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery"; + + /** + * Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}. + */ + static String getPeerRecoveryRetentionLeaseId(String nodeId) { + return "peer_recovery/" + nodeId; + } + + /** + * Id for a peer recovery retention lease for the given {@link ShardRouting}. + * See {@link ReplicationTracker#addPeerRecoveryRetentionLease}. + */ + public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting) { + return getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId()); + } + + /** + * Renew the peer-recovery retention lease for the given shard, advancing the retained sequence number to discard operations up to the + * given global checkpoint. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}. 
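+     * As with {@link ReplicationTracker#renewRetentionLease}, to which this method delegates, a
+     * {@link RetentionLeaseNotFoundException} is thrown if no lease exists for this shard copy.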
+ */ + public void renewPeerRecoveryRetentionLease(ShardRouting shardRouting, long globalCheckpoint) { + assert primaryMode; + renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE); + } + public static class CheckpointState implements Writeable { /** @@ -616,6 +684,22 @@ private boolean invariant() { assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked"; } + if (primaryMode + && indexSettings.isSoftDeleteEnabled() + && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN + && indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)) { + // all tracked shard copies have a corresponding peer-recovery retention lease + for (final ShardRouting shardRouting : routingTable.assignedShards()) { + assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false + || retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : + "no retention lease for tracked shard " + shardRouting + " in " + retentionLeases; + assert shardRouting.relocating() == false + || checkpoints.get(shardRouting.allocationId().getRelocationId()).tracked == false + || retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting.getTargetRelocatingShard())) + : "no retention lease for relocation target " + shardRouting + " in " + retentionLeases; + } + } + return true; } @@ -669,6 +753,7 @@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; + assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -772,6 +857,31 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) { primaryMode = true; updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint); updateGlobalCheckpointOnPrimary(); + + if (indexSettings.isSoftDeleteEnabled()) { + final ShardRouting primaryShard = routingTable.primaryShard(); + final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); + if (retentionLeases.get(leaseId) == null) { + /* + * We might have got here via a rolling upgrade from an older version that doesn't create peer recovery retention + * leases for every shard copy, in which case we do not expect any leases to exist. + */ + if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)) { + // We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then + // this copy must already be in-sync and active and therefore holds a retention lease for itself. + assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards(); + assert primaryShard.allocationId().getId().equals(shardAllocationId) + : routingTable.activeShards() + " vs " + shardAllocationId; + assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)); + + // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication + // group.
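+                    // The global checkpoint may still be NO_OPS_PERFORMED (-1) here, so the retained sequence number is
+                    // clamped to at least 0.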
+ innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), + PEER_RECOVERY_RETENTION_LEASE_SOURCE); + } + } + } + assert invariant(); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index 9cfad7c36ea06..ec67448341fa3 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -211,4 +211,7 @@ public String toString() { '}'; } + public boolean isNotPeerRecoveryRetentionLease() { + return ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source) == false; + } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java index 927d2ec499960..7de6bad3f1102 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java @@ -44,7 +44,7 @@ public interface RetentionLeaseSyncer { RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() { @Override public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener listener) { - + listener.onResponse(new ReplicationResponse()); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java index 81fd7e2fce047..8148d12fbb8aa 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java @@ -275,13 +275,20 @@ private static Map toMap(final Collection toMap(final RetentionLeases retentionLeases) { - return retentionLeases.leases; + public static Map toMapExcludingPeerRecoveryRetentionLeases(final RetentionLeases retentionLeases) { + return retentionLeases.leases.values().stream() + .filter(l -> ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(l.source()) == false) + .collect(Collectors.toMap(RetentionLease::id, Function.identity(), + (o1, o2) -> { + throw new AssertionError("unexpectedly merging " + o1 + " and " + o2); + }, + LinkedHashMap::new)); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fdd95614756b7..4d7c2e880cef6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2415,6 +2415,20 @@ public boolean isRelocatedPrimary() { return replicationTracker.isRelocated(); } + public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener listener) { + assert assertPrimaryMode(); + replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); + } + + /** + * Test-only method to advance the primary's peer-recovery retention lease so that operations up to the global checkpoint can be + * discarded. TODO Remove this when retention leases are advanced by other mechanisms. 
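+     * This simply renews the lease identified by {@link #routingEntry()} so that it retains only operations above the
+     * current global checkpoint.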
+ */ + public void advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint() { + assert assertPrimaryMode(); + replicationTracker.renewPeerRecoveryRetentionLease(routingEntry(), getGlobalCheckpoint()); + } + class ShardEventListener implements Engine.EventListener { private final CopyOnWriteArrayList> delegates = new CopyOnWriteArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 4b89e75691a76..0bde05d536a06 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -32,6 +32,8 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedSupplier; @@ -49,6 +51,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -188,10 +191,28 @@ public void recoverToTarget(ActionListener listener) { } assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; + final StepListener establishRetentionLeaseStep = new StepListener<>(); + if (shard.indexSettings().isSoftDeleteEnabled() + && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) { + runUnderPrimaryPermit(() -> { + try { + // blindly create the lease. 
TODO integrate this with the recovery process + shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), startingSeqNo - 1, establishRetentionLeaseStep); + } catch (RetentionLeaseAlreadyExistsException e) { + logger.debug("peer-recovery retention lease already exists", e); + establishRetentionLeaseStep.onResponse(null); + } + }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger); + } else { + establishRetentionLeaseStep.onResponse(null); + } + final StepListener prepareEngineStep = new StepListener<>(); - // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, - shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep); + establishRetentionLeaseStep.whenComplete(r -> { + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, + shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep); + }, onFailure); final StepListener sendSnapshotStep = new StepListener<>(); prepareEngineStep.whenComplete(prepareEngineTime -> { /* diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f4e1ecd2514b3..00438868cdb4f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -74,6 +74,7 @@ import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -2324,7 +2325,7 @@ public void testIndexWriterInfoStream() throws IllegalAccessException, IOExcepti } } - public void testSeqNoAndCheckpoints() throws IOException { + public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { final int opCount = randomIntBetween(1, 256); long primarySeqNo = SequenceNumbers.NO_OPS_PERFORMED; final String[] ids = new String[]{"1", "2", "3"}; @@ -2342,13 +2343,27 @@ public void testSeqNoAndCheckpoints() throws IOException { final ShardRouting primary = TestShardRouting.newShardRouting("test", shardId.id(), "node1", null, true, ShardRoutingState.STARTED, allocationId); - final ShardRouting replica = - TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.STARTED); + final ShardRouting initializingReplica = + TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.INITIALIZING); + ReplicationTracker gcpTracker = (ReplicationTracker) initialEngine.config().getGlobalCheckpointSupplier(); - gcpTracker.updateFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(), - replica.allocationId().getId())), - new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build()); + gcpTracker.updateFromMaster(1L, new HashSet<>(Collections.singletonList(primary.allocationId().getId())), + new IndexShardRoutingTable.Builder(shardId).addShard(primary).build()); gcpTracker.activatePrimaryMode(primarySeqNo); + if (defaultSettings.isSoftDeleteEnabled()) { + final CountDownLatch countDownLatch = new CountDownLatch(1); + 
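                // The tracker now requires a peer-recovery retention lease for every tracked shard copy (see the
                // invariant() change above), so create the replica's lease and wait for its sync to be acknowledged
                // before the replica is tracked and marked in-sync below.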
gcpTracker.addPeerRecoveryRetentionLease(initializingReplica.currentNodeId(), + SequenceNumbers.NO_OPS_PERFORMED, ActionListener.wrap(countDownLatch::countDown)); + countDownLatch.await(); + } + gcpTracker.updateFromMaster(2L, new HashSet<>(Collections.singletonList(primary.allocationId().getId())), + new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(initializingReplica).build()); + gcpTracker.initiateTracking(initializingReplica.allocationId().getId()); + gcpTracker.markAllocationIdAsInSync(initializingReplica.allocationId().getId(), replicaLocalCheckpoint); + final ShardRouting replica = initializingReplica.moveToStarted(); + gcpTracker.updateFromMaster(3L, new HashSet<>(Arrays.asList(primary.allocationId().getId(), replica.allocationId().getId())), + new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build()); + for (int op = 0; op < opCount; op++) { final String id; // mostly index, sometimes delete diff --git a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java index ce3986f0a2517..bc71fcc63510d 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java @@ -61,9 +61,10 @@ public void testSimpleSyncRetentionLeases() throws Exception { } } RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases(); - assertThat(leasesOnPrimary.version(), equalTo((long) iterations)); + assertThat(leasesOnPrimary.version(), equalTo(iterations + group.getReplicas().size() + 1L)); assertThat(leasesOnPrimary.primaryTerm(), equalTo(group.getPrimary().getOperationPrimaryTerm())); - assertThat(leasesOnPrimary.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0]))); + assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(leasesOnPrimary).values(), + containsInAnyOrder(leases.toArray(new RetentionLease[0]))); latch.await(); for (IndexShard replica : group.getReplicas()) { assertThat(replica.getRetentionLeases(), equalTo(leasesOnPrimary)); @@ -109,6 +110,9 @@ protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, Acti } }) { group.startAll(); + for (IndexShard replica : group.getReplicas()) { + replica.updateRetentionLeasesOnReplica(group.getPrimary().getRetentionLeases()); + } int numLeases = between(1, 100); IndexShard newPrimary = randomFrom(group.getReplicas()); RetentionLeases latestRetentionLeasesOnNewPrimary = RetentionLeases.EMPTY; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java new file mode 100644 index 0000000000000..22d4f5e86f964 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.IndexSettingsModule; +import org.junit.Before; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTestCase { + + private static final ActionListener EMPTY_LISTENER = ActionListener.wrap(() -> { }); + + private ReplicationTracker replicationTracker; + private AtomicLong currentTimeMillis; + private Settings settings; + + @Before + public void setUpReplicationTracker() throws InterruptedException { + final AllocationId primaryAllocationId = AllocationId.newInitializing(); + currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); + + if (randomBoolean()) { + settings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), + TimeValue.timeValueMillis(randomLongBetween(1, TimeValue.timeValueHours(12).millis()))) + .build(); + } else { + settings = Settings.EMPTY; + } + + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + primaryAllocationId.getId(), + IndexSettingsModule.newIndexSettings("test", settings), + primaryTerm, + UNASSIGNED_SEQ_NO, + value -> { }, + currentTimeMillis::get, + (leases, listener) -> { }); + replicationTracker.updateFromMaster(1L, Collections.singleton(primaryAllocationId.getId()), + routingTable(Collections.emptySet(), primaryAllocationId)); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final AllocationId replicaAllocationId = AllocationId.newInitializing(); + final IndexShardRoutingTable routingTableWithReplica + = routingTable(Collections.singleton(replicaAllocationId), primaryAllocationId); + replicationTracker.updateFromMaster(2L, Collections.singleton(primaryAllocationId.getId()), routingTableWithReplica); + replicationTracker.addPeerRecoveryRetentionLease( + routingTableWithReplica.getByAllocationId(replicaAllocationId.getId()).currentNodeId(), randomCheckpoint(), + EMPTY_LISTENER); + + replicationTracker.initiateTracking(replicaAllocationId.getId()); + replicationTracker.markAllocationIdAsInSync(replicaAllocationId.getId(), randomCheckpoint()); + } + + private long randomCheckpoint() { 
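+        // either an empty history (NO_OPS_PERFORMED) or an arbitrary non-negative checkpoint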
+ return randomBoolean() ? SequenceNumbers.NO_OPS_PERFORMED : randomNonNegativeLong(); + } + + private void startReplica() { + final ShardRouting replicaShardRouting = replicationTracker.routingTable.replicaShards().get(0); + final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(replicationTracker.routingTable); + builder.removeShard(replicaShardRouting); + builder.addShard(replicaShardRouting.moveToStarted()); + replicationTracker.updateFromMaster(replicationTracker.appliedClusterStateVersion + 1, + replicationTracker.routingTable.shards().stream().map(sr -> sr.allocationId().getId()).collect(Collectors.toSet()), + builder.build()); + } + + public void testPeerRecoveryRetentionLeasesForAssignedCopiesDoNotEverExpire() { + if (randomBoolean()) { + startReplica(); + } + + currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get())); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertFalse(retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(2)); + assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); + } + + public void testPeerRecoveryRetentionLeasesForUnassignedCopiesDoNotExpireImmediatelyIfShardsNotAllStarted() { + final String unknownNodeId = randomAlphaOfLength(10); + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + + currentTimeMillis.set(currentTimeMillis.get() + + randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis())); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertFalse("should not have expired anything", retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(3)); + assertThat(leaseIds, equalTo(Stream.concat(Stream.of(ReplicationTracker.getPeerRecoveryRetentionLeaseId(unknownNodeId)), + replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)).collect(Collectors.toSet()))); + } + + public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireEventually() { + if (randomBoolean()) { + startReplica(); + } + + final String unknownNodeId = randomAlphaOfLength(10); + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + + currentTimeMillis.set(randomLongBetween( + currentTimeMillis.get() + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis() + 1, + Long.MAX_VALUE)); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertTrue("should have expired something", retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(2)); + assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); + } + + public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireImmediatelyIfShardsAllStarted() { + final String unknownNodeId = randomAlphaOfLength(10); + 
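        // add a lease for a node that holds no shard copy; once every shard in the routing table is started, such a
        // lease should expire immediately, regardless of the configured retention period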
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + + startReplica(); + + currentTimeMillis.set(currentTimeMillis.get() + + (usually() + ? randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis()) + : randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()))); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertTrue(retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(2)); + assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 2334cb4330887..8dc91db7d3350 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -83,7 +84,7 @@ public void testAddOrRenewRetentionLease() { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); replicationTracker.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); + assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 2 + i, true, false); } for (int i = 0; i < length; i++) { @@ -93,7 +94,7 @@ public void testAddOrRenewRetentionLease() { } minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, primaryTerm, 1 + length + i, true, false); + assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, primaryTerm, 2 + length + i, true, false); } } @@ -178,6 +179,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { Collections.singleton(allocationId.getId()), routingTable(Collections.emptySet(), allocationId)); replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + retainingSequenceNumbers.put(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)), 0L); final int length = randomIntBetween(0, 8); for (int i = 0; i < length; i++) { @@ -239,7 +241,7 @@ public void testRemoveRetentionLease() { length - i - 1, minimumRetainingSequenceNumbers, primaryTerm, - 1 + length + i, + 2 + length + i, true, false); } @@ -298,6 +300,7 @@ public void testRemoveRetentionLeaseCausesRetentionLeaseSync() { Collections.singleton(allocationId.getId()), routingTable(Collections.emptySet(), allocationId)); 
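        // activating primary mode now adds the primary's own peer-recovery lease, so it must be included in the
        // expected retaining sequence numbers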
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + retainingSequenceNumbers.put(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)), 0L); final int length = randomIntBetween(0, 8); for (int i = 0; i < length; i++) { @@ -365,11 +368,12 @@ private void runExpirationTest(final boolean primaryMode) { { final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); - assertThat(retentionLeases.version(), equalTo(1L)); - assertThat(retentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); + final long expectedVersion = primaryMode ? 2L : 1L; + assertThat(retentionLeases.version(), equalTo(expectedVersion)); + assertThat(retentionLeases.leases(), hasSize(primaryMode ? 2 : 1)); + final RetentionLease retentionLease = retentionLeases.get("0"); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 1, primaryMode, false); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, expectedVersion, primaryMode, false); } // renew the lease @@ -387,18 +391,19 @@ private void runExpirationTest(final boolean primaryMode) { { final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); - assertThat(retentionLeases.version(), equalTo(2L)); - assertThat(retentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); + final long expectedVersion = primaryMode ? 3L : 2L; + assertThat(retentionLeases.version(), equalTo(expectedVersion)); + assertThat(retentionLeases.leases(), hasSize(primaryMode ? 2 : 1)); + final RetentionLease retentionLease = retentionLeases.get("0"); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, primaryMode, false); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, expectedVersion, primaryMode, false); } // now force the lease to expire currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primaryMode) { - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, true, false); - assertRetentionLeases(replicationTracker, 0, new long[0], primaryTerm, 3, true, true); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 3, true, false); + assertRetentionLeases(replicationTracker, 0, new long[0], primaryTerm, 4, true, true); } else { // leases do not expire on replicas until synced from the primary assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, false, false); @@ -625,10 +630,8 @@ private void assertRetentionLeases( } assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); assertThat(retentionLeases.version(), equalTo(version)); - final Map idToRetentionLease = new HashMap<>(); - for (final RetentionLease retentionLease : retentionLeases.leases()) { - idToRetentionLease.put(retentionLease.id(), retentionLease); - } + final Map idToRetentionLease = retentionLeases.leases().stream() + .filter(RetentionLease::isNotPeerRecoveryRetentionLease).collect(Collectors.toMap(RetentionLease::id, Function.identity())); assertThat(idToRetentionLease.entrySet(), hasSize(size)); for (int i = 0; i < size; i++) { diff --git 
a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java index 5165f2e8dc9e4..5f035a3604f41 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java @@ -52,10 +52,14 @@ ReplicationTracker newTracker( (leases, listener) -> {}); } + static String nodeIdFromAllocationId(final AllocationId allocationId) { + return "n-" + allocationId.getId().substring(0, 8); + } + static IndexShardRoutingTable routingTable(final Set initializingIds, final AllocationId primaryId) { final ShardId shardId = new ShardId("test", "_na_", 0); - final ShardRouting primaryShard = - TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(10), null, true, ShardRoutingState.STARTED, primaryId); + final ShardRouting primaryShard = TestShardRouting.newShardRouting( + shardId, nodeIdFromAllocationId(primaryId), null, true, ShardRoutingState.STARTED, primaryId); return routingTable(initializingIds, primaryShard); } @@ -65,7 +69,7 @@ static IndexShardRoutingTable routingTable(final Set initializingI final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId); for (final AllocationId initializingId : initializingIds) { builder.addShard(TestShardRouting.newShardRouting( - shardId, randomAlphaOfLength(10), null, false, ShardRoutingState.INITIALIZING, initializingId)); + shardId, nodeIdFromAllocationId(initializingId), null, false, ShardRoutingState.INITIALIZING, initializingId)); } builder.addShard(primaryShard); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index 05ca0a5ea3006..802deab8e5234 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -149,6 +149,7 @@ public void testGlobalCheckpointUpdate() { newInitializing.add(extraId); tracker.updateFromMaster(initialClusterStateVersion + 1, ids(active), routingTable(newInitializing, primaryId)); + addPeerRecoveryRetentionLease(tracker, extraId); tracker.initiateTracking(extraId.getId()); // now notify for the new id @@ -190,6 +191,7 @@ public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, Interr tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId)); final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1); tracker.activatePrimaryMode(localCheckpoint); + addPeerRecoveryRetentionLease(tracker, replicaId); tracker.initiateTracking(replicaId.getId()); final CyclicBarrier barrier = new CyclicBarrier(2); final Thread thread = new Thread(() -> { @@ -357,6 +359,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { tracker.updateFromMaster(clusterStateVersion, Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId)); tracker.activatePrimaryMode(globalCheckpoint); + addPeerRecoveryRetentionLease(tracker, trackingAllocationId); final Thread thread = new Thread(() -> { try { // synchronize starting with the test thread @@ -421,6 +424,7 @@ public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBar tracker.updateFromMaster(randomNonNegativeLong(), 
Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId)); tracker.activatePrimaryMode(globalCheckpoint); + addPeerRecoveryRetentionLease(tracker, trackingAllocationId); final Thread thread = new Thread(() -> { try { // synchronize starting with the test thread @@ -563,6 +567,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { initialClusterStateVersion + 3, ids(newActiveAllocationIds), routingTable(newInitializingAllocationIds, primaryId)); + addPeerRecoveryRetentionLease(tracker, newSyncingAllocationId); final CyclicBarrier barrier = new CyclicBarrier(2); final Thread thread = new Thread(() -> { try { @@ -610,7 +615,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { * allocation ID to the in-sync set and removing it from pending, the local checkpoint update that freed the thread waiting for the * local checkpoint to advance could miss updating the global checkpoint in a race if the waiting thread did not add the allocation * ID to the in-sync set and remove it from the pending set before the local checkpoint updating thread executed the global checkpoint - * update. This test fails without an additional call to {@link ReplicationTracker#updateGlobalCheckpointOnPrimary()} after + * update. This test fails without an additional call to {@code ReplicationTracker#updateGlobalCheckpointOnPrimary()} after * removing the allocation ID from the pending set in {@link ReplicationTracker#markAllocationIdAsInSync(String, long)} (even if a * call is added after notifying all waiters in {@link ReplicationTracker#updateLocalCheckpoint(String, long)}). * @@ -630,6 +635,7 @@ public void testRaceUpdatingGlobalCheckpoint() throws InterruptedException, Brok Collections.singleton(active.getId()), routingTable(Collections.singleton(initializing), active)); tracker.activatePrimaryMode(activeLocalCheckpoint); + addPeerRecoveryRetentionLease(tracker, initializing); final int nextActiveLocalCheckpoint = randomIntBetween(activeLocalCheckpoint + 1, Integer.MAX_VALUE); final Thread activeThread = new Thread(() -> { try { @@ -693,7 +699,9 @@ public void testPrimaryContextHandoff() throws IOException { clusterState.apply(oldPrimary); clusterState.apply(newPrimary); - activatePrimary(oldPrimary); + oldPrimary.activatePrimaryMode(randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10)); + addPeerRecoveryRetentionLease(oldPrimary, newPrimary.shardAllocationId); + newPrimary.updateRetentionLeasesOnReplica(oldPrimary.getRetentionLeases()); final int numUpdates = randomInt(10); for (int i = 0; i < numUpdates; i++) { @@ -706,7 +714,7 @@ public void testPrimaryContextHandoff() throws IOException { randomLocalCheckpointUpdate(oldPrimary); } if (randomBoolean()) { - randomMarkInSync(oldPrimary); + randomMarkInSync(oldPrimary, newPrimary); } } @@ -738,7 +746,7 @@ public void testPrimaryContextHandoff() throws IOException { randomLocalCheckpointUpdate(oldPrimary); } if (randomBoolean()) { - randomMarkInSync(oldPrimary); + randomMarkInSync(oldPrimary, newPrimary); } // do another handoff @@ -876,7 +884,10 @@ private static FakeClusterState initialState() { final ShardId shardId = new ShardId("test", "_na_", 0); final ShardRouting primaryShard = TestShardRouting.newShardRouting( - shardId, randomAlphaOfLength(10), randomAlphaOfLength(10), true, ShardRoutingState.RELOCATING, relocatingId); + shardId, + nodeIdFromAllocationId(relocatingId), + 
nodeIdFromAllocationId(AllocationId.newInitializing(relocatingId.getRelocationId())), + true, ShardRoutingState.RELOCATING, relocatingId); return new FakeClusterState( initialClusterStateVersion, @@ -884,20 +895,17 @@ private static FakeClusterState initialState() { routingTable(initializingAllocationIds, primaryShard)); } - private static void activatePrimary(ReplicationTracker gcp) { - gcp.activatePrimaryMode(randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10)); - } - private static void randomLocalCheckpointUpdate(ReplicationTracker gcp) { String allocationId = randomFrom(gcp.checkpoints.keySet()); long currentLocalCheckpoint = gcp.checkpoints.get(allocationId).getLocalCheckpoint(); gcp.updateLocalCheckpoint(allocationId, Math.max(SequenceNumbers.NO_OPS_PERFORMED, currentLocalCheckpoint + randomInt(5))); } - private static void randomMarkInSync(ReplicationTracker gcp) { - String allocationId = randomFrom(gcp.checkpoints.keySet()); - long newLocalCheckpoint = Math.max(NO_OPS_PERFORMED, gcp.getGlobalCheckpoint() + randomInt(5)); - markAsTrackingAndInSyncQuietly(gcp, allocationId, newLocalCheckpoint); + private static void randomMarkInSync(ReplicationTracker oldPrimary, ReplicationTracker newPrimary) { + final String allocationId = randomFrom(oldPrimary.checkpoints.keySet()); + final long newLocalCheckpoint = Math.max(NO_OPS_PERFORMED, oldPrimary.getGlobalCheckpoint() + randomInt(5)); + markAsTrackingAndInSyncQuietly(oldPrimary, allocationId, newLocalCheckpoint); + newPrimary.updateRetentionLeasesOnReplica(oldPrimary.getRetentionLeases()); } private static FakeClusterState randomUpdateClusterState(Set allocationIds, FakeClusterState clusterState) { @@ -908,11 +916,14 @@ private static FakeClusterState randomUpdateClusterState(Set allocationI final Set inSyncIdsToRemove = new HashSet<>( exclude(randomSubsetOf(randomInt(clusterState.inSyncIds.size()), clusterState.inSyncIds), allocationIds)); final Set remainingInSyncIds = Sets.difference(clusterState.inSyncIds, inSyncIdsToRemove); + final Set initializingIdsExceptRelocationTargets = exclude(clusterState.initializingIds(), + clusterState.routingTable.activeShards().stream().filter(ShardRouting::relocating) + .map(s -> s.allocationId().getRelocationId()).collect(Collectors.toSet())); return new FakeClusterState( clusterState.version + randomIntBetween(1, 5), remainingInSyncIds.isEmpty() ? 
clusterState.inSyncIds : remainingInSyncIds, routingTable( - Sets.difference(Sets.union(clusterState.initializingIds(), initializingIdsToAdd), initializingIdsToRemove), + Sets.difference(Sets.union(initializingIdsExceptRelocationTargets, initializingIdsToAdd), initializingIdsToRemove), clusterState.routingTable.primaryShard())); } @@ -945,6 +956,7 @@ private static Set randomAllocationIdsExcludingExistingIds(final S private static void markAsTrackingAndInSyncQuietly( final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) { try { + addPeerRecoveryRetentionLease(tracker, allocationId); tracker.initiateTracking(allocationId); tracker.markAllocationIdAsInSync(allocationId, localCheckpoint); } catch (final InterruptedException e) { @@ -952,4 +964,15 @@ private static void markAsTrackingAndInSyncQuietly( } } + private static void addPeerRecoveryRetentionLease(final ReplicationTracker tracker, final AllocationId allocationId) { + final String nodeId = nodeIdFromAllocationId(allocationId); + if (tracker.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeId)) == false) { + tracker.addPeerRecoveryRetentionLease(nodeId, NO_OPS_PERFORMED, ActionListener.wrap(() -> { })); + } + } + + private static void addPeerRecoveryRetentionLease(final ReplicationTracker tracker, final String allocationId) { + addPeerRecoveryRetentionLease(tracker, AllocationId.newInitializing(allocationId)); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java index bff4493321289..511a93e8268d1 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -73,11 +73,14 @@ public void testAddAction() { assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(1)); assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); - assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); - final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); assertThat(retentionLease.id(), equalTo(id)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 
0L : retainingSequenceNumber)); assertThat(retentionLease.source(), equalTo(source)); + + assertTrue(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(stats.getShards()[0].getShardRouting()))); } public void testAddAlreadyExists() { @@ -160,9 +163,11 @@ public void testRenewAction() throws InterruptedException { assertNotNull(initialStats.getShards()); assertThat(initialStats.getShards(), arrayWithSize(1)); assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats()); - assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + assertTrue(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(initialStats.getShards()[0].getShardRouting()))); final RetentionLease initialRetentionLease = - initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); final long nextRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL @@ -195,9 +200,11 @@ public void testRenewAction() throws InterruptedException { assertNotNull(renewedStats.getShards()); assertThat(renewedStats.getShards(), arrayWithSize(1)); assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats()); - assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + assertTrue(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(initialStats.getShards()[0].getShardRouting()))); final RetentionLease renewedRetentionLease = - renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); assertThat(renewedRetentionLease.id(), equalTo(id)); assertThat( renewedRetentionLease.retainingSequenceNumber(), @@ -265,7 +272,9 @@ public void testRemoveAction() { assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(1)); assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); - assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0)); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertTrue(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(stats.getShards()[0].getShardRouting()))); } public void testRemoveNotFound() { @@ -328,8 +337,10 @@ public void onFailure(final Exception e) { assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(1)); assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); - assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); - final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + assertTrue(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + 
+                ReplicationTracker.getPeerRecoveryRetentionLeaseId(stats.getShards()[0].getShardRouting())));
+            final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id);
             assertThat(retentionLease.id(), equalTo(id));
             assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 0L : retainingSequenceNumber));
             assertThat(retentionLease.source(), equalTo(source));
@@ -378,9 +389,10 @@ public void testRenewUnderBlock() throws InterruptedException {
         assertNotNull(initialStats.getShards());
         assertThat(initialStats.getShards(), arrayWithSize(1));
         assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats());
-        assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
-        final RetentionLease initialRetentionLease =
-                initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next();
+        assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2));
+        assertTrue(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains(
+            ReplicationTracker.getPeerRecoveryRetentionLeaseId(initialStats.getShards()[0].getShardRouting())));
+        final RetentionLease initialRetentionLease = initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id);
         final long nextRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL && randomBoolean()
                 ? RETAIN_ALL
@@ -427,9 +439,10 @@ public void onFailure(final Exception e) {
         assertNotNull(renewedStats.getShards());
         assertThat(renewedStats.getShards(), arrayWithSize(1));
         assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats());
-        assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
-        final RetentionLease renewedRetentionLease =
-                renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next();
+        assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2));
+        assertTrue(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains(
+            ReplicationTracker.getPeerRecoveryRetentionLeaseId(renewedStats.getShards()[0].getShardRouting())));
+        final RetentionLease renewedRetentionLease = renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id);
         assertThat(renewedRetentionLease.id(), equalTo(id));
         assertThat(
                 renewedRetentionLease.retainingSequenceNumber(),
@@ -484,7 +497,9 @@ public void onFailure(final Exception e) {
         assertNotNull(stats.getShards());
         assertThat(stats.getShards(), arrayWithSize(1));
         assertNotNull(stats.getShards()[0].getRetentionLeaseStats());
-        assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0));
+        assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
+        assertTrue(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains(
+            ReplicationTracker.getPeerRecoveryRetentionLeaseId(stats.getShards()[0].getShardRouting())));
     }

     /*
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
index debb6d219a5f1..e9faa5f8ce987 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
@@ -115,7 +115,8 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
             retentionLock.close();

             // check retention leases have been written on the primary
-            assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases())));
+            assertThat(currentRetentionLeases,
+                equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases())));

             // check current retention leases have been synced to all replicas
             for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
@@ -124,11 +125,13 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
                 final IndexShard replica = internalCluster()
                         .getInstance(IndicesService.class, replicaShardNodeName)
                         .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
+                final Map<String, RetentionLease> retentionLeasesOnReplica
+                    = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases());
                 assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));

                 // check retention leases have been written on the replica
-                assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
+                assertThat(currentRetentionLeases,
+                    equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases())));
             }
         }
     }
@@ -173,7 +176,8 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception {
             retentionLock.close();

             // check retention leases have been written on the primary
-            assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases())));
+            assertThat(currentRetentionLeases,
+                equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases())));

             // check current retention leases have been synced to all replicas
             for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
@@ -182,11 +186,13 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception {
                 final IndexShard replica = internalCluster()
                         .getInstance(IndicesService.class, replicaShardNodeName)
                         .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
+                final Map<String, RetentionLease> retentionLeasesOnReplica =
+                    RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases());
                 assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));

                 // check retention leases have been written on the replica
-                assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
+                assertThat(currentRetentionLeases,
+                    equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases())));
             }
         }
     }
@@ -239,7 +245,8 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception {
                 final IndexShard replica = internalCluster()
                         .getInstance(IndicesService.class, replicaShardNodeName)
                         .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                assertThat(replica.getRetentionLeases().leases(), anyOf(empty(), contains(currentRetentionLease)));
+                assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).values(),
+                    anyOf(empty(), contains(currentRetentionLease)));
             }

             // update the index for retention leases to short a long time, to force expiration
@@ -256,7 +263,8 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception {
            // sleep long enough that the current retention lease has expired
            final long later = System.nanoTime();
            Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now)));
-           assertBusy(() -> assertThat(primary.getRetentionLeases().leases(), empty()));
+           assertBusy(() -> assertThat(
+               RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.getRetentionLeases()).entrySet(), empty()));

            // now that all retention leases are expired should have been synced to all replicas
            assertBusy(() -> {
@@ -266,7 +274,7 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception {
                    final IndexShard replica = internalCluster()
                            .getInstance(IndicesService.class, replicaShardNodeName)
                            .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                   assertThat(replica.getRetentionLeases().leases(), empty());
+                   assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).entrySet(), empty());
                }
            });
        }
@@ -432,11 +440,13 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
            final IndexShard replica = internalCluster()
                    .getInstance(IndicesService.class, replicaShardNodeName)
                    .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-           final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
+           final Map<String, RetentionLease> retentionLeasesOnReplica
+               = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases());
            assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));

            // check retention leases have been written on the replica; see RecoveryTarget#finalizeRecovery
-           assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
+           assertThat(currentRetentionLeases,
+               equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases())));
        }
    }
@@ -474,7 +484,9 @@ public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException {
                 * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
                 * implies that the background sync was able to execute under a block.
                 */
-               assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
+               assertBusy(() -> assertThat(
+                   RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()).values(),
+                   contains(retentionLease.get())));
            } catch (final Exception e) {
                failWithException(e);
            }
@@ -593,7 +605,9 @@ public void testCanRenewRetentionLeaseWithoutWaitingForShards() throws Interrupt
                 * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
                 * implies that the background sync was able to execute despite wait for shards being set on the index.
                 */
-               assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
+               assertBusy(() -> assertThat(
+                   RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()).values(),
+                   contains(retentionLease.get())));
            } catch (final Exception e) {
                failWithException(e);
            }
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java
index adacf6539a80e..da22d68bf5f4d 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java
@@ -63,7 +63,8 @@ public void testRetentionLeaseStats() throws InterruptedException {
         final IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("index").execute().actionGet();
         assertThat(indicesStats.getShards(), arrayWithSize(1));
         final RetentionLeaseStats retentionLeaseStats = indicesStats.getShards()[0].getRetentionLeaseStats();
-        assertThat(RetentionLeases.toMap(retentionLeaseStats.retentionLeases()), equalTo(currentRetentionLeases));
+        assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(retentionLeaseStats.retentionLeases()),
+            equalTo(currentRetentionLeases));
     }

 }
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
index 974e060bf2520..ec3065331d275 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.InternalEngineFactory;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseStats;
 import org.elasticsearch.index.seqno.RetentionLeases;
@@ -35,14 +36,13 @@
 import org.elasticsearch.threadpool.ThreadPool;

 import java.io.IOException;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
@@ -73,7 +73,7 @@ public void testAddOrRenewRetentionLease() throws IOException {
             indexShard.addRetentionLease(
                     Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
             assertRetentionLeases(
-                    indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false);
+                    indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 2 + i, true, false);
         }

         for (int i = 0; i < length; i++) {
@@ -84,7 +84,7 @@ public void testAddOrRenewRetentionLease() throws IOException {
                     length,
                     minimumRetainingSequenceNumbers,
                     primaryTerm,
-                    1 + length + i,
+                    2 + length + i,
                     true,
                     false);
         }
@@ -105,7 +105,7 @@ public void testRemoveRetentionLease() throws IOException {
             indexShard.addRetentionLease(
                     Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
             assertRetentionLeases(
-                    indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false);
+                    indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 2 + i, true, false);
         }

         for (int i = 0; i < length; i++) {
@@ -115,7 +115,7 @@ public void testRemoveRetentionLease() throws IOException {
                     length - i - 1,
                     minimumRetainingSequenceNumbers,
                     primaryTerm,
-                    1 + length + i,
+                    2 + length + i,
                     true,
                     false);
         }
@@ -132,6 +132,12 @@ public void testExpirationOnReplica() throws IOException {
         runExpirationTest(false);
     }

+    private RetentionLease peerRecoveryRetentionLease(IndexShard indexShard) {
+        return new RetentionLease(
+            ReplicationTracker.getPeerRecoveryRetentionLeaseId(indexShard.routingEntry()), 0, currentTimeMillis.get(),
+            ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+    }
+
     private void runExpirationTest(final boolean primary) throws IOException {
         final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
         final Settings settings = Settings
@@ -147,23 +153,28 @@ private void runExpirationTest(final boolean primary) throws IOException {
         try {
             final long[] retainingSequenceNumbers = new long[1];
             retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE);
+            final long initialVersion;
             if (primary) {
+                initialVersion = 2;
                 indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
             } else {
+                initialVersion = 3;
                 final RetentionLeases retentionLeases = new RetentionLeases(
                         primaryTerm,
-                        1,
-                        Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
+                        initialVersion,
+                        Arrays.asList(
+                            peerRecoveryRetentionLease(indexShard),
+                            new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
                 indexShard.updateRetentionLeasesOnReplica(retentionLeases);
             }

             {
                 final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
-                assertThat(retentionLeases.version(), equalTo(1L));
-                assertThat(retentionLeases.leases(), hasSize(1));
-                final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
+                assertThat(retentionLeases.version(), equalTo(initialVersion));
+                assertThat(retentionLeases.leases(), hasSize(2));
+                final RetentionLease retentionLease = retentionLeases.get("0");
                 assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 1, primary, false);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, initialVersion, primary, false);
             }

             // renew the lease
@@ -174,28 +185,30 @@ private void runExpirationTest(final boolean primary) throws IOException {
             } else {
                 final RetentionLeases retentionLeases = new RetentionLeases(
                         primaryTerm,
-                        2,
-                        Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
+                        initialVersion + 1,
+                        Arrays.asList(
+                            peerRecoveryRetentionLease(indexShard),
+                            new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
                 indexShard.updateRetentionLeasesOnReplica(retentionLeases);
             }

             {
                 final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
-                assertThat(retentionLeases.version(), equalTo(2L));
-                assertThat(retentionLeases.leases(), hasSize(1));
-                final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
+                assertThat(retentionLeases.version(), equalTo(initialVersion + 1));
+                assertThat(retentionLeases.leases(), hasSize(2));
+                final RetentionLease retentionLease = retentionLeases.get("0");
                 assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, primary, false);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, initialVersion + 1, primary, false);
             }

             // now force the lease to expire
             currentTimeMillis.set(
                     currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
             if (primary) {
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, true, false);
-                assertRetentionLeases(indexShard, 0, new long[0], primaryTerm, 3, true, true);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, initialVersion + 1, true, false);
+                assertRetentionLeases(indexShard, 0, new long[0], primaryTerm, initialVersion + 2, true, true);
             } else {
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, false, false);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, initialVersion + 1, false, false);
             }
         } finally {
             closeShards(indexShard);
@@ -229,13 +242,8 @@ public void testPersistence() throws IOException {
         // the written retention leases should equal our current retention leases
         final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
         final RetentionLeases writtenRetentionLeases = indexShard.loadRetentionLeases();
-        if (retentionLeases.leases().isEmpty()) {
-            assertThat(writtenRetentionLeases.version(), equalTo(0L));
-            assertThat(writtenRetentionLeases.leases(), empty());
-        } else {
-            assertThat(writtenRetentionLeases.version(), equalTo((long) length));
-            assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
-        }
+        assertThat(writtenRetentionLeases.version(), equalTo(1L + length));
+        assertThat(writtenRetentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));

         // when we recover, we should recover the retention leases
         final IndexShard recoveredShard = reinitShard(
@@ -244,15 +252,10 @@ public void testPersistence() throws IOException {
         try {
             recoverShardFromStore(recoveredShard);
             final RetentionLeases recoveredRetentionLeases = recoveredShard.getEngine().config().retentionLeasesSupplier().get();
-            if (retentionLeases.leases().isEmpty()) {
-                assertThat(recoveredRetentionLeases.version(), equalTo(0L));
-                assertThat(recoveredRetentionLeases.leases(), empty());
-            } else {
-                assertThat(recoveredRetentionLeases.version(), equalTo((long) length));
-                assertThat(
-                        recoveredRetentionLeases.leases(),
-                        contains(retentionLeases.leases().toArray(new RetentionLease[0])));
-            }
+            assertThat(recoveredRetentionLeases.version(), equalTo(1L + length));
+            assertThat(
+                    recoveredRetentionLeases.leases(),
+                    contains(retentionLeases.leases().toArray(new RetentionLease[0])));
         } finally {
             closeShards(recoveredShard);
         }
@@ -265,8 +268,10 @@ public void testPersistence() throws IOException {
         try {
             recoverShardFromStore(forceRecoveredShard);
             final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get();
-            assertThat(recoveredRetentionLeases.leases(), empty());
-            assertThat(recoveredRetentionLeases.version(), equalTo(0L));
+            assertThat(recoveredRetentionLeases.leases(), hasSize(1));
+            assertThat(recoveredRetentionLeases.leases().iterator().next().id(),
+                equalTo(ReplicationTracker.getPeerRecoveryRetentionLeaseId(indexShard.routingEntry())));
+            assertThat(recoveredRetentionLeases.version(), equalTo(1L));
         } finally {
             closeShards(forceRecoveredShard);
         }
@@ -291,8 +296,8 @@ public void testRetentionLeaseStats() throws IOException {
                     stats.retentionLeases(),
                     length,
                     minimumRetainingSequenceNumbers,
-                    length == 0 ? RetentionLeases.EMPTY.primaryTerm() : indexShard.getOperationPrimaryTerm(),
-                    length);
+                    indexShard.getOperationPrimaryTerm(),
+                    length + 1);
         } finally {
             closeShards(indexShard);
         }
@@ -355,7 +360,9 @@ private void assertRetentionLeases(
         assertThat(retentionLeases.version(), equalTo(version));
         final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
         for (final RetentionLease retentionLease : retentionLeases.leases()) {
-            idToRetentionLease.put(retentionLease.id(), retentionLease);
+            if (retentionLease.isNotPeerRecoveryRetentionLease()) {
+                idToRetentionLease.put(retentionLease.id(), retentionLease);
+            }
         }

         assertThat(idToRetentionLease.entrySet(), hasSize(size));
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 5187ef37fcdf8..be1c6254d868b 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -102,6 +102,8 @@
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.VersionFieldMapper;
+import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SeqNoStats;
@@ -2076,7 +2078,8 @@ public void testPrimaryHandOffUpdatesLocalCheckpoint() throws IOException {
         }

         IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1));
-        final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard());
+        final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard(), Settings.builder()
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), primarySource.indexSettings().isSoftDeleteEnabled()).build());
         updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData());
         recoverReplica(primaryTarget, primarySource, true);
@@ -2873,7 +2876,7 @@ public void testCompletionStatsMarksSearcherAccessed() throws Exception {
     public void testDocStats() throws Exception {
         IndexShard indexShard = null;
         try {
-            indexShard = newStartedShard(
+            indexShard = newStartedShard(false,
                     Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0).build());
             final long numDocs = randomIntBetween(2, 32); // at least two documents so we have docs to delete
             final long numDocsToDelete = randomLongBetween(1, numDocs);
@@ -2911,13 +2914,20 @@ public void testDocStats() throws Exception {
                 deleteDoc(indexShard, "_doc", id);
                 indexDoc(indexShard, "_doc", id);
             }
-            // Need to update and sync the global checkpoint as the soft-deletes retention MergePolicy depends on it.
+            // Need to update and sync the global checkpoint and the retention leases for the soft-deletes retention MergePolicy.
             if (indexShard.indexSettings.isSoftDeleteEnabled()) {
+                final long newGlobalCheckpoint = indexShard.getLocalCheckpoint();
                 if (indexShard.routingEntry().primary()) {
-                    indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
-                        indexShard.getLocalCheckpoint());
+                    indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), newGlobalCheckpoint);
+                    indexShard.advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint();
                 } else {
-                    indexShard.updateGlobalCheckpointOnReplica(indexShard.getLocalCheckpoint(), "test");
+                    indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
+
+                    final RetentionLeases retentionLeases = indexShard.getRetentionLeases();
+                    indexShard.updateRetentionLeasesOnReplica(new RetentionLeases(
+                        retentionLeases.primaryTerm(), retentionLeases.version() + 1,
+                        retentionLeases.leases().stream().map(lease -> new RetentionLease(lease.id(), newGlobalCheckpoint + 1,
+                            lease.timestamp(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)).collect(Collectors.toList())));
                 }
                 indexShard.sync();
             }
@@ -3504,6 +3514,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {
         // In order to instruct the merge policy not to keep a fully deleted segment,
         // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
         if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
+            primary.advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint();
             primary.sync();
             flushShard(primary);
         }
@@ -3983,6 +3994,7 @@ public void testTypelessDelete() throws IOException {
         IndexMetaData metaData = IndexMetaData.builder("index")
             .putMapping("some_type", "{ \"properties\": {}}")
             .settings(settings)
+            .primaryTerm(0, 1)
             .build();
         IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
         recoverShardFromStore(shard);
@@ -3990,10 +4002,10 @@ public void testTypelessDelete() throws IOException {
         assertTrue(indexResult.isCreated());

         DeleteResult deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "some_other_type", "id", VersionType.INTERNAL,
-            UNASSIGNED_SEQ_NO, 0);
+            UNASSIGNED_SEQ_NO, 1);
         assertFalse(deleteResult.isFound());

-        deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "_doc", "id", VersionType.INTERNAL, UNASSIGNED_SEQ_NO, 0);
+        deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "_doc", "id", VersionType.INTERNAL, UNASSIGNED_SEQ_NO, 1);
         assertTrue(deleteResult.isFound());

         closeShards(shard);
diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java
index 59e7c21a3e6e8..4a3b6629eeb89 100644
--- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java
@@ -81,6 +81,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.StreamSupport;

 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -1052,6 +1053,10 @@ public void testFilterCacheStats() throws Exception {
         // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
         if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
             persistGlobalCheckpoint("index");
+            internalCluster().nodesInclude("index").stream()
+                .flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false))
+                .flatMap(n -> StreamSupport.stream(n.spliterator(), false))
+                .forEach(IndexShard::advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint);
             flush("index");
         }
         ForceMergeResponse forceMergeResponse =
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index f7042448e7576..67c29004b2bea 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -48,6 +48,7 @@
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
@@ -666,7 +667,7 @@ public EngineConfig config(
                     SequenceNumbers.NO_OPS_PERFORMED,
                     update -> {},
                     () -> 0L,
-                    (leases, listener) -> {});
+                    (leases, listener) -> listener.onResponse(new ReplicationResponse()));
             globalCheckpointSupplier = replicationTracker;
             retentionLeasesSupplier = replicationTracker::getRetentionLeases;
         } else {
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index b11a0f84fb84a..93d8b98d306e8 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -287,6 +287,7 @@ public synchronized int startReplicas(int numOfReplicasToStart) throws IOExcepti

         public void startPrimary() throws IOException {
             recoverPrimary(primary);
+            computeReplicationTargets();
             HashSet<String> activeIds = new HashSet<>();
             activeIds.addAll(activeIds());
             activeIds.add(primary.routingEntry().allocationId().getId());
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 2a2176f1c100d..2041ba63e5e9d 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -222,7 +222,12 @@ protected IndexShard newShard(boolean primary, Settings settings, EngineFactory
     }

     protected IndexShard newShard(ShardRouting shardRouting, final IndexingOperationListener... listeners) throws IOException {
-        return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners);
+        return newShard(shardRouting, Settings.EMPTY, listeners);
+    }
+
+    protected IndexShard newShard(ShardRouting shardRouting, final Settings settings, final IndexingOperationListener... listeners)
+        throws IOException {
+        return newShard(shardRouting, settings, new InternalEngineFactory(), listeners);
     }

     /**