Skip to content

Commit

Permalink
Add RecoverySourceHandlerFactory for extensibility
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Oct 27, 2022
1 parent 722fbfd commit 472fc70
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.recovery;

import org.opensearch.index.shard.IndexShard;
import org.opensearch.threadpool.ThreadPool;

public class DefaultRecoverySourceHandler extends RecoverySourceHandler {

public DefaultRecoverySourceHandler(
IndexShard shard,
RecoveryTargetHandler recoveryTarget,
ThreadPool threadPool,
StartRecoveryRequest request,
int fileChunkSizeInBytes,
int maxConcurrentFileChunks,
int maxConcurrentOperations
) {
super(shard, recoveryTarget, threadPool, request, fileChunkSizeInBytes, maxConcurrentFileChunks, maxConcurrentOperations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ void awaitEmpty() {

private final class ShardRecoveryContext {
final Map<RecoverySourceHandler, RemoteRecoveryTargetHandler> recoveryHandlers = new HashMap<>();
private final RecoverySourceHandlerFactory recoverySourceHandlerFactory = new RecoverySourceHandlerFactory();

/**
* Adds recovery source handler.
Expand Down Expand Up @@ -378,15 +379,7 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
recoverySettings,
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)
);
handler = new RecoverySourceHandler(
shard,
recoveryTarget,
shard.getThreadPool(),
request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks(),
recoverySettings.getMaxConcurrentOperations()
);
handler = recoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
return Tuple.tuple(handler, recoveryTarget);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
* RecoverySourceHandler handles the three phases of shard recovery, which is
* everything relating to copying the segment files as well as sending translog
* operations across the wire once the segments have been copied.
*
* <p>
* Note: There is always one source handler per recovery that handles all the
* file and translog transfer. This handler is completely isolated from other recoveries
* while the {@link RateLimiter} passed via {@link RecoverySettings} is shared across recoveries
Expand All @@ -106,7 +106,7 @@
*
* @opensearch.internal
*/
public class RecoverySourceHandler {
public abstract class RecoverySourceHandler {

protected final Logger logger;
// Shard that is going to be recovered (the "source")
Expand Down Expand Up @@ -435,8 +435,9 @@ private boolean isTargetSameHistory() {

/**
* Counts the number of history operations from the starting sequence number
* @param startingSeqNo the starting sequence number to count; included
* @return number of history operations
*
* @param startingSeqNo the starting sequence number to count; included
* @return number of history operations
*/
private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException {
return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.recovery;

import org.opensearch.index.shard.IndexShard;

public class RecoverySourceHandlerFactory {

public RecoverySourceHandler create(
IndexShard shard,
RecoveryTargetHandler recoveryTarget,
StartRecoveryRequest request,
RecoverySettings recoverySettings
) {
return new DefaultRecoverySourceHandler(
shard,
recoveryTarget,
shard.getThreadPool(),
request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks(),
recoverySettings.getMaxConcurrentOperations()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.recovery.remotestore;

import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoverySourceHandler;
import org.opensearch.indices.recovery.RecoveryTargetHandler;
import org.opensearch.indices.recovery.StartRecoveryRequest;
import org.opensearch.threadpool.ThreadPool;

public class ReplicaRecoverySourceHandler extends RecoverySourceHandler {
public ReplicaRecoverySourceHandler(
IndexShard shard,
RecoveryTargetHandler recoveryTarget,
ThreadPool threadPool,
StartRecoveryRequest request,
int fileChunkSizeInBytes,
int maxConcurrentFileChunks,
int maxConcurrentOperations
) {
super(shard, recoveryTarget, threadPool, request, fileChunkSizeInBytes, maxConcurrentFileChunks, maxConcurrentOperations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void writeFileChunk(
});
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
null,
new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -296,7 +296,7 @@ public void indexTranslogOperations(
listener.onResponse(checkpointOnTarget.get());
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
shard,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
threadPool,
Expand Down Expand Up @@ -359,7 +359,7 @@ public void indexTranslogOperations(
}
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
shard,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
threadPool,
Expand Down Expand Up @@ -433,7 +433,7 @@ public void indexTranslogOperations(
Randomness.shuffle(operations);
List<Translog.Operation> skipOperations = randomSubsetOf(operations);
Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations);
RecoverySourceHandler handler = new RecoverySourceHandler(
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
shard,
new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -552,7 +552,7 @@ public void writeFileChunk(
failedEngine.set(true);
return null;
}).when(mockShard).failShard(any(), any());
RecoverySourceHandler handler = new RecoverySourceHandler(
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
mockShard,
new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -627,7 +627,7 @@ public void writeFileChunk(
failedEngine.set(true);
return null;
}).when(mockShard).failShard(any(), any());
RecoverySourceHandler handler = new RecoverySourceHandler(
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
mockShard,
new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -786,7 +786,7 @@ public void writeFileChunk(
};
final int maxConcurrentChunks = between(1, 8);
final int chunkSize = between(1, 32);
final RecoverySourceHandler handler = new RecoverySourceHandler(
final RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
shard,
recoveryTarget,
threadPool,
Expand Down Expand Up @@ -859,7 +859,7 @@ public void writeFileChunk(
};
final int maxConcurrentChunks = between(1, 4);
final int chunkSize = between(1, 16);
final RecoverySourceHandler handler = new RecoverySourceHandler(
final RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
null,
new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -1006,7 +1006,7 @@ void createRetentionLease(long startingSeqNo, ActionListener<RetentionLease> lis
public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception {
IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
RecoverySourceHandler handler = new RecoverySourceHandler(
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
shard,
new TestRecoveryTargetHandler(),
threadPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
import org.opensearch.indices.recovery.AsyncRecoveryTarget;
import org.opensearch.indices.recovery.DefaultRecoverySourceHandler;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryResponse;
Expand Down Expand Up @@ -864,7 +865,7 @@ protected final void recoverUnstartedReplica(
int fileChunkSizeInBytes = Math.toIntExact(
randomBoolean() ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024)
);
final RecoverySourceHandler recovery = new RecoverySourceHandler(
final RecoverySourceHandler recovery = new DefaultRecoverySourceHandler(
primary,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
threadPool,
Expand Down

0 comments on commit 472fc70

Please sign in to comment.