Skip to content

Commit

Permalink
Client-side caching by hashing command arguments (#3700)
Browse files Browse the repository at this point in the history
* Support TTL in client side caching (using Caffeine library)

* Also Guava cache

* format pom.xml

* Client-side caching by command arguments

TODO: Compute hash code.

* send keys

* todo comment for clean-up

* rename method to invalidate

* Client-side caching by hashing command arguments

* Hash command arguments for CaffeineCSC using OpenHFT hashing

* Clean-up keyHashes map

* added javadoc

* rename method

* remove lock

* descriptive name

* descriptive names and fix

* common default values in base class
  • Loading branch information
sazzad16 authored Feb 15, 2024
1 parent 3ab6bdc commit 5f1d8c6
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 78 deletions.
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@
<version>2.10.1</version>
</dependency>

<!-- Optional dependencies -->
<!-- Client-side caching -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
<optional>true</optional>
</dependency>

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +111,7 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
109 changes: 71 additions & 38 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
@@ -1,71 +1,104 @@
package redis.clients.jedis;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.exceptions.JedisException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import redis.clients.jedis.util.SafeEncoder;

public class ClientSideCache {
/**
* The class to manage the client-side caching. User can provide any of implementation of this class to the client
* object; e.g. {@link redis.clients.jedis.util.CaffeineCSC CaffeineCSC} or
* {@link redis.clients.jedis.util.GuavaCSC GuavaCSC} or a custom implementation of their own.
*/
public abstract class ClientSideCache {

private final Map<ByteBuffer, Object> cache;
protected static final int DEFAULT_MAXIMUM_SIZE = 10_000;
protected static final int DEFAULT_EXPIRE_SECONDS = 100;

public ClientSideCache() {
this.cache = new HashMap<>();
}
private final Map<ByteBuffer, Set<Long>> keyToCommandHashes;

/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
this.cache = map;
protected ClientSideCache() {
this.keyToCommandHashes = new ConcurrentHashMap<>();
}

protected abstract void invalidateAllCommandHashes();

protected abstract void invalidateCommandHashes(Iterable<Long> hashes);

protected abstract void put(long hash, Object value);

protected abstract Object get(long hash);

protected abstract long getCommandHash(CommandObject command);

public final void clear() {
cache.clear();
invalidateAllKeysAndCommandHashes();
}

public final void invalidateKeys(List list) {
final void invalidate(List list) {
if (list == null) {
clear();
invalidateAllKeysAndCommandHashes();
return;
}

list.forEach(this::invalidateKey);
list.forEach(this::invalidateKeyAndRespectiveCommandHashes);
}

private void invalidateKey(Object key) {
if (key instanceof byte[]) {
cache.remove(convertKey((byte[]) key));
} else {
throw new JedisException("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}
private void invalidateAllKeysAndCommandHashes() {
invalidateAllCommandHashes();
keyToCommandHashes.clear();
}

protected void setKey(Object key, Object value) {
cache.put(getMapKey(key), value);
}
private void invalidateKeyAndRespectiveCommandHashes(Object key) {
if (!(key instanceof byte[])) {
throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}

protected <T> T getValue(Object key) {
return (T) getMapValue(key);
}
final ByteBuffer mapKey = makeKeyForKeyToCommandHashes((byte[]) key);

private Object getMapValue(Object key) {
return cache.get(getMapKey(key));
Set<Long> hashes = keyToCommandHashes.get(mapKey);
if (hashes != null) {
invalidateCommandHashes(hashes);
keyToCommandHashes.remove(mapKey);
}
}

private ByteBuffer getMapKey(Object key) {
if (key instanceof byte[]) {
return convertKey((byte[]) key);
} else {
return convertKey(SafeEncoder.encode(String.valueOf(key)));
/**
 * Looks up the cached value for {@code command}; on a miss, executes the command via
 * {@code loader}, caches any non-null result, and records the command hash against each
 * supplied key so that later server invalidation messages can evict it.
 *
 * @param loader executes the command on a cache miss (typically sends it to the server)
 * @param command the command whose reply is being cached
 * @param keys the Redis keys the command touches; used for invalidation tracking
 * @return the cached or freshly loaded value; null if the command itself returned null
 */
final <T> T getValue(Function<CommandObject<T>, T> loader, CommandObject<T> command, String... keys) {

  final long hash = getCommandHash(command);

  T value = (T) get(hash);
  if (value != null) {
    return value;
  }

  value = loader.apply(command);
  if (value != null) {
    put(hash, value);
    for (String key : keys) {
      ByteBuffer mapKey = makeKeyForKeyToCommandHashes(key);
      // computeIfAbsent is atomic on ConcurrentHashMap; the previous
      // containsKey/get/put sequence could lose a hash under concurrent writers.
      // The per-key set is a concurrent set so invalidation (push handler thread)
      // and caching threads can modify it safely.
      keyToCommandHashes.computeIfAbsent(mapKey, k -> ConcurrentHashMap.newKeySet()).add(hash);
    }
  }

  return value;
}

/**
 * Encodes a String key to the canonical ByteBuffer form used as the lookup key in
 * {@code keyToCommandHashes}.
 */
private ByteBuffer makeKeyForKeyToCommandHashes(String key) {
  return makeKeyForKeyToCommandHashes(SafeEncoder.encode(key));
}

private static ByteBuffer convertKey(byte[] b) {
private static ByteBuffer makeKeyForKeyToCommandHashes(byte[] b) {
return ByteBuffer.wrap(b);
}
}
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private static void processPush(final RedisInputStream is, ClientSideCache cache
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
cache.invalidate((List) list.get(1));
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig);
}

/**
 * Executes {@code command}, consulting the client-side cache when one is configured.
 * Without a cache the command is executed directly; with a cache, a hit returns the
 * cached value and a miss executes the command and caches the result keyed by
 * {@code keys}.
 */
private <T> T executeClientSideCacheCommand(CommandObject<T> command, String... keys) {
  return (clientSideCache == null)
      ? executeCommand(command)
      : clientSideCache.getValue(this::executeCommand, command, keys);
}

public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}
Expand Down Expand Up @@ -749,15 +757,7 @@ public String set(String key, String value, SetParams params) {

@Override
public String get(String key) {
if (clientSideCache != null) {
String cachedValue = clientSideCache.getValue(key);
if (cachedValue != null) return cachedValue;

String value = executeCommand(commandObjects.get(key));
if (value != null) clientSideCache.setKey(key, value);
return value;
}
return executeCommand(commandObjects.get(key));
return executeClientSideCacheCommand(commandObjects.get(key), key);
}

@Override
Expand Down
93 changes: 93 additions & 0 deletions src/main/java/redis/clients/jedis/util/CaffeineCSC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package redis.clients.jedis.util;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import net.openhft.hashing.LongHashFunction;
import redis.clients.jedis.ClientSideCache;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.args.Rawable;

/**
 * A client-side cache implementation backed by a Caffeine {@link Cache}. Command replies
 * are stored against a 64-bit hash of the command's arguments, computed with an OpenHFT
 * {@link LongHashFunction} (XX3 by default).
 */
public class CaffeineCSC extends ClientSideCache {

  private static final LongHashFunction DEFAULT_HASH_FUNCTION = LongHashFunction.xx3();

  private final Cache<Long, Object> cache;
  private final LongHashFunction function;

  /**
   * @param caffeineCache the pre-configured Caffeine cache to store command replies in
   * @param hashFunction the function used to hash command arguments into cache keys
   */
  public CaffeineCSC(Cache<Long, Object> caffeineCache, LongHashFunction hashFunction) {
    this.cache = caffeineCache;
    this.function = hashFunction;
  }

  @Override
  protected final void invalidateAllCommandHashes() {
    cache.invalidateAll();
  }

  @Override
  protected void invalidateCommandHashes(Iterable<Long> hashes) {
    cache.invalidateAll(hashes);
  }

  @Override
  protected void put(long hash, Object value) {
    cache.put(hash, value);
  }

  @Override
  protected Object get(long hash) {
    return cache.getIfPresent(hash);
  }

  /**
   * Hashes every raw argument of the command plus the command's reply builder, then folds
   * the per-part hashes into a single 64-bit value.
   */
  @Override
  protected final long getCommandHash(CommandObject command) {
    long[] nums = new long[command.getArguments().size() + 1];
    int idx = 0;
    for (Rawable raw : command.getArguments()) {
      nums[idx++] = function.hashBytes(raw.getRaw());
    }
    // Include the reply builder so commands with identical arguments but different
    // reply types do not collide on the same hash.
    nums[idx] = function.hashInt(command.getBuilder().hashCode());
    return function.hashLongs(nums);
  }

  public static Builder builder() {
    return new Builder();
  }

  /** Fluent builder for {@link CaffeineCSC} with library-default size, TTL and hashing. */
  public static class Builder {

    private long maximumSize = DEFAULT_MAXIMUM_SIZE;
    private long expireTime = DEFAULT_EXPIRE_SECONDS;
    private final TimeUnit expireTimeUnit = TimeUnit.SECONDS;

    private LongHashFunction hashFunction = DEFAULT_HASH_FUNCTION;

    private Builder() { }

    /** Maximum number of cached command replies before size-based eviction starts. */
    public Builder maximumSize(int size) {
      this.maximumSize = size;
      return this;
    }

    /** Time-to-live, in seconds, applied to each cached reply after it is written. */
    public Builder ttl(int seconds) {
      this.expireTime = seconds;
      return this;
    }

    /** Overrides the default (XX3) argument-hashing function. */
    public Builder hashFunction(LongHashFunction function) {
      this.hashFunction = function;
      return this;
    }

    public CaffeineCSC build() {
      // Use the generic builder type instead of the raw Caffeine type; the raw form
      // made cb.build() return a raw Cache and forced an unchecked conversion to
      // Cache<Long, Object> at the constructor call.
      Caffeine<Object, Object> cb = Caffeine.newBuilder()
          .maximumSize(maximumSize)
          .expireAfterWrite(expireTime, expireTimeUnit);
      return new CaffeineCSC(cb.build(), hashFunction);
    }
  }
}
Loading

0 comments on commit 5f1d8c6

Please sign in to comment.