Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): prevent spamming of old headers during bridge node sync #549

Merged
merged 4 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,14 @@ func (f *BlockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error {

return f.client.Unsubscribe(ctx, newBlockSubscriber, newBlockEventQuery)
}

// IsSyncing returns the sync status of the Core connection: true for
// syncing, and false for already caught up. It can also return an error
// in the case of a failed status request.
func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
resp, err := f.client.Status(ctx)
if err != nil {
return false, err
}
return resp.SyncInfo.CatchingUp, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,6 @@ require (
)

replace (
github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf => github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34
github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf => github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220325112944-d33a3e5e13d5
github.com/tendermint/tendermint v0.34.14 => github.com/celestiaorg/celestia-core v0.34.14-celestia
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ github.com/celestiaorg/go-leopard v0.1.0 h1:28z2EkvKJIez5J9CEaiiUEC+OxalRLtTGJJ1
github.com/celestiaorg/go-leopard v0.1.0/go.mod h1:NtO/rjlB8dw2aq7jr06vZFKGvryQcTDXaNHelmPNOAM=
github.com/celestiaorg/go-libp2p-messenger v0.1.0 h1:rFldTa3ZWcRRn8E2bRWS94Qp1GFYXO2a0uvqpIey1B8=
github.com/celestiaorg/go-libp2p-messenger v0.1.0/go.mod h1:XzNksXrH0VxuNRGOnjPL9Ck4UyQlbmMpCYg9YwSBerI=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34 h1:NB+H2aczLNQgPUu59MHrk9ZAzCsGOkxszpHaClUGRUo=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220325112944-d33a3e5e13d5 h1:0+iUVoVKzNDioH7MG2OrT7CifpxegvQpcTPlFG8kjTE=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220325112944-d33a3e5e13d5/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch h1:9TSe3w1cmJmbWlweCwCTIZkan7jV8M+KwglXpdD+UG8=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch/go.mod h1:kXPYu0XqTNUKWA1h3M95UHjUqBzDwXVVt/RXZDjKJmQ=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
Expand Down
10 changes: 8 additions & 2 deletions service/header/core_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func (cl *CoreListener) listen(ctx context.Context, sub <-chan *types.Block) {
return
}

syncing, err := cl.fetcher.IsSyncing(ctx)
if err != nil {
log.Errorw("core-listener: getting sync state", "err", err)
return
}

comm, vals, err := cl.fetcher.GetBlockInfo(ctx, &b.Height)
if err != nil {
log.Errorw("core-listener: getting block info", "err", err)
Expand All @@ -83,8 +89,8 @@ func (cl *CoreListener) listen(ctx context.Context, sub <-chan *types.Block) {
return
}

// broadcast new ExtendedHeader
err = cl.bcast.Broadcast(ctx, eh)
// broadcast new ExtendedHeader, but if core is still syncing, notify only local subscribers
err = cl.bcast.Broadcast(ctx, eh, pubsub.WithLocalPublication(syncing))
if err != nil {
var pserr pubsub.ValidationError
// TODO(@Wondertan): Log ValidationIgnore cases as well, once headr duplication issue is fixed
Expand Down
2 changes: 1 addition & 1 deletion service/header/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Subscription interface {

// Broadcaster broadcasts an ExtendedHeader to the network.
type Broadcaster interface {
Broadcast(ctx context.Context, header *ExtendedHeader) error
Broadcast(ctx context.Context, header *ExtendedHeader, opts ...pubsub.PubOpt) error
}

// Exchange encompasses the behavior necessary to request ExtendedHeaders
Expand Down
4 changes: 2 additions & 2 deletions service/header/p2p_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ func (p *P2PSubscriber) Subscribe() (Subscription, error) {
}

// Broadcast broadcasts the given ExtendedHeader to the topic.
func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader) error {
func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader, opts ...pubsub.PubOpt) error {
bin, err := header.MarshalBinary()
if err != nil {
return err
}
return p.topic.Publish(ctx, bin)
return p.topic.Publish(ctx, bin, opts...)
}

// msgID computes an id for a pubsub message
Expand Down