From 63833bcf61ffb95bdd1196c5d0793e960ad00f9a Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 9 Jan 2024 16:39:15 -0800 Subject: [PATCH 01/22] Adds policy interface and took time policy impl Signed-off-by: Peter Alfonsi --- .../common/tier/DiskTierTookTimePolicy.java | 79 +++++++++++ .../tier/DiskTierTookTimePolicyTests.java | 130 ++++++++++++++++++ .../common/cache/CachePolicyInfoWrapper.java | 42 ++++++ .../common/cache/CacheTierPolicy.java | 21 +++ .../common/settings/ClusterSettings.java | 6 +- 5 files changed, 276 insertions(+), 2 deletions(-) create mode 100644 modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java create mode 100644 modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java create mode 100644 server/src/main/java/org/opensearch/common/cache/CachePolicyInfoWrapper.java create mode 100644 server/src/main/java/org/opensearch/common/cache/CacheTierPolicy.java diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java new file mode 100644 index 0000000000000..916538cc87733 --- /dev/null +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.common.cache.CachePolicyInfoWrapper; +import org.opensearch.common.cache.CacheTierPolicy; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesReference; + +import java.util.function.Function; + +/** + * A cache tier policy which accepts queries whose took time is greater than some threshold, + * which is specified as a dynamic cluster-level setting. The threshold should be set to approximately + * the time it takes to get a result from the cache tier. + * The policy expects to be able to read a CachePolicyInfoWrapper from the start of the BytesReference. + */ +public class DiskTierTookTimePolicy implements CacheTierPolicy { + public static final Setting DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting( + "indices.requests.cache.disk.tooktime.threshold", + TimeValue.ZERO, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); // Set this to TimeValue.ZERO to let all data through + + private TimeValue threshold; + private final Function getPolicyInfoFn; + + public DiskTierTookTimePolicy( + Settings settings, + ClusterSettings clusterSettings, + Function getPolicyInfoFn + ) { + this.threshold = DISK_TOOKTIME_THRESHOLD_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(DISK_TOOKTIME_THRESHOLD_SETTING, this::setThreshold); + this.getPolicyInfoFn = getPolicyInfoFn; + } + + protected void setThreshold(TimeValue threshold) { // protected so that we can manually set value in unit test + this.threshold = threshold; + } + + @Override + public boolean checkData(BytesReference data) { + if (threshold.equals(TimeValue.ZERO)) { + return true; + } + Long tookTimeNanos; + try { + tookTimeNanos = getPolicyInfoFn.apply(data).getTookTimeNanos(); + } catch (Exception e) { + // If we can't retrieve the took time for whatever reason, admit the data to be safe + return true; + } + if (tookTimeNanos == null) { + // Received a null took time -> this QSR is from an old version which does not have took time, we should accept it + return true; + } + TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos); + if (tookTime.compareTo(threshold) < 0) { // negative -> tookTime is shorter than threshold + return false; + } + return true; + } +} diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java new file mode 100644 index 0000000000000..6c07be9aa25bc --- /dev/null +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cache.common.tier; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.opensearch.action.OriginalIndices; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.common.UUIDs; +import org.opensearch.common.cache.CachePolicyInfoWrapper; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.search.DocValueFormat; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.AliasFilter; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.function.Function; + +public class DiskTierTookTimePolicyTests extends OpenSearchTestCase { + private final Function transformationFunction = (data) -> { + try { + return getPolicyInfo(data); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + + private CachePolicyInfoWrapper getPolicyInfo(BytesReference data) throws IOException { + return new CachePolicyInfoWrapper(data.streamInput()); + } + + private DiskTierTookTimePolicy getTookTimePolicy() { + // dummy settings + Settings dummySettings = Settings.EMPTY; + ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + return new DiskTierTookTimePolicy(dummySettings, dummyClusterSettings, transformationFunction); + } + + public void testQSRSetupFunction() throws IOException { + Long ttn = 100000000000L; + QuerySearchResult qsr = getQSR(ttn); + assertEquals(ttn, qsr.getTookTimeNanos()); + } + + public void testTookTimePolicy() throws Exception { + DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy(); + + // manually set threshold for test + double threshMillis = 10; + long shortMillis = (long) (0.9 * threshMillis); + long longMillis = (long) (1.5 * threshMillis); + tookTimePolicy.setThreshold(new TimeValue((long) threshMillis)); + BytesReference shortTime = getValidPolicyInput(getQSR(shortMillis * 1000000)); + BytesReference longTime = getValidPolicyInput(getQSR(longMillis * 1000000)); + + boolean shortResult = tookTimePolicy.checkData(shortTime); + assertFalse(shortResult); + boolean longResult = tookTimePolicy.checkData(longTime); + assertTrue(longResult); + + DiskTierTookTimePolicy disabledPolicy = getTookTimePolicy(); + disabledPolicy.setThreshold(TimeValue.ZERO); + shortResult = disabledPolicy.checkData(shortTime); + assertTrue(shortResult); + longResult = disabledPolicy.checkData(longTime); + assertTrue(longResult); + } + + public static QuerySearchResult getQSR(long tookTimeNanos) { + // package-private, also used by IndicesRequestCacheTests.java + // setup from QuerySearchResultTests.java + ShardId shardId = new ShardId("index", "uuid", randomInt()); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); + ShardSearchRequest shardSearchRequest = new ShardSearchRequest( + OriginalIndicesTests.randomOriginalIndices(), + searchRequest, + shardId, + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, + randomNonNegativeLong(), + null, + new String[0] + ); + ShardSearchContextId id = new ShardSearchContextId(UUIDs.base64UUID(), randomLong()); + QuerySearchResult result = new QuerySearchResult( + id, + new SearchShardTarget("node", shardId, null, OriginalIndices.NONE), + shardSearchRequest + ); + TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]); + result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]); + + result.setTookTimeNanos(tookTimeNanos); + return result; + } + + private BytesReference getValidPolicyInput(QuerySearchResult qsr) throws IOException { + // When it's used in the cache, the policy will receive BytesReferences which have a CachePolicyInfoWrapper + // at the beginning of them, followed by the actual QSR. + CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(qsr.getTookTimeNanos()); + BytesStreamOutput out = new BytesStreamOutput(); + policyInfo.writeTo(out); + qsr.writeTo(out); + return out.bytes(); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/CachePolicyInfoWrapper.java b/server/src/main/java/org/opensearch/common/cache/CachePolicyInfoWrapper.java new file mode 100644 index 0000000000000..ada1e7442747a --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/CachePolicyInfoWrapper.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * A class containing information needed for all CacheTierPolicy objects to decide whether to admit + * a given BytesReference. This spares us from having to create an entire short-lived QuerySearchResult object + * just to read a few values. + */ +public class CachePolicyInfoWrapper implements Writeable { + private final Long tookTimeNanos; + + public CachePolicyInfoWrapper(Long tookTimeNanos) { + this.tookTimeNanos = tookTimeNanos; + // Add more values here as they are needed for future cache tier policies + } + + public CachePolicyInfoWrapper(StreamInput in) throws IOException { + this.tookTimeNanos = in.readOptionalLong(); + } + + public Long getTookTimeNanos() { + return tookTimeNanos; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalLong(tookTimeNanos); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/CacheTierPolicy.java b/server/src/main/java/org/opensearch/common/cache/CacheTierPolicy.java new file mode 100644 index 0000000000000..ec39c8d74792a --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/CacheTierPolicy.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache; + +/** + * An interface for policies that inspect data of type T to decide whether they are admitted into a cache tier. + */ +public interface CacheTierPolicy { + /** + * Determines whether this policy allows the data into its cache tier. + * @param data The data to check + * @return true if accepted, otherwise false + */ + boolean checkData(T data); +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 896a234c115b6..b12a882e58195 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -81,6 +81,7 @@ import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.cache.tier.DiskTierTookTimePolicy; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -706,10 +707,11 @@ public void apply(Settings value, Settings current, Settings previous) { CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, - // Concurrent segment search settings SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, - SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING + SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, + // Tiered caching settings + DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING ) ) ); From 6125928cc78d699841941d46598383d37c477331 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 10 Jan 2024 09:50:34 -0800 Subject: [PATCH 02/22] Changes IndicesService to write a CachePolicyInfoWrapper before the QSR Signed-off-by: Peter Alfonsi --- .../main/java/org/opensearch/indices/IndicesService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index c83f2a4c5cd5d..5b8ed26f50929 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -62,6 +62,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.cache.tier.CachePolicyInfoWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -1698,6 +1699,10 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q boolean[] loadedFromCache = new boolean[] { true }; BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> { queryPhase.execute(context); + CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(context.queryResult().getTookTimeNanos()); + policyInfo.writeTo(out); + // Write relevant info for cache tier policies before the whole QuerySearchResult, so we don't have to read + // the whole QSR into memory when we decide whether to allow it into a particular cache tier based on took time/other info context.queryResult().writeToNoId(out); loadedFromCache[0] = false; }); @@ -1706,6 +1711,7 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q // restore the cached query result into the context final QuerySearchResult result = context.queryResult(); StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry); + CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(in); // This wrapper is not needed outside the cache result.readFromWithId(context.id(), in); result.setSearchShardTarget(context.shardTarget()); } else if (context.queryResult().searchTimedOut()) { From 484e9586e47a267dbda9bf1ed8110c4ed84ae9b1 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 10 Jan 2024 11:41:09 -0800 Subject: [PATCH 03/22] Moved took time logic from QSR to IndicesService Signed-off-by: Peter Alfonsi --- .../tier/DiskTierTookTimePolicyTests.java | 17 +++++------------ .../org/opensearch/indices/IndicesService.java | 3 ++- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java index 6c07be9aa25bc..c9fa059d9b024 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java @@ -59,12 +59,6 @@ private DiskTierTookTimePolicy getTookTimePolicy() { return new DiskTierTookTimePolicy(dummySettings, dummyClusterSettings, transformationFunction); } - public void testQSRSetupFunction() throws IOException { - Long ttn = 100000000000L; - QuerySearchResult qsr = getQSR(ttn); - assertEquals(ttn, qsr.getTookTimeNanos()); - } - public void testTookTimePolicy() throws Exception { DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy(); @@ -73,8 +67,8 @@ public void testTookTimePolicy() throws Exception { long shortMillis = (long) (0.9 * threshMillis); long longMillis = (long) (1.5 * threshMillis); tookTimePolicy.setThreshold(new TimeValue((long) threshMillis)); - BytesReference shortTime = getValidPolicyInput(getQSR(shortMillis * 1000000)); - BytesReference longTime = getValidPolicyInput(getQSR(longMillis * 1000000)); + BytesReference shortTime = getValidPolicyInput(getQSR(), shortMillis * 1000000); + BytesReference longTime = getValidPolicyInput(getQSR(), longMillis * 1000000); boolean shortResult = tookTimePolicy.checkData(shortTime); assertFalse(shortResult); @@ -89,7 +83,7 @@ public void testTookTimePolicy() throws Exception { assertTrue(longResult); } - public static QuerySearchResult getQSR(long tookTimeNanos) { + public static QuerySearchResult getQSR() { // package-private, also used by IndicesRequestCacheTests.java // setup from QuerySearchResultTests.java ShardId shardId = new ShardId("index", "uuid", randomInt()); @@ -114,14 +108,13 @@ public static QuerySearchResult getQSR(long tookTimeNanos) { TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]); result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]); - result.setTookTimeNanos(tookTimeNanos); return result; } - private BytesReference getValidPolicyInput(QuerySearchResult qsr) throws IOException { + private BytesReference getValidPolicyInput(QuerySearchResult qsr, long tookTimeNanos) throws IOException { // When it's used in the cache, the policy will receive BytesReferences which have a CachePolicyInfoWrapper // at the beginning of them, followed by the actual QSR. - CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(qsr.getTookTimeNanos()); + CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(tookTimeNanos); BytesStreamOutput out = new BytesStreamOutput(); policyInfo.writeTo(out); qsr.writeTo(out); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 5b8ed26f50929..ec82eb32ec13d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -1698,8 +1698,9 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q boolean[] loadedFromCache = new boolean[] { true }; BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> { + long beforeQueryPhase = System.nanoTime(); queryPhase.execute(context); - CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(context.queryResult().getTookTimeNanos()); + CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(System.nanoTime() - beforeQueryPhase); policyInfo.writeTo(out); // Write relevant info for cache tier policies before the whole QuerySearchResult, so we don't have to read // the whole QSR into memory when we decide whether to allow it into a particular cache tier based on took time/other info From 38c786c802aa8e9dc4f57d96ba2790f79a6d737c Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 10 Jan 2024 17:57:55 -0800 Subject: [PATCH 04/22] spotlessApply Signed-off-by: Peter Alfonsi --- .../tier/TieredSpilloverCacheTests.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 7c9569f5defe2..8fd98695ecdff 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -8,6 +8,7 @@ package org.opensearch.cache.common.tier; +import org.opensearch.common.cache.CacheTierPolicy; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.LoadAwareCacheLoader; @@ -22,6 +23,7 @@ import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -829,6 +831,70 @@ public void onRemoval(RemovalNotification notification) { } } + public void testDiskTierPolicies() throws Exception { + // For policy function, allow if what it receives starts with "a" and string is even length + ArrayList> policies = new ArrayList<>(); + policies.add(new AllowFirstLetterA()); + policies.add(new AllowEvenLengths()); + + int onHeapCacheSize = 0; + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + 100, + removalListener, + 0, + policies + ); + + Map keyValuePairs = new HashMap<>(); + Map expectedOutputs = new HashMap<>(); + keyValuePairs.put("key1", "abcd"); + expectedOutputs.put("key1", true); + keyValuePairs.put("key2", "abcde"); + expectedOutputs.put("key2", false); + keyValuePairs.put("key3", "bbc"); + expectedOutputs.put("key3", false); + keyValuePairs.put("key4", "ab"); + expectedOutputs.put("key4", true); + keyValuePairs.put("key5", ""); + expectedOutputs.put("key5", false); + + LoadAwareCacheLoader loader = getLoadAwareCacheLoaderWithKeyValueMap(keyValuePairs); + + for (String key : keyValuePairs.keySet()) { + Boolean expectedOutput = expectedOutputs.get(key); + String value = tieredSpilloverCache.computeIfAbsent(key, loader); + assertEquals(keyValuePairs.get(key), value); + String result = tieredSpilloverCache.get(key); + if (expectedOutput) { + // Should retrieve from disk tier if it was accepted + assertEquals(keyValuePairs.get(key), result); + } else { + // Should miss as heap tier size = 0 and the policy rejected it + assertNull(result); + } + } + } + + private static class AllowFirstLetterA implements CacheTierPolicy { + @Override + public boolean checkData(String data) { + try { + return (data.charAt(0) == 'a'); + } catch (StringIndexOutOfBoundsException e) { + return false; + } + } + } + + private static class AllowEvenLengths implements CacheTierPolicy { + @Override + public boolean checkData(String data) { + return data.length() % 2 == 0; + } + } + private LoadAwareCacheLoader getLoadAwareCacheLoader() { return new LoadAwareCacheLoader<>() { boolean isLoaded = false; From a07a20495319a8898db4a8d9d4ffeae035fbe195 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 17 Jan 2024 15:42:46 -0800 Subject: [PATCH 05/22] Addressed ansjcy's comments Signed-off-by: Peter Alfonsi --- .idea/vcs.xml | 32 +++++++++---------- .../common/tier/DiskTierTookTimePolicy.java | 17 ++++++---- .../tier/DiskTierTookTimePolicyTests.java | 32 +++++++++++++++++++ 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 48557884a8893..c668657daf908 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,20 +1,20 @@ - - - + + + - + - + \ No newline at end of file diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java index 916538cc87733..fbcd3644a610c 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java @@ -56,18 +56,23 @@ protected void setThreshold(TimeValue threshold) { // protected so that we can m @Override public boolean checkData(BytesReference data) { - if (threshold.equals(TimeValue.ZERO)) { - return true; - } Long tookTimeNanos; try { tookTimeNanos = getPolicyInfoFn.apply(data).getTookTimeNanos(); } catch (Exception e) { - // If we can't retrieve the took time for whatever reason, admit the data to be safe - return true; + // If we can't read a CachePolicyInfoWrapper from the BytesReference, reject the data + return false; } + if (tookTimeNanos == null) { - // Received a null took time -> this QSR is from an old version which does not have took time, we should accept it + // If the wrapper contains null took time, reject the data + // This can happen if no CachePolicyInfoWrapper was written to the BytesReference, as the wrapper's constructor + // reads an optional long, which will end up as null in this case. This is why we should reject it. + return false; + } + + if (threshold.equals(TimeValue.ZERO)) { + // If the policy is set to zero, admit any well-formed data return true; } TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos); diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java index c9fa059d9b024..d30b160c092f2 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java @@ -83,6 +83,38 @@ public void testTookTimePolicy() throws Exception { assertTrue(longResult); } + public void testMissingWrapper() throws Exception { + DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy(); + tookTimePolicy.setThreshold(TimeValue.ZERO); + QuerySearchResult qsr = getQSR(); + BytesStreamOutput out = new BytesStreamOutput(); + qsr.writeTo(out); + BytesReference missingWrapper = out.bytes(); + boolean allowedMissingWrapper = tookTimePolicy.checkData(missingWrapper); + assertFalse(allowedMissingWrapper); + } + + public void testNullTookTime() throws Exception { + // Null took time should always be rejected (because it might be the result of a + // BytesReference without a CachePolicyInfoWrapper in front of it) + + DiskTierTookTimePolicy zeroThreshold = getTookTimePolicy(); + zeroThreshold.setThreshold(TimeValue.ZERO); + DiskTierTookTimePolicy nonZeroThreshold = getTookTimePolicy(); + nonZeroThreshold.setThreshold(new TimeValue(10L)); + + Long nullTookTime = null; + CachePolicyInfoWrapper nullWrapper = new CachePolicyInfoWrapper(nullTookTime); + BytesStreamOutput out = new BytesStreamOutput(); + nullWrapper.writeTo(out); + QuerySearchResult qsr = getQSR(); + qsr.writeTo(out); + BytesReference data = out.bytes(); + + assertFalse(zeroThreshold.checkData(data)); + assertFalse(nonZeroThreshold.checkData(data)); + } + public static QuerySearchResult getQSR() { // package-private, also used by IndicesRequestCacheTests.java // setup from QuerySearchResultTests.java From e046d8841f8d1e6286aa48c849a47370ab3c1e9e Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 1 Mar 2024 14:32:03 -0800 Subject: [PATCH 06/22] Partial rebase on most recent changes Signed-off-by: Peter Alfonsi --- .../common/tier/DiskTierTookTimePolicy.java | 8 ++-- .../common/tier/TieredSpilloverCache.java | 34 +++++++++++++- .../tier/TieredSpilloverCachePlugin.java | 3 ++ .../tier/TieredSpilloverCacheSettings.java | 7 +++ .../tier/DiskTierTookTimePolicyTests.java | 5 +-- .../tier/TieredSpilloverCacheTests.java | 44 +++++++++++++++++-- .../common/settings/ClusterSettings.java | 5 +-- .../opensearch/indices/IndicesService.java | 2 +- 8 files changed, 90 insertions(+), 18 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java index fbcd3644a610c..55aee6825b992 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java @@ -30,23 +30,21 @@ * The policy expects to be able to read a CachePolicyInfoWrapper from the start of the BytesReference. */ public class DiskTierTookTimePolicy implements CacheTierPolicy { - public static final Setting DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting( + /*public static final Setting DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting( "indices.requests.cache.disk.tooktime.threshold", TimeValue.ZERO, Setting.Property.Dynamic, Setting.Property.NodeScope - ); // Set this to TimeValue.ZERO to let all data through + );*/ // Set this to TimeValue.ZERO to let all data through private TimeValue threshold; private final Function getPolicyInfoFn; public DiskTierTookTimePolicy( Settings settings, - ClusterSettings clusterSettings, Function getPolicyInfoFn ) { - this.threshold = DISK_TOOKTIME_THRESHOLD_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(DISK_TOOKTIME_THRESHOLD_SETTING, this::setThreshold); + this.threshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.get(settings); this.getPolicyInfoFn = getPolicyInfoFn; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 7b64a7e93fe27..99791bc08ca1a 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -9,6 +9,7 @@ package org.opensearch.cache.common.tier; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.cache.CacheTierPolicy; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.LoadAwareCacheLoader; @@ -21,6 +22,7 @@ import org.opensearch.common.util.iterable.Iterables; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -52,6 +54,7 @@ public class TieredSpilloverCache implements ICache { * Maintains caching tiers in ascending order of cache latency. */ private final List> cacheList; + private final List> policies; TieredSpilloverCache(Builder builder) { Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); @@ -63,7 +66,9 @@ public class TieredSpilloverCache implements ICache { @Override public void onRemoval(RemovalNotification notification) { try (ReleasableLock ignore = writeLock.acquire()) { - diskCache.put(notification.getKey(), notification.getValue()); + if (checkPolicies(notification.getValue())) { + diskCache.put(notification.getKey(), notification.getValue()); + } } removalListener.onRemoval(notification); } @@ -79,6 +84,12 @@ public void onRemoval(RemovalNotification notification) { ); this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories); this.cacheList = Arrays.asList(onHeapCache, diskCache); + + List> builderPolicies = builder.policies; + if (builderPolicies == null) { + builderPolicies = new ArrayList<>(); + } + this.policies = builderPolicies; } // Package private for testing @@ -193,6 +204,15 @@ private Function getValueFromTieredCache() { }; } + boolean checkPolicies(V value) { + for (CacheTierPolicy policy : policies) { + if (!policy.checkData(value)) { + return false; + } + } + return true; + } + /** * Factory to create TieredSpilloverCache objects. */ @@ -237,6 +257,7 @@ public ICache create(CacheConfig config, CacheType cacheType, .setRemovalListener(config.getRemovalListener()) .setCacheConfig(config) .setCacheType(cacheType) + //.setPolicy(new DiskTierTookTimePolicy(settings)) .build(); } @@ -258,6 +279,7 @@ public static class Builder { private CacheConfig cacheConfig; private CacheType cacheType; private Map cacheFactories; + private final ArrayList> policies = new ArrayList<>(); /** * Default constructor @@ -324,6 +346,16 @@ public Builder setCacheFactories(Map cacheFactorie return this; } + public Builder setPolicy(CacheTierPolicy policy) { + this.policies.add(policy); + return this; + } + + public Builder setPolicies(List> policies) { + this.policies.addAll(policies); + return this; + } + /** * Build tiered spillover cache. * @return TieredSpilloverCache diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 6b0620c5fbede..132c13d76328c 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -51,6 +51,9 @@ public List> getSettings() { settingList.add( TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); + settingList.add( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + ); } return settingList; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index 50b4177f599d1..01e882ae1d30b 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -9,7 +9,9 @@ package org.opensearch.cache.common.tier; import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import static org.opensearch.common.settings.Setting.Property.Dynamic; import static org.opensearch.common.settings.Setting.Property.NodeScope; /** @@ -36,6 +38,11 @@ public class TieredSpilloverCacheSettings { (key) -> Setting.simpleString(key, "", NodeScope) ); + public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD = Setting.suffixKeySetting( + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.took_time.threshold", + (key) -> Setting.timeSetting(key, TimeValue.ZERO, NodeScope, Dynamic) + ); + /** * Default constructor */ diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java index d30b160c092f2..a7e261df7bb3d 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java @@ -55,8 +55,7 @@ private CachePolicyInfoWrapper getPolicyInfo(BytesReference data) throws IOExcep private DiskTierTookTimePolicy getTookTimePolicy() { // dummy settings Settings dummySettings = Settings.EMPTY; - ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - return new DiskTierTookTimePolicy(dummySettings, dummyClusterSettings, transformationFunction); + return new DiskTierTookTimePolicy(dummySettings, transformationFunction); } public void testTookTimePolicy() throws Exception { @@ -121,7 +120,7 @@ public static QuerySearchResult getQSR() { ShardId shardId = new ShardId("index", "uuid", randomInt()); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); ShardSearchRequest shardSearchRequest = new ShardSearchRequest( - OriginalIndicesTests.randomOriginalIndices(), + OriginalIndices.NONE, searchRequest, shardId, 1, diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 8fd98695ecdff..18e0f41cc5158 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -843,6 +843,14 @@ public void testDiskTierPolicies() throws Exception { onHeapCacheSize, 100, removalListener, + Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * 50 + "b" + ) + .build(), 0, policies ); @@ -860,7 +868,19 @@ public void testDiskTierPolicies() throws Exception { keyValuePairs.put("key5", ""); expectedOutputs.put("key5", false); - LoadAwareCacheLoader loader = getLoadAwareCacheLoaderWithKeyValueMap(keyValuePairs); + LoadAwareCacheLoader loader = new LoadAwareCacheLoader() { + boolean isLoaded = false; + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(String key) throws Exception { + isLoaded = true; + return keyValuePairs.get(key); + } + }; for (String key : keyValuePairs.keySet()) { Boolean expectedOutput = expectedOutputs.get(key); @@ -918,6 +938,18 @@ private TieredSpilloverCache intializeTieredSpilloverCache( RemovalListener removalListener, Settings settings, long diskDeliberateDelay + + ) { + return intializeTieredSpilloverCache(keyValueSize, diskCacheSize, removalListener, settings, diskDeliberateDelay, null); + } + + private TieredSpilloverCache intializeTieredSpilloverCache( + int keyValueSize, + int diskCacheSize, + RemovalListener removalListener, + Settings settings, + long diskDeliberateDelay, + List> policies ) { ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) @@ -929,12 +961,16 @@ private TieredSpilloverCache intializeTieredSpilloverCache( ICache.Factory mockDiskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); - return new TieredSpilloverCache.Builder().setCacheType(CacheType.INDICES_REQUEST_CACHE) + TieredSpilloverCache.Builder builder = new TieredSpilloverCache.Builder() + .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setRemovalListener(removalListener) .setOnHeapCacheFactory(onHeapCacheFactory) .setDiskCacheFactory(mockDiskCacheFactory) - .setCacheConfig(cacheConfig) - .build(); + .setCacheConfig(cacheConfig); + if (policies != null) { + builder.setPolicies(policies); + } + return builder.build(); } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index b12a882e58195..0baa09d7b63d1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -81,7 +81,6 @@ import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.PublicApi; -import org.opensearch.common.cache.tier.DiskTierTookTimePolicy; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -709,9 +708,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, // Concurrent segment search settings SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, - SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, - // Tiered caching settings - DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING + SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index ec82eb32ec13d..630ce068ba782 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -62,7 +62,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; -import org.opensearch.common.cache.tier.CachePolicyInfoWrapper; +import org.opensearch.common.cache.CachePolicyInfoWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; From de0dec3233453b75c0d7999a25b6a8cc9ca37813 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 5 Mar 2024 17:49:28 -0800 Subject: [PATCH 07/22] Integrated policies with new TSC changes Signed-off-by: Peter Alfonsi --- .../common/tier/TieredSpilloverCache.java | 16 +- .../tier/TieredSpilloverCachePlugin.java | 4 +- .../tier/TieredSpilloverCacheTests.java | 146 +++++++++++++++++- .../{ => policy}/CachePolicyInfoWrapper.java | 2 +- .../cache/{ => policy}/CacheTierPolicy.java | 2 +- .../common/cache/policy/TookTimePolicy.java | 37 ++--- .../common/cache/policy/package-info.java | 9 ++ .../cache/store/config/CacheConfig.java | 16 ++ .../opensearch/indices/IndicesService.java | 2 +- .../cache/policy/TookTimePolicyTests.java | 33 ++-- 10 files changed, 209 insertions(+), 58 deletions(-) rename server/src/main/java/org/opensearch/common/cache/{ => policy}/CachePolicyInfoWrapper.java (96%) rename server/src/main/java/org/opensearch/common/cache/{ => policy}/CacheTierPolicy.java (92%) rename modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java => server/src/main/java/org/opensearch/common/cache/policy/TookTimePolicy.java (55%) create mode 100644 server/src/main/java/org/opensearch/common/cache/policy/package-info.java rename modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java => server/src/test/java/org/opensearch/common/cache/policy/TookTimePolicyTests.java (84%) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 99791bc08ca1a..d9eb16e8ab97e 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -9,15 +9,18 @@ package org.opensearch.cache.common.tier; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.common.cache.CacheTierPolicy; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachePolicyInfoWrapper; +import org.opensearch.common.cache.policy.CacheTierPolicy; +import org.opensearch.common.cache.policy.TookTimePolicy; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.iterable.Iterables; @@ -252,12 +255,21 @@ public ICache create(CacheConfig config, CacheType cacheType, ); } ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName); + + TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD + .getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + .get(settings); + Function policyInfoWrapperFunction = Objects.requireNonNull( + config.getPolicyInfoWrapperFunction(), + "Policy info wrapper fn can't be null" + ); + return new Builder().setDiskCacheFactory(diskCacheFactory) .setOnHeapCacheFactory(onHeapCacheFactory) .setRemovalListener(config.getRemovalListener()) .setCacheConfig(config) .setCacheType(cacheType) - //.setPolicy(new DiskTierTookTimePolicy(settings)) + .setPolicy(new TookTimePolicy(diskPolicyThreshold, policyInfoWrapperFunction)) .build(); } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 132c13d76328c..90c71fc01d2ce 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -52,7 +52,9 @@ public List> getSettings() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); settingList.add( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.getConcreteSettingForNamespace( + cacheType.getSettingPrefix() + ) ); } return settingList; diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 18e0f41cc5158..43519233b2c10 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -8,18 +8,20 @@ package org.opensearch.cache.common.tier; -import org.opensearch.common.cache.CacheTierPolicy; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachePolicyInfoWrapper; +import org.opensearch.common.cache.policy.CacheTierPolicy; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -33,6 +35,7 @@ import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; @@ -123,6 +126,12 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) .setSettings(settings) + .setPolicyInfoWrapperFunction(new Function() { + @Override + public CachePolicyInfoWrapper apply(String s) { + return new CachePolicyInfoWrapper(10_000L); + } + }) // Values will always appear to have taken 10_000 ns = 10 ms to compute .build(), CacheType.INDICES_REQUEST_CACHE, Map.of( @@ -837,10 +846,11 @@ public void testDiskTierPolicies() throws Exception { policies.add(new AllowFirstLetterA()); policies.add(new AllowEvenLengths()); + int keyValueSize = 50; int onHeapCacheSize = 0; MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( - onHeapCacheSize, + keyValueSize, 100, removalListener, Settings.builder() @@ -870,6 +880,7 @@ public void testDiskTierPolicies() throws Exception { LoadAwareCacheLoader loader = new LoadAwareCacheLoader() { boolean isLoaded = false; + @Override public boolean isLoaded() { return isLoaded; @@ -897,6 +908,109 @@ public String load(String key) throws Exception { } } + public void testTookTimePolicyFromFactory() throws Exception { + // Mock took time by passing this map to the policy info wrapper fn + // The policy inspects values, not keys, so this is a map from values -> took time + Map tookTimeMap = new HashMap<>(); + tookTimeMap.put("a", 10_000_000L); + tookTimeMap.put("b", 0L); + tookTimeMap.put("c", 99_999_999L); + tookTimeMap.put("d", null); + tookTimeMap.put("e", -1L); + tookTimeMap.put("f", 8_888_888L); + long timeValueThresholdNanos = 10_000_000L; + + Map keyValueMap = Map.of("A", "a", "B", "b", "C", "c", "D", "d", "E", "e", "F", "f"); + + // Most of setup duplicated from testComputeIfAbsentWithFactoryBasedCacheCreation() + int onHeapCacheSize = randomIntBetween(tookTimeMap.size() + 1, tookTimeMap.size() + 30); + int diskCacheSize = tookTimeMap.size(); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + + // Set the desired settings needed to create a TieredSpilloverCache object with INDICES_REQUEST_CACHE cacheType. + Settings settings = Settings.builder() + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + MockOnDiskCache.MockDiskCacheFactory.NAME + ) + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + new TimeValue(timeValueThresholdNanos / 1_000_000) + ) + .build(); + + ICache tieredSpilloverICache = new TieredSpilloverCache.TieredSpilloverCacheFactory().create( + new CacheConfig.Builder().setKeyType(String.class) + .setKeyType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setRemovalListener(removalListener) + .setSettings(settings) + .setPolicyInfoWrapperFunction(new Function() { + @Override + public CachePolicyInfoWrapper apply(String s) { + return new CachePolicyInfoWrapper(tookTimeMap.get(s)); + } + }) + .build(), + CacheType.INDICES_REQUEST_CACHE, + Map.of( + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), + MockOnDiskCache.MockDiskCacheFactory.NAME, + new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + ) + ); + + TieredSpilloverCache tieredSpilloverCache = (TieredSpilloverCache) tieredSpilloverICache; + + // First add all our values to the on heap cache + for (String key : tookTimeMap.keySet()) { + tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader(keyValueMap)); + } + assertEquals(tookTimeMap.size(), tieredSpilloverCache.count()); + + // Ensure all these keys get evicted from the on heap tier by adding > heap tier size worth of random keys + for (int i = 0; i < onHeapCacheSize; i++) { + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), getLoadAwareCacheLoader(keyValueMap)); + } + ICache onHeapCache = tieredSpilloverCache.getOnHeapCache(); + for (String key : tookTimeMap.keySet()) { + assertNull(onHeapCache.get(key)); + } + + // Now the original keys should be in the disk tier if the policy allows them, or misses if not + for (String key : tookTimeMap.keySet()) { + String computedValue = tieredSpilloverCache.get(key); + String mapValue = keyValueMap.get(key); + Long tookTime = tookTimeMap.get(mapValue); + if (tookTime != null && tookTime > timeValueThresholdNanos) { + // expect a hit + assertNotNull(computedValue); + } else { + // expect a miss + assertNull(computedValue); + } + } + } + private static class AllowFirstLetterA implements CacheTierPolicy { @Override public boolean checkData(String data) { @@ -914,7 +1028,7 @@ public boolean checkData(String data) { return data.length() % 2 == 0; } } - + private LoadAwareCacheLoader getLoadAwareCacheLoader() { return new LoadAwareCacheLoader<>() { boolean isLoaded = false; @@ -932,6 +1046,27 @@ public boolean isLoaded() { }; } + private LoadAwareCacheLoader getLoadAwareCacheLoader(Map keyValueMap) { + return new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public String load(String key) { + isLoaded = true; + String mapValue = keyValueMap.get(key); + if (mapValue == null) { + mapValue = UUID.randomUUID().toString(); + } + return mapValue; + } + + @Override + public boolean isLoaded() { + return isLoaded; + } + }; + } + private TieredSpilloverCache intializeTieredSpilloverCache( int keyValueSize, int diskCacheSize, @@ -961,8 +1096,9 @@ private TieredSpilloverCache intializeTieredSpilloverCache( ICache.Factory mockDiskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); - TieredSpilloverCache.Builder builder = new TieredSpilloverCache.Builder() - .setCacheType(CacheType.INDICES_REQUEST_CACHE) + TieredSpilloverCache.Builder builder = new TieredSpilloverCache.Builder().setCacheType( + CacheType.INDICES_REQUEST_CACHE + ) .setRemovalListener(removalListener) .setOnHeapCacheFactory(onHeapCacheFactory) .setDiskCacheFactory(mockDiskCacheFactory) diff --git a/server/src/main/java/org/opensearch/common/cache/CachePolicyInfoWrapper.java b/server/src/main/java/org/opensearch/common/cache/policy/CachePolicyInfoWrapper.java similarity index 96% rename from server/src/main/java/org/opensearch/common/cache/CachePolicyInfoWrapper.java rename to server/src/main/java/org/opensearch/common/cache/policy/CachePolicyInfoWrapper.java index ada1e7442747a..99427e4344feb 100644 --- a/server/src/main/java/org/opensearch/common/cache/CachePolicyInfoWrapper.java +++ b/server/src/main/java/org/opensearch/common/cache/policy/CachePolicyInfoWrapper.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.common.cache; +package org.opensearch.common.cache.policy; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; diff --git a/server/src/main/java/org/opensearch/common/cache/CacheTierPolicy.java b/server/src/main/java/org/opensearch/common/cache/policy/CacheTierPolicy.java similarity index 92% rename from server/src/main/java/org/opensearch/common/cache/CacheTierPolicy.java rename to server/src/main/java/org/opensearch/common/cache/policy/CacheTierPolicy.java index ec39c8d74792a..a3d0c42b9a739 100644 --- a/server/src/main/java/org/opensearch/common/cache/CacheTierPolicy.java +++ b/server/src/main/java/org/opensearch/common/cache/policy/CacheTierPolicy.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.common.cache; +package org.opensearch.common.cache.policy; /** * An interface for policies that inspect data of type T to decide whether they are admitted into a cache tier. diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java b/server/src/main/java/org/opensearch/common/cache/policy/TookTimePolicy.java similarity index 55% rename from modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java rename to server/src/main/java/org/opensearch/common/cache/policy/TookTimePolicy.java index 55aee6825b992..9f75aed074935 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicy.java +++ b/server/src/main/java/org/opensearch/common/cache/policy/TookTimePolicy.java @@ -11,40 +11,25 @@ * GitHub history for details. */ -package org.opensearch.cache.common.tier; +package org.opensearch.common.cache.policy; -import org.opensearch.common.cache.CachePolicyInfoWrapper; -import org.opensearch.common.cache.CacheTierPolicy; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.core.common.bytes.BytesReference; import java.util.function.Function; /** - * A cache tier policy which accepts queries whose took time is greater than some threshold, - * which is specified as a dynamic cluster-level setting. The threshold should be set to approximately + * A cache tier policy which accepts queries whose took time is greater than some threshold. + * The threshold should be set to approximately * the time it takes to get a result from the cache tier. - * The policy expects to be able to read a CachePolicyInfoWrapper from the start of the BytesReference. + * The policy accepts values of type V and decodes them into CachePolicyInfoWrapper, which has the data needed + * to decide whether to admit the value. */ -public class DiskTierTookTimePolicy implements CacheTierPolicy { - /*public static final Setting DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting( - "indices.requests.cache.disk.tooktime.threshold", - TimeValue.ZERO, - Setting.Property.Dynamic, - Setting.Property.NodeScope - );*/ // Set this to TimeValue.ZERO to let all data through +public class TookTimePolicy implements CacheTierPolicy { + private TimeValue threshold; // Set this to TimeValue.ZERO to let all data through + private final Function getPolicyInfoFn; - private TimeValue threshold; - private final Function getPolicyInfoFn; - - public DiskTierTookTimePolicy( - Settings settings, - Function getPolicyInfoFn - ) { - this.threshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.get(settings); + public TookTimePolicy(TimeValue threshold, Function getPolicyInfoFn) { + this.threshold = threshold; this.getPolicyInfoFn = getPolicyInfoFn; } @@ -53,7 +38,7 @@ protected void setThreshold(TimeValue threshold) { // protected so that we can m } @Override - public boolean checkData(BytesReference data) { + public boolean checkData(V data) { Long tookTimeNanos; try { tookTimeNanos = getPolicyInfoFn.apply(data).getTookTimeNanos(); diff --git a/server/src/main/java/org/opensearch/common/cache/policy/package-info.java b/server/src/main/java/org/opensearch/common/cache/policy/package-info.java new file mode 100644 index 0000000000000..ce9c2f62d7da2 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/policy/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** A package for policies controlling what can enter caches. */ +package org.opensearch.common.cache.policy; diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index 6fefea6578fb9..f8c023fc24318 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -10,8 +10,10 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.policy.CachePolicyInfoWrapper; import org.opensearch.common.settings.Settings; +import java.util.function.Function; import java.util.function.ToLongBiFunction; /** @@ -41,12 +43,16 @@ public class CacheConfig { private final RemovalListener removalListener; + /** A function which extracts policy-relevant information, such as took time, from values, to allow inspection by policies if present. */ + private Function policyInfoWrapperFunction; + private CacheConfig(Builder builder) { this.keyType = builder.keyType; this.valueType = builder.valueType; this.settings = builder.settings; this.removalListener = builder.removalListener; this.weigher = builder.weigher; + this.policyInfoWrapperFunction = builder.policyInfoWrapperFunction; } public Class getKeyType() { @@ -69,6 +75,10 @@ public ToLongBiFunction getWeigher() { return weigher; } + public Function getPolicyInfoWrapperFunction() { + return policyInfoWrapperFunction; + } + /** * Builder class to build Cache config related parameters. * @param Type of key. @@ -85,6 +95,7 @@ public static class Builder { private RemovalListener removalListener; private ToLongBiFunction weigher; + private Function policyInfoWrapperFunction; public Builder() {} @@ -113,6 +124,11 @@ public Builder setWeigher(ToLongBiFunction weigher) { return this; } + public Builder setPolicyInfoWrapperFunction(Function function) { + this.policyInfoWrapperFunction = function; + return this; + } + public CacheConfig build() { return new CacheConfig<>(this); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 630ce068ba782..7fb85bab4dabe 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -62,7 +62,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; -import org.opensearch.common.cache.CachePolicyInfoWrapper; +import org.opensearch.common.cache.policy.CachePolicyInfoWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java b/server/src/test/java/org/opensearch/common/cache/policy/TookTimePolicyTests.java similarity index 84% rename from modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java rename to server/src/test/java/org/opensearch/common/cache/policy/TookTimePolicyTests.java index a7e261df7bb3d..881d9c02d67f9 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/DiskTierTookTimePolicyTests.java +++ b/server/src/test/java/org/opensearch/common/cache/policy/TookTimePolicyTests.java @@ -6,24 +6,17 @@ * compatible open source license. */ -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.cache.common.tier; +package org.opensearch.common.cache.policy; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; import org.opensearch.action.OriginalIndices; +import org.opensearch.action.OriginalIndicesTests; import org.opensearch.action.search.SearchRequest; import org.opensearch.common.UUIDs; -import org.opensearch.common.cache.CachePolicyInfoWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesReference; @@ -39,7 +32,7 @@ import java.io.IOException; import java.util.function.Function; -public class DiskTierTookTimePolicyTests extends OpenSearchTestCase { +public class TookTimePolicyTests extends OpenSearchTestCase { private final Function transformationFunction = (data) -> { try { return getPolicyInfo(data); @@ -52,14 +45,12 @@ private CachePolicyInfoWrapper getPolicyInfo(BytesReference data) throws IOExcep return new CachePolicyInfoWrapper(data.streamInput()); } - private DiskTierTookTimePolicy getTookTimePolicy() { - // dummy settings - Settings dummySettings = Settings.EMPTY; - return new DiskTierTookTimePolicy(dummySettings, transformationFunction); + private TookTimePolicy getTookTimePolicy() { + return new TookTimePolicy<>(TimeValue.ZERO, transformationFunction); } public void testTookTimePolicy() throws Exception { - DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy(); + TookTimePolicy tookTimePolicy = getTookTimePolicy(); // manually set threshold for test double threshMillis = 10; @@ -74,7 +65,7 @@ public void testTookTimePolicy() throws Exception { boolean longResult = tookTimePolicy.checkData(longTime); assertTrue(longResult); - DiskTierTookTimePolicy disabledPolicy = getTookTimePolicy(); + TookTimePolicy disabledPolicy = getTookTimePolicy(); disabledPolicy.setThreshold(TimeValue.ZERO); shortResult = disabledPolicy.checkData(shortTime); assertTrue(shortResult); @@ -83,7 +74,7 @@ public void testTookTimePolicy() throws Exception { } public void testMissingWrapper() throws Exception { - DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy(); + TookTimePolicy tookTimePolicy = getTookTimePolicy(); tookTimePolicy.setThreshold(TimeValue.ZERO); QuerySearchResult qsr = getQSR(); BytesStreamOutput out = new BytesStreamOutput(); @@ -97,9 +88,9 @@ public void testNullTookTime() throws Exception { // Null took time should always be rejected (because it might be the result of a // BytesReference without a CachePolicyInfoWrapper in front of it) - DiskTierTookTimePolicy zeroThreshold = getTookTimePolicy(); + TookTimePolicy zeroThreshold = getTookTimePolicy(); zeroThreshold.setThreshold(TimeValue.ZERO); - DiskTierTookTimePolicy nonZeroThreshold = getTookTimePolicy(); + TookTimePolicy nonZeroThreshold = getTookTimePolicy(); nonZeroThreshold.setThreshold(new TimeValue(10L)); Long nullTookTime = null; @@ -120,7 +111,7 @@ public static QuerySearchResult getQSR() { ShardId shardId = new ShardId("index", "uuid", randomInt()); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); ShardSearchRequest shardSearchRequest = new ShardSearchRequest( - OriginalIndices.NONE, + OriginalIndicesTests.randomOriginalIndices(), searchRequest, shardId, 1, @@ -148,7 +139,7 @@ private BytesReference getValidPolicyInput(QuerySearchResult qsr, long tookTimeN CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(tookTimeNanos); BytesStreamOutput out = new BytesStreamOutput(); policyInfo.writeTo(out); - qsr.writeTo(out); + qsr.writeTo(out); // This fails when OriginalIndices is OriginalIndices.NONE. return out.bytes(); } } From 0fa4b32ef2dc2599048ac2d5347edde2ed25b40a Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 5 Mar 2024 18:03:19 -0800 Subject: [PATCH 08/22] Reverted unintended change to idea/vcs.xml Signed-off-by: Peter Alfonsi --- .idea/vcs.xml | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index c668657daf908..48557884a8893 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,20 +1,20 @@ - - - + + + - + - \ No newline at end of file + From 9a7592a43081d0b983d88b22ab2e04fe159d4dc9 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 5 Mar 2024 20:05:03 -0800 Subject: [PATCH 09/22] javadocs Signed-off-by: Peter Alfonsi --- .../cache/common/tier/TieredSpilloverCache.java | 10 ++++++++++ .../common/tier/TieredSpilloverCacheSettings.java | 3 +++ 2 files changed, 13 insertions(+) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index d9eb16e8ab97e..1048e4bb8808d 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -358,11 +358,21 @@ public Builder setCacheFactories(Map cacheFactorie return this; } + /** + * Set a cache policy to be used to limit access to this cache's disk tier. + * @param policy the policy + * @return builder + */ public Builder setPolicy(CacheTierPolicy policy) { this.policies.add(policy); return this; } + /** + * Set multiple policies to be used to limit access to this cache's disk tier. + * @param policies the policies + * @return builder + */ public Builder setPolicies(List> policies) { this.policies.addAll(policies); return this; diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index 01e882ae1d30b..ae8424d1251a7 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -38,6 +38,9 @@ public class TieredSpilloverCacheSettings { (key) -> Setting.simpleString(key, "", NodeScope) ); + /** + * Setting defining the minimum took time for a query to be allowed into the disk cache. + */ public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD = Setting.suffixKeySetting( TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.took_time.threshold", (key) -> Setting.timeSetting(key, TimeValue.ZERO, NodeScope, Dynamic) From 8606d9b5850d750b85915d046b31b81008ca2c59 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 6 Mar 2024 14:23:37 -0800 Subject: [PATCH 10/22] github actions Signed-off-by: Peter Alfonsi From 7ead34e9a7d159288e7003965c2c5cd0069b10ff Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Mon, 11 Mar 2024 14:58:41 -0700 Subject: [PATCH 11/22] Set default threshold value to 10 ms Signed-off-by: Peter Alfonsi --- .../cache/common/tier/TieredSpilloverCacheSettings.java | 6 +++++- .../cache/common/tier/TieredSpilloverCacheTests.java | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index ae8424d1251a7..f1f3dabe536c0 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -11,6 +11,8 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; +import java.util.concurrent.TimeUnit; + import static org.opensearch.common.settings.Setting.Property.Dynamic; import static org.opensearch.common.settings.Setting.Property.NodeScope; @@ -43,8 +45,10 @@ public class TieredSpilloverCacheSettings { */ public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD = Setting.suffixKeySetting( TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.took_time.threshold", - (key) -> Setting.timeSetting(key, TimeValue.ZERO, NodeScope, Dynamic) + (key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope, Dynamic) ); + // 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range. + // Will be tuned further with future benchmarks. /** * Default constructor diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 43519233b2c10..2a92c684c4436 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -129,9 +129,9 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception .setPolicyInfoWrapperFunction(new Function() { @Override public CachePolicyInfoWrapper apply(String s) { - return new CachePolicyInfoWrapper(10_000L); + return new CachePolicyInfoWrapper(20_000_000L); } - }) // Values will always appear to have taken 10_000 ns = 10 ms to compute + }) // Values will always appear to have taken 20_000_000 ns = 20 ms to compute .build(), CacheType.INDICES_REQUEST_CACHE, Map.of( From 8f15ce703a2ccbe1049fcf5bfd4fb9516bc4bf45 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 12 Mar 2024 16:28:13 -0700 Subject: [PATCH 12/22] Addressed Sorabh's comments Signed-off-by: Peter Alfonsi --- .../cache/common}/policy/TookTimePolicy.java | 43 ++++-- .../common/tier/TieredSpilloverCache.java | 35 ++--- .../common/policy/TookTimePolicyTests.java | 101 ++++++++++++ .../tier/TieredSpilloverCacheTests.java | 29 ++-- .../cache/policy/CachePolicyInfoWrapper.java | 42 ----- .../common/cache/policy/CacheTierPolicy.java | 21 --- .../cache/policy/CachedQueryResult.java | 64 ++++++++ .../cache/store/config/CacheConfig.java | 15 +- .../opensearch/indices/IndicesService.java | 18 +-- .../cache/policy/TookTimePolicyTests.java | 145 ------------------ 10 files changed, 236 insertions(+), 277 deletions(-) rename {server/src/main/java/org/opensearch/common/cache => modules/cache-common/src/main/java/org/opensearch/cache/common}/policy/TookTimePolicy.java (61%) create mode 100644 modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java delete mode 100644 server/src/main/java/org/opensearch/common/cache/policy/CachePolicyInfoWrapper.java delete mode 100644 server/src/main/java/org/opensearch/common/cache/policy/CacheTierPolicy.java create mode 100644 server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java delete mode 100644 server/src/test/java/org/opensearch/common/cache/policy/TookTimePolicyTests.java diff --git a/server/src/main/java/org/opensearch/common/cache/policy/TookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java similarity index 61% rename from server/src/main/java/org/opensearch/common/cache/policy/TookTimePolicy.java rename to modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java index 9f75aed074935..d2b5d78047626 100644 --- a/server/src/main/java/org/opensearch/common/cache/policy/TookTimePolicy.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java @@ -11,11 +11,12 @@ * GitHub history for details. */ -package org.opensearch.common.cache.policy; +package org.opensearch.cache.common.policy; import org.opensearch.common.unit.TimeValue; import java.util.function.Function; +import java.util.function.Predicate; /** * A cache tier policy which accepts queries whose took time is greater than some threshold. @@ -23,25 +24,38 @@ * the time it takes to get a result from the cache tier. * The policy accepts values of type V and decodes them into CachePolicyInfoWrapper, which has the data needed * to decide whether to admit the value. + * @param The type of data consumed by test(). */ -public class TookTimePolicy implements CacheTierPolicy { - private TimeValue threshold; // Set this to TimeValue.ZERO to let all data through - private final Function getPolicyInfoFn; +public class TookTimePolicy implements Predicate { + /** + * The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through. + */ + private final TimeValue threshold; - public TookTimePolicy(TimeValue threshold, Function getPolicyInfoFn) { - this.threshold = threshold; - this.getPolicyInfoFn = getPolicyInfoFn; - } + /** + * Function which extracts took time in nanoseconds from a serialized CachedQueryResult + */ + private final Function cachedResultParser; // - protected void setThreshold(TimeValue threshold) { // protected so that we can manually set value in unit test + /** + * Constructs a took time policy. + * @param threshold the threshold + * @param cachedResultParser the function providing took time + */ + public TookTimePolicy(TimeValue threshold, Function cachedResultParser) { this.threshold = threshold; + this.cachedResultParser = cachedResultParser; } - @Override - public boolean checkData(V data) { + /** + * Check whether to admit data. + * @param data the input argument + * @return whether to admit the data + */ + public boolean test(V data) { Long tookTimeNanos; try { - tookTimeNanos = getPolicyInfoFn.apply(data).getTookTimeNanos(); + tookTimeNanos = cachedResultParser.apply(data); } catch (Exception e) { // If we can't read a CachePolicyInfoWrapper from the BytesReference, reject the data return false; @@ -53,11 +67,6 @@ public boolean checkData(V data) { // reads an optional long, which will end up as null in this case. This is why we should reject it. return false; } - - if (threshold.equals(TimeValue.ZERO)) { - // If the policy is set to zero, admit any well-formed data - return true; - } TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos); if (tookTime.compareTo(threshold) < 0) { // negative -> tookTime is shorter than threshold return false; diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 1048e4bb8808d..95c70b6d227bb 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -8,15 +8,13 @@ package org.opensearch.cache.common.tier; +import org.opensearch.cache.common.policy.TookTimePolicy; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; -import org.opensearch.common.cache.policy.CachePolicyInfoWrapper; -import org.opensearch.common.cache.policy.CacheTierPolicy; -import org.opensearch.common.cache.policy.TookTimePolicy; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -33,6 +31,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.function.Predicate; /** * This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap @@ -57,7 +56,7 @@ public class TieredSpilloverCache implements ICache { * Maintains caching tiers in ascending order of cache latency. */ private final List> cacheList; - private final List> policies; + private final List> policies; TieredSpilloverCache(Builder builder) { Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); @@ -69,7 +68,7 @@ public class TieredSpilloverCache implements ICache { @Override public void onRemoval(RemovalNotification notification) { try (ReleasableLock ignore = writeLock.acquire()) { - if (checkPolicies(notification.getValue())) { + if (evaluatePolicies(notification.getValue())) { diskCache.put(notification.getKey(), notification.getValue()); } } @@ -88,11 +87,7 @@ public void onRemoval(RemovalNotification notification) { this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories); this.cacheList = Arrays.asList(onHeapCache, diskCache); - List> builderPolicies = builder.policies; - if (builderPolicies == null) { - builderPolicies = new ArrayList<>(); - } - this.policies = builderPolicies; + this.policies = builder.policies; // Will never be null; builder initializes it to an empty list } // Package private for testing @@ -207,9 +202,9 @@ private Function getValueFromTieredCache() { }; } - boolean checkPolicies(V value) { - for (CacheTierPolicy policy : policies) { - if (!policy.checkData(value)) { + boolean evaluatePolicies(V value) { + for (Predicate policy : policies) { + if (!policy.test(value)) { return false; } } @@ -259,9 +254,9 @@ public ICache create(CacheConfig config, CacheType cacheType, TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD .getConcreteSettingForNamespace(cacheType.getSettingPrefix()) .get(settings); - Function policyInfoWrapperFunction = Objects.requireNonNull( - config.getPolicyInfoWrapperFunction(), - "Policy info wrapper fn can't be null" + Function cachedResultParser = Objects.requireNonNull( + config.getCachedResultParser(), + "Cached result parser fn can't be null" ); return new Builder().setDiskCacheFactory(diskCacheFactory) @@ -269,7 +264,7 @@ public ICache create(CacheConfig config, CacheType cacheType, .setRemovalListener(config.getRemovalListener()) .setCacheConfig(config) .setCacheType(cacheType) - .setPolicy(new TookTimePolicy(diskPolicyThreshold, policyInfoWrapperFunction)) + .addPolicy(new TookTimePolicy(diskPolicyThreshold, cachedResultParser)) .build(); } @@ -291,7 +286,7 @@ public static class Builder { private CacheConfig cacheConfig; private CacheType cacheType; private Map cacheFactories; - private final ArrayList> policies = new ArrayList<>(); + private final ArrayList> policies = new ArrayList<>(); /** * Default constructor @@ -363,7 +358,7 @@ public Builder setCacheFactories(Map cacheFactorie * @param policy the policy * @return builder */ - public Builder setPolicy(CacheTierPolicy policy) { + public Builder addPolicy(Predicate policy) { this.policies.add(policy); return this; } @@ -373,7 +368,7 @@ public Builder setPolicy(CacheTierPolicy policy) { * @param policies the policies * @return builder */ - public Builder setPolicies(List> policies) { + public Builder addPolicies(List> policies) { this.policies.addAll(policies); return this; } diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java new file mode 100644 index 0000000000000..fc83bc965dc97 --- /dev/null +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.policy; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.opensearch.common.Randomness; +import org.opensearch.common.cache.policy.CachedQueryResult; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.search.DocValueFormat; +import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Random; +import java.util.function.Function; + +public class TookTimePolicyTests extends OpenSearchTestCase { + private final Function transformationFunction = (data) -> { + try { + return CachedQueryResult.getTookTimeNanos(data); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + + private TookTimePolicy getTookTimePolicy(TimeValue threshold) { + return new TookTimePolicy<>(threshold, transformationFunction); + } + + public void testTookTimePolicy() throws Exception { + double threshMillis = 10; + long shortMillis = (long) (0.9 * threshMillis); + long longMillis = (long) (1.5 * threshMillis); + TookTimePolicy tookTimePolicy = getTookTimePolicy(new TimeValue((long) threshMillis)); + BytesReference shortTime = getValidPolicyInput(shortMillis * 1000000); + BytesReference longTime = getValidPolicyInput(longMillis * 1000000); + + boolean shortResult = tookTimePolicy.test(shortTime); + assertFalse(shortResult); + boolean longResult = tookTimePolicy.test(longTime); + assertTrue(longResult); + + TookTimePolicy disabledPolicy = getTookTimePolicy(TimeValue.ZERO); + shortResult = disabledPolicy.test(shortTime); + assertTrue(shortResult); + longResult = disabledPolicy.test(longTime); + assertTrue(longResult); + } + + public void testMissingWrapper() throws Exception { + TookTimePolicy tookTimePolicy = getTookTimePolicy(TimeValue.ZERO); + BytesStreamOutput out = new BytesStreamOutput(); + getQSR().writeToNoId(out); + BytesReference missingWrapper = out.bytes(); + boolean allowedMissingWrapper = tookTimePolicy.test(missingWrapper); + assertFalse(allowedMissingWrapper); + } + + private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException { + // When it's used in the cache, the policy will receive BytesReferences which have a CachePolicyInfoWrapper + // at the beginning of them, followed by the actual QSR. + CachedQueryResult cachedQueryResult = new CachedQueryResult(getQSR(), tookTimeNanos); + BytesStreamOutput out = new BytesStreamOutput(); + cachedQueryResult.writeToNoId(out); + return out.bytes(); + } + + private QuerySearchResult getQSR() { + // We can't mock the QSR with mockito because the class is final. Construct a real one + QuerySearchResult mockQSR = new QuerySearchResult(); + + // duplicated from DfsQueryPhaseTests.java + mockQSR.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + return mockQSR; + } + + private void writeRandomBytes(StreamOutput out, int numBytes) throws IOException { + Random rand = Randomness.get(); + byte[] bytes = new byte[numBytes]; + rand.nextBytes(bytes); + out.writeBytes(bytes); + } +} diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 2a92c684c4436..670c554553040 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -13,8 +13,6 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; -import org.opensearch.common.cache.policy.CachePolicyInfoWrapper; -import org.opensearch.common.cache.policy.CacheTierPolicy; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; @@ -36,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Predicate; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; @@ -126,10 +125,10 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) .setSettings(settings) - .setPolicyInfoWrapperFunction(new Function() { + .setCachedResultParser(new Function() { @Override - public CachePolicyInfoWrapper apply(String s) { - return new CachePolicyInfoWrapper(20_000_000L); + public Long apply(String s) { + return 20_000_000L; } }) // Values will always appear to have taken 20_000_000 ns = 20 ms to compute .build(), @@ -842,7 +841,7 @@ public void onRemoval(RemovalNotification notification) { public void testDiskTierPolicies() throws Exception { // For policy function, allow if what it receives starts with "a" and string is even length - ArrayList> policies = new ArrayList<>(); + ArrayList> policies = new ArrayList<>(); policies.add(new AllowFirstLetterA()); policies.add(new AllowEvenLengths()); @@ -963,10 +962,10 @@ public void testTookTimePolicyFromFactory() throws Exception { .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) .setSettings(settings) - .setPolicyInfoWrapperFunction(new Function() { + .setCachedResultParser(new Function() { @Override - public CachePolicyInfoWrapper apply(String s) { - return new CachePolicyInfoWrapper(tookTimeMap.get(s)); + public Long apply(String s) { + return tookTimeMap.get(s); } }) .build(), @@ -1011,9 +1010,9 @@ public CachePolicyInfoWrapper apply(String s) { } } - private static class AllowFirstLetterA implements CacheTierPolicy { + private static class AllowFirstLetterA implements Predicate { @Override - public boolean checkData(String data) { + public boolean test(String data) { try { return (data.charAt(0) == 'a'); } catch (StringIndexOutOfBoundsException e) { @@ -1022,9 +1021,9 @@ public boolean checkData(String data) { } } - private static class AllowEvenLengths implements CacheTierPolicy { + private static class AllowEvenLengths implements Predicate { @Override - public boolean checkData(String data) { + public boolean test(String data) { return data.length() % 2 == 0; } } @@ -1084,7 +1083,7 @@ private TieredSpilloverCache intializeTieredSpilloverCache( RemovalListener removalListener, Settings settings, long diskDeliberateDelay, - List> policies + List> policies ) { ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) @@ -1104,7 +1103,7 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .setDiskCacheFactory(mockDiskCacheFactory) .setCacheConfig(cacheConfig); if (policies != null) { - builder.setPolicies(policies); + builder.addPolicies(policies); } return builder.build(); } diff --git a/server/src/main/java/org/opensearch/common/cache/policy/CachePolicyInfoWrapper.java b/server/src/main/java/org/opensearch/common/cache/policy/CachePolicyInfoWrapper.java deleted file mode 100644 index 99427e4344feb..0000000000000 --- a/server/src/main/java/org/opensearch/common/cache/policy/CachePolicyInfoWrapper.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.cache.policy; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; - -import java.io.IOException; - -/** - * A class containing information needed for all CacheTierPolicy objects to decide whether to admit - * a given BytesReference. This spares us from having to create an entire short-lived QuerySearchResult object - * just to read a few values. - */ -public class CachePolicyInfoWrapper implements Writeable { - private final Long tookTimeNanos; - - public CachePolicyInfoWrapper(Long tookTimeNanos) { - this.tookTimeNanos = tookTimeNanos; - // Add more values here as they are needed for future cache tier policies - } - - public CachePolicyInfoWrapper(StreamInput in) throws IOException { - this.tookTimeNanos = in.readOptionalLong(); - } - - public Long getTookTimeNanos() { - return tookTimeNanos; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalLong(tookTimeNanos); - } -} diff --git a/server/src/main/java/org/opensearch/common/cache/policy/CacheTierPolicy.java b/server/src/main/java/org/opensearch/common/cache/policy/CacheTierPolicy.java deleted file mode 100644 index a3d0c42b9a739..0000000000000 --- a/server/src/main/java/org/opensearch/common/cache/policy/CacheTierPolicy.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.cache.policy; - -/** - * An interface for policies that inspect data of type T to decide whether they are admitted into a cache tier. - */ -public interface CacheTierPolicy { - /** - * Determines whether this policy allows the data into its cache tier. - * @param data The data to check - * @return true if accepted, otherwise false - */ - boolean checkData(T data); -} diff --git a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java new file mode 100644 index 0000000000000..4621061304593 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.policy; + +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.search.query.QuerySearchResult; + +import java.io.IOException; + +/** + * A class containing a QuerySearchResult used in a cache, as well as information needed for all cache policies + * to decide whether to admit a given BytesReference. Also handles serialization/deserialization of the underlying QuerySearchResult, + * which is all that is needed outside the cache. At policy checking time, this spares us from having to create an entire + * short-lived QuerySearchResult object just to read a few values. + */ +public class CachedQueryResult { + private final long tookTimeNanos; + + private final QuerySearchResult qsr; + + public CachedQueryResult(QuerySearchResult qsr, long tookTimeNanos) { + this.qsr = qsr; + this.tookTimeNanos = tookTimeNanos; + } + + public long getTookTimeNanos() { + return tookTimeNanos; + } + + // Retrieve only took time from a serialized CQR, without creating a short-lived QuerySearchResult or CachedQueryResult object. + public static long getTookTimeNanos(BytesReference serializedCQR) throws IOException { + StreamInput in = serializedCQR.streamInput(); + return in.readOptionalLong(); + } + + // Retrieve only the QSR from a serialized CQR, and load it into an existing QSR object discarding the took time which isn't needed + // outside the cache + public static void loadQSR( + BytesReference serializedCQR, + QuerySearchResult qsr, + ShardSearchContextId id, + NamedWriteableRegistry registry + ) throws IOException { + StreamInput in = new NamedWriteableAwareStreamInput(serializedCQR.streamInput(), registry); + in.readOptionalLong(); // Read and discard took time + qsr.readFromWithId(id, in); + } + + public void writeToNoId(StreamOutput out) throws IOException { + out.writeOptionalLong(tookTimeNanos); + qsr.writeToNoId(out); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index f8c023fc24318..dad4956c821d8 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -10,7 +10,6 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.RemovalListener; -import org.opensearch.common.cache.policy.CachePolicyInfoWrapper; import org.opensearch.common.settings.Settings; import java.util.function.Function; @@ -44,7 +43,7 @@ public class CacheConfig { private final RemovalListener removalListener; /** A function which extracts policy-relevant information, such as took time, from values, to allow inspection by policies if present. */ - private Function policyInfoWrapperFunction; + private Function cachedResultParser; private CacheConfig(Builder builder) { this.keyType = builder.keyType; @@ -52,7 +51,7 @@ private CacheConfig(Builder builder) { this.settings = builder.settings; this.removalListener = builder.removalListener; this.weigher = builder.weigher; - this.policyInfoWrapperFunction = builder.policyInfoWrapperFunction; + this.cachedResultParser = builder.cachedResultParser; } public Class getKeyType() { @@ -75,8 +74,8 @@ public ToLongBiFunction getWeigher() { return weigher; } - public Function getPolicyInfoWrapperFunction() { - return policyInfoWrapperFunction; + public Function getCachedResultParser() { + return cachedResultParser; } /** @@ -95,7 +94,7 @@ public static class Builder { private RemovalListener removalListener; private ToLongBiFunction weigher; - private Function policyInfoWrapperFunction; + private Function cachedResultParser; public Builder() {} @@ -124,8 +123,8 @@ public Builder setWeigher(ToLongBiFunction weigher) { return this; } - public Builder setPolicyInfoWrapperFunction(Function function) { - this.policyInfoWrapperFunction = function; + public Builder setCachedResultParser(Function function) { + this.cachedResultParser = function; return this; } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 7fb85bab4dabe..6878f5e95d85e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -62,7 +62,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; -import org.opensearch.common.cache.policy.CachePolicyInfoWrapper; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -82,9 +82,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -1700,20 +1698,22 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> { long beforeQueryPhase = System.nanoTime(); queryPhase.execute(context); - CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(System.nanoTime() - beforeQueryPhase); - policyInfo.writeTo(out); + CachedQueryResult cachedQueryResult = new CachedQueryResult(context.queryResult(), System.nanoTime() - beforeQueryPhase); + cachedQueryResult.writeToNoId(out); // Write relevant info for cache tier policies before the whole QuerySearchResult, so we don't have to read // the whole QSR into memory when we decide whether to allow it into a particular cache tier based on took time/other info - context.queryResult().writeToNoId(out); + // context.queryResult().writeToNoId(out); loadedFromCache[0] = false; }); if (loadedFromCache[0]) { // restore the cached query result into the context final QuerySearchResult result = context.queryResult(); - StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry); - CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(in); // This wrapper is not needed outside the cache - result.readFromWithId(context.id(), in); + /*StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry); + CachedQueryResult policyInfo = new CachedQueryResult(in); // This wrapper is not needed outside the cache + result.readFromWithId(context.id(), in);*/ + // Load the cached QSR into result, discarding values used only in the cache + CachedQueryResult.loadQSR(bytesReference, result, context.id(), namedWriteableRegistry); result.setSearchShardTarget(context.shardTarget()); } else if (context.queryResult().searchTimedOut()) { // we have to invalidate the cache entry if we cached a query result form a request that timed out. diff --git a/server/src/test/java/org/opensearch/common/cache/policy/TookTimePolicyTests.java b/server/src/test/java/org/opensearch/common/cache/policy/TookTimePolicyTests.java deleted file mode 100644 index 881d9c02d67f9..0000000000000 --- a/server/src/test/java/org/opensearch/common/cache/policy/TookTimePolicyTests.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.cache.policy; - -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TotalHits; -import org.opensearch.action.OriginalIndices; -import org.opensearch.action.OriginalIndicesTests; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.common.UUIDs; -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.common.lucene.search.TopDocsAndMaxScore; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.core.common.Strings; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.search.DocValueFormat; -import org.opensearch.search.SearchShardTarget; -import org.opensearch.search.internal.AliasFilter; -import org.opensearch.search.internal.ShardSearchContextId; -import org.opensearch.search.internal.ShardSearchRequest; -import org.opensearch.search.query.QuerySearchResult; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.IOException; -import java.util.function.Function; - -public class TookTimePolicyTests extends OpenSearchTestCase { - private final Function transformationFunction = (data) -> { - try { - return getPolicyInfo(data); - } catch (IOException e) { - throw new RuntimeException(e); - } - }; - - private CachePolicyInfoWrapper getPolicyInfo(BytesReference data) throws IOException { - return new CachePolicyInfoWrapper(data.streamInput()); - } - - private TookTimePolicy getTookTimePolicy() { - return new TookTimePolicy<>(TimeValue.ZERO, transformationFunction); - } - - public void testTookTimePolicy() throws Exception { - TookTimePolicy tookTimePolicy = getTookTimePolicy(); - - // manually set threshold for test - double threshMillis = 10; - long shortMillis = (long) (0.9 * threshMillis); - long longMillis = (long) (1.5 * threshMillis); - tookTimePolicy.setThreshold(new TimeValue((long) threshMillis)); - BytesReference shortTime = getValidPolicyInput(getQSR(), shortMillis * 1000000); - BytesReference longTime = getValidPolicyInput(getQSR(), longMillis * 1000000); - - boolean shortResult = tookTimePolicy.checkData(shortTime); - assertFalse(shortResult); - boolean longResult = tookTimePolicy.checkData(longTime); - assertTrue(longResult); - - TookTimePolicy disabledPolicy = getTookTimePolicy(); - disabledPolicy.setThreshold(TimeValue.ZERO); - shortResult = disabledPolicy.checkData(shortTime); - assertTrue(shortResult); - longResult = disabledPolicy.checkData(longTime); - assertTrue(longResult); - } - - public void testMissingWrapper() throws Exception { - TookTimePolicy tookTimePolicy = getTookTimePolicy(); - tookTimePolicy.setThreshold(TimeValue.ZERO); - QuerySearchResult qsr = getQSR(); - BytesStreamOutput out = new BytesStreamOutput(); - qsr.writeTo(out); - BytesReference missingWrapper = out.bytes(); - boolean allowedMissingWrapper = tookTimePolicy.checkData(missingWrapper); - assertFalse(allowedMissingWrapper); - } - - public void testNullTookTime() throws Exception { - // Null took time should always be rejected (because it might be the result of a - // BytesReference without a CachePolicyInfoWrapper in front of it) - - TookTimePolicy zeroThreshold = getTookTimePolicy(); - zeroThreshold.setThreshold(TimeValue.ZERO); - TookTimePolicy nonZeroThreshold = getTookTimePolicy(); - nonZeroThreshold.setThreshold(new TimeValue(10L)); - - Long nullTookTime = null; - CachePolicyInfoWrapper nullWrapper = new CachePolicyInfoWrapper(nullTookTime); - BytesStreamOutput out = new BytesStreamOutput(); - nullWrapper.writeTo(out); - QuerySearchResult qsr = getQSR(); - qsr.writeTo(out); - BytesReference data = out.bytes(); - - assertFalse(zeroThreshold.checkData(data)); - assertFalse(nonZeroThreshold.checkData(data)); - } - - public static QuerySearchResult getQSR() { - // package-private, also used by IndicesRequestCacheTests.java - // setup from QuerySearchResultTests.java - ShardId shardId = new ShardId("index", "uuid", randomInt()); - SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); - ShardSearchRequest shardSearchRequest = new ShardSearchRequest( - OriginalIndicesTests.randomOriginalIndices(), - searchRequest, - shardId, - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, - randomNonNegativeLong(), - null, - new String[0] - ); - ShardSearchContextId id = new ShardSearchContextId(UUIDs.base64UUID(), randomLong()); - QuerySearchResult result = new QuerySearchResult( - id, - new SearchShardTarget("node", shardId, null, OriginalIndices.NONE), - shardSearchRequest - ); - TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]); - result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]); - - return result; - } - - private BytesReference getValidPolicyInput(QuerySearchResult qsr, long tookTimeNanos) throws IOException { - // When it's used in the cache, the policy will receive BytesReferences which have a CachePolicyInfoWrapper - // at the beginning of them, followed by the actual QSR. - CachePolicyInfoWrapper policyInfo = new CachePolicyInfoWrapper(tookTimeNanos); - BytesStreamOutput out = new BytesStreamOutput(); - policyInfo.writeTo(out); - qsr.writeTo(out); // This fails when OriginalIndices is OriginalIndices.NONE. - return out.bytes(); - } -} From 9f5defc278f5011a39f44c0cef0136d728965b61 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 13 Mar 2024 15:10:34 -0700 Subject: [PATCH 13/22] Addressed Sorabh's second round of comments Signed-off-by: Peter Alfonsi --- .../cache/common/policy/TookTimePolicy.java | 31 +++++--------- .../common/tier/TieredSpilloverCache.java | 3 +- .../tier/TieredSpilloverCacheSettings.java | 5 +-- .../common/policy/TookTimePolicyTests.java | 8 ++-- .../tier/TieredSpilloverCacheTests.java | 13 +++--- .../cache/policy/CachedQueryResult.java | 40 ++++++++++++++----- .../cache/store/config/CacheConfig.java | 9 +++-- 7 files changed, 60 insertions(+), 49 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java index d2b5d78047626..ee0d4243c0ed8 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java @@ -13,6 +13,7 @@ package org.opensearch.cache.common.policy; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.unit.TimeValue; import java.util.function.Function; @@ -20,9 +21,8 @@ /** * A cache tier policy which accepts queries whose took time is greater than some threshold. - * The threshold should be set to approximately - * the time it takes to get a result from the cache tier. - * The policy accepts values of type V and decodes them into CachePolicyInfoWrapper, which has the data needed + * The threshold should be set to approximately the time it takes to get a result from the cache tier. + * The policy accepts values of type V and decodes them into CachedQueryResult.PolicyValues, which has the data needed * to decide whether to admit the value. * @param The type of data consumed by test(). */ @@ -33,16 +33,16 @@ public class TookTimePolicy implements Predicate { private final TimeValue threshold; /** - * Function which extracts took time in nanoseconds from a serialized CachedQueryResult + * Function which extracts the relevant PolicyValues from a serialized CachedQueryResult */ - private final Function cachedResultParser; // + private final Function cachedResultParser; /** * Constructs a took time policy. * @param threshold the threshold - * @param cachedResultParser the function providing took time + * @param cachedResultParser the function providing policy values */ - public TookTimePolicy(TimeValue threshold, Function cachedResultParser) { + public TookTimePolicy(TimeValue threshold, Function cachedResultParser) { this.threshold = threshold; this.cachedResultParser = cachedResultParser; } @@ -53,24 +53,15 @@ public TookTimePolicy(TimeValue threshold, Function cachedResultParser) * @return whether to admit the data */ public boolean test(V data) { - Long tookTimeNanos; + long tookTimeNanos; try { - tookTimeNanos = cachedResultParser.apply(data); + tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos(); } catch (Exception e) { - // If we can't read a CachePolicyInfoWrapper from the BytesReference, reject the data + // If we can't read a CachedQueryResult.PolicyValues from the BytesReference, reject the data return false; } - if (tookTimeNanos == null) { - // If the wrapper contains null took time, reject the data - // This can happen if no CachePolicyInfoWrapper was written to the BytesReference, as the wrapper's constructor - // reads an optional long, which will end up as null in this case. This is why we should reject it. - return false; - } TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos); - if (tookTime.compareTo(threshold) < 0) { // negative -> tookTime is shorter than threshold - return false; - } - return true; + return tookTime.compareTo(threshold) >= 0; } } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 95c70b6d227bb..a507416ce3988 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -15,6 +15,7 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -254,7 +255,7 @@ public ICache create(CacheConfig config, CacheType cacheType, TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD .getConcreteSettingForNamespace(cacheType.getSettingPrefix()) .get(settings); - Function cachedResultParser = Objects.requireNonNull( + Function cachedResultParser = Objects.requireNonNull( config.getCachedResultParser(), "Cached result parser fn can't be null" ); diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index f1f3dabe536c0..7f46b3dbf0287 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -13,7 +13,6 @@ import java.util.concurrent.TimeUnit; -import static org.opensearch.common.settings.Setting.Property.Dynamic; import static org.opensearch.common.settings.Setting.Property.NodeScope; /** @@ -44,8 +43,8 @@ public class TieredSpilloverCacheSettings { * Setting defining the minimum took time for a query to be allowed into the disk cache. */ public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD = Setting.suffixKeySetting( - TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.took_time.threshold", - (key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope, Dynamic) + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold", + (key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope) ); // 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range. // Will be tuned further with future benchmarks. diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java index fc83bc965dc97..f74e0e120a3d0 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java @@ -27,9 +27,9 @@ import java.util.function.Function; public class TookTimePolicyTests extends OpenSearchTestCase { - private final Function transformationFunction = (data) -> { + private final Function transformationFunction = (data) -> { try { - return CachedQueryResult.getTookTimeNanos(data); + return CachedQueryResult.getPolicyValues(data); } catch (IOException e) { throw new RuntimeException(e); } @@ -69,8 +69,8 @@ public void testMissingWrapper() throws Exception { } private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException { - // When it's used in the cache, the policy will receive BytesReferences which have a CachePolicyInfoWrapper - // at the beginning of them, followed by the actual QSR. + // When it's used in the cache, the policy will receive BytesReferences which come from + // serializing a CachedQueryResult. CachedQueryResult cachedQueryResult = new CachedQueryResult(getQSR(), tookTimeNanos); BytesStreamOutput out = new BytesStreamOutput(); cachedQueryResult.writeToNoId(out); diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 670c554553040..da0c143c26372 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -13,6 +13,7 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; @@ -125,10 +126,10 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) .setSettings(settings) - .setCachedResultParser(new Function() { + .setCachedResultParser(new Function() { @Override - public Long apply(String s) { - return 20_000_000L; + public CachedQueryResult.PolicyValues apply(String s) { + return new CachedQueryResult.PolicyValues(20_000_000L); } }) // Values will always appear to have taken 20_000_000 ns = 20 ms to compute .build(), @@ -962,10 +963,10 @@ public void testTookTimePolicyFromFactory() throws Exception { .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) .setSettings(settings) - .setCachedResultParser(new Function() { + .setCachedResultParser(new Function() { @Override - public Long apply(String s) { - return tookTimeMap.get(s); + public CachedQueryResult.PolicyValues apply(String s) { + return new CachedQueryResult.PolicyValues(tookTimeMap.get(s)); } }) .build(), diff --git a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java index 4621061304593..be05d144e35d4 100644 --- a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java +++ b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java @@ -13,6 +13,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.query.QuerySearchResult; @@ -25,23 +26,18 @@ * short-lived QuerySearchResult object just to read a few values. */ public class CachedQueryResult { - private final long tookTimeNanos; - + private final PolicyValues policyValues; private final QuerySearchResult qsr; public CachedQueryResult(QuerySearchResult qsr, long tookTimeNanos) { this.qsr = qsr; - this.tookTimeNanos = tookTimeNanos; - } - - public long getTookTimeNanos() { - return tookTimeNanos; + this.policyValues = new PolicyValues(tookTimeNanos); } // Retrieve only took time from a serialized CQR, without creating a short-lived QuerySearchResult or CachedQueryResult object. - public static long getTookTimeNanos(BytesReference serializedCQR) throws IOException { + public static PolicyValues getPolicyValues(BytesReference serializedCQR) throws IOException { StreamInput in = serializedCQR.streamInput(); - return in.readOptionalLong(); + return new PolicyValues(in); } // Retrieve only the QSR from a serialized CQR, and load it into an existing QSR object discarding the took time which isn't needed @@ -53,12 +49,34 @@ public static void loadQSR( NamedWriteableRegistry registry ) throws IOException { StreamInput in = new NamedWriteableAwareStreamInput(serializedCQR.streamInput(), registry); - in.readOptionalLong(); // Read and discard took time + PolicyValues pv = new PolicyValues(in); // Read and discard PolicyValues qsr.readFromWithId(id, in); } public void writeToNoId(StreamOutput out) throws IOException { - out.writeOptionalLong(tookTimeNanos); + policyValues.writeTo(out); qsr.writeToNoId(out); } + + public static class PolicyValues implements Writeable { + final long tookTimeNanos; + // More values can be added here as they're needed for future policies + + public PolicyValues(long tookTimeNanos) { + this.tookTimeNanos = tookTimeNanos; + } + + public PolicyValues(StreamInput in) throws IOException { + this.tookTimeNanos = in.readOptionalLong(); + } + + public long getTookTimeNanos() { + return tookTimeNanos; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalLong(tookTimeNanos); + } + } } diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index dad4956c821d8..d4425e28517c8 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -10,6 +10,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.settings.Settings; import java.util.function.Function; @@ -43,7 +44,7 @@ public class CacheConfig { private final RemovalListener removalListener; /** A function which extracts policy-relevant information, such as took time, from values, to allow inspection by policies if present. */ - private Function cachedResultParser; + private Function cachedResultParser; private CacheConfig(Builder builder) { this.keyType = builder.keyType; @@ -74,7 +75,7 @@ public ToLongBiFunction getWeigher() { return weigher; } - public Function getCachedResultParser() { + public Function getCachedResultParser() { return cachedResultParser; } @@ -94,7 +95,7 @@ public static class Builder { private RemovalListener removalListener; private ToLongBiFunction weigher; - private Function cachedResultParser; + private Function cachedResultParser; public Builder() {} @@ -123,7 +124,7 @@ public Builder setWeigher(ToLongBiFunction weigher) { return this; } - public Builder setCachedResultParser(Function function) { + public Builder setCachedResultParser(Function function) { this.cachedResultParser = function; return this; } From 0c9472ae318bd42263ea0258087388fe4273d007 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 13 Mar 2024 18:22:34 -0700 Subject: [PATCH 14/22] Set cachedQueryParser in IRC Signed-off-by: Peter Alfonsi --- .../cache/common/tier/TieredSpilloverCache.java | 2 ++ .../cache/common/policy/TookTimePolicyTests.java | 7 +++++++ .../cache/common/tier/TieredSpilloverCacheTests.java | 1 + .../common/cache/store/config/CacheConfig.java | 1 + .../java/org/opensearch/indices/IndicesRequestCache.java | 9 +++++++++ 5 files changed, 20 insertions(+) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index a507416ce3988..14328e4318a96 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -80,6 +80,8 @@ public void onRemoval(RemovalNotification notification) { .setValueType(builder.cacheConfig.getValueType()) .setSettings(builder.cacheConfig.getSettings()) .setWeigher(builder.cacheConfig.getWeigher()) + .setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) // TODO: Part of a workaround for an issue in TSC. Overall fix + // coming soon .build(), builder.cacheType, builder.cacheFactories diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java index f74e0e120a3d0..f433b4f39f3f4 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java @@ -68,6 +68,13 @@ public void testMissingWrapper() throws Exception { assertFalse(allowedMissingWrapper); } + public void testNegativeOneInput() throws Exception { + // PolicyValues with -1 took time can be passed to this policy if we shouldn't accept it for whatever reason + TookTimePolicy tookTimePolicy = getTookTimePolicy(TimeValue.ZERO); + BytesReference minusOne = getValidPolicyInput(-1L); + assertFalse(tookTimePolicy.test(minusOne)); + } + private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException { // When it's used in the cache, the policy will receive BytesReferences which come from // serializing a CachedQueryResult. diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 022da8a04759b..40c4696796a87 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -977,6 +977,7 @@ public void testTookTimePolicyFromFactory() throws Exception { .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) .setSettings(settings) + .setMaxSizeInBytes(onHeapCacheSize * keyValueSize) .setCachedResultParser(new Function() { @Override public CachedQueryResult.PolicyValues apply(String s) { diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index 3741e2a65e7ac..6ecb752f91fb9 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -90,6 +90,7 @@ public ToLongBiFunction getWeigher() { public Function getCachedResultParser() { return cachedResultParser; } + public Long getMaxSizeInBytes() { return maxSizeInBytes; } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 92fb278c946f1..d22f131853a78 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -44,6 +44,7 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; @@ -136,6 +137,14 @@ public final class IndicesRequestCache implements RemovalListener { + try { + return CachedQueryResult.getPolicyValues(bytesReference); + } catch (IOException e) { + // Set took time to -1, which will always be rejected by the policy. + return new CachedQueryResult.PolicyValues(-1); + } + }) .build(), CacheType.INDICES_REQUEST_CACHE ); From 891b5988b4080c6581fffabc9177b49d69c3d721 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Thu, 14 Mar 2024 09:30:33 -0700 Subject: [PATCH 15/22] Addressed Sorabh's comments besides dynamic setting Signed-off-by: Peter Alfonsi --- .../opensearch/cache/common/tier/TieredSpilloverCache.java | 2 +- .../cache/common/tier/TieredSpilloverCachePlugin.java | 2 +- .../cache/common/tier/TieredSpilloverCacheSettings.java | 5 +++-- .../cache/common/tier/TieredSpilloverCacheTests.java | 2 +- .../opensearch/common/cache/policy/CachedQueryResult.java | 5 +++-- .../src/main/java/org/opensearch/indices/IndicesService.java | 4 ---- 6 files changed, 9 insertions(+), 11 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 14328e4318a96..ae1b60e34fe43 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -254,7 +254,7 @@ public ICache create(CacheConfig config, CacheType cacheType, } ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName); - TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD + TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD .getConcreteSettingForNamespace(cacheType.getSettingPrefix()) .get(settings); Function cachedResultParser = Objects.requireNonNull( diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 90c71fc01d2ce..0cc8a711faaf5 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -52,7 +52,7 @@ public List> getSettings() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); settingList.add( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.getConcreteSettingForNamespace( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace( cacheType.getSettingPrefix() ) ); diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index 7f46b3dbf0287..a5c957ce1d776 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -13,6 +13,7 @@ import java.util.concurrent.TimeUnit; +import static org.opensearch.common.settings.Setting.Property.Dynamic; import static org.opensearch.common.settings.Setting.Property.NodeScope; /** @@ -42,9 +43,9 @@ public class TieredSpilloverCacheSettings { /** * Setting defining the minimum took time for a query to be allowed into the disk cache. */ - public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD = Setting.suffixKeySetting( + public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting( TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold", - (key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope) + (key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope, Dynamic) ); // 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range. // Will be tuned further with future benchmarks. diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 40c4696796a87..810dc11961bff 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -964,7 +964,7 @@ public void testTookTimePolicyFromFactory() throws Exception { onHeapCacheSize * keyValueSize + "b" ) .put( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.getConcreteSettingForNamespace( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ).getKey(), new TimeValue(timeValueThresholdNanos / 1_000_000) diff --git a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java index be05d144e35d4..63fe3b07f2e5c 100644 --- a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java +++ b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java @@ -24,6 +24,7 @@ * to decide whether to admit a given BytesReference. Also handles serialization/deserialization of the underlying QuerySearchResult, * which is all that is needed outside the cache. At policy checking time, this spares us from having to create an entire * short-lived QuerySearchResult object just to read a few values. + * @opensearch.internal */ public class CachedQueryResult { private final PolicyValues policyValues; @@ -67,7 +68,7 @@ public PolicyValues(long tookTimeNanos) { } public PolicyValues(StreamInput in) throws IOException { - this.tookTimeNanos = in.readOptionalLong(); + this.tookTimeNanos = in.readVLong(); } public long getTookTimeNanos() { @@ -76,7 +77,7 @@ public long getTookTimeNanos() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalLong(tookTimeNanos); + out.writeVLong(tookTimeNanos); } } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 9a92c480c504d..b0937006a2587 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -1704,16 +1704,12 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q cachedQueryResult.writeToNoId(out); // Write relevant info for cache tier policies before the whole QuerySearchResult, so we don't have to read // the whole QSR into memory when we decide whether to allow it into a particular cache tier based on took time/other info - // context.queryResult().writeToNoId(out); loadedFromCache[0] = false; }); if (loadedFromCache[0]) { // restore the cached query result into the context final QuerySearchResult result = context.queryResult(); - /*StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry); - CachedQueryResult policyInfo = new CachedQueryResult(in); // This wrapper is not needed outside the cache - result.readFromWithId(context.id(), in);*/ // Load the cached QSR into result, discarding values used only in the cache CachedQueryResult.loadQSR(bytesReference, result, context.id(), namedWriteableRegistry); result.setSearchShardTarget(context.shardTarget()); From aecd7218dc809b70726c25dacb9cf38b5ebed9e6 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 15 Mar 2024 11:46:42 -0700 Subject: [PATCH 16/22] Removed dynamic setting, misc comments Signed-off-by: Peter Alfonsi --- .../cache/common/tier/TieredSpilloverCacheSettings.java | 3 +-- .../src/main/java/org/opensearch/indices/IndicesService.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index a5c957ce1d776..1264803642cd3 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -13,7 +13,6 @@ import java.util.concurrent.TimeUnit; -import static org.opensearch.common.settings.Setting.Property.Dynamic; import static org.opensearch.common.settings.Setting.Property.NodeScope; /** @@ -45,7 +44,7 @@ public class TieredSpilloverCacheSettings { */ public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting( TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold", - (key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope, Dynamic) + (key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope) ); // 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range. // Will be tuned further with future benchmarks. diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b0937006a2587..351042ff56c1f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -1700,10 +1700,10 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> { long beforeQueryPhase = System.nanoTime(); queryPhase.execute(context); - CachedQueryResult cachedQueryResult = new CachedQueryResult(context.queryResult(), System.nanoTime() - beforeQueryPhase); - cachedQueryResult.writeToNoId(out); // Write relevant info for cache tier policies before the whole QuerySearchResult, so we don't have to read // the whole QSR into memory when we decide whether to allow it into a particular cache tier based on took time/other info + CachedQueryResult cachedQueryResult = new CachedQueryResult(context.queryResult(), System.nanoTime() - beforeQueryPhase); + cachedQueryResult.writeToNoId(out); loadedFromCache[0] = false; }); From dc6fb2e355cac3de4a1e8634bd2deaa6e3ab9553 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 15 Mar 2024 11:55:10 -0700 Subject: [PATCH 17/22] Added changelog entry Signed-off-by: Peter Alfonsi --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c7a1bda4e1d3..136745cf8f459 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533)) - Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835)) - The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586)) +- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) From 7eb671c91d4b117c67d78f28b0c8d98954af8262 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 15 Mar 2024 12:01:37 -0700 Subject: [PATCH 18/22] Added missing javadoc Signed-off-by: Peter Alfonsi --- .../opensearch/cache/common/policy/package-info.java | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 modules/cache-common/src/main/java/org/opensearch/cache/common/policy/package-info.java diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/package-info.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/package-info.java new file mode 100644 index 0000000000000..45cfb00662c98 --- /dev/null +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** A package for policies controlling what can enter caches. */ +package org.opensearch.cache.common.policy; From d92c1bbea540f292e2b6e8278412a201e92615e2 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 15 Mar 2024 12:13:44 -0700 Subject: [PATCH 19/22] Fixed failed gradle run Signed-off-by: Peter Alfonsi --- .../cache/common/policy/TookTimePolicyTests.java | 9 --------- .../common/cache/policy/CachedQueryResult.java | 4 ++-- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java index f433b4f39f3f4..628a6316e8588 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java @@ -59,15 +59,6 @@ public void testTookTimePolicy() throws Exception { assertTrue(longResult); } - public void testMissingWrapper() throws Exception { - TookTimePolicy tookTimePolicy = getTookTimePolicy(TimeValue.ZERO); - BytesStreamOutput out = new BytesStreamOutput(); - getQSR().writeToNoId(out); - BytesReference missingWrapper = out.bytes(); - boolean allowedMissingWrapper = tookTimePolicy.test(missingWrapper); - assertFalse(allowedMissingWrapper); - } - public void testNegativeOneInput() throws Exception { // PolicyValues with -1 took time can be passed to this policy if we shouldn't accept it for whatever reason TookTimePolicy tookTimePolicy = getTookTimePolicy(TimeValue.ZERO); diff --git a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java index 63fe3b07f2e5c..fa14fd6d2823f 100644 --- a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java +++ b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java @@ -68,7 +68,7 @@ public PolicyValues(long tookTimeNanos) { } public PolicyValues(StreamInput in) throws IOException { - this.tookTimeNanos = in.readVLong(); + this.tookTimeNanos = in.readZLong(); } public long getTookTimeNanos() { @@ -77,7 +77,7 @@ public long getTookTimeNanos() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(tookTimeNanos); + out.writeZLong(tookTimeNanos); } } } From 2834a3842e0f408f0422ecab2bf32edd784573a3 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 15 Mar 2024 13:06:41 -0700 Subject: [PATCH 20/22] Added setting validation test Signed-off-by: Peter Alfonsi --- .../cache/common/policy/TookTimePolicy.java | 3 +++ .../common/tier/TieredSpilloverCacheSettings.java | 7 ++++++- .../cache/common/policy/TookTimePolicyTests.java | 4 ++++ .../common/tier/TieredSpilloverCacheTests.java | 15 +++++++++++++++ 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java index ee0d4243c0ed8..96ef027c17187 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java @@ -43,6 +43,9 @@ public class TookTimePolicy implements Predicate { * @param cachedResultParser the function providing policy values */ public TookTimePolicy(TimeValue threshold, Function cachedResultParser) { + if (threshold.compareTo(TimeValue.ZERO) < 0) { + throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep()); + } this.threshold = threshold; this.cachedResultParser = cachedResultParser; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index 1264803642cd3..684307960b8a5 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -44,7 +44,12 @@ public class TieredSpilloverCacheSettings { */ public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting( TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold", - (key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope) + (key) -> Setting.timeSetting( + key, + new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting + TimeValue.ZERO, // Minimum value for this setting + NodeScope + ) ); // 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range. // Will be tuned further with future benchmarks. diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java index 628a6316e8588..237c9c7b79db4 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java @@ -66,6 +66,10 @@ public void testNegativeOneInput() throws Exception { assertFalse(tookTimePolicy.test(minusOne)); } + public void testInvalidThreshold() throws Exception { + assertThrows(IllegalArgumentException.class, () -> getTookTimePolicy(TimeValue.MINUS_ONE)); + } + private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException { // When it's used in the cache, the policy will receive BytesReferences which come from // serializing a CachedQueryResult. diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index f0a4edecb5bb2..3e4fb0efd092e 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -19,6 +19,7 @@ import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; @@ -1018,6 +1019,20 @@ public CachedQueryResult.PolicyValues apply(String s) { } } + public void testMinimumThresholdSettingValue() throws Exception { + // Confirm we can't set TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD to below + // TimeValue.ZERO (for example, MINUS_ONE) + Setting concreteSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD + .getConcreteSettingForNamespace(CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()); + TimeValue validDuration = new TimeValue(0, TimeUnit.MILLISECONDS); + Settings validSettings = Settings.builder().put(concreteSetting.getKey(), validDuration).build(); + + Settings belowThresholdSettings = Settings.builder().put(concreteSetting.getKey(), TimeValue.MINUS_ONE).build(); + + assertThrows(IllegalArgumentException.class, () -> concreteSetting.get(belowThresholdSettings)); + assertEquals(validDuration, concreteSetting.get(validSettings)); + } + private static class AllowFirstLetterA implements Predicate { @Override public boolean test(String data) { From b9e922223a51f21d9dfa8f02998282db6e5ce86d Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 15 Mar 2024 13:13:13 -0700 Subject: [PATCH 21/22] rerun gradle for flaky IT Signed-off-by: Peter Alfonsi From 895449ba07e6bb8e24dc946a06360a9cb87512dd Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 15 Mar 2024 13:21:48 -0700 Subject: [PATCH 22/22] javadocs Signed-off-by: Peter Alfonsi --- .../org/opensearch/common/cache/policy/CachedQueryResult.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java index fa14fd6d2823f..0a98542a05bb7 100644 --- a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java +++ b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java @@ -59,6 +59,10 @@ public void writeToNoId(StreamOutput out) throws IOException { qsr.writeToNoId(out); } + /** + * A class containing information needed for all cache policies + * to decide whether to admit a given value. + */ public static class PolicyValues implements Writeable { final long tookTimeNanos; // More values can be added here as they're needed for future policies