Skip to content

Commit

Permalink
[Backport 2.x] [Tiered Caching] Expose new cache stats API (#13237) (#…
Browse files Browse the repository at this point in the history
…13456)

Step 3 out of 4

---------

Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
  • Loading branch information
peteralfonsi and Peter Alfonsi authored Apr 30, 2024
1 parent 5fe3568 commit 67001e2
Show file tree
Hide file tree
Showing 37 changed files with 1,378 additions and 160 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Tiered Caching] Expose new cache stats API ([#13237](https://github.com/opensearch-project/OpenSearch/pull/13237))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
Expand All @@ -90,6 +91,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -642,12 +645,47 @@ public <K, V> Map<K, V> readMap(Writeable.Reader<K> keyReader, Writeable.Reader<
return Collections.emptyMap();
}
Map<K, V> map = new HashMap<>(size);
readIntoMap(keyReader, valueReader, map, size);
return map;
}

/**
* Read a serialized map into a SortedMap using the default ordering for the keys. If the result is empty it might be immutable.
*/
public <K extends Comparable<K>, V> SortedMap<K, V> readOrderedMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader)
throws IOException {
return readOrderedMap(keyReader, valueReader, null);
}

/**
* Read a serialized map into a SortedMap, specifying a Comparator for the keys. If the result is empty it might be immutable.
*/
public <K extends Comparable<K>, V> SortedMap<K, V> readOrderedMap(
Writeable.Reader<K> keyReader,
Writeable.Reader<V> valueReader,
@Nullable Comparator<K> keyComparator
) throws IOException {
int size = readArraySize();
if (size == 0) {
return Collections.emptySortedMap();
}
SortedMap<K, V> sortedMap;
if (keyComparator == null) {
sortedMap = new TreeMap<>();
} else {
sortedMap = new TreeMap<>(keyComparator);
}
readIntoMap(keyReader, valueReader, sortedMap, size);
return sortedMap;
}

private <K, V> void readIntoMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader, Map<K, V> map, int size)
throws IOException {
for (int i = 0; i < size; i++) {
K key = keyReader.read(this);
V value = valueReader.read(this);
map.put(key, value);
}
return map;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void close() throws IOException {
}

@Override
public ImmutableCacheStatsHolder stats() {
public ImmutableCacheStatsHolder stats(String[] levels) {
return null; // TODO: in TSC stats PR
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public ImmutableCacheStatsHolder stats() {
return null;
}

@Override
public ImmutableCacheStatsHolder stats(String[] levels) {
return null;
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private EhcacheDiskCache(Builder<K, V> builder) {
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
List<String> dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null");
// If this cache is being used, FeatureFlags.PLUGGABLE_CACHE is already on, so we can always use the DefaultCacheStatsHolder.
this.cacheStatsHolder = new DefaultCacheStatsHolder(dimensionNames);
this.cacheStatsHolder = new DefaultCacheStatsHolder(dimensionNames, EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME);
}

@SuppressWarnings({ "rawtypes" })
Expand Down Expand Up @@ -446,12 +446,13 @@ public void close() {
}

/**
* Relevant stats for this cache.
* @return CacheStats
* Relevant stats for this cache, aggregated by levels.
* @param levels The levels to aggregate by.
* @return ImmutableCacheStatsHolder
*/
@Override
public ImmutableCacheStatsHolder stats() {
return cacheStatsHolder.getImmutableCacheStatsHolder();
public ImmutableCacheStatsHolder stats(String[] levels) {
return cacheStatsHolder.getImmutableCacheStatsHolder(levels);
}

/**
Expand Down Expand Up @@ -510,15 +511,15 @@ private long getNewValuePairSize(CacheEvent<? extends ICacheKey<K>, ? extends By
public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends ByteArrayWrapper> event) {
switch (event.getType()) {
case CREATED:
cacheStatsHolder.incrementEntries(event.getKey().dimensions);
cacheStatsHolder.incrementItems(event.getKey().dimensions);
cacheStatsHolder.incrementSizeInBytes(event.getKey().dimensions, getNewValuePairSize(event));
assert event.getOldValue() == null;
break;
case EVICTED:
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EVICTED)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
cacheStatsHolder.incrementEvictions(event.getKey().dimensions);
assert event.getNewValue() == null;
Expand All @@ -527,15 +528,15 @@ public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends ByteArrayWrappe
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EXPLICIT)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
assert event.getNewValue() == null;
break;
case EXPIRED:
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.INVALIDATED)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
assert event.getNewValue() == null;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testBasicGetAndPut() throws IOException {
String value = ehcacheTest.get(getICacheKey(entry.getKey()));
assertEquals(entry.getValue(), value);
}
assertEquals(randomKeys, ehcacheTest.stats().getTotalEntries());
assertEquals(randomKeys, ehcacheTest.stats().getTotalItems());
assertEquals(randomKeys, ehcacheTest.stats().getTotalHits());
assertEquals(expectedSize, ehcacheTest.stats().getTotalSizeInBytes());
assertEquals(randomKeys, ehcacheTest.count());
Expand Down Expand Up @@ -217,7 +217,7 @@ public void testConcurrentPut() throws Exception {
assertEquals(entry.getValue(), value);
}
assertEquals(randomKeys, ehcacheTest.count());
assertEquals(randomKeys, ehcacheTest.stats().getTotalEntries());
assertEquals(randomKeys, ehcacheTest.stats().getTotalItems());
ehcacheTest.close();
}
}
Expand Down Expand Up @@ -416,7 +416,7 @@ public String load(ICacheKey<String> key) {
assertEquals(1, numberOfTimesValueLoaded);
assertEquals(0, ((EhcacheDiskCache) ehcacheTest).getCompletableFutureMap().size());
assertEquals(1, ehcacheTest.stats().getTotalMisses());
assertEquals(1, ehcacheTest.stats().getTotalEntries());
assertEquals(1, ehcacheTest.stats().getTotalItems());
assertEquals(numberOfRequest - 1, ehcacheTest.stats().getTotalHits());
assertEquals(1, ehcacheTest.count());
ehcacheTest.close();
Expand Down
Loading

0 comments on commit 67001e2

Please sign in to comment.