Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service/header/core_listener: Fix bug where old block headers are spammed to headersub by bridge nodes #507

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,14 @@ func (f *BlockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error {

return f.client.Unsubscribe(ctx, newBlockSubscriber, newBlockEventQuery)
}

// IsSyncing returns the sync status of the Core connection: true for
// syncing, and false for already caught up. It can also return an error
// in the case of a failed status request.
func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
resp, err := f.client.Status(ctx)
if err != nil {
return false, err
}
return resp.SyncInfo.CatchingUp, nil
}
38 changes: 27 additions & 11 deletions service/header/core_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func (cl *CoreListener) Start(ctx context.Context) error {
return err
}

ctx, cancel := context.WithCancel(context.Background())
go cl.listen(ctx, sub)
clCtx, cancel := context.WithCancel(context.Background())

go cl.listen(clCtx, sub)
cl.cancel = cancel
return nil
}
Expand All @@ -56,14 +57,20 @@ func (cl *CoreListener) Start(ctx context.Context) error {
func (cl *CoreListener) Stop(ctx context.Context) error {
cl.cancel()
cl.cancel = nil
return cl.fetcher.UnsubscribeNewBlockEvent(ctx)

err := cl.fetcher.UnsubscribeNewBlockEvent(ctx)
if err != nil {
log.Errorw("core-listener: failed to unsubscribe from new block events", "err", err)
}
return nil
}

// listen kicks off a loop, listening for new block events from Core,
// generating ExtendedHeaders and broadcasting them to the header-sub
// gossipsub network.
func (cl *CoreListener) listen(ctx context.Context, sub <-chan *types.Block) {
defer log.Info("core-listener: listening stopped")

for {
select {
case b, ok := <-sub:
Expand All @@ -83,15 +90,24 @@ func (cl *CoreListener) listen(ctx context.Context, sub <-chan *types.Block) {
return
}

// broadcast new ExtendedHeader
err = cl.bcast.Broadcast(ctx, eh)
// only broadcast new ExtendedHeaders if core has finished syncing
// in order to prevent spamming of old block headers to `headersub`
syncing, err := cl.fetcher.IsSyncing(ctx)
if err != nil {
var pserr pubsub.ValidationError
// TODO(@Wondertan): Log ValidationIgnore cases as well, once headr duplication issue is fixed
if errors.As(err, &pserr) && pserr.Reason != pubsub.RejectValidationIgnored {
log.Errorw("core-listener: broadcasting next header", "height", eh.Height,
"err", err)
return
log.Errorw("core-listener: fetching sync status from core", "err", err)
return
}
if !syncing {
// broadcast new ExtendedHeader
err = cl.bcast.Broadcast(ctx, eh)
if err != nil {
var pserr pubsub.ValidationError
// TODO(@Wondertan): Log ValidationIgnore cases as well, once headr duplication issue is fixed
if errors.As(err, &pserr) && pserr.Reason != pubsub.RejectValidationIgnored {
log.Errorw("core-listener: broadcasting next header", "height", eh.Height,
"err", err)
return
}
}
}
case <-ctx.Done():
Expand Down