Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Moving query recomputation logic outside of write lock #14187

Merged
merged 13 commits into from
Jun 25, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `opentelemetry` from 1.36.0 to 1.39.0 ([#14457](https://github.com/opensearch-project/OpenSearch/pull/14457))

### Changed
- [Tiered Caching] Move query recomputation logic outside write lock ([#14187](https://github.com/opensearch-project/OpenSearch/pull/14187))
- unsignedLongRangeQuery now returns MatchNoDocsQuery if the lower bounds are greater than the upper bounds ([#14416](https://github.com/opensearch-project/OpenSearch/pull/14416))
- Updated the `indices.query.bool.max_clause_count` setting from being static to dynamically updateable ([#13568](https://github.com/opensearch-project/OpenSearch/pull/13568))
- Make the class CommunityIdProcessor final ([#14448](https://github.com/opensearch-project/OpenSearch/pull/14448))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.cache.common.tier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
Expand Down Expand Up @@ -35,9 +37,13 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;
Expand All @@ -61,6 +67,7 @@

// Used to avoid caching stale entries in lower tiers.
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class);

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
Expand All @@ -86,6 +93,12 @@
private final Map<ICache<K, V>, TierInfo> caches;
private final List<Predicate<V>> policies;

/**
* This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value
* only once.
*/
Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> completableFutureMap = new ConcurrentHashMap<>();

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
Expand Down Expand Up @@ -190,10 +203,7 @@
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = null;
try (ReleasableLock ignore = writeLock.acquire()) {
value = onHeapCache.computeIfAbsent(key, loader);
}
V value = compute(key, loader);
// Handle stats
if (loader.isLoaded()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
Expand Down Expand Up @@ -222,6 +232,57 @@
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
if (pair != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers.For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
}
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
return null;
};
V value = null;
if (future == null) {
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
future = completableFutureMap.get(key);
future.handle(handler);
try {
value = loader.load(key);
} catch (Exception ex) {
future.completeExceptionally(ex);
throw new ExecutionException(ex);
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
}
if (value == null) {
NullPointerException npe = new NullPointerException("Loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Tuple<>(key, value));
}
} else {
try {
value = future.get().v2();
} catch (InterruptedException ex) {
throw new IllegalStateException(ex);

Check warning on line 280 in modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

View check run for this annotation

Codecov / codecov/patch

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java#L279-L280

Added lines #L279 - L280 were not covered by tests
}
}
return value;
}

@Override
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
Expand Down Expand Up @@ -328,12 +389,22 @@
ICacheKey<K> key = notification.getKey();
boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier
if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) {
boolean exceptionOccurredOnDiskCachePut = false;
boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue());
if (canCacheOnDisk) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
} catch (Exception ex) {
// TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing exception
// in this case as it shouldn't cause upstream request to fail.
logger.warn("Exception occurred while putting item to disk cache", ex);
exceptionOccurredOnDiskCachePut = true;
}
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
} else {
if (!exceptionOccurredOnDiskCachePut) {
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
}
}
if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut) {
// If the value is not going to the disk cache, send this notification to the TSC's removal listener
// as the value is leaving the TSC entirely
removalListener.onRemoval(notification);
Expand Down
Loading
Loading