Skip to content

Commit

Permalink
Merge branch 'unstable' into aleksraiden-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksraiden committed Sep 17, 2024
2 parents 9decde2 + 2a0c57a commit 352d0ce
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 7 deletions.
4 changes: 2 additions & 2 deletions cmake/cpptrace.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(cpptrace
jeremy-rifkin/cpptrace v0.7.0
MD5=d897c48f5bf96134109f7e6716f2fd31
jeremy-rifkin/cpptrace v0.7.1
MD5=8b62f5d3033ab59146cb1fd3ca89d859
)

if (SYMBOLIZE_BACKEND STREQUAL "libbacktrace")
Expand Down
4 changes: 2 additions & 2 deletions cmake/pegtl.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubTarWithMirror(pegtl
taocpp/PEGTL 3.2.7
MD5=31b14660c883bc0489ddcdfbd29199c9
taocpp/PEGTL 3.2.8
MD5=50339029d1bb037909b28c382214033e
)

FetchContent_MakeAvailableWithArgs(pegtl)
7 changes: 4 additions & 3 deletions tests/gocase/unit/namespace/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ func TestNamespaceReplicateWithFullSync(t *testing.T) {
util.WaitForOffsetSync(t, masterClient, slaveClient, 60*time.Second)

// Namespaces should be replicated after the full sync
token, err := slaveClient.Do(ctx, "NAMESPACE", "GET", "foo").Result()
require.NoError(t, err)
require.EqualValues(t, "bar", token)
require.Eventually(t, func() bool {
token, _ := slaveClient.Do(ctx, "NAMESPACE", "GET", "foo").Val().(string)
return token == "bar"
}, 5*time.Second, 100*time.Millisecond)
}

func TestNamespaceRewrite(t *testing.T) {
Expand Down
108 changes: 108 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,114 @@ func TestStreamOffset(t *testing.T) {
require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
})

t.Run("XINFO Test idle time and pending messages, for issue #2478", func(t *testing.T) {
streamName := "test-stream-2478"
groupName := "test-group-2478"
consumerName := "test-consumer-2478"

rdb.Del(ctx, streamName)
rdb.XGroupDestroy(ctx, streamName, groupName)

for i := 1; i <= 5; i++ {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: fmt.Sprintf("%d-0", i),
Values: map[string]interface{}{"field": fmt.Sprintf("value%d", i)},
}).Err())
}

require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 5,
}).Result()
require.NoError(t, err)
require.Len(t, r[0].Messages, 5)

time.Sleep(2 * time.Second)

consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

var consumerInfo redis.XInfoConsumer
for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.True(t, consumerInfo.Idle >= 2000)
require.Equal(t, int64(5), consumerInfo.Pending)

ackIDs := make([]string, 5)
for i := 1; i <= 5; i++ {
ackIDs[i-1] = fmt.Sprintf("%d-0", i)
}
require.NoError(t, rdb.XAck(ctx, streamName, groupName, ackIDs...).Err())

consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.Equal(t, int64(0), consumerInfo.Pending)
})

t.Run("XINFO Test consumer removal and inactive time, for issue #2478", func(t *testing.T) {
streamName := "stream-test-2478"
groupName := "group-test-2478"
consumerName := "consumer-test-2478"

rdb.Del(ctx, streamName)
rdb.XGroupDestroy(ctx, streamName, groupName)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "*",
Values: map[string]interface{}{"field": "value"},
}).Err())

require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 1,
}).Result()
require.NoError(t, err)

time.Sleep(500 * time.Millisecond)

consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

var consumerInfo redis.XInfoConsumer
for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.Equal(t, consumerName, consumerInfo.Name)
require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err())

consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

for _, c := range consumers {
require.NotEqual(t, consumerName, c.Name)
}
})

t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) {
streamName := "test-stream"
group := "group"
Expand Down

0 comments on commit 352d0ce

Please sign in to comment.