diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 243af4ec248c3..fbff99536ebf4 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -170,6 +170,13 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
      * @return true if sync is needed
      */
     private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
+        // Ignore syncing segments if the underlying shard is closed
+        // This also makes sure that retries are not scheduled for shards
+        // with failed syncSegments invocation after they are closed
+        if (shardClosed()) {
+            logger.info("Shard is already closed. Not attempting sync to remote store");
+            return false;
+        }
         boolean shouldSync = didRefresh // If the readers change, didRefresh is always true.
             // The third condition exists for uploading the zero state segments where the refresh has not changed the reader
             // reference, but it is important to upload the zero state segments so that the restore does not break.
@@ -608,6 +615,15 @@ public void onFailure(String file) {
         };
     }
 
+    /**
+     * Checks if the underlying IndexShard instance is closed
+     *
+     * @return true if it is closed, false otherwise
+     */
+    private boolean shardClosed() {
+        return indexShard.state() == IndexShardState.CLOSED;
+    }
+
     @Override
     protected Logger getLogger() {
         return logger;
diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
index e0ee58cbfdb22..ee80b1b037ce5 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
@@ -47,6 +47,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
@@ -469,6 +470,25 @@ public void testRefreshPersistentFailure() throws Exception {
         assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());
     }
 
+    public void testRefreshPersistentFailureAndIndexShardClosed() throws Exception {
+        int succeedOnAttempt = 3;
+        int closeShardOnAttempt = 1;
+        CountDownLatch refreshCountLatch = new CountDownLatch(1);
+        CountDownLatch successLatch = new CountDownLatch(10);
+        Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
+            succeedOnAttempt,
+            refreshCountLatch,
+            successLatch,
+            true,
+            closeShardOnAttempt
+        );
+        // Giving 2s for some iterations of remote refresh upload
+        Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+        RemoteStoreRefreshListener listener = tuple.v1();
+        assertFalse("remote store should not in sync", listener.isRemoteSegmentStoreInSync());
+        assertFalse(listener.getRetryScheduledStatus());
+    }
+
     private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception {
         assertBusy(() -> {
             assertEquals(0, segmentTracker.getBytesLag());
@@ -547,6 +567,49 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
         return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch);
     }
 
+    private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
+        int totalAttempt,
+        CountDownLatch refreshCountLatch,
+        CountDownLatch successLatch,
+        int checkpointPublishSucceedOnAttempt,
+        CountDownLatch reachedCheckpointPublishLatch,
+        boolean mockPrimaryTerm,
+        boolean testUploadTimeout
+    ) throws IOException {
+        return mockIndexShardWithRetryAndScheduleRefresh(
+            totalAttempt,
+            refreshCountLatch,
+            successLatch,
+            checkpointPublishSucceedOnAttempt,
+            reachedCheckpointPublishLatch,
+            mockPrimaryTerm,
+            testUploadTimeout,
+            false,
+            0
+        );
+    }
+
+    private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
+        int succeedOnAttempt,
+        CountDownLatch refreshCountLatch,
+        CountDownLatch successLatch,
+        boolean closedShard,
+        int closeShardAfterAttempt
+    ) throws IOException {
+        CountDownLatch noOpLatch = new CountDownLatch(0);
+        return mockIndexShardWithRetryAndScheduleRefresh(
+            succeedOnAttempt,
+            refreshCountLatch,
+            successLatch,
+            1,
+            noOpLatch,
+            true,
+            false,
+            closedShard,
+            closeShardAfterAttempt
+        );
+    }
+
     private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
         int succeedOnAttempt,
         CountDownLatch refreshCountLatch,
         CountDownLatch successLatch,
@@ -561,7 +624,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
             succeedCheckpointPublishOnAttempt,
             reachedCheckpointPublishLatch,
             true,
-            false
+            false,
+            false,
+            0
         );
     }
 
@@ -572,7 +637,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
         int succeedCheckpointPublishOnAttempt,
         CountDownLatch reachedCheckpointPublishLatch,
         boolean mockPrimaryTerm,
-        boolean testUploadTimeout
+        boolean testUploadTimeout,
+        boolean closeShard,
+        int closeShardAfterAttempt
     ) throws IOException {
         // Create index shard that we will be using to mock different methods in IndexShard for the unit test
         indexShard = newStartedShard(
@@ -600,7 +667,6 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
         IndexShard shard = mock(IndexShard.class);
         Store store = mock(Store.class);
         when(shard.store()).thenReturn(store);
-        when(shard.state()).thenReturn(IndexShardState.STARTED);
         when(store.directory()).thenReturn(indexShard.store().directory());
 
         // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
@@ -662,6 +728,14 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
             return indexShard.getLatestReplicationCheckpoint();
         })).when(shard).computeReplicationCheckpoint(any());
 
+        doAnswer((invocationOnMock -> {
+            if (closeShard && counter.get() == closeShardAfterAttempt) {
+                logger.info("Closing shard...");
+                return IndexShardState.CLOSED;
+            }
+            return IndexShardState.STARTED;
+        })).when(shard).state();
+
         doAnswer(invocation -> {
             if (Objects.nonNull(successLatch)) {
                 successLatch.countDown();