Skip to content

Commit

Permalink
Feat/peer manager (#596)
Browse files Browse the repository at this point in the history
* chore: refactor existing code into peer maanger package

* feat: move peer connection related code into peer manager

* feat: in relay peer connection pruning

* feat: add max-connections CLI flag and limit outRelayPeers based on max-connections #621

* tested both in and out relay connection limits

Co-authored-by: richΛrd <[email protected]>

* Review comment, use context to pause connectivity loop during node shutdown.

Co-authored-by: richΛrd <[email protected]>

* address review comments


---------

Co-authored-by: richΛrd <[email protected]>
  • Loading branch information
chaitanyaprem and richard-ramos authored Aug 3, 2023
1 parent 33344c2 commit 34de294
Show file tree
Hide file tree
Showing 24 changed files with 294 additions and 93 deletions.
7 changes: 7 additions & 0 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ var (
Destination: &options.Address,
EnvVars: []string{"WAKUNODE2_ADDRESS"},
})
MaxPeerConnections = altsrc.NewIntFlag(&cli.IntFlag{
Name: "max-connections",
Value: 50,
Usage: "Maximum allowed number of libp2p connections.",
Destination: &options.MaxPeerConnections,
EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"},
})
WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "websocket-support",
Aliases: []string{"ws"},
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func main() {
&cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"},
TcpPort,
Address,
MaxPeerConnections,
WebsocketSupport,
WebsocketPort,
WebsocketSecurePort,
Expand Down
9 changes: 5 additions & 4 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence/sqlite"
dbutils "github.com/waku-org/go-waku/waku/persistence/utils"
wmetrics "github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/rendezvous"

"github.com/ethereum/go-ethereum/accounts/keystore"
Expand Down Expand Up @@ -129,6 +129,7 @@ func Execute(options Options) {
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithKeepAlive(options.KeepAlive),
node.WithMaxPeerConnections(options.MaxPeerConnections),
}
if len(options.AdvertiseAddresses) != 0 {
nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
Expand Down Expand Up @@ -403,7 +404,7 @@ func Execute(options Options) {
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")

peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1)
peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
} else {
Expand Down Expand Up @@ -434,7 +435,7 @@ func Execute(options Options) {

var peerIDs []peer.ID
for _, n := range options.Store.ResumeNodes {
pID, err := wakuNode.AddPeer(n, peers.Static, store.StoreID_v20beta4)
pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4)
if err != nil {
logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
Expand Down Expand Up @@ -502,7 +503,7 @@ func Execute(options Options) {

func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) {
for _, addr := range addresses {
_, err := wakuNode.AddPeer(addr, peers.Static, protocols...)
_, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...)
failOnErr(err, "error adding peer")
}
}
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type Options struct {
PersistPeers bool
UserAgent string
PProf bool
MaxPeerConnections int

PeerExchange PeerExchangeOptions
Websocket WSOptions
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
Expand Down Expand Up @@ -197,7 +197,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

_, err = d.node.AddPeer(peerAddr, peers.Static)
_, err = d.node.AddPeer(peerAddr, peerstore.Static)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/rpc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestFilterSubscription(t *testing.T) {
break
}

_, err = d.node.AddPeer(addr, peers.Static, legacy_filter.FilterID_v20beta1)
_, err = d.node.AddPeer(addr, peerstore.Static, legacy_filter.FilterID_v20beta1)
require.NoError(t, err)

args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}
Expand Down
8 changes: 4 additions & 4 deletions examples/basic2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 // indirect
github.com/waku-org/go-zerokit-rln v0.1.12 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d // indirect
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 // indirect
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0 // indirect
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8 // indirect
github.com/wk8/go-ordered-map v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions examples/basic2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -658,14 +658,14 @@ github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZF
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0e1h+p84yBp0IN7AqgbZlV7lgFBjm214lgSOE7CeJmE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04=
github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU=
github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d h1:Kcg85Y2xGU6hqZ/kMfkLQF2jAog8vt+tw1/VNidzNtE=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 h1:u+YUlWDltHiK5upSb7M6mStc84zdc4vTCNNOz7R5RaY=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y=
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb h1:pxPRTh2DWCPCC5dhFisHuBCm1k54fMtR8VR6hUWD734=
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb/go.mod h1:QYTnrByLh6OXvMzSvPNs5aykT/w4fQb4krGcZfKgSZw=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b h1:wWs8b91SVrxYy37gdNnFDCbjv1hMUHMTwaJUktyjrJE=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0 h1:JU5aMzRFeyG/DOiMwLy3F1AMuuXjzPrUKZpW72kAHxE=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8 h1:pQmTryFdSQuUe8dxt/dHgEfRdLwqf1DEGeReuMcJ9Yg=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8=
github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk=
Expand Down
4 changes: 2 additions & 2 deletions mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
dbutils "github.com/waku-org/go-waku/waku/persistence/utils"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
Expand Down Expand Up @@ -256,7 +256,7 @@ func AddPeer(address string, protocolID string) string {
return MakeJSONResponse(err)
}

peerID, err := wakuState.node.AddPeer(ma, peers.Static, libp2pProtocol.ID(protocolID))
peerID, err := wakuState.node.AddPeer(ma, peerstore.Static, libp2pProtocol.ID(protocolID))
return PrepareJSONResponse(peerID, err)
}

Expand Down
12 changes: 6 additions & 6 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

Expand Down Expand Up @@ -107,7 +107,7 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
return nil, err
}

psWrapper := peers.NewWakuPeerstore(ps)
psWrapper := peerstore.NewWakuPeerstore(ps)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,19 +138,19 @@ func RandomHex(n int) (string, error) {
type TestPeerDiscoverer struct {
sync.RWMutex
peerMap map[peer.ID]struct{}
peerCh chan v2.PeerData
peerCh chan peermanager.PeerData
}

func NewTestPeerDiscoverer() *TestPeerDiscoverer {
result := &TestPeerDiscoverer{
peerMap: make(map[peer.ID]struct{}),
peerCh: make(chan v2.PeerData, 10),
peerCh: make(chan peermanager.PeerData, 10),
}

return result
}

func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan v2.PeerData) {
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) {
go func() {
for p := range ch {
t.Lock()
Expand Down
15 changes: 7 additions & 8 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-discover/discover"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"

"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"

Expand All @@ -31,7 +30,7 @@ var ErrNoDiscV5Listener = errors.New("no discv5 listener")

// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
Subscribe(context.Context, <-chan peermanager.PeerData)
}

type DiscoveryV5 struct {
Expand All @@ -43,7 +42,7 @@ type DiscoveryV5 struct {
localnode *enode.LocalNode

peerConnector PeerConnector
peerCh chan v2.PeerData
peerCh chan peermanager.PeerData
NAT nat.Interface

log *zap.Logger
Expand Down Expand Up @@ -203,7 +202,7 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel

d.peerCh = make(chan v2.PeerData)
d.peerCh = make(chan peermanager.PeerData)
d.peerConnector.Subscribe(ctx, d.peerCh)

err := d.listen(ctx)
Expand Down Expand Up @@ -423,8 +422,8 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
defer iterator.Close()

d.Iterate(ctx, iterator, func(n *enode.Node, p peer.AddrInfo) error {
peer := v2.PeerData{
Origin: peers.Discv5,
peer := peermanager.PeerData{
Origin: peerstore.Discv5,
AddrInfo: p,
ENR: n,
}
Expand Down
9 changes: 8 additions & 1 deletion waku/v2/node/connectedness.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.opencensus.io/stats"
"go.uber.org/zap"

wps "github.com/waku-org/go-waku/waku/v2/peerstore"
)

// PeerStatis is a map of peer IDs to supported protocols
Expand Down Expand Up @@ -66,14 +68,19 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr

// Connected is called when a connection is opened
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()))
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()), zap.String("direction", cc.Stat().Direction.String()))
if c.connNotifCh != nil {
select {
case c.connNotifCh <- PeerConnection{cc.RemotePeer(), true}:
default:
c.log.Warn("subscriber is too slow")
}
}
//TODO: Move this to be stored in Waku's own peerStore
err := c.h.Peerstore().(wps.WakuPeerstore).SetDirection(cc.RemotePeer(), cc.Stat().Direction)
if err != nil {
c.log.Error("Failed to set peer direction for an outgoing connection", zap.Error(err))
}
stats.Record(c.ctx, metrics.Peers.M(1))
}

Expand Down
Loading

0 comments on commit 34de294

Please sign in to comment.