[Tiered Caching] Make took time policy dynamic and add additional integ tests (#13063)


---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
Signed-off-by: Sagar <[email protected]>
sgup432 committed Apr 11, 2024
1 parent 5d939b9 commit 52ce070
Showing 14 changed files with 600 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
+ - [Tiered Caching] Make took time caching policy setting dynamic ([#13063](https://github.com/opensearch-project/OpenSearch/pull/13063))
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
- Add cluster primary balance constraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))

Large diffs are not rendered by default.

TookTimePolicy.java
@@ -13,12 +13,16 @@

package org.opensearch.cache.common.policy;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;

import java.util.function.Function;
import java.util.function.Predicate;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold.
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
@@ -30,7 +34,7 @@ public class TookTimePolicy<V> implements Predicate<V> {
/**
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
*/
- private final TimeValue threshold;
+ private TimeValue threshold;

/**
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult
@@ -41,13 +45,25 @@ public class TookTimePolicy<V> implements Predicate<V> {
* Constructs a took time policy.
* @param threshold the threshold
* @param cachedResultParser the function providing policy values
* @param clusterSettings cluster settings
* @param cacheType cache type
*/
- public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
+ public TookTimePolicy(
+ TimeValue threshold,
+ Function<V, CachedQueryResult.PolicyValues> cachedResultParser,
+ ClusterSettings clusterSettings,
+ CacheType cacheType
+ ) {
if (threshold.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
}
this.threshold = threshold;
this.cachedResultParser = cachedResultParser;
clusterSettings.addSettingsUpdateConsumer(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType), this::setThreshold);
}

private void setThreshold(TimeValue threshold) {
this.threshold = threshold;
}

/**
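The dynamic behavior above hinges on `ClusterSettings.addSettingsUpdateConsumer`: a consumer registered for a setting marked `Dynamic` is invoked whenever that setting's value changes, which is how `setThreshold` gets called on a live policy. Below is a minimal, self-contained sketch of that mechanism; it is not part of this commit, and the `DynamicThresholdDemo` class and `example.took_time.threshold` key are made up for illustration.

```java
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

public class DynamicThresholdDemo {
    public static void main(String[] args) {
        // A dynamic time setting, analogous to the took time threshold in this commit.
        Setting<TimeValue> threshold = Setting.timeSetting(
            "example.took_time.threshold",
            new TimeValue(10, TimeUnit.MILLISECONDS), // default
            TimeValue.ZERO,                           // minimum
            Setting.Property.NodeScope,
            Setting.Property.Dynamic
        );
        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>());
        clusterSettings.registerSetting(threshold);
        // TookTimePolicy registers this::setThreshold the same way.
        clusterSettings.addSettingsUpdateConsumer(threshold, v -> System.out.println("threshold is now " + v));
        // Applying updated settings fires the registered consumer; prints "threshold is now 50ms".
        clusterSettings.applySettings(Settings.builder().put("example.took_time.threshold", "50ms").build());
    }
}
```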
TieredSpilloverCache.java
@@ -15,19 +15,21 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,6 +49,9 @@
@ExperimentalApi
public class TieredSpilloverCache<K, V> implements ICache<K, V> {

// Used to avoid caching stale entries in lower tiers.
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;
@@ -69,8 +74,11 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
- if (evaluatePolicies(notification.getValue())) {
+ if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
+ && evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
} else {
removalListener.onRemoval(notification);
}
}
}
@@ -81,6 +89,7 @@ public void onRemoval(RemovalNotification<K, V> notification) {
.setWeigher(builder.cacheConfig.getWeigher())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.setClusterSettings(builder.cacheConfig.getClusterSettings())
.build(),
builder.cacheType,
builder.cacheFactories
@@ -156,10 +165,11 @@ public void invalidateAll() {
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
* @return An iterable over (onHeap + disk) keys
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked" })
@Override
public Iterable<K> keys() {
- return Iterables.concat(onHeapCache.keys(), diskCache.keys());
+ Iterable<K>[] iterables = (Iterable<K>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
+ return new ConcatenatedIterables<K>(iterables);
}

@Override
@@ -213,6 +223,67 @@ boolean evaluatePolicies(V value) {
return true;
}

/**
* ConcatenatedIterables combines multiple cache iterables into a single iterable, and supports remove() if the
* underlying iterator supports it.
* @param <K> Type of key.
*/
static class ConcatenatedIterables<K> implements Iterable<K> {

final Iterable<K>[] iterables;

ConcatenatedIterables(Iterable<K>[] iterables) {
this.iterables = iterables;
}

@SuppressWarnings({ "unchecked" })
@Override
public Iterator<K> iterator() {
Iterator<K>[] iterators = (Iterator<K>[]) new Iterator<?>[iterables.length];
for (int i = 0; i < iterables.length; i++) {
iterators[i] = iterables[i].iterator();
}
return new ConcatenatedIterator<>(iterators);
}

static class ConcatenatedIterator<T> implements Iterator<T> {
private final Iterator<T>[] iterators;
private int currentIteratorIndex;
private Iterator<T> currentIterator;

public ConcatenatedIterator(Iterator<T>[] iterators) {
this.iterators = iterators;
this.currentIteratorIndex = 0;
this.currentIterator = iterators[currentIteratorIndex];
}

@Override
public boolean hasNext() {
while (!currentIterator.hasNext()) {
currentIteratorIndex++;
if (currentIteratorIndex == iterators.length) {
return false;
}
currentIterator = iterators[currentIteratorIndex];
}
return true;
}

@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return currentIterator.next();
}

@Override
public void remove() {
currentIterator.remove();
}
}
}

/**
* Factory to create TieredSpilloverCache objects.
*/
@@ -253,8 +324,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);

- TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
- .getConcreteSettingForNamespace(cacheType.getSettingPrefix())
+ TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)
.get(settings);
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
config.getCachedResultParser(),
@@ -266,7 +336,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
- .addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
+ .addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType))
.build();
}

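One reason keys() moved from Iterables.concat to the ConcatenatedIterables above is remove() support: the concatenated iterator delegates remove() to whichever tier's iterator produced the last element, so a key can be dropped from the correct tier mid-iteration. Here is a standalone sketch of the same pattern; the class and variable names are illustrative, not from the commit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ConcatRemoveDemo {
    // Minimal re-implementation of the ConcatenatedIterator logic above.
    static <T> Iterator<T> concat(List<Iterator<T>> iterators) {
        return new Iterator<T>() {
            private int index = 0;
            private Iterator<T> current = iterators.get(0);

            @Override
            public boolean hasNext() {
                // Advance to the next non-empty iterator, as in the commit's version.
                while (!current.hasNext()) {
                    if (++index == iterators.size()) {
                        return false;
                    }
                    current = iterators.get(index);
                }
                return true;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }

            @Override
            public void remove() {
                current.remove(); // delegates to the iterator that owns the last element
            }
        };
    }

    public static void main(String[] args) {
        List<String> heapKeys = new ArrayList<>(List.of("a", "b"));
        List<String> diskKeys = new ArrayList<>(List.of("c", "d"));
        Iterator<String> it = concat(List.of(heapKeys.iterator(), diskKeys.iterator()));
        while (it.hasNext()) {
            if (it.next().equals("c")) {
                it.remove(); // removes "c" from diskKeys through its own iterator
            }
        }
        System.out.println(heapKeys + " " + diskKeys); // prints: [a, b] [d]
    }
}
```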
TieredSpilloverCachePlugin.java
@@ -18,6 +18,8 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* Plugin for TieredSpilloverCache.
*/
@@ -51,11 +53,7 @@ public List<Setting<?>> getSettings() {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
- settingList.add(
- TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
- cacheType.getSettingPrefix()
- )
- );
+ settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
}
return settingList;
}
TieredSpilloverCacheSettings.java
@@ -8,9 +8,12 @@

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.common.settings.Setting.Property.NodeScope;
@@ -42,17 +45,36 @@ public class TieredSpilloverCacheSettings {
/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
- public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
+ private static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
(key) -> Setting.timeSetting(
key,
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
TimeValue.ZERO, // Minimum value for this setting
- NodeScope
+ NodeScope,
+ Setting.Property.Dynamic
)
);
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
// Will be tuned further with future benchmarks.

/**
* Stores the concrete took time policy setting for each cache type. These settings are dynamic, so they are kept
* here so they can be registered and retrieved per cache type.
*/
public static final Map<CacheType, Setting<TimeValue>> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* Builds the concrete took time policy setting for each cache type.
*/
static {
Map<CacheType, Setting<TimeValue>> concreteTookTimePolicySettingMap = new HashMap<>();
for (CacheType cacheType : CacheType.values()) {
concreteTookTimePolicySettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
}
TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap;
}

/**
* Default constructor
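For reference, a small sketch (not from the commit) of how the map above resolves a concrete, per-cache-type setting key via getConcreteSettingForNamespace. The key shown in the comment assumes CacheType.INDICES_REQUEST_CACHE uses the setting prefix "indices.requests.cache"; check the CacheType enum for the authoritative prefixes.

```java
import org.opensearch.cache.common.tier.TieredSpilloverCacheSettings;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

public class TookTimeSettingKeys {
    public static void main(String[] args) {
        for (CacheType cacheType : CacheType.values()) {
            Setting<TimeValue> setting = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType);
            // For INDICES_REQUEST_CACHE this would print something like:
            // "indices.requests.cache.tiered_spillover.disk.store.policies.took_time.threshold"
            System.out.println(cacheType + " -> " + setting.getKey());
        }
    }
}
```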
TookTimePolicyTests.java
@@ -12,20 +12,27 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.function.Function;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

public class TookTimePolicyTests extends OpenSearchTestCase {
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> {
try {
@@ -35,8 +42,17 @@ public class TookTimePolicyTests extends OpenSearchTestCase {
}
};

private ClusterSettings clusterSettings;

@Before
public void setup() {
Settings settings = Settings.EMPTY;
clusterSettings = new ClusterSettings(settings, new HashSet<>());
clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE));
}

private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) {
- return new TookTimePolicy<>(threshold, transformationFunction);
+ return new TookTimePolicy<>(threshold, transformationFunction, clusterSettings, CacheType.INDICES_REQUEST_CACHE);
}

public void testTookTimePolicy() throws Exception {
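A hypothetical follow-on test, not part of this commit, sketching how the dynamic path could be exercised. It reuses the clusterSettings field and getTookTimePolicy helper defined above, and assumes Settings is imported as in the diff.

```java
public void testThresholdUpdatedDynamically() {
    // Start with a 100ms threshold.
    TookTimePolicy<BytesReference> policy = getTookTimePolicy(TimeValue.timeValueMillis(100));
    String key = TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey();
    // Applying new cluster settings invokes the consumer registered in the policy's
    // constructor, which calls setThreshold(1ms) on the live policy instance.
    clusterSettings.applySettings(Settings.builder().put(key, "1ms").build());
    // A cached result whose took time is, say, 10ms should now be admitted by the
    // policy, where it would have been rejected under the original 100ms threshold.
}
```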
