Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Bugfix] [Tiered Caching] Fixes issues when integrating tiered cache with disk cache #13801

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setKeySerializer(builder.cacheConfig.getKeySerializer())
.setValueSerializer(builder.cacheConfig.getValueSerializer())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.setStatsTrackingEnabled(false)
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnable
@Override
@SuppressWarnings({ "unchecked" })
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
// As we can't directly IT with the tiered cache and ehcache, check that we receive non-null serializers, as an ehcache disk
// cache would require.
assert config.getKeySerializer() != null;
assert config.getValueSerializer() != null;
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
.setMaxSize(maxSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
Expand All @@ -32,6 +33,8 @@
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -166,6 +169,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
Expand Down Expand Up @@ -318,6 +323,8 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setDimensionNames(dimensionNames)
.setSettings(
Settings.builder()
Expand Down Expand Up @@ -830,6 +837,8 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio
.setKeyType(String.class)
.setWeigher((k, v) -> 150)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -1022,6 +1031,8 @@ public void testTookTimePolicyFromFactory() throws Exception {
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setMaxSizeInBytes(onHeapCacheSize * keyValueSize)
.setDimensionNames(dimensionNames)
Expand Down Expand Up @@ -1423,6 +1434,8 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -1488,6 +1501,28 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache<?, ?> t
return snapshot;
}

// Duplicated here from EhcacheDiskCacheTests.java, we can't add a dependency on that plugin
static class StringSerializer implements Serializer<String, byte[]> {
private final Charset charset = StandardCharsets.UTF_8;

@Override
public byte[] serialize(String object) {
return object.getBytes(charset);
}

@Override
public String deserialize(byte[] bytes) {
if (bytes == null) {
return null;
}
return new String(bytes, charset);
}

public boolean equals(String object, byte[] bytes) {
return object.equals(deserialize(bytes));
}
}

private ImmutableCacheStats getTotalStatsSnapshot(TieredSpilloverCache<?, ?> tsc) throws IOException {
ImmutableCacheStatsHolder cacheStats = tsc.stats(new String[0]);
return cacheStats.getStatsForDimensionValues(List.of());
Expand Down
10 changes: 0 additions & 10 deletions plugins/cache-ehcache/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,3 @@ tasks.named("bundlePlugin").configure {
into 'config'
}
}

test {
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}

internalClusterTest {
// TODO: Remove this later once we have a way.
systemProperty 'tests.security.manager', 'false'
}
2 changes: 1 addition & 1 deletion plugins/cache-ehcache/licenses/slf4j-api-LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
Expand Down Expand Up @@ -175,57 +177,60 @@

@SuppressWarnings({ "rawtypes" })
private Cache<ICacheKey, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
try {
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_CONCURRENCY_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
// Creating the cache requires permissions specified in plugin-security.policy
return AccessController.doPrivileged((PrivilegedAction<Cache<ICacheKey, ByteArrayWrapper>>) () -> {
try {
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_CONCURRENCY_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
)
)
)
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
// before V hits ehcache.
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
throw ex;
} catch (IllegalStateException ex) {
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
throw ex;
}
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
throw ex;
} catch (IllegalStateException ex) {
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
throw ex;

Check warning on line 231 in plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java

View check run for this annotation

Codecov / codecov/patch

plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java#L226-L231

Added lines #L226 - L231 were not covered by tests
}
});
}

private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<K, V> builder) {
Expand All @@ -252,25 +257,28 @@
@SuppressForbidden(reason = "Ehcache uses File.io")
private PersistentCacheManager buildCacheManager() {
// In case we use multiple ehCaches, we can define this cache manager at a global level.
return CacheManagerBuilder.newCacheManagerBuilder()
.with(CacheManagerBuilder.persistence(new File(storagePath)))

.using(
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
// like event listeners
.pool(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MIN_THREADS_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
.get(settings)
)
.build()
)
.build(true);
// Creating the cache manager also requires permissions specified in plugin-security.policy
return AccessController.doPrivileged((PrivilegedAction<PersistentCacheManager>) () -> {
return CacheManagerBuilder.newCacheManagerBuilder()
.with(CacheManagerBuilder.persistence(new File(storagePath)))

.using(
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
// like event listeners
.pool(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MIN_THREADS_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
.get(settings)
)
.build()
)
.build(true);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
grant {
permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
permission java.lang.RuntimePermission "createClassLoader";
permission java.lang.RuntimePermission "accessDeclaredMembers";
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
permission java.lang.RuntimePermission "getClassLoader";
};

Loading