Skip to content

Commit

Permalink
Prepare client side caching - design 2 (#3889)
Browse files Browse the repository at this point in the history
* Separate CacheConnection

* Introduce CacheKey and CacheEntry

* Little tweak maximumSize test in CaffeineClientSideCacheTest

* Remove resetting timeout; we'll PING instead
  • Loading branch information
sazzad16 authored Jul 25, 2024
1 parent 148d4bb commit 8b83218
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 145 deletions.
48 changes: 16 additions & 32 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.annots.Internal;
import redis.clients.jedis.args.ClientAttributeOption;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
Expand All @@ -31,12 +31,11 @@
public class Connection implements Closeable {

private ConnectionPool memberOf;
private RedisProtocol protocol;
protected RedisProtocol protocol;
private final JedisSocketFactory socketFactory;
private Socket socket;
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private ClientSideCache clientSideCache;
private int soTimeout = 0;
private int infiniteSoTimeout = 0;
private boolean broken = false;
Expand Down Expand Up @@ -68,16 +67,6 @@ public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clie
initializeFromClientConfig(clientConfig);
}

@Experimental
public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig,
ClientSideCache clientSideCache) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
initializeClientSideCache(clientSideCache);
}

@Override
public String toString() {
return "Connection{" + socketFactory + "}";
Expand Down Expand Up @@ -352,16 +341,26 @@ protected void flush() {
}
}

@Experimental
@Internal
protected Object protocolRead(RedisInputStream is) {
return Protocol.read(is);
}

@Experimental
@Internal
protected void protocolReadPushes(RedisInputStream is) {
}

// TODO: final
protected Object readProtocolWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection.");
}

try {
return Protocol.read(inputStream, clientSideCache);
// Object read = Protocol.read(inputStream);
// System.out.println(redis.clients.jedis.util.SafeEncoder.encodeObject(read));
// return read;
protocolReadPushes(inputStream);
return protocolRead(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
Expand Down Expand Up @@ -525,19 +524,4 @@ public boolean ping() {
}
return true;
}

private void initializeClientSideCache(ClientSideCache csCache) {
this.clientSideCache = csCache;
if (clientSideCache != null) {
if (protocol != RedisProtocol.RESP3) {
throw new JedisException("Client side caching is only supported with RESP3.");
}

sendCommand(Protocol.Command.CLIENT, "TRACKING", "ON");
String reply = getStatusCodeReply();
if (!"OK".equals(reply)) {
throw new JedisException("Could not enable client tracking. Reply: " + reply);
}
}
}
}
5 changes: 4 additions & 1 deletion src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.CacheConnection;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.exceptions.JedisException;

Expand Down Expand Up @@ -62,7 +64,8 @@ public void destroyObject(PooledObject<Connection> pooledConnection) throws Exce
@Override
public PooledObject<Connection> makeObject() throws Exception {
try {
Connection jedis = new Connection(jedisSocketFactory, clientConfig, clientSideCache);
Connection jedis = clientSideCache == null ? new Connection(jedisSocketFactory, clientConfig)
: new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache);
return new DefaultPooledObject<>(jedis);
} catch (JedisException je) {
logger.debug("Error while makeObject", je);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private void discoverClusterSlots(Connection jedis) {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.flush();
}
Set<String> hostAndPortKeys = new HashSet<>();

Expand Down
18 changes: 5 additions & 13 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import redis.clients.jedis.annots.Experimental;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.exceptions.*;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.commands.ProtocolCommand;
Expand Down Expand Up @@ -207,23 +207,15 @@ private static List<KeyValue> processMapKeyValueReply(final RedisInputStream is)
return ret;
}

@Deprecated
public static Object read(final RedisInputStream is) {
return process(is);
}

@Experimental
public static Object read(final RedisInputStream is, final ClientSideCache cache) {
readPushes(is, cache);
return process(is);
}

private static void readPushes(final RedisInputStream is, final ClientSideCache cache) {
if (cache != null) {
while (is.peek(GREATER_THAN_BYTE)) {
is.readByte();
processPush(is, cache);
}
public static void readPushes(final RedisInputStream is, final ClientSideCache cache) {
while (is.peek(GREATER_THAN_BYTE)) {
is.readByte();
processPush(is, cache);
}
}

Expand Down
66 changes: 66 additions & 0 deletions src/main/java/redis/clients/jedis/csc/CacheConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package redis.clients.jedis.csc;

import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import redis.clients.jedis.Connection;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.JedisSocketFactory;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.RedisProtocol;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.RedisInputStream;

public class CacheConnection extends Connection {

private final ClientSideCache clientSideCache;
private final ReentrantLock lock;

public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig,
ClientSideCache clientSideCache) {

super(socketFactory, clientConfig);

if (protocol != RedisProtocol.RESP3) {
throw new JedisException("Client side caching is only supported with RESP3.");
}
this.clientSideCache = Objects.requireNonNull(clientSideCache);
initializeClientSideCache();

lock = new ReentrantLock();
}

@Override
protected Object protocolRead(RedisInputStream inputStream) {
if (lock != null) {
lock.lock();
try {
return Protocol.read(inputStream);
} finally {
lock.unlock();
}
} else {
return Protocol.read(inputStream);
}
}

@Override
protected void protocolReadPushes(RedisInputStream inputStream) {
if (lock != null && lock.tryLock()) {
try {
//super.setSoTimeout(1);
Protocol.readPushes(inputStream, clientSideCache);
} finally {
//super.rollbackTimeout();
lock.unlock();
}
}
}

private void initializeClientSideCache() {
sendCommand(Protocol.Command.CLIENT, "TRACKING", "ON");
String reply = getStatusCodeReply();
if (!"OK".equals(reply)) {
throw new JedisException("Could not enable client tracking. Reply: " + reply);
}
}
}
30 changes: 30 additions & 0 deletions src/main/java/redis/clients/jedis/csc/CacheEntry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package redis.clients.jedis.csc;

import redis.clients.jedis.Connection;
import redis.clients.jedis.annots.Internal;

@Internal
public class CacheEntry<T> {

private final CacheKey<T> cacheKey;
private final T value;
private final Connection connection;

public CacheEntry(CacheKey<T> cacheKey, T value, Connection connection) {
this.cacheKey = cacheKey;
this.value = value;
this.connection = connection;
}

public CacheKey<T> getCacheKey() {
return cacheKey;
}

public T getValue() {
return value;
}

public Connection getConnection() {
return connection;
}
}
28 changes: 28 additions & 0 deletions src/main/java/redis/clients/jedis/csc/CacheKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package redis.clients.jedis.csc;

import java.util.Objects;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.annots.Internal;

@Internal
public class CacheKey<T> {

private final CommandObject<T> command;

public CacheKey(CommandObject<T> command) {
this.command = Objects.requireNonNull(command);
}

@Override
public int hashCode() {
return command.hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
final CacheKey other = (CacheKey) obj;
return Objects.equals(this.command, other.command);
}
}
19 changes: 9 additions & 10 deletions src/main/java/redis/clients/jedis/csc/CaffeineClientSideCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,33 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.CommandObject;

public class CaffeineClientSideCache extends ClientSideCache {

private final Cache<CommandObject, Object> cache;
private final Cache<CacheKey, CacheEntry> cache;

public CaffeineClientSideCache(Cache<CommandObject, Object> caffeineCache) {
public CaffeineClientSideCache(Cache<CacheKey, CacheEntry> caffeineCache) {
this.cache = caffeineCache;
}

@Override
protected final void invalidateFullCache() {
protected final void clear() {
cache.invalidateAll();
}

@Override
protected void invalidateCache(Iterable<CommandObject<?>> commands) {
cache.invalidateAll(commands);
protected void remove(Iterable<CacheKey<?>> keys) {
cache.invalidateAll(keys);
}

@Override
protected <T> void putValue(CommandObject<T> command, T value) {
cache.put(command, value);
protected void put(CacheKey key, CacheEntry entry) {
cache.put(key, entry);
}

@Override
protected <T> T getValue(CommandObject<T> command) {
return (T) cache.getIfPresent(command);
protected CacheEntry get(CacheKey key) {
return cache.getIfPresent(key);
}

public static Builder builder() {
Expand Down
Loading

0 comments on commit 8b83218

Please sign in to comment.