diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index f54b102ca478a..a01590d8f665c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -790,32 +790,34 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep ); } - public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception { - + public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(1).get(0); + String datanode = internalCluster().startDataOnlyNodes(1).get(0); createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); ensureGreen(INDEX_NAME); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "5") + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100") ); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - // indexing 35 documents (7 bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35. - // Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked - // unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call. - for (int i = 0; i < 7; i++) { - indexBulk(INDEX_NAME, 5); + IndexShard indexShard = getIndexShard(datanode, INDEX_NAME); + + assertFalse(indexShard.shouldPeriodicallyFlush()); + assertEquals(0, indexShard.getNumberofTranslogReaders()); + + // indexing 100 documents (100 bulk requests), no flush will be triggered yet + for (int i = 0; i < 100; i++) { + indexBulk(INDEX_NAME, 1); } - assertBusy(() -> assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), 35), 30, TimeUnit.SECONDS); + assertEquals(100, indexShard.getNumberofTranslogReaders()); - // refresh will not trigger here, hence total searchable documents will be 35 (not 40) - indexBulk(INDEX_NAME, 5); + // Will flush and trim the translog readers + indexBulk(INDEX_NAME, 1); - long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value; - assertEquals(35, currentDocCount); + assertBusy(() -> assertEquals(0, indexShard.getNumberofTranslogReaders()), 30, TimeUnit.SECONDS); + assertFalse(indexShard.shouldPeriodicallyFlush()); } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 7bacec22fc850..631d4980236ee 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1875,7 +1875,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { logger.trace("finished commit for flush"); // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved - logger.debug( + logger.info( "new commit on flush, hasUncommittedChanges:{}, force:{}, shouldPeriodicallyFlush:{}", hasUncommittedChanges, force, diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4d8a805723110..87b5b1afe44d2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2911,7 +2911,7 @@ public void restoreFromRepository(Repository repository, ActionListener * * @return {@code true} if the engine should be flushed */ - boolean shouldPeriodicallyFlush() { + public boolean shouldPeriodicallyFlush() { final Engine engine = getEngineOrNull(); if (engine != null) { try { @@ -4484,26 +4484,17 @@ public Durability getTranslogDurability() { // we can not protect with a lock since we "release" on a different thread private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); - /** - * Checks if the shard need to be refreshed depending on translog constraints. - * each translog type can have it's own decider - * @return {@code true} if the shard should be refreshed - */ - public boolean shouldRefreshShard() { + // For testing purpose + public int getNumberofTranslogReaders() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - return engine.translogManager().shouldRefreshShard(); + return engine.translogManager().getNumberofTranslogReaders(); } catch (final AlreadyClosedException e) { - // we are already closed, no need to Refresh + // we are already closed } } - return false; - } - - private void maybeRefreshShard(String source) { - verifyNotClosed(); - getEngine().maybeRefresh(source); + return -1; } /** @@ -4571,20 +4562,6 @@ public void onAfter() { flushOrRollRunning.compareAndSet(true, false); } } - } else if (shouldRefreshShard()) { - logger.debug("submitting async Refresh request"); - final AbstractRunnable refreshRunnable = new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.warn("refresh failed after translog manager decided to refresh the shard", e); - } - - @Override - protected void doRun() throws Exception { - maybeRefreshShard("Translog manager decided to refresh the shard"); - } - }; - threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshRunnable); } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 2d90c59d8dd1f..1d87b571fd86f 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -437,6 +437,13 @@ public String getTranslogUUID() { * @return if the translog should be flushed */ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) { + /* + * This triggers flush if number of translog files have breached a threshold. + * each translog type can have it's own decider. + */ + if (translog.shouldFlushOnMaxTranslogFiles()) { + return true; + } // This is the minimum seqNo that is referred in translog and considered for calculating translog size long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1); final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration; @@ -470,8 +477,11 @@ public void close() throws IOException { IOUtils.closeWhileHandlingException(translog); } - @Override - public boolean shouldRefreshShard() { - return getTranslog(true).shouldRefreshShard(); + /** + * Retrieves the number of translog readers + * @return number of translog readers + */ + public int getNumberofTranslogReaders() { + return translog.getNumberofTranslogReaders(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index b4a78baea51bd..740d97e587168 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -136,7 +136,7 @@ public Translog.TranslogGeneration getTranslogGeneration() { } @Override - public boolean shouldRefreshShard() { - return false; + public int getNumberofTranslogReaders() { + return 0; } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 55b78446fb70a..28c1a40d935c5 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -654,13 +654,13 @@ int availablePermits() { } /** - * Checks whether or not the shard should be refreshed. + * Checks whether or not the shard should be flushed based on translog files. * This checks if number of translog files breaches the threshold count determined by * {@code cluster.remote_store.translog.max_readers} setting - * @return {@code true} if the shard should be refreshed + * @return {@code true} if the shard should be flushed */ @Override - public boolean shouldRefreshShard() { + public boolean shouldFlushOnMaxTranslogFiles() { return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 2a13b421fa562..a6ef62d3ee067 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2054,11 +2054,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi } /** - * Checks whether or not the shard should be refreshed. + * Checks whether or not the shard should be flushed based on translog files. * each translog type can have it's own decider * @return {@code true} if the shard should be refreshed */ - public boolean shouldRefreshShard() { + public boolean shouldFlushOnMaxTranslogFiles() { return false; } + + public int getNumberofTranslogReaders() { + return readers.size(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index d3e9cbbf95dec..899ae2b6cbdd4 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -143,5 +143,5 @@ public interface TranslogManager { Translog.TranslogGeneration getTranslogGeneration(); - boolean shouldRefreshShard(); + int getNumberofTranslogReaders(); } diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 4190da64fb0f2..34a4db4f79f90 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -70,8 +70,8 @@ public class RemoteStoreSettings { */ public static final Setting CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting( "cluster.remote_store.translog.max_readers", - 300, - 1, + 1000, + 100, Property.Dynamic, Property.NodeScope ); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 499fcc3ec5b28..e5bfa8caee79a 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -144,7 +144,6 @@ import org.opensearch.index.translog.TranslogStats; import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.indices.IndicesQueryCache; -import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; @@ -4958,39 +4957,4 @@ private static void assertRemoteSegmentStats( assertTrue(remoteSegmentStats.getTotalRejections() > 0); assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections()); } - - public void testShouldRefreshOnTooManyRemoteTranslogFiles() throws Exception { - - Settings primarySettings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "seg-test") - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) - .build(); - - final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); - RemoteStoreSettings remoteStoreSettings = primaryShard.getRemoteStoreSettings(); - final long numDocs = remoteStoreSettings.getMaxRemoteTranslogReaders(); - - assertFalse(primaryShard.shouldRefreshShard()); - - for (long i = 0; i < numDocs; i++) { - indexDoc(primaryShard, "_doc", Long.toString(i), "{}"); - } - - assertTrue(primaryShard.shouldRefreshShard()); - assertBusy(() -> { - primaryShard.afterWriteOperation(); - try (Engine.Searcher searcher = primaryShard.acquireSearcher("test")) { - assertEquals(numDocs, searcher.getIndexReader().numDocs()); - } - }); - - closeShards(primaryShard); - } - } diff --git a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java index 1892e36a0d1fc..f89fd3df6e340 100644 --- a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java @@ -99,21 +99,21 @@ public void testClusterRemoteTranslogTransferTimeout() { public void testMaxRemoteReferencedTranslogFiles() { // Test default value - assertEquals(300, remoteStoreSettings.getMaxRemoteTranslogReaders()); + assertEquals(1000, remoteStoreSettings.getMaxRemoteTranslogReaders()); // Test override with valid value clusterSettings.applySettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100").build() + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "500").build() ); - assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders()); + assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders()); // Test override with value less than minimum assertThrows( IllegalArgumentException.class, () -> clusterSettings.applySettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "0").build() + Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "99").build() ) ); - assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders()); + assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders()); } }