From 0a11740fbd772b61d053e8184d8b47c5a8018126 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 27 Oct 2022 14:01:30 +0530 Subject: [PATCH] Add RecoverySourceHandlerFactory for extensibility --- .../DefaultRecoverySourceHandler.java | 27 ++++++++++++++++ .../recovery/PeerRecoverySourceService.java | 11 ++----- .../recovery/RecoverySourceHandler.java | 9 +++--- .../RecoverySourceHandlerFactory.java | 31 +++++++++++++++++++ .../ReplicaRecoverySourceHandler.java | 29 +++++++++++++++++ .../recovery/RecoverySourceHandlerTests.java | 18 +++++------ .../index/shard/IndexShardTestCase.java | 3 +- 7 files changed, 105 insertions(+), 23 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java create mode 100644 server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java create mode 100644 server/src/main/java/org/opensearch/indices/recovery/remotestore/ReplicaRecoverySourceHandler.java diff --git a/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java new file mode 100644 index 0000000000000..44c6e8ccc7ee8 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.opensearch.index.shard.IndexShard; +import org.opensearch.threadpool.ThreadPool; + +public class DefaultRecoverySourceHandler extends RecoverySourceHandler { + + public DefaultRecoverySourceHandler( + IndexShard shard, + RecoveryTargetHandler recoveryTarget, + ThreadPool threadPool, + StartRecoveryRequest request, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks, + int maxConcurrentOperations + ) { + super(shard, recoveryTarget, threadPool, request, fileChunkSizeInBytes, maxConcurrentFileChunks, maxConcurrentOperations); + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java index a1cf78920cf7e..c88da80e3c2d0 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java @@ -324,6 +324,7 @@ void awaitEmpty() { private final class ShardRecoveryContext { final Map recoveryHandlers = new HashMap<>(); + private final RecoverySourceHandlerFactory recoverySourceHandlerFactory = new RecoverySourceHandlerFactory(); /** * Adds recovery source handler. @@ -378,15 +379,7 @@ private Tuple createRecovery recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime) ); - handler = new RecoverySourceHandler( - shard, - recoveryTarget, - shard.getThreadPool(), - request, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - recoverySettings.getMaxConcurrentFileChunks(), - recoverySettings.getMaxConcurrentOperations() - ); + handler = recoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings); return Tuple.tuple(handler, recoveryTarget); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 505d3c7adfb3f..61a300374be38 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -97,7 +97,7 @@ * RecoverySourceHandler handles the three phases of shard recovery, which is * everything relating to copying the segment files as well as sending translog * operations across the wire once the segments have been copied. - * + *

* Note: There is always one source handler per recovery that handles all the * file and translog transfer. This handler is completely isolated from other recoveries * while the {@link RateLimiter} passed via {@link RecoverySettings} is shared across recoveries @@ -106,7 +106,7 @@ * * @opensearch.internal */ -public class RecoverySourceHandler { +public abstract class RecoverySourceHandler { protected final Logger logger; // Shard that is going to be recovered (the "source") @@ -435,8 +435,9 @@ private boolean isTargetSameHistory() { /** * Counts the number of history operations from the starting sequence number - * @param startingSeqNo the starting sequence number to count; included - * @return number of history operations + * + * @param startingSeqNo the starting sequence number to count; included + * @return number of history operations */ private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException { return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java new file mode 100644 index 0000000000000..843a28c235c00 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.opensearch.index.shard.IndexShard; + +public class RecoverySourceHandlerFactory { + + public RecoverySourceHandler create( + IndexShard shard, + RecoveryTargetHandler recoveryTarget, + StartRecoveryRequest request, + RecoverySettings recoverySettings + ) { + return new DefaultRecoverySourceHandler( + shard, + recoveryTarget, + shard.getThreadPool(), + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks(), + recoverySettings.getMaxConcurrentOperations() + ); + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/remotestore/ReplicaRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/remotestore/ReplicaRecoverySourceHandler.java new file mode 100644 index 0000000000000..021a8a72ca5a0 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/remotestore/ReplicaRecoverySourceHandler.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery.remotestore; + +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.recovery.RecoverySourceHandler; +import org.opensearch.indices.recovery.RecoveryTargetHandler; +import org.opensearch.indices.recovery.StartRecoveryRequest; +import org.opensearch.threadpool.ThreadPool; + +public class ReplicaRecoverySourceHandler extends RecoverySourceHandler { + public ReplicaRecoverySourceHandler( + IndexShard shard, + RecoveryTargetHandler recoveryTarget, + ThreadPool threadPool, + StartRecoveryRequest request, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks, + int maxConcurrentOperations + ) { + super(shard, recoveryTarget, threadPool, request, fileChunkSizeInBytes, maxConcurrentFileChunks, maxConcurrentOperations); + } +} diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index 2b5550b71a627..7c2e0036e9172 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -215,7 +215,7 @@ public void writeFileChunk( }); } }; - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new DefaultRecoverySourceHandler( null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -296,7 +296,7 @@ public void indexTranslogOperations( listener.onResponse(checkpointOnTarget.get()); } }; - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new DefaultRecoverySourceHandler( shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, @@ -359,7 +359,7 @@ public void indexTranslogOperations( } } }; - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new DefaultRecoverySourceHandler( shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, @@ -433,7 +433,7 @@ public void indexTranslogOperations( Randomness.shuffle(operations); List skipOperations = randomSubsetOf(operations); Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations); - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new DefaultRecoverySourceHandler( shard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -552,7 +552,7 @@ public void writeFileChunk( failedEngine.set(true); return null; }).when(mockShard).failShard(any(), any()); - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new DefaultRecoverySourceHandler( mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -627,7 +627,7 @@ public void writeFileChunk( failedEngine.set(true); return null; }).when(mockShard).failShard(any(), any()); - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new DefaultRecoverySourceHandler( mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -786,7 +786,7 @@ public void writeFileChunk( }; final int maxConcurrentChunks = between(1, 8); final int chunkSize = between(1, 32); - final RecoverySourceHandler handler = new RecoverySourceHandler( + final RecoverySourceHandler handler = new DefaultRecoverySourceHandler( shard, recoveryTarget, threadPool, @@ -859,7 +859,7 @@ public void writeFileChunk( }; final int maxConcurrentChunks = between(1, 4); final int chunkSize = between(1, 16); - final RecoverySourceHandler handler = new RecoverySourceHandler( + final RecoverySourceHandler handler = new DefaultRecoverySourceHandler( null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor), threadPool, @@ -1006,7 +1006,7 @@ void createRetentionLease(long startingSeqNo, ActionListener lis public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception { IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new DefaultRecoverySourceHandler( shard, new TestRecoveryTargetHandler(), threadPool, diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index f520206e0f866..953b6dfbb2f7b 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -99,6 +99,7 @@ import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.recovery.AsyncRecoveryTarget; +import org.opensearch.indices.recovery.DefaultRecoverySourceHandler; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryResponse; @@ -864,7 +865,7 @@ protected final void recoverUnstartedReplica( int fileChunkSizeInBytes = Math.toIntExact( randomBoolean() ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024) ); - final RecoverySourceHandler recovery = new RecoverySourceHandler( + final RecoverySourceHandler recovery = new DefaultRecoverySourceHandler( primary, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool,