Advance PRRLs to match GCP of tracked shards (#43751)
This commit adjusts the behaviour of the retention lease sync to first renew
any peer-recovery retention leases where either:

- the corresponding shard's global checkpoint has advanced, or

- the lease is older than half of its expiry time

Relates #41536
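
For orientation, the per-copy renewal check introduced by this change amounts to the sketch below. The helper class, method name, and parameters are illustrative only and not part of the change; the real logic lives in ReplicationTracker#renewPeerRecoveryRetentionLeases in the diff that follows.

import org.elasticsearch.index.seqno.RetentionLease;

final class PeerRecoveryLeaseRenewalSketch {
    // A lease needs renewal when the tracked copy's global checkpoint has advanced past the
    // lease's retaining sequence number, or when more than half of the lease expiry time has
    // elapsed since the lease was last renewed.
    static boolean needsRenewal(RetentionLease lease, long globalCheckpoint,
                                long currentTimeMillis, long retentionLeaseMillis) {
        final boolean checkpointAdvanced = lease.retainingSequenceNumber() <= globalCheckpoint;
        final boolean approachingExpiry = lease.timestamp() <= currentTimeMillis - retentionLeaseMillis / 2;
        return checkpointAdvanced || approachingExpiry;
    }
}

If neither condition holds for any tracked copy, the sync leaves the leases untouched, so frequent syncs do not force repeated lease persistence.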
DaveCTurner authored Jul 1, 2019
1 parent f3fbb33 commit fbc4477
Showing 9 changed files with 252 additions and 57 deletions.
@@ -60,6 +60,7 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
@@ -457,18 +458,55 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
}

/**
* Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
* properly. TODO remove this.
* Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
* checkpoint, and renew any leases that are approaching expiry.
*/
public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
public synchronized void renewPeerRecoveryRetentionLeases() {
assert primaryMode;
for (ShardRouting shardRouting : routingTable) {
if (shardRouting.assignedToNode()) {
final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), Math.max(0L, checkpointState.globalCheckpoint + 1L),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
assert invariant();

/*
* Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in
* case the associated shard is temporarily unassigned. However we must not renew them too often, since each renewal must be
* persisted and the resulting IO can be expensive on nodes with large numbers of shards (see #42299). We choose to renew them after
* half the expiry time, so that by default the cluster has at least 6 hours to recover before these leases start to expire.
*/
final long renewalTimeMillis = currentTimeMillisSupplier.getAsLong() - indexSettings.getRetentionLeaseMillis() / 2;

/*
* If any of the peer-recovery retention leases need renewal, it's a good opportunity to renew them all.
*/
final boolean renewalNeeded = StreamSupport.stream(routingTable.spliterator(), false).filter(ShardRouting::assignedToNode)
.anyMatch(shardRouting -> {
final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
if (retentionLease == null) {
/*
* If this shard copy is tracked then we got here via a rolling upgrade from an older version that doesn't
* create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation.
*/
assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
|| indexSettings.getIndexVersionCreated().before(Version.V_8_0_0);
return false;
}
return retentionLease.timestamp() <= renewalTimeMillis
|| retentionLease.retainingSequenceNumber() <= checkpoints.get(shardRouting.allocationId().getId()).globalCheckpoint;
});

if (renewalNeeded) {
for (ShardRouting shardRouting : routingTable) {
if (shardRouting.assignedToNode()) {
final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
if (retentionLease != null) {
final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
Math.max(0L, checkpointState.globalCheckpoint + 1L),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
}
}
}
}

assert invariant();
}

public static class CheckpointState implements Writeable {
@@ -2111,6 +2111,7 @@ public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
replicationTracker.renewPeerRecoveryRetentionLeases();
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
@@ -2502,16 +2503,6 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
}

/**
* Test-only method to advance all shards' peer-recovery retention leases to their tracked global checkpoints so that operations
* can be discarded. TODO Remove this when retention leases are advanced by other mechanisms.
*/
public void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
assert assertPrimaryMode();
replicationTracker.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
syncRetentionLeases();
}

class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

@@ -37,6 +37,7 @@
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -50,6 +51,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
@@ -61,6 +63,9 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
@@ -975,4 +980,158 @@ private static void addPeerRecoveryRetentionLease(final ReplicationTracker track
addPeerRecoveryRetentionLease(tracker, AllocationId.newInitializing(allocationId));
}

public void testPeerRecoveryRetentionLeaseCreationAndRenewal() {

final int numberOfActiveAllocationsIds = randomIntBetween(1, 8);
final int numberOfInitializingIds = randomIntBetween(0, 8);
final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds =
randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<AllocationId> initializingAllocationIds = activeAndInitializingAllocationIds.v2();

final AllocationId primaryId = activeAllocationIds.iterator().next();

final long initialClusterStateVersion = randomNonNegativeLong();

final AtomicLong currentTimeMillis = new AtomicLong(0L);
final ReplicationTracker tracker = newTracker(primaryId, updatedGlobalCheckpoint::set, currentTimeMillis::get);

final long retentionLeaseExpiryTimeMillis = tracker.indexSettings().getRetentionLeaseMillis();
final long peerRecoveryRetentionLeaseRenewalTimeMillis = retentionLeaseExpiryTimeMillis / 2;

final long maximumTestTimeMillis = 13 * retentionLeaseExpiryTimeMillis;
final long testStartTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - maximumTestTimeMillis);
currentTimeMillis.set(testStartTimeMillis);

final Function<AllocationId, RetentionLease> retentionLeaseFromAllocationId = allocationId
-> new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)),
0L, currentTimeMillis.get(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE);

final List<RetentionLease> initialLeases = new ArrayList<>();
if (randomBoolean()) {
initialLeases.add(retentionLeaseFromAllocationId.apply(primaryId));
}
for (final AllocationId replicaId : initializingAllocationIds) {
if (randomBoolean()) {
initialLeases.add(retentionLeaseFromAllocationId.apply(replicaId));
}
}
for (int i = randomIntBetween(0, 5); i > 0; i--) {
initialLeases.add(retentionLeaseFromAllocationId.apply(AllocationId.newInitializing()));
}
tracker.updateRetentionLeasesOnReplica(new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), initialLeases));

IndexShardRoutingTable routingTable = routingTable(initializingAllocationIds, primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertTrue("primary's retention lease should exist",
tracker.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(routingTable.primaryShard())));

final Consumer<Runnable> assertAsTimePasses = assertion -> {
final long startTime = currentTimeMillis.get();
while (currentTimeMillis.get() < startTime + retentionLeaseExpiryTimeMillis * 2) {
currentTimeMillis.addAndGet(randomLongBetween(0L, retentionLeaseExpiryTimeMillis * 2));
tracker.renewPeerRecoveryRetentionLeases();
tracker.getRetentionLeases(true);
assertion.run();
}
};

assertAsTimePasses.accept(() -> {
// Leases for assigned replicas do not expire
final RetentionLeases retentionLeases = tracker.getRetentionLeases();
for (final AllocationId replicaId : initializingAllocationIds) {
final String leaseId = retentionLeaseFromAllocationId.apply(replicaId).id();
assertTrue("should not have removed lease for " + replicaId + " in " + retentionLeases,
initialLeases.stream().noneMatch(l -> l.id().equals(leaseId)) || retentionLeases.contains(leaseId));
}
});

// Leases that don't correspond to assigned replicas, however, are expired by this time.
final Set<String> expectedLeaseIds = Stream.concat(Stream.of(primaryId), initializingAllocationIds.stream())
.map(allocationId -> retentionLeaseFromAllocationId.apply(allocationId).id()).collect(Collectors.toSet());
for (final RetentionLease retentionLease : tracker.getRetentionLeases().leases()) {
assertThat(expectedLeaseIds, hasItem(retentionLease.id()));
}

for (AllocationId replicaId : initializingAllocationIds) {
markAsTrackingAndInSyncQuietly(tracker, replicaId.getId(), NO_OPS_PERFORMED);
}

assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
equalTo(expectedLeaseIds));

assertAsTimePasses.accept(() -> {
// Leases still don't expire
assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
equalTo(expectedLeaseIds));

// Also leases are renewed before reaching half the expiry time
//noinspection OptionalGetWithoutIsPresent
assertThat(tracker.getRetentionLeases() + " renewed before too long",
tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong(),
greaterThanOrEqualTo(currentTimeMillis.get() - peerRecoveryRetentionLeaseRenewalTimeMillis));
});

IndexShardRoutingTable.Builder routingTableBuilder = new IndexShardRoutingTable.Builder(routingTable);
for (ShardRouting replicaShard : routingTable.replicaShards()) {
routingTableBuilder.removeShard(replicaShard);
routingTableBuilder.addShard(replicaShard.moveToStarted());
}
routingTable = routingTableBuilder.build();
activeAllocationIds.addAll(initializingAllocationIds);

tracker.updateFromMaster(initialClusterStateVersion + randomLongBetween(1, 10), ids(activeAllocationIds), routingTable);

assertAsTimePasses.accept(() -> {
// Leases still don't expire
assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
equalTo(expectedLeaseIds));
// ... and any extra peer recovery retention leases are expired immediately since the shard is fully active
tracker.addPeerRecoveryRetentionLease(randomAlphaOfLength(10), randomNonNegativeLong(), ActionListener.wrap(() -> {}));
});

tracker.renewPeerRecoveryRetentionLeases();
assertTrue("expired extra lease", tracker.getRetentionLeases(true).v1());

final AllocationId advancingAllocationId
= initializingAllocationIds.isEmpty() || rarely() ? primaryId : randomFrom(initializingAllocationIds);
final String advancingLeaseId = retentionLeaseFromAllocationId.apply(advancingAllocationId).id();

final long initialGlobalCheckpoint
= Math.max(NO_OPS_PERFORMED, tracker.getTrackedLocalCheckpointForShard(advancingAllocationId.getId()).globalCheckpoint);
assertThat(tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(initialGlobalCheckpoint + 1));
final long newGlobalCheckpoint = initialGlobalCheckpoint + randomLongBetween(1, 1000);
tracker.updateGlobalCheckpointForShard(advancingAllocationId.getId(), newGlobalCheckpoint);
tracker.renewPeerRecoveryRetentionLeases();
assertThat("lease was renewed because the shard advanced its global checkpoint",
tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(newGlobalCheckpoint + 1));

final long initialVersion = tracker.getRetentionLeases().version();
tracker.renewPeerRecoveryRetentionLeases();
assertThat("immediate renewal is a no-op", tracker.getRetentionLeases().version(), equalTo(initialVersion));

//noinspection OptionalGetWithoutIsPresent
final long millisUntilFirstRenewal
= tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong()
+ peerRecoveryRetentionLeaseRenewalTimeMillis
- currentTimeMillis.get();

if (millisUntilFirstRenewal != 0) {
final long shorterThanRenewalTime = randomLongBetween(0L, millisUntilFirstRenewal - 1);
currentTimeMillis.addAndGet(shorterThanRenewalTime);
tracker.renewPeerRecoveryRetentionLeases();
assertThat("renewal is a no-op after a short time", tracker.getRetentionLeases().version(), equalTo(initialVersion));
currentTimeMillis.addAndGet(millisUntilFirstRenewal - shorterThanRenewalTime);
}

tracker.renewPeerRecoveryRetentionLeases();
assertThat("renewal happens after a sufficiently long time", tracker.getRetentionLeases().version(), greaterThan(initialVersion));
assertTrue("all leases were renewed",
tracker.getRetentionLeases().leases().stream().allMatch(l -> l.timestamp() == currentTimeMillis.get()));

assertThat("test ran for too long, potentially leading to overflow",
currentTimeMillis.get(), lessThanOrEqualTo(testStartTimeMillis + maximumTestTimeMillis));
}

}
@@ -2925,7 +2925,7 @@ public void testDocStats() throws Exception {
indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint());
indexShard.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
indexShard.syncRetentionLeases();
} else {
indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");

@@ -3524,7 +3524,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {
primary.updateGlobalCheckpointForShard(
primary.routingEntry().allocationId().getId(),
primary.getLastSyncedGlobalCheckpoint());
primary.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
primary.syncRetentionLeases();
primary.sync();
flushShard(primary);
}
@@ -81,7 +81,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -1009,7 +1008,10 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) {
}

public void testFilterCacheStats() throws Exception {
Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
Settings settings = Settings.builder().put(indexSettings())
.put("number_of_replicas", 0)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
.build();
assertAcked(prepareCreate("index").setSettings(settings).get());
indexRandom(false, true,
client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
Expand Down Expand Up @@ -1053,10 +1055,13 @@ public void testFilterCacheStats() throws Exception {
// we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index");
internalCluster().nodesInclude("index").stream()
.flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false))
.flatMap(n -> StreamSupport.stream(n.spliterator(), false))
.forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints);
assertBusy(() -> {
for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) {
final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
.allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
}
});
flush("index");
}
ForceMergeResponse forceMergeResponse =
@@ -50,6 +50,7 @@ public List<Setting<?>> getSettings() {
PROVIDED_NAME_SETTING,
TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
);
}