From e91568ce2655e50287fc4a4ccf071a21e8a3bb96 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 25 Sep 2024 17:12:32 -0400 Subject: [PATCH] chore_: bump go-waku to change datatype of `waku2` field in ENR to byte array --- go.mod | 2 +- go.sum | 4 +- .../go-waku/waku/persistence/store.go | 3 ++ .../go-waku/waku/v2/api/filter/filter.go | 20 ++++---- .../waku/v2/api/filter/filter_manager.go | 17 +++++-- .../waku/v2/api/missing/missing_messages.go | 4 ++ .../waku/v2/api/publish/message_check.go | 2 + .../waku/v2/api/publish/message_queue.go | 2 + .../go-waku/waku/v2/discv5/discover.go | 17 +++---- .../go-waku/waku/v2/discv5/filters.go | 12 ++--- .../waku/v2/discv5/mock_peer_discoverer.go | 2 + .../go-waku/waku/v2/node/connectedness.go | 2 + .../go-waku/waku/v2/node/keepalive.go | 3 ++ .../go-waku/waku/v2/node/localnode.go | 3 ++ .../go-waku/waku/v2/node/wakunode2.go | 10 +++- .../go-waku/waku/v2/node/wakuoptions.go | 14 ++++++ .../waku/v2/peermanager/connection_gater.go | 2 + .../v2/peermanager/fastest_peer_selector.go | 3 ++ .../waku/v2/peermanager/peer_connector.go | 15 +++--- .../waku/v2/peermanager/peer_discovery.go | 2 + .../waku/v2/peermanager/peer_manager.go | 36 ++++++++++++-- .../v2/peermanager/topic_event_handler.go | 2 + .../go-waku/waku/v2/protocol/enr/enr.go | 17 +++++++ .../go-waku/waku/v2/protocol/filter/client.go | 9 +++- .../v2/protocol/filter/filter_health_check.go | 3 ++ .../go-waku/waku/v2/protocol/filter/server.go | 10 ++-- .../v2/protocol/filter/subscribers_map.go | 3 ++ .../legacy_store/waku_store_client.go | 5 +- .../legacy_store/waku_store_protocol.go | 3 ++ .../v2/protocol/lightpush/waku_lightpush.go | 6 +-- .../v2/protocol/metadata/waku_metadata.go | 2 + .../waku/v2/protocol/peer_exchange/client.go | 8 ++-- .../v2/protocol/peer_exchange/protocol.go | 2 + .../waku/v2/protocol/relay/broadcast.go | 2 + .../go-waku/waku/v2/protocol/relay/config.go | 5 ++ .../go-waku/waku/v2/protocol/relay/metrics.go | 2 + .../waku/v2/protocol/relay/topic_events.go | 2 + .../waku/v2/protocol/relay/waku_relay.go | 2 + .../dynamic/membership_fetcher.go | 2 + .../waku/v2/protocol/rln/nullifier_log.go | 2 + .../go-waku/waku/v2/protocol/store/client.go | 48 +++++++++++++++---- .../go-waku/waku/v2/protocol/store/options.go | 9 ++++ .../waku/v2/protocol/store/pb/validation.go | 6 +-- .../go-waku/waku/v2/protocol/store/result.go | 4 +- .../waku-org/go-waku/waku/v2/rendezvous/db.go | 2 + .../go-waku/waku/v2/rendezvous/rendezvous.go | 3 ++ .../v2/service/common_discovery_service.go | 2 + .../go-waku/waku/v2/timesource/ntp.go | 3 ++ .../waku-org/go-waku/waku/v2/utils/logger.go | 8 ++++ .../waku-org/go-waku/waku/v2/utils/peer.go | 5 ++ vendor/modules.txt | 2 +- 51 files changed, 276 insertions(+), 78 deletions(-) diff --git a/go.mod b/go.mod index c03523339a0..67dc82593de 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da + github.com/waku-org/go-waku v0.8.1-0.20240925210455-69ce0c676ce7 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index d89cbbe3bae..60adc86127d 100644 --- a/go.sum +++ b/go.sum @@ -2136,8 +2136,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da h1:bkAJVlJL4Ba83frABWjI9p9MeLGmEHuD/QcjYu3HNbQ= -github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240925210455-69ce0c676ce7 h1:L4fJfQvzmZi7C9oPz8Yr55/ZIpj345YIfciVt+VkO+4= +github.com/waku-org/go-waku v0.8.1-0.20240925210455-69ce0c676ce7/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/persistence/store.go b/vendor/github.com/waku-org/go-waku/waku/persistence/store.go index 10540c7ce0f..5ca38afbe9d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/persistence/store.go +++ b/vendor/github.com/waku-org/go-waku/waku/persistence/store.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -186,6 +187,7 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e } func (d *DBStore) updateMetrics(ctx context.Context) { + defer utils.LogOnPanic() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() defer d.wg.Done() @@ -251,6 +253,7 @@ func (d *DBStore) getDeleteOldRowsQuery() string { } func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) { + defer utils.LogOnPanic() defer d.wg.Done() ticker := time.NewTicker(t) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go index f8123704f61..16f2633670c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -27,6 +28,8 @@ func (fc FilterConfig) String() string { return string(jsonStr) } +const filterSubLoopInterval = 5 * time.Second + type Sub struct { ContentFilter protocol.ContentFilter DataCh chan *protocol.Envelope @@ -69,13 +72,7 @@ func defaultOptions() []SubscribeOptions { } // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) { - optList := append(defaultOptions(), opts...) - params := new(subscribeParameters) - for _, opt := range optList { - opt(params) - } - +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf @@ -95,12 +92,14 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte sub.multiplex(subs) } } - - go sub.subscriptionLoop(params.batchInterval) + // filter subscription loop is to check if target subscriptions for a filter are active and if not + // trigger resubscribe. + go sub.subscriptionLoop(filterSubLoopInterval) return sub, nil } func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { + defer utils.LogOnPanic() _, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter) //Not reading result unless we want to do specific error handling? if err != nil { @@ -109,6 +108,7 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { } func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) { + defer utils.LogOnPanic() ticker := time.NewTicker(batchInterval) defer ticker.Stop() for { @@ -216,12 +216,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { for _, subDetails := range subs { apiSub.subs[subDetails.ID] = subDetails go func(subDetails *subscription.SubscriptionDetails) { + defer utils.LogOnPanic() apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID)) for env := range subDetails.C { apiSub.DataCh <- env } }(subDetails) go func(subDetails *subscription.SubscriptionDetails) { + defer utils.LogOnPanic() select { case <-apiSub.ctx.Done(): return diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go index e4b6e524d61..b4933a798aa 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go @@ -13,6 +13,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/utils" ) // Methods on FilterManager just aggregate filters from application and subscribe to them @@ -31,7 +32,7 @@ type appFilterMap map[string]filterConfig type FilterManager struct { sync.Mutex ctx context.Context - opts []SubscribeOptions + params *subscribeParameters minPeersPerFilter int onlineChecker *onlinechecker.DefaultOnlineChecker filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details @@ -64,7 +65,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx - mgr.opts = opts mgr.logger = logger mgr.minPeersPerFilter = minPeersPerFilter mgr.envProcessor = envProcessor @@ -72,15 +72,23 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.node = node mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) mgr.node.SetOnlineChecker(mgr.onlineChecker) - mgr.filterSubBatchDuration = 5 * time.Second mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) + + //parsing the subscribe params only to read the batchInterval passed. + mgr.params = new(subscribeParameters) + opts = append(defaultOptions(), opts...) + for _, opt := range opts { + opt(mgr.params) + } + mgr.filterSubBatchDuration = mgr.params.batchInterval go mgr.startFilterSubLoop() return mgr } func (mgr *FilterManager) startFilterSubLoop() { + defer utils.LogOnPanic() ticker := time.NewTicker(mgr.filterSubBatchDuration) defer ticker.Stop() for { @@ -151,9 +159,10 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi } func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { + defer utils.LogOnPanic() ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} - sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...) + sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.Unlock() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index 058af9a4897..ca8b63fb79b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -102,6 +103,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { m.C = c go func() { + defer utils.LogOnPanic() t := time.NewTicker(m.params.interval) defer t.Stop() @@ -123,6 +125,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { default: semaphore <- struct{}{} go func(interest criteriaInterest) { + defer utils.LogOnPanic() m.fetchHistory(c, interest) <-semaphore }(interest) @@ -276,6 +279,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, wg.Add(1) go func(messageHashes []pb.MessageHash) { + defer utils.LogOnPanic() defer wg.Wait() result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go index 67a67c91344..c6df0f29a60 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -145,6 +146,7 @@ func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) { // Start checks if the tracked outgoing messages are stored periodically func (m *MessageSentCheck) Start() { + defer utils.LogOnPanic() ticker := time.NewTicker(m.hashQueryInterval) defer ticker.Stop() for { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go index 03b7a16a6de..50d3d75c33e 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" ) // MessagePriority determines the ordering for the message priority queue @@ -182,6 +183,7 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope { ch := make(chan *protocol.Envelope) go func() { + defer utils.LogOnPanic() defer close(ch) select { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go index 582e46d2467..aeae8e182e5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/nat" ) @@ -172,6 +171,7 @@ func (d *DiscoveryV5) listen(ctx context.Context) error { if d.NAT != nil && !d.udpAddr.IP.IsLoopback() { d.WaitGroup().Add(1) go func() { + defer utils.LogOnPanic() defer d.WaitGroup().Done() nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery") }() @@ -217,6 +217,7 @@ func (d *DiscoveryV5) start() error { if d.params.autoFindPeers { d.WaitGroup().Add(1) go func() { + defer utils.LogOnPanic() defer d.WaitGroup().Done() d.runDiscoveryV5Loop(d.Context()) }() @@ -253,19 +254,13 @@ func (d *DiscoveryV5) Stop() { } func isWakuNode(node *enode.Node) bool { - enrField := new(wenr.WakuEnrBitfield) - if err := node.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err != nil { - if !enr.IsNotFound(err) { - utils.Logger().Named("discv5").Error("could not retrieve waku2 ENR field for enr ", zap.Any("node", node)) - } + enrField, err := wenr.GetWakuEnrBitField(node) + if err != nil { + utils.Logger().Named("discv5").Error("could not retrieve waku2 ENR field for enr ", zap.Error(err)) return false } - if enrField != nil { - return *enrField != uint8(0) // #RFC 31 requirement - } - - return false + return enrField != uint8(0) // #RFC 31 requirement } func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/filters.go b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/filters.go index e34502fa559..40557fbccdd 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/filters.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/filters.go @@ -4,7 +4,6 @@ import ( wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" ) // FilterPredicate is to create a Predicate using a custom function @@ -36,16 +35,11 @@ func FilterShard(cluster, index uint16) Predicate { func FilterCapabilities(flags wenr.WakuEnrBitfield) Predicate { return func(iterator enode.Iterator) enode.Iterator { predicate := func(node *enode.Node) bool { - enrField := new(wenr.WakuEnrBitfield) - if err := node.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err != nil { + enrField, err := wenr.GetWakuEnrBitField(node) + if err != nil { return false } - - if enrField == nil { - return false - } - - return *enrField&flags == flags + return enrField&flags == flags } return enode.Filter(iterator, predicate) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/mock_peer_discoverer.go b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/mock_peer_discoverer.go index 5bef8542116..f997f028c83 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/mock_peer_discoverer.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/mock_peer_discoverer.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" ) // TestPeerDiscoverer is mock peer discoverer for testing @@ -26,6 +27,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer { // Subscribe is for subscribing to peer discoverer func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) { go func() { + defer utils.LogOnPanic() for { select { case <-ctx.Done(): diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go index 5a0f89fe236..3526b1d293c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/utils" ) // PeerStatis is a map of peer IDs to supported protocols @@ -101,6 +102,7 @@ func (c ConnectionNotifier) Close() { } func (w *WakuNode) connectednessListener(ctx context.Context) { + defer utils.LogOnPanic() defer w.wg.Done() for { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go index 94ebbb74ade..416666f55a6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -40,6 +41,7 @@ func disconnectAllPeers(host host.Host, logger *zap.Logger) { // This is necessary because TCP connections are automatically closed due to inactivity, // and doing a ping will avoid this (with a small bandwidth cost) func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) { + defer utils.LogOnPanic() defer w.wg.Done() if !w.opts.enableRelay { @@ -168,6 +170,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t } func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, resultChan chan bool) { + defer utils.LogOnPanic() defer wg.Done() logger := w.log.With(logging.HostID("peer", peerID)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go index 9de6c59fd3c..5b042734b14 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -358,6 +359,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error { } go func() { + defer utils.LogOnPanic() defer evtRelaySubscribed.Close() defer evtRelayUnsubscribed.Close() @@ -411,6 +413,7 @@ func (w *WakuNode) registerAndMonitorReachability(ctx context.Context) { } w.wg.Add(1) go func() { + defer utils.LogOnPanic() defer myEventSub.Close() defer w.wg.Done() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index f9dc443fc5d..0a8188120bd 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -214,6 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { func(ctx context.Context, numPeers int) <-chan peer.AddrInfo { r := make(chan peer.AddrInfo) go func() { + defer utils.LogOnPanic() defer close(r) for ; numPeers != 0; numPeers-- { select { @@ -292,7 +293,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) - w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) + w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit) if params.storeFactory != nil { w.storeFactory = params.storeFactory @@ -308,6 +309,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) { + defer utils.LogOnPanic() defer w.wg.Done() addrsSet := utils.MultiAddrSet(w.ListenAddresses()...) @@ -550,6 +552,7 @@ func (w *WakuNode) ID() string { } func (w *WakuNode) watchENRChanges(ctx context.Context) { + defer utils.LogOnPanic() defer w.wg.Done() var prevNodeVal string @@ -752,7 +755,9 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { - w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID) + if w.peermanager != nil { + w.peermanager.HandleDialError(err, info.ID) + } return err } @@ -885,6 +890,7 @@ func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice { } func (w *WakuNode) findRelayNodes(ctx context.Context) { + defer utils.LogOnPanic() defer w.wg.Done() // Feed peers more often right after the bootstrap, then backoff diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index 445065de636..112cafe616b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -38,6 +38,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" ) // Default UserAgent @@ -94,6 +95,8 @@ type WakuNodeParameters struct { enableStore bool messageProvider legacy_store.MessageProvider + storeRateLimit rate.Limit + enableRendezvousPoint bool rendezvousDB *rendezvous.DB @@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{ WithCircuitRelayParams(2*time.Second, 3*time.Minute), WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), + WithWakuStoreRateLimit(8), // Value currently set in status.staging } // MultiAddresses return the list of multiaddresses configured in the node @@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption { } } +// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will +// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming +// the store protocol from a storenode +func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.storeRateLimit = value + return nil + } +} + // WithWakuStore enables the Waku V2 Store protocol and if the messages should // be stored or not in a message provider. func WithWakuStore() WakuNodeOption { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/connection_gater.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/connection_gater.go index d08008139ea..3a326d6bf21 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/connection_gater.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/connection_gater.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -77,6 +78,7 @@ func (c *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason // NotifyDisconnect is called when a connection disconnects. func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) { + defer utils.LogOnPanic() ip, err := manet.ToIP(addr) if err != nil { return diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go index 225d594027f..7f2ce6ddb45 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -69,9 +70,11 @@ func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlic pinged := make(map[peer.ID]struct{}) go func() { + defer utils.LogOnPanic() // Ping any peer with no latency recorded for peerToPing := range pingCh { go func(p peer.ID) { + defer utils.LogOnPanic() defer wg.Done() rtt := time.Hour result, err := r.PingPeer(ctx, p) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go index bd844b20c6a..8cce276a1dc 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go @@ -4,7 +4,6 @@ package peermanager import ( "context" - "errors" "math/rand" "sync" "sync/atomic" @@ -19,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -104,6 +104,7 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan servic // if running start a goroutine to consume the subscription c.WaitGroup().Add(1) go func() { + defer utils.LogOnPanic() defer c.WaitGroup().Done() c.consumeSubscription(subscription{ctx, ch}) }() @@ -187,6 +188,7 @@ func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { c.WaitGroup().Add(1) go func(s subscription) { + defer utils.LogOnPanic() defer c.WaitGroup().Done() c.consumeSubscription(s) }(subs) @@ -234,6 +236,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) { } func (c *PeerConnectionStrategy) dialPeers() { + defer utils.LogOnPanic() defer c.WaitGroup().Done() maxGoRoutines := c.pm.OutPeersTarget @@ -273,15 +276,15 @@ func (c *PeerConnectionStrategy) dialPeers() { } func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { + defer utils.LogOnPanic() defer c.WaitGroup().Done() ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout) defer cancel() err := c.host.Connect(ctx, pi) - if err != nil && !errors.Is(err, context.Canceled) { - c.addConnectionBackoff(pi.ID) - c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID) - c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) + if err != nil { + c.pm.HandleDialError(err, pi.ID) + } else { + c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) } - c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) <-sem } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go index 8ab1c8bebf4..89868510be9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go @@ -11,6 +11,7 @@ import ( wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -103,6 +104,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, } func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) { + defer utils.LogOnPanic() shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...) if err != nil { pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index 2ac489a0484..8e07d93c6b3 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/p2p/enr" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" @@ -23,6 +22,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/metadata" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -87,6 +87,7 @@ type PeerManager struct { TopicHealthNotifCh chan<- TopicHealthStatus rttCache *FastestPeerSelector RelayEnabled bool + evtDialError event.Emitter } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -249,9 +250,18 @@ func (pm *PeerManager) Start(ctx context.Context) { go pm.connectivityLoop(ctx) } go pm.peerStoreLoop(ctx) + + if pm.host != nil { + var err error + pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError)) + if err != nil { + pm.logger.Error("failed to create dial error emitter", zap.Error(err)) + } + } } func (pm *PeerManager) peerStoreLoop(ctx context.Context) { + defer utils.LogOnPanic() t := time.NewTicker(prunePeerStoreInterval) defer t.Stop() for { @@ -353,6 +363,7 @@ func (pm *PeerManager) prunePeerStore() { // This is a connectivity loop, which currently checks and prunes inbound connections. func (pm *PeerManager) connectivityLoop(ctx context.Context) { + defer utils.LogOnPanic() pm.connectToPeers() t := time.NewTicker(peerConnectivityLoopSecs * time.Second) defer t.Stop() @@ -535,8 +546,8 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { } supportedProtos := []protocol.ID{} //Identify and specify protocols supported by the peer based on the discovered peer's ENR - var enrField wenr.WakuEnrBitfield - if err := p.ENR.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err == nil { + enrField, err := wenr.GetWakuEnrBitField(p.ENR) + if err == nil { for proto, protoENR := range pm.wakuprotoToENRFieldMap { protoENRField := protoENR.waku2ENRBitField if protoENRField&enrField != 0 { @@ -719,3 +730,22 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID) } + +func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { + if err == nil || errors.Is(err, context.Canceled) { + return + } + if pm.peerConnector != nil { + pm.peerConnector.addConnectionBackoff(peerID) + } + if pm.host != nil { + pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID) + } + pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err)) + if pm.evtDialError != nil { + emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID}) + if emitterErr != nil { + pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) + } + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/topic_event_handler.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/topic_event_handler.go index 1b965ef033f..41a7760f669 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/topic_event_handler.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/topic_event_handler.go @@ -12,6 +12,7 @@ import ( wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -162,6 +163,7 @@ func (pm *PeerManager) handlerPeerTopicEvent(peerEvt relay.EvtPeerTopic) { } func (pm *PeerManager) peerEventLoop(ctx context.Context) { + defer utils.LogOnPanic() defer pm.sub.Close() for { select { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/enr.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/enr.go index fe1e546256a..27c332c3c49 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/enr.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/enr.go @@ -28,6 +28,23 @@ const ShardingBitVectorEnrField = "rsv" // WakuEnrBitfield is a8-bit flag field to indicate Waku capabilities. Only the 4 LSBs are currently defined according to RFC31 (https://rfc.vac.dev/spec/31/). type WakuEnrBitfield = uint8 +func GetWakuEnrBitField(node *enode.Node) (WakuEnrBitfield, error) { + enrField := []byte{} + err := node.Record().Load(enr.WithEntry(WakuENRField, &enrField)) + if err != nil { + if enr.IsNotFound(err) { + return 0, nil + } + return 0, err + } + + if len(enrField) == 0 { + return 0, err + } + + return WakuEnrBitfield(enrField[0]), nil +} + // NewWakuEnrBitfield creates a WakuEnrBitField whose value will depend on which protocols are enabled in the node func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield { var v uint8 diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index a9d2b496d9e..3b56d4700ee 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -28,6 +28,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/exp/maps" "golang.org/x/exp/slices" @@ -127,6 +128,7 @@ func (wf *WakuFilterLightNode) Stop() { wf.h.RemoveStreamHandler(FilterPushID_v20beta1) if wf.subscriptions.Count() > 0 { go func() { + defer utils.LogOnPanic() defer func() { _ = recover() }() @@ -245,8 +247,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) - if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wf.pm != nil { + wf.pm.HandleDialError(err, peerID) } return err } @@ -414,6 +416,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot for i, peerID := range selectedPeers { wg.Add(1) go func(index int, ID peer.ID) { + defer utils.LogOnPanic() defer wg.Done() err := wf.request( reqCtx, @@ -565,6 +568,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr // send unsubscribe request to all the peers for peerID := range peers { go func(peerID peer.ID) { + defer utils.LogOnPanic() defer func() { if params.wg != nil { params.wg.Done() @@ -687,6 +691,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte } for peerId := range peers { go func(peerID peer.ID) { + defer utils.LogOnPanic() defer func() { if params.wg != nil { params.wg.Done() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go index a6b76a340c7..126090d9939 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go @@ -5,6 +5,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -19,6 +20,7 @@ func (wf *WakuFilterLightNode) PingPeers() { } func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { + defer utils.LogOnPanic() ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) defer cancel() err := wf.Ping(ctxWithTimeout, peer) @@ -41,6 +43,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { } func (wf *WakuFilterLightNode) FilterHealthCheckLoop() { + defer utils.LogOnPanic() defer wf.WaitGroup().Done() ticker := time.NewTicker(wf.peerPingInterval) defer ticker.Stop() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go index 9a2e25d6b68..bacfe85cc14 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go @@ -14,7 +14,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -38,6 +38,7 @@ type ( log *zap.Logger *service.CommonService subscriptions *SubscribersMap + pm *peermanager.PeerManager maxSubscriptions int } @@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi wf.maxSubscriptions = params.MaxSubscribers if params.pm != nil { params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField) + wf.pm = params.pm } return wf } @@ -216,6 +218,7 @@ func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, stream network } func (wf *WakuFilterFullNode) filterListener(ctx context.Context) { + defer utils.LogOnPanic() defer wf.WaitGroup().Done() // This function is invoked for each message received @@ -237,6 +240,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) { logger.Debug("pushing message to light node") wf.WaitGroup().Add(1) go func(subscriber peer.ID) { + defer utils.LogOnPanic() defer wf.WaitGroup().Done() start := time.Now() err := wf.pushMessage(ctx, logger, subscriber, envelope) @@ -274,8 +278,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge wf.metrics.RecordError(pushTimeoutFailure) } else { wf.metrics.RecordError(dialFailure) - if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wf.pm != nil { + wf.pm.HandleDialError(err, peerID) } } logger.Error("opening peer stream", zap.Error(err)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/subscribers_map.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/subscribers_map.go index faa5700ca3e..b539357e973 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/subscribers_map.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/subscribers_map.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" ) type PeerSet map[peer.ID]struct{} @@ -188,6 +189,7 @@ func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan key := getKey(pubsubTopic, contentTopic) f := func() { + defer utils.LogOnPanic() sub.RLock() defer sub.RUnlock() @@ -236,6 +238,7 @@ func (sub *SubscribersMap) Refresh(peerID peer.ID) { } func (sub *SubscribersMap) cleanUp(ctx context.Context, cleanupInterval time.Duration) { + defer utils.LogOnPanic() t := time.NewTicker(cleanupInterval) defer t.Stop() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go index 03f7c9b217e..ef971f0037e 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go @@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) - if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(selectedPeer) + if store.pm != nil { + store.pm.HandleDialError(err, selectedPeer) } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_protocol.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_protocol.go index 16eabe8f002..90229b5bd27 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_protocol.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_protocol.go @@ -21,6 +21,7 @@ import ( wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" ) func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*wpb.WakuMessage, *pb.PagingInfo, error) { @@ -159,9 +160,11 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) error { } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { + defer utils.LogOnPanic() defer store.wg.Done() for envelope := range store.MsgC.Ch { go func(env *protocol.Envelope) { + defer utils.LogOnPanic() _ = store.storeMessage(env) }(envelope) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 8200fddfcfd..9d6744315a6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) - if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wakuLP.pm != nil { + wakuLP.pm.HandleDialError(err, peerID) } return nil, err } @@ -335,6 +334,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa for i, peerID := range params.selectedPeers { wg.Add(1) go func(index int, id peer.ID) { + defer utils.LogOnPanic() paramsValue := *params paramsValue.requestID = protocol.GenerateRequestID() defer wg.Done() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go index 47cf088dc29..34327980902 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go @@ -20,6 +20,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -225,6 +226,7 @@ func (wakuM *WakuMetadata) disconnectPeer(peerID peer.ID, reason error) { // Connected is called when a connection is opened func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) { go func() { + defer utils.LogOnPanic() wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer())) // Metadata verification is done only if a clusterID is specified if wakuM.clusterID == 0 { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go index 8075e2de8d0..ef1f7bb9ae9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go @@ -16,6 +16,7 @@ import ( wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -76,8 +77,8 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { - if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(params.selectedPeer) + if wakuPX.pm != nil { + wakuPX.pm.HandleDialError(err, params.selectedPeer) } return err } @@ -123,13 +124,11 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb } if params.clusterID != 0 { - wakuPX.log.Debug("clusterID is non zero, filtering by shard") rs, err := wenr.RelaySharding(enrRecord) if err != nil || rs == nil || !rs.Contains(uint16(params.clusterID), uint16(params.shard)) { wakuPX.log.Debug("peer doesn't matches filter", zap.Int("shard", params.shard)) continue } - wakuPX.log.Debug("peer matches filter", zap.Int("shard", params.shard)) } enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord) @@ -156,6 +155,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(discoveredPeers))) wakuPX.WaitGroup().Add(1) go func() { + defer utils.LogOnPanic() defer wakuPX.WaitGroup().Done() peerCh := make(chan service.PeerData) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go index 3f33b2ec8e0..5a3821b96a1 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go @@ -21,6 +21,7 @@ import ( wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -223,6 +224,7 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error { } func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) { + defer utils.LogOnPanic() defer wakuPX.WaitGroup().Done() // Runs a discv5 loop adding new peers to the px peer cache diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/broadcast.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/broadcast.go index 36ca3e1e7bf..91ece5d8ad8 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/broadcast.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/broadcast.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" ) type BroadcasterParameters struct { @@ -174,6 +175,7 @@ func (b *broadcaster) Start(ctx context.Context) error { } func (b *broadcaster) run(ctx context.Context) { + defer utils.LogOnPanic() for { select { case <-ctx.Done(): diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/config.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/config.go index f0f41f80045..bc1ea9334be 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/config.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/config.go @@ -13,6 +13,10 @@ import ( var DefaultRelaySubscriptionBufferSize int = 1024 +// trying to match value here https://github.com/vacp2p/nim-libp2p/pull/1077 +// note that nim-libp2p has 2 peer queues 1 for priority and other non-priority, whereas go-libp2p seems to have single peer-queue +var DefaultPeerOutboundQSize int = 1024 + type RelaySubscribeParameters struct { dontConsume bool cacheSize uint @@ -109,6 +113,7 @@ func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option { pubsub.WithSeenMessagesTTL(2 * time.Minute), pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds), pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second), + pubsub.WithPeerOutboundQueueSize(DefaultPeerOutboundQSize), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/metrics.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/metrics.go index 4a10a0a9a60..a6b33cc993c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/metrics.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/metrics.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -56,6 +57,7 @@ func newMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics { // RecordMessage is used to increase the counter for the number of messages received via waku relay func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) { go func() { + defer utils.LogOnPanic() payloadSizeInBytes := len(envelope.Message().Payload) payloadSizeInKb := float64(payloadSizeInBytes) / 1000 messageSize.Observe(payloadSizeInKb) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/topic_events.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/topic_events.go index 60261a3b3f0..0b828b867d5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/topic_events.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/topic_events.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -51,6 +52,7 @@ func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.Topi } func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) { + defer utils.LogOnPanic() defer w.WaitGroup().Done() for { evt, err := handler.NextPeerEvent(w.Context()) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go index 2ff8329af68..24964dd18a3 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -439,6 +439,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont subscriptions = append(subscriptions, subscription) go func() { + defer utils.LogOnPanic() <-ctx.Done() subscription.Unsubscribe() }() @@ -533,6 +534,7 @@ func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptio } func (w *WakuRelay) pubsubTopicMsgHandler(sub *pubsub.Subscription) { + defer utils.LogOnPanic() defer w.WaitGroup().Done() for { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go index 42c3b5f2615..a73b91264d6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" "github.com/waku-org/go-waku/waku/v2/protocol/rln/web3" + "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" ) @@ -120,6 +121,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo } func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) { + defer utils.LogOnPanic() defer mf.wg.Done() // Watch for new events diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/nullifier_log.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/nullifier_log.go index 1bf6263ad48..cc122a79771 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/nullifier_log.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/nullifier_log.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" ) @@ -89,6 +90,7 @@ func (n *NullifierLog) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) { // cleanup cleans up the log every time there are more than MaxEpochGap epochs stored in it func (n *NullifierLog) cleanup(ctx context.Context) { + defer utils.LogOnPanic() t := time.NewTicker(1 * time.Minute) // TODO: tune this defer t.Stop() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 92c47ff4c1d..f7427b9790f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" + "golang.org/x/time/rate" "google.golang.org/protobuf/proto" ) @@ -69,14 +70,19 @@ type WakuStore struct { timesource timesource.Timesource log *zap.Logger pm *peermanager.PeerManager + + defaultRatelimit rate.Limit + rateLimiters map[peer.ID]*rate.Limiter } // NewWakuStore is used to instantiate a StoreV3 client -func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore { +func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore { s := new(WakuStore) s.log = log.Named("store-client") s.timesource = timesource s.pm = pm + s.defaultRatelimit = defaultRatelimit + s.rateLimiters = make(map[peer.ID]*rate.Limiter) if pm != nil { pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField) @@ -171,7 +177,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return nil, err } - response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer) + response, err := s.queryFrom(ctx, storeRequest, params) if err != nil { return nil, err } @@ -211,7 +217,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt return len(result.messages) != 0, nil } -func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { +func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) { if r.IsComplete() { return &Result{ store: s, @@ -223,11 +229,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { }, nil } + params := new(Parameters) + params.selectedPeer = r.PeerID() + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, err + } + } + storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest) storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) storeRequest.PaginationCursor = r.Cursor() - response, err := s.queryFrom(ctx, storeRequest, r.PeerID()) + response, err := s.queryFrom(ctx, storeRequest, params) if err != nil { return nil, err } @@ -245,16 +262,27 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { } -func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) { - logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) +func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) { + logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) logger.Debug("sending store request") - stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300) + if !params.skipRatelimit { + rateLimiter, ok := s.rateLimiters[params.selectedPeer] + if !ok { + rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1) + s.rateLimiters[params.selectedPeer] = rateLimiter + } + err := rateLimiter.Wait(ctx) + if err != nil { + return nil, err + } + } + + stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) - if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(selectedPeer) + if s.pm != nil { + s.pm.HandleDialError(err, params.selectedPeer) } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go index b38afd53ab9..b8deba47cdf 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go @@ -19,6 +19,7 @@ type Parameters struct { pageLimit uint64 forward bool includeData bool + skipRatelimit bool } type RequestOption func(*Parameters) error @@ -115,6 +116,14 @@ func IncludeData(v bool) RequestOption { } } +// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429)) +func SkipRateLimit() RequestOption { + return func(params *Parameters) error { + params.skipRatelimit = true + return nil + } +} + // Default options to be used when querying a store node for results func DefaultOptions() []RequestOption { return []RequestOption{ diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go index f54dea90e0a..542c6a05208 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go @@ -2,6 +2,7 @@ package pb import ( "errors" + "fmt" ) // MaxContentTopics is the maximum number of allowed contenttopics in a query @@ -10,7 +11,6 @@ const MaxContentTopics = 10 var ( errMissingRequestID = errors.New("missing RequestId field") errMessageHashOtherFields = errors.New("cannot use MessageHashes with ContentTopics/PubsubTopic") - errRequestIDMismatch = errors.New("requestID in response does not match request") errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed") errEmptyContentTopic = errors.New("one or more content topics specified is empty") errMissingPubsubTopic = errors.New("missing PubsubTopic field") @@ -57,8 +57,8 @@ func (x *StoreQueryRequest) Validate() error { } func (x *StoreQueryResponse) Validate(requestID string) error { - if x.RequestId != "" && x.RequestId != requestID { - return errRequestIDMismatch + if x.RequestId != "" && x.RequestId != "N/A" && x.RequestId != requestID { + return fmt.Errorf("requestID %s in response does not match requestID in request %s", x.RequestId, requestID) } if x.StatusCode == nil { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go index 5ea4765ec8b..604d6453c2d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go @@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse { return r.storeResponse } -func (r *Result) Next(ctx context.Context) error { +func (r *Result) Next(ctx context.Context, opts ...RequestOption) error { if r.cursor == nil { r.done = true r.messages = nil return nil } - newResult, err := r.store.next(ctx, r) + newResult, err := r.store.next(ctx, r, opts...) if err != nil { return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/db.go b/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/db.go index 1d751fede4d..f4d9ba196ed 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/db.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/db.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" dbi "github.com/waku-org/go-libp2p-rendezvous/db" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -315,6 +316,7 @@ func (db *DB) ValidCookie(ns string, cookie []byte) bool { } func (db *DB) background(ctx context.Context) { + defer utils.LogOnPanic() for { db.cleanupExpired() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/rendezvous.go b/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/rendezvous.go index 76c63ff5c0e..69862f60e18 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/rendezvous.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/rendezvous.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -142,6 +143,7 @@ func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*Rendezvou // RegisterShard registers the node in the rendezvous points using a shard as namespace func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16, rendezvousPoints []*RendezvousPoint) { + defer utils.LogOnPanic() namespace := ShardToNamespace(cluster, shard) r.RegisterWithNamespace(ctx, namespace, rendezvousPoints) } @@ -158,6 +160,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string for _, m := range rendezvousPoints { r.WaitGroup().Add(1) go func(m *RendezvousPoint) { + defer utils.LogOnPanic() r.WaitGroup().Done() rendezvousClient := rvs.NewRendezvousClient(r.host, m.id) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/service/common_discovery_service.go b/vendor/github.com/waku-org/go-waku/waku/v2/service/common_discovery_service.go index 0aa6b852889..a2a4b22153f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/service/common_discovery_service.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/service/common_discovery_service.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/utils" ) // PeerData contains information about a peer useful in establishing connections with it. @@ -58,6 +59,7 @@ func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData { return sp.channel } func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool { + defer utils.LogOnPanic() if err := sp.ErrOnNotRunning(); err != nil { return false } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/timesource/ntp.go b/vendor/github.com/waku-org/go-waku/waku/v2/timesource/ntp.go index 3454631e3eb..a27d03fc8ec 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/timesource/ntp.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/timesource/ntp.go @@ -9,6 +9,7 @@ import ( "time" "github.com/beevik/ntp" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -69,6 +70,7 @@ func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (t responses := make(chan queryResponse, len(servers)) for _, server := range servers { go func(server string) { + defer utils.LogOnPanic() response, err := timeQuery(server, ntp.QueryOptions{ Timeout: DefaultRPCTimeout, }) @@ -172,6 +174,7 @@ func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) er // we try to do it synchronously so that user can have reliable messages right away s.wg.Add(1) go func() { + defer utils.LogOnPanic() for { select { case <-time.After(period): diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/utils/logger.go b/vendor/github.com/waku-org/go-waku/waku/v2/utils/logger.go index 02b82eee46e..beba67d87d5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/utils/logger.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/utils/logger.go @@ -1,6 +1,7 @@ package utils import ( + "runtime/debug" "strings" logging "github.com/ipfs/go-log/v2" @@ -81,3 +82,10 @@ func InitLogger(encoding string, output string, name string, level zapcore.Level log = logging.Logger(name).Desugar() } + +func LogOnPanic() { + if err := recover(); err != nil { + Logger().Error("panic in goroutine", zap.Any("error", err), zap.String("stacktrace", string(debug.Stack()))) + panic(err) + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/utils/peer.go b/vendor/github.com/waku-org/go-waku/waku/v2/utils/peer.go index 8321dc3e358..b732fa14954 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/utils/peer.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/utils/peer.go @@ -5,6 +5,11 @@ import ( "github.com/multiformats/go-multiaddr" ) +type DialError struct { + Err error + PeerID peer.ID +} + // GetPeerID is used to extract the peerID from a multiaddress func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) diff --git a/vendor/modules.txt b/vendor/modules.txt index 634b75422d1..ea3edff8d66 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1007,7 +1007,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da +# github.com/waku-org/go-waku v0.8.1-0.20240925210455-69ce0c676ce7 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests