Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Tiered Caching] Stats rework (1/3): Interfaces and implementations for individual tiers #13247

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -54,7 +56,11 @@

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;

// The listener for removals from the spillover cache as a whole
// TODO: In TSC stats PR, each tier will have its own separate removal listener.
private final RemovalListener<ICacheKey<K>, V> removalListener;
private final List<String> dimensionNames;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
Expand All @@ -70,9 +76,9 @@
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<ICacheKey<K>, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
Expand All @@ -87,6 +93,7 @@
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.setClusterSettings(builder.cacheConfig.getClusterSettings())
Expand All @@ -97,7 +104,7 @@
);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

Expand All @@ -112,19 +119,19 @@
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
return getValueFromTieredCache().apply(key);
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
Expand All @@ -141,7 +148,7 @@
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
Expand All @@ -167,9 +174,9 @@
*/
@SuppressWarnings({ "unchecked" })
@Override
public Iterable<K> keys() {
Iterable<K>[] iterables = (Iterable<K>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<K>(iterables);
public Iterable<ICacheKey<K>> keys() {
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<ICacheKey<K>>(iterables);
}

@Override
Expand Down Expand Up @@ -197,7 +204,12 @@
}
}

private Function<K, V> getValueFromTieredCache() {
@Override
public ImmutableCacheStatsHolder stats() {
return null; // TODO: in TSC stats PR

Check warning on line 209 in modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

View check run for this annotation

Codecov / codecov/patch

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java#L209

Added line #L209 was not covered by tests
}

private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
Expand Down Expand Up @@ -354,7 +366,7 @@
public static class Builder<K, V> {
private ICache.Factory onHeapCacheFactory;
private ICache.Factory diskCacheFactory;
private RemovalListener<K, V> removalListener;
private RemovalListener<ICacheKey<K>, V> removalListener;
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
Expand Down Expand Up @@ -390,7 +402,7 @@
* @param removalListener Removal listener
* @return builder
*/
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
public Builder<K, V> setRemovalListener(RemovalListener<ICacheKey<K>, V> removalListener) {
this.removalListener = removalListener;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

Expand All @@ -25,27 +27,27 @@

public class MockDiskCache<K, V> implements ICache<K, V> {

Map<K, V> cache;
Map<ICacheKey<K>, V> cache;
int maxSize;
long delay;

private final RemovalListener<K, V> removalListener;
private final RemovalListener<ICacheKey<K>, V> removalListener;

public MockDiskCache(int maxSize, long delay, RemovalListener<K, V> removalListener) {
public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener) {
this.maxSize = maxSize;
this.delay = delay;
this.removalListener = removalListener;
this.cache = new ConcurrentHashMap<K, V>();
this.cache = new ConcurrentHashMap<ICacheKey<K>, V>();
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
V value = cache.get(key);
return value;
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
this.removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED));
}
Expand All @@ -58,7 +60,7 @@ public void put(K key, V value) {
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) {
V value = cache.computeIfAbsent(key, key1 -> {
try {
return loader.load(key);
Expand All @@ -70,7 +72,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
this.cache.remove(key);
}

Expand All @@ -80,7 +82,7 @@ public void invalidateAll() {
}

@Override
public Iterable<K> keys() {
public Iterable<ICacheKey<K>> keys() {
return () -> new CacheKeyIterator<>(cache, removalListener);
}

Expand All @@ -92,6 +94,11 @@ public long count() {
@Override
public void refresh() {}

@Override
public ImmutableCacheStatsHolder stats() {
return null;
}

@Override
public void close() {

Expand Down
Loading
Loading