Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[config change] Support multiple producers in redis streams #2581

Merged
merged 23 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7e49635
Support multiple producers in redis streams
ganeshvanahalli Aug 15, 2024
e155ebb
trim acknotifiers map and use previous keepalive timeouts
ganeshvanahalli Aug 15, 2024
1473c60
Use faster hash function
ganeshvanahalli Aug 16, 2024
85ba40b
Merge branch 'master' into multiple-producers-redisstream
ganeshvanahalli Aug 16, 2024
40e6b9b
address PR comments, handle memory better and add test to cover incor…
ganeshvanahalli Aug 26, 2024
a2d9b03
Merge branch 'master' into multiple-producers-redisstream
ganeshvanahalli Aug 26, 2024
b5fff68
increase TestProducerConfig requestTimeout
ganeshvanahalli Aug 26, 2024
cee4620
fix tests
ganeshvanahalli Aug 26, 2024
2a3dc1d
rectify xautoclaim logic and address PR comments
ganeshvanahalli Aug 27, 2024
ecd2722
Merge branch 'master' into multiple-producers-redisstream
ganeshvanahalli Aug 27, 2024
4028e99
merge master and resolve conflicts
ganeshvanahalli Sep 5, 2024
dbcf908
Merge branch 'master' into multiple-producers-redisstream
tsahee Sep 13, 2024
8525ad4
address PR comments
ganeshvanahalli Sep 13, 2024
4548690
Merge branch 'master' into multiple-producers-redisstream
ganeshvanahalli Sep 13, 2024
a8967d1
remove separate id impl
ganeshvanahalli Sep 16, 2024
9508627
merge master and resolve conflicts
ganeshvanahalli Sep 16, 2024
6123021
Merge branch 'master' into multiple-producers-redisstream
ganeshvanahalli Sep 27, 2024
9a96fbf
address PR comments
ganeshvanahalli Sep 27, 2024
43a54e7
remove unnecessary error log in decrementMsgIdByOne
ganeshvanahalli Sep 27, 2024
8c655e6
merge master and address PR comments
ganeshvanahalli Oct 8, 2024
08e0fc4
Merge branch 'master' into multiple-producers-redisstream
ganeshvanahalli Oct 8, 2024
dc72b58
Merge branch 'master' into multiple-producers-redisstream
ganeshvanahalli Oct 9, 2024
01a1b38
Merge branch 'master' into multiple-producers-redisstream
tsahee Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/signer/core/apitypes"
"github.com/go-redis/redis/v8"
"github.com/holiman/uint256"
"github.com/offchainlabs/nitro/arbnode/dataposter/dbstorage"
"github.com/offchainlabs/nitro/arbnode/dataposter/noop"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/redis/go-redis/v9"
"github.com/spf13/pflag"

redisstorage "github.com/offchainlabs/nitro/arbnode/dataposter/redis"
Expand Down
4 changes: 2 additions & 2 deletions arbnode/dataposter/redis/redisstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"errors"
"fmt"

"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/util/signature"
"github.com/redis/go-redis/v9"
)

// Storage implements redis sorted set backed storage. It does not support
Expand Down Expand Up @@ -196,7 +196,7 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu
if err != nil {
return err
}
if err := pipe.ZAdd(ctx, s.key, &redis.Z{
if err := pipe.ZAdd(ctx, s.key, redis.Z{
Score: float64(index),
Member: string(signedItem),
}).Err(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion arbnode/redislock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/redis/go-redis/v9"
flag "github.com/spf13/pflag"
)

Expand Down
2 changes: 1 addition & 1 deletion arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"sync/atomic"
"time"

"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
flag "github.com/spf13/pflag"

"github.com/ethereum/go-ethereum/log"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"errors"
"strings"

"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/redis/go-redis/v9"
)

// RedisCoordinator builds upon RedisCoordinator of redisutil with additional functionality
Expand Down
2 changes: 1 addition & 1 deletion das/redis_storage_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (

"golang.org/x/crypto/sha3"

"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/pretty"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/redis/go-redis/v9"
flag "github.com/spf13/pflag"

"github.com/ethereum/go-ethereum/common"
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ require (
github.com/ethereum/go-ethereum v1.10.26
github.com/fatih/structtag v1.2.0
github.com/gdamore/tcell/v2 v2.7.1
github.com/go-redis/redis/v8 v8.11.5
github.com/gobwas/httphead v0.1.0
github.com/gobwas/ws v1.2.1
github.com/gobwas/ws-examples v0.0.0-20190625122829-a9e8908d9484
Expand All @@ -39,6 +38,7 @@ require (
github.com/mitchellh/mapstructure v1.4.1
github.com/pkg/errors v0.9.1
github.com/r3labs/diff/v3 v3.0.1
github.com/redis/go-redis/v9 v9.6.1
github.com/rivo/tview v0.0.0-20240307173318-e804876934a1
github.com/spf13/pflag v1.0.5
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
Expand Down Expand Up @@ -99,7 +99,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
Expand Down
22 changes: 12 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88=
github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
Expand All @@ -150,8 +154,8 @@ github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4MedaY=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down Expand Up @@ -281,8 +285,6 @@ github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AE
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand Down Expand Up @@ -582,22 +584,19 @@ github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down Expand Up @@ -643,6 +642,8 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/r3labs/diff/v3 v3.0.1 h1:CBKqf3XmNRHXKmdU7mZP1w7TV0pDyVCis1AUHtA4Xtg=
github.com/r3labs/diff/v3 v3.0.1/go.mod h1:f1S9bourRbiM66NskseyUdo0fTmEE0qKrikYJX63dgo=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/rhnvrm/simples3 v0.6.1 h1:H0DJwybR6ryQE+Odi9eqkHuzjYAeJgtGcGtuBwOhsH8=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rivo/tview v0.0.0-20240307173318-e804876934a1 h1:bWLHTRekAy497pE7+nXSuzXwwFHI0XauRzz6roUvY+s=
Expand Down Expand Up @@ -1042,6 +1043,7 @@ golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxb
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
Expand Down
5 changes: 4 additions & 1 deletion pubsub/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package pubsub

import (
"context"
"fmt"
"strings"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)

func ResultKeyFor(streamName, id string) string { return fmt.Sprintf("%s.%s", streamName, id) }

// CreateStream tries to create stream with given name, if it already exists
// does not return an error.
func CreateStream(ctx context.Context, streamName string, client redis.UniversalClient) error {
Expand Down
Loading
Loading