diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 58bc941706..a8af741f42 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -352,11 +352,7 @@ protected Object readProtocolWithCheckingBroken() { } try { - Protocol.readPushes(inputStream, clientSideCache); - return Protocol.read(inputStream); -// Object read = Protocol.read(inputStream); -// System.out.println("REPLY: " + SafeEncoder.encodeObject(read)); -// return read; + return Protocol.read(inputStream, clientSideCache); } catch (JedisConnectionException exc) { broken = true; throw exc; @@ -376,19 +372,6 @@ public List getMany(final int count) { return responses; } - protected void readPushesWithCheckingBroken() { - if (broken) { - throw new JedisConnectionException("Attempting to read pushes from a broken connection"); - } - - try { - Protocol.readPushes(inputStream, clientSideCache); - } catch (JedisConnectionException exc) { - broken = true; - throw exc; - } - } - /** * Check if the client name libname, libver, characters are legal * @param info the name diff --git a/src/main/java/redis/clients/jedis/JedisClientSideCache.java b/src/main/java/redis/clients/jedis/JedisClientSideCache.java index 73f2a71124..7128f7a1d5 100644 --- a/src/main/java/redis/clients/jedis/JedisClientSideCache.java +++ b/src/main/java/redis/clients/jedis/JedisClientSideCache.java @@ -24,7 +24,7 @@ public JedisClientSideCache(final HostAndPort hostPort, final JedisClientConfig private void clientTrackingOn() { String reply = connection.executeCommand(new CommandObject<>( - new CommandArguments(Protocol.Command.CLIENT).add("TRACKING").add("ON").add("BCAST"), + new CommandArguments(Protocol.Command.CLIENT).add("TRACKING").add("ON"), BuilderFactory.STRING)); if (!"OK".equals(reply)) { throw new JedisException("Could not enable client tracking. Reply: " + reply); @@ -33,7 +33,6 @@ private void clientTrackingOn() { @Override public String get(String key) { - connection.readPushesWithCheckingBroken(); String cachedValue = cache.getValue(key); if (cachedValue != null) return cachedValue; diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 0e276c0d93..489a331b3a 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -7,7 +7,6 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; -import java.util.Objects; import redis.clients.jedis.exceptions.*; import redis.clients.jedis.args.Rawable; @@ -171,15 +170,6 @@ private static Object process(final RedisInputStream is) { } } - private static void processPush(final RedisInputStream is, ClientSideCache cache) { - List list = processMultiBulkReply(is); - //System.out.println("PUSH: " + SafeEncoder.encodeObject(list)); - if (list.size() == 2 && list.get(0) instanceof byte[] - && Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) { - cache.invalidateKeys((List) list.get(1)); - } - } - private static byte[] processBulkReply(final RedisInputStream is) { final int len = is.readIntCrLf(); if (len == -1) { @@ -232,20 +222,35 @@ private static List processMapKeyValueReply(final RedisInputStream is) return ret; } + @Deprecated public static Object read(final RedisInputStream is) { return process(is); } - static void readPushes(final RedisInputStream is, final ClientSideCache cache) { + public static Object read(final RedisInputStream is, final ClientSideCache cache) { + readPushes(is, cache); + return process(is); + } + + private static void readPushes(final RedisInputStream is, final ClientSideCache cache) { if (cache != null) { //System.out.println("PEEK: " + is.peekByte()); - while (Objects.equals(GREATER_THAN_BYTE, is.peekByte())) { + while (is.peek(GREATER_THAN_BYTE)) { is.readByte(); processPush(is, cache); } } } + private static void processPush(final RedisInputStream is, ClientSideCache cache) { + List list = processMultiBulkReply(is); + //System.out.println("PUSH: " + SafeEncoder.encodeObject(list)); + if (list.size() == 2 && list.get(0) instanceof byte[] + && Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) { + cache.invalidateKeys((List) list.get(1)); + } + } + public static final byte[] toByteArray(final boolean value) { return value ? BYTES_TRUE : BYTES_FALSE; } diff --git a/src/main/java/redis/clients/jedis/util/RedisInputStream.java b/src/main/java/redis/clients/jedis/util/RedisInputStream.java index 094ec762d8..a0859c6bd4 100644 --- a/src/main/java/redis/clients/jedis/util/RedisInputStream.java +++ b/src/main/java/redis/clients/jedis/util/RedisInputStream.java @@ -43,9 +43,9 @@ public RedisInputStream(InputStream in) { this(in, INPUT_BUFFER_SIZE); } - public Byte peekByte() { - ensureFillSafe(); - return buf[count]; + public boolean peek(byte b) throws JedisConnectionException { + ensureFill(); // in current design, at least one reply is expected. so ensureFillSafe() is not necessary. + return buf[count] == b; } public byte readByte() throws JedisConnectionException { @@ -257,18 +257,4 @@ private void ensureFill() throws JedisConnectionException { } } } - - private void ensureFillSafe() { - if (count >= limit) { - try { - limit = in.read(buf); - count = 0; - if (limit == -1) { - throw new JedisConnectionException("Unexpected end of stream."); - } - } catch (IOException e) { - // do nothing - } - } - } } diff --git a/src/test/java/redis/clients/jedis/JedisClientSideCacheTest.java b/src/test/java/redis/clients/jedis/JedisClientSideCacheTest.java index 2375fa5153..5b9f8b2319 100644 --- a/src/test/java/redis/clients/jedis/JedisClientSideCacheTest.java +++ b/src/test/java/redis/clients/jedis/JedisClientSideCacheTest.java @@ -1,8 +1,10 @@ package redis.clients.jedis; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -17,7 +19,7 @@ public class JedisClientSideCacheTest { @Before public void setUp() throws Exception { - jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().timeoutMillis(500).password("foobared").build()); + jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().password("foobared").build()); jedis.flushAll(); } @@ -26,45 +28,83 @@ public void tearDown() throws Exception { jedis.close(); } - private static final JedisClientConfig configForCache = DefaultJedisClientConfig.builder() - .resp3().socketTimeoutMillis(20).password("foobared").build(); + private static final JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().resp3().password("foobared").build(); @Test public void simple() { - try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache)) { + try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig)) { jedis.set("foo", "bar"); assertEquals("bar", jCache.get("foo")); jedis.del("foo"); - assertNull(jCache.get("foo")); + assertThat(jCache.get("foo"), Matchers.oneOf("bar", null)); // ? } } @Test - public void simpleMock() { + public void simpleMoreAndMock() { ClientSideCache cache = Mockito.mock(ClientSideCache.class); - try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache, cache)) { + Mockito.when(cache.getValue("foo")).thenReturn(null, "bar", null); + + try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig, cache)) { jedis.set("foo", "bar"); + assertEquals("bar", jCache.get("foo")); + jedis.del("foo"); + + assertEquals("bar", jCache.get("foo")); + + // there should be an invalid pending; any connection command will make it read + jCache.ping(); + assertNull(jCache.get("foo")); } InOrder inOrder = Mockito.inOrder(cache); - inOrder.verify(cache).invalidateKeys(Mockito.notNull()); inOrder.verify(cache).getValue("foo"); inOrder.verify(cache).setKey("foo", "bar"); + inOrder.verify(cache).getValue("foo"); inOrder.verify(cache).invalidateKeys(Mockito.notNull()); inOrder.verify(cache).getValue("foo"); inOrder.verifyNoMoreInteractions(); } @Test - public void flushall() { - try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache)) { + public void flushAll() { + try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig)) { + jedis.set("foo", "bar"); + assertEquals("bar", jCache.get("foo")); + jedis.flushAll(); + assertThat(jCache.get("foo"), Matchers.oneOf("bar", null)); // ? + } + } + + @Test + public void flushAllMoreAndMock() { + ClientSideCache cache = Mockito.mock(ClientSideCache.class); + Mockito.when(cache.getValue("foo")).thenReturn(null, "bar", null); + + try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig, cache)) { jedis.set("foo", "bar"); + assertEquals("bar", jCache.get("foo")); + jedis.flushAll(); + + assertEquals("bar", jCache.get("foo")); + + // there should be an invalid pending; any connection command will make it read + jCache.ping(); + assertNull(jCache.get("foo")); } + + InOrder inOrder = Mockito.inOrder(cache); + inOrder.verify(cache).getValue("foo"); + inOrder.verify(cache).setKey("foo", "bar"); + inOrder.verify(cache).getValue("foo"); + inOrder.verify(cache).invalidateKeys(Mockito.isNull()); + inOrder.verify(cache).getValue("foo"); + inOrder.verifyNoMoreInteractions(); } }