Skip to content

Commit

Permalink
lnd: start blockbeat service and register subsystems
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Oct 15, 2024
1 parent c594915 commit 54d8b17
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
38 changes: 38 additions & 0 deletions chainio/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,41 @@ func (b *BlockbeatDispatcher) notifyQueues() {
}
}
}

// SetInitialBeat sets the current beat during the startup.
//
// NOTE: Must be called before `Start`.
func (b *BlockbeatDispatcher) SetInitialBeat() error {
// We need to register for block epochs and retry sweeping every block.
// We should get a notification with the current best block immediately
// if we don't provide any epoch. We'll wait for that in the collector.
blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}
defer blockEpochs.Cancel()

// We registered for the block epochs with a nil request. The notifier
// should send us the current best block immediately. So we need to
// wait for it here because we need to know the current best height.
select {
case bestBlock := <-blockEpochs.Epochs:
clog.Infof("Received initial block %v at height %d",
bestBlock.Hash, bestBlock.Height)

// Update the current blockbeat.
b.beat = NewBeat(*bestBlock)

case <-b.quit:
clog.Debug("Sweeper shutting down")
}

// Set the initial height for the consumer.
for _, queue := range b.consumerQueues {
for _, c := range queue {
c.SetCurrentBeat(b.beat)
}
}

return nil
}
2 changes: 2 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
Expand Down Expand Up @@ -189,6 +190,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor)
AddSubLogger(
root, blindedpath.Subsystem, interceptor, blindedpath.UseLogger,
)
AddSubLogger(root, chainio.Subsystem, interceptor, chainio.UseLogger)
}

// AddSubLogger is a helper method to conveniently create and register the
Expand Down
55 changes: 53 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/aliasmgr"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/chanbackup"
Expand Down Expand Up @@ -341,6 +342,10 @@ type server struct {
// txPublisher is a publisher with fee-bumping capability.
txPublisher *sweep.TxPublisher

// blockbeatDispatcher is a block dispatcher that notifies subscribers
// of new blocks.
blockbeatDispatcher *chainio.BlockbeatDispatcher

quit chan struct{}

wg sync.WaitGroup
Expand Down Expand Up @@ -604,6 +609,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
readPool: readPool,
chansToRestore: chansToRestore,

blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
cc.ChainNotifier,
),
channelNotifier: channelnotifier.New(
dbs.ChanStateDB.ChannelStateDB(),
),
Expand Down Expand Up @@ -633,10 +641,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
peersByPub: make(map[string]*peer.Brontide),
inboundPeers: make(map[string]*peer.Brontide),
outboundPeers: make(map[string]*peer.Brontide),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}),

invoiceHtlcModifier: invoiceHtlcModifier,
invoiceHtlcModifier: invoiceHtlcModifier,
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),

customMessageServer: subscribe.NewServer(),

Expand Down Expand Up @@ -1779,6 +1787,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
s.connMgr = cmgr

// Finally, register the subsystems in blockbeat.
s.registerBlockConsumers()

return s, nil
}

Expand Down Expand Up @@ -1811,6 +1822,25 @@ func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
routerCfg.MaxMcHistory = cfg.MaxMcHistory
}

// registerBlockConsumers registers the subsystems that consume block events.
// By calling `RegisterQueue`, a list of subsystems are registered in the
// blockbeat for block notifications. When a new block arrives, the subsystems
// in the same queue are notified sequentially, and different queues are
// notified concurrently.
//
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
// a new `RegisterQueue` call.
func (s *server) registerBlockConsumers() {
// In this queue, when a new block arrives, it will be received and
// processed in this order: chainArb -> sweeper -> txPublisher.
consumers := []chainio.Consumer{
s.chainArb,
s.sweeper,
s.txPublisher,
}
s.blockbeatDispatcher.RegisterQueue(consumers)
}

// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
// used for option_scid_alias channels where the ChannelUpdate to be sent back
// may differ from what is on disk.
Expand Down Expand Up @@ -2098,6 +2128,13 @@ func (s *server) Start() error {
return
}

// Once ChainNotifier is started, we can set the initial
// blockbeat for the consumers.
if err := s.blockbeatDispatcher.SetInitialBeat(); err != nil {
startErr = err
return
}

cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
if err := s.cc.BestBlockTracker.Start(); err != nil {
startErr = err
Expand Down Expand Up @@ -2425,6 +2462,17 @@ func (s *server) Start() error {
srvrLog.Infof("Auto peer bootstrapping is disabled")
}

// Start the blockbeat after all other subsystems have been
// started so they are ready to receive new blocks.
cleanup = cleanup.add(func() error {
s.blockbeatDispatcher.Stop()
return nil
})
if err := s.blockbeatDispatcher.Start(); err != nil {
startErr = err
return
}

// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&s.active, 1)
Expand All @@ -2449,6 +2497,9 @@ func (s *server) Stop() error {
// Shutdown connMgr first to prevent conns during shutdown.
s.connMgr.Stop()

// Stop dispatching blocks to other systems immediately.
s.blockbeatDispatcher.Stop()

// Shutdown the wallet, funding manager, and the rpc server.
if err := s.chanStatusMgr.Stop(); err != nil {
srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
Expand Down

0 comments on commit 54d8b17

Please sign in to comment.