diff --git a/.ci/bwcVersions b/.ci/bwcVersions index 1f80ed34d6c10..a738eb54e17f6 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -34,4 +34,5 @@ BWC_VERSION: - "2.14.0" - "2.14.1" - "2.15.0" + - "2.15.1" - "2.16.0" diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 5a2d08756c49f..8d69e98220b69 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -24,4 +24,4 @@ /.github/ @peternied -/MAINTAINERS.md @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @jed326 @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah +/MAINTAINERS.md @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @jed326 @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b4c46b1910e7..5e694e4124619 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865)) - [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782)) - Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445)) +- Add allowlist setting for ingest-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) @@ -46,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix fs info reporting negative available size ([#11573](https://github.com/opensearch-project/OpenSearch/pull/11573)) - Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495)) - Fix FuzzyQuery in keyword field will use IndexOrDocValuesQuery when both of index and doc_value are true ([#14378](https://github.com/opensearch-project/OpenSearch/pull/14378)) +- Fix file cache initialization ([#14004](https://github.com/opensearch-project/OpenSearch/pull/14004)) - Handle NPE in GetResult if "found" field is missing ([#14552](https://github.com/opensearch-project/OpenSearch/pull/14552)) ### Security diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 91b57a4cbc74e..3298ceb15463c 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -9,6 +9,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje | Anas Alkouz | [anasalkouz](https://github.com/anasalkouz) | Amazon | | Andrew Ross | [andrross](https://github.com/andrross) | Amazon | | Andriy Redko | [reta](https://github.com/reta) | Aiven | +| Ashish Singh | [ashking94](https://github.com/ashking94) | Amazon | | Bukhtawar Khan | [Bukhtawar](https://github.com/Bukhtawar) | Amazon | | Charlotte Henkle | [CEHENKLE](https://github.com/CEHENKLE) | Amazon | | Dan Widdis | [dbwiddis](https://github.com/dbwiddis) | Amazon | diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index 0cb2d4f867c12..da43894863432 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -105,6 +105,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_14_0 = new Version(2140099, org.apache.lucene.util.Version.LUCENE_9_10_0); public static final Version V_2_14_1 = new Version(2140199, org.apache.lucene.util.Version.LUCENE_9_10_0); public static final Version V_2_15_0 = new Version(2150099, org.apache.lucene.util.Version.LUCENE_9_10_0); + public static final Version V_2_15_1 = new Version(2150199, org.apache.lucene.util.Version.LUCENE_9_10_0); public static final Version V_2_16_0 = new Version(2160099, org.apache.lucene.util.Version.LUCENE_9_11_0); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_12_0); public static final Version CURRENT = V_3_0_0; diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index b6d6913a9f8d4..f69c56808b2a1 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -195,7 +195,16 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> // and it only has to be loaded one time, we should report one miss and the rest hits. But, if we do stats in // getValueFromTieredCache(), // we will see all misses. Instead, handle stats in computeIfAbsent(). - Tuple cacheValueTuple = getValueFromTieredCache(false).apply(key); + Tuple cacheValueTuple; + CompletableFuture, V>> future = null; + try (ReleasableLock ignore = readLock.acquire()) { + cacheValueTuple = getValueFromTieredCache(false).apply(key); + if (cacheValueTuple == null) { + // Only one of the threads will succeed putting a future into map for the same key. + // Rest will fetch existing future and wait on that to complete. + future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>()); + } + } List heapDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_ON_HEAP); List diskDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_DISK); @@ -203,7 +212,7 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. // This is needed as there can be many requests for the same key at the same time and we only want to load // the value once. - V value = compute(key, loader); + V value = compute(key, loader, future); // Handle stats if (loader.isLoaded()) { // The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache @@ -232,10 +241,8 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> return cacheValueTuple.v1(); } - private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { - // Only one of the threads will succeed putting a future into map for the same key. - // Rest will fetch existing future and wait on that to complete. - CompletableFuture, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>()); + private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader, CompletableFuture, V>> future) + throws Exception { // Handler to handle results post processing. Takes a tuple or exception as an input and returns // the value. Also before returning value, puts the value in cache. BiFunction, V>, Throwable, Void> handler = (pair, ex) -> { @@ -253,7 +260,7 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader logger.warn("Exception occurred while trying to compute the value", ex); } } - completableFutureMap.remove(key); // Remove key from map as not needed anymore. + completableFutureMap.remove(key);// Remove key from map as not needed anymore. return null; }; V value = null; diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index b9c7bbdb77d3d..c6440a1e1797f 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -760,7 +760,7 @@ public void testInvalidateAll() throws Exception { } public void testComputeIfAbsentConcurrently() throws Exception { - int onHeapCacheSize = randomIntBetween(100, 300); + int onHeapCacheSize = randomIntBetween(500, 700); int diskCacheSize = randomIntBetween(200, 400); int keyValueSize = 50; @@ -782,7 +782,7 @@ public void testComputeIfAbsentConcurrently() throws Exception { 0 ); - int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); + int numberOfSameKeys = randomIntBetween(400, onHeapCacheSize - 1); ICacheKey key = getICacheKey(UUID.randomUUID().toString()); String value = UUID.randomUUID().toString(); diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java index 162934efa6778..bf9e9b71b8491 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java @@ -58,10 +58,20 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; public class IngestCommonModulePlugin extends Plugin implements ActionPlugin, IngestPlugin { + static final Setting> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting( + "ingest.common.processors.allowed", + List.of(), + Function.identity(), + Setting.Property.NodeScope + ); + static final Setting WATCHDOG_INTERVAL = Setting.timeSetting( "ingest.grok.watchdog.interval", TimeValue.timeValueSeconds(1), @@ -77,7 +87,7 @@ public IngestCommonModulePlugin() {} @Override public Map getProcessors(Processor.Parameters parameters) { - Map processors = new HashMap<>(); + final Map processors = new HashMap<>(); processors.put(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService)); processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)); processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.scriptService)); @@ -110,7 +120,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory()); processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()); processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory()); - return Collections.unmodifiableMap(processors); + return filterForAllowlistSetting(parameters.env.settings(), processors); } @Override @@ -133,7 +143,7 @@ public List getRestHandlers( @Override public List> getSettings() { - return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME); + return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME, PROCESSORS_ALLOWLIST_SETTING); } private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) { @@ -147,4 +157,27 @@ private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters par ); } + private Map filterForAllowlistSetting(Settings settings, Map map) { + if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) { + return Map.copyOf(map); + } + final Set allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings)); + // Assert that no unknown processors are defined in the allowlist + final Set unknownAllowlistProcessors = allowlist.stream() + .filter(p -> map.containsKey(p) == false) + .collect(Collectors.toSet()); + if (unknownAllowlistProcessors.isEmpty() == false) { + throw new IllegalArgumentException( + "Processor(s) " + + unknownAllowlistProcessors + + " were defined in [" + + PROCESSORS_ALLOWLIST_SETTING.getKey() + + "] but do not exist" + ); + } + return map.entrySet() + .stream() + .filter(e -> allowlist.contains(e.getKey())) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + } } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java new file mode 100644 index 0000000000000..b0c1e0fdbaa63 --- /dev/null +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.common.settings.Settings; +import org.opensearch.env.TestEnvironment; +import org.opensearch.ingest.Processor; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class IngestCommonModulePluginTests extends OpenSearchTestCase { + + public void testAllowlist() throws IOException { + runAllowlistTest(List.of()); + runAllowlistTest(List.of("date")); + runAllowlistTest(List.of("set")); + runAllowlistTest(List.of("copy", "date")); + runAllowlistTest(List.of("date", "set", "copy")); + } + + private void runAllowlistTest(List allowlist) throws IOException { + final Settings settings = Settings.builder() + .putList(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowlist) + .build(); + try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) { + assertEquals(Set.copyOf(allowlist), plugin.getProcessors(createParameters(settings)).keySet()); + } + } + + public void testAllowlistNotSpecified() throws IOException { + final Settings.Builder builder = Settings.builder(); + builder.remove(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey()); + final Settings settings = builder.build(); + try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) { + final Set expected = Set.of( + "append", + "urldecode", + "sort", + "fail", + "trim", + "set", + "fingerprint", + "pipeline", + "json", + "join", + "kv", + "bytes", + "date", + "drop", + "community_id", + "lowercase", + "convert", + "copy", + "gsub", + "dot_expander", + "rename", + "remove_by_pattern", + "html_strip", + "remove", + "csv", + "grok", + "date_index_name", + "foreach", + "script", + "dissect", + "uppercase", + "split" + ); + assertEquals(expected, plugin.getProcessors(createParameters(settings)).keySet()); + } + } + + public void testAllowlistHasNonexistentProcessors() throws IOException { + final Settings settings = Settings.builder() + .putList(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), List.of("threeve")) + .build(); + try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> plugin.getProcessors(createParameters(settings)) + ); + assertTrue(e.getMessage(), e.getMessage().contains("threeve")); + } + } + + private static Processor.Parameters createParameters(Settings settings) { + return new Processor.Parameters( + TestEnvironment.newEnvironment(Settings.builder().put(settings).put("path.home", "").build()), + null, + null, + null, + () -> 0L, + (a, b) -> null, + null, + null, + $ -> {}, + null + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 2440a3c64e956..1c199df4d548e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -28,6 +28,7 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -35,6 +36,7 @@ import org.opensearch.common.Priority; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.index.Index; @@ -65,10 +67,13 @@ import java.util.stream.StreamSupport; import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS; +import static org.opensearch.common.util.FeatureFlags.TIERED_REMOTE_INDEX; import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList; import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING; import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode; import static org.opensearch.test.NodeRoles.dataNode; +import static org.opensearch.test.NodeRoles.onlyRole; +import static org.opensearch.test.NodeRoles.onlyRoles; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; @@ -1009,6 +1014,26 @@ public void cleanup() throws Exception { ); } + public void testStartSearchNode() throws Exception { + // test start dedicated search node + internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE))); + // test start node without search role + internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.DATA_ROLE))); + // test start non-dedicated search node with TIERED_REMOTE_INDEX feature enabled + internalCluster().startNode( + Settings.builder() + .put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE))) + .put(TIERED_REMOTE_INDEX, true) + ); + // test start non-dedicated search node + assertThrows( + SettingsException.class, + () -> internalCluster().startNode( + Settings.builder().put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE))) + ) + ); + } + private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception { final Node node = internalCluster().getInstance(Node.class, nodeName); final ShardId shardId = new ShardId(index, 0); diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 690621c2e7bca..653f81830ed17 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -130,6 +130,10 @@ public static boolean isSearchNode(Settings settings) { return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE); } + public static boolean isDedicatedSearchNode(Settings settings) { + return getRolesFromSettings(settings).stream().allMatch(DiscoveryNodeRole.SEARCH_ROLE::equals); + } + private final String nodeName; private final String nodeId; private final String ephemeralId; diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java index 2ea7ea8dbee12..9ff6ddb1fb667 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java @@ -52,15 +52,15 @@ private static final int ceilingNextPowerOfTwo(int x) { private final Weigher weigher; public SegmentedCache(Builder builder) { - this.capacity = builder.capacity; final int segments = ceilingNextPowerOfTwo(builder.concurrencyLevel); this.segmentMask = segments - 1; this.table = newSegmentArray(segments); - this.perSegmentCapacity = (capacity + (segments - 1)) / segments; + this.perSegmentCapacity = (builder.capacity + (segments - 1)) / segments; this.weigher = builder.weigher; for (int i = 0; i < table.length; i++) { table[i] = new LRUCache<>(perSegmentCapacity, builder.listener, builder.weigher); } + this.capacity = perSegmentCapacity * segments; } @SuppressWarnings("unchecked") diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index f93cb63ff1f0a..db77ec7628e76 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -81,7 +81,11 @@ public FsInfo stats(FsInfo previous) throws IOException { if (fileCache != null && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) { paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes()); paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage().usage()); - paths[i].available -= (paths[i].fileCacheReserved - paths[i].fileCacheUtilized); + // fileCacheFree will be less than zero if the cache being over-subscribed + long fileCacheFree = paths[i].fileCacheReserved - paths[i].fileCacheUtilized; + if (fileCacheFree > 0) { + paths[i].available -= fileCacheFree; + } // occurs if reserved file cache space is occupied by other files, like local indices if (paths[i].available < 0) { paths[i].available = 0; @@ -215,4 +219,11 @@ public static FsInfo.Path getFSInfo(NodePath nodePath) throws IOException { return fsPath; } + public static long getTotalSize(NodePath nodePath) throws IOException { + return adjustForHugeFilesystems(nodePath.fileStore.getTotalSpace()); + } + + public static long getAvailableSize(NodePath nodePath) throws IOException { + return adjustForHugeFilesystems(nodePath.fileStore.getUsableSpace()); + } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a91dce4ece126..505c9264d62bb 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -38,6 +38,7 @@ import org.opensearch.Build; import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; +import org.opensearch.OpenSearchParseException; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.Version; import org.opensearch.action.ActionModule; @@ -108,6 +109,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsException; import org.opensearch.common.settings.SettingsModule; +import org.opensearch.common.unit.RatioValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.FeatureFlags; @@ -176,7 +178,6 @@ import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; import org.opensearch.monitor.fs.FsHealthService; -import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; @@ -372,9 +373,12 @@ public class Node implements Closeable { } }, Setting.Property.NodeScope); - public static final Setting NODE_SEARCH_CACHE_SIZE_SETTING = Setting.byteSizeSetting( + private static final String ZERO = "0"; + + public static final Setting NODE_SEARCH_CACHE_SIZE_SETTING = new Setting<>( "node.search.cache.size", - ByteSizeValue.ZERO, + s -> (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING) || DiscoveryNode.isDedicatedSearchNode(s)) ? "80%" : ZERO, + Node::validateFileCacheSize, Property.NodeScope ); @@ -2002,43 +2006,59 @@ DiscoveryNode getNode() { * Initializes the search cache with a defined capacity. * The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}. * If the user doesn't configure the cache size, it fails if the node is a data + search node. - * Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined. + * Else it configures the size to 80% of total capacity for a dedicated search node, if not explicitly defined. */ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException { boolean isWritableRemoteIndexEnabled = FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING); - if (DiscoveryNode.isSearchNode(settings) || isWritableRemoteIndexEnabled) { - NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath(); - long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); - FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath)); - long availableCapacity = info.getAvailable().getBytes(); - - // Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set. - if (capacity == 0) { - // If node is not a dedicated search node without configuration, prevent cache initialization - if (!isWritableRemoteIndexEnabled - && DiscoveryNode.getRolesFromSettings(settings) - .stream() - .anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) { - throw new SettingsException( - "Unable to initialize the " - + DiscoveryNodeRole.SEARCH_ROLE.roleName() - + "-" - + DiscoveryNodeRole.DATA_ROLE.roleName() - + " node: Missing value for configuration " - + NODE_SEARCH_CACHE_SIZE_SETTING.getKey() - ); - } else { - capacity = 80 * availableCapacity / 100; - } + if (DiscoveryNode.isSearchNode(settings) == false && isWritableRemoteIndexEnabled == false) { + return; + } + + String capacityRaw = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings); + logger.info("cache size [{}]", capacityRaw); + if (capacityRaw.equals(ZERO)) { + throw new SettingsException( + "Unable to initialize the " + + DiscoveryNodeRole.SEARCH_ROLE.roleName() + + "-" + + DiscoveryNodeRole.DATA_ROLE.roleName() + + " node: Missing value for configuration " + + NODE_SEARCH_CACHE_SIZE_SETTING.getKey() + ); + } + + NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath(); + long totalSpace = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getTotalSize(fileCacheNodePath)); + long capacity = calculateFileCacheSize(capacityRaw, totalSpace); + if (capacity <= 0 || totalSpace <= capacity) { + throw new SettingsException("Cache size must be larger than zero and less than total capacity"); + } + + this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker); + fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES); + List fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath); + this.fileCache.restoreFromDirectory(fileCacheDataPaths); + } + + private static long calculateFileCacheSize(String capacityRaw, long totalSpace) { + try { + RatioValue ratioValue = RatioValue.parseRatioValue(capacityRaw); + return Math.round(totalSpace * ratioValue.getAsRatio()); + } catch (OpenSearchParseException e) { + try { + return ByteSizeValue.parseBytesSizeValue(capacityRaw, NODE_SEARCH_CACHE_SIZE_SETTING.getKey()).getBytes(); + } catch (OpenSearchParseException ex) { + ex.addSuppressed(e); + throw ex; } - capacity = Math.min(capacity, availableCapacity); - fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES); - this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker); - List fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath); - this.fileCache.restoreFromDirectory(fileCacheDataPaths); } } + private static String validateFileCacheSize(String capacityRaw) { + calculateFileCacheSize(capacityRaw, 0L); + return capacityRaw; + } + /** * Returns the {@link FileCache} instance for remote search node * Note: Visible for testing diff --git a/server/src/test/java/org/opensearch/env/NodeRepurposeCommandTests.java b/server/src/test/java/org/opensearch/env/NodeRepurposeCommandTests.java index 2a3525143c01f..d2d6fdc387dfe 100644 --- a/server/src/test/java/org/opensearch/env/NodeRepurposeCommandTests.java +++ b/server/src/test/java/org/opensearch/env/NodeRepurposeCommandTests.java @@ -95,7 +95,7 @@ public void createNodePaths() throws IOException { dataClusterManagerSettings = buildEnvSettings(Settings.EMPTY); Settings defaultSearchSettings = Settings.builder() .put(dataClusterManagerSettings) - .put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB)) + .put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB).toString()) .build(); searchNoDataNoClusterManagerSettings = onlyRole(dataClusterManagerSettings, DiscoveryNodeRole.SEARCH_ROLE); diff --git a/server/src/test/java/org/opensearch/node/NodeTests.java b/server/src/test/java/org/opensearch/node/NodeTests.java index f44cc352cd330..0093091f61a1c 100644 --- a/server/src/test/java/org/opensearch/node/NodeTests.java +++ b/server/src/test/java/org/opensearch/node/NodeTests.java @@ -380,7 +380,7 @@ public void testCreateWithFileCache() throws Exception { List> plugins = basePlugins(); ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings) - .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) .build(); Settings onlySearchRoleSettings = Settings.builder() .put(searchRoleSettingsWithConfig) diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index ca80c65e58522..ec88002317284 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -165,6 +165,7 @@ import static org.opensearch.test.NodeRoles.onlyRoles; import static org.opensearch.test.NodeRoles.removeRoles; import static org.opensearch.test.OpenSearchTestCase.assertBusy; +import static org.opensearch.test.OpenSearchTestCase.randomBoolean; import static org.opensearch.test.OpenSearchTestCase.randomFrom; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -216,7 +217,8 @@ public final class InternalTestCluster extends TestCluster { nodeAndClient.node.settings() ); - private static final ByteSizeValue DEFAULT_SEARCH_CACHE_SIZE = new ByteSizeValue(2, ByteSizeUnit.GB); + private static final String DEFAULT_SEARCH_CACHE_SIZE_BYTES = "2gb"; + private static final String DEFAULT_SEARCH_CACHE_SIZE_PERCENT = "5%"; public static final int DEFAULT_LOW_NUM_CLUSTER_MANAGER_NODES = 1; public static final int DEFAULT_HIGH_NUM_CLUSTER_MANAGER_NODES = 3; @@ -700,8 +702,10 @@ public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) { logger.info("increasing cluster size from {} to {}", size, n); Set searchAndDataRoles = Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE); Settings settings = Settings.builder() - .put(Settings.EMPTY) - .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), DEFAULT_SEARCH_CACHE_SIZE) + .put( + Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), + randomBoolean() ? DEFAULT_SEARCH_CACHE_SIZE_PERCENT : DEFAULT_SEARCH_CACHE_SIZE_BYTES + ) .build(); startNodes(n - size, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build()); validateClusterFormed();