Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore_: use periodic store query #5455

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (w *GethWakuWrapper) SetCriteriaForMissingMessageVerification(peerID peer.I
return errors.New("not available in WakuV1")
}

func (w *GethWakuWrapper) TriggerCheckForMissingMessages() error {
return errors.New("not available in WakuV1")
}

// Peers function only added for compatibility with waku V2
func (w *GethWakuWrapper) Peers() map[string]types.WakuV2Peer {
p := make(map[string]types.WakuV2Peer)
Expand Down
8 changes: 8 additions & 0 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer
return nil
}

func (w *gethWakuV2Wrapper) TriggerCheckForMissingMessages() error {
w.waku.TriggerCheckForMissingMessages()

// No err can be be generated by this function. The function returns an error
// Just so there's compatibility with GethWakuWrapper from V1
return nil
}

func (w *gethWakuV2Wrapper) ConnectionChanged(state connection.State) {
w.waku.ConnectionChanged(state)
}
Expand Down
2 changes: 2 additions & 0 deletions eth-node/types/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type Waku interface {

SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []string) error

TriggerCheckForMissingMessages() error

// MinPow returns the PoW value required by this node.
MinPow() float64
// BloomFilter returns the aggregated bloom filter for all the topics of interest.
Expand Down
17 changes: 12 additions & 5 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,6 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
m.updateCommunitiesActiveMembersPeriodically()
m.schedulePublishGrantsForControlledCommunities()
m.handleENSVerificationSubscription(ensSubscription)
m.watchConnectionChange()
m.watchChatsAndCommunitiesToUnmute()
m.watchCommunitiesToUnmute()
m.watchExpiredMessages()
Expand Down Expand Up @@ -869,13 +868,18 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
return nil, err
}

go m.checkForMissingMessagesLoop()

controlledCommunities, err := m.communitiesManager.Controlled()
if err != nil {
return nil, err
}

msAvailable := m.SubscribeMailserverAvailable()
go func() {
<-msAvailable
m.checkForMissingMessagesLoop()
m.watchConnectionChange()
}()

if m.archiveManager.IsReady() {
available := m.SubscribeMailserverAvailable()
go func() {
Expand Down Expand Up @@ -1002,9 +1006,12 @@ func (m *Messenger) handleConnectionChange(online bool) {
m.shouldPublishContactCode = false
}

// Start fetching messages from store nodes
if online && m.config.codeControlFlags.AutoRequestHistoricMessages {
m.asyncRequestAllHistoricMessages()
if m.transport.WakuVersion() == 2 {
m.transport.TriggerCheckForMissingMessages()
} else {
m.asyncRequestAllHistoricMessages()
}
}

// Update ENS verifier
Expand Down
54 changes: 34 additions & 20 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,32 +382,46 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries
const missingMessageCheckPeriod = 30 * time.Second

func (m *Messenger) checkForMissingMessagesLoop() {
t := time.NewTicker(missingMessageCheckPeriod)
defer t.Stop()
setCriteria := func() error {
filters := m.transport.Filters()
filtersByMs := m.SplitFiltersByStoreNode(filters)
for communityID, filtersForMs := range filtersByMs {
ms := m.getActiveMailserver(communityID)
if ms == nil {
continue
}

for {
select {
case <-m.quit:
return

case <-t.C:
filters := m.transport.Filters()
filtersByMs := m.SplitFiltersByStoreNode(filters)
for communityID, filtersForMs := range filtersByMs {
ms := m.getActiveMailserver(communityID)
if ms == nil {
continue
}
peerID, err := ms.PeerID()
if err != nil {
return fmt.Errorf("could not obtain the peerID")
}
m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs)
}
return nil
}

peerID, err := ms.PeerID()
if err != nil {
m.logger.Error("could not obtain the peerID")
// set criteria for the first time
if err := setCriteria(); err != nil {
m.logger.Error("setting criteria for missing message notification", zap.Error(err))
return
}

go func() {
t := time.NewTicker(missingMessageCheckPeriod)
defer t.Stop()

for {
select {
case <-m.quit:
return
case <-t.C:
if err := setCriteria(); err != nil {
m.logger.Error("setting criteria for missing message notification", zap.Error(err))
return
}
m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs)
}
}
}
}()
}

func getPrioritizedBatches() []int {
Expand Down
9 changes: 9 additions & 0 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,15 @@ func (t *Transport) SetStorePeerID(peerID peer.ID) {
t.waku.SetStorePeerID(peerID)
}

func (t *Transport) TriggerCheckForMissingMessages() {
if t.waku.Version() < 2 {
return
}
if err := t.waku.TriggerCheckForMissingMessages(); err != nil {
t.logger.Error("failed to trigger check for missing messages", zap.Error(err))
}
}

func (t *Transport) SetCriteriaForMissingMessageVerification(peerID peer.ID, filters []*Filter) {
if t.waku.Version() != 2 {
return
Expand Down
65 changes: 43 additions & 22 deletions wakuv2/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic s
peerID: peerID,
pubsubTopic: pubsubTopic,
contentTopics: contentTopics,
lastChecked: w.timesource.Now().Add(-delay),
lastChecked: w.timesource.Now().Add(-24*time.Hour - delay), // first check of 24 hours back
ctx: ctx,
cancel: cancel,
}
Expand All @@ -94,40 +94,58 @@ func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic s
w.topicInterest[pubsubTopic] = newMissingMessageRequest
}

// TriggerCheckForMissingMessages triggers on-demand the check for missing messages, useful when we come back online or from sleep
func (w *Waku) TriggerCheckForMissingMessages() {
select {
case w.checkForMissingMessagesTrigger <- struct{}{}:
default:
}
}

func (w *Waku) checkForMissingMessages() {
defer w.wg.Done()
defer w.logger.Debug("checkForMissingMessages - done")

t := time.NewTicker(time.Minute)
period := time.Minute
t := time.NewTicker(period)
defer t.Stop()

var semaphore = make(chan struct{}, 5)

fetchHistory := func() {
w.topicInterestMu.Lock()
defer w.topicInterestMu.Unlock()

for _, request := range w.topicInterest {
select {
case <-w.ctx.Done():
return
default:
semaphore <- struct{}{}
go func(r TopicInterest) {
w.fetchHistory(r)
<-semaphore
}(request)
}
}
}

for {
select {
case <-w.checkForMissingMessagesTrigger:
w.logger.Debug("checking for missing messages on-demand...")
t.Reset(period)
fetchHistory()
case <-t.C:
w.logger.Debug("checking for missing messages...")
w.topicInterestMu.Lock()
for _, request := range w.topicInterest {
select {
case <-w.ctx.Done():
return
default:
semaphore <- struct{}{}
go func(r TopicInterest) {
w.FetchHistory(r)
<-semaphore
}(request)
}
}
w.topicInterestMu.Unlock()

fetchHistory()
case <-w.ctx.Done():
return
}
}
}

func (w *Waku) FetchHistory(missingHistoryRequest TopicInterest) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richard-ramos I still don't see this method returning all messages in the store node even though the underlying bug in storev3 has been fixed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the commit history of https://github.com/waku-org/nwaku/releases/tag/v0.30.2 (the version installed on the fleet according to https://fleets.waku.org/), it does not include this commit: waku-org/nwaku@f54ba10

We still gotta wait until v0.31 is released.

func (w *Waku) fetchHistory(missingHistoryRequest TopicInterest) {
for i := 0; i < len(missingHistoryRequest.contentTopics); i += maxContentTopicsPerRequest {
j := i + maxContentTopicsPerRequest
if j > len(missingHistoryRequest.contentTopics) {
Expand Down Expand Up @@ -179,19 +197,22 @@ func (w *Waku) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx conte
}

func (w *Waku) fetchMessagesBatch(missingHistoryRequest TopicInterest, batchFrom int, batchTo int, now time.Time) error {
timeStart := proto.Int64(missingHistoryRequest.lastChecked.Add(-delay).UnixNano())
timeEnd := proto.Int64(now.Add(-delay).UnixNano())

logger := w.logger.With(
zap.Stringer("peerID", missingHistoryRequest.peerID),
zap.Strings("contentTopics", missingHistoryRequest.contentTopics[batchFrom:batchTo]),
zap.String("pubsubTopic", missingHistoryRequest.pubsubTopic),
logutils.WakuMessageTimestamp("from", proto.Int64(missingHistoryRequest.lastChecked.UnixNano())),
logutils.WakuMessageTimestamp("to", proto.Int64(now.UnixNano())),
logutils.WakuMessageTimestamp("from", timeStart),
logutils.WakuMessageTimestamp("to", timeEnd),
)

result, err := w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) {
return w.node.Store().Query(ctx, store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(missingHistoryRequest.pubsubTopic, missingHistoryRequest.contentTopics[batchFrom:batchTo]...),
TimeStart: proto.Int64(missingHistoryRequest.lastChecked.Add(-delay).UnixNano()),
TimeEnd: proto.Int64(now.Add(-delay).UnixNano()),
TimeStart: timeStart,
TimeEnd: timeEnd,
}, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, 100), store.IncludeData(false))
}, logger, "retrieving history to check for missing messages")
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ type Waku struct {

statusTelemetryClient ITelemetryClient

// to trigger missing messages check on-demand
checkForMissingMessagesTrigger chan struct{} // TODO pablo maybe accept the from/to parameters here

defaultShardInfo protocol.RelayShards
}

Expand Down Expand Up @@ -246,6 +249,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
discV5BootstrapNodes: cfg.DiscV5BootstrapNodes,
onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
onPeerStats: onPeerStats,
checkForMissingMessagesTrigger: make(chan struct{}),
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
}

Expand Down