Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/peer manager #596

Merged
merged 18 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ var (
Destination: &options.Address,
EnvVars: []string{"WAKUNODE2_ADDRESS"},
})
MaxPeerConnections = altsrc.NewIntFlag(&cli.IntFlag{
Name: "max-connections",
Value: 50,
Usage: "Maximum allowed number of libp2p connections.",
Destination: &options.MaxPeerConnections,
EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"},
})
WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "websocket-support",
Aliases: []string{"ws"},
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func main() {
&cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"},
TcpPort,
Address,
MaxPeerConnections,
WebsocketSupport,
WebsocketPort,
WebsocketSecurePort,
Expand Down
9 changes: 5 additions & 4 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence/sqlite"
dbutils "github.com/waku-org/go-waku/waku/persistence/utils"
wmetrics "github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/rendezvous"

"github.com/ethereum/go-ethereum/accounts/keystore"
Expand Down Expand Up @@ -129,6 +129,7 @@ func Execute(options Options) {
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithKeepAlive(options.KeepAlive),
node.WithMaxPeerConnections(options.MaxPeerConnections),
}
if len(options.AdvertiseAddresses) != 0 {
nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
Expand Down Expand Up @@ -403,7 +404,7 @@ func Execute(options Options) {
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")

peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1)
peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
} else {
Expand Down Expand Up @@ -434,7 +435,7 @@ func Execute(options Options) {

var peerIDs []peer.ID
for _, n := range options.Store.ResumeNodes {
pID, err := wakuNode.AddPeer(n, peers.Static, store.StoreID_v20beta4)
pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4)
if err != nil {
logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
Expand Down Expand Up @@ -502,7 +503,7 @@ func Execute(options Options) {

func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) {
for _, addr := range addresses {
_, err := wakuNode.AddPeer(addr, peers.Static, protocols...)
_, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...)
failOnErr(err, "error adding peer")
}
}
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type Options struct {
PersistPeers bool
UserAgent string
PProf bool
MaxPeerConnections int

PeerExchange PeerExchangeOptions
Websocket WSOptions
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
Expand Down Expand Up @@ -197,7 +197,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

_, err = d.node.AddPeer(peerAddr, peers.Static)
_, err = d.node.AddPeer(peerAddr, peerstore.Static)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/rpc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestFilterSubscription(t *testing.T) {
break
}

_, err = d.node.AddPeer(addr, peers.Static, legacy_filter.FilterID_v20beta1)
_, err = d.node.AddPeer(addr, peerstore.Static, legacy_filter.FilterID_v20beta1)
require.NoError(t, err)

args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}
Expand Down
8 changes: 4 additions & 4 deletions examples/basic2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 // indirect
github.com/waku-org/go-zerokit-rln v0.1.12 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d // indirect
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 // indirect
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0 // indirect
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8 // indirect
github.com/wk8/go-ordered-map v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions examples/basic2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -658,14 +658,14 @@ github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZF
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0e1h+p84yBp0IN7AqgbZlV7lgFBjm214lgSOE7CeJmE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04=
github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU=
github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d h1:Kcg85Y2xGU6hqZ/kMfkLQF2jAog8vt+tw1/VNidzNtE=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 h1:u+YUlWDltHiK5upSb7M6mStc84zdc4vTCNNOz7R5RaY=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y=
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb h1:pxPRTh2DWCPCC5dhFisHuBCm1k54fMtR8VR6hUWD734=
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb/go.mod h1:QYTnrByLh6OXvMzSvPNs5aykT/w4fQb4krGcZfKgSZw=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b h1:wWs8b91SVrxYy37gdNnFDCbjv1hMUHMTwaJUktyjrJE=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0 h1:JU5aMzRFeyG/DOiMwLy3F1AMuuXjzPrUKZpW72kAHxE=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8 h1:pQmTryFdSQuUe8dxt/dHgEfRdLwqf1DEGeReuMcJ9Yg=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8=
github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk=
Expand Down
4 changes: 2 additions & 2 deletions mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
dbutils "github.com/waku-org/go-waku/waku/persistence/utils"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
Expand Down Expand Up @@ -256,7 +256,7 @@ func AddPeer(address string, protocolID string) string {
return MakeJSONResponse(err)
}

peerID, err := wakuState.node.AddPeer(ma, peers.Static, libp2pProtocol.ID(protocolID))
peerID, err := wakuState.node.AddPeer(ma, peerstore.Static, libp2pProtocol.ID(protocolID))
return PrepareJSONResponse(peerID, err)
}

Expand Down
12 changes: 6 additions & 6 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

Expand Down Expand Up @@ -107,7 +107,7 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
return nil, err
}

psWrapper := peers.NewWakuPeerstore(ps)
psWrapper := peerstore.NewWakuPeerstore(ps)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,19 +138,19 @@ func RandomHex(n int) (string, error) {
type TestPeerDiscoverer struct {
sync.RWMutex
peerMap map[peer.ID]struct{}
peerCh chan v2.PeerData
peerCh chan peermanager.PeerData
}

func NewTestPeerDiscoverer() *TestPeerDiscoverer {
result := &TestPeerDiscoverer{
peerMap: make(map[peer.ID]struct{}),
peerCh: make(chan v2.PeerData, 10),
peerCh: make(chan peermanager.PeerData, 10),
}

return result
}

func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan v2.PeerData) {
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) {
go func() {
for p := range ch {
t.Lock()
Expand Down
15 changes: 7 additions & 8 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-discover/discover"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"

"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"

Expand All @@ -31,7 +30,7 @@ var ErrNoDiscV5Listener = errors.New("no discv5 listener")

// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
Subscribe(context.Context, <-chan peermanager.PeerData)
}

type DiscoveryV5 struct {
Expand All @@ -43,7 +42,7 @@ type DiscoveryV5 struct {
localnode *enode.LocalNode

peerConnector PeerConnector
peerCh chan v2.PeerData
peerCh chan peermanager.PeerData
NAT nat.Interface

log *zap.Logger
Expand Down Expand Up @@ -203,7 +202,7 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel

d.peerCh = make(chan v2.PeerData)
d.peerCh = make(chan peermanager.PeerData)
d.peerConnector.Subscribe(ctx, d.peerCh)

err := d.listen(ctx)
Expand Down Expand Up @@ -423,8 +422,8 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
defer iterator.Close()

d.Iterate(ctx, iterator, func(n *enode.Node, p peer.AddrInfo) error {
peer := v2.PeerData{
Origin: peers.Discv5,
peer := peermanager.PeerData{
Origin: peerstore.Discv5,
AddrInfo: p,
ENR: n,
}
Expand Down
9 changes: 8 additions & 1 deletion waku/v2/node/connectedness.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.opencensus.io/stats"
"go.uber.org/zap"

wps "github.com/waku-org/go-waku/waku/v2/peerstore"
)

// PeerStatis is a map of peer IDs to supported protocols
Expand Down Expand Up @@ -66,14 +68,19 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr

// Connected is called when a connection is opened
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()))
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()), zap.String("direction", cc.Stat().Direction.String()))
if c.connNotifCh != nil {
select {
case c.connNotifCh <- PeerConnection{cc.RemotePeer(), true}:
default:
c.log.Warn("subscriber is too slow")
}
}
//TODO: Move this to be stored in Waku's own peerStore
err := c.h.Peerstore().(wps.WakuPeerstore).SetDirection(cc.RemotePeer(), cc.Stat().Direction)
if err != nil {
c.log.Error("Failed to set peer direction for an outgoing connection", zap.Error(err))
}
stats.Record(c.ctx, metrics.Peers.M(1))
}

Expand Down
Loading