diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d8af16db72a7..4c1591a894244 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) +- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 78441f74f6b4f..ca0ae3ca9a700 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -30,6 +30,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.Translog.Durability; import org.opensearch.indices.IndicesService; import org.opensearch.indices.RemoteStoreSettings; @@ -63,6 +64,7 @@ import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; +import static org.opensearch.index.shard.IndexShardTestCase.getTranslog; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -859,4 +861,45 @@ public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception { refresh(INDEX_NAME); assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs + 15); } + + public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + String datanode = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + IndexShard indexShard = getIndexShard(datanode, INDEX_NAME); + Path translogLocation = getTranslog(indexShard).location(); + assertFalse(indexShard.shouldPeriodicallyFlush()); + + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 1L); + } + + // indexing 100 documents (100 bulk requests), no flush will be triggered yet + for (int i = 0; i < 100; i++) { + indexBulk(INDEX_NAME, 1); + } + + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 101L); + } + // Will flush and trim the translog readers + indexBulk(INDEX_NAME, 1); + + assertBusy(() -> { + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 1L); + } + }, 30, TimeUnit.SECONDS); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index dab0f6bcf1c85..c70f22be518f2 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -732,7 +732,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, - RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING + RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, + RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS ) ) ); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 26dbbbcdee7c0..3e0284267ec29 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2911,7 +2911,7 @@ public void restoreFromRepository(Repository repository, ActionListener * * @return {@code true} if the engine should be flushed */ - boolean shouldPeriodicallyFlush() { + public boolean shouldPeriodicallyFlush() { final Engine engine = getEngineOrNull(); if (engine != null) { try { @@ -4487,6 +4487,7 @@ public Durability getTranslogDurability() { /** * Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be * executed asynchronously on the flush thread pool. + * Can also schedule a flush if decided by translog manager */ public void afterWriteOperation() { if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) { diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index a22c538286a88..e2210217672ef 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -437,6 +437,12 @@ public String getTranslogUUID() { * @return if the translog should be flushed */ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) { + /* + * This can trigger flush depending upon translog's implementation + */ + if (translog.shouldFlush()) { + return true; + } // This is the minimum seqNo that is referred in translog and considered for calculating translog size long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1); final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration; diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index da905b9605dfd..8c01791af2c9e 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -675,4 +675,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi int availablePermits() { return syncPermit.availablePermits(); } + + /** + * Checks whether or not the shard should be flushed based on translog files. + * This checks if number of translog files breaches the threshold count determined by + * {@code cluster.remote_store.translog.max_readers} setting + * @return {@code true} if the shard should be flushed + */ + @Override + protected boolean shouldFlush() { + return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index c653605f8fa10..842e9c77d2350 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2082,4 +2082,13 @@ public static String createEmptyTranslog( public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) { return minUnrefCheckpointInLastCommit; } + + /** + * Checks whether or not the shard should be flushed based on translog files. + * each translog type can have it's own decider + * @return {@code true} if the shard should be flushed + */ + protected boolean shouldFlush() { + return false; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 1087244623b87..47638f44fd6fc 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -585,4 +585,8 @@ public void onFailure(Exception e) { throw e; } } + + public int getMaxRemoteTranslogReadersSettings() { + return this.remoteStoreSettings.getMaxRemoteTranslogReaders(); + } } diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index e0a9f7a9e05c1..0bd4c7aedfc03 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -94,11 +94,23 @@ public class RemoteStoreSettings { Property.Dynamic ); + /** + * Controls the maximum referenced remote translog files. If breached the shard will be flushed. + */ + public static final Setting CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting( + "cluster.remote_store.translog.max_readers", + 1000, + 100, + Property.Dynamic, + Property.NodeScope + ); + private volatile TimeValue clusterRemoteTranslogBufferInterval; private volatile int minRemoteSegmentMetadataFiles; private volatile TimeValue clusterRemoteTranslogTransferTimeout; private volatile RemoteStoreEnums.PathType pathType; private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm; + private volatile int maxRemoteTranslogReaders; public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); @@ -124,6 +136,9 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { pathHashAlgorithm = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, this::setPathHashAlgorithm); + + maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders); } public TimeValue getClusterRemoteTranslogBufferInterval() { @@ -167,4 +182,12 @@ private void setPathType(RemoteStoreEnums.PathType pathType) { private void setPathHashAlgorithm(RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm) { this.pathHashAlgorithm = pathHashAlgorithm; } + + public int getMaxRemoteTranslogReaders() { + return maxRemoteTranslogReaders; + } + + private void setMaxRemoteTranslogReaders(int maxRemoteTranslogReaders) { + this.maxRemoteTranslogReaders = maxRemoteTranslogReaders; + } } diff --git a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java index 8a77d97f88d67..f89fd3df6e340 100644 --- a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java @@ -96,4 +96,24 @@ public void testClusterRemoteTranslogTransferTimeout() { ); assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()); } + + public void testMaxRemoteReferencedTranslogFiles() { + // Test default value + assertEquals(1000, remoteStoreSettings.getMaxRemoteTranslogReaders()); + + // Test override with valid value + clusterSettings.applySettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "500").build() + ); + assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders()); + + // Test override with value less than minimum + assertThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings( + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "99").build() + ) + ); + assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders()); + } }