Skip to content

Commit

Permalink
Refactor Client-Side Caching implementation (#3900)
Browse files Browse the repository at this point in the history
* adding a DataProvider to access connection from cache

* resolve keys from commandarguments

* clean up in unifiiedjedis and add csc test with ssl

* - fix readtimeout exception with sockets for consuming invalidations pending in buffer
- apply a default list of cacheable commands to DefaultClientSideCacheable
- fix failing unit tests with cacheable / non-cacheable keys
- remove formatting changes

* - add serialization for cache instances
- add unit test with UnifiedJedis
- add benchmark for CSC execution
- clean unused imports

* - added 'Cache' interface and 'DefaultCache' implementation in regard to design doc
- added 'EvictionPolicy' interface and LRU implementation
- move cache object validation and cache control stuf from 'ClientSideCache' into 'CacheConnection'
- make guava and caffeine caches experimental

* - added SSLSocketWrapper and plug it to use 'available'
- handle exceptions properly
- fix some issues with unit tests

* implementing thread safety

* - fix eviction issue and add related test
- fix consuming invalidation messages on a response read
- introduce cachestats
- fix potential issue with cacheKeysRelatedtoRedisKey cleanup
- tests for sequential access, concurrent acces and maxsize

* - renmae abstract cache class
- add test case for returning new instance of cache object

* - change order of execution in sequential acces test

* -  flush the cache on any disconnect
- replace LRU policy references with EvictionPolicy interface
-  add some constructor overloads to enable custom eviction policies on cache

* fix testcache

* fix javadoc issue

* - fix multithreaded eviction policy issue
- update guava and caffeine implementations according to abstract cache
  • Loading branch information
atakavci authored Aug 13, 2024
1 parent 8b83218 commit 229399f
Show file tree
Hide file tree
Showing 42 changed files with 2,393 additions and 794 deletions.
13 changes: 7 additions & 6 deletions src/main/java/redis/clients/jedis/CommandArguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@
public class CommandArguments implements Iterable<Rawable> {

private final ArrayList<Rawable> args;
private final ArrayList<Object> keys;

private boolean blocking;

private CommandArguments() {
throw new InstantiationError();
}

public CommandArguments(ProtocolCommand command) {
args = new ArrayList<>();
args.add(command);
keys = new ArrayList<>();
}

public ProtocolCommand getCommand() {
Expand Down Expand Up @@ -115,6 +113,7 @@ public CommandArguments key(Object key) {
} else {
throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}
keys.add(key);
return this;
}

Expand All @@ -134,7 +133,6 @@ public final CommandArguments addParams(IParams params) {
}

protected CommandArguments processKey(byte[] key) {
// do nothing
return this;
}

Expand All @@ -146,7 +144,6 @@ protected final CommandArguments processKeys(byte[]... keys) {
}

protected CommandArguments processKey(String key) {
// do nothing
return this;
}

Expand All @@ -166,6 +163,10 @@ public Iterator<Rawable> iterator() {
return args.iterator();
}

public Object[] getKeys() {
return keys.toArray();
}

public boolean isBlocking() {
return blocking;
}
Expand Down
29 changes: 22 additions & 7 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.annots.Internal;
import redis.clients.jedis.args.ClientAttributeOption;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.commands.ProtocolCommand;
Expand Down Expand Up @@ -342,31 +341,45 @@ protected void flush() {
}

@Experimental
@Internal
protected Object protocolRead(RedisInputStream is) {
return Protocol.read(is);
}

@Experimental
@Internal
protected void protocolReadPushes(RedisInputStream is) {
}

// TODO: final
protected Object readProtocolWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection.");
}

try {
protocolReadPushes(inputStream);
return protocolRead(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}

protected void readPushesWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection.");
}

try {
if (inputStream.available() > 0) {
protocolReadPushes(inputStream);
}
} catch (IOException e) {
broken = true;
throw new JedisConnectionException("Failed to check buffer on connection.", e);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}

public List<Object> getMany(final int count) {
flush();
final List<Object> responses = new ArrayList<>(count);
Expand All @@ -382,6 +395,7 @@ public List<Object> getMany(final int count) {

/**
* Check if the client name libname, libver, characters are legal
*
* @param info the name
* @return Returns true if legal, false throws exception
* @throws JedisException if characters illegal
Expand All @@ -397,7 +411,7 @@ private static boolean validateClientInfo(String info) {
return true;
}

private void initializeFromClientConfig(final JedisClientConfig config) {
protected void initializeFromClientConfig(final JedisClientConfig config) {
try {
connect();

Expand Down Expand Up @@ -425,7 +439,8 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
}

ClientSetInfoConfig setInfoConfig = config.getClientSetInfoConfig();
if (setInfoConfig == null) setInfoConfig = ClientSetInfoConfig.DEFAULT;
if (setInfoConfig == null)
setInfoConfig = ClientSetInfoConfig.DEFAULT;

if (!setInfoConfig.isDisabled()) {
String libName = JedisMetaInfo.getArtifactId();
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import org.slf4j.LoggerFactory;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConnection;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.exceptions.JedisException;

/**
Expand All @@ -20,7 +20,7 @@ public class ConnectionFactory implements PooledObjectFactory<Connection> {

private final JedisSocketFactory jedisSocketFactory;
private final JedisClientConfig clientConfig;
private ClientSideCache clientSideCache = null;
private Cache clientSideCache = null;

public ConnectionFactory(final HostAndPort hostAndPort) {
this.clientConfig = DefaultJedisClientConfig.builder().build();
Expand All @@ -33,7 +33,7 @@ public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig
}

@Experimental
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache csCache) {
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
this.clientSideCache = csCache;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/redis/clients/jedis/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.util.Pool;

public class ConnectionPool extends Pool<Connection> {
Expand All @@ -13,7 +13,7 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
}

@Experimental
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, Cache clientSideCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, clientSideCache));
}

Expand All @@ -27,7 +27,7 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
}

@Experimental
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, Cache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig, clientSideCache), poolConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ public Socket createSocket() throws JedisConnectionException {
if (null == _sslSocketFactory) {
_sslSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
}
Socket plainSocket = socket;
socket = _sslSocketFactory.createSocket(socket, _hostAndPort.getHost(), _hostAndPort.getPort(), true);

if (null != sslParameters) {
((SSLSocket) socket).setSSLParameters(sslParameters);
}
socket = new SSLSocketWrapper((SSLSocket) socket, plainSocket);

if (null != hostnameVerifier
&& !hostnameVerifier.verify(_hostAndPort.getHost(), ((SSLSocket) socket).getSession())) {
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.util.JedisClusterCRC16;

public class JedisCluster extends UnifiedJedis {
Expand Down Expand Up @@ -218,35 +218,35 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache) {
this(clusterNodes, clientConfig, clientSideCache, DEFAULT_MAX_ATTEMPTS,
Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()));
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
DEFAULT_MAX_ATTEMPTS, Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()),
clientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig, topologyRefreshPeriod),
Expand All @@ -255,7 +255,7 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi

@Experimental
private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol, ClientSideCache clientSideCache) {
RedisProtocol protocol, Cache clientSideCache) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
}

Expand Down
10 changes: 5 additions & 5 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.annots.Internal;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;
Expand All @@ -48,7 +48,7 @@ public class JedisClusterInfoCache {

private final GenericObjectPoolConfig<Connection> poolConfig;
private final JedisClientConfig clientConfig;
private final ClientSideCache clientSideCache;
private final Cache clientSideCache;
private final Set<HostAndPort> startNodes;

private static final int MASTER_NODE_INDEX = 2;
Expand All @@ -72,7 +72,7 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<Hos
}

@Experimental
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisClusterInfoCache(final JedisClientConfig clientConfig, Cache clientSideCache,
final Set<HostAndPort> startNodes) {
this(clientConfig, clientSideCache, null, startNodes);
}
Expand All @@ -83,7 +83,7 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
}

@Experimental
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisClusterInfoCache(final JedisClientConfig clientConfig, Cache clientSideCache,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, clientSideCache, poolConfig, startNodes, null);
}
Expand All @@ -95,7 +95,7 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
}

@Experimental
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisClusterInfoCache(final JedisClientConfig clientConfig, Cache clientSideCache,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -78,7 +78,7 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
}

@Experimental
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache clientSideCache) {
super(hostAndPort, clientConfig, clientSideCache);
}

Expand Down Expand Up @@ -383,7 +383,7 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
}

@Experimental
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache clientSideCache,
final GenericObjectPoolConfig<Connection> poolConfig) {
super(new PooledConnectionProvider(hostAndPort, clientConfig, clientSideCache, poolConfig),
clientConfig.getRedisProtocol(), clientSideCache);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {
Expand All @@ -15,7 +15,7 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
}

@Experimental
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCache clientSideCache,
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, Cache clientSideCache,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache,
sentinels, sentinelClientConfig), masterClientConfig.getRedisProtocol(), clientSideCache);
Expand All @@ -29,7 +29,7 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
}

@Experimental
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCache clientSideCache,
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, Cache clientSideCache,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache, poolConfig,
Expand Down
Loading

0 comments on commit 229399f

Please sign in to comment.