From d31253276bd9c4bfab0d0a736831488b7d750762 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 9 Mar 2022 14:19:38 +0000 Subject: [PATCH 1/5] bug: fix bug where old block headers are spammed to headersub --- core/fetcher.go | 31 ++++++++++++++++++++ service/header/core_listener.go | 51 ++++++++++++++++++++++++++------- 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index ff0e88fa97..48d4bb7860 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -177,3 +177,34 @@ func (f *BlockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error { return f.client.Unsubscribe(ctx, newBlockSubscriber, newBlockEventQuery) } + +// IsSyncing returns the sync status of the Core connection: true for +// syncing, and false for already caught up. It can also return an error +// in the case of a failed status request. +func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { + resp, err := f.client.Status(ctx) + if err != nil { + return false, err + } + return resp.SyncInfo.CatchingUp, nil +} + +// WaitFinishSync will block until the Core connection has finished syncing, +// returning a nil error. +func (f *BlockFetcher) WaitFinishSync(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + syncing, err := f.IsSyncing(ctx) + if err != nil { + return err + } + if !syncing { + return nil + } + // TODO @renaynay: should I wait a second or two to request again? Doesn't make sense to keep requesting. + } + } +} diff --git a/service/header/core_listener.go b/service/header/core_listener.go index a66a950ea0..6dcaa4d171 100644 --- a/service/header/core_listener.go +++ b/service/header/core_listener.go @@ -36,34 +36,63 @@ func NewCoreListener(bcast Broadcaster, fetcher *core.BlockFetcher, dag format.D } // Start kicks off the CoreListener listener loop. 
-func (cl *CoreListener) Start(ctx context.Context) error { +func (cl *CoreListener) Start(context.Context) error { if cl.cancel != nil { return fmt.Errorf("core-listener: already started") } - sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) - if err != nil { - return err - } - ctx, cancel := context.WithCancel(context.Background()) - go cl.listen(ctx, sub) + + go cl.listen(ctx) cl.cancel = cancel return nil } // Stop stops the CoreListener listener loop. -func (cl *CoreListener) Stop(ctx context.Context) error { +func (cl *CoreListener) Stop(context.Context) error { cl.cancel() cl.cancel = nil - return cl.fetcher.UnsubscribeNewBlockEvent(ctx) + return nil } // listen kicks off a loop, listening for new block events from Core, // generating ExtendedHeaders and broadcasting them to the header-sub // gossipsub network. -func (cl *CoreListener) listen(ctx context.Context, sub <-chan *types.Block) { - defer log.Info("core-listener: listening stopped") +func (cl *CoreListener) listen(ctx context.Context) { + var sub <-chan *types.Block + + defer func() { + // only attempt to unsubscribe from new block events if currently + // subscribed + if sub != nil { + err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) + if err != nil { + log.Errorw("core-listener: failed to unsubscribe from new block events", "err", err) + } + } + log.Info("core-listener: listening stopped") + }() + + // listener loop will only begin once the Core connection has finished + // syncing in order to prevent spamming of old block headers to `headersub` + select { + case <-ctx.Done(): + log.Errorw("core-listener: listener loop failed to start", "err", ctx.Err()) + return + default: + if err := cl.fetcher.WaitFinishSync(ctx); err != nil { + // TODO @renaynay: should this be a fatal error? if the core listener cannot start, then the bridge node is + // useless as it won't be broadcasting new headers to the network. 
+ log.Errorw("core-listener: listener loop failed to start", "err", err) + } + } + // all caught up, start broadcasting new headers + sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) + if err != nil { + log.Errorw("core-listener: failed to subscribe to new block events", "err", err) + return + } + for { select { case b, ok := <-sub: From 9c2e17eb3930d26d44b775bb8d1ff4e77e32f719 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 16 Mar 2022 12:10:40 +0000 Subject: [PATCH 2/5] docs|feature: addressed John's comments, added docs and introduced time.Sleep for a second before requesting sync state again --- core/fetcher.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index 48d4bb7860..b7701944cc 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "time" tmbytes "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/types" @@ -190,7 +191,9 @@ func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { } // WaitFinishSync will block until the Core connection has finished syncing, -// returning a nil error. +// returning a nil error. A non-nil error will be returned if the context is +// canceled or if there is an error fetching syncing status from the Core +// connection. func (f *BlockFetcher) WaitFinishSync(ctx context.Context) error { for { select { @@ -204,7 +207,8 @@ func (f *BlockFetcher) WaitFinishSync(ctx context.Context) error { if !syncing { return nil } - // TODO @renaynay: should I wait a second or two to request again? Doesn't make sense to keep requesting. 
+ // wait a second before requesting sync status again + time.Sleep(time.Second) } } } From d56b23295452c69dd15d5b13cf563814f7543735 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Mon, 21 Mar 2022 19:48:16 +0100 Subject: [PATCH 3/5] refactor: unsubscribe in stop() --- service/header/core_listener.go | 37 +++++++++++---------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/service/header/core_listener.go b/service/header/core_listener.go index 6dcaa4d171..dc3009862c 100644 --- a/service/header/core_listener.go +++ b/service/header/core_listener.go @@ -7,7 +7,6 @@ import ( format "github.com/ipfs/go-ipld-format" pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/tendermint/tendermint/types" "github.com/celestiaorg/celestia-node/core" ) @@ -49,9 +48,14 @@ func (cl *CoreListener) Start(context.Context) error { } // Stop stops the CoreListener listener loop. -func (cl *CoreListener) Stop(context.Context) error { +func (cl *CoreListener) Stop(ctx context.Context) error { cl.cancel() cl.cancel = nil + + err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) + if err != nil { + log.Errorw("core-listener: failed to unsubscribe from new block events", "err", err) + } return nil } @@ -59,33 +63,16 @@ func (cl *CoreListener) Stop(context.Context) error { // generating ExtendedHeaders and broadcasting them to the header-sub // gossipsub network. 
func (cl *CoreListener) listen(ctx context.Context) { - var sub <-chan *types.Block - - defer func() { - // only attempt to unsubscribe from new block events if currently - // subscribed - if sub != nil { - err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) - if err != nil { - log.Errorw("core-listener: failed to unsubscribe from new block events", "err", err) - } - } - log.Info("core-listener: listening stopped") - }() + defer log.Info("core-listener: listening stopped") // listener loop will only begin once the Core connection has finished // syncing in order to prevent spamming of old block headers to `headersub` - select { - case <-ctx.Done(): - log.Errorw("core-listener: listener loop failed to start", "err", ctx.Err()) - return - default: - if err := cl.fetcher.WaitFinishSync(ctx); err != nil { - // TODO @renaynay: should this be a fatal error? if the core listener cannot start, then the bridge node is - // useless as it won't be broadcasting new headers to the network. - log.Errorw("core-listener: listener loop failed to start", "err", err) - } + if err := cl.fetcher.WaitFinishSync(ctx); err != nil { + // TODO @renaynay: should this be a fatal error? if the core listener cannot start, then the bridge node is + // useless as it won't be broadcasting new headers to the network. 
+ log.Errorw("core-listener: listener loop failed to start", "err", err) } + // all caught up, start broadcasting new headers sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) if err != nil { From 68a43f1eeb7c96092f550d5581817b8f6e0d9e0c Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Tue, 22 Mar 2022 20:32:52 +0100 Subject: [PATCH 4/5] refactor: only broadcast when not syncing, but still listen for new blocks even if syncing --- core/fetcher.go | 24 ----------------------- service/header/core_listener.go | 34 ++++++++++++++++----------------- 2 files changed, 17 insertions(+), 41 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index b7701944cc..a304a8c2c4 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -3,7 +3,6 @@ package core import ( "context" "fmt" - "time" tmbytes "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/types" @@ -189,26 +188,3 @@ func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { } return resp.SyncInfo.CatchingUp, nil } - -// WaitFinishSync will block until the Core connection has finished syncing, -// returning a nil error. A non-nil error will be returned if the context is -// canceled or if there is an error fetching syncing status from the Core -// connection. 
-func (f *BlockFetcher) WaitFinishSync(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - syncing, err := f.IsSyncing(ctx) - if err != nil { - return err - } - if !syncing { - return nil - } - // wait a second before requesting sync status again - time.Sleep(time.Second) - } - } -} diff --git a/service/header/core_listener.go b/service/header/core_listener.go index dc3009862c..e5ddb66dbc 100644 --- a/service/header/core_listener.go +++ b/service/header/core_listener.go @@ -65,15 +65,6 @@ func (cl *CoreListener) Stop(ctx context.Context) error { func (cl *CoreListener) listen(ctx context.Context) { defer log.Info("core-listener: listening stopped") - // listener loop will only begin once the Core connection has finished - // syncing in order to prevent spamming of old block headers to `headersub` - if err := cl.fetcher.WaitFinishSync(ctx); err != nil { - // TODO @renaynay: should this be a fatal error? if the core listener cannot start, then the bridge node is - // useless as it won't be broadcasting new headers to the network. 
- log.Errorw("core-listener: listener loop failed to start", "err", err) - } - - // all caught up, start broadcasting new headers sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) if err != nil { log.Errorw("core-listener: failed to subscribe to new block events", "err", err) @@ -99,15 +90,24 @@ func (cl *CoreListener) listen(ctx context.Context) { return } - // broadcast new ExtendedHeader - err = cl.bcast.Broadcast(ctx, eh) + // only broadcast new ExtendedHeaders if core has finished syncing + // in order to prevent spamming of old block headers to `headersub` + syncing, err := cl.fetcher.IsSyncing(ctx) if err != nil { - var pserr pubsub.ValidationError - // TODO(@Wondertan): Log ValidationIgnore cases as well, once headr duplication issue is fixed - if errors.As(err, &pserr) && pserr.Reason != pubsub.RejectValidationIgnored { - log.Errorw("core-listener: broadcasting next header", "height", eh.Height, - "err", err) - return + log.Errorw("core-listener: fetching sync status from core", "err", err) + return + } + if !syncing { + // broadcast new ExtendedHeader + err = cl.bcast.Broadcast(ctx, eh) + if err != nil { + var pserr pubsub.ValidationError + // TODO(@Wondertan): Log ValidationIgnore cases as well, once headr duplication issue is fixed + if errors.As(err, &pserr) && pserr.Reason != pubsub.RejectValidationIgnored { + log.Errorw("core-listener: broadcasting next header", "height", eh.Height, + "err", err) + return + } } } case <-ctx.Done(): From dfbfdf68e8d84bc68a7502704a21bb28c654d64d Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Thu, 24 Mar 2022 10:15:31 +0100 Subject: [PATCH 5/5] refactor: move subscription to start --- service/header/core_listener.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/service/header/core_listener.go b/service/header/core_listener.go index e5ddb66dbc..63da6a08df 100644 --- a/service/header/core_listener.go +++ b/service/header/core_listener.go 
@@ -7,6 +7,7 @@ import ( format "github.com/ipfs/go-ipld-format" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/tendermint/tendermint/types" "github.com/celestiaorg/celestia-node/core" ) @@ -35,14 +36,19 @@ func NewCoreListener(bcast Broadcaster, fetcher *core.BlockFetcher, dag format.D } // Start kicks off the CoreListener listener loop. -func (cl *CoreListener) Start(context.Context) error { +func (cl *CoreListener) Start(ctx context.Context) error { if cl.cancel != nil { return fmt.Errorf("core-listener: already started") } - ctx, cancel := context.WithCancel(context.Background()) + sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) + if err != nil { + return err + } + + clCtx, cancel := context.WithCancel(context.Background()) - go cl.listen(ctx) + go cl.listen(clCtx, sub) cl.cancel = cancel return nil } @@ -62,15 +68,9 @@ func (cl *CoreListener) Stop(ctx context.Context) error { // listen kicks off a loop, listening for new block events from Core, // generating ExtendedHeaders and broadcasting them to the header-sub // gossipsub network. -func (cl *CoreListener) listen(ctx context.Context) { +func (cl *CoreListener) listen(ctx context.Context, sub <-chan *types.Block) { defer log.Info("core-listener: listening stopped") - sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) - if err != nil { - log.Errorw("core-listener: failed to subscribe to new block events", "err", err) - return - } - for { select { case b, ok := <-sub: