Skip to content

Commit

Permalink
Support for client-side caching - phase 2 (#3673)
Browse files Browse the repository at this point in the history
* Code re-use?

* Stop forcing to read push notifications before checking cache and remove BCAST

* Rename variable

* Remove ensureFillSafe()

* Refactor peeking and reading push notifications

* Cleanup comments
  • Loading branch information
sazzad16 authored Jan 8, 2024
1 parent 5fa2c80 commit 89617c9
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 59 deletions.
19 changes: 1 addition & 18 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,7 @@ protected Object readProtocolWithCheckingBroken() {
}

try {
Protocol.readPushes(inputStream, clientSideCache);
return Protocol.read(inputStream);
// Object read = Protocol.read(inputStream);
// System.out.println("REPLY: " + SafeEncoder.encodeObject(read));
// return read;
return Protocol.read(inputStream, clientSideCache);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
Expand All @@ -376,19 +372,6 @@ public List<Object> getMany(final int count) {
return responses;
}

protected void readPushesWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read pushes from a broken connection");
}

try {
Protocol.readPushes(inputStream, clientSideCache);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}

/**
* Check if the client name libname, libver, characters are legal
* @param info the name
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/redis/clients/jedis/JedisClientSideCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public JedisClientSideCache(final HostAndPort hostPort, final JedisClientConfig

private void clientTrackingOn() {
String reply = connection.executeCommand(new CommandObject<>(
new CommandArguments(Protocol.Command.CLIENT).add("TRACKING").add("ON").add("BCAST"),
new CommandArguments(Protocol.Command.CLIENT).add("TRACKING").add("ON"),
BuilderFactory.STRING));
if (!"OK".equals(reply)) {
throw new JedisException("Could not enable client tracking. Reply: " + reply);
Expand All @@ -33,7 +33,6 @@ private void clientTrackingOn() {

@Override
public String get(String key) {
connection.readPushesWithCheckingBroken();
String cachedValue = cache.getValue(key);
if (cachedValue != null) return cachedValue;

Expand Down
29 changes: 17 additions & 12 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

import redis.clients.jedis.exceptions.*;
import redis.clients.jedis.args.Rawable;
Expand Down Expand Up @@ -171,15 +170,6 @@ private static Object process(final RedisInputStream is) {
}
}

private static void processPush(final RedisInputStream is, ClientSideCache cache) {
List<Object> list = processMultiBulkReply(is);
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
}
}

private static byte[] processBulkReply(final RedisInputStream is) {
final int len = is.readIntCrLf();
if (len == -1) {
Expand Down Expand Up @@ -232,20 +222,35 @@ private static List<KeyValue> processMapKeyValueReply(final RedisInputStream is)
return ret;
}

@Deprecated
public static Object read(final RedisInputStream is) {
return process(is);
}

static void readPushes(final RedisInputStream is, final ClientSideCache cache) {
public static Object read(final RedisInputStream is, final ClientSideCache cache) {
readPushes(is, cache);
return process(is);
}

private static void readPushes(final RedisInputStream is, final ClientSideCache cache) {
if (cache != null) {
//System.out.println("PEEK: " + is.peekByte());
while (Objects.equals(GREATER_THAN_BYTE, is.peekByte())) {
while (is.peek(GREATER_THAN_BYTE)) {
is.readByte();
processPush(is, cache);
}
}
}

private static void processPush(final RedisInputStream is, ClientSideCache cache) {
List<Object> list = processMultiBulkReply(is);
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
}
}

public static final byte[] toByteArray(final boolean value) {
return value ? BYTES_TRUE : BYTES_FALSE;
}
Expand Down
20 changes: 3 additions & 17 deletions src/main/java/redis/clients/jedis/util/RedisInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public RedisInputStream(InputStream in) {
this(in, INPUT_BUFFER_SIZE);
}

public Byte peekByte() {
ensureFillSafe();
return buf[count];
public boolean peek(byte b) throws JedisConnectionException {
ensureFill(); // in current design, at least one reply is expected. so ensureFillSafe() is not necessary.
return buf[count] == b;
}

public byte readByte() throws JedisConnectionException {
Expand Down Expand Up @@ -257,18 +257,4 @@ private void ensureFill() throws JedisConnectionException {
}
}
}

private void ensureFillSafe() {
if (count >= limit) {
try {
limit = in.read(buf);
count = 0;
if (limit == -1) {
throw new JedisConnectionException("Unexpected end of stream.");
}
} catch (IOException e) {
// do nothing
}
}
}
}
60 changes: 50 additions & 10 deletions src/test/java/redis/clients/jedis/JedisClientSideCacheTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package redis.clients.jedis;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -17,7 +19,7 @@ public class JedisClientSideCacheTest {

@Before
public void setUp() throws Exception {
jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().timeoutMillis(500).password("foobared").build());
jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().password("foobared").build());
jedis.flushAll();
}

Expand All @@ -26,45 +28,83 @@ public void tearDown() throws Exception {
jedis.close();
}

private static final JedisClientConfig configForCache = DefaultJedisClientConfig.builder()
.resp3().socketTimeoutMillis(20).password("foobared").build();
private static final JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().resp3().password("foobared").build();

@Test
public void simple() {
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache)) {
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig)) {
jedis.set("foo", "bar");
assertEquals("bar", jCache.get("foo"));
jedis.del("foo");
assertNull(jCache.get("foo"));
assertThat(jCache.get("foo"), Matchers.oneOf("bar", null)); // ?
}
}

@Test
public void simpleMock() {
public void simpleMoreAndMock() {
ClientSideCache cache = Mockito.mock(ClientSideCache.class);
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache, cache)) {
Mockito.when(cache.getValue("foo")).thenReturn(null, "bar", null);

try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig, cache)) {
jedis.set("foo", "bar");

assertEquals("bar", jCache.get("foo"));

jedis.del("foo");

assertEquals("bar", jCache.get("foo"));

// there should be an invalid pending; any connection command will make it read
jCache.ping();

assertNull(jCache.get("foo"));
}

InOrder inOrder = Mockito.inOrder(cache);
inOrder.verify(cache).invalidateKeys(Mockito.notNull());
inOrder.verify(cache).getValue("foo");
inOrder.verify(cache).setKey("foo", "bar");
inOrder.verify(cache).getValue("foo");
inOrder.verify(cache).invalidateKeys(Mockito.notNull());
inOrder.verify(cache).getValue("foo");
inOrder.verifyNoMoreInteractions();
}

@Test
public void flushall() {
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache)) {
public void flushAll() {
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig)) {
jedis.set("foo", "bar");
assertEquals("bar", jCache.get("foo"));
jedis.flushAll();
assertThat(jCache.get("foo"), Matchers.oneOf("bar", null)); // ?
}
}

@Test
public void flushAllMoreAndMock() {
ClientSideCache cache = Mockito.mock(ClientSideCache.class);
Mockito.when(cache.getValue("foo")).thenReturn(null, "bar", null);

try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig, cache)) {
jedis.set("foo", "bar");

assertEquals("bar", jCache.get("foo"));

jedis.flushAll();

assertEquals("bar", jCache.get("foo"));

// there should be an invalid pending; any connection command will make it read
jCache.ping();

assertNull(jCache.get("foo"));
}

InOrder inOrder = Mockito.inOrder(cache);
inOrder.verify(cache).getValue("foo");
inOrder.verify(cache).setKey("foo", "bar");
inOrder.verify(cache).getValue("foo");
inOrder.verify(cache).invalidateKeys(Mockito.isNull());
inOrder.verify(cache).getValue("foo");
inOrder.verifyNoMoreInteractions();
}
}

0 comments on commit 89617c9

Please sign in to comment.