diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java new file mode 100644 index 0000000000000..04eab9dcb2284 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/refresh/RefreshRemoteTranslogFilesIT.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.refresh; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RefreshRemoteTranslogFilesIT extends RemoteStoreBaseIntegTestCase { + + protected final String INDEX_NAME = "remote-store-test-idx-1"; + + public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception { + + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "5") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // indexing 35 documents (7 
bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35. + // Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked + // unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call. + for (int i = 0; i < 7; i++) { + indexBulk(INDEX_NAME, 5); + } + + // refresh will not trigger here, hence total searchable documents will be 35 (not 40) + indexBulk(INDEX_NAME, 5); + + long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value; + assertEquals(35, currentDocCount); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 0c35f91121059..f34d3cf1f3ad9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -40,7 +40,7 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase { protected Path segmentRepoPath; protected Path translogRepoPath; - boolean addRemote = false; + protected boolean addRemote = false; Settings extraSettings = Settings.EMPTY; private final List documentKeys = List.of( diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 9d763c970c3e7..575089eb2d904 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -728,7 +728,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, - 
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING + RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, + RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES ) ) ); diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 29ba5d7504e26..c6c312d6b6eea 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -237,8 +237,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for concurrent segment search IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, - IndexSettings.INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, - // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 0d5dd1a303e92..82875564c1c07 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -719,19 +719,6 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); - /** - * Index setting describing the maximum number of uncommitted translog files at a time. - * If breached the shard will be Refreshed, this is to control large number of tranlog files - * downloads during recovery. 
- */ - public static final Setting INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES = Setting.intSetting( - "index.max_uncommitted_translog_files", - 300, - 1, - Property.Dynamic, - Property.IndexScope - ); - private final Index index; private final Version version; private final Logger logger; @@ -815,7 +802,6 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { private volatile long mappingTotalFieldsLimit; private volatile long mappingDepthLimit; private volatile long mappingFieldNameLengthLimit; - private volatile int maxUncommittedTranslogFiles; /** * The maximum number of refresh listeners allows on this shard. @@ -994,7 +980,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY)); checkPendingFlushEnabled = scopedSettings.get(INDEX_CHECK_PENDING_FLUSH_ENABLED); defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE); - maxUncommittedTranslogFiles = scopedSettings.get(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES); /* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7). * For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type * to higher bytes size like integer to long. 
This behavior was changed from OpenSearch 2.7 version not to @@ -1119,15 +1104,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, this::setDocIdFuzzySetFalsePositiveProbability ); - scopedSettings.addSettingsUpdateConsumer(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, this::setMaxUncommittedTranslogFiles); - } - - private void setMaxUncommittedTranslogFiles(int maxUncommittedTranslogFiles) { - this.maxUncommittedTranslogFiles = maxUncommittedTranslogFiles; - } - - public int getMaxUncommittedTranslogFiles() { - return maxUncommittedTranslogFiles; } private void setSearchIdleAfter(TimeValue searchIdleAfter) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 99e0b750b231a..4882539510f26 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4489,11 +4489,11 @@ public Durability getTranslogDurability() { * threshold count determined by {@code index.translog.max_uncommitted_files_threshold} * @return {@code true} if the shard should be Refreshed */ - boolean shouldRefreshShard() { + public boolean shouldRefreshShard() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - return engine.translogManager().shouldRefreshShard(indexSettings.getMaxUncommittedTranslogFiles()); + return engine.translogManager().shouldRefreshShard(); } catch (final AlreadyClosedException e) { // we are already closed, no need to Refresh } @@ -4570,9 +4570,8 @@ public void onAfter() { } } } else if (shouldRefreshShard() && isRefreshRunning.compareAndSet(false, true)) { - if (shouldRefreshShard()) { - logger.info("submitting async Refresh request"); + logger.debug("submitting async Refresh request"); final AbstractRunnable _refresh = new AbstractRunnable() { @Override public void onFailure(Exception e) 
{ diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 34044b45bf859..2d90c59d8dd1f 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -471,7 +471,7 @@ public void close() throws IOException { } @Override - public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { - return getTranslog(true).shouldRefreshShard(maxUncommittedTranslogFilesThreshold); + public boolean shouldRefreshShard() { + return getTranslog(true).shouldRefreshShard(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index d918b0edca120..b4a78baea51bd 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -136,7 +136,7 @@ public Translog.TranslogGeneration getTranslogGeneration() { } @Override - public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + public boolean shouldRefreshShard() { return false; } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 6154cb3454699..4f38363d6a353 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -660,7 +660,7 @@ int availablePermits() { * @return {@code true} if the shard should be Refreshed */ @Override - public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { - return readers.size() > maxUncommittedTranslogFilesThreshold; + public boolean shouldRefreshShard() { + return 
readers.size() >= translogTransferManager.getMaxRemoteReferencedTranslogFilesSettings(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index ced8ebc450cfa..f4468a0b0ad04 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2059,7 +2059,7 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi * * @return {@code true} if the shard should be Refreshed */ - public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + public boolean shouldRefreshShard() { return false; } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 25d9397287d91..d3e9cbbf95dec 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -143,5 +143,5 @@ public interface TranslogManager { Translog.TranslogGeneration getTranslogGeneration(); - boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold); + boolean shouldRefreshShard(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 1087244623b87..f13cf6c06805d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -585,4 +585,8 @@ public void onFailure(Exception e) { throw e; } } + + public int getMaxRemoteReferencedTranslogFilesSettings() { + return this.remoteStoreSettings.getMaxRemoteReferencedTranslogFiles(); + } } diff --git 
a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 7f2121093f8e8..c545af9008fc8 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -65,9 +65,21 @@ public class RemoteStoreSettings { Property.Dynamic ); + /** + * Controls the maximum number of referenced remote translog files. Once this limit is reached, the shard will be refreshed. + */ + public static final Setting CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES = Setting.intSetting( + "cluster.remote_store.max_referenced_translog_files", + 300, + 1, + Property.Dynamic, + Property.NodeScope + ); + private volatile TimeValue clusterRemoteTranslogBufferInterval; private volatile int minRemoteSegmentMetadataFiles; private volatile TimeValue clusterRemoteTranslogTransferTimeout; + private volatile int maxRemoteReferencedTranslogFiles; public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); @@ -87,6 +99,9 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, this::setClusterRemoteTranslogTransferTimeout ); + + maxRemoteReferencedTranslogFiles = CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES, this::setMaxRemoteReferencedTranslogFiles); } public TimeValue getClusterRemoteTranslogBufferInterval() { @@ -112,4 +127,12 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() { private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) { this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout; } + + public int getMaxRemoteReferencedTranslogFiles() { + return 
maxRemoteReferencedTranslogFiles; + } + + private void setMaxRemoteReferencedTranslogFiles(int maxRemoteReferencedTranslogFiles) { + this.maxRemoteReferencedTranslogFiles = maxRemoteReferencedTranslogFiles; + } } diff --git a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java index 8a77d97f88d67..54d539df04395 100644 --- a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java @@ -96,4 +96,24 @@ public void testClusterRemoteTranslogTransferTimeout() { ); assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()); } + + public void testMaxRemoteReferencedTranslogFiles() { + // Test default value + assertEquals(300, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + + // Test override with valid value + clusterSettings.applySettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "100").build() + ); + assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + + // Test override with value less than minimum + assertThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "0").build() + ) + ); + assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles()); + } }