Remove PRRLs before performing file-based recovery
If the primary performs a file-based recovery to a node that has (or recently
had) a copy of the shard, then it is possible that the persisted global
checkpoint of the new copy ends up behind that of the old copy, since
file-based recoveries are somewhat destructive operations.

Today we leave that node's peer recovery retention lease (PRRL) in place
during the recovery, with the expectation that it can be used by the new
copy. However, this isn't the case if the new copy needs more history to be
retained, because retention leases may only advance and never retreat.
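
To make the advance-only rule concrete, here is a minimal standalone sketch of
the invariant. This is illustrative only: the class and messages below are
invented, not the actual `ReplicationTracker` implementation.

```java
/**
 * Illustrative model of a retention lease's advance-only invariant. A renewal
 * may move the retaining sequence number forwards but never backwards, which
 * is why a recreated shard copy that needs older history to be retained
 * cannot simply reuse the lease left behind by the old copy.
 */
class RetentionLeaseSketch {
    private final String id;
    private long retainingSequenceNumber; // operations at or above this seq no are retained

    RetentionLeaseSketch(String id, long retainingSequenceNumber) {
        this.id = id;
        this.retainingSequenceNumber = retainingSequenceNumber;
    }

    void renew(long newRetainingSequenceNumber) {
        if (newRetainingSequenceNumber < retainingSequenceNumber) {
            // retreating would promise history that may already have been discarded
            throw new IllegalArgumentException("retention lease [" + id + "] cannot retreat from ["
                + retainingSequenceNumber + "] to [" + newRetainingSequenceNumber + "]");
        }
        retainingSequenceNumber = newRetainingSequenceNumber;
    }
}
```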

This commit addresses the problem by removing any existing PRRL during a
file-based recovery: since we are performing a file-based recovery we have
already determined that there isn't enough history retained for an ops-based
recovery, so there is little point in keeping the old lease in place.
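
In outline, the ordering of the fix is as in the following sketch. All names
here are invented for illustration; the real sequencing lives in
`RecoverySourceHandler` below, chained via `StepListener`s under a primary
permit.

```java
/**
 * Hypothetical outline of the fix (invented names). The essential point is the
 * ordering: drop the stale lease, copy the files, then create a fresh lease
 * for the new copy later in the recovery.
 */
abstract class FileBasedRecoveryOutline {
    abstract void removePeerRecoveryLeaseIfPresent(String targetNodeId);
    abstract void sendSafeCommitFiles(String targetNodeId);
    abstract void addFreshPeerRecoveryLease(String targetNodeId);

    final void recover(String targetNodeId) {
        // 1. Remove any stale PRRL: the file-based copy may finish with a lower
        //    persisted global checkpoint than the old copy had, and a lease can
        //    never retreat to cover the extra history it would then need.
        removePeerRecoveryLeaseIfPresent(targetNodeId);
        // 2. Copy the files of the safe commit to the target (phase 1).
        sendSafeCommitFiles(targetNodeId);
        // 3. Create a new lease for the new copy at its new global checkpoint.
        addFreshPeerRecoveryLease(targetNodeId);
    }
}
```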

Caught by [a failure of `RecoveryWhileUnderLoadIT.testRecoverWhileRelocating`](https://scans.gradle.com/s/wxccfrtfgjj3g/console-log?task=:server:integTest#L14)

Relates elastic#41536
DaveCTurner committed Jul 3, 2019
1 parent b1be151 commit 0a0604b
Showing 4 changed files with 94 additions and 6 deletions.
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -437,6 +437,10 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
     }
 
+    public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
+        removeRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), listener);
+    }
+
     /**
      * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
      */
@@ -498,9 +502,17 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
                 final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
                 if (retentionLease != null) {
                     final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
-                    renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
-                        Math.max(0L, checkpointState.globalCheckpoint + 1L),
-                        PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    final long newRetainedSequenceNumber = Math.max(0L, checkpointState.globalCheckpoint + 1L);
+                    if (retentionLease.retainingSequenceNumber() <= newRetainedSequenceNumber) {
+                        renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), newRetainedSequenceNumber,
+                            PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    } else {
+                        // the retention lease is tied to the node, not the shard copy, so it's possible a copy was removed and now
+                        // we are in the process of recovering it again. The recovery process will fix the lease before initiating
+                        // tracking on this copy:
+                        assert checkpointState.tracked == false :
+                            "cannot renew " + retentionLease + " according to " + checkpointState + " for " + shardRouting;
+                    }
                 }
             }
         }
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2503,6 +2503,11 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
     }
 
+    public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
+        assert assertPrimaryMode();
+        replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
+    }
+
     class ShardEventListener implements Engine.EventListener {
         private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
 
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -53,6 +53,7 @@
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
+import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
@@ -196,7 +197,30 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
logger.warn("releasing snapshot caused exception", ex);
}
});
phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);

final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
if (shard.indexSettings().isSoftDeleteEnabled()
&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
runUnderPrimaryPermit(() -> {
try {
// If the target previously had a copy of this shard then a file-based recovery might move its global
// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
// new one later on in the recovery.
shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), deleteRetentionLeaseStep);
} catch (RetentionLeaseNotFoundException e) {
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
deleteRetentionLeaseStep.onResponse(null);
}
}, shardId + " removing retention leaes for [" + request.targetAllocationId() + "]",
shard, cancellableThreads, logger);
} else {
deleteRetentionLeaseStep.onResponse(null);
}

deleteRetentionLeaseStep.whenComplete(ignored -> {
phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
}, onFailure);

} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
}
server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -49,6 +49,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
 import org.elasticsearch.index.analysis.TokenFilterFactory;
 import org.elasticsearch.index.mapper.MapperParsingException;
@@ -70,6 +71,7 @@
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockFSIndexStore;
@@ -127,8 +129,12 @@ public class IndexRecoveryIT extends ESIntegTestCase {

     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(MockTransportService.TestPlugin.class, MockFSIndexStore.TestPlugin.class,
-            RecoverySettingsChunkSizePlugin.class, TestAnalysisPlugin.class);
+        return Arrays.asList(
+            MockTransportService.TestPlugin.class,
+            MockFSIndexStore.TestPlugin.class,
+            RecoverySettingsChunkSizePlugin.class,
+            TestAnalysisPlugin.class,
+            InternalSettingsPlugin.class);
     }
 
     @After
@@ -1015,4 +1021,45 @@ public TokenStream create(TokenStream tokenStream) {
             });
         }
     }
+
+    public void testRepeatedRecovery() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+
+        // Ensures that you can remove a replica and then add it back again without any ill effects, even if it's allocated back to the
+        // node that held it previously, in case that node hasn't completely cleared it up.
+
+        final String indexName = "test-index";
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 6))
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
+            .build());
+        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 10))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+
+        assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), equalTo(0));
+
+        assertBusy(() -> {
+            final ShardStats[] shardsStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards();
+            for (final ShardStats shardStats : shardsStats) {
+                final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
+                assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
+                    .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
+            }
+        });
+
+        logger.info("--> remove replicas");
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("index.number_of_replicas", 0)));
+        ensureGreen(indexName);
+
+        logger.info("--> index more documents");
+        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 10))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+
+        logger.info("--> add replicas again");
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("index.number_of_replicas", 1)));
+        ensureGreen(indexName);
+    }
 }
