diff --git a/polygon/sync/block_downloader.go b/polygon/sync/block_downloader.go
index a84ca196dc3..7965e118081 100644
--- a/polygon/sync/block_downloader.go
+++ b/polygon/sync/block_downloader.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"math"
 	"reflect"
 	"sync"
 	"time"
@@ -22,7 +21,6 @@ import (
 )
 
 const (
-	blockDownloaderLogPrefix      = "BlockDownloader"
 	notEnoughPeersBackOffDuration = time.Minute
 
 	// conservative over-estimation: 1 MB block size x 1024 blocks per waypoint
@@ -110,20 +108,37 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
 		return nil, nil
 	}
 
+	d.logger.Debug(
+		syncLogPrefix("downloading blocks using waypoints"),
+		"waypointsLen", len(waypoints),
+		"start", waypoints[0].StartBlock().Uint64(),
+		"end", waypoints[len(waypoints)-1].EndBlock().Uint64(),
+		"kind", reflect.TypeOf(waypoints[0]),
+	)
+
 	// waypoint rootHash->[blocks part of waypoint]
 	waypointBlocksMemo, err := lru.New[common.Hash, []*types.Block](d.p2pService.MaxPeers())
 	if err != nil {
 		return nil, err
 	}
 
+	progressLogTicker := time.NewTicker(30 * time.Second)
+	defer progressLogTicker.Stop()
+
 	var lastBlock *types.Block
-	lastBlockNum := waypoints[len(waypoints)-1].EndBlock().Uint64()
 	for len(waypoints) > 0 {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			// carry-on
+		}
+
 		endBlockNum := waypoints[len(waypoints)-1].EndBlock().Uint64()
 		peers := d.p2pService.ListPeersMayHaveBlockNum(endBlockNum)
 		if len(peers) == 0 {
 			d.logger.Warn(
-				fmt.Sprintf("[%s] can't use any peers to sync, will try again", blockDownloaderLogPrefix),
+				syncLogPrefix("can't use any peers to download blocks, will try again in a bit"),
 				"start", waypoints[0].StartBlock(),
 				"end", endBlockNum,
 				"sleepSeconds", d.notEnoughPeersBackOffDuration.Seconds(),
@@ -136,21 +151,26 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
 		numWorkers := cmp.Min(cmp.Min(d.maxWorkers, len(peers)), len(waypoints))
 		waypointsBatch := waypoints[:numWorkers]
 
-		d.logger.Info(
-			fmt.Sprintf("[%s] downloading blocks", blockDownloaderLogPrefix),
-			"waypointsBatchLength", len(waypointsBatch),
-			"startBlockNum", waypointsBatch[0].StartBlock(),
-			"endBlockNum", waypointsBatch[len(waypointsBatch)-1].EndBlock(),
-			"kind", reflect.TypeOf(waypointsBatch[0]),
-			"peerCount", len(peers),
-			"maxWorkers", d.maxWorkers,
-		)
+		select {
+		case <-progressLogTicker.C:
+			d.logger.Info(
+				syncLogPrefix("downloading blocks progress"),
+				"waypointsBatchLength", len(waypointsBatch),
+				"startBlockNum", waypointsBatch[0].StartBlock(),
+				"endBlockNum", waypointsBatch[len(waypointsBatch)-1].EndBlock(),
+				"kind", reflect.TypeOf(waypointsBatch[0]),
+				"peerCount", len(peers),
+				"maxWorkers", d.maxWorkers,
+			)
+		default:
+			// carry on
+		}
 
 		blockBatches := make([][]*types.Block, len(waypointsBatch))
-		maxWaypointLength := float64(0)
+		maxWaypointLength := uint64(0)
 		wg := sync.WaitGroup{}
 		for i, waypoint := range waypointsBatch {
-			maxWaypointLength = math.Max(float64(waypoint.Length()), maxWaypointLength)
+			maxWaypointLength = cmp.Max(waypoint.Length(), maxWaypointLength)
 			wg.Add(1)
 			go func(i int, waypoint heimdall.Waypoint, peerId *p2p.PeerId) {
 				defer wg.Done()
@@ -163,7 +183,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
 				blocks, err := d.fetchVerifiedBlocks(ctx, waypoint, peerId)
 				if err != nil {
 					d.logger.Debug(
-						fmt.Sprintf("[%s] issue downloading waypoint blocks - will try again", blockDownloaderLogPrefix),
+						syncLogPrefix("issue downloading waypoint blocks - will try again"),
"err", err, "start", waypoint.StartBlock(), "end", waypoint.EndBlock(), @@ -186,7 +206,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp for i, blockBatch := range blockBatches { if len(blockBatch) == 0 { d.logger.Debug( - fmt.Sprintf("[%s] no blocks - will try again", blockDownloaderLogPrefix), + syncLogPrefix("no blocks - will try again"), "start", waypointsBatch[i].StartBlock(), "end", waypointsBatch[i].EndBlock(), "rootHash", waypointsBatch[i].RootHash(), @@ -197,6 +217,11 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp break } + if blockBatch[0].Number().Uint64() == 0 { + // we do not want to insert block 0 (genesis) + blockBatch = blockBatch[1:] + } + blocks = append(blocks, blockBatch...) } @@ -206,26 +231,19 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp waypoints = waypoints[len(waypointsBatch):] } - if err := d.storage.InsertBlocks(ctx, blocks); err != nil { - return nil, err + if len(blocks) == 0 { + continue } - flushStartTime := time.Now() - if err := d.storage.Flush(ctx); err != nil { + if err := d.storage.InsertBlocks(ctx, blocks); err != nil { return nil, err } - d.logger.Debug( - fmt.Sprintf("[%s] stored blocks", blockDownloaderLogPrefix), - "len", len(blocks), - "duration", time.Since(flushStartTime), - ) - - if (endBlockNum == lastBlockNum) && (len(blocks) > 0) { - lastBlock = blocks[len(blocks)-1] - } + lastBlock = blocks[len(blocks)-1] } + d.logger.Debug(syncLogPrefix("finished downloading blocks using waypoints")) + return lastBlock.Header(), nil } @@ -244,7 +262,7 @@ func (d *blockDownloader) fetchVerifiedBlocks( // 2. Verify headers match waypoint root hash if err = d.headersVerifier(waypoint, headers); err != nil { - d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err) + d.logger.Debug(syncLogPrefix("penalizing peer - invalid headers"), "peerId", peerId, "err", err) if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil { err = fmt.Errorf("%w: %w", penalizeErr, err) @@ -257,7 +275,7 @@ func (d *blockDownloader) fetchVerifiedBlocks( bodies, err := d.p2pService.FetchBodies(ctx, headers, peerId) if err != nil { if errors.Is(err, &p2p.ErrMissingBodies{}) { - d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err) + d.logger.Debug(syncLogPrefix("penalizing peer - missing bodies"), "peerId", peerId, "err", err) if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil { err = fmt.Errorf("%w: %w", penalizeErr, err) @@ -275,7 +293,7 @@ func (d *blockDownloader) fetchVerifiedBlocks( // 5. Verify blocks if err = d.blocksVerifier(blocks); err != nil { - d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err) + d.logger.Debug(syncLogPrefix("penalizing peer - invalid blocks"), "peerId", peerId, "err", err) if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil { err = fmt.Errorf("%w: %w", penalizeErr, err) diff --git a/polygon/sync/block_downloader_test.go b/polygon/sync/block_downloader_test.go index 387c4e1331a..e8bfcf7c1c8 100644 --- a/polygon/sync/block_downloader_test.go +++ b/polygon/sync/block_downloader_test.go @@ -211,10 +211,6 @@ func TestBlockDownloaderDownloadBlocksUsingMilestones(t *testing.T) { InsertBlocks(gomock.Any(), gomock.Any()). DoAndReturn(test.defaultInsertBlocksMock(&blocks)). Times(1) - test.storage.EXPECT(). - Flush(gomock.Any()). 
-		Return(nil).
-		Times(1)
 
 	tip, err := test.blockDownloader.DownloadBlocksUsingMilestones(context.Background(), 1)
 	require.NoError(t, err)
@@ -250,10 +246,6 @@ func TestBlockDownloaderDownloadBlocksUsingCheckpoints(t *testing.T) {
 		InsertBlocks(gomock.Any(), gomock.Any()).
 		DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
 		Times(4)
-	test.storage.EXPECT().
-		Flush(gomock.Any()).
-		Return(nil).
-		Times(4)
 
 	tip, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
 	require.NoError(t, err)
@@ -328,10 +320,6 @@ func TestBlockDownloaderDownloadBlocksWhenInvalidHeadersThenPenalizePeerAndReDow
 			DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
 			Times(3),
 	)
-	test.storage.EXPECT().
-		Flush(gomock.Any()).
-		Return(nil).
-		Times(4)
 
 	_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
 	require.NoError(t, err)
@@ -358,10 +346,6 @@ func TestBlockDownloaderDownloadBlocksWhenZeroPeersTriesAgain(t *testing.T) {
 		InsertBlocks(gomock.Any(), gomock.Any()).
 		DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
 		Times(4)
-	test.storage.EXPECT().
-		Flush(gomock.Any()).
-		Return(nil).
-		Times(4)
 	gomock.InOrder(
 		// first time, no peers at all
 		test.p2pService.EXPECT().
@@ -439,10 +423,6 @@ func TestBlockDownloaderDownloadBlocksWhenInvalidBodiesThenPenalizePeerAndReDown
 			DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
 			Times(3),
 	)
-	test.storage.EXPECT().
-		Flush(gomock.Any()).
-		Return(nil).
-		Times(4)
 
 	_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
 	require.NoError(t, err)
@@ -504,10 +484,6 @@ func TestBlockDownloaderDownloadBlocksWhenMissingBodiesThenPenalizePeerAndReDown
 			DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
 			Times(3),
 	)
-	test.storage.EXPECT().
-		Flush(gomock.Any()).
-		Return(nil).
-		Times(4)
 
 	_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
 	require.NoError(t, err)
@@ -546,10 +522,6 @@ func TestBlockDownloaderDownloadBlocksRespectsMaxWorkers(t *testing.T) {
 			DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
 			Times(1),
 	)
-	test.storage.EXPECT().
-		Flush(gomock.Any()).
-		Return(nil).
-		Times(2)
 
 	// max 1 worker
 	// 100 peers
diff --git a/polygon/sync/execution_client.go b/polygon/sync/execution_client.go
index c0e1348b7d4..5c722fdda9e 100644
--- a/polygon/sync/execution_client.go
+++ b/polygon/sync/execution_client.go
@@ -3,7 +3,7 @@ package sync
 import (
 	"context"
 
-	"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
+	executionclient "github.com/ledgerwatch/erigon/cl/phase1/execution_client"
 	"github.com/ledgerwatch/erigon/core/types"
 )
 
@@ -14,10 +14,10 @@ type ExecutionClient interface {
 }
 
 type executionClient struct {
-	engine execution_client.ExecutionEngine
+	engine executionclient.ExecutionEngine
 }
 
-func NewExecutionClient(engine execution_client.ExecutionEngine) ExecutionClient {
+func NewExecutionClient(engine executionclient.ExecutionEngine) ExecutionClient {
 	return &executionClient{engine}
 }
 
@@ -25,9 +25,10 @@ func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Bloc
 	return e.engine.InsertBlocks(ctx, blocks, true)
 }
 
-func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Header, finalizedHeader *types.Header) error {
-	_, err := e.engine.ForkChoiceUpdate(ctx, finalizedHeader.Hash(), tip.Hash(), nil)
-	return err
+func (e *executionClient) UpdateForkChoice(_ context.Context, _ *types.Header, _ *types.Header) error {
+	// TODO - not ready for execution - missing state sync event and span data - uncomment once ready
+	//return e.engine.ForkChoiceUpdate(ctx, finalizedHeader.Hash(), tip.Hash())
+	return nil
 }
 
 func (e *executionClient) CurrentHeader(ctx context.Context) (*types.Header, error) {
diff --git a/polygon/sync/log_prefix.go b/polygon/sync/log_prefix.go
new file mode 100644
index 00000000000..1b67fd723b4
--- /dev/null
+++ b/polygon/sync/log_prefix.go
@@ -0,0 +1,7 @@
+package sync
+
+import "fmt"
+
+func syncLogPrefix(message string) string {
+	return fmt.Sprintf("[sync] %s", message)
+}
diff --git a/polygon/sync/service.go b/polygon/sync/service.go
index cbb4810ad2b..2147bda6cf4 100644
--- a/polygon/sync/service.go
+++ b/polygon/sync/service.go
@@ -41,7 +41,7 @@ func NewService(
 ) Service {
 	borConfig := chainConfig.Bor.(*borcfg.BorConfig)
 	execution := NewExecutionClient(executionEngine)
-	storage := NewStorage(execution, maxPeers)
+	storage := NewStorage(logger, execution, maxPeers)
 	headersVerifier := VerifyAccumulatedHeaders
 	blocksVerifier := VerifyBlocks
 	p2pService := p2p.NewService(maxPeers, logger, sentryClient, statusDataProvider.GetStatusData)
@@ -76,7 +76,7 @@ func NewService(
 			headerValidator,
 			spansCache)
 	}
-	events := NewTipEvents(p2pService, heimdallService)
+	events := NewTipEvents(logger, p2pService, heimdallService)
 	sync := NewSync(
 		storage,
 		execution,
diff --git a/polygon/sync/storage.go b/polygon/sync/storage.go
index 5256aab3e4b..95a306c5cd6 100644
--- a/polygon/sync/storage.go
+++ b/polygon/sync/storage.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"sync"
 
+	"github.com/ledgerwatch/log/v3"
+
 	"github.com/ledgerwatch/erigon/core/types"
 )
 
@@ -18,13 +20,15 @@ type Storage interface {
 }
 
 type executionClientStorage struct {
+	logger    log.Logger
 	execution ExecutionClient
 	queue     chan []*types.Block
 	waitGroup sync.WaitGroup
 }
 
-func NewStorage(execution ExecutionClient, queueCapacity int) Storage {
+func NewStorage(logger log.Logger, execution ExecutionClient, queueCapacity int) Storage {
 	return &executionClientStorage{
+		logger:    logger,
 		execution: execution,
 		queue:     make(chan []*types.Block, queueCapacity),
 	}
@@ -54,6 +58,8 @@ func (s *executionClientStorage) Flush(ctx context.Context) error {
 }
 
 func (s *executionClientStorage) Run(ctx context.Context) error {
+	s.logger.Debug(syncLogPrefix("running execution client storage component"))
+
 	for {
 		select {
 		case blocks := <-s.queue:
diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go
index 02c5cb215ac..f1de4c0b106 100644
--- a/polygon/sync/sync.go
+++ b/polygon/sync/sync.go
@@ -79,7 +79,7 @@ func (s *Sync) onMilestoneEvent(
 	}
 
 	s.logger.Debug(
-		"sync.Sync.onMilestoneEvent: local chain tip does not match the milestone, unwinding to the previous verified milestone",
+		syncLogPrefix("onMilestoneEvent: local chain tip does not match the milestone, unwinding to the previous verified milestone"),
 		"err", err,
 	)
 
@@ -127,9 +127,14 @@ func (s *Sync) onNewBlockEvent(
 	} else {
 		newBlocks, err = s.p2pService.FetchBlocks(ctx, rootNum, newBlockHeaderNum+1, event.PeerId)
 		if err != nil {
-			if errors.Is(err, &p2p.ErrIncompleteHeaders{}) || errors.Is(err, &p2p.ErrMissingBodies{}) {
-				s.logger.Debug("sync.Sync.onNewBlockEvent: failed to fetch complete blocks, ignoring event",
-					"err", err, "peerId", event.PeerId, "lastBlockNum", newBlockHeaderNum)
+			if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) {
+				s.logger.Debug(
+					syncLogPrefix("onNewBlockEvent: failed to fetch complete blocks, ignoring event"),
+					"err", err,
+					"peerId", event.PeerId,
+					"lastBlockNum", newBlockHeaderNum,
+				)
+
 				return nil
 			}
 
@@ -138,10 +143,10 @@ func (s *Sync) onNewBlockEvent(
 	}
 
 	if err := s.blocksVerifier(newBlocks); err != nil {
-		s.logger.Debug("sync.Sync.onNewBlockEvent: invalid new block event from peer, penalizing and ignoring", "err", err)
+		s.logger.Debug(syncLogPrefix("onNewBlockEvent: invalid new block event from peer, penalizing and ignoring"), "err", err)
 
 		if err = s.p2pService.Penalize(ctx, event.PeerId); err != nil {
-			s.logger.Debug("sync.Sync.onNewBlockEvent: issue with penalizing peer", "err", err)
+			s.logger.Debug(syncLogPrefix("onNewBlockEvent: issue with penalizing peer"), "err", err)
 		}
 
 		return nil
@@ -154,11 +159,11 @@ func (s *Sync) onNewBlockEvent(
 	oldTip := ccBuilder.Tip()
 	if err = ccBuilder.Connect(newHeaders); err != nil {
-		s.logger.Debug("sync.Sync.onNewBlockEvent: couldn't connect a header to the local chain tip, ignoring", "err", err)
+		s.logger.Debug(syncLogPrefix("onNewBlockEvent: couldn't connect a header to the local chain tip, ignoring"), "err", err)
 		return nil
 	}
 
-	newTip := ccBuilder.Tip()
+	newTip := ccBuilder.Tip()
 	if newTip != oldTip {
 		if err = s.execution.InsertBlocks(ctx, newBlocks); err != nil {
 			return err
@@ -184,9 +189,14 @@ func (s *Sync) onNewBlockHashesEvent(
 		newBlocks, err := s.p2pService.FetchBlocks(ctx, headerHashNum.Number, headerHashNum.Number+1, event.PeerId)
 		if err != nil {
-			if errors.Is(err, &p2p.ErrIncompleteHeaders{}) || errors.Is(err, &p2p.ErrMissingBodies{}) {
-				s.logger.Debug("sync.Sync.onNewBlockHashesEvent: failed to fetch complete blocks, ignoring event",
-					"err", err, "peerId", event.PeerId, "lastBlockNum", headerHashNum.Number)
+			if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) {
+				s.logger.Debug(
+					syncLogPrefix("onNewBlockHashesEvent: failed to fetch complete blocks, ignoring event"),
+					"err", err,
+					"peerId", event.PeerId,
+					"lastBlockNum", headerHashNum.Number,
+				)
+
 				continue
 			}
 
@@ -211,25 +221,37 @@ func (s *Sync) onNewBlockHashesEvent(
 //
 func (s *Sync) Run(ctx context.Context) error {
+	s.logger.Debug(syncLogPrefix("running sync component"))
+
 	tip, err := s.execution.CurrentHeader(ctx)
 	if err != nil {
 		return err
 	}
 
-	if newTip, err := s.blockDownloader.DownloadBlocksUsingCheckpoints(ctx, tip.Number.Uint64()); err != nil {
-		return err
-	} else if newTip != nil {
-		tip = newTip
-	}
+	// loop until we converge at the latest checkpoint & milestone
+	var prevTip *types.Header
+	for tip != prevTip {
+		prevTip = tip
 
-	if newTip, err := s.blockDownloader.DownloadBlocksUsingMilestones(ctx, tip.Number.Uint64()); err != nil {
-		return err
-	} else if newTip != nil {
-		tip = newTip
-	}
+		newTip, err := s.blockDownloader.DownloadBlocksUsingCheckpoints(ctx, tip.Number.Uint64()+1)
+		if err != nil {
+			return err
+		}
+		if newTip != nil {
+			tip = newTip
+		}
 
-	if err = s.commitExecution(ctx, tip, tip); err != nil {
-		return err
+		newTip, err = s.blockDownloader.DownloadBlocksUsingMilestones(ctx, tip.Number.Uint64()+1)
+		if err != nil {
+			return err
+		}
+		if newTip != nil {
+			tip = newTip
+		}
+
+		if err = s.commitExecution(ctx, tip, tip); err != nil {
+			return err
+		}
 	}
 
 	latestSpan, err := s.fetchLatestSpan(ctx)
diff --git a/polygon/sync/tip_events.go b/polygon/sync/tip_events.go
index 43791123eb9..8e9f01568ee 100644
--- a/polygon/sync/tip_events.go
+++ b/polygon/sync/tip_events.go
@@ -3,6 +3,8 @@ package sync
 import (
 	"context"
 
+	"github.com/ledgerwatch/log/v3"
+
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/eth/protocols/eth"
 	"github.com/ledgerwatch/erigon/polygon/heimdall"
@@ -66,21 +68,22 @@ func (e Event) AsNewSpan() EventNewSpan {
 }
 
 type TipEvents struct {
-	events *EventChannel[Event]
-
+	logger          log.Logger
+	events          *EventChannel[Event]
 	p2pService      p2p.Service
 	heimdallService heimdall.HeimdallNoStore
 }
 
 func NewTipEvents(
+	logger log.Logger,
 	p2pService p2p.Service,
 	heimdallService heimdall.HeimdallNoStore,
 ) *TipEvents {
 	eventsCapacity := uint(1000) // more than 3 milestones
 
 	return &TipEvents{
-		events: NewEventChannel[Event](eventsCapacity),
-
+		logger:          logger,
+		events:          NewEventChannel[Event](eventsCapacity),
 		p2pService:      p2pService,
 		heimdallService: heimdallService,
 	}
@@ -91,6 +94,8 @@ func (te *TipEvents) Events() <-chan Event {
 }
 
 func (te *TipEvents) Run(ctx context.Context) error {
+	te.logger.Debug(syncLogPrefix("running tip events component"))
+
 	newBlockObserverCancel := te.p2pService.RegisterNewBlockObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockPacket]) {
 		te.events.PushEvent(Event{
 			Type: EventTypeNewBlock,