From be7e61f85dc9ad88ac039b2c71a4dcccbc9639c2 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 27 Mar 2024 19:15:58 +0600 Subject: [PATCH] Add last entry id for XREADs and support XREADs reply as map (#3791) --- .../redis/clients/jedis/BuilderFactory.java | 37 +++- .../redis/clients/jedis/CommandObjects.java | 21 +++ src/main/java/redis/clients/jedis/Jedis.java | 18 +- .../redis/clients/jedis/PipeliningBase.java | 10 ++ .../redis/clients/jedis/StreamEntryID.java | 57 ++++++- .../redis/clients/jedis/UnifiedJedis.java | 11 ++ .../jedis/commands/StreamCommands.java | 12 ++ .../commands/StreamPipelineCommands.java | 12 ++ .../commands/jedis/StreamsCommandsTest.java | 159 +++++++++++++----- 9 files changed, 277 insertions(+), 60 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 0ec00b252a..26d5b5c384 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1420,10 +1420,10 @@ public List>> build(Object data) { .collect(Collectors.toList()); } else { List>> result = new ArrayList<>(list.size()); - for (Object streamObj : list) { - List stream = (List) streamObj; - String streamKey = STRING.build(stream.get(0)); - List streamEntries = STREAM_ENTRY_LIST.build(stream.get(1)); + for (Object anObj : list) { + List streamObj = (List) anObj; + String streamKey = STRING.build(streamObj.get(0)); + List streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1)); result.add(KeyValue.of(streamKey, streamEntries)); } return result; @@ -1436,6 +1436,35 @@ public String toString() { } }; + public static final Builder>> STREAM_READ_MAP_RESPONSE + = new Builder>>() { + @Override + public Map> build(Object data) { + if (data == null) return null; + List list = (List) data; + if (list.isEmpty()) return Collections.emptyMap(); + + if (list.get(0) instanceof KeyValue) { + return ((List) list).stream() + .collect(Collectors.toMap(kv -> STRING.build(kv.getKey()), kv -> STREAM_ENTRY_LIST.build(kv.getValue()))); + } else { + Map> result = new HashMap<>(list.size()); + for (Object anObj : list) { + List streamObj = (List) anObj; + String streamKey = STRING.build(streamObj.get(0)); + List streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1)); + result.put(streamKey, streamEntries); + } + return result; + } + } + + @Override + public String toString() { + return "Map>"; + } + }; + public static final Builder> STREAM_PENDING_ENTRY_LIST = new Builder>() { @Override @SuppressWarnings("unchecked") diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index 1d8e23bd2e..de0f38a2ad 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2658,6 +2658,15 @@ public final CommandObject>>> xread( return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE); } + public final CommandObject>> xreadAsMap( + XReadParams xReadParams, Map streams) { + CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); + Set> entrySet = streams.entrySet(); + entrySet.forEach(entry -> args.key(entry.getKey())); + entrySet.forEach(entry -> args.add(entry.getValue())); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE); + } + public final CommandObject>>> xreadGroup( String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) { @@ -2670,6 +2679,18 @@ public final CommandObject>>> xreadGrou return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE); } + public final CommandObject>> xreadGroupAsMap( + String groupName, String consumer, XReadGroupParams xReadGroupParams, + Map streams) { + CommandArguments args = commandArguments(XREADGROUP) + .add(GROUP).add(groupName).add(consumer) + .addParams(xReadGroupParams).add(STREAMS); + Set> entrySet = streams.entrySet(); + entrySet.forEach(entry -> args.key(entry.getKey())); + entrySet.forEach(entry -> args.add(entry.getValue())); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE); + } + public final CommandObject> xread(XReadParams xReadParams, Map.Entry... streams) { CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); for (Map.Entry entry : streams) { diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 4e91ed1ca2..95de209a4b 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -9390,6 +9390,12 @@ public List>> xread(final XReadParams xReadP return connection.executeCommand(commandObjects.xread(xReadParams, streams)); } + @Override + public Map> xreadAsMap(final XReadParams xReadParams, final Map streams) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xreadAsMap(xReadParams, streams)); + } + @Override public long xack(final String key, final String group, final StreamEntryID... ids) { checkIsInMultiOrPipeline(); @@ -9446,13 +9452,19 @@ public long xtrim(final String key, final XTrimParams params) { } @Override - public List>> xreadGroup(final String groupName, - final String consumer, final XReadGroupParams xReadGroupParams, - final Map streams) { + public List>> xreadGroup(final String groupName, final String consumer, + final XReadGroupParams xReadGroupParams, final Map streams) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } + @Override + public Map> xreadGroupAsMap(final String groupName, final String consumer, + final XReadGroupParams xReadGroupParams, final Map streams) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams)); + } + @Override public StreamPendingSummary xpending(final String key, final String groupName) { checkIsInMultiOrPipeline(); diff --git a/src/main/java/redis/clients/jedis/PipeliningBase.java b/src/main/java/redis/clients/jedis/PipeliningBase.java index db0622adea..7289547924 100644 --- a/src/main/java/redis/clients/jedis/PipeliningBase.java +++ b/src/main/java/redis/clients/jedis/PipeliningBase.java @@ -1527,11 +1527,21 @@ public Response>>> xread(XReadParams xR return appendCommand(commandObjects.xread(xReadParams, streams)); } + @Override + public Response>> xreadAsMap(XReadParams xReadParams, Map streams) { + return appendCommand(commandObjects.xreadAsMap(xReadParams, streams)); + } + @Override public Response>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) { return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } + @Override + public Response>> xreadGroupAsMap(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) { + return appendCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams)); + } + @Override public Response eval(String script) { return appendCommand(commandObjects.eval(script)); diff --git a/src/main/java/redis/clients/jedis/StreamEntryID.java b/src/main/java/redis/clients/jedis/StreamEntryID.java index 9644010d7c..5381638b5a 100644 --- a/src/main/java/redis/clients/jedis/StreamEntryID.java +++ b/src/main/java/redis/clients/jedis/StreamEntryID.java @@ -80,9 +80,7 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN /** * Should be used only with XADD * - * - * XADD mystream * field1 value1 - * + * {@code XADD mystream * field1 value1} */ public static final StreamEntryID NEW_ENTRY = new StreamEntryID() { @@ -97,11 +95,31 @@ public String toString() { /** * Should be used only with XGROUP CREATE * - * - * XGROUP CREATE mystream consumer-group-name $ - * + * {@code XGROUP CREATE mystream consumer-group-name $} */ - public static final StreamEntryID LAST_ENTRY = new StreamEntryID() { + public static final StreamEntryID XGROUP_LAST_ENTRY = new StreamEntryID() { + + private static final long serialVersionUID = 1L; + + @Override + public String toString() { + return "$"; + } + }; + + /** + * @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XGROUP CREATE command or + * {@link StreamEntryID#XREAD_NEW_ENTRY} for XREAD command. + */ + @Deprecated + public static final StreamEntryID LAST_ENTRY = XGROUP_LAST_ENTRY; + + /** + * Should be used only with XREAD + * + * {@code XREAD BLOCK 5000 COUNT 100 STREAMS mystream $} + */ + public static final StreamEntryID XREAD_NEW_ENTRY = new StreamEntryID() { private static final long serialVersionUID = 1L; @@ -114,9 +132,9 @@ public String toString() { /** * Should be used only with XREADGROUP *

- * {@code XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >} + * {@code XREADGROUP GROUP mygroup myconsumer STREAMS mystream >} */ - public static final StreamEntryID UNRECEIVED_ENTRY = new StreamEntryID() { + public static final StreamEntryID XREADGROUP_UNDELIVERED_ENTRY = new StreamEntryID() { private static final long serialVersionUID = 1L; @@ -126,6 +144,12 @@ public String toString() { } }; + /** + * @deprecated Use {@link StreamEntryID#XREADGROUP_UNDELIVERED_ENTRY}. + */ + @Deprecated + public static final StreamEntryID UNRECEIVED_ENTRY = XREADGROUP_UNDELIVERED_ENTRY; + /** * Can be used in XRANGE, XREVRANGE and XPENDING commands. */ @@ -151,4 +175,19 @@ public String toString() { return "+"; } }; + + /** + * Should be used only with XREAD + * + * {@code XREAD STREAMS mystream +} + */ + public static final StreamEntryID XREAD_LAST_ENTRY = new StreamEntryID() { + + private static final long serialVersionUID = 1L; + + @Override + public String toString() { + return "+"; + } + }; } diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index a0febc66d5..3b0cc3b9a5 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -3056,12 +3056,23 @@ public List>> xread(XReadParams xReadParams, return executeCommand(commandObjects.xread(xReadParams, streams)); } + @Override + public Map> xreadAsMap(XReadParams xReadParams, Map streams) { + return executeCommand(commandObjects.xreadAsMap(xReadParams, streams)); + } + @Override public List>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) { return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } + @Override + public Map> xreadGroupAsMap(String groupName, String consumer, + XReadGroupParams xReadGroupParams, Map streams) { + return executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams)); + } + @Override public byte[] xadd(byte[] key, XAddParams params, Map hash) { return executeCommand(commandObjects.xadd(key, params, hash)); diff --git a/src/main/java/redis/clients/jedis/commands/StreamCommands.java b/src/main/java/redis/clients/jedis/commands/StreamCommands.java index 9c34cbc6a6..163e11050e 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamCommands.java @@ -250,7 +250,19 @@ List>> xread(XReadParams xReadParams, /** * XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] */ + Map> xreadAsMap(XReadParams xReadParams, + Map streams); + + /** + * XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] + */ List>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams); + /** + * XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] + */ + Map> xreadGroupAsMap(String groupName, String consumer, + XReadGroupParams xReadGroupParams, Map streams); + } diff --git a/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java b/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java index e435c02341..d4bda0fb98 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java @@ -243,7 +243,19 @@ Response>>> xread(XReadParams xReadPara /** * XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] */ + Response>> xreadAsMap(XReadParams xReadParams, + Map streams); + + /** + * XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] + */ Response>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams); + /** + * XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] + */ + Response>> xreadGroupAsMap(String groupName, String consumer, + XReadGroupParams xReadGroupParams, Map streams); + } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index 7c985b9e47..9f028acfd2 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -1,6 +1,8 @@ package redis.clients.jedis.commands.jedis; +import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -15,7 +17,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicReference; -import org.hamcrest.MatcherAssert; + import org.hamcrest.Matchers; import org.junit.Test; @@ -265,7 +267,10 @@ public void xrangeExclusive() { @Test public void xreadWithParams() { - Map streamQeury1 = singletonMap("xread-stream1", new StreamEntryID()); + final String key1 = "xread-stream1"; + final String key2 = "xread-stream2"; + + Map streamQeury1 = singletonMap(key1, new StreamEntryID()); // Before creating Stream assertNull(jedis.xread(XReadParams.xReadParams().block(1), streamQeury1)); @@ -273,28 +278,78 @@ public void xreadWithParams() { Map map = new HashMap<>(); map.put("f1", "v1"); - StreamEntryID id1 = jedis.xadd("xread-stream1", (StreamEntryID) null, map); - StreamEntryID id2 = jedis.xadd("xread-stream2", (StreamEntryID) null, map); + StreamEntryID id1 = jedis.xadd(key1, (StreamEntryID) null, map); + StreamEntryID id2 = jedis.xadd(key2, (StreamEntryID) null, map); // Read only a single Stream List>> streams1 = jedis.xread(XReadParams.xReadParams().count(1).block(1), streamQeury1); assertEquals(1, streams1.size()); - assertEquals("xread-stream1", streams1.get(0).getKey()); + assertEquals(key1, streams1.get(0).getKey()); assertEquals(1, streams1.get(0).getValue().size()); assertEquals(id1, streams1.get(0).getValue().get(0).getID()); assertEquals(map, streams1.get(0).getValue().get(0).getFields()); - assertNull(jedis.xread(XReadParams.xReadParams().block(1), singletonMap("xread-stream1", id1))); - assertNull(jedis.xread(XReadParams.xReadParams(), singletonMap("xread-stream1", id1))); + assertNull(jedis.xread(XReadParams.xReadParams().block(1), singletonMap(key1, id1))); + assertNull(jedis.xread(XReadParams.xReadParams(), singletonMap(key1, id1))); // Read from two Streams - Map streamQuery23 = new LinkedHashMap<>(); - streamQuery23.put("xread-stream1", new StreamEntryID()); - streamQuery23.put("xread-stream2", new StreamEntryID()); - List>> streams2 = jedis.xread(XReadParams.xReadParams().count(2).block(1), streamQuery23); + Map streamQuery2 = new LinkedHashMap<>(); + streamQuery2.put(key1, new StreamEntryID()); + streamQuery2.put(key2, new StreamEntryID()); + List>> streams2 = jedis.xread(XReadParams.xReadParams().count(2).block(1), streamQuery2); assertEquals(2, streams2.size()); } + @Test + public void xreadAsMap() { + + final String stream1 = "xread-stream1"; + final String stream2 = "xread-stream2"; + + Map streamQeury1 = singletonMap(stream1, new StreamEntryID()); + + // Before creating Stream + assertNull(jedis.xreadAsMap(XReadParams.xReadParams().block(1), streamQeury1)); + assertNull(jedis.xreadAsMap(XReadParams.xReadParams(), streamQeury1)); + + Map map = new HashMap<>(); + map.put("f1", "v1"); + StreamEntryID id1 = new StreamEntryID(1); + StreamEntryID id2 = new StreamEntryID(2); + StreamEntryID id3 = new StreamEntryID(3); + + assertEquals(id1, jedis.xadd(stream1, id1, map)); + assertEquals(id2, jedis.xadd(stream2, id2, map)); + assertEquals(id3, jedis.xadd(stream1, id3, map)); + + // Read only a single Stream + Map> streams1 = jedis.xreadAsMap(XReadParams.xReadParams().count(2), streamQeury1); + assertEquals(singleton(stream1), streams1.keySet()); + List list1 = streams1.get(stream1); + assertEquals(2, list1.size()); + assertEquals(id1, list1.get(0).getID()); + assertEquals(map, list1.get(0).getFields()); + assertEquals(id3, list1.get(1).getID()); + assertEquals(map, list1.get(1).getFields()); + + // Read from two Streams + Map streamQuery2 = new LinkedHashMap<>(); + streamQuery2.put(stream1, new StreamEntryID()); + streamQuery2.put(stream2, new StreamEntryID()); + Map> streams2 = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQuery2); + assertEquals(2, streams2.size()); + assertEquals(id1, streams2.get(stream1).get(0).getID()); + assertEquals(id2, streams2.get(stream2).get(0).getID()); + + // Read from last entry + Map streamQueryLE = singletonMap(stream1, StreamEntryID.XREAD_LAST_ENTRY); + Map> streamsLE = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQueryLE); + assertEquals(singleton(stream1), streamsLE.keySet()); + assertEquals(1, streamsLE.get(stream1).size()); + assertEquals(id3, streamsLE.get(stream1).get(0).getID()); + assertEquals(map, streamsLE.get(stream1).get(0).getFields()); + } + @Test public void xreadBlockZero() throws InterruptedException { final AtomicReference readId = new AtomicReference<>(); @@ -403,13 +458,13 @@ public void xrevrangeExclusive() { @Test public void xgroup() { - Map map = new HashMap(); + Map map = new HashMap<>(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xgroup-stream", (StreamEntryID) null, map); assertEquals("OK", jedis.xgroupCreate("xgroup-stream", "consumer-group-name", null, false)); assertEquals("OK", jedis.xgroupSetID("xgroup-stream", "consumer-group-name", id1)); - assertEquals("OK", jedis.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.LAST_ENTRY, false)); + assertEquals("OK", jedis.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.XGROUP_LAST_ENTRY, false)); jedis.xgroupDestroy("xgroup-stream", "consumer-group-name"); assertEquals(0L, jedis.xgroupDelConsumer("xgroup-stream", "consumer-group-name1","myconsumer1")); @@ -425,7 +480,7 @@ public void xreadGroupWithParams() { map.put("f1", "v1"); jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map); jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false); - Map streamQeury1 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQeury1); assertEquals(1, range.size()); @@ -436,27 +491,44 @@ public void xreadGroupWithParams() { jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false); // Read only a single Stream - Map streamQeury11 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); List>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", - XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury11); + XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury1); assertEquals(1, streams1.size()); assertEquals(1, streams1.get(0).getValue().size()); // Read from two Streams - Map streamQuery23 = new LinkedHashMap<>(); - streamQuery23.put("xreadGroup-stream1", new StreamEntryID()); - streamQuery23.put("xreadGroup-stream2", new StreamEntryID()); + Map streamQuery2 = new LinkedHashMap<>(); + streamQuery2.put("xreadGroup-stream1", new StreamEntryID()); + streamQuery2.put("xreadGroup-stream2", new StreamEntryID()); List>> streams2 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", - XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQuery23); + XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQuery2); assertEquals(2, streams2.size()); // Read only fresh messages StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map); - Map streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); - List>> streams3 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + Map streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + List>> streamsFresh = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQeuryFresh); - assertEquals(1, streams3.size()); - assertEquals(id4, streams3.get(0).getValue().get(0).getID()); + assertEquals(1, streamsFresh.size()); + assertEquals(id4, streamsFresh.get(0).getValue().get(0).getID()); + } + + @Test + public void xreadGroupAsMap() { + + final String stream1 = "xreadGroup-stream1"; + Map map = singletonMap("f1", "v1"); + + StreamEntryID id1 = jedis.xadd(stream1, StreamEntryID.NEW_ENTRY, map); + jedis.xgroupCreate(stream1, "xreadGroup-group", null, false); + Map streamQeury1 = singletonMap(stream1, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + Map> range = jedis.xreadGroupAsMap("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().noAck(), streamQeury1); + assertEquals(singleton(stream1), range.keySet()); + List list = range.get(stream1); + assertEquals(1, list.size()); + assertEquals(id1, list.get(0).getID()); + assertEquals(map, list.get(0).getFields()); } @Test @@ -473,7 +545,7 @@ public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() { jedis.xadd("xreadGroup-discard-stream1", xAddParams, map2); jedis.xgroupCreate("xreadGroup-discard-stream1", "xreadGroup-group", null, false); - Map streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1), streamQuery1); assertEquals(1, range.size()); @@ -506,8 +578,7 @@ public void xack() { jedis.xgroupCreate("xack-stream", "xack-group", null, false); - Map streamQeury1 = singletonMap( - "xack-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("xack-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); // Empty Stream List>> range = jedis.xreadGroup("xack-group", "xack-consumer", @@ -535,7 +606,7 @@ public void xpendingWithParams() { map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd(stream, (StreamEntryID) null, map); - Map streamQeury1 = singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); // Read the event from Stream put it on pending List>> range = jedis.xreadGroup("xpendeing-group", @@ -582,7 +653,7 @@ public void xpendingRange() { jedis.xgroupCreate(stream, "xpendeing-group", null, false); // read 1 message from the group with each consumer - Map streamQeury = singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury); jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury); @@ -614,7 +685,7 @@ public void xclaimWithParams() { // Read the event from Stream put it on pending jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending(stream, "xpendeing-group", @@ -646,7 +717,7 @@ public void xclaimJustId() { // Read the event from Stream put it on pending jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending(stream, "xpendeing-group", @@ -675,7 +746,7 @@ public void xautoclaim() { // Read the event from Stream put it on pending jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpending-stream", "xpending-group", @@ -705,7 +776,7 @@ public void xautoclaimBinary() { // Read the event from Stream put it on pending jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpending-stream", "xpending-group", @@ -737,7 +808,7 @@ public void xautoclaimJustId() { // Read the event from Stream put it on pending jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpending-stream", "xpending-group", @@ -767,7 +838,7 @@ public void xautoclaimJustIdBinary() { // Read the event from Stream put it on pending jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpending-stream", "xpending-group", @@ -810,7 +881,7 @@ public void xinfo() throws InterruptedException { StreamInfo streamInfo = jedis.xinfoStream(STREAM_NAME); assertNotNull(id2); - jedis.xgroupCreate(STREAM_NAME, G1, StreamEntryID.LAST_ENTRY, false); + jedis.xgroupCreate(STREAM_NAME, G1, StreamEntryID.XGROUP_LAST_ENTRY, false); Map streamQeury11 = singletonMap( STREAM_NAME, new StreamEntryID("0-0")); jedis.xreadGroup(G1, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQeury11); @@ -862,8 +933,8 @@ public void xinfo() throws InterruptedException { // Using getters assertEquals(MY_CONSUMER, consumersInfo.get(0).getName()); assertEquals(0L, consumersInfo.get(0).getPending()); - MatcherAssert.assertThat(consumersInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); - MatcherAssert.assertThat(consumersInfo.get(0).getInactive(), Matchers.any(Long.class)); + assertThat(consumersInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumersInfo.get(0).getInactive(), Matchers.any(Long.class)); // Consumer info test assertEquals(MY_CONSUMER, @@ -874,11 +945,11 @@ public void xinfo() throws InterruptedException { // Using getters assertEquals(MY_CONSUMER, consumerInfo.get(0).getName()); assertEquals(0L, consumerInfo.get(0).getPending()); - MatcherAssert.assertThat(consumerInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); - MatcherAssert.assertThat(consumerInfo.get(0).getInactive(), Matchers.any(Long.class)); + assertThat(consumerInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumerInfo.get(0).getInactive(), Matchers.any(Long.class)); // test with more groups and consumers - jedis.xgroupCreate(STREAM_NAME, G2, StreamEntryID.LAST_ENTRY, false); + jedis.xgroupCreate(STREAM_NAME, G2, StreamEntryID.XGROUP_LAST_ENTRY, false); jedis.xreadGroup(G1, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQeury11); jedis.xreadGroup(G2, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQeury11); jedis.xreadGroup(G2, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQeury11); @@ -929,7 +1000,7 @@ public void xinfoStreamFullWithPending() { StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map); jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false); - Map streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("streamfull2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1), streamQeury1); assertEquals(1, range.size()); @@ -948,8 +1019,8 @@ public void xinfoStreamFullWithPending() { assertEquals(1, group.getConsumers().size()); StreamConsumerFullInfo consumer = group.getConsumers().get(0); assertEquals("xreadGroup-consumer", consumer.getName()); - MatcherAssert.assertThat(consumer.getSeenTime(), Matchers.greaterThanOrEqualTo(0L)); - MatcherAssert.assertThat(consumer.getActiveTime(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumer.getSeenTime(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumer.getActiveTime(), Matchers.greaterThanOrEqualTo(0L)); assertEquals(1, consumer.getPending().size()); List consumerPendingEntry = consumer.getPending().get(0); assertEquals(id1, consumerPendingEntry.get(0));