From f24757dd819ebb7f7d23e9fb538fb4665973d1d6 Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Mon, 1 Apr 2024 10:17:22 +0530 Subject: [PATCH 01/11] Adding support to allow refresh interval -1 Signed-off-by: Shubh Sahu --- .../common/settings/IndexScopedSettings.java | 2 + .../org/opensearch/index/IndexSettings.java | 24 ++++++++ .../opensearch/index/shard/IndexShard.java | 56 +++++++++++++++++++ .../translog/InternalTranslogManager.java | 5 ++ .../index/translog/NoOpTranslogManager.java | 5 ++ .../index/translog/RemoteFsTranslog.java | 11 ++++ .../opensearch/index/translog/Translog.java | 10 ++++ .../index/translog/TranslogManager.java | 2 + 8 files changed, 115 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index c6c312d6b6eea..29ba5d7504e26 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -237,6 +237,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for concurrent segment search IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, + IndexSettings.INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, + // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 82875564c1c07..16b43c65c1718 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -719,6 +719,19 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + /** + * Index setting describing the maximum number of uncommitted translog files at a time. + * If breached the shard will be Refreshed, this is to control large number of tranlog files + * downloads during recovery. + */ + public static final Setting INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES = Setting.intSetting( + "index.max_uncommitted_translog_files", + 300, + 1, + Property.Dynamic, + Property.IndexScope + ); + private final Index index; private final Version version; private final Logger logger; @@ -802,6 +815,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { private volatile long mappingTotalFieldsLimit; private volatile long mappingDepthLimit; private volatile long mappingFieldNameLengthLimit; + private volatile int maxUncommittedTranslogFiles; /** * The maximum number of refresh listeners allows on this shard. @@ -980,6 +994,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY)); checkPendingFlushEnabled = scopedSettings.get(INDEX_CHECK_PENDING_FLUSH_ENABLED); defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE); + maxUncommittedTranslogFiles = scopedSettings.get(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES); /* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7). * For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type * to higher bytes size like integer to long. This behavior was changed from OpenSearch 2.7 version not to @@ -1104,6 +1119,15 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, this::setDocIdFuzzySetFalsePositiveProbability ); + scopedSettings.addSettingsUpdateConsumer(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES,this::setMaxUncommittedTranslogFiles); + } + + private void setMaxUncommittedTranslogFiles(int maxUncommittedTranslogFiles) { + this.maxUncommittedTranslogFiles = maxUncommittedTranslogFiles; + } + + public int getMaxUncommittedTranslogFiles() { + return maxUncommittedTranslogFiles; } private void setSearchIdleAfter(TimeValue searchIdleAfter) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 26dbbbcdee7c0..e0202b7404a88 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4484,6 +4484,33 @@ public Durability getTranslogDurability() { // we can not protect with a lock since we "release" on a different thread private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); + /** + * Tests whether or not the shard should be Refreshed, if number of translog files breaches the + * threshold count determined by {@code index.translog.max_uncommitted_files_threshold} + * @return {@code true} if the shard should be Refreshed + */ + boolean shouldRefreshShard(){ + final Engine engine = getEngineOrNull(); + if (engine != null) { + try { + return engine.translogManager().shouldRefreshShard(indexSettings.getMaxUncommittedTranslogFiles()); + } catch (final AlreadyClosedException e) { + // we are already closed, no need to Refresh + } + } + return false; + } + + private final AtomicBoolean isRefreshRunning = new AtomicBoolean(); + + /** + * Will Call a blocking Refresh and then Trim the Unreferenced Translog files + */ + private void refreshAndTrimTranslogfiles(String source) throws IOException { + refresh(source); + getEngine().translogManager().trimUnreferencedTranslogFiles(); + } + /** * Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be * executed asynchronously on the flush thread pool. @@ -4548,6 +4575,35 @@ public void onAfter() { flushOrRollRunning.compareAndSet(true, false); } } + } else if (shouldRefreshShard() && isRefreshRunning.compareAndSet(false, true)) { + + if (shouldRefreshShard()) { + logger.info("submitting async Refresh request"); + final AbstractRunnable refreshAndTrimTranslog = new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("forced refresh failed after number of uncommited translog files breached limit", e); + } + + @Override + protected void doRun() throws Exception { + refreshAndTrimTranslogfiles("Too many uncommited Translog files"); + } + + @Override + public boolean isForceExecution() { + return true; + } + + @Override + public void onAfter() { + isRefreshRunning.compareAndSet(true, false); + } + }; + threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshAndTrimTranslog); + } else { + isRefreshRunning.compareAndSet(true, false); + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index a22c538286a88..34044b45bf859 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -469,4 +469,9 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl public void close() throws IOException { IOUtils.closeWhileHandlingException(translog); } + + @Override + public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + return getTranslog(true).shouldRefreshShard(maxUncommittedTranslogFilesThreshold); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index b4aa7865570a6..d918b0edca120 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -134,4 +134,9 @@ public Releasable drainSync() { public Translog.TranslogGeneration getTranslogGeneration() { return null; } + + @Override + public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + return false; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 67799f0465c29..6154cb3454699 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -652,4 +652,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi int availablePermits() { return syncPermit.availablePermits(); } + + /** + * Tests whether or not the shard should be Refreshed. + * This test is based on the number of Translog files compared to configured number of Translog files threshold + * + * @return {@code true} if the shard should be Refreshed + */ + @Override + public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + return readers.size() > maxUncommittedTranslogFilesThreshold; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 7c50ed6ecd58f..ced8ebc450cfa 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2052,4 +2052,14 @@ public static String createEmptyTranslog( public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) { return minUnrefCheckpointInLastCommit; } + + /** + * Tests whether or not the shard should be Refreshed. + * This test is based on the number of Translog files compared to configured number of Translog files threshold + * + * @return {@code true} if the shard should be Refreshed + */ + public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + return false; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index e1a0b7d1c1293..25d9397287d91 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -142,4 +142,6 @@ public interface TranslogManager { Releasable drainSync(); Translog.TranslogGeneration getTranslogGeneration(); + + boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold); } From 5f4ca3a8bc7707653806fc6cb8bba6614aeab2b5 Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Wed, 3 Apr 2024 12:13:03 +0530 Subject: [PATCH 02/11] Changing logic to Just trigger refresh, and leave trimtranslog on AfterWriteAction, which trims unreferenced translog readers after every write Signed-off-by: Shubh Sahu --- .../org/opensearch/index/IndexSettings.java | 2 +- .../org/opensearch/index/shard/IndexShard.java | 18 ++++++------------ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 16b43c65c1718..0d5dd1a303e92 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1119,7 +1119,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, this::setDocIdFuzzySetFalsePositiveProbability ); - scopedSettings.addSettingsUpdateConsumer(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES,this::setMaxUncommittedTranslogFiles); + scopedSettings.addSettingsUpdateConsumer(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, this::setMaxUncommittedTranslogFiles); } private void setMaxUncommittedTranslogFiles(int maxUncommittedTranslogFiles) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index e0202b7404a88..99e0b750b231a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4489,7 +4489,7 @@ public Durability getTranslogDurability() { * threshold count determined by {@code index.translog.max_uncommitted_files_threshold} * @return {@code true} if the shard should be Refreshed */ - boolean shouldRefreshShard(){ + boolean shouldRefreshShard() { final Engine engine = getEngineOrNull(); if (engine != null) { try { @@ -4503,17 +4503,11 @@ boolean shouldRefreshShard(){ private final AtomicBoolean isRefreshRunning = new AtomicBoolean(); - /** - * Will Call a blocking Refresh and then Trim the Unreferenced Translog files - */ - private void refreshAndTrimTranslogfiles(String source) throws IOException { - refresh(source); - getEngine().translogManager().trimUnreferencedTranslogFiles(); - } - /** * Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be * executed asynchronously on the flush thread pool. + * Also Schedules a refresh if Number of Translog files breaches the threshold count determined by + * {@code index.translog.max_uncommitted_files_threshold} */ public void afterWriteOperation() { if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) { @@ -4579,7 +4573,7 @@ public void onAfter() { if (shouldRefreshShard()) { logger.info("submitting async Refresh request"); - final AbstractRunnable refreshAndTrimTranslog = new AbstractRunnable() { + final AbstractRunnable _refresh = new AbstractRunnable() { @Override public void onFailure(Exception e) { logger.warn("forced refresh failed after number of uncommited translog files breached limit", e); @@ -4587,7 +4581,7 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { - refreshAndTrimTranslogfiles("Too many uncommited Translog files"); + refresh("Too many uncommited Translog files"); } @Override @@ -4600,7 +4594,7 @@ public void onAfter() { isRefreshRunning.compareAndSet(true, false); } }; - threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshAndTrimTranslog); + threadPool.executor(ThreadPool.Names.REFRESH).execute(_refresh); } else { isRefreshRunning.compareAndSet(true, false); } From 0e1a2d3c4b7f07c56571e0aca97648c5e7891f88 Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Fri, 5 Apr 2024 16:46:50 +0530 Subject: [PATCH 03/11] Added IT/UT and changed setting scope Signed-off-by: Shubh Sahu --- .../refresh/RefreshRemoteTranslogFilesIT.java | 50 +++++++++++++++++++ .../MigrationBaseTestCase.java | 2 +- .../common/settings/ClusterSettings.java | 3 +- .../common/settings/IndexScopedSettings.java | 2 - .../org/opensearch/index/IndexSettings.java | 24 --------- .../opensearch/index/shard/IndexShard.java | 7 ++- .../translog/InternalTranslogManager.java | 4 +- .../index/translog/NoOpTranslogManager.java | 2 +- .../index/translog/RemoteFsTranslog.java | 4 +- .../opensearch/index/translog/Translog.java | 2 +- .../index/translog/TranslogManager.java | 2 +- .../transfer/TranslogTransferManager.java | 4 ++ .../indices/RemoteStoreSettings.java | 23 +++++++++ ...RemoteStoreSettingsDynamicUpdateTests.java | 20 ++++++++ 14 files changed, 110 insertions(+), 39 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java new file mode 100644 index 0000000000000..04eab9dcb2284 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.refresh; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RefreshRemoteTranslogFilesIT extends RemoteStoreBaseIntegTestCase { + + protected final String INDEX_NAME = "remote-store-test-idx-1"; + + public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception { + + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "5") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // indexing 35 documents (7 bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35. + // Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked + // unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call. + for (int i = 0; i < 7; i++) { + indexBulk(INDEX_NAME, 5); + } + + // refresh will not trigger here, hence total searchable documents will be 35 (not 40) + indexBulk(INDEX_NAME, 5); + + long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value; + assertEquals(35, currentDocCount); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 0c35f91121059..f34d3cf1f3ad9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -40,7 +40,7 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase { protected Path segmentRepoPath; protected Path translogRepoPath; - boolean addRemote = false; + protected boolean addRemote = false; Settings extraSettings = Settings.EMPTY; private final List documentKeys = List.of( diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 9d763c970c3e7..575089eb2d904 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -728,7 +728,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, - RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING + RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, + RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES ) ) ); diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 29ba5d7504e26..c6c312d6b6eea 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -237,8 +237,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for concurrent segment search IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, - IndexSettings.INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, - // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 0d5dd1a303e92..82875564c1c07 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -719,19 +719,6 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); - /** - * Index setting describing the maximum number of uncommitted translog files at a time. - * If breached the shard will be Refreshed, this is to control large number of tranlog files - * downloads during recovery. - */ - public static final Setting INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES = Setting.intSetting( - "index.max_uncommitted_translog_files", - 300, - 1, - Property.Dynamic, - Property.IndexScope - ); - private final Index index; private final Version version; private final Logger logger; @@ -815,7 +802,6 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { private volatile long mappingTotalFieldsLimit; private volatile long mappingDepthLimit; private volatile long mappingFieldNameLengthLimit; - private volatile int maxUncommittedTranslogFiles; /** * The maximum number of refresh listeners allows on this shard. @@ -994,7 +980,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY)); checkPendingFlushEnabled = scopedSettings.get(INDEX_CHECK_PENDING_FLUSH_ENABLED); defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE); - maxUncommittedTranslogFiles = scopedSettings.get(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES); /* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7). * For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type * to higher bytes size like integer to long. This behavior was changed from OpenSearch 2.7 version not to @@ -1119,15 +1104,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, this::setDocIdFuzzySetFalsePositiveProbability ); - scopedSettings.addSettingsUpdateConsumer(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, this::setMaxUncommittedTranslogFiles); - } - - private void setMaxUncommittedTranslogFiles(int maxUncommittedTranslogFiles) { - this.maxUncommittedTranslogFiles = maxUncommittedTranslogFiles; - } - - public int getMaxUncommittedTranslogFiles() { - return maxUncommittedTranslogFiles; } private void setSearchIdleAfter(TimeValue searchIdleAfter) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 99e0b750b231a..4882539510f26 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4489,11 +4489,11 @@ public Durability getTranslogDurability() { * threshold count determined by {@code index.translog.max_uncommitted_files_threshold} * @return {@code true} if the shard should be Refreshed */ - boolean shouldRefreshShard() { + public boolean shouldRefreshShard() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - return engine.translogManager().shouldRefreshShard(indexSettings.getMaxUncommittedTranslogFiles()); + return engine.translogManager().shouldRefreshShard(); } catch (final AlreadyClosedException e) { // we are already closed, no need to Refresh } @@ -4570,9 +4570,8 @@ public void onAfter() { } } } else if (shouldRefreshShard() && isRefreshRunning.compareAndSet(false, true)) { - if (shouldRefreshShard()) { - logger.info("submitting async Refresh request"); + logger.debug("submitting async Refresh request"); final AbstractRunnable _refresh = new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 34044b45bf859..2d90c59d8dd1f 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -471,7 +471,7 @@ public void close() throws IOException { } @Override - public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { - return getTranslog(true).shouldRefreshShard(maxUncommittedTranslogFilesThreshold); + public boolean shouldRefreshShard() { + return getTranslog(true).shouldRefreshShard(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index d918b0edca120..b4a78baea51bd 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -136,7 +136,7 @@ public Translog.TranslogGeneration getTranslogGeneration() { } @Override - public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + public boolean shouldRefreshShard() { return false; } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 6154cb3454699..4f38363d6a353 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -660,7 +660,7 @@ int availablePermits() { * @return {@code true} if the shard should be Refreshed */ @Override - public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { - return readers.size() > maxUncommittedTranslogFilesThreshold; + public boolean shouldRefreshShard() { + return readers.size() >= translogTransferManager.getMaxRemoteReferencedTranslogFilesSettings(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index ced8ebc450cfa..f4468a0b0ad04 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2059,7 +2059,7 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi * * @return {@code true} if the shard should be Refreshed */ - public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + public boolean shouldRefreshShard() { return false; } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 25d9397287d91..d3e9cbbf95dec 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -143,5 +143,5 @@ public interface TranslogManager { Translog.TranslogGeneration getTranslogGeneration(); - boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold); + boolean shouldRefreshShard(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 1087244623b87..f13cf6c06805d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -585,4 +585,8 @@ public void onFailure(Exception e) { throw e; } } + + public int getMaxRemoteReferencedTranslogFilesSettings() { + return this.remoteStoreSettings.getMaxRemoteReferencedTranslogFiles(); + } } diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 7f2121093f8e8..c545af9008fc8 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -65,9 +65,21 @@ public class RemoteStoreSettings { Property.Dynamic ); + /** + * Controls the maximum referenced remote translog files. If breached the shard will be Refreshed. + */ + public static final Setting CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES = Setting.intSetting( + "cluster.remote_store.max_referenced_translog_files", + 300, + 1, + Property.Dynamic, + Property.NodeScope + ); + private volatile TimeValue clusterRemoteTranslogBufferInterval; private volatile int minRemoteSegmentMetadataFiles; private volatile TimeValue clusterRemoteTranslogTransferTimeout; + private volatile int maxRemoteReferencedTranslogFiles; public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); @@ -87,6 +99,9 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, this::setClusterRemoteTranslogTransferTimeout ); + + maxRemoteReferencedTranslogFiles = CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES, this::setMaxRemoteReferencedTranslogFiles); } public TimeValue getClusterRemoteTranslogBufferInterval() { @@ -112,4 +127,12 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() { private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) { this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout; } + + public int getMaxRemoteReferencedTranslogFiles() { + return maxRemoteReferencedTranslogFiles; + } + + private void setMaxRemoteReferencedTranslogFiles(int maxRemoteReferencedTranslogFiles) { + this.maxRemoteReferencedTranslogFiles = maxRemoteReferencedTranslogFiles; + } } diff --git a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java index 8a77d97f88d67..54d539df04395 100644 --- a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java @@ -96,4 +96,24 @@ public void testClusterRemoteTranslogTransferTimeout() { ); assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()); } + + public void testMaxRemoteReferencedTranslogFiles() { + // Test default value + assertEquals(300, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + + // Test override with valid value + clusterSettings.applySettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "100").build() + ); + assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + + // Test override with value less than minimum + assertThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "0").build() + ) + ); + assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + } } From 4fd9bf50c487f00708f01115b6856146826b50ed Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Fri, 5 Apr 2024 16:53:08 +0530 Subject: [PATCH 04/11] Minor correction Signed-off-by: Shubh Sahu --- .../org/opensearch/remotemigration/MigrationBaseTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index f34d3cf1f3ad9..0c35f91121059 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -40,7 +40,7 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase { protected Path segmentRepoPath; protected Path translogRepoPath; - protected boolean addRemote = false; + boolean addRemote = false; Settings extraSettings = Settings.EMPTY; private final List documentKeys = List.of( From 90bcd5e18a8c880634520e6b4bc828b990c5a672 Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Fri, 5 Apr 2024 17:19:40 +0530 Subject: [PATCH 05/11] Comments and CHANGELOG.md Signed-off-by: Shubh Sahu --- CHANGELOG.md | 1 + .../main/java/org/opensearch/index/shard/IndexShard.java | 4 ++-- .../org/opensearch/index/translog/RemoteFsTranslog.java | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 340360f9cdd28..464fcf2cadd3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044)) - Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656)) - [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704)) +- [Remote Store] Reallow index & cluster default Refresh Interval to be set as -1 ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4882539510f26..072faf7ae4053 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4485,8 +4485,8 @@ public Durability getTranslogDurability() { private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); /** - * Tests whether or not the shard should be Refreshed, if number of translog files breaches the - * threshold count determined by {@code index.translog.max_uncommitted_files_threshold} + * Checks if the shard need to be Refreshed depending on Translog constraints. + * Each Translog type can have it's own decider * @return {@code true} if the shard should be Refreshed */ public boolean shouldRefreshShard() { diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 4f38363d6a353..025751e18d71e 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -654,9 +654,9 @@ int availablePermits() { } /** - * Tests whether or not the shard should be Refreshed. - * This test is based on the number of Translog files compared to configured number of Translog files threshold - * + * Checks whether or not the shard should be Refreshed. + * This checks if number of translog files breaches the threshold count determined by + * {@code cluster.remote_store.max_referenced_translog_files} setting * @return {@code true} if the shard should be Refreshed */ @Override From c5df7d4fc057d6ae5e7c78515a76661ac904a27b Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Wed, 10 Apr 2024 16:07:36 +0530 Subject: [PATCH 06/11] Some refactoring and name changes Signed-off-by: Shubh Sahu --- CHANGELOG.md | 2 +- .../refresh/RefreshRemoteTranslogFilesIT.java | 2 +- .../common/settings/ClusterSettings.java | 2 +- .../org/opensearch/index/shard/IndexShard.java | 18 ++++++------------ .../index/translog/RemoteFsTranslog.java | 8 ++++---- .../opensearch/index/translog/Translog.java | 7 +++---- .../transfer/TranslogTransferManager.java | 4 ++-- .../indices/RemoteStoreSettings.java | 18 +++++++++--------- .../index/shard/IndexShardTests.java | 2 +- .../RemoteStoreSettingsDynamicUpdateTests.java | 10 +++++----- 10 files changed, 33 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3467a606adcfe..7cc42dfeb5c5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,7 +114,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656)) - [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704)) - Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686)) -- [Remote Store] Reallow index & cluster default Refresh Interval to be set as -1 ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) +- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) - Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java index 04eab9dcb2284..3f3612d52739f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java @@ -30,7 +30,7 @@ public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception { ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "5") + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "5") ); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 575089eb2d904..8410bd6dc300a 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -729,7 +729,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, - RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES + RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS ) ) ); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 781f87dfdab48..4d8a805723110 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4485,9 +4485,9 @@ public Durability getTranslogDurability() { private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); /** - * Checks if the shard need to be Refreshed depending on Translog constraints. - * Each Translog type can have it's own decider - * @return {@code true} if the shard should be Refreshed + * Checks if the shard need to be refreshed depending on translog constraints. + * each translog type can have it's own decider + * @return {@code true} if the shard should be refreshed */ public boolean shouldRefreshShard() { final Engine engine = getEngineOrNull(); @@ -4509,7 +4509,7 @@ private void maybeRefreshShard(String source) { /** * Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be * executed asynchronously on the flush thread pool. - * Also Schedules a refresh if required, decided by Translog manager + * Also schedules a refresh if required, decided by translog manager */ public void afterWriteOperation() { if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) { @@ -4573,7 +4573,7 @@ public void onAfter() { } } else if (shouldRefreshShard()) { logger.debug("submitting async Refresh request"); - final AbstractRunnable _refresh = new AbstractRunnable() { + final AbstractRunnable refreshRunnable = new AbstractRunnable() { @Override public void onFailure(Exception e) { logger.warn("refresh failed after translog manager decided to refresh the shard", e); @@ -4583,14 +4583,8 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { maybeRefreshShard("Translog manager decided to refresh the shard"); } - - @Override - public boolean isForceExecution() { - return true; - } - }; - threadPool.executor(ThreadPool.Names.REFRESH).execute(_refresh); + threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshRunnable); } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 025751e18d71e..55b78446fb70a 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -654,13 +654,13 @@ int availablePermits() { } /** - * Checks whether or not the shard should be Refreshed. + * Checks whether or not the shard should be refreshed. * This checks if number of translog files breaches the threshold count determined by - * {@code cluster.remote_store.max_referenced_translog_files} setting - * @return {@code true} if the shard should be Refreshed + * {@code cluster.remote_store.translog.max_readers} setting + * @return {@code true} if the shard should be refreshed */ @Override public boolean shouldRefreshShard() { - return readers.size() >= translogTransferManager.getMaxRemoteReferencedTranslogFilesSettings(); + return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index f4468a0b0ad04..2a13b421fa562 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2054,10 +2054,9 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi } /** - * Tests whether or not the shard should be Refreshed. - * This test is based on the number of Translog files compared to configured number of Translog files threshold - * - * @return {@code true} if the shard should be Refreshed + * Checks whether or not the shard should be refreshed. + * each translog type can have it's own decider + * @return {@code true} if the shard should be refreshed */ public boolean shouldRefreshShard() { return false; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index f13cf6c06805d..47638f44fd6fc 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -586,7 +586,7 @@ public void onFailure(Exception e) { } } - public int getMaxRemoteReferencedTranslogFilesSettings() { - return this.remoteStoreSettings.getMaxRemoteReferencedTranslogFiles(); + public int getMaxRemoteTranslogReadersSettings() { + return this.remoteStoreSettings.getMaxRemoteTranslogReaders(); } } diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index c545af9008fc8..4190da64fb0f2 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -68,8 +68,8 @@ public class RemoteStoreSettings { /** * Controls the maximum referenced remote translog files. If breached the shard will be Refreshed. */ - public static final Setting CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES = Setting.intSetting( - "cluster.remote_store.max_referenced_translog_files", + public static final Setting CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting( + "cluster.remote_store.translog.max_readers", 300, 1, Property.Dynamic, @@ -79,7 +79,7 @@ public class RemoteStoreSettings { private volatile TimeValue clusterRemoteTranslogBufferInterval; private volatile int minRemoteSegmentMetadataFiles; private volatile TimeValue clusterRemoteTranslogTransferTimeout; - private volatile int maxRemoteReferencedTranslogFiles; + private volatile int maxRemoteTranslogReaders; public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); @@ -100,8 +100,8 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { this::setClusterRemoteTranslogTransferTimeout ); - maxRemoteReferencedTranslogFiles = CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.get(settings); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES, this::setMaxRemoteReferencedTranslogFiles); + maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders); } public TimeValue getClusterRemoteTranslogBufferInterval() { @@ -128,11 +128,11 @@ private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTran this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout; } - public int getMaxRemoteReferencedTranslogFiles() { - return maxRemoteReferencedTranslogFiles; + public int getMaxRemoteTranslogReaders() { + return maxRemoteTranslogReaders; } - private void setMaxRemoteReferencedTranslogFiles(int maxRemoteReferencedTranslogFiles) { - this.maxRemoteReferencedTranslogFiles = maxRemoteReferencedTranslogFiles; + private void setMaxRemoteTranslogReaders(int maxRemoteTranslogReaders) { + this.maxRemoteTranslogReaders = maxRemoteTranslogReaders; } } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index f119f74d6b14d..499fcc3ec5b28 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -4974,7 +4974,7 @@ public void testShouldRefreshOnTooManyRemoteTranslogFiles() throws Exception { final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); RemoteStoreSettings remoteStoreSettings = primaryShard.getRemoteStoreSettings(); - final long numDocs = remoteStoreSettings.getMaxRemoteReferencedTranslogFiles(); + final long numDocs = remoteStoreSettings.getMaxRemoteTranslogReaders(); assertFalse(primaryShard.shouldRefreshShard()); diff --git a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java index 54d539df04395..1892e36a0d1fc 100644 --- a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java @@ -99,21 +99,21 @@ public void testClusterRemoteTranslogTransferTimeout() { public void testMaxRemoteReferencedTranslogFiles() { // Test default value - assertEquals(300, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + assertEquals(300, remoteStoreSettings.getMaxRemoteTranslogReaders()); // Test override with valid value clusterSettings.applySettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "100").build() + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100").build() ); - assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders()); // Test override with value less than minimum assertThrows( IllegalArgumentException.class, () -> clusterSettings.applySettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "0").build() + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "0").build() ) ); - assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders()); } } From d1ffa62b36f734a1dd2e65c49dad7632cf2eaec3 Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Thu, 11 Apr 2024 21:40:59 +0530 Subject: [PATCH 07/11] Code refactoring Signed-off-by: Shubh Sahu --- .../refresh/RefreshRemoteTranslogFilesIT.java | 50 ------------------- .../opensearch/remotestore/RemoteStoreIT.java | 29 +++++++++++ 2 files changed, 29 insertions(+), 50 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java deleted file mode 100644 index 3f3612d52739f..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.admin.indices.refresh; - -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.common.settings.Settings; -import org.opensearch.indices.RemoteStoreSettings; -import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; -import org.opensearch.test.OpenSearchIntegTestCase; - -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RefreshRemoteTranslogFilesIT extends RemoteStoreBaseIntegTestCase { - - protected final String INDEX_NAME = "remote-store-test-idx-1"; - - public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception { - - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(1).get(0); - createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); - ensureGreen(INDEX_NAME); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "5") - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - // indexing 35 documents (7 bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35. - // Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked - // unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call. - for (int i = 0; i < 7; i++) { - indexBulk(INDEX_NAME, 5); - } - - // refresh will not trigger here, hence total searchable documents will be 35 (not 40) - indexBulk(INDEX_NAME, 5); - - long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value; - assertEquals(35, currentDocCount); - } -} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index b767ffff05e3a..f54b102ca478a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -789,4 +789,33 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep docs + moreDocs + uncommittedOps ); } + + public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception { + + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "5") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // indexing 35 documents (7 bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35. + // Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked + // unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call. + for (int i = 0; i < 7; i++) { + indexBulk(INDEX_NAME, 5); + } + + assertBusy(() -> assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), 35), 30, TimeUnit.SECONDS); + + // refresh will not trigger here, hence total searchable documents will be 35 (not 40) + indexBulk(INDEX_NAME, 5); + + long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value; + assertEquals(35, currentDocCount); + } } From d0c2dd29632af6807461611b6ea893298540eaf9 Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Tue, 16 Apr 2024 00:32:47 +0530 Subject: [PATCH 08/11] Changed logic to do flush instead of refresh Signed-off-by: Shubh Sahu --- .../opensearch/remotestore/RemoteStoreIT.java | 30 ++++++++-------- .../index/engine/InternalEngine.java | 2 +- .../opensearch/index/shard/IndexShard.java | 35 ++++-------------- .../translog/InternalTranslogManager.java | 16 +++++++-- .../index/translog/NoOpTranslogManager.java | 4 +-- .../index/translog/RemoteFsTranslog.java | 6 ++-- .../opensearch/index/translog/Translog.java | 8 +++-- .../index/translog/TranslogManager.java | 2 +- .../indices/RemoteStoreSettings.java | 4 +-- .../index/shard/IndexShardTests.java | 36 ------------------- ...RemoteStoreSettingsDynamicUpdateTests.java | 10 +++--- 11 files changed, 55 insertions(+), 98 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index f54b102ca478a..a01590d8f665c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -790,32 +790,34 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep ); } - public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception { - + public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(1).get(0); + String datanode = internalCluster().startDataOnlyNodes(1).get(0); createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); ensureGreen(INDEX_NAME); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "5") + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100") ); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - // indexing 35 documents (7 bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35. - // Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked - // unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call. - for (int i = 0; i < 7; i++) { - indexBulk(INDEX_NAME, 5); + IndexShard indexShard = getIndexShard(datanode, INDEX_NAME); + + assertFalse(indexShard.shouldPeriodicallyFlush()); + assertEquals(0, indexShard.getNumberofTranslogReaders()); + + // indexing 100 documents (100 bulk requests), no flush will be triggered yet + for (int i = 0; i < 100; i++) { + indexBulk(INDEX_NAME, 1); } - assertBusy(() -> assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), 35), 30, TimeUnit.SECONDS); + assertEquals(100, indexShard.getNumberofTranslogReaders()); - // refresh will not trigger here, hence total searchable documents will be 35 (not 40) - indexBulk(INDEX_NAME, 5); + // Will flush and trim the translog readers + indexBulk(INDEX_NAME, 1); - long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value; - assertEquals(35, currentDocCount); + assertBusy(() -> assertEquals(0, indexShard.getNumberofTranslogReaders()), 30, TimeUnit.SECONDS); + assertFalse(indexShard.shouldPeriodicallyFlush()); } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 7bacec22fc850..631d4980236ee 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1875,7 +1875,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { logger.trace("finished commit for flush"); // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved - logger.debug( + logger.info( "new commit on flush, hasUncommittedChanges:{}, force:{}, shouldPeriodicallyFlush:{}", hasUncommittedChanges, force, diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4d8a805723110..87b5b1afe44d2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2911,7 +2911,7 @@ public void restoreFromRepository(Repository repository, ActionListener * * @return {@code true} if the engine should be flushed */ - boolean shouldPeriodicallyFlush() { + public boolean shouldPeriodicallyFlush() { final Engine engine = getEngineOrNull(); if (engine != null) { try { @@ -4484,26 +4484,17 @@ public Durability getTranslogDurability() { // we can not protect with a lock since we "release" on a different thread private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); - /** - * Checks if the shard need to be refreshed depending on translog constraints. - * each translog type can have it's own decider - * @return {@code true} if the shard should be refreshed - */ - public boolean shouldRefreshShard() { + // For testing purpose + public int getNumberofTranslogReaders() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - return engine.translogManager().shouldRefreshShard(); + return engine.translogManager().getNumberofTranslogReaders(); } catch (final AlreadyClosedException e) { - // we are already closed, no need to Refresh + // we are already closed } } - return false; - } - - private void maybeRefreshShard(String source) { - verifyNotClosed(); - getEngine().maybeRefresh(source); + return -1; } /** @@ -4571,20 +4562,6 @@ public void onAfter() { flushOrRollRunning.compareAndSet(true, false); } } - } else if (shouldRefreshShard()) { - logger.debug("submitting async Refresh request"); - final AbstractRunnable refreshRunnable = new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.warn("refresh failed after translog manager decided to refresh the shard", e); - } - - @Override - protected void doRun() throws Exception { - maybeRefreshShard("Translog manager decided to refresh the shard"); - } - }; - threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshRunnable); } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 2d90c59d8dd1f..1d87b571fd86f 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -437,6 +437,13 @@ public String getTranslogUUID() { * @return if the translog should be flushed */ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) { + /* + * This triggers flush if number of translog files have breached a threshold. + * each translog type can have it's own decider. + */ + if (translog.shouldFlushOnMaxTranslogFiles()) { + return true; + } // This is the minimum seqNo that is referred in translog and considered for calculating translog size long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1); final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration; @@ -470,8 +477,11 @@ public void close() throws IOException { IOUtils.closeWhileHandlingException(translog); } - @Override - public boolean shouldRefreshShard() { - return getTranslog(true).shouldRefreshShard(); + /** + * Retrieves the number of translog readers + * @return number of translog readers + */ + public int getNumberofTranslogReaders() { + return translog.getNumberofTranslogReaders(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index b4a78baea51bd..740d97e587168 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -136,7 +136,7 @@ public Translog.TranslogGeneration getTranslogGeneration() { } @Override - public boolean shouldRefreshShard() { - return false; + public int getNumberofTranslogReaders() { + return 0; } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 55b78446fb70a..28c1a40d935c5 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -654,13 +654,13 @@ int availablePermits() { } /** - * Checks whether or not the shard should be refreshed. + * Checks whether or not the shard should be flushed based on translog files. * This checks if number of translog files breaches the threshold count determined by * {@code cluster.remote_store.translog.max_readers} setting - * @return {@code true} if the shard should be refreshed + * @return {@code true} if the shard should be flushed */ @Override - public boolean shouldRefreshShard() { + public boolean shouldFlushOnMaxTranslogFiles() { return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 2a13b421fa562..a6ef62d3ee067 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2054,11 +2054,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi } /** - * Checks whether or not the shard should be refreshed. + * Checks whether or not the shard should be flushed based on translog files. * each translog type can have it's own decider * @return {@code true} if the shard should be refreshed */ - public boolean shouldRefreshShard() { + public boolean shouldFlushOnMaxTranslogFiles() { return false; } + + public int getNumberofTranslogReaders() { + return readers.size(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index d3e9cbbf95dec..899ae2b6cbdd4 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -143,5 +143,5 @@ public interface TranslogManager { Translog.TranslogGeneration getTranslogGeneration(); - boolean shouldRefreshShard(); + int getNumberofTranslogReaders(); } diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 4190da64fb0f2..34a4db4f79f90 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -70,8 +70,8 @@ public class RemoteStoreSettings { */ public static final Setting CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting( "cluster.remote_store.translog.max_readers", - 300, - 1, + 1000, + 100, Property.Dynamic, Property.NodeScope ); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 499fcc3ec5b28..e5bfa8caee79a 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -144,7 +144,6 @@ import org.opensearch.index.translog.TranslogStats; import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.indices.IndicesQueryCache; -import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; @@ -4958,39 +4957,4 @@ private static void assertRemoteSegmentStats( assertTrue(remoteSegmentStats.getTotalRejections() > 0); assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections()); } - - public void testShouldRefreshOnTooManyRemoteTranslogFiles() throws Exception { - - Settings primarySettings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "seg-test") - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) - .build(); - - final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); - RemoteStoreSettings remoteStoreSettings = primaryShard.getRemoteStoreSettings(); - final long numDocs = remoteStoreSettings.getMaxRemoteTranslogReaders(); - - assertFalse(primaryShard.shouldRefreshShard()); - - for (long i = 0; i < numDocs; i++) { - indexDoc(primaryShard, "_doc", Long.toString(i), "{}"); - } - - assertTrue(primaryShard.shouldRefreshShard()); - assertBusy(() -> { - primaryShard.afterWriteOperation(); - try (Engine.Searcher searcher = primaryShard.acquireSearcher("test")) { - assertEquals(numDocs, searcher.getIndexReader().numDocs()); - } - }); - - closeShards(primaryShard); - } - } diff --git a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java index 1892e36a0d1fc..f89fd3df6e340 100644 --- a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java @@ -99,21 +99,21 @@ public void testClusterRemoteTranslogTransferTimeout() { public void testMaxRemoteReferencedTranslogFiles() { // Test default value - assertEquals(300, remoteStoreSettings.getMaxRemoteTranslogReaders()); + assertEquals(1000, remoteStoreSettings.getMaxRemoteTranslogReaders()); // Test override with valid value clusterSettings.applySettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100").build() + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "500").build() ); - assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders()); + assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders()); // Test override with value less than minimum assertThrows( IllegalArgumentException.class, () -> clusterSettings.applySettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "0").build() + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "99").build() ) ); - assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders()); + assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders()); } } From 6d93d6a1276adcff7225adff2b3778fb7c4bc6ee Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Tue, 16 Apr 2024 00:35:41 +0530 Subject: [PATCH 09/11] rebuild jenkins empty commit --- .../main/java/org/opensearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 631d4980236ee..7bacec22fc850 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1875,7 +1875,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { logger.trace("finished commit for flush"); // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved - logger.info( + logger.debug( "new commit on flush, hasUncommittedChanges:{}, force:{}, shouldPeriodicallyFlush:{}", hasUncommittedChanges, force, From 312c58a2bee27491fa459dd2e12337df09429afb Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Thu, 18 Apr 2024 14:06:29 +0530 Subject: [PATCH 10/11] Addresed comment on PR Signed-off-by: Shubh Sahu --- .../opensearch/remotestore/RemoteStoreIT.java | 24 ++++++++++++++----- .../opensearch/index/shard/IndexShard.java | 15 +----------- .../translog/InternalTranslogManager.java | 13 ++-------- .../index/translog/NoOpTranslogManager.java | 5 ---- .../index/translog/RemoteFsTranslog.java | 2 +- .../opensearch/index/translog/Translog.java | 8 ++----- .../index/translog/TranslogManager.java | 2 -- .../indices/RemoteStoreSettings.java | 2 +- 8 files changed, 25 insertions(+), 46 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index a01590d8f665c..6578e5d373d69 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -30,6 +30,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.Translog.Durability; import org.opensearch.indices.IndicesService; import org.opensearch.indices.RemoteStoreSettings; @@ -61,6 +62,7 @@ import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; +import static org.opensearch.index.shard.IndexShardTestCase.getTranslog; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -803,21 +805,31 @@ public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); IndexShard indexShard = getIndexShard(datanode, INDEX_NAME); - + Path translogLocation = getTranslog(indexShard).location(); assertFalse(indexShard.shouldPeriodicallyFlush()); - assertEquals(0, indexShard.getNumberofTranslogReaders()); + + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 1L); + } // indexing 100 documents (100 bulk requests), no flush will be triggered yet for (int i = 0; i < 100; i++) { indexBulk(INDEX_NAME, 1); } - assertEquals(100, indexShard.getNumberofTranslogReaders()); - + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 101L); + } // Will flush and trim the translog readers indexBulk(INDEX_NAME, 1); - assertBusy(() -> assertEquals(0, indexShard.getNumberofTranslogReaders()), 30, TimeUnit.SECONDS); - assertFalse(indexShard.shouldPeriodicallyFlush()); + assertBusy(() -> { + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 1L); + } + }, 30, TimeUnit.SECONDS); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 87b5b1afe44d2..3e0284267ec29 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4484,23 +4484,10 @@ public Durability getTranslogDurability() { // we can not protect with a lock since we "release" on a different thread private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); - // For testing purpose - public int getNumberofTranslogReaders() { - final Engine engine = getEngineOrNull(); - if (engine != null) { - try { - return engine.translogManager().getNumberofTranslogReaders(); - } catch (final AlreadyClosedException e) { - // we are already closed - } - } - return -1; - } - /** * Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be * executed asynchronously on the flush thread pool. - * Also schedules a refresh if required, decided by translog manager + * Can also schedule a flush if decided by translog manager */ public void afterWriteOperation() { if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) { diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 1d87b571fd86f..e2210217672ef 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -438,10 +438,9 @@ public String getTranslogUUID() { */ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) { /* - * This triggers flush if number of translog files have breached a threshold. - * each translog type can have it's own decider. + * This can trigger flush depending upon translog's implementation */ - if (translog.shouldFlushOnMaxTranslogFiles()) { + if (translog.shouldFlush()) { return true; } // This is the minimum seqNo that is referred in translog and considered for calculating translog size @@ -476,12 +475,4 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl public void close() throws IOException { IOUtils.closeWhileHandlingException(translog); } - - /** - * Retrieves the number of translog readers - * @return number of translog readers - */ - public int getNumberofTranslogReaders() { - return translog.getNumberofTranslogReaders(); - } } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 740d97e587168..b4aa7865570a6 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -134,9 +134,4 @@ public Releasable drainSync() { public Translog.TranslogGeneration getTranslogGeneration() { return null; } - - @Override - public int getNumberofTranslogReaders() { - return 0; - } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 28c1a40d935c5..20b9d920b8b04 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -660,7 +660,7 @@ int availablePermits() { * @return {@code true} if the shard should be flushed */ @Override - public boolean shouldFlushOnMaxTranslogFiles() { + public boolean shouldFlush() { return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index a6ef62d3ee067..eb16b9f761d78 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2056,13 +2056,9 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi /** * Checks whether or not the shard should be flushed based on translog files. * each translog type can have it's own decider - * @return {@code true} if the shard should be refreshed + * @return {@code true} if the shard should be flushed */ - public boolean shouldFlushOnMaxTranslogFiles() { + public boolean shouldFlush() { return false; } - - public int getNumberofTranslogReaders() { - return readers.size(); - } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 899ae2b6cbdd4..e1a0b7d1c1293 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -142,6 +142,4 @@ public interface TranslogManager { Releasable drainSync(); Translog.TranslogGeneration getTranslogGeneration(); - - int getNumberofTranslogReaders(); } diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 34a4db4f79f90..f312cb393eceb 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -66,7 +66,7 @@ public class RemoteStoreSettings { ); /** - * Controls the maximum referenced remote translog files. If breached the shard will be Refreshed. + * Controls the maximum referenced remote translog files. If breached the shard will be flushed. */ public static final Setting CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting( "cluster.remote_store.translog.max_readers", From 8a81cb70a43883d685f73542df6786fbc30079be Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Fri, 19 Apr 2024 15:02:45 +0530 Subject: [PATCH 11/11] Addressed comment on PR Signed-off-by: Shubh Sahu --- CHANGELOG.md | 2 +- .../java/org/opensearch/index/translog/RemoteFsTranslog.java | 2 +- .../src/main/java/org/opensearch/index/translog/Translog.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84e83309f5439..0ffff08d2e0b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,13 +17,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656)) - [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704)) - Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686)) -- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) - Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967)) - [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531)) - Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868)) - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) +- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index ccf23433fe8ec..8c01791af2c9e 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -683,7 +683,7 @@ int availablePermits() { * @return {@code true} if the shard should be flushed */ @Override - public boolean shouldFlush() { + protected boolean shouldFlush() { return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index be0b4b2ab62a7..842e9c77d2350 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2088,7 +2088,7 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi * each translog type can have it's own decider * @return {@code true} if the shard should be flushed */ - public boolean shouldFlush() { + protected boolean shouldFlush() { return false; } }