Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client-side caching by hashing command arguments #3700

Merged
merged 16 commits into from
Feb 15, 2024
Merged
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@
<version>2.10.1</version>
</dependency>

<!-- Optional dependencies -->
<!-- Client-side caching -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
<optional>true</optional>
</dependency>

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +111,7 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
109 changes: 71 additions & 38 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
@@ -1,71 +1,104 @@
package redis.clients.jedis;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.exceptions.JedisException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import redis.clients.jedis.util.SafeEncoder;

public class ClientSideCache {
/**
* The class to manage the client-side caching. User can provide any of implementation of this class to the client
* object; e.g. {@link redis.clients.jedis.util.CaffeineCSC CaffeineCSC} or
* {@link redis.clients.jedis.util.GuavaCSC GuavaCSC} or a custom implementation of their own.
*/
public abstract class ClientSideCache {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved

private final Map<ByteBuffer, Object> cache;
protected static final int DEFAULT_MAXIMUM_SIZE = 10_000;
protected static final int DEFAULT_EXPIRE_SECONDS = 100;

public ClientSideCache() {
this.cache = new HashMap<>();
}
private final Map<ByteBuffer, Set<Long>> keyToCommandHashes;

/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
this.cache = map;
protected ClientSideCache() {
this.keyToCommandHashes = new ConcurrentHashMap<>();
}

protected abstract void invalidateAllCommandHashes();

protected abstract void invalidateCommandHashes(Iterable<Long> hashes);

protected abstract void put(long hash, Object value);

protected abstract Object get(long hash);

protected abstract long getCommandHash(CommandObject command);

public final void clear() {
cache.clear();
invalidateAllKeysAndCommandHashes();
}

public final void invalidateKeys(List list) {
final void invalidate(List list) {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
if (list == null) {
clear();
invalidateAllKeysAndCommandHashes();
return;
}

list.forEach(this::invalidateKey);
list.forEach(this::invalidateKeyAndRespectiveCommandHashes);
}

private void invalidateKey(Object key) {
if (key instanceof byte[]) {
cache.remove(convertKey((byte[]) key));
} else {
throw new JedisException("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}
private void invalidateAllKeysAndCommandHashes() {
invalidateAllCommandHashes();
keyToCommandHashes.clear();
}

protected void setKey(Object key, Object value) {
cache.put(getMapKey(key), value);
}
private void invalidateKeyAndRespectiveCommandHashes(Object key) {
if (!(key instanceof byte[])) {
throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}

protected <T> T getValue(Object key) {
return (T) getMapValue(key);
}
final ByteBuffer mapKey = makeKeyForKeyToCommandHashes((byte[]) key);

private Object getMapValue(Object key) {
return cache.get(getMapKey(key));
Set<Long> hashes = keyToCommandHashes.get(mapKey);
if (hashes != null) {
invalidateCommandHashes(hashes);
keyToCommandHashes.remove(mapKey);
}
}

private ByteBuffer getMapKey(Object key) {
if (key instanceof byte[]) {
return convertKey((byte[]) key);
} else {
return convertKey(SafeEncoder.encode(String.valueOf(key)));
final <T> T getValue(Function<CommandObject<T>, T> loader, CommandObject<T> command, String... keys) {

final long hash = getCommandHash(command);

T value = (T) get(hash);
if (value != null) {
return value;
}

value = loader.apply(command);
if (value != null) {
put(hash, value);
for (String key : keys) {
ByteBuffer mapKey = makeKeyForKeyToCommandHashes(key);
if (keyToCommandHashes.containsKey(mapKey)) {
keyToCommandHashes.get(mapKey).add(hash);
} else {
Set<Long> set = new HashSet<>();
set.add(hash);
keyToCommandHashes.put(mapKey, set);
}
}
}

return value;
}

private ByteBuffer makeKeyForKeyToCommandHashes(String key) {
return makeKeyForKeyToCommandHashes(SafeEncoder.encode(key));
}

private static ByteBuffer convertKey(byte[] b) {
private static ByteBuffer makeKeyForKeyToCommandHashes(byte[] b) {
return ByteBuffer.wrap(b);
}
}
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private static void processPush(final RedisInputStream is, ClientSideCache cache
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
cache.invalidate((List) list.get(1));
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig);
}

private <T> T executeClientSideCacheCommand(CommandObject<T> command, String... keys) {
if (clientSideCache == null) {
return executeCommand(command);
}

return clientSideCache.getValue((cmd) -> executeCommand(cmd), command, keys);
}

public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}
Expand Down Expand Up @@ -749,15 +757,7 @@ public String set(String key, String value, SetParams params) {

@Override
public String get(String key) {
if (clientSideCache != null) {
String cachedValue = clientSideCache.getValue(key);
if (cachedValue != null) return cachedValue;

String value = executeCommand(commandObjects.get(key));
if (value != null) clientSideCache.setKey(key, value);
return value;
}
return executeCommand(commandObjects.get(key));
return executeClientSideCacheCommand(commandObjects.get(key), key);
}

@Override
Expand Down
93 changes: 93 additions & 0 deletions src/main/java/redis/clients/jedis/util/CaffeineCSC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package redis.clients.jedis.util;
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import net.openhft.hashing.LongHashFunction;
import redis.clients.jedis.ClientSideCache;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.args.Rawable;

public class CaffeineCSC extends ClientSideCache {

private static final LongHashFunction DEFAULT_HASH_FUNCTION = LongHashFunction.xx3();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO let's share with users a comment on why this function. Be kind to our future selves and community.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure... we'll also need why Caffeine, why Guava, why not only one, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100%


private final Cache<Long, Object> cache;
private final LongHashFunction function;

public CaffeineCSC(Cache<Long, Object> caffeineCache, LongHashFunction hashFunction) {
this.cache = caffeineCache;
this.function = hashFunction;
}

@Override
protected final void invalidateAllCommandHashes() {
cache.invalidateAll();
}

@Override
protected void invalidateCommandHashes(Iterable<Long> hashes) {
cache.invalidateAll(hashes);
}

@Override
protected void put(long hash, Object value) {
cache.put(hash, value);
}

@Override
protected Object get(long hash) {
return cache.getIfPresent(hash);
}

@Override
protected final long getCommandHash(CommandObject command) {
long[] nums = new long[command.getArguments().size() + 1];
int idx = 0;
for (Rawable raw : command.getArguments()) {
nums[idx++] = function.hashBytes(raw.getRaw());
}
nums[idx] = function.hashInt(command.getBuilder().hashCode());
return function.hashLongs(nums);
}

public static Builder builder() {
return new Builder();
}

public static class Builder {

private long maximumSize = DEFAULT_MAXIMUM_SIZE;
private long expireTime = DEFAULT_EXPIRE_SECONDS;
private final TimeUnit expireTimeUnit = TimeUnit.SECONDS;

private LongHashFunction hashFunction = DEFAULT_HASH_FUNCTION;

private Builder() { }

public Builder maximumSize(int size) {
this.maximumSize = size;
return this;
}

public Builder ttl(int seconds) {
this.expireTime = seconds;
return this;
}

public Builder hashFunction(LongHashFunction function) {
this.hashFunction = function;
return this;
}

public CaffeineCSC build() {
Caffeine cb = Caffeine.newBuilder();

cb.maximumSize(maximumSize);

cb.expireAfterWrite(expireTime, expireTimeUnit);

return new CaffeineCSC(cb.build(), hashFunction);
}
}
}
Loading
Loading