Skip to content

Commit

Permalink
Merge pull request #549 from celestiaorg/hlib/prevent-spamming
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan authored Mar 25, 2022
2 parents c9db56a + dcb944f commit eb67835
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 8 deletions.
11 changes: 11 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,14 @@ func (f *BlockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error {

return f.client.Unsubscribe(ctx, newBlockSubscriber, newBlockEventQuery)
}

// IsSyncing returns the sync status of the Core connection: true for
// syncing, and false for already caught up. It can also return an error
// in the case of a failed status request.
func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
resp, err := f.client.Status(ctx)
if err != nil {
return false, err
}
return resp.SyncInfo.CatchingUp, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,6 @@ require (
)

replace (
github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf => github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34
github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf => github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220325112944-d33a3e5e13d5
github.com/tendermint/tendermint v0.34.14 => github.com/celestiaorg/celestia-core v0.34.14-celestia
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ github.com/celestiaorg/go-leopard v0.1.0 h1:28z2EkvKJIez5J9CEaiiUEC+OxalRLtTGJJ1
github.com/celestiaorg/go-leopard v0.1.0/go.mod h1:NtO/rjlB8dw2aq7jr06vZFKGvryQcTDXaNHelmPNOAM=
github.com/celestiaorg/go-libp2p-messenger v0.1.0 h1:rFldTa3ZWcRRn8E2bRWS94Qp1GFYXO2a0uvqpIey1B8=
github.com/celestiaorg/go-libp2p-messenger v0.1.0/go.mod h1:XzNksXrH0VxuNRGOnjPL9Ck4UyQlbmMpCYg9YwSBerI=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34 h1:NB+H2aczLNQgPUu59MHrk9ZAzCsGOkxszpHaClUGRUo=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220325112944-d33a3e5e13d5 h1:0+iUVoVKzNDioH7MG2OrT7CifpxegvQpcTPlFG8kjTE=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220325112944-d33a3e5e13d5/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch h1:9TSe3w1cmJmbWlweCwCTIZkan7jV8M+KwglXpdD+UG8=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch/go.mod h1:kXPYu0XqTNUKWA1h3M95UHjUqBzDwXVVt/RXZDjKJmQ=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
Expand Down
10 changes: 8 additions & 2 deletions service/header/core_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func (cl *CoreListener) listen(ctx context.Context, sub <-chan *types.Block) {
return
}

syncing, err := cl.fetcher.IsSyncing(ctx)
if err != nil {
log.Errorw("core-listener: getting sync state", "err", err)
return
}

comm, vals, err := cl.fetcher.GetBlockInfo(ctx, &b.Height)
if err != nil {
log.Errorw("core-listener: getting block info", "err", err)
Expand All @@ -83,8 +89,8 @@ func (cl *CoreListener) listen(ctx context.Context, sub <-chan *types.Block) {
return
}

// broadcast new ExtendedHeader
err = cl.bcast.Broadcast(ctx, eh)
// broadcast new ExtendedHeader, but if core is still syncing, notify only local subscribers
err = cl.bcast.Broadcast(ctx, eh, pubsub.WithLocalPublication(syncing))
if err != nil {
var pserr pubsub.ValidationError
// TODO(@Wondertan): Log ValidationIgnore cases as well, once headr duplication issue is fixed
Expand Down
2 changes: 1 addition & 1 deletion service/header/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Subscription interface {

// Broadcaster broadcasts an ExtendedHeader to the network.
type Broadcaster interface {
Broadcast(ctx context.Context, header *ExtendedHeader) error
Broadcast(ctx context.Context, header *ExtendedHeader, opts ...pubsub.PubOpt) error
}

// Exchange encompasses the behavior necessary to request ExtendedHeaders
Expand Down
4 changes: 2 additions & 2 deletions service/header/p2p_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ func (p *P2PSubscriber) Subscribe() (Subscription, error) {
}

// Broadcast broadcasts the given ExtendedHeader to the topic.
func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader) error {
func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader, opts ...pubsub.PubOpt) error {
bin, err := header.MarshalBinary()
if err != nil {
return err
}
return p.topic.Publish(ctx, bin)
return p.topic.Publish(ctx, bin, opts...)
}

// msgID computes an id for a pubsub message
Expand Down

0 comments on commit eb67835

Please sign in to comment.