From ea84f4f5c36ffb86d978397a67c5447b76978581 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 3 May 2023 16:22:02 +0530 Subject: [PATCH 1/9] [Remote Segments] Retry remote refresh on failure Signed-off-by: Ashish Singh --- .../RemoteStoreRefreshListenerIT.java | 152 ++++++++++++ .../common/settings/IndexScopedSettings.java | 4 + .../shard/RemoteStoreRefreshListener.java | 224 ++++++++++++------ .../org/opensearch/threadpool/ThreadPool.java | 26 +- .../threadpool/ScalingThreadPoolTests.java | 1 + .../snapshots/mockstore/MockRepository.java | 40 +++- 6 files changed, 359 insertions(+), 88 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java new file mode 100644 index 0000000000000..c7db9930b8bc8 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.util.FileSystemUtils; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.opensearch.test.FeatureFlagSetter; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreRefreshListenerIT extends AbstractSnapshotIntegTestCase { + + private static final String REPOSITORY_NAME = "my-segment-repo-1"; + private static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .put(FeatureFlags.REMOTE_STORE, "true") + .build(); + } + + @Before + public void setup() { + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + } + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + private Settings remoteStoreIndexSettings(int 
numberOfReplicas) { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + public void testRemoteRefreshRetryOnFailure() throws Exception { + + // Create repository + Path location = randomRepoPath().toAbsolutePath(); + createRepository( + REPOSITORY_NAME, + "mock", + Settings.builder() + .put("location", location) + .put("random_control_io_exception_rate", randomIntBetween(10, 25) / 100f) + .put("skip_exception_on_verification_file", true) + .put("skip_exception_on_list_blobs", true) + ); + + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + indexData(randomIntBetween(5, 10), randomBoolean()); + IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get(); + assertEquals(1, response.getShards().length); + + String indexUuid = response.getShards()[0].getShardRouting().index().getUUID(); + Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid)); + String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid); + + assertBusy( + () -> assertEquals(getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)), getSegmentFiles(segmentDataRepoPath)) + ); + } + + /** + * Gets all segment files which starts with "_". 
For instance, _0.cfe, _o.cfs etc. + * Names are compared as strings because Path#startsWith(String) matches whole path elements, not character prefixes. + * @param location the path to the location where segment files are being searched. + * @return set of file names of all segment files, or an empty set if an IOException was thrown. + */ + private Set<String> getSegmentFiles(Path location) { + try { + return Arrays.stream(FileSystemUtils.files(location)) + .filter(path -> path.getFileName().toString().startsWith("_")) + .map(path -> path.getFileName().toString()) + .collect(Collectors.toSet()); + } catch (IOException exception) { + logger.error("Exception occurred while getting segment files", exception); + } + return Collections.emptySet(); + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + private void indexData(int numberOfIterations, boolean invokeFlush) { + for (int i = 0; i < numberOfIterations; i++) { + int numberOfOperations = randomIntBetween(20, 50); + for (int j = 0; j < numberOfOperations; j++) { + indexSingleDoc(); + } + if (invokeFlush) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 6ef12b550fc5e..e619a0ed80b22 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -53,6 +53,7 @@ import org.opensearch.index.fielddata.IndexFieldDataService; import org.opensearch.index.mapper.FieldMapper; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; import org.opensearch.index.store.Store; @@ -204,6 +205,9 @@ public final class IndexScopedSettings extends 
AbstractScopedSettings { IndexSettings.SEARCHABLE_SNAPSHOT_ID_NAME, IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID, + // Settings for Remote Store + RemoteStoreRefreshListener.INDEX_REMOTE_REFRESH_RETRY_INTERVAL, + // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index efd2686b41a20..2fb18e0264ef0 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -21,10 +21,13 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; @@ -35,6 +38,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -46,6 +51,25 @@ * @opensearch.internal */ public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { + + private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); + + public static final int REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE = 1; + + private static final int MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES = 1; + + /** + * If the remote 
refresh segment sync fails, we retry to sync the segments scheduling it after a retry interval + * which is controlled by the below setting. + */ + public static final Setting INDEX_REMOTE_REFRESH_RETRY_INTERVAL = Setting.intSetting( + "index.remote_store.segment_sync.retry_interval", + REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE, + REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + // Visible for testing static final Set EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing @@ -57,7 +81,13 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final RemoteSegmentStoreDirectory remoteDirectory; private final Map localSegmentChecksumMap; private long primaryTerm; - private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); + + /** + * Semaphore that ensures there is only 1 retry scheduled at any time. + */ + private final Semaphore retrySemaphore = new Semaphore(MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES); + + private volatile int remoteRefreshRetryInterval; public RemoteStoreRefreshListener(IndexShard indexShard) { this.indexShard = indexShard; @@ -73,6 +103,13 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); } } + + // Retry remote refresh on failure - This interval is the time delay after which the sync of segments to remote store + // would be retried. This has been kept as an index scoped setting inline with index.refresh_interval setting. 
+ remoteRefreshRetryInterval = INDEX_REMOTE_REFRESH_RETRY_INTERVAL.get(indexShard.indexSettings().getSettings()); + indexShard.indexSettings() + .getScopedSettings() + .addSettingsUpdateConsumer(INDEX_REMOTE_REFRESH_RETRY_INTERVAL, this::setRemoteRefreshRetryInterval); } @Override @@ -83,92 +120,127 @@ public void beforeRefresh() throws IOException { /** * Upload new segment files created as part of the last refresh to the remote segment store. * This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded. + * * @param didRefresh true if the refresh opened a new reference */ @Override public void afterRefresh(boolean didRefresh) { - synchronized (this) { - try { - if (indexShard.getReplicationTracker().isPrimaryMode()) { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); + try { + indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get(); + } catch (InterruptedException | ExecutionException e) { + logger.info("Exception occurred while scheduling syncSegments", e); + } + } + + private synchronized void syncSegments(boolean retry) { + boolean shouldRetry = false; + beforeSegmentsSync(retry); + try { + if (indexShard.getReplicationTracker().isPrimaryMode()) { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + this.remoteDirectory.init(); + } + try { + // if a new segments_N file is present in local that is not uploaded to remote store yet, it + // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. + // This is done to avoid delete post each refresh. + // Ideally, we want this to be done in async flow. 
(GitHub issue #4315) + if (isRefreshAfterCommit()) { + deleteStaleCommits(); } - try { - // if a new segments_N file is present in local that is not uploaded to remote store yet, it - // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. - // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. (GitHub issue #4315) - if (isRefreshAfterCommit()) { - deleteStaleCommits(); - } - String segmentInfoSnapshotFilename = null; - try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { - SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); - - Collection localSegmentsPostRefresh = segmentInfos.files(true); - - List segmentInfosFiles = localSegmentsPostRefresh.stream() - .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) - .collect(Collectors.toList()); - Optional latestSegmentInfos = segmentInfosFiles.stream() - .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); - - if (latestSegmentInfos.isPresent()) { - // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain - // all the segments from last commit if they are merged away but not yet committed. - // Each metadata file in the remote segment store represents a commit and the following - // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed - // segments. 
- localSegmentsPostRefresh.addAll( - SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true) + String segmentInfoSnapshotFilename = null; + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + + Collection localSegmentsPostRefresh = segmentInfos.files(true); + + List segmentInfosFiles = localSegmentsPostRefresh.stream() + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .collect(Collectors.toList()); + Optional latestSegmentInfos = segmentInfosFiles.stream() + .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); + + if (latestSegmentInfos.isPresent()) { + // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain + // all the segments from last commit if they are merged away but not yet committed. + // Each metadata file in the remote segment store represents a commit and the following + // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed + // segments. 
+ localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); + segmentInfosFiles.stream() + .filter(file -> !file.equals(latestSegmentInfos.get())) + .forEach(localSegmentsPostRefresh::remove); + + boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh); + if (uploadStatus) { + segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); + localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); + + remoteDirectory.uploadMetadata( + localSegmentsPostRefresh, + storeDirectory, + indexShard.getOperationPrimaryTerm(), + segmentInfos.getGeneration() ); - segmentInfosFiles.stream() - .filter(file -> !file.equals(latestSegmentInfos.get())) - .forEach(localSegmentsPostRefresh::remove); - - boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh); - if (uploadStatus) { - segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); - localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); - - remoteDirectory.uploadMetadata( - localSegmentsPostRefresh, - storeDirectory, - indexShard.getOperationPrimaryTerm(), - segmentInfos.getGeneration() - ); - localSegmentChecksumMap.keySet() - .stream() - .filter(file -> !localSegmentsPostRefresh.contains(file)) - .collect(Collectors.toSet()) - .forEach(localSegmentChecksumMap::remove); - final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()) - .lastRefreshedCheckpoint(); - indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); - } + localSegmentChecksumMap.keySet() + .stream() + .filter(file -> !localSegmentsPostRefresh.contains(file)) + .collect(Collectors.toSet()) + .forEach(localSegmentChecksumMap::remove); + final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); + indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + } else { + shouldRetry 
= true; } - } catch (EngineException e) { - logger.warn("Exception while reading SegmentInfosSnapshot", e); - } finally { - try { - if (segmentInfoSnapshotFilename != null) { - storeDirectory.deleteFile(segmentInfoSnapshotFilename); - } - } catch (IOException e) { - logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); + } + } catch (EngineException e) { + shouldRetry = true; + logger.warn("Exception while reading SegmentInfosSnapshot", e); + } finally { + try { + if (segmentInfoSnapshotFilename != null) { + storeDirectory.deleteFile(segmentInfoSnapshotFilename); } + } catch (IOException e) { + logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); } - } catch (IOException e) { - // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried - // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. - logger.warn("Exception while uploading new segments to the remote segment store", e); } + } catch (IOException e) { + shouldRetry = true; + // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried + // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. 
+ logger.warn("Exception while uploading new segments to the remote segment store", e); } - } catch (Throwable t) { - logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); } + } catch (Throwable t) { + logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); + } + afterSegmentsSync(retry, shouldRetry); + } + + private void beforeSegmentsSync(boolean isRetry) { + if (isRetry) { + logger.info("Retrying to sync the segments to remote store"); + } + } + + private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) { + // If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled + if (isRetry) { + retrySemaphore.release(); + } + + // If there are failures in uploading segments, then we should retry as search idle can lead to + // refresh not occurring until write happens. + if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retrySemaphore.tryAcquire()) { + indexShard.getThreadPool() + .schedule( + () -> this.syncSegments(true), + TimeValue.timeValueSeconds(remoteRefreshRetryInterval), + ThreadPool.Names.REMOTE_REFRESH + ); } } @@ -238,4 +310,8 @@ private void deleteStaleCommits() { logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); } } + + public void setRemoteRefreshRetryInterval(int remoteRefreshRetryInterval) { + this.remoteRefreshRetryInterval = remoteRefreshRetryInterval; + } } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 104af6945dfe8..31d43acadb611 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -111,6 +111,7 @@ public static class Names { public static final String TRANSLOG_TRANSFER = "translog_transfer"; public static final String TRANSLOG_SYNC = "translog_sync"; public static 
final String REMOTE_PURGE = "remote_purge"; + public static final String REMOTE_REFRESH = "remote_refresh"; } /** @@ -178,6 +179,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING); map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED); map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING); + map.put(Names.REMOTE_REFRESH, ThreadPoolType.SCALING); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -256,6 +258,10 @@ public ThreadPool( ); builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000)); builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); + builders.put( + Names.REMOTE_REFRESH, + new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, allocatedProcessors * 4, TimeValue.timeValueMinutes(5)) + ); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { @@ -295,7 +301,7 @@ public ThreadPool( /** * Returns a value of milliseconds that may be used for relative time calculations. - * + *
<p>
* This method should only be used for calculating time deltas. For an epoch based * timestamp, see {@link #absoluteTimeInMillis()}. */ @@ -305,7 +311,7 @@ public long relativeTimeInMillis() { /** * Returns a value of nanoseconds that may be used for relative time calculations. - * + *
<p>
* This method should only be used for calculating time deltas. For an epoch based * timestamp, see {@link #absoluteTimeInMillis()}. */ @@ -315,7 +321,7 @@ public long relativeTimeInNanos() { /** * Returns the value of milliseconds since UNIX epoch. - * + *
<p>
* This method should only be used for exact date/time formatting. For calculating * time deltas that should not suffer from negative deltas, which are possible with * this method, see {@link #relativeTimeInMillis()}. @@ -402,14 +408,14 @@ public ExecutorService executor(String name) { /** * Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread. * - * @param command the command to run - * @param delay delay before the task executes + * @param command the command to run + * @param delay delay before the task executes * @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes - * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the - * command completes. + * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the + * command completes. * @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if - * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool - * the ScheduledFuture will cannot interact with it. + * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool + * the ScheduledFuture will cannot interact with it. * @throws OpenSearchRejectedExecutionException if the task cannot be scheduled for execution */ @Override @@ -609,7 +615,7 @@ public String toString() { /** * A thread to cache millisecond time values from * {@link System#nanoTime()} and {@link System#currentTimeMillis()}. - * + *
<p>
* The values are updated at a specified interval. */ static class CachedTimeThread extends Thread { diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index d1e7e25369b12..bfa97fb1d3fe7 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -135,6 +135,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen); sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive); + sizes.put(ThreadPool.Names.REMOTE_REFRESH, n -> 4 * n); return sizes.get(threadPoolName).apply(numberOfProcessors); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java index 7b53c36fbacf9..0e47130e424cd 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java @@ -78,6 +78,8 @@ public class MockRepository extends FsRepository { private static final Logger logger = LogManager.getLogger(MockRepository.class); + private static final String DUMMY_FILE_NAME_LIST_BLOBS = "dummy-name-list-blobs"; + public static class Plugin extends org.opensearch.plugins.Plugin implements RepositoryPlugin { public static final Setting USERNAME_SETTING = Setting.simpleString("secret.mock.username", Property.NodeScope); @@ -116,6 +118,10 @@ public long getFailureCount() { private final double randomDataFileIOExceptionRate; + private final boolean skipExceptionOnVerificationFile; + + private final boolean skipExceptionOnListBlobs; + private final 
boolean useLuceneCorruptionException; private final long maximumNumberOfFailures; @@ -174,6 +180,8 @@ public MockRepository( super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService, recoverySettings); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); + skipExceptionOnVerificationFile = metadata.settings().getAsBoolean("skip_exception_on_verification_file", false); + skipExceptionOnListBlobs = metadata.settings().getAsBoolean("skip_exception_on_list_blobs", false); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false); @@ -360,9 +368,14 @@ private int hashCode(String path) { } private void maybeIOExceptionOrBlock(String blobName) throws IOException { - if (INDEX_LATEST_BLOB.equals(blobName)) { - // Don't mess with the index.latest blob here, failures to write to it are ignored by upstream logic and we have - // specific tests that cover the error handling around this blob. + if (INDEX_LATEST_BLOB.equals(blobName) // Condition 1 + || skipExceptionOnVerificationFiles(blobName) // Condition 2 + || skipExceptionOnListBlobs(blobName)) { // Condition 3 + // Condition 1 - Don't mess with the index.latest blob here, failures to write to it are ignored by + // upstream logic and we have specific tests that cover the error handling around this blob. + // Condition 2 & 3 - This condition has been added to allow creation of repository which throws IO + // exception during normal remote store operations. However, if we fail during verification as well, + // then we can not add the repository as well. 
return; } if (blobName.startsWith("__")) { @@ -482,7 +495,7 @@ public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOExce @Override public Map listBlobs() throws IOException { - maybeIOExceptionOrBlock(""); + maybeIOExceptionOrBlock(DUMMY_FILE_NAME_LIST_BLOBS); return super.listBlobs(); } @@ -550,5 +563,24 @@ public void writeBlobAtomic( } } } + + private boolean skipExceptionOnVerificationFiles(String blobName) { + return skipExceptionOnVerificationFile && isVerificationFile(blobName); + } + + /** + * Checks if the file name is one of the types of verification files that is created at the time of creation of + * repository. + * + * @param blobName name of the blob + * @return true if it is the file created at the time of repository creation + */ + private boolean isVerificationFile(String blobName) { + return blobName.equals("master.dat") || (blobName.startsWith("data-") && blobName.endsWith(".dat")); + } + + private boolean skipExceptionOnListBlobs(String blobName) { + return skipExceptionOnListBlobs && DUMMY_FILE_NAME_LIST_BLOBS.equals(blobName); + } } } From 7589820dfd0b3617f9279df6b36e46933c2dcb1e Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 4 May 2023 16:31:25 +0530 Subject: [PATCH 2/9] [Remote Segments] Introduce exponential backoff retry on failures Signed-off-by: Ashish Singh --- .../common/settings/IndexScopedSettings.java | 4 - .../shard/RemoteStoreRefreshListener.java | 79 ++++++++++++------- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index e619a0ed80b22..6ef12b550fc5e 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -53,7 +53,6 @@ import org.opensearch.index.fielddata.IndexFieldDataService; import 
org.opensearch.index.mapper.FieldMapper; import org.opensearch.index.mapper.MapperService; -import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; import org.opensearch.index.store.Store; @@ -205,9 +204,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.SEARCHABLE_SNAPSHOT_ID_NAME, IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID, - // Settings for Remote Store - RemoteStoreRefreshListener.INDEX_REMOTE_REFRESH_RETRY_INTERVAL, - // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 2fb18e0264ef0..49c87a3aa28f6 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -20,13 +20,14 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.common.concurrent.GatedCloseable; -import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -34,12 +35,14 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import 
java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -54,22 +57,26 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); - public static final int REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE = 1; + /** + * The initial retry interval at which the retry job gets scheduled after a failure. + */ + private static final int REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS = 1; - private static final int MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES = 1; + /** + * In an exponential back off setup, the maximum retry interval after the retry interval increases exponentially. + */ + private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS = 30; /** - * If the remote refresh segment sync fails, we retry to sync the segments scheduling it after a retry interval - * which is controlled by the below setting. + * Exponential back off policy with max retry interval. 
*/ - public static final Setting INDEX_REMOTE_REFRESH_RETRY_INTERVAL = Setting.intSetting( - "index.remote_store.segment_sync.retry_interval", - REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE, - REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE, - Setting.Property.Dynamic, - Setting.Property.IndexScope + private static final BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialEqualJitterBackoff( + REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS, + REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS ); + private static final int MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES = 1; + // Visible for testing static final Set EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing @@ -87,7 +94,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres */ private final Semaphore retrySemaphore = new Semaphore(MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES); - private volatile int remoteRefreshRetryInterval; + private volatile Iterator backoffDelayIterator; + + private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry; public RemoteStoreRefreshListener(IndexShard indexShard) { this.indexShard = indexShard; @@ -103,13 +112,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); } } - - // Retry remote refresh on failure - This interval is the time delay after which the sync of segments to remote store - // would be retried. This has been kept as an index scoped setting inline with index.refresh_interval setting. 
- remoteRefreshRetryInterval = INDEX_REMOTE_REFRESH_RETRY_INTERVAL.get(indexShard.indexSettings().getSettings()); - indexShard.indexSettings() - .getScopedSettings() - .addSettingsUpdateConsumer(INDEX_REMOTE_REFRESH_RETRY_INTERVAL, this::setRemoteRefreshRetryInterval); + resetBackOffDelayIterator(); } @Override @@ -189,6 +192,7 @@ private synchronized void syncSegments(boolean retry) { .filter(file -> !localSegmentsPostRefresh.contains(file)) .collect(Collectors.toSet()) .forEach(localSegmentChecksumMap::remove); + OnSuccessfulSegmentsSync(); final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); } else { @@ -226,6 +230,31 @@ private void beforeSegmentsSync(boolean isRetry) { } } + private void OnSuccessfulSegmentsSync() { + // Reset the backoffDelayIterator for the future failures + resetBackOffDelayIterator(); + // Cancel the scheduled cancellable retry if possible and set it to null + cancelAndResetScheduledCancellableRetry(); + } + + /** + * Cancels the scheduled retry if there is one scheduled, and it has not started yet. Clears the reference as the + * schedule retry has been cancelled, or it was null in the first place, or it is running/ran already. + */ + private void cancelAndResetScheduledCancellableRetry() { + if (scheduledCancellableRetry != null && scheduledCancellableRetry.getDelay(TimeUnit.NANOSECONDS) > 0) { + scheduledCancellableRetry.cancel(); + } + scheduledCancellableRetry = null; + } + + /** + * Resets the backoff delay iterator so that the next set of failures starts with the base delay and goes upto max delay. 
+ */ + private void resetBackOffDelayIterator() { + backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator(); + } + private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) { // If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled if (isRetry) { @@ -235,12 +264,8 @@ private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) { // If there are failures in uploading segments, then we should retry as search idle can lead to // refresh not occurring until write happens. if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retrySemaphore.tryAcquire()) { - indexShard.getThreadPool() - .schedule( - () -> this.syncSegments(true), - TimeValue.timeValueSeconds(remoteRefreshRetryInterval), - ThreadPool.Names.REMOTE_REFRESH - ); + scheduledCancellableRetry = indexShard.getThreadPool() + .schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH); } } @@ -310,8 +335,4 @@ private void deleteStaleCommits() { logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); } } - - public void setRemoteRefreshRetryInterval(int remoteRefreshRetryInterval) { - this.remoteRefreshRetryInterval = remoteRefreshRetryInterval; - } } From 0f015a9fa4ba2876bde69c56bc9f0838c6fa165a Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 4 May 2023 18:50:39 +0530 Subject: [PATCH 3/9] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../RemoteStoreRefreshListenerIT.java | 18 ++++++++++++++- .../shard/RemoteStoreRefreshListener.java | 7 +++--- .../org/opensearch/threadpool/ThreadPool.java | 22 +++++++++---------- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 
c7db9930b8bc8..50df061d3d166 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -76,13 +76,18 @@ private Settings remoteStoreIndexSettings(int numberOfReplicas) { @After public void teardown() { + logger.info("--> Deleting the repository={}", REPOSITORY_NAME); assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); } public void testRemoteRefreshRetryOnFailure() throws Exception { - // Create repository Path location = randomRepoPath().toAbsolutePath(); + logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, location); + + // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in + /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the + // repository creation can happen without failure. createRepository( REPOSITORY_NAME, "mock", @@ -95,10 +100,18 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { internalCluster().startDataOnlyNodes(1); createIndex(INDEX_NAME); + logger.info("--> Created index={}", INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); + logger.info("--> Cluster is yellow with no initializing shards"); ensureGreen(INDEX_NAME); + logger.info("--> Cluster is green"); + // Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed + // due to IOExceptions that are thrown while doing uploadBlobs. indexData(randomIntBetween(5, 10), randomBoolean()); + logger.info("--> Indexed data"); + + // TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts. 
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get(); assertEquals(1, response.getShards().length); @@ -106,6 +119,7 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid)); String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid); + logger.info("--> Verify that the segment files are same on local and repository eventually"); assertBusy( () -> assertEquals(getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)), getSegmentFiles(segmentDataRepoPath)) ); @@ -137,8 +151,10 @@ private IndexResponse indexSingleDoc() { } private void indexData(int numberOfIterations, boolean invokeFlush) { + logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush); for (int i = 0; i < numberOfIterations; i++) { int numberOfOperations = randomIntBetween(20, 50); + logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i); for (int j = 0; j < numberOfOperations; j++) { indexSingleDoc(); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 49c87a3aa28f6..08c62f1177d0c 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -135,9 +135,9 @@ public void afterRefresh(boolean didRefresh) { } } - private synchronized void syncSegments(boolean retry) { + private synchronized void syncSegments(boolean isRetry) { boolean shouldRetry = false; - beforeSegmentsSync(retry); + beforeSegmentsSync(isRetry); try { if (indexShard.getReplicationTracker().isPrimaryMode()) { if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { @@ -219,9 
+219,10 @@ private synchronized void syncSegments(boolean retry) { } } } catch (Throwable t) { + shouldRetry = true; logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); } - afterSegmentsSync(retry, shouldRetry); + afterSegmentsSync(isRetry, shouldRetry); } private void beforeSegmentsSync(boolean isRetry) { diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 49019ab4ad594..be2b77998e016 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -265,7 +265,7 @@ public ThreadPool( builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put( Names.REMOTE_REFRESH, - new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, allocatedProcessors * 4, TimeValue.timeValueMinutes(5)) + new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) ); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { builders.put(Names.INDEX_SEARCHER, new FixedExecutorBuilder(settings, Names.INDEX_SEARCHER, allocatedProcessors, 1000, false)); @@ -309,7 +309,7 @@ public ThreadPool( /** * Returns a value of milliseconds that may be used for relative time calculations. - *

+ * * This method should only be used for calculating time deltas. For an epoch based * timestamp, see {@link #absoluteTimeInMillis()}. */ @@ -319,7 +319,7 @@ public long relativeTimeInMillis() { /** * Returns a value of nanoseconds that may be used for relative time calculations. - *

+ * * This method should only be used for calculating time deltas. For an epoch based * timestamp, see {@link #absoluteTimeInMillis()}. */ @@ -329,7 +329,7 @@ public long relativeTimeInNanos() { /** * Returns the value of milliseconds since UNIX epoch. - *

+ * * This method should only be used for exact date/time formatting. For calculating * time deltas that should not suffer from negative deltas, which are possible with * this method, see {@link #relativeTimeInMillis()}. @@ -416,14 +416,14 @@ public ExecutorService executor(String name) { /** * Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread. * - * @param command the command to run - * @param delay delay before the task executes + * @param command the command to run + * @param delay delay before the task executes * @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes - * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the - * command completes. + * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the + * command completes. * @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if - * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool - * the ScheduledFuture will cannot interact with it. + * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool + * the ScheduledFuture will cannot interact with it. * @throws OpenSearchRejectedExecutionException if the task cannot be scheduled for execution */ @Override @@ -623,7 +623,7 @@ public String toString() { /** * A thread to cache millisecond time values from * {@link System#nanoTime()} and {@link System#currentTimeMillis()}. - *

+ * * The values are updated at a specified interval. */ static class CachedTimeThread extends Thread { From 1eb8c5da1bd9d7bc67b263838920cf88595192b2 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 6 May 2023 16:50:58 +0530 Subject: [PATCH 4/9] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../shard/RemoteStoreRefreshListener.java | 18 ++- .../RemoteStoreRefreshListenerTests.java | 126 +++++++++++++++++- 2 files changed, 131 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 08c62f1177d0c..2f8757015f208 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -60,23 +60,21 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres /** * The initial retry interval at which the retry job gets scheduled after a failure. */ - private static final int REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS = 1; + private static final int REMOTE_REFRESH_RETRY_BASE_INTERVAL_MILLIS = 1_000; /** * In an exponential back off setup, the maximum retry interval after the retry interval increases exponentially. */ - private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS = 30; + private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 30_000; /** * Exponential back off policy with max retry interval. 
*/ private static final BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialEqualJitterBackoff( - REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS, - REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS + REMOTE_REFRESH_RETRY_BASE_INTERVAL_MILLIS, + REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS ); - private static final int MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES = 1; - // Visible for testing static final Set EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing @@ -92,7 +90,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres /** * Semaphore that ensures there is only 1 retry scheduled at any time. */ - private final Semaphore retrySemaphore = new Semaphore(MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES); + private final Semaphore SCHEDULE_RETRY_PERMITS = new Semaphore(1); private volatile Iterator backoffDelayIterator; @@ -105,7 +103,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { .getDelegate()).getDelegate(); this.primaryTerm = indexShard.getOperationPrimaryTerm(); localSegmentChecksumMap = new HashMap<>(); - if (indexShard.shardRouting.primary()) { + if (indexShard.routingEntry().primary()) { try { this.remoteDirectory.init(); } catch (IOException e) { @@ -259,12 +257,12 @@ private void resetBackOffDelayIterator() { private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) { // If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled if (isRetry) { - retrySemaphore.release(); + SCHEDULE_RETRY_PERMITS.release(); } // If there are failures in uploading segments, then we should retry as search idle can lead to // refresh not occurring until write happens. 
- if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retrySemaphore.tryAcquire()) { + if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && SCHEDULE_RETRY_PERMITS.tryAcquire()) { scheduledCancellableRetry = indexShard.getThreadPool() .schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index c9b8c023e26aa..158a9e9fb2229 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -30,7 +30,11 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX; public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { @@ -58,9 +62,11 @@ private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @After public void tearDown() throws Exception { - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - closeShards(indexShard); + if (indexShard != null) { + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + closeShards(indexShard); + } super.tearDown(); } @@ -204,6 +210,120 @@ public void onFailure(Exception e) { 
verifyUploadedSegments(remoteSegmentStoreDirectory); } + public void testRefreshSuccessOnFirstAttempt() throws Exception { + // This is the case of isRetry=false, shouldRetry=false + // Succeed on 1st attempt + int succeedOnAttempt = 1; + // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); + // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. + CountDownLatch successLatch = new CountDownLatch(2); + mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(0, successLatch.getCount())); + } + + public void testRefreshSuccessOnSecondAttempt() throws Exception { + // This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false + // Succeed on 2nd attempt + int succeedOnAttempt = 2; + // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); + // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. 
+ CountDownLatch successLatch = new CountDownLatch(2); + mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(0, successLatch.getCount())); + } + + public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { + // This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true + // Succeed on 3rd attempt + int succeedOnAttempt = 3; + // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); + // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. + CountDownLatch successLatch = new CountDownLatch(2); + mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(0, successLatch.getCount())); + } + + private void mockIndexShardWithRetryAndScheduleRefresh( + int SucceedOnAttempt, + CountDownLatch refreshCountLatch, + CountDownLatch successLatch + ) throws IOException { + // Create index shard that we will be using to mock different methods in IndexShard for the unit test + indexShard = newStartedShard( + true, + Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + new InternalEngineFactory() + ); + + indexDocs(1, randomIntBetween(1, 100)); + + // Mock indexShard.store().directory() + IndexShard shard = mock(IndexShard.class); + Store store = mock(Store.class); + when(shard.store()).thenReturn(store); + when(store.directory()).thenReturn(indexShard.store().directory()); + + // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) 
+ Store remoteStore = mock(Store.class); + when(shard.remoteStore()).thenReturn(remoteStore); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) + .getDelegate(); + FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory)); + when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + + // Mock indexShard.getOperationPrimaryTerm() + when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm()); + + // Mock indexShard.routingEntry().primary() + when(shard.routingEntry()).thenReturn(indexShard.routingEntry()); + + // Mock threadpool + when(shard.getThreadPool()).thenReturn(threadPool); + + // Mock indexShard.getReplicationTracker().isPrimaryMode() + doAnswer(invocation -> { + refreshCountLatch.countDown(); + return indexShard.getReplicationTracker(); + }).when(shard).getReplicationTracker(); + + AtomicLong counter = new AtomicLong(); + // Mock indexShard.getSegmentInfosSnapshot() + doAnswer(invocation -> { + if (counter.incrementAndGet() <= SucceedOnAttempt - 1) { + throw new RuntimeException("Inducing failure in upload"); + } + return indexShard.getSegmentInfosSnapshot(); + }).when(shard).getSegmentInfosSnapshot(); + + when(shard.getEngine()).thenReturn(indexShard.getEngine()); + doAnswer(invocation -> { + successLatch.countDown(); + return indexShard.getEngine(); + }).when(shard).getEngine(); + + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard); + refreshListener.afterRefresh(false); + } + + private static class TestFilterDirectory extends FilterDirectory { + + /** + * Sole constructor, typically called from sub-classes. 
+ * + * @param in + */ + protected TestFilterDirectory(Directory in) { + super(in); + } + } + private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException { Map uploadedSegments = remoteSegmentStoreDirectory .getSegmentsUploadedToRemoteStore(); From be4378038e8bec90be2976281e6fd4d714ecdd82 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 8 May 2023 19:54:31 +0530 Subject: [PATCH 5/9] Fix success countdown latch in UT & remove redundant condition Signed-off-by: Ashish Singh --- .../index/shard/RemoteStoreRefreshListenerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 158a9e9fb2229..72db0e8ab8571 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -217,7 +217,8 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. 
- CountDownLatch successLatch = new CountDownLatch(2); + // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down + CountDownLatch successLatch = new CountDownLatch(3); mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); @@ -302,7 +303,6 @@ private void mockIndexShardWithRetryAndScheduleRefresh( return indexShard.getSegmentInfosSnapshot(); }).when(shard).getSegmentInfosSnapshot(); - when(shard.getEngine()).thenReturn(indexShard.getEngine()); doAnswer(invocation -> { successLatch.countDown(); return indexShard.getEngine(); From f80f4acd71db90846e776d2bfcce446129b09b76 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 8 May 2023 20:35:48 +0530 Subject: [PATCH 6/9] Empty-Commit Signed-off-by: Ashish Singh From 843ea405004b05fdba319c9eefef07a71faecd84 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 8 May 2023 21:56:51 +0530 Subject: [PATCH 7/9] Refactor code Signed-off-by: Ashish Singh --- .../RemoteStoreRefreshListenerIT.java | 60 ++++++++++++------- .../shard/RemoteStoreRefreshListener.java | 2 +- .../RemoteStoreRefreshListenerTests.java | 14 +++-- 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 50df061d3d166..30e370f3a528c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -20,6 +20,7 @@ import org.opensearch.core.util.FileSystemUtils; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; +import 
org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.test.FeatureFlagSetter; @@ -31,6 +32,7 @@ import java.util.Collections; import java.util.Locale; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -83,8 +85,32 @@ public void teardown() { public void testRemoteRefreshRetryOnFailure() throws Exception { Path location = randomRepoPath().toAbsolutePath(); - logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, location); + setup(location, randomDoubleBetween(0.1, 0.25, true), "metadata"); + // Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed + // due to IOExceptions that are thrown while doing uploadBlobs. + indexData(randomIntBetween(5, 10), randomBoolean()); + logger.info("--> Indexed data"); + + // TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts. 
+ IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get(); + assertEquals(1, response.getShards().length); + + String indexUuid = response.getShards()[0].getShardRouting().index().getUUID(); + Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid)); + String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid); + + logger.info("--> Verify that the segment files are same on local and repository eventually"); + // This can take time as the retry interval is exponential and maxed at 30s + assertBusy(() -> { + Set filesInLocal = getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)); + Set filesInRepo = getSegmentFiles(segmentDataRepoPath); + assertTrue(filesInRepo.containsAll(filesInLocal)); + }, 60, TimeUnit.SECONDS); + } + + private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) { + logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation); // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the // repository creation can happen without failure. 
@@ -92,10 +118,11 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { REPOSITORY_NAME, "mock", Settings.builder() - .put("location", location) - .put("random_control_io_exception_rate", randomIntBetween(10, 25) / 100f) + .put("location", repoLocation) + .put("random_control_io_exception_rate", ioFailureRate) .put("skip_exception_on_verification_file", true) .put("skip_exception_on_list_blobs", true) + .put("max_failure_number", Long.MAX_VALUE) ); internalCluster().startDataOnlyNodes(1); @@ -105,24 +132,6 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { logger.info("--> Cluster is yellow with no initializing shards"); ensureGreen(INDEX_NAME); logger.info("--> Cluster is green"); - - // Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed - // due to IOExceptions that are thrown while doing uploadBlobs. - indexData(randomIntBetween(5, 10), randomBoolean()); - logger.info("--> Indexed data"); - - // TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts. 
- IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get(); - assertEquals(1, response.getShards().length); - - String indexUuid = response.getShards()[0].getShardRouting().index().getUUID(); - Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid)); - String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid); - - logger.info("--> Verify that the segment files are same on local and repository eventually"); - assertBusy( - () -> assertEquals(getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)), getSegmentFiles(segmentDataRepoPath)) - ); } /** @@ -134,8 +143,9 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { private Set getSegmentFiles(Path location) { try { return Arrays.stream(FileSystemUtils.files(location)) - .filter(path -> path.getFileName().startsWith("_")) + .filter(path -> path.getFileName().toString().startsWith("_")) .map(path -> path.getFileName().toString()) + .map(this::getLocalSegmentFilename) .collect(Collectors.toSet()); } catch (IOException exception) { logger.error("Exception occurred while getting segment files", exception); @@ -143,6 +153,10 @@ private Set getSegmentFiles(Path location) { return Collections.emptySet(); } + private String getLocalSegmentFilename(String remoteFilename) { + return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0]; + } + private IndexResponse indexSingleDoc() { return client().prepareIndex(INDEX_NAME) .setId(UUIDs.randomBase64UUID()) @@ -153,7 +167,7 @@ private IndexResponse indexSingleDoc() { private void indexData(int numberOfIterations, boolean invokeFlush) { logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush); for (int i = 0; i < numberOfIterations; i++) { - int numberOfOperations = randomIntBetween(20, 50); + int numberOfOperations = 
randomIntBetween(1, 5); logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i); for (int j = 0; j < numberOfOperations; j++) { indexSingleDoc(); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 2f8757015f208..8672ba6c59a13 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -65,7 +65,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres /** * In an exponential back off setup, the maximum retry interval after the retry interval increases exponentially. */ - private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 30_000; + private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 10_000; /** * Exponential back off policy with max retry interval. diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 72db0e8ab8571..592cc5172c12d 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; @@ -251,7 +252,7 @@ public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { } private void mockIndexShardWithRetryAndScheduleRefresh( - int SucceedOnAttempt, + int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch ) throws IOException { @@ -289,22 +290,27 @@ private void mockIndexShardWithRetryAndScheduleRefresh( 
when(shard.getThreadPool()).thenReturn(threadPool); // Mock indexShard.getReplicationTracker().isPrimaryMode() + doAnswer(invocation -> { - refreshCountLatch.countDown(); + if (Objects.nonNull(refreshCountLatch)) { + refreshCountLatch.countDown(); + } return indexShard.getReplicationTracker(); }).when(shard).getReplicationTracker(); AtomicLong counter = new AtomicLong(); // Mock indexShard.getSegmentInfosSnapshot() doAnswer(invocation -> { - if (counter.incrementAndGet() <= SucceedOnAttempt - 1) { + if (counter.incrementAndGet() <= succeedOnAttempt - 1) { throw new RuntimeException("Inducing failure in upload"); } return indexShard.getSegmentInfosSnapshot(); }).when(shard).getSegmentInfosSnapshot(); doAnswer(invocation -> { - successLatch.countDown(); + if (Objects.nonNull(successLatch)) { + successLatch.countDown(); + } return indexShard.getEngine(); }).when(shard).getEngine(); From c0ac5dcfed49603ca2ad51d98b178b4b74440ead Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 8 May 2023 22:39:08 +0530 Subject: [PATCH 8/9] Empty-Commit Signed-off-by: Ashish Singh From b7be6f2dd0389e2e1be22ecd13d38e4bc7d8081a Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 9 May 2023 10:22:31 +0530 Subject: [PATCH 9/9] Added missed condition Signed-off-by: Ashish Singh --- .../index/shard/RemoteStoreRefreshListenerTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 592cc5172c12d..84848bb87d634 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -232,7 +232,8 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per 
the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. - CountDownLatch successLatch = new CountDownLatch(2); + // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down + CountDownLatch successLatch = new CountDownLatch(3); mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); @@ -245,7 +246,8 @@ public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. - CountDownLatch successLatch = new CountDownLatch(2); + // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down + CountDownLatch successLatch = new CountDownLatch(3); mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount()));