Skip to content

Commit

Permalink
more stream id changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Mar 27, 2024
1 parent 3d233df commit 9140c3d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 39 deletions.
16 changes: 9 additions & 7 deletions src/main/java/redis/clients/jedis/StreamEntryID.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public String toString() {
};

/**
* @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XREADGROUP command or
* @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XGROUP CREATE command or
* {@link StreamEntryID#XREAD_NEW_ENTRY} for XREAD command.
*/
@Deprecated
Expand All @@ -132,9 +132,9 @@ public String toString() {
/**
* Should be used only with XREADGROUP
* <p>
* {@code XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >}
* {@code XREADGROUP GROUP mygroup myconsumer STREAMS mystream >}
*/
public static final StreamEntryID UNRECEIVED_ENTRY = new StreamEntryID() {
public static final StreamEntryID XREADGROUP_UNDELIVERED_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

Expand All @@ -144,10 +144,15 @@ public String toString() {
}
};

/**
* @deprecated Use {@link StreamEntryID#XREADGROUP_UNDELIVERED_ENTRY}.
*/
@Deprecated
public static final StreamEntryID UNRECEIVED_ENTRY = XREADGROUP_UNDELIVERED_ENTRY;

/**
* Can be used in XRANGE, XREVRANGE and XPENDING commands.
*/
// TODO: FIRST_ID ?
public static final StreamEntryID MINIMUM_ID = new StreamEntryID() {

private static final long serialVersionUID = 1L;
Expand All @@ -161,8 +166,6 @@ public String toString() {
/**
* Can be used in XRANGE, XREVRANGE and XPENDING commands.
*/
// TODO: LAST_ID ?
// TODO: unify with XREAD_LAST_ENTRY ??
public static final StreamEntryID MAXIMUM_ID = new StreamEntryID() {

private static final long serialVersionUID = 1L;
Expand All @@ -178,7 +181,6 @@ public String toString() {
*
* {@code XREAD STREAMS mystream +}
*/
// TODO: unify with MAXIMUM_ID ??
public static final StreamEntryID XREAD_LAST_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ public void xreadGroupWithParams() {
map.put("f1", "v1");
jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map);
jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false);
Map<String, StreamEntryID> streamQeury1 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury1 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQeury1);
assertEquals(1, range.size());
Expand All @@ -499,7 +499,7 @@ public void xreadGroupWithParams() {
jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false);

// Read only a single Stream
Map<String, StreamEntryID> streamQeury11 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury11 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Entry<String, List<StreamEntry>>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury11);
assertEquals(1, streams1.size());
Expand All @@ -515,7 +515,7 @@ public void xreadGroupWithParams() {

// Read only fresh messages
StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map);
Map<String, StreamEntryID> streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Entry<String, List<StreamEntry>>> streams3 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQeuryFresh);
assertEquals(1, streams3.size());
Expand All @@ -536,7 +536,7 @@ public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() {
jedis.xadd("xreadGroup-discard-stream1", xAddParams, map2);

jedis.xgroupCreate("xreadGroup-discard-stream1", "xreadGroup-group", null, false);
Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1), streamQuery1);
assertEquals(1, range.size());
Expand Down Expand Up @@ -569,8 +569,7 @@ public void xack() {

jedis.xgroupCreate("xack-stream", "xack-group", null, false);

Map<String, StreamEntryID> streamQeury1 = singletonMap(
"xack-stream", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury1 = singletonMap("xack-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

// Empty Stream
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xack-group", "xack-consumer",
Expand All @@ -589,8 +588,7 @@ public void xpendingWithParams() {

assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));

Map<String, StreamEntryID> streamQeury1 = singletonMap(
"xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury1 = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

// Read the event from Stream put it on pending
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xpendeing-group",
Expand Down Expand Up @@ -630,8 +628,7 @@ public void xpendingRange() {
jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false);

// read 1 message from the group with each consumer
Map<String, StreamEntryID> streamQeury = singletonMap(
"xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury);
jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury);

Expand Down Expand Up @@ -662,7 +659,7 @@ public void xclaimWithParams() {

// Read the event from Stream put it on pending
jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
Expand Down Expand Up @@ -693,7 +690,7 @@ public void xclaimJustId() {

// Read the event from Stream put it on pending
jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
Expand Down Expand Up @@ -722,7 +719,7 @@ public void xautoclaim() {

// Read the event from Stream put it on pending
jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpending-stream", "xpending-group",
Expand Down Expand Up @@ -752,7 +749,7 @@ public void xautoclaimBinary() {

// Read the event from Stream put it on pending
jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpending-stream", "xpending-group",
Expand Down Expand Up @@ -784,7 +781,7 @@ public void xautoclaimJustId() {

// Read the event from Stream put it on pending
jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpending-stream", "xpending-group",
Expand Down Expand Up @@ -814,7 +811,7 @@ public void xautoclaimJustIdBinary() {

// Read the event from Stream put it on pending
jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpending-stream", "xpending-group",
Expand Down Expand Up @@ -976,7 +973,7 @@ public void xinfoStreamFullWithPending() {
StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false);

Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1), streamQeury1);
assertEquals(1, range.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public void xreadGroupWithParams() {

jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false);

Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

Response<List<Entry<String, List<StreamEntry>>>> streams1 =
pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
Expand Down Expand Up @@ -679,8 +679,8 @@ public void xreadGroupWithParams() {

// Read from two Streams
Map<String, StreamEntryID> streamQuery2 = new LinkedHashMap<>();
streamQuery2.put("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY);
streamQuery2.put("xreadGroup-stream2", StreamEntryID.UNRECEIVED_ENTRY);
streamQuery2.put("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
streamQuery2.put("xreadGroup-stream2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

Response<List<Entry<String, List<StreamEntry>>>> streams3 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQuery2);
Expand Down Expand Up @@ -709,7 +709,7 @@ public void xreadGroupWithParams() {
Map<String, String> map4 = singletonMap("f4", "v4");
StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map4);

Map<String, StreamEntryID> streamQueryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQueryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
Response<List<Entry<String, List<StreamEntry>>>> streams4 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQueryFresh);

Expand Down Expand Up @@ -737,7 +737,7 @@ public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() {

pipe.xgroupCreate("xreadGroup-discard-stream1", "xreadGroup-group", null, false);

Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

Response<List<Entry<String, List<StreamEntry>>>> streams1 =
pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
Expand Down Expand Up @@ -781,7 +781,7 @@ public void xack() {

pipe.xgroupCreate("xack-stream", "xack-group", null, false);

Map<String, StreamEntryID> streamQuery1 = singletonMap("xack-stream", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQuery1 = singletonMap("xack-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

// Empty Stream
Response<List<Entry<String, List<StreamEntry>>>> streams1 =
Expand Down Expand Up @@ -809,7 +809,7 @@ public void xpendingWithParams() {

assertEquals("OK", jedis.xgroupCreate("xpending-stream", "xpending-group", null, false));

Map<String, StreamEntryID> streamQeury1 = singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury1 = singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

// Read the event from Stream put it on pending
Response<List<Entry<String, List<StreamEntry>>>> range = pipe.xreadGroup("xpending-group",
Expand Down Expand Up @@ -861,7 +861,7 @@ public void xpendingRange() {
pipe.xgroupCreate("xpending-stream", "xpending-group", null, false);

// read 1 message from the group with each consumer
Map<String, StreamEntryID> streamQeury = singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury = singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
pipe.xreadGroup("xpending-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury);
pipe.xreadGroup("xpending-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury);

Expand Down Expand Up @@ -896,7 +896,7 @@ public void xclaimWithParams() throws InterruptedException {
// Read the event from Stream put it on pending
pipe.xreadGroup("xpending-group", "xpending-consumer",
XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
Response<List<StreamPendingEntry>> pending =
Expand Down Expand Up @@ -938,7 +938,7 @@ public void xclaimJustId() throws InterruptedException {
// Read the event from Stream put it on pending
pipe.xreadGroup("xpending-group", "xpending-consumer",
XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
Response<List<StreamPendingEntry>> pending =
Expand Down Expand Up @@ -976,7 +976,7 @@ public void xautoclaim() throws InterruptedException {
// Read the event from Stream put it on pending
pipe.xreadGroup("xpending-group", "xpending-consumer",
XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
Response<List<StreamPendingEntry>> pending = pipe.xpending("xpending-stream", "xpending-group",
Expand Down Expand Up @@ -1016,7 +1016,7 @@ public void xautoclaimBinary() throws InterruptedException {
// Read the event from Stream put it on pending
pipe.xreadGroup("xpending-group", "xpending-consumer",
XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
Response<List<StreamPendingEntry>> pending = pipe.xpending("xpending-stream", "xpending-group",
Expand Down Expand Up @@ -1059,7 +1059,7 @@ public void xautoclaimJustId() throws InterruptedException {

// Read the event from Stream put it on pending
pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
Response<List<StreamPendingEntry>> pending = pipe.xpending("xpending-stream", "xpending-group",
Expand Down Expand Up @@ -1095,7 +1095,7 @@ public void xautoclaimJustIdBinary() throws InterruptedException {
// Read the event from Stream put it on pending
pipe.xreadGroup("xpending-group", "xpending-consumer",
XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY));
singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
Response<List<StreamPendingEntry>> pending = pipe.xpending("xpending-stream", "xpending-group",
Expand Down Expand Up @@ -1281,7 +1281,7 @@ public void xinfoStreamFullWithPending() {
StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false);

Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY);
Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
Response<List<Entry<String, List<StreamEntry>>>> pending = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1), streamQeury1);

Expand Down

0 comments on commit 9140c3d

Please sign in to comment.