From 54d8b17e692b5e64f7d5943539a2329dec638c6f Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 26 Apr 2024 18:28:25 +0800 Subject: [PATCH] lnd: start blockbeat service and register subsystems --- chainio/dispatcher.go | 38 ++++++++++++++++++++++++++++++ log.go | 2 ++ server.go | 55 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 93 insertions(+), 2 deletions(-) diff --git a/chainio/dispatcher.go b/chainio/dispatcher.go index 1a4a367756..34173efe8c 100644 --- a/chainio/dispatcher.go +++ b/chainio/dispatcher.go @@ -183,3 +183,41 @@ func (b *BlockbeatDispatcher) notifyQueues() { } } } + +// SetInitialBeat sets the current beat during the startup. +// +// NOTE: Must be called before `Start`. +func (b *BlockbeatDispatcher) SetInitialBeat() error { + // We need to register for block epochs and retry sweeping every block. + // We should get a notification with the current best block immediately + // if we don't provide any epoch. We'll wait for that in the collector. + blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + return fmt.Errorf("register block epoch ntfn: %w", err) + } + defer blockEpochs.Cancel() + + // We registered for the block epochs with a nil request. The notifier + // should send us the current best block immediately. So we need to + // wait for it here because we need to know the current best height. + select { + case bestBlock := <-blockEpochs.Epochs: + clog.Infof("Received initial block %v at height %d", + bestBlock.Hash, bestBlock.Height) + + // Update the current blockbeat. + b.beat = NewBeat(*bestBlock) + + case <-b.quit: + clog.Debug("Sweeper shutting down") + } + + // Set the initial height for the consumer. + for _, queue := range b.consumerQueues { + for _, c := range queue { + c.SetCurrentBeat(b.beat) + } + } + + return nil +} diff --git a/log.go b/log.go index a24c39fd40..3068666d61 100644 --- a/log.go +++ b/log.go @@ -8,6 +8,7 @@ import ( sphinx "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/chanacceptor" @@ -189,6 +190,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor) AddSubLogger( root, blindedpath.Subsystem, interceptor, blindedpath.UseLogger, ) + AddSubLogger(root, chainio.Subsystem, interceptor, chainio.UseLogger) } // AddSubLogger is a helper method to conveniently create and register the diff --git a/server.go b/server.go index 0fc95dc756..34aad31613 100644 --- a/server.go +++ b/server.go @@ -28,6 +28,7 @@ import ( "github.com/lightningnetwork/lnd/aliasmgr" "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/brontide" + "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/chanbackup" @@ -341,6 +342,10 @@ type server struct { // txPublisher is a publisher with fee-bumping capability. txPublisher *sweep.TxPublisher + // blockbeatDispatcher is a block dispatcher that notifies subscribers + // of new blocks. + blockbeatDispatcher *chainio.BlockbeatDispatcher + quit chan struct{} wg sync.WaitGroup @@ -604,6 +609,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, readPool: readPool, chansToRestore: chansToRestore, + blockbeatDispatcher: chainio.NewBlockbeatDispatcher( + cc.ChainNotifier, + ), channelNotifier: channelnotifier.New( dbs.ChanStateDB.ChannelStateDB(), ), @@ -633,10 +641,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr, peersByPub: make(map[string]*peer.Brontide), inboundPeers: make(map[string]*peer.Brontide), outboundPeers: make(map[string]*peer.Brontide), - peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), peerDisconnectedListeners: make(map[string][]chan<- struct{}), - invoiceHtlcModifier: invoiceHtlcModifier, + invoiceHtlcModifier: invoiceHtlcModifier, + peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), customMessageServer: subscribe.NewServer(), @@ -1779,6 +1787,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } s.connMgr = cmgr + // Finally, register the subsystems in blockbeat. + s.registerBlockConsumers() + return s, nil } @@ -1811,6 +1822,25 @@ func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) { routerCfg.MaxMcHistory = cfg.MaxMcHistory } +// registerBlockConsumers registers the subsystems that consume block events. +// By calling `RegisterQueue`, a list of subsystems are registered in the +// blockbeat for block notifications. When a new block arrives, the subsystems +// in the same queue are notified sequentially, and different queues are +// notified concurrently. +// +// NOTE: To put a subsystem in a different queue, create a slice and pass it to +// a new `RegisterQueue` call. +func (s *server) registerBlockConsumers() { + // In this queue, when a new block arrives, it will be received and + // processed in this order: chainArb -> sweeper -> txPublisher. + consumers := []chainio.Consumer{ + s.chainArb, + s.sweeper, + s.txPublisher, + } + s.blockbeatDispatcher.RegisterQueue(consumers) +} + // signAliasUpdate takes a ChannelUpdate and returns the signature. This is // used for option_scid_alias channels where the ChannelUpdate to be sent back // may differ from what is on disk. @@ -2098,6 +2128,13 @@ func (s *server) Start() error { return } + // Once ChainNotifier is started, we can set the initial + // blockbeat for the consumers. + if err := s.blockbeatDispatcher.SetInitialBeat(); err != nil { + startErr = err + return + } + cleanup = cleanup.add(s.cc.BestBlockTracker.Stop) if err := s.cc.BestBlockTracker.Start(); err != nil { startErr = err @@ -2425,6 +2462,17 @@ func (s *server) Start() error { srvrLog.Infof("Auto peer bootstrapping is disabled") } + // Start the blockbeat after all other subsystems have been + // started so they are ready to receive new blocks. + cleanup = cleanup.add(func() error { + s.blockbeatDispatcher.Stop() + return nil + }) + if err := s.blockbeatDispatcher.Start(); err != nil { + startErr = err + return + } + // Set the active flag now that we've completed the full // startup. atomic.StoreInt32(&s.active, 1) @@ -2449,6 +2497,9 @@ func (s *server) Stop() error { // Shutdown connMgr first to prevent conns during shutdown. s.connMgr.Stop() + // Stop dispatching blocks to other systems immediately. + s.blockbeatDispatcher.Stop() + // Shutdown the wallet, funding manager, and the rpc server. if err := s.chanStatusMgr.Stop(); err != nil { srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)