Remove PRRL after SendFileStep in Peer Recovery
Signed-off-by: Ashish Singh <[email protected]>
ashking94 committed Oct 27, 2022
1 parent 69763ac commit 77d0d3d
Showing 8 changed files with 55 additions and 27 deletions.
CHANGELOG.md (1 addition, 1 deletion)
@@ -109,7 +109,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
 - Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761))
 - Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))
-- [Remote Store] Remove PRRL creation/deletion in replica peer recovery ([#4954](https://github.com/opensearch-project/OpenSearch/pull/4954))
+- [Remote Store] Remove PRRL creation/deletion in peer recovery of remote store enabled replica ([#4954](https://github.com/opensearch-project/OpenSearch/pull/4954))

### Deprecated
### Removed
@@ -902,7 +902,7 @@ private boolean invariant() {
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
-if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
+if (checkpoints.get(shardRouting.allocationId().getId()).tracked && !indexSettings().isRemoteTranslogStoreEnabled()) {
assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
: "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
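Note: the guard above narrows the replication-tracker invariant so that the PRRL assertions apply only to indices that do not use a remote translog store. A minimal sketch of the predicate this condition encodes (class and helper names hypothetical):

```java
public class PrrlInvariantSketch {
    // Hypothetical helper mirroring the guard above: the PRRL assertion now
    // applies only to tracked shard copies of indices that do NOT use a
    // remote translog store.
    static boolean mustHoldPeerRecoveryRetentionLease(boolean tracked, boolean remoteTranslogEnabled) {
        return tracked && !remoteTranslogEnabled;
    }

    public static void main(String[] args) {
        System.out.println(mustHoldPeerRecoveryRetentionLease(true, false)); // true: classic replica needs a PRRL
        System.out.println(mustHoldPeerRecoveryRetentionLease(true, true));  // false: remote-store replica is exempt
    }
}
```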
@@ -161,7 +161,7 @@ && isTargetSameHistory()

deleteRetentionLeaseStep.whenComplete(ignored -> {
assert Transports.assertNotTransportThread(this + "[phase1]");
-phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep);
+phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep, false);
}, onFailure);

} catch (final Exception e) {
@@ -346,7 +346,13 @@ static final class SendFileResult {
* segments that are missing. Only segments that have the same size and
* checksum can be reused
*/
-void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
+void phase1(
+    IndexCommit snapshot,
+    long startingSeqNo,
+    IntSupplier translogOps,
+    ActionListener<SendFileResult> listener,
+    boolean skipCreateRetentionLeaseStep
+) {
cancellableThreads.checkForCancel();
final Store store = shard.store();
try {
@@ -444,7 +450,12 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
listener::onFailure
);

-sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);
+// When doing peer recovery of remote store enabled replica, retention leases are not required.
+if (skipCreateRetentionLeaseStep) {
+    sendFilesStep.whenComplete(r -> createRetentionLeaseStep.onResponse(null), listener::onFailure);
+} else {
+    sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);
+}

createRetentionLeaseStep.whenComplete(retentionLease -> {
final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
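Note: the conditional above either bypasses lease creation by completing the intermediate step with a null lease, or runs the existing createRetentionLease path, so downstream steps fire in both cases. A self-contained sketch of this StepListener chaining pattern, with simplified names and values (only org.opensearch.action.StepListener is assumed):

```java
import org.opensearch.action.StepListener;

// Simplified sketch: when skipCreateRetentionLeaseStep is true, the
// intermediate step is completed with a null lease so downstream steps
// still fire; otherwise the lease-creation path would run instead.
public class ConditionalStepSketch {
    public static void main(String[] args) {
        final boolean skipCreateRetentionLeaseStep = true; // remote-store replica case

        final StepListener<String> sendFilesStep = new StepListener<>();
        final StepListener<Object> createRetentionLeaseStep = new StepListener<>();

        if (skipCreateRetentionLeaseStep) {
            sendFilesStep.whenComplete(r -> createRetentionLeaseStep.onResponse(null), e -> {});
        } else {
            // stand-in for createRetentionLease(startingSeqNo, createRetentionLeaseStep)
            sendFilesStep.whenComplete(r -> createRetentionLeaseStep.onResponse("lease"), e -> {});
        }

        createRetentionLeaseStep.whenComplete(
            lease -> System.out.println("continue recovery, retention lease = " + lease),
            e -> System.err.println("failed: " + e)
        );

        sendFilesStep.onResponse("files sent"); // simulate phase-1 file copy completing
    }
}
```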
@@ -74,7 +74,7 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
onSendFileStepComplete(sendFileStep, wrappedSafeCommit, releaseStore);

assert Transports.assertNotTransportThread(this + "[phase1]");
-phase1(wrappedSafeCommit.get(), startingSeqNo, () -> 0, sendFileStep);
+phase1(wrappedSafeCommit.get(), startingSeqNo, () -> 0, sendFileStep, true);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
}
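Note: this source handler, used for remote store enabled replicas, passes true so that phase1 skips PRRL creation, while the default handler above passes false. A hypothetical, simplified sketch of the resulting handler split; the selection wiring shown here is an assumption, not the verbatim factory code:

```java
// Hypothetical, simplified selection: this commit's imports reference
// RecoverySourceHandlerFactory, but the actual factory logic is assumed here.
public class HandlerChoiceSketch {
    interface RecoverySourceHandler {
        boolean skipCreateRetentionLeaseStep();
    }

    static RecoverySourceHandler create(boolean remoteTranslogStoreEnabled) {
        // Remote-store replicas restore segments/translog from the remote store,
        // so peer recovery retention leases are unnecessary: phase1(..., true).
        return remoteTranslogStoreEnabled ? () -> true : () -> false;
    }

    public static void main(String[] args) {
        System.out.println("remote replica skips PRRL: " + create(true).skipCreateRetentionLeaseStep());
        System.out.println("local replica skips PRRL:  " + create(false).skipCreateRetentionLeaseStep());
    }
}
```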
@@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;

@@ -146,20 +147,23 @@ public void testStartSequenceForReplicaRecovery() throws Exception {
null
);
shards.addReplica(newReplicaShard);
+AtomicBoolean assertDone = new AtomicBoolean(false);
shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) {
@Override
public IndexShard indexShard() {
IndexShard idxShard = super.indexShard();
-// verify the starting sequence number while recovering a failed shard which has a valid last commit
-long startingSeqNo = -1;
-try {
-    startingSeqNo = Long.parseLong(
-        idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO)
-    );
-} catch (IOException e) {
-    Assert.fail();
-}
-assertEquals(numDocs - 1, startingSeqNo);
+if (assertDone.compareAndSet(false, true)) {
+    // verify the starting sequence number while recovering a failed shard which has a valid last commit
+    long startingSeqNo = -1;
+    try {
+        startingSeqNo = Long.parseLong(
+            idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO)
+        );
+    } catch (IOException e) {
+        Assert.fail();
+    }
+    assertEquals(numDocs - 1, startingSeqNo);
+}
return idxShard;
}
});
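Note: the compareAndSet guard makes the assertion block run at most once, since the indexShard() hook can presumably be invoked more than once during recovery. A minimal standalone sketch of this run-once pattern:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// compareAndSet(false, true) succeeds for exactly one caller, so the guarded
// assertion block executes once even if the hook is invoked repeatedly.
public class RunOnceSketch {
    public static void main(String[] args) {
        final AtomicBoolean assertDone = new AtomicBoolean(false);
        for (int call = 1; call <= 3; call++) {
            if (assertDone.compareAndSet(false, true)) {
                System.out.println("assertion block executed on call " + call); // prints only for call 1
            }
        }
    }
}
```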
@@ -691,9 +691,15 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOException {
) {

@Override
-void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
+void phase1(
+    IndexCommit snapshot,
+    long startingSeqNo,
+    IntSupplier translogOps,
+    ActionListener<SendFileResult> listener,
+    boolean skipCreateRetentionLeaseStep
+) {
phase1Called.set(true);
-super.phase1(snapshot, startingSeqNo, translogOps, listener);
+super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep);
}

@Override
@@ -993,7 +999,7 @@ void createRetentionLease(long startingSeqNo, ActionListener<RetentionLease> listener) {
final StepListener<RecoverySourceHandler.SendFileResult> phase1Listener = new StepListener<>();
try {
final CountDownLatch latch = new CountDownLatch(1);
-handler.phase1(DirectoryReader.listCommits(dir).get(0), 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch));
+handler.phase1(DirectoryReader.listCommits(dir).get(0), 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch), false);
latch.await();
phase1Listener.result();
} catch (Exception e) {
@@ -67,6 +67,7 @@
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.internal.io.IOUtils;
@@ -99,12 +100,12 @@
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
import org.opensearch.indices.recovery.AsyncRecoveryTarget;
-import org.opensearch.indices.recovery.DefaultRecoverySourceHandler;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryResponse;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoverySourceHandler;
+import org.opensearch.indices.recovery.RecoverySourceHandlerFactory;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.recovery.StartRecoveryRequest;
@@ -133,8 +134,8 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
-import java.util.ArrayList;
 import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -862,17 +863,23 @@ protected final void recoverUnstartedReplica(
recoveryTarget,
startingSeqNo
);
-int fileChunkSizeInBytes = Math.toIntExact(
-    randomBoolean() ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024)
-);
-final RecoverySourceHandler recovery = new DefaultRecoverySourceHandler(
+long fileChunkSizeInBytes = randomBoolean()
+    ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
+    : randomIntBetween(1, 10 * 1024 * 1024);
+final Settings settings = Settings.builder()
+    .put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))
+    .put("indices.recovery.max_concurrent_operations", Integer.toString(between(1, 4)))
+    .build();
+RecoverySettings recoverySettings = new RecoverySettings(
+    settings,
+    new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+);
+recoverySettings.setChunkSize(new ByteSizeValue(fileChunkSizeInBytes));
+final RecoverySourceHandler recovery = new RecoverySourceHandlerFactory().create(
     primary,
     new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
     threadPool,
     request,
-    fileChunkSizeInBytes,
-    between(1, 8),
-    between(1, 8)
+    recoverySettings
 );
primary.updateShardState(
primary.routingEntry(),
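Note: the test helper now derives chunk size and concurrency from a RecoverySettings instance instead of passing them as constructor arguments. A sketch of that construction using the same APIs as the hunk above (literal values illustrative):

```java
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.indices.recovery.RecoverySettings;

// Mirrors the construction in the hunk above: concurrency limits come in
// through Settings keys, and the file chunk size is applied via a setter.
public class RecoverySettingsSketch {
    public static void main(String[] args) {
        final Settings settings = Settings.builder()
            .put("indices.recovery.max_concurrent_file_chunks", "2")
            .put("indices.recovery.max_concurrent_operations", "2")
            .build();
        final RecoverySettings recoverySettings = new RecoverySettings(
            settings,
            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
        );
        recoverySettings.setChunkSize(new ByteSizeValue(512 * 1024)); // 512 KiB chunks
        System.out.println("chunk size = " + recoverySettings.getChunkSize());
    }
}
```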
