diff --git a/src/test/java/redis/clients/jedis/MigratePipeliningTest.java b/src/test/java/redis/clients/jedis/MigratePipeliningTest.java index b7942fc140..ebd4b31eb9 100644 --- a/src/test/java/redis/clients/jedis/MigratePipeliningTest.java +++ b/src/test/java/redis/clients/jedis/MigratePipeliningTest.java @@ -2,9 +2,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertArrayEquals; @@ -72,7 +72,7 @@ public void noKey() { p.migrate(host, port, db, timeout, new MigrateParams(), bfoo1, bfoo2, bfoo3); assertThat(p.syncAndReturnAll(), - hasItems("NOKEY", "NOKEY", "NOKEY", "NOKEY")); + contains("NOKEY", "NOKEY", "NOKEY", "NOKEY")); } @Test @@ -86,7 +86,7 @@ public void migrate() { p.get("foo"); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertEquals("bar", dest.get("foo")); } @@ -102,7 +102,7 @@ public void migrateBinary() { p.get(bfoo); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertArrayEquals(bbar, dest.get(bfoo)); } @@ -118,7 +118,7 @@ public void migrateEmptyParams() { p.get("foo"); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertEquals("bar", dest.get("foo")); } @@ -134,7 +134,7 @@ public void migrateEmptyParamsBinary() { p.get(bfoo); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertArrayEquals(bbar, dest.get(bfoo)); } @@ -150,7 +150,7 @@ public void migrateCopy() { p.get("foo"); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", "bar")); + contains("OK", "OK", "bar")); assertEquals("bar", dest.get("foo")); } @@ -166,7 +166,7 @@ public void migrateCopyBinary() { p.get(bfoo); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", bbar)); + contains("OK", "OK", bbar)); assertArrayEquals(bbar, dest.get(bfoo)); } @@ -184,7 +184,7 @@ public void migrateReplace() { p.get("foo"); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertEquals("bar1", dest.get("foo")); } @@ -202,7 +202,7 @@ public void migrateReplaceBinary() { p.get(bfoo); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertArrayEquals(bbar1, dest.get(bfoo)); } @@ -220,7 +220,7 @@ public void migrateCopyReplace() { p.get("foo"); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", "bar1")); + contains("OK", "OK", "bar1")); assertEquals("bar1", dest.get("foo")); } @@ -238,7 +238,7 @@ public void migrateCopyReplaceBinary() { p.get(bfoo); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", bbar1)); + contains("OK", "OK", bbar1)); assertArrayEquals(bbar1, dest.get(bfoo)); } @@ -254,7 +254,7 @@ public void migrateAuth() { p.get("foo"); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertEquals("bar", destAuth.get("foo")); } @@ -270,7 +270,7 @@ public void migrateAuthBinary() { p.get(bfoo); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertArrayEquals(bbar, destAuth.get(bfoo)); } @@ -287,7 +287,7 @@ public void migrateAuth2() { p.get("foo"); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertEquals("bar", jedis.get("foo")); } @@ -304,7 +304,7 @@ public void migrateAuth2Binary() { p.get(bfoo); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK", null)); + contains("OK", "OK", null)); assertArrayEquals(bbar, jedis.get(bfoo)); } @@ -321,7 +321,7 @@ public void migrateMulti() { p.migrate(host, port, db, timeout, new MigrateParams(), "foo1", "foo2", "foo3"); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK")); + contains("OK", "OK")); assertEquals("bar1", dest.get("foo1")); assertEquals("bar2", dest.get("foo2")); @@ -340,7 +340,7 @@ public void migrateMultiBinary() { p.migrate(host, port, db, timeout, new MigrateParams(), bfoo1, bfoo2, bfoo3); assertThat(p.syncAndReturnAll(), - hasItems("OK", "OK")); + contains("OK", "OK")); assertArrayEquals(bbar1, dest.get(bfoo1)); assertArrayEquals(bbar2, dest.get(bfoo2)); @@ -361,7 +361,7 @@ public void migrateConflict() { p.migrate(host, port, db, timeout, new MigrateParams(), "foo1", "foo2", "foo3"); assertThat(p.syncAndReturnAll(), - hasItems( + contains( equalTo("OK"), both(instanceOf(JedisDataException.class)).and(hasToString(containsString("BUSYKEY"))) )); @@ -385,7 +385,7 @@ public void migrateConflictBinary() { p.migrate(host, port, db, timeout, new MigrateParams(), bfoo1, bfoo2, bfoo3); assertThat(p.syncAndReturnAll(), - hasItems( + contains( equalTo("OK"), both(instanceOf(JedisDataException.class)).and(hasToString(containsString("BUSYKEY"))) )); diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java new file mode 100644 index 0000000000..066ca98fb0 --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java @@ -0,0 +1,1314 @@ +package redis.clients.jedis.commands.unified.pipeline; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.hamcrest.Matchers; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import redis.clients.jedis.BuilderFactory; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.commands.unified.pooled.PooledCommandsTestHelper; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XAutoClaimParams; +import redis.clients.jedis.params.XClaimParams; +import redis.clients.jedis.params.XPendingParams; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.params.XTrimParams; +import redis.clients.jedis.resps.StreamConsumerFullInfo; +import redis.clients.jedis.resps.StreamConsumerInfo; +import redis.clients.jedis.resps.StreamConsumersInfo; +import redis.clients.jedis.resps.StreamEntry; +import redis.clients.jedis.resps.StreamFullInfo; +import redis.clients.jedis.resps.StreamGroupFullInfo; +import redis.clients.jedis.resps.StreamGroupInfo; +import redis.clients.jedis.resps.StreamInfo; +import redis.clients.jedis.resps.StreamPendingEntry; +import redis.clients.jedis.util.SafeEncoder; + +public class StreamsPipelineCommandsTest extends PipelineCommandsTestBase { + + @BeforeClass + public static void prepare() throws InterruptedException { + jedis = PooledCommandsTestHelper.getPooled(); + } + + @AfterClass + public static void cleanUp() { + jedis.close(); + } + + @Test + public void xaddWrongNumberOfArguments() { + Map map1 = new HashMap<>(); + pipe.xadd("stream1", (StreamEntryID) null, map1); + + assertThat(pipe.syncAndReturnAll(), + contains( + both(instanceOf(JedisDataException.class)).and(hasToString(containsString("wrong number of arguments"))) + )); + } + + @Test + public void xadd() { + Map map1 = new HashMap<>(); + map1.put("f1", "v1"); + pipe.xadd("xadd-stream1", (StreamEntryID) null, map1); + + Map map2 = new HashMap<>(); + map2.put("f1", "v1"); + map2.put("f2", "v2"); + pipe.xadd("xadd-stream1", (StreamEntryID) null, map2); + + List results = pipe.syncAndReturnAll(); + + assertThat(results, contains( + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class) + )); + + assertThat((StreamEntryID) results.get(1), + greaterThan((StreamEntryID) results.get(0))); + } + + @Test + public void xaddMaxLen() { + Map map4 = new HashMap<>(); + map4.put("f2", "v2"); + map4.put("f3", "v3"); + StreamEntryID idIn = new StreamEntryID(1000, 1L); + pipe.xadd("xadd-stream2", idIn, map4); + + Map map5 = new HashMap<>(); + map5.put("f4", "v4"); + map5.put("f5", "v5"); + pipe.xadd("xadd-stream2", (StreamEntryID) null, map5); + + pipe.xlen("xadd-stream2"); + + Map map6 = new HashMap<>(); + map6.put("f4", "v4"); + map6.put("f5", "v5"); + pipe.xadd("xadd-stream2", map6, XAddParams.xAddParams().maxLen(2)); + + pipe.xlen("xadd-stream2"); + + List results = pipe.syncAndReturnAll(); + + assertThat(results, contains( + equalTo(idIn), + instanceOf(StreamEntryID.class), + equalTo(2L), + instanceOf(StreamEntryID.class), + equalTo(2L) + )); + + assertThat((StreamEntryID) results.get(1), + greaterThan((StreamEntryID) results.get(0))); + + assertThat((StreamEntryID) results.get(3), + greaterThan((StreamEntryID) results.get(1))); + } + + @Test + public void xaddWithParamsWrongNumberOfArguments() { + pipe.xadd("stream1", new HashMap<>(), XAddParams.xAddParams()); + pipe.xadd("stream1", XAddParams.xAddParams(), new HashMap<>()); + + assertThat(pipe.syncAndReturnAll(), + contains( + both(instanceOf(JedisDataException.class)).and(hasToString(containsString("wrong number of arguments"))), + both(instanceOf(JedisDataException.class)).and(hasToString(containsString("wrong number of arguments"))) + )); + } + + @Test + public void xaddWithParams() { + pipe.xadd("xadd-stream1", (StreamEntryID) null, singletonMap("f1", "v1")); + + Map map2 = new HashMap<>(); + map2.put("f1", "v1"); + map2.put("f2", "v2"); + pipe.xadd("xadd-stream1", map2, XAddParams.xAddParams()); + + List results = pipe.syncAndReturnAll(); + + assertThat(results, contains( + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class) + )); + + assertThat((StreamEntryID) results.get(1), + greaterThan((StreamEntryID) results.get(0))); + } + + @Test + public void xaddWithParamsTrim() { + Map map3 = new HashMap<>(); + map3.put("f2", "v2"); + map3.put("f3", "v3"); + StreamEntryID idIn = new StreamEntryID(1000, 1L); + pipe.xadd("xadd-stream2", XAddParams.xAddParams().id(idIn), map3); + + Map map4 = new HashMap<>(); + map4.put("f2", "v2"); + map4.put("f3", "v3"); + StreamEntryID idIn2 = new StreamEntryID(2000, 1L); + pipe.xadd("xadd-stream2", map4, XAddParams.xAddParams().id(idIn2)); + + Map map5 = new HashMap<>(); + map5.put("f4", "v4"); + map5.put("f5", "v5"); + pipe.xadd("xadd-stream2", XAddParams.xAddParams(), map5); + + pipe.xlen("xadd-stream2"); + + Map map6 = new HashMap<>(); + map6.put("f4", "v4"); + map6.put("f5", "v5"); + pipe.xadd("xadd-stream2", map6, XAddParams.xAddParams().maxLen(3).exactTrimming()); + + pipe.xlen("xadd-stream2"); + + List results = pipe.syncAndReturnAll(); + + assertThat(results, contains( + equalTo(idIn), + equalTo(idIn2), + instanceOf(StreamEntryID.class), + equalTo(3L), + instanceOf(StreamEntryID.class), + equalTo(3L) + )); + + assertThat((StreamEntryID) results.get(2), + greaterThan((StreamEntryID) results.get(1))); + + assertThat((StreamEntryID) results.get(4), + greaterThan((StreamEntryID) results.get(2))); + } + + @Test + public void xaddWithParamsNoMkStream() { + pipe.xadd("xadd-stream3", XAddParams.xAddParams().noMkStream().maxLen(3).exactTrimming(), singletonMap("f1", "v1")); + + assertThat(pipe.syncAndReturnAll(), + contains( + nullValue() + )); + + assertFalse(jedis.exists("xadd-stream3")); + } + + @Test + public void xaddWithParamsMinId() { + Map map6 = new HashMap<>(); + map6.put("f4", "v4"); + map6.put("f5", "v5"); + + StreamEntryID id = new StreamEntryID(2); + pipe.xadd("xadd-stream3", map6, XAddParams.xAddParams().minId("2").id(id)); + + pipe.xlen("xadd-stream3"); + + StreamEntryID id1 = new StreamEntryID(3); + pipe.xadd("xadd-stream3", XAddParams.xAddParams().minId("4").id(id1), map6); + + pipe.xlen("xadd-stream3"); + + List results = pipe.syncAndReturnAll(); + + assertThat(results, contains( + equalTo(id), + equalTo(1L), + equalTo(id1), + equalTo(0L) + )); + } + + @Test + public void xaddParamsId() { + String key = "kk"; + Map map = singletonMap("ff", "vv"); + + pipe.xadd(key, XAddParams.xAddParams().id(new StreamEntryID(0, 1)), map); + pipe.xadd(key, XAddParams.xAddParams().id(2, 3), map); + pipe.xadd(key, XAddParams.xAddParams().id(4), map); + pipe.xadd(key, XAddParams.xAddParams().id("5-6"), map); + pipe.xadd(key, XAddParams.xAddParams().id("7-8".getBytes()), map); + pipe.xadd(key, XAddParams.xAddParams(), map); + + List results = pipe.syncAndReturnAll(); + + assertThat(results, contains( + equalTo(new StreamEntryID(0, 1)), + equalTo(new StreamEntryID(2, 3)), + equalTo(new StreamEntryID(4, 0)), + equalTo(new StreamEntryID(5, 6)), + equalTo(new StreamEntryID(7, 8)), + instanceOf(StreamEntryID.class) + )); + + assertThat((StreamEntryID) results.get(5), + greaterThan((StreamEntryID) results.get(4))); + } + + @Test + public void xdel() { + Map map1 = new HashMap<>(); + map1.put("f1", "v1"); + + pipe.xadd("xdel-stream", (StreamEntryID) null, map1); + pipe.xadd("xdel-stream", (StreamEntryID) null, map1); + + List results = pipe.syncAndReturnAll(); + + assertThat(results, contains( + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class) + )); + + StreamEntryID id1 = (StreamEntryID) results.get(1); + + pipe.xlen("xdel-stream"); + pipe.xdel("xdel-stream", id1); + pipe.xlen("xdel-stream"); + + assertThat(pipe.syncAndReturnAll(), contains( + 2L, + 1L, + 1L + )); + } + + @Test + public void xlen() { + pipe.xlen("xlen-stream"); + + Map map = new HashMap<>(); + map.put("f1", "v1"); + pipe.xadd("xlen-stream", (StreamEntryID) null, map); + + pipe.xlen("xlen-stream"); + + pipe.xadd("xlen-stream", (StreamEntryID) null, map); + + pipe.xlen("xlen-stream"); + + assertThat(pipe.syncAndReturnAll(), contains( + equalTo(0L), + instanceOf(StreamEntryID.class), + equalTo(1L), + instanceOf(StreamEntryID.class), + equalTo(2L) + )); + } + + @Test + public void xrange() { + Response> range = pipe.xrange("xrange-stream", null, (StreamEntryID) null, Integer.MAX_VALUE); + + Map map1 = singletonMap("f1", "v1"); + Map map2 = singletonMap("f2", "v2"); + Map map3 = singletonMap("f3", "v3"); + + Response id1Response = pipe.xadd("xrange-stream", (StreamEntryID) null, map1); + + Response id2Response = pipe.xadd("xrange-stream", (StreamEntryID) null, map2); + + pipe.sync(); + + assertThat(range.get(), empty()); + assertThat(id1Response.get(), notNullValue()); + assertThat(id2Response.get(), notNullValue()); + + StreamEntryID id1 = id1Response.get(); + StreamEntryID id2 = id2Response.get(); + + Response> range2 = pipe.xrange("xrange-stream", null, (StreamEntryID) null, 3); + Response> range3 = pipe.xrange("xrange-stream", id1, null, 2); + Response> range4 = pipe.xrange("xrange-stream", id1, id2, 2); + Response> range5 = pipe.xrange("xrange-stream", id1, id2, 1); + Response> range6 = pipe.xrange("xrange-stream", id2, null, 4); + + Response id3Response = pipe.xadd("xrange-stream", (StreamEntryID) null, map3); + + pipe.sync(); + + assertThat(range2.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2)); + assertThat(range3.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2)); + assertThat(range4.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2)); + assertThat(range5.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1)); + assertThat(range6.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2)); + + assertThat(id3Response.get(), notNullValue()); + + StreamEntryID id3 = id3Response.get(); + + Response> range7 = pipe.xrange("xrange-stream", id3, id3, 4); + Response> range8 = pipe.xrange("xrange-stream", null, (StreamEntryID) null); + Response> range9 = pipe.xrange("xrange-stream", StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID); + + pipe.sync(); + + assertThat(range7.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id3)); + assertThat(range8.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2, id3)); + assertThat(range9.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2, id3)); + } + + @Test + public void xrangeExclusive() { + StreamEntryID id1 = jedis.xadd("xrange-stream", (StreamEntryID) null, singletonMap("f1", "v1")); + StreamEntryID id2 = jedis.xadd("xrange-stream", (StreamEntryID) null, singletonMap("f2", "v2")); + + Response> range1 = pipe.xrange("xrange-stream", id1.toString(), "+", 2); + Response> range2 = pipe.xrange("xrange-stream", "(" + id1, "+", 2); + + pipe.sync(); + + assertThat(range1.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2)); + assertThat(range2.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2)); + } + + @Test + public void xreadWithParams() { + Map streamQuery1 = singletonMap("xread-stream1", new StreamEntryID()); + + // Before creating Stream + pipe.xread(XReadParams.xReadParams().block(1), streamQuery1); + pipe.xread(XReadParams.xReadParams(), streamQuery1); + + assertThat(pipe.syncAndReturnAll(), contains( + nullValue(), + nullValue() + )); + + Map map1 = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xread-stream1", (StreamEntryID) null, map1); + + Map map2 = singletonMap("f2", "v2"); + StreamEntryID id2 = jedis.xadd("xread-stream2", (StreamEntryID) null, map2); + + // Read only a single Stream + Response>>> streams1 = + pipe.xread(XReadParams.xReadParams().count(1).block(1), streamQuery1); + + Response>>> streams2 = + pipe.xread(XReadParams.xReadParams().block(1), singletonMap("xread-stream1", id1)); + + Response>>> streams3 = + pipe.xread(XReadParams.xReadParams(), singletonMap("xread-stream1", id1)); + + pipe.sync(); + + assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("xread-stream1")); + + assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1)); + + assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map1)); + + assertThat(streams2.get(), nullValue()); + + assertThat(streams3.get(), nullValue()); + + // Read from two Streams + Map streamQuery2 = new LinkedHashMap<>(); + streamQuery2.put("xread-stream1", new StreamEntryID()); + streamQuery2.put("xread-stream2", new StreamEntryID()); + + Response>>> streams4 = + pipe.xread(XReadParams.xReadParams().count(2).block(1), streamQuery2); + + pipe.sync(); + + assertThat(streams4.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("xread-stream1", "xread-stream2")); + + assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2)); + + assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map1, map2)); + } + + @Test + public void xreadBlockZero() throws InterruptedException { + final AtomicReference>>> readRef = new AtomicReference<>(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + long startTime = System.currentTimeMillis(); + Pipeline blockPipe = jedis.pipelined(); + Map streamQuery = singletonMap("block0-stream", new StreamEntryID()); + Response>>> read = + blockPipe.xread(XReadParams.xReadParams().block(0), streamQuery); + blockPipe.sync(); + long endTime = System.currentTimeMillis(); + assertTrue(endTime - startTime > 500); + assertNotNull(read); + readRef.set(read.get()); + } + }, "xread-block-0-thread"); + t.start(); + Thread.sleep(1000); + StreamEntryID addedId = jedis.xadd("block0-stream", (StreamEntryID) null, singletonMap("foo", "bar")); + t.join(); + + assertThat(readRef.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("block0-stream")); + + assertThat(readRef.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(addedId)); + } + + @Test + public void xtrim() { + Map map1 = new HashMap(); + map1.put("f1", "v1"); + + for (int i = 1; i <= 5; i++) { + pipe.xadd("xtrim-stream", (StreamEntryID) null, map1); + } + + pipe.xlen("xtrim-stream"); + + pipe.xtrim("xtrim-stream", 3, false); + + pipe.xlen("xtrim-stream"); + + assertThat(pipe.syncAndReturnAll(), contains( + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class), + equalTo(5L), + equalTo(2L), + equalTo(3L) + )); + } + + @Test + public void xtrimWithParams() { + Map map1 = new HashMap<>(); + map1.put("f1", "v1"); + for (int i = 1; i <= 5; i++) { + pipe.xadd("xtrim-stream", new StreamEntryID("0-" + i), map1); + } + + pipe.xlen("xtrim-stream"); + + pipe.xtrim("xtrim-stream", XTrimParams.xTrimParams().maxLen(3).exactTrimming()); + + pipe.xlen("xtrim-stream"); + + // minId + pipe.xtrim("xtrim-stream", XTrimParams.xTrimParams().minId("0-4").exactTrimming()); + + pipe.xlen("xtrim-stream"); + + assertThat(pipe.syncAndReturnAll(), contains( + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class), + instanceOf(StreamEntryID.class), + equalTo(5L), + equalTo(2L), + equalTo(3L), + equalTo(1L), + equalTo(2L) + )); + } + + @Test + public void xrevrange() { + Response> range = pipe.xrevrange("xrevrange-stream", null, (StreamEntryID) null, Integer.MAX_VALUE); + + Map map1 = singletonMap("f1", "v1"); + Response id1Response = pipe.xadd("xrevrange-stream", (StreamEntryID) null, map1); + Map map2 = singletonMap("f2", "v2"); + Response id2Response = pipe.xadd("xrevrange-stream", (StreamEntryID) null, map2); + + pipe.sync(); + + assertThat(range.get(), empty()); + assertThat(id1Response.get(), notNullValue()); + assertThat(id2Response.get(), notNullValue()); + + StreamEntryID id1 = id1Response.get(); + StreamEntryID id2 = id2Response.get(); + + Response> range2 = pipe.xrevrange("xrevrange-stream", null, (StreamEntryID) null, 3); + Response> range3 = pipe.xrevrange("xrevrange-stream", null, id1, 2); + Response> range4 = pipe.xrevrange("xrevrange-stream", id2, id1, 2); + Response> range5 = pipe.xrevrange("xrevrange-stream", id2, id1, 1); + Response> range6 = pipe.xrevrange("xrevrange-stream", null, id2, 4); + + Map map3 = singletonMap("f3", "v3"); + Response id3Response = pipe.xadd("xrevrange-stream", (StreamEntryID) null, map3); + + pipe.sync(); + + assertThat(range2.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2, id1)); + assertThat(range3.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2, id1)); + assertThat(range4.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2, id1)); + assertThat(range5.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2)); + assertThat(range6.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2)); + + assertThat(id3Response.get(), notNullValue()); + + StreamEntryID id3 = id3Response.get(); + + Response> range7 = pipe.xrevrange("xrevrange-stream", id3, id3, 4); + Response> range8 = pipe.xrevrange("xrevrange-stream", null, (StreamEntryID) null); + Response> range9 = pipe.xrevrange("xrevrange-stream", StreamEntryID.MAXIMUM_ID, StreamEntryID.MINIMUM_ID); + + pipe.sync(); + + assertThat(range7.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id3)); + assertThat(range8.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id3, id2, id1)); + assertThat(range9.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id3, id2, id1)); + } + + @Test + public void xrevrangeExclusive() { + StreamEntryID id1 = jedis.xadd("xrange-stream", (StreamEntryID) null, singletonMap("f1", "v1")); + StreamEntryID id2 = jedis.xadd("xrange-stream", (StreamEntryID) null, singletonMap("f2", "v2")); + + Response> range1 = pipe.xrevrange("xrange-stream", "+", id1.toString(), 2); + Response> range2 = pipe.xrevrange("xrange-stream", "+", "(" + id1, 2); + + pipe.sync(); + + assertThat(range1.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2, id1)); + assertThat(range2.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2)); + } + + @Test + public void xgroup() { + StreamEntryID id1 = jedis.xadd("xgroup-stream", (StreamEntryID) null, singletonMap("f1", "v1")); + + pipe.xgroupCreate("xgroup-stream", "consumer-group-name", null, false); + pipe.xgroupSetID("xgroup-stream", "consumer-group-name", id1); + pipe.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.LAST_ENTRY, false); + + pipe.xgroupDestroy("xgroup-stream", "consumer-group-name"); + pipe.xgroupDelConsumer("xgroup-stream", "consumer-group-name1", "myconsumer1"); + pipe.xgroupCreateConsumer("xgroup-stream", "consumer-group-name1", "myconsumer2"); + pipe.xgroupDelConsumer("xgroup-stream", "consumer-group-name1", "myconsumer2"); + + assertThat(pipe.syncAndReturnAll(), contains( + "OK", + "OK", + "OK", + 1L, + 0L, + true, + 0L + )); + } + + @Test + public void xreadGroupWithParams() { + // Simple xreadGroup with NOACK + Map map1 = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map1); + + jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false); + + Map streamQuery1 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + + Response>>> streams1 = + pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQuery1); + + pipe.sync(); + + assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("xreadGroup-stream1")); + + assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1)); + + assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map1)); + + Map map2 = singletonMap("f2", "v2"); + StreamEntryID id2 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map2); + + Map map3 = singletonMap("f3", "v3"); + StreamEntryID id3 = jedis.xadd("xreadGroup-stream2", (StreamEntryID) null, map3); + + jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false); + + // Read only a single Stream + Response>>> streams2 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQuery1); + + // Read from two Streams + Map streamQuery2 = new LinkedHashMap<>(); + streamQuery2.put("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + streamQuery2.put("xreadGroup-stream2", StreamEntryID.UNRECEIVED_ENTRY); + + Response>>> streams3 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQuery2); + + pipe.sync(); + + assertThat(streams2.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("xreadGroup-stream1")); + + assertThat(streams2.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(id2)); + + assertThat(streams2.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map2)); + + assertThat(streams3.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("xreadGroup-stream2")); + + assertThat(streams3.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(id3)); + + assertThat(streams3.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map3)); + + // Read only fresh messages + Map map4 = singletonMap("f4", "v4"); + StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map4); + + Map streamQueryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Response>>> streams4 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQueryFresh); + + pipe.sync(); + + assertThat(streams4.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("xreadGroup-stream1")); + + assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(id4)); + + assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map4)); + } + + @Test + public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() { + // Add two message to stream + Map map1 = singletonMap("f1", "v1"); + + XAddParams xAddParams = XAddParams.xAddParams().id(StreamEntryID.NEW_ENTRY).maxLen(2); + StreamEntryID firstMessageEntryId = jedis.xadd("xreadGroup-discard-stream1", xAddParams, map1); + + jedis.xadd("xreadGroup-discard-stream1", xAddParams, singletonMap("f2", "v2")); + + pipe.xgroupCreate("xreadGroup-discard-stream1", "xreadGroup-group", null, false); + + Map streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.UNRECEIVED_ENTRY); + + Response>>> streams1 = + pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1), streamQuery1); + + pipe.sync(); + + assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("xreadGroup-discard-stream1")); + + assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(firstMessageEntryId)); + + assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map1)); + + // Add third message, the fields of pending message1 will be discarded by redis-server + jedis.xadd("xreadGroup-discard-stream1", xAddParams, singletonMap("f3", "v3")); + + Map streamQueryPending = singletonMap("xreadGroup-discard-stream1", new StreamEntryID()); + + Response>>> pendingMessages = + pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQueryPending); + + pipe.sync(); + + assertThat(pendingMessages.get().stream().map(Entry::getKey).collect(Collectors.toList()), + contains("xreadGroup-discard-stream1")); + + assertThat(pendingMessages.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()), contains(firstMessageEntryId)); + + assertThat(pendingMessages.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getFields).collect(Collectors.toList()), contains(nullValue())); + } + + @Test + public void xack() { + pipe.xadd("xack-stream", (StreamEntryID) null, singletonMap("f1", "v1")); + + pipe.xgroupCreate("xack-stream", "xack-group", null, false); + + Map streamQuery1 = singletonMap("xack-stream", StreamEntryID.UNRECEIVED_ENTRY); + + // Empty Stream + Response>>> streams1 = + pipe.xreadGroup("xack-group", "xack-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1), streamQuery1); + + pipe.sync(); + + List ids = streams1.get().stream().map(Entry::getValue).flatMap(List::stream) + .map(StreamEntry::getID).collect(Collectors.toList()); + assertThat(ids, hasSize(1)); + + Response xackResponse = pipe.xack("xack-stream", "xack-group", ids.get(0)); + + pipe.sync(); + + assertThat(xackResponse.get(), equalTo(1L)); + } + + @Test + public void xpendingWithParams() { + Map map = singletonMap("f1", "v1"); + + StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, map); + + assertEquals("OK", jedis.xgroupCreate("xpending-stream", "xpending-group", null, false)); + + Map streamQeury1 = singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY); + + // Read the event from Stream put it on pending + Response>>> range = pipe.xreadGroup("xpending-group", + "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), streamQeury1); + + // Get the pending event + Response> pending1 = + pipe.xpending("xpending-stream", "xpending-group", + new XPendingParams().count(3).consumer("xpending-consumer")); + + // Without consumer + Response> pending2 = + pipe.xpending("xpending-stream", "xpending-group", + new XPendingParams().count(3)); + + // with idle + Response> pending3 = + pipe.xpending("xpending-stream", "xpending-group", + new XPendingParams().idle(Duration.ofMinutes(1).toMillis()).count(3)); + + pipe.sync(); + + assertThat(pending1.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("xpending-consumer")); + + assertThat(pending1.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(pending1.get().stream().map(StreamPendingEntry::getDeliveredTimes).collect(Collectors.toList()), + contains(1L)); + + assertThat(pending2.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("xpending-consumer")); + + assertThat(pending2.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(pending2.get().stream().map(StreamPendingEntry::getDeliveredTimes).collect(Collectors.toList()), + contains(1L)); + + assertThat(pending3.get(), empty()); + } + + @Test + public void xpendingRange() { + StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, singletonMap("f1", "v1")); + StreamEntryID id2 = jedis.xadd("xpending-stream", (StreamEntryID) null, singletonMap("f2", "v2")); + + pipe.xgroupCreate("xpending-stream", "xpending-group", null, false); + + // read 1 message from the group with each consumer + Map streamQeury = singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY); + pipe.xreadGroup("xpending-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury); + pipe.xreadGroup("xpending-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury); + + Response> pending1 = pipe.xpending("xpending-stream", "xpending-group", + XPendingParams.xPendingParams("(0", "+", 5)); + + Response> pending2 = pipe.xpending("xpending-stream", "xpending-group", + XPendingParams.xPendingParams(StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID, 5)); + + pipe.sync(); + + assertThat(pending1.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("consumer1", "consumer2")); + + assertThat(pending1.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1, id2)); + + assertThat(pending2.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("consumer1", "consumer2")); + + assertThat(pending2.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1, id2)); + } + + @Test + public void xclaimWithParams() throws InterruptedException { + Map map1 = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, map1); + + pipe.xgroupCreate("xpending-stream", "xpending-group", null, false); + + // Read the event from Stream put it on pending + pipe.xreadGroup("xpending-group", "xpending-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1), + singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + + // Get the pending event + Response> pending = + pipe.xpending("xpending-stream", "xpending-group", + XPendingParams.xPendingParams().count(3).consumer("xpending-consumer")); + + // must sync before the sleep + pipe.sync(); + + // Sleep a bit so we can claim events pending for more than 50ms + Thread.sleep(100); + + Response> claimed = + pipe.xclaim("xpending-stream", "xpending-group", "xpending-consumer2", 50, + XClaimParams.xClaimParams().idle(0).retryCount(0), id1); + + pipe.sync(); + + assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("xpending-consumer")); + + assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(claimed.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(claimed.get().stream().map(StreamEntry::getFields).collect(Collectors.toList()), + contains(map1)); + } + + @Test + public void xclaimJustId() throws InterruptedException { + Map map1 = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, map1); + + pipe.xgroupCreate("xpending-stream", "xpending-group", null, false); + + // Read the event from Stream put it on pending + pipe.xreadGroup("xpending-group", "xpending-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1), + singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + + // Get the pending event + Response> pending = + pipe.xpending("xpending-stream", "xpending-group", + XPendingParams.xPendingParams().count(3).consumer("xpending-consumer")); + + // must sync before the sleep + pipe.sync(); + + // Sleep for 100ms so we can claim events pending for more than 50ms + Thread.sleep(100); + + Response> claimedIds = + pipe.xclaimJustId("xpending-stream", "xpending-group", "xpending-consumer2", 50, + XClaimParams.xClaimParams().idle(0).retryCount(0), id1); + + pipe.sync(); + + assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("xpending-consumer")); + + assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(claimedIds.get(), contains(id1)); + } + + @Test + public void xautoclaim() throws InterruptedException { + Map map1 = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, map1); + + pipe.xgroupCreate("xpending-stream", "xpending-group", null, false); + + // Read the event from Stream put it on pending + pipe.xreadGroup("xpending-group", "xpending-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1), + singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + + // Get the pending event + Response> pending = pipe.xpending("xpending-stream", "xpending-group", + XPendingParams.xPendingParams().count(3).consumer("xpending-consumer")); + + pipe.sync(); + + // Sleep for 100ms so we can auto claim events pending for more than 50ms + Thread.sleep(100); + + // Auto claim pending events to different consumer + Response>> autoclaimed = pipe.xautoclaim("xpending-stream", "xpending-group", + "xpending-consumer2", 50, new StreamEntryID(), new XAutoClaimParams().count(1)); + + pipe.sync(); + + assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("xpending-consumer")); + + assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(autoclaimed.get().getValue().stream().map(StreamEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(autoclaimed.get().getValue().stream().map(StreamEntry::getFields).collect(Collectors.toList()), + contains(map1)); + } + + @Test + public void xautoclaimBinary() throws InterruptedException { + Map map1 = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xpending-stream", XAddParams.xAddParams(), map1); + + pipe.xgroupCreate("xpending-stream", "xpending-group", null, false); + + // Read the event from Stream put it on pending + pipe.xreadGroup("xpending-group", "xpending-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1), + singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + + // Get the pending event + Response> pending = pipe.xpending("xpending-stream", "xpending-group", + XPendingParams.xPendingParams().count(3).consumer("xpending-consumer")); + + pipe.sync(); + + // Sleep for 100ms so we can auto claim events pending for more than 50ms + Thread.sleep(100); + + // Auto claim pending events to different consumer + Response> autoclaimed = pipe.xautoclaim(SafeEncoder.encode("xpending-stream"), + SafeEncoder.encode("xpending-group"), SafeEncoder.encode("xpending-consumer2"), + 50, SafeEncoder.encode(new StreamEntryID().toString()), new XAutoClaimParams().count(1)); + + pipe.sync(); + + assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("xpending-consumer")); + + assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1)); + + Map.Entry> autoclaimedParsed = + BuilderFactory.STREAM_AUTO_CLAIM_RESPONSE.build(autoclaimed.get()); + + assertThat(autoclaimedParsed.getValue().stream().map(StreamEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(autoclaimedParsed.getValue().stream().map(StreamEntry::getFields).collect(Collectors.toList()), + contains(map1)); + } + + @Test + public void xautoclaimJustId() throws InterruptedException { + Map map1 = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xpending-stream", XAddParams.xAddParams(), map1); + + pipe.xgroupCreate("xpending-stream", "xpending-group", null, false); + + // Read the event from Stream put it on pending + pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), + singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + + // Get the pending event + Response> pending = pipe.xpending("xpending-stream", "xpending-group", + XPendingParams.xPendingParams().count(3).consumer("xpending-consumer")); + + pipe.sync(); + + // Sleep for 100ms so we can auto claim events pending for more than 50ms + Thread.sleep(100); + + // Auto claim pending events to different consumer + Response>> claimedIds = pipe.xautoclaimJustId("xpending-stream", "xpending-group", + "xpending-consumer2", 50, new StreamEntryID(), new XAutoClaimParams().count(1)); + + pipe.sync(); + + assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("xpending-consumer")); + + assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1)); + + assertThat(claimedIds.get().getValue(), contains(id1)); + } + + @Test + public void xautoclaimJustIdBinary() throws InterruptedException { + Map map1 = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xpending-stream", XAddParams.xAddParams(), map1); + + pipe.xgroupCreate("xpending-stream", "xpending-group", null, false); + + // Read the event from Stream put it on pending + pipe.xreadGroup("xpending-group", "xpending-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1), + singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + + // Get the pending event + Response> pending = pipe.xpending("xpending-stream", "xpending-group", + XPendingParams.xPendingParams().count(3).consumer("xpending-consumer")); + + pipe.sync(); + + // Sleep for 100ms so we can auto claim events pending for more than 50ms + Thread.sleep(100); + + // Auto claim pending events to different consumer + Response> autoclaimed = pipe.xautoclaimJustId(SafeEncoder.encode("xpending-stream"), + SafeEncoder.encode("xpending-group"), SafeEncoder.encode("xpending-consumer2"), + 50, SafeEncoder.encode(new StreamEntryID().toString()), new XAutoClaimParams().count(1)); + + pipe.sync(); + + assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()), + contains("xpending-consumer")); + + assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()), + contains(id1)); + + Entry> autoclaimedParsed = + BuilderFactory.STREAM_AUTO_CLAIM_JUSTID_RESPONSE.build(autoclaimed.get()); + + assertThat(autoclaimedParsed.getValue(), contains(id1)); + } + + @Test + public void xinfo() throws InterruptedException { + final String STREAM_NAME = "xadd-stream1"; + final String F1 = "f1"; + final String V1 = "v1"; + final String V2 = "v2"; + final String G1 = "G1"; + final String G2 = "G2"; + final String MY_CONSUMER = "myConsumer"; + final String MY_CONSUMER2 = "myConsumer2"; + + Map map1 = new HashMap<>(); + map1.put(F1, V1); + StreamEntryID id1 = jedis.xadd(STREAM_NAME, (StreamEntryID) null, map1); + map1.put(F1, V2); + StreamEntryID id2 = jedis.xadd(STREAM_NAME, (StreamEntryID) null, map1); + + Response streamInfoResponse = pipe.xinfoStream(STREAM_NAME); + + pipe.xgroupCreate(STREAM_NAME, G1, StreamEntryID.LAST_ENTRY, false); + + Map streamQuery1 = singletonMap(STREAM_NAME, new StreamEntryID("0-0")); + + pipe.xreadGroup(G1, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQuery1); + + pipe.sync(); + + Thread.sleep(1); + + Response> groupInfoResponse = pipe.xinfoGroups(STREAM_NAME); + Response> consumersInfoResponse = pipe.xinfoConsumers(STREAM_NAME, G1); + Response> consumerInfoResponse = pipe.xinfoConsumers2(STREAM_NAME, G1); + + pipe.sync(); + + // Stream info test + StreamInfo streamInfo = streamInfoResponse.get(); + + assertEquals(2L, streamInfo.getStreamInfo().get(StreamInfo.LENGTH)); + assertEquals(1L, streamInfo.getStreamInfo().get(StreamInfo.RADIX_TREE_KEYS)); + assertEquals(2L, streamInfo.getStreamInfo().get(StreamInfo.RADIX_TREE_NODES)); + assertEquals(0L, streamInfo.getStreamInfo().get(StreamInfo.GROUPS)); + assertEquals(V1, ((StreamEntry) streamInfo.getStreamInfo().get(StreamInfo.FIRST_ENTRY)).getFields().get(F1)); + assertEquals(V2, ((StreamEntry) streamInfo.getStreamInfo().get(StreamInfo.LAST_ENTRY)).getFields().get(F1)); + assertEquals(id2, streamInfo.getStreamInfo().get(StreamInfo.LAST_GENERATED_ID)); + + // Using getters + assertEquals(2, streamInfo.getLength()); + assertEquals(1, streamInfo.getRadixTreeKeys()); + assertEquals(2, streamInfo.getRadixTreeNodes()); + assertEquals(0, streamInfo.getGroups()); + assertEquals(V1, streamInfo.getFirstEntry().getFields().get(F1)); + assertEquals(V2, streamInfo.getLastEntry().getFields().get(F1)); + assertEquals(id2, streamInfo.getLastGeneratedId()); + + // Group info test + List groupInfo = groupInfoResponse.get(); + + assertEquals(1, groupInfo.size()); + assertEquals(G1, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.NAME)); + assertEquals(1L, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.CONSUMERS)); + assertEquals(0L, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.PENDING)); + assertEquals(id2, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.LAST_DELIVERED)); + + // Using getters + assertEquals(1, groupInfo.size()); + assertEquals(G1, groupInfo.get(0).getName()); + assertEquals(1, groupInfo.get(0).getConsumers()); + assertEquals(0, groupInfo.get(0).getPending()); + assertEquals(id2, groupInfo.get(0).getLastDeliveredId()); + + // Consumers info test + List consumersInfo = consumersInfoResponse.get(); + + assertEquals(MY_CONSUMER, + consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.NAME)); + assertEquals(0L, consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.PENDING)); + assertTrue((Long) consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.IDLE) > 0); + + // Using getters + assertEquals(MY_CONSUMER, consumersInfo.get(0).getName()); + assertEquals(0L, consumersInfo.get(0).getPending()); + assertThat(consumersInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumersInfo.get(0).getInactive(), Matchers.any(Long.class)); + + // Consumer info test + List consumerInfo = consumerInfoResponse.get(); + + assertEquals(MY_CONSUMER, + consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.NAME)); + assertEquals(0L, consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.PENDING)); + assertTrue((Long) consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.IDLE) > 0); + + // Using getters + assertEquals(MY_CONSUMER, consumerInfo.get(0).getName()); + assertEquals(0L, consumerInfo.get(0).getPending()); + assertThat(consumerInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumerInfo.get(0).getInactive(), Matchers.any(Long.class)); + + // test with more groups and consumers + pipe.xgroupCreate(STREAM_NAME, G2, StreamEntryID.LAST_ENTRY, false); + pipe.xreadGroup(G1, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQuery1); + pipe.xreadGroup(G2, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQuery1); + pipe.xreadGroup(G2, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQuery1); + + Response> manyGroupsInfoResponse = pipe.xinfoGroups(STREAM_NAME); + Response> manyConsumersInfoResponse = pipe.xinfoConsumers(STREAM_NAME, G2); + Response> manyConsumerInfoResponse = pipe.xinfoConsumers2(STREAM_NAME, G2); + Response streamInfoFullResponse = pipe.xinfoStreamFull(STREAM_NAME); + Response streamInfoFull10Response = pipe.xinfoStreamFull(STREAM_NAME, 10); + + pipe.sync(); + + List manyGroupsInfo = manyGroupsInfoResponse.get(); + List manyConsumersInfo = manyConsumersInfoResponse.get(); + List manyConsumerInfo = manyConsumerInfoResponse.get(); + StreamFullInfo streamInfoFull = streamInfoFullResponse.get(); + StreamFullInfo streamInfoFull10 = streamInfoFull10Response.get(); + + assertEquals(2, manyGroupsInfo.size()); + assertEquals(2, manyConsumersInfo.size()); + assertEquals(2, manyConsumerInfo.size()); + + assertEquals(2, streamInfoFull.getEntries().size()); + assertEquals(2, streamInfoFull.getGroups().size()); + assertEquals(2, streamInfoFull.getLength()); + assertEquals(1, streamInfoFull.getRadixTreeKeys()); + assertEquals(2, streamInfoFull.getRadixTreeNodes()); + assertEquals(0, streamInfo.getGroups()); + assertEquals(G1, streamInfoFull.getGroups().get(0).getName()); + assertEquals(G2, streamInfoFull.getGroups().get(1).getName()); + assertEquals(V1, streamInfoFull.getEntries().get(0).getFields().get(F1)); + assertEquals(V2, streamInfoFull.getEntries().get(1).getFields().get(F1)); + assertEquals(id2, streamInfoFull.getLastGeneratedId()); + + assertEquals(G1, streamInfoFull10.getGroups().get(0).getName()); + assertEquals(G2, streamInfoFull10.getGroups().get(1).getName()); + assertEquals(V1, streamInfoFull10.getEntries().get(0).getFields().get(F1)); + assertEquals(V2, streamInfoFull10.getEntries().get(1).getFields().get(F1)); + assertEquals(id2, streamInfoFull10.getLastGeneratedId()); + + // Not existing key - redis cli return error so we expect exception + pipe.xinfoStream("random"); + + assertThat(pipe.syncAndReturnAll(), contains( + both(instanceOf(JedisDataException.class)).and(hasToString(containsString("ERR no such key"))) + )); + } + + @Test + public void xinfoStreamFullWithPending() { + Map map = singletonMap("f1", "v1"); + StreamEntryID id1 = jedis.xadd("streamfull2", (StreamEntryID) null, map); + StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map); + jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false); + + Map streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY); + Response>>> pending = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1), streamQeury1); + + Response fullResult = pipe.xinfoStreamFull("streamfull2"); + + pipe.sync(); + + assertThat(pending.get(), hasSize(1)); + + StreamFullInfo full = fullResult.get(); + assertEquals(1, full.getGroups().size()); + StreamGroupFullInfo group = full.getGroups().get(0); + assertEquals("xreadGroup-group", group.getName()); + + assertEquals(1, group.getPending().size()); + List groupPendingEntry = group.getPending().get(0); + assertEquals(id1, groupPendingEntry.get(0)); + assertEquals("xreadGroup-consumer", groupPendingEntry.get(1)); + + assertEquals(1, group.getConsumers().size()); + StreamConsumerFullInfo consumer = group.getConsumers().get(0); + assertEquals("xreadGroup-consumer", consumer.getName()); + assertThat(consumer.getSeenTime(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumer.getActiveTime(), Matchers.greaterThanOrEqualTo(0L)); + assertEquals(1, consumer.getPending().size()); + List consumerPendingEntry = consumer.getPending().get(0); + assertEquals(id1, consumerPendingEntry.get(0)); + } +}