diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 6a8cefb34d3..c1f1ceec0b8 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -131,6 +131,10 @@ func (w *GethWakuWrapper) SetCriteriaForMissingMessageVerification(peerID peer.I return errors.New("not available in WakuV1") } +func (w *GethWakuWrapper) TriggerCheckForMissingMessages() error { + return errors.New("not available in WakuV1") +} + // Peers function only added for compatibility with waku V2 func (w *GethWakuWrapper) Peers() map[string]types.WakuV2Peer { p := make(map[string]types.WakuV2Peer) diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 4bdc8390a3d..db98ebc62a7 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -316,6 +316,14 @@ func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer return nil } +func (w *gethWakuV2Wrapper) TriggerCheckForMissingMessages() error { + w.waku.TriggerCheckForMissingMessages() + + // No err can be be generated by this function. The function returns an error + // Just so there's compatibility with GethWakuWrapper from V1 + return nil +} + func (w *gethWakuV2Wrapper) ConnectionChanged(state connection.State) { w.waku.ConnectionChanged(state) } diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index e3615c6455d..692cd00b454 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -131,6 +131,8 @@ type Waku interface { SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []string) error + TriggerCheckForMissingMessages() error + // MinPow returns the PoW value required by this node. MinPow() float64 // BloomFilter returns the aggregated bloom filter for all the topics of interest. diff --git a/protocol/messenger.go b/protocol/messenger.go index 25646ddc946..743d30506ab 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -821,7 +821,6 @@ func (m *Messenger) Start() (*MessengerResponse, error) { m.updateCommunitiesActiveMembersPeriodically() m.schedulePublishGrantsForControlledCommunities() m.handleENSVerificationSubscription(ensSubscription) - m.watchConnectionChange() m.watchChatsAndCommunitiesToUnmute() m.watchCommunitiesToUnmute() m.watchExpiredMessages() @@ -869,13 +868,18 @@ func (m *Messenger) Start() (*MessengerResponse, error) { return nil, err } - go m.checkForMissingMessagesLoop() - controlledCommunities, err := m.communitiesManager.Controlled() if err != nil { return nil, err } + msAvailable := m.SubscribeMailserverAvailable() + go func() { + <-msAvailable + m.checkForMissingMessagesLoop() + m.watchConnectionChange() + }() + if m.archiveManager.IsReady() { available := m.SubscribeMailserverAvailable() go func() { @@ -1002,9 +1006,12 @@ func (m *Messenger) handleConnectionChange(online bool) { m.shouldPublishContactCode = false } - // Start fetching messages from store nodes if online && m.config.codeControlFlags.AutoRequestHistoricMessages { - m.asyncRequestAllHistoricMessages() + if m.transport.WakuVersion() == 2 { + m.transport.TriggerCheckForMissingMessages() + } else { + m.asyncRequestAllHistoricMessages() + } } // Update ENS verifier diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 4290790b1e5..8f398011697 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -382,32 +382,46 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries const missingMessageCheckPeriod = 30 * time.Second func (m *Messenger) checkForMissingMessagesLoop() { - t := time.NewTicker(missingMessageCheckPeriod) - defer t.Stop() + setCriteria := func() error { + filters := m.transport.Filters() + filtersByMs := m.SplitFiltersByStoreNode(filters) + for communityID, filtersForMs := range filtersByMs { + ms := m.getActiveMailserver(communityID) + if ms == nil { + continue + } - for { - select { - case <-m.quit: - return - - case <-t.C: - filters := m.transport.Filters() - filtersByMs := m.SplitFiltersByStoreNode(filters) - for communityID, filtersForMs := range filtersByMs { - ms := m.getActiveMailserver(communityID) - if ms == nil { - continue - } + peerID, err := ms.PeerID() + if err != nil { + return fmt.Errorf("could not obtain the peerID") + } + m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs) + } + return nil + } - peerID, err := ms.PeerID() - if err != nil { - m.logger.Error("could not obtain the peerID") + // set criteria for the first time + if err := setCriteria(); err != nil { + m.logger.Error("setting criteria for missing message notification", zap.Error(err)) + return + } + + go func() { + t := time.NewTicker(missingMessageCheckPeriod) + defer t.Stop() + + for { + select { + case <-m.quit: + return + case <-t.C: + if err := setCriteria(); err != nil { + m.logger.Error("setting criteria for missing message notification", zap.Error(err)) return } - m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs) } } - } + }() } func getPrioritizedBatches() []int { diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 37c860f7695..f6bdeeb7a16 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -746,6 +746,15 @@ func (t *Transport) SetStorePeerID(peerID peer.ID) { t.waku.SetStorePeerID(peerID) } +func (t *Transport) TriggerCheckForMissingMessages() { + if t.waku.Version() < 2 { + return + } + if err := t.waku.TriggerCheckForMissingMessages(); err != nil { + t.logger.Error("failed to trigger check for missing messages", zap.Error(err)) + } +} + func (t *Transport) SetCriteriaForMissingMessageVerification(peerID peer.ID, filters []*Filter) { if t.waku.Version() != 2 { return diff --git a/wakuv2/missing_messages.go b/wakuv2/missing_messages.go index 976fe16f87c..9b3e0815d48 100644 --- a/wakuv2/missing_messages.go +++ b/wakuv2/missing_messages.go @@ -71,7 +71,7 @@ func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic s peerID: peerID, pubsubTopic: pubsubTopic, contentTopics: contentTopics, - lastChecked: w.timesource.Now().Add(-delay), + lastChecked: w.timesource.Now().Add(-24*time.Hour - delay), // first check of 24 hours back ctx: ctx, cancel: cancel, } @@ -94,40 +94,58 @@ func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic s w.topicInterest[pubsubTopic] = newMissingMessageRequest } +// TriggerCheckForMissingMessages triggers on-demand the check for missing messages, useful when we come back online or from sleep +func (w *Waku) TriggerCheckForMissingMessages() { + select { + case w.checkForMissingMessagesTrigger <- struct{}{}: + default: + } +} + func (w *Waku) checkForMissingMessages() { defer w.wg.Done() defer w.logger.Debug("checkForMissingMessages - done") - t := time.NewTicker(time.Minute) + period := time.Minute + t := time.NewTicker(period) defer t.Stop() var semaphore = make(chan struct{}, 5) + + fetchHistory := func() { + w.topicInterestMu.Lock() + defer w.topicInterestMu.Unlock() + + for _, request := range w.topicInterest { + select { + case <-w.ctx.Done(): + return + default: + semaphore <- struct{}{} + go func(r TopicInterest) { + w.fetchHistory(r) + <-semaphore + }(request) + } + } + } + for { select { + case <-w.checkForMissingMessagesTrigger: + w.logger.Debug("checking for missing messages on-demand...") + t.Reset(period) + fetchHistory() case <-t.C: w.logger.Debug("checking for missing messages...") - w.topicInterestMu.Lock() - for _, request := range w.topicInterest { - select { - case <-w.ctx.Done(): - return - default: - semaphore <- struct{}{} - go func(r TopicInterest) { - w.FetchHistory(r) - <-semaphore - }(request) - } - } - w.topicInterestMu.Unlock() - + fetchHistory() case <-w.ctx.Done(): return } } } -func (w *Waku) FetchHistory(missingHistoryRequest TopicInterest) { +func (w *Waku) fetchHistory(missingHistoryRequest TopicInterest) { for i := 0; i < len(missingHistoryRequest.contentTopics); i += maxContentTopicsPerRequest { j := i + maxContentTopicsPerRequest if j > len(missingHistoryRequest.contentTopics) { @@ -179,19 +197,22 @@ func (w *Waku) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx conte } func (w *Waku) fetchMessagesBatch(missingHistoryRequest TopicInterest, batchFrom int, batchTo int, now time.Time) error { + timeStart := proto.Int64(missingHistoryRequest.lastChecked.Add(-delay).UnixNano()) + timeEnd := proto.Int64(now.Add(-delay).UnixNano()) + logger := w.logger.With( zap.Stringer("peerID", missingHistoryRequest.peerID), zap.Strings("contentTopics", missingHistoryRequest.contentTopics[batchFrom:batchTo]), zap.String("pubsubTopic", missingHistoryRequest.pubsubTopic), - logutils.WakuMessageTimestamp("from", proto.Int64(missingHistoryRequest.lastChecked.UnixNano())), - logutils.WakuMessageTimestamp("to", proto.Int64(now.UnixNano())), + logutils.WakuMessageTimestamp("from", timeStart), + logutils.WakuMessageTimestamp("to", timeEnd), ) result, err := w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { return w.node.Store().Query(ctx, store.FilterCriteria{ ContentFilter: protocol.NewContentFilter(missingHistoryRequest.pubsubTopic, missingHistoryRequest.contentTopics[batchFrom:batchTo]...), - TimeStart: proto.Int64(missingHistoryRequest.lastChecked.Add(-delay).UnixNano()), - TimeEnd: proto.Int64(now.Add(-delay).UnixNano()), + TimeStart: timeStart, + TimeEnd: timeEnd, }, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, 100), store.IncludeData(false)) }, logger, "retrieving history to check for missing messages") if err != nil { diff --git a/wakuv2/waku.go b/wakuv2/waku.go index efc0cc9ae2e..13f122d028d 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -183,6 +183,9 @@ type Waku struct { statusTelemetryClient ITelemetryClient + // to trigger missing messages check on-demand + checkForMissingMessagesTrigger chan struct{} // TODO pablo maybe accept the from/to parameters here + defaultShardInfo protocol.RelayShards } @@ -246,6 +249,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, + checkForMissingMessagesTrigger: make(chan struct{}), onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), }