Ignore Lucene index in peer recovery if translog corrupted (#49114)
If the translog on a replica is corrupt, we should not perform an
operation-based recovery or utilize sync_id, as we won't be able to open
an engine in the next step. This change adds an extra validation that
ensures the translog is okay when preparing a peer recovery request.
dnhatn committed Nov 25, 2019
1 parent 7ddf540 commit 3b827b8
Showing 3 changed files with 53 additions and 57 deletions.
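
To make the intent of the change concrete, here is a minimal, self-contained sketch of the new fallback behaviour. This is not the Elasticsearch code itself: MetadataSnapshot, StartRecoveryRequest and TranslogReader below are hypothetical stand-ins, and only the control flow mirrors what this commit does. If reading the global checkpoint from the translog fails, the local Lucene index is ignored by sending an empty metadata snapshot, which forces a file-based recovery instead of an ops-based one or a sync_id shortcut.

    import java.io.IOException;

    final class RecoveryPlanSketch {

        // Mirrors SequenceNumbers.UNASSIGNED_SEQ_NO; the concrete value here is illustrative.
        static final long UNASSIGNED_SEQ_NO = -2L;

        // Stand-in for Store.MetadataSnapshot: only the file count matters for this sketch.
        record MetadataSnapshot(int size) {}

        static final MetadataSnapshot EMPTY = new MetadataSnapshot(0);

        // Stand-in for the start-recovery request sent to the recovery source.
        record StartRecoveryRequest(MetadataSnapshot metadataSnapshot, long startingSeqNo) {}

        // Stand-in for reading the global checkpoint / starting seq# from the local translog.
        interface TranslogReader {
            long readStartingSeqNo() throws IOException;
        }

        static StartRecoveryRequest prepareRequest(MetadataSnapshot local, TranslogReader translog) {
            long startingSeqNo = UNASSIGNED_SEQ_NO;
            MetadataSnapshot metadataSnapshot = local;
            if (metadataSnapshot.size() > 0) {
                try {
                    // An operation-based recovery is only possible if the translog is readable.
                    startingSeqNo = translog.readStartingSeqNo();
                } catch (IOException e) { // a corrupted translog surfaces here as well
                    // Ignore the local Lucene index entirely: an empty snapshot means the source
                    // cannot rely on matching files or sync_id and performs a file-based recovery.
                    metadataSnapshot = EMPTY;
                }
            }
            return new StartRecoveryRequest(metadataSnapshot, startingSeqNo);
        }
    }

The actual change, shown in the diff below, applies this logic inside PeerRecoveryTargetService.getStartRecoveryRequest and reuses that method from the tests and the test framework.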
@@ -181,7 +181,7 @@ private void doRecovery(final long recoveryId) {
         cancellableThreads = recoveryTarget.cancellableThreads();
         try {
             assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
-            request = getStartRecoveryRequest(recoveryTarget);
+            request = getStartRecoveryRequest(recoveryTarget, clusterService.localNode(), logger);
             logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
             recoveryTarget.indexShard().prepareForIndexRecovery();
         } catch (final Exception e) {
@@ -322,7 +322,7 @@ public RecoveryResponse read(StreamInput in) throws IOException {
      * @param recoveryTarget the target of the recovery
      * @return a snapshot of the store metadata
      */
-    private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
+    private static Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget, final Logger logger) {
         try {
             return recoveryTarget.indexShard().snapshotStoreMetadata();
         } catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -341,20 +341,23 @@ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget rec
      * @param recoveryTarget the target of the recovery
      * @return a start recovery request
      */
-    private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
+    public static StartRecoveryRequest getStartRecoveryRequest(RecoveryTarget recoveryTarget, DiscoveryNode localNode, Logger logger) {
         final StartRecoveryRequest request;
         logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

-        final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
+        Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget, logger);
         logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());

-        final long startingSeqNo;
+        long startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
         if (metadataSnapshot.size() > 0) {
-            startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
-        } else {
-            startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+            try {
+                startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
+            } catch (IOException | TranslogCorruptedException e) {
+                logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +
+                    "resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);
+                metadataSnapshot = Store.MetadataSnapshot.EMPTY;
+            }
         }

         if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
             logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
         } else {
@@ -369,7 +372,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
                 recoveryTarget.shardId(),
                 recoveryTarget.indexShard().routingEntry().allocationId().getId(),
                 recoveryTarget.sourceNode(),
-                clusterService.localNode(),
+                localNode,
                 metadataSnapshot,
                 recoveryTarget.state().getPrimary(),
                 recoveryTarget.recoveryId(),
@@ -384,39 +387,30 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
      * @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
      *         failed
      */
-    public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
-        try {
-            final Store store = recoveryTarget.store();
-            final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
-            final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
-            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
-            final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
-            final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
-            if (logger.isTraceEnabled()) {
-                final StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
-                for (IndexCommit commit : existingCommits) {
-                    descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit));
-                }
-                logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]",
-                    globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits);
-            }
-            if (seqNoStats.maxSeqNo <= globalCheckpoint) {
-                assert seqNoStats.localCheckpoint <= globalCheckpoint;
-                /*
-                 * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
-                 * checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
-                 * after the local checkpoint stored in the commit.
-                 */
-                return seqNoStats.localCheckpoint + 1;
-            } else {
-                return SequenceNumbers.UNASSIGNED_SEQ_NO;
-            }
-        } catch (final TranslogCorruptedException | IOException e) {
-            /*
-             * This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
-             * translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
-             * proceeds to attempt a sequence-number-based recovery.
-             */
-            return SequenceNumbers.UNASSIGNED_SEQ_NO;
-        }
+    private static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) throws IOException {
+        final Store store = recoveryTarget.store();
+        final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+        final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
+        final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
+        final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
+        final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
+        if (logger.isTraceEnabled()) {
+            final StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
+            for (IndexCommit commit : existingCommits) {
+                descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit));
+            }
+            logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]",
+                globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits);
+        }
+        if (seqNoStats.maxSeqNo <= globalCheckpoint) {
+            assert seqNoStats.localCheckpoint <= globalCheckpoint;
+            /*
+             * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
+             * checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
+             * after the local checkpoint stored in the commit.
+             */
+            return seqNoStats.localCheckpoint + 1;
+        } else {
+            return SequenceNumbers.UNASSIGNED_SEQ_NO;
+        }
     }
@@ -52,17 +52,22 @@

 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.sameInstance;

 public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {

     public void testGetStartingSeqNo() throws Exception {
         final IndexShard replica = newShard(false);
+        final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId());
         try {
             // Empty store
             {
                 recoveryEmptyReplica(replica, true);
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
+                final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(recoveryTarget, rNode, logger);
+                assertThat(request.startingSeqNo(), equalTo(0L));
+                assertThat(request.metadataSnapshot().size(), greaterThan(0));
                 recoveryTarget.decRef();
             }
             // Last commit is good - use it.
@@ -78,7 +83,9 @@ public void testGetStartingSeqNo() throws Exception {
                 replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test");
                 replica.sync();
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
+                final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(recoveryTarget, rNode, logger);
+                assertThat(request.startingSeqNo(), equalTo(initDocs));
+                assertThat(request.metadataSnapshot().size(), greaterThan(0));
                 recoveryTarget.decRef();
             }
             // Global checkpoint does not advance, last commit is not good - use the previous commit
@@ -92,15 +99,19 @@ public void testGetStartingSeqNo() throws Exception {
                 }
                 flushShard(replica);
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
+                final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(recoveryTarget, rNode, logger);
+                assertThat(request.startingSeqNo(), equalTo(initDocs));
+                assertThat(request.metadataSnapshot().size(), greaterThan(0));
                 recoveryTarget.decRef();
             }
             // Advances the global checkpoint, a safe commit also advances
             {
                 replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test");
                 replica.sync();
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs));
+                final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(recoveryTarget, rNode, logger);
+                assertThat(request.startingSeqNo(), equalTo(initDocs + moreDocs));
+                assertThat(request.metadataSnapshot().size(), greaterThan(0));
                 recoveryTarget.decRef();
             }
             // Different translogUUID, fallback to file-based
@@ -119,7 +130,9 @@ public void testGetStartingSeqNo() throws Exception {
                     writer.commit();
                 }
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
+                final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(recoveryTarget, rNode, logger);
+                assertThat(request.metadataSnapshot(), sameInstance(Store.MetadataSnapshot.EMPTY));
+                assertThat(request.startingSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
                 recoveryTarget.decRef();
             }
         } finally {
@@ -609,18 +609,7 @@ protected final void recoverUnstartedReplica(final IndexShard replica,
         }
         replica.prepareForIndexRecovery();
         final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
-        final String targetAllocationId = recoveryTarget.indexShard().routingEntry().allocationId().getId();
-
-        final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica);
-        final long startingSeqNo;
-        if (snapshot.size() > 0) {
-            startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget);
-        } else {
-            startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
-        }
-
-        final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
-            pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo);
+        final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(recoveryTarget, rNode, logger);
         final RecoverySourceHandler recovery = new RecoverySourceHandler(
             primary, recoveryTarget, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
         primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
