Skip to content

Commit

Permalink
Addressing Sorabh's comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Apr 25, 2024
1 parent e8ecfb7 commit 1a6ccf5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
/**
* Maintains caching tiers in ascending order of cache latency.
*/
private final Map<ICache<K, V>, Boolean> cacheList;
private final Map<ICache<K, V>, Boolean> caches;
private final List<Predicate<V>> policies;

private boolean isDiskCacheEnabled;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
Expand All @@ -87,7 +85,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (isDiskCacheEnabled
if (caches.get(diskCache)
&& SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
Expand All @@ -111,11 +109,11 @@ && evaluatePolicies(notification.getValue())) {

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings());
Boolean isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings());
LinkedHashMap<ICache<K, V>, Boolean> cacheListMap = new LinkedHashMap<>();
cacheListMap.put(onHeapCache, true);
cacheListMap.put(diskCache, this.isDiskCacheEnabled);
this.cacheList = Collections.synchronizedMap(cacheListMap);
cacheListMap.put(diskCache, isDiskCacheEnabled);
this.caches = Collections.synchronizedMap(cacheListMap);
this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
builder.cacheConfig.getClusterSettings()
Expand All @@ -136,8 +134,7 @@ ICache<K, V> getDiskCache() {
void enableDisableDiskCache(Boolean isDiskCacheEnabled) {
// When disk cache is disabled, we are not clearing up the disk cache entries yet as that should be part of
// separate cache/clear API.
this.cacheList.put(diskCache, isDiskCacheEnabled);
this.isDiskCacheEnabled = isDiskCacheEnabled;
this.caches.put(diskCache, isDiskCacheEnabled);
}

@Override
Expand Down Expand Up @@ -174,7 +171,7 @@ public void invalidate(ICacheKey<K> key) {
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also count hits/misses stats, so ignoring it for now.
try (ReleasableLock ignore = writeLock.acquire()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : cacheList.entrySet()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().invalidate(key);
}
}
Expand All @@ -183,7 +180,7 @@ public void invalidate(ICacheKey<K> key) {
@Override
public void invalidateAll() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : cacheList.entrySet()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().invalidateAll();
}
}
Expand All @@ -197,7 +194,7 @@ public void invalidateAll() {
@Override
public Iterable<ICacheKey<K>> keys() {
List<Iterable<ICacheKey<K>>> iterableList = new ArrayList<>();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : cacheList.entrySet()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
iterableList.add(cacheEntry.getKey().keys());
}
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) iterableList.toArray(new Iterable<?>[0]);
Expand All @@ -207,7 +204,7 @@ public Iterable<ICacheKey<K>> keys() {
@Override
public long count() {
long count = 0;
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : cacheList.entrySet()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
// Count for all the tiers irrespective of whether they are enabled or not. As eventually
// this will turn to zero once cache is cleared up either via invalidation or manually.
count += cacheEntry.getKey().count();
Expand All @@ -218,17 +215,15 @@ public long count() {
@Override
public void refresh() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : cacheList.entrySet()) {
if (cacheEntry.getValue()) {
cacheEntry.getKey().refresh();
}
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().refresh();
}
}
}

@Override
public void close() throws IOException {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : cacheList.entrySet()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
// Close all the caches here irrespective of whether they are enabled or not.
cacheEntry.getKey().close();

Check warning on line 228 in modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

View check run for this annotation

Codecov / codecov/patch

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java#L228

Added line #L228 was not covered by tests
}
Expand All @@ -242,7 +237,7 @@ public ImmutableCacheStatsHolder stats() {
private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : cacheList.entrySet()) {
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
if (cacheEntry.getValue()) {
V value = cacheEntry.getKey().get(key);
if (value != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.common.cache.ICache;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.Plugin;

Expand Down Expand Up @@ -54,21 +53,15 @@ public Map<String, ICache.Factory> getCacheFactoryMap() {
@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settingList = new ArrayList<>();
if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) {
for (CacheType cacheType : CacheType.values()) {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
)
);
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
)
);
settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType));
}
for (CacheType cacheType : CacheType.values()) {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType));
}
return settingList;
}
Expand Down

0 comments on commit 1a6ccf5

Please sign in to comment.