polygon/sync: fix issues found during testing (#9873)
Fixes problems found during bor-mainnet sync tests:

- sync.Run was missing a for loop, so an initial sync could finish without converging on the tip (see the sketch after this list)
- temporarily disable ForkChoiceUpdate execution, since we have yet to solve and add the missing state sync events and spans
- no need to flush storage inside the block downloader (better async performance)
- logging improvements
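As a rough illustration of the first bullet, here is a minimal sketch of the convergence-loop shape — `syncToTip` and `tipReached` are hypothetical placeholder names, not the actual sync.Run internals:

```go
package sync

import "context"

// Hypothetical sketch of an initial-sync convergence loop. syncToTip and
// tipReached are illustrative placeholders, not the real erigon API.
type tipSyncer interface {
	syncToTip(ctx context.Context) (tipBlockNum uint64, err error)
	tipReached(tipBlockNum uint64) bool
}

func runUntilConverged(ctx context.Context, s tipSyncer) error {
	// A single download pass can stop short of the chain tip when new
	// waypoints arrive mid-download, so keep looping until a pass
	// completes with no newer waypoint observed.
	for {
		tip, err := s.syncToTip(ctx)
		if err != nil {
			return err
		}
		if s.tipReached(tip) {
			return nil // converged; event-driven tip following takes over
		}
	}
}
```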
taratorio authored Apr 8, 2024
1 parent faf7a43 commit c672452
Showing 8 changed files with 128 additions and 97 deletions.
84 changes: 51 additions & 33 deletions polygon/sync/block_downloader.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"sync"
"time"
@@ -22,7 +21,6 @@ import (
)

const (
- blockDownloaderLogPrefix = "BlockDownloader"
notEnoughPeersBackOffDuration = time.Minute

// conservative over-estimation: 1 MB block size x 1024 blocks per waypoint
@@ -110,20 +108,37 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
return nil, nil
}

+ d.logger.Debug(
+ syncLogPrefix("downloading blocks using waypoints"),
+ "waypointsLen", len(waypoints),
+ "start", waypoints[0].StartBlock().Uint64(),
+ "end", waypoints[len(waypoints)-1].EndBlock().Uint64(),
+ "kind", reflect.TypeOf(waypoints[0]),
+ )
+
// waypoint rootHash->[blocks part of waypoint]
waypointBlocksMemo, err := lru.New[common.Hash, []*types.Block](d.p2pService.MaxPeers())
if err != nil {
return nil, err
}

+ progressLogTicker := time.NewTicker(30 * time.Second)
+ defer progressLogTicker.Stop()
+
var lastBlock *types.Block
- lastBlockNum := waypoints[len(waypoints)-1].EndBlock().Uint64()
for len(waypoints) > 0 {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ // carry-on
+ }
+
endBlockNum := waypoints[len(waypoints)-1].EndBlock().Uint64()
peers := d.p2pService.ListPeersMayHaveBlockNum(endBlockNum)
if len(peers) == 0 {
d.logger.Warn(
- fmt.Sprintf("[%s] can't use any peers to sync, will try again", blockDownloaderLogPrefix),
+ syncLogPrefix("can't use any peers to download blocks, will try again in a bit"),
"start", waypoints[0].StartBlock(),
"end", endBlockNum,
"sleepSeconds", d.notEnoughPeersBackOffDuration.Seconds(),
@@ -136,21 +151,26 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
numWorkers := cmp.Min(cmp.Min(d.maxWorkers, len(peers)), len(waypoints))
waypointsBatch := waypoints[:numWorkers]

- d.logger.Info(
- fmt.Sprintf("[%s] downloading blocks", blockDownloaderLogPrefix),
- "waypointsBatchLength", len(waypointsBatch),
- "startBlockNum", waypointsBatch[0].StartBlock(),
- "endBlockNum", waypointsBatch[len(waypointsBatch)-1].EndBlock(),
- "kind", reflect.TypeOf(waypointsBatch[0]),
- "peerCount", len(peers),
- "maxWorkers", d.maxWorkers,
- )
+ select {
+ case <-progressLogTicker.C:
+ d.logger.Info(
+ syncLogPrefix("downloading blocks progress"),
+ "waypointsBatchLength", len(waypointsBatch),
+ "startBlockNum", waypointsBatch[0].StartBlock(),
+ "endBlockNum", waypointsBatch[len(waypointsBatch)-1].EndBlock(),
+ "kind", reflect.TypeOf(waypointsBatch[0]),
+ "peerCount", len(peers),
+ "maxWorkers", d.maxWorkers,
+ )
+ default:
+ // carry on
+ }

blockBatches := make([][]*types.Block, len(waypointsBatch))
- maxWaypointLength := float64(0)
+ maxWaypointLength := uint64(0)
wg := sync.WaitGroup{}
for i, waypoint := range waypointsBatch {
- maxWaypointLength = math.Max(float64(waypoint.Length()), maxWaypointLength)
+ maxWaypointLength = cmp.Max(waypoint.Length(), maxWaypointLength)
wg.Add(1)
go func(i int, waypoint heimdall.Waypoint, peerId *p2p.PeerId) {
defer wg.Done()
@@ -163,7 +183,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
blocks, err := d.fetchVerifiedBlocks(ctx, waypoint, peerId)
if err != nil {
d.logger.Debug(
- fmt.Sprintf("[%s] issue downloading waypoint blocks - will try again", blockDownloaderLogPrefix),
+ syncLogPrefix("issue downloading waypoint blocks - will try again"),
"err", err,
"start", waypoint.StartBlock(),
"end", waypoint.EndBlock(),
@@ -186,7 +206,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
for i, blockBatch := range blockBatches {
if len(blockBatch) == 0 {
d.logger.Debug(
- fmt.Sprintf("[%s] no blocks - will try again", blockDownloaderLogPrefix),
+ syncLogPrefix("no blocks - will try again"),
"start", waypointsBatch[i].StartBlock(),
"end", waypointsBatch[i].EndBlock(),
"rootHash", waypointsBatch[i].RootHash(),
@@ -197,6 +217,11 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
break
}

+ if blockBatch[0].Number().Uint64() == 0 {
+ // we do not want to insert block 0 (genesis)
+ blockBatch = blockBatch[1:]
+ }
+
blocks = append(blocks, blockBatch...)
}

@@ -206,26 +231,19 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
waypoints = waypoints[len(waypointsBatch):]
}

- if err := d.storage.InsertBlocks(ctx, blocks); err != nil {
- return nil, err
+ if len(blocks) == 0 {
+ continue
}

- flushStartTime := time.Now()
- if err := d.storage.Flush(ctx); err != nil {
+ if err := d.storage.InsertBlocks(ctx, blocks); err != nil {
return nil, err
}

- d.logger.Debug(
- fmt.Sprintf("[%s] stored blocks", blockDownloaderLogPrefix),
- "len", len(blocks),
- "duration", time.Since(flushStartTime),
- )

- if (endBlockNum == lastBlockNum) && (len(blocks) > 0) {
- lastBlock = blocks[len(blocks)-1]
- }
+ lastBlock = blocks[len(blocks)-1]
}

+ d.logger.Debug(syncLogPrefix("finished downloading blocks using waypoints"))

return lastBlock.Header(), nil
}

@@ -244,7 +262,7 @@ func (d *blockDownloader) fetchVerifiedBlocks(

// 2. Verify headers match waypoint root hash
if err = d.headersVerifier(waypoint, headers); err != nil {
- d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err)
+ d.logger.Debug(syncLogPrefix("penalizing peer - invalid headers"), "peerId", peerId, "err", err)

if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
@@ -257,7 +275,7 @@ func (d *blockDownloader) fetchVerifiedBlocks(
bodies, err := d.p2pService.FetchBodies(ctx, headers, peerId)
if err != nil {
if errors.Is(err, &p2p.ErrMissingBodies{}) {
- d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err)
+ d.logger.Debug(syncLogPrefix("penalizing peer - missing bodies"), "peerId", peerId, "err", err)

if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
@@ -275,7 +293,7 @@

// 5. Verify blocks
if err = d.blocksVerifier(blocks); err != nil {
- d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err)
+ d.logger.Debug(syncLogPrefix("penalizing peer - invalid blocks"), "peerId", peerId, "err", err)

if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
28 changes: 0 additions & 28 deletions polygon/sync/block_downloader_test.go
@@ -211,10 +211,6 @@ func TestBlockDownloaderDownloadBlocksUsingMilestones(t *testing.T) {
InsertBlocks(gomock.Any(), gomock.Any()).
DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
Times(1)
- test.storage.EXPECT().
- Flush(gomock.Any()).
- Return(nil).
- Times(1)

tip, err := test.blockDownloader.DownloadBlocksUsingMilestones(context.Background(), 1)
require.NoError(t, err)
@@ -250,10 +246,6 @@ func TestBlockDownloaderDownloadBlocksUsingCheckpoints(t *testing.T) {
InsertBlocks(gomock.Any(), gomock.Any()).
DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
Times(4)
- test.storage.EXPECT().
- Flush(gomock.Any()).
- Return(nil).
- Times(4)

tip, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
require.NoError(t, err)
@@ -328,10 +320,6 @@ func TestBlockDownloaderDownloadBlocksWhenInvalidHeadersThenPenalizePeerAndReDow
DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
Times(3),
)
- test.storage.EXPECT().
- Flush(gomock.Any()).
- Return(nil).
- Times(4)

_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
require.NoError(t, err)
@@ -358,10 +346,6 @@ func TestBlockDownloaderDownloadBlocksWhenZeroPeersTriesAgain(t *testing.T) {
InsertBlocks(gomock.Any(), gomock.Any()).
DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
Times(4)
- test.storage.EXPECT().
- Flush(gomock.Any()).
- Return(nil).
- Times(4)
gomock.InOrder(
// first time, no peers at all
test.p2pService.EXPECT().
@@ -439,10 +423,6 @@ func TestBlockDownloaderDownloadBlocksWhenInvalidBodiesThenPenalizePeerAndReDown
DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
Times(3),
)
- test.storage.EXPECT().
- Flush(gomock.Any()).
- Return(nil).
- Times(4)

_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
require.NoError(t, err)
@@ -504,10 +484,6 @@ func TestBlockDownloaderDownloadBlocksWhenMissingBodiesThenPenalizePeerAndReDown
DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
Times(3),
)
- test.storage.EXPECT().
- Flush(gomock.Any()).
- Return(nil).
- Times(4)

_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
require.NoError(t, err)
@@ -546,10 +522,6 @@ func TestBlockDownloaderDownloadBlocksRespectsMaxWorkers(t *testing.T) {
DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
Times(1),
)
- test.storage.EXPECT().
- Flush(gomock.Any()).
- Return(nil).
- Times(2)

// max 1 worker
// 100 peers
13 changes: 7 additions & 6 deletions polygon/sync/execution_client.go
@@ -3,7 +3,7 @@ package sync
import (
"context"

"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
executionclient "github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/core/types"
)

@@ -14,20 +14,21 @@ type ExecutionClient interface {
}

type executionClient struct {
- engine execution_client.ExecutionEngine
+ engine executionclient.ExecutionEngine
}

- func NewExecutionClient(engine execution_client.ExecutionEngine) ExecutionClient {
+ func NewExecutionClient(engine executionclient.ExecutionEngine) ExecutionClient {
return &executionClient{engine}
}

func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Block) error {
return e.engine.InsertBlocks(ctx, blocks, true)
}

- func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Header, finalizedHeader *types.Header) error {
- _, err := e.engine.ForkChoiceUpdate(ctx, finalizedHeader.Hash(), tip.Hash(), nil)
- return err
+ func (e *executionClient) UpdateForkChoice(_ context.Context, _ *types.Header, _ *types.Header) error {
+ // TODO - not ready for execution - missing state sync event and span data - uncomment once ready
+ //return e.engine.ForkChoiceUpdate(ctx, finalizedHeader.Hash(), tip.Hash())
+ return nil
}

func (e *executionClient) CurrentHeader(ctx context.Context) (*types.Header, error) {
7 changes: 7 additions & 0 deletions polygon/sync/log_prefix.go
@@ -0,0 +1,7 @@
+ package sync
+
+ import "fmt"
+
+ func syncLogPrefix(message string) string {
+ return fmt.Sprintf("[sync] %s", message)
+ }
4 changes: 2 additions & 2 deletions polygon/sync/service.go
@@ -41,7 +41,7 @@ func NewService(
) Service {
borConfig := chainConfig.Bor.(*borcfg.BorConfig)
execution := NewExecutionClient(executionEngine)
- storage := NewStorage(execution, maxPeers)
+ storage := NewStorage(logger, execution, maxPeers)
headersVerifier := VerifyAccumulatedHeaders
blocksVerifier := VerifyBlocks
p2pService := p2p.NewService(maxPeers, logger, sentryClient, statusDataProvider.GetStatusData)
@@ -76,7 +76,7 @@ func NewService(
headerValidator,
spansCache)
}
- events := NewTipEvents(p2pService, heimdallService)
+ events := NewTipEvents(logger, p2pService, heimdallService)
sync := NewSync(
storage,
execution,
8 changes: 7 additions & 1 deletion polygon/sync/storage.go
@@ -4,6 +4,8 @@ import (
"context"
"sync"

"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/core/types"
)

@@ -18,13 +20,15 @@ type Storage interface {
}

type executionClientStorage struct {
+ logger log.Logger
execution ExecutionClient
queue chan []*types.Block
waitGroup sync.WaitGroup
}

- func NewStorage(execution ExecutionClient, queueCapacity int) Storage {
+ func NewStorage(logger log.Logger, execution ExecutionClient, queueCapacity int) Storage {
return &executionClientStorage{
+ logger: logger,
execution: execution,
queue: make(chan []*types.Block, queueCapacity),
}
@@ -54,6 +58,8 @@ func (s *executionClientStorage) Flush(ctx context.Context) error {
}

func (s *executionClientStorage) Run(ctx context.Context) error {
+ s.logger.Debug(syncLogPrefix("running execution client storage component"))
+
for {
select {
case blocks := <-s.queue:
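A note on the storage.go changes above: InsertBlocks only enqueues a batch onto the buffered channel, and the Run loop drains it on its own goroutine, which is why the block downloader no longer needs to flush after every batch. Below is a simplified sketch of that producer/consumer shape, with a generic element type and a write callback standing in for the real ExecutionClient:

```go
package sync

import "context"

// Simplified sketch of the queue-backed storage pattern; B stands in for
// []*types.Block and write for the execution-client insert call.
type asyncStorage[B any] struct {
	queue chan B
}

// Producer side (InsertBlocks-like): hand the batch to the writer goroutine
// and return immediately, so downloading never blocks on storage writes.
func (s *asyncStorage[B]) enqueue(ctx context.Context, batch B) error {
	select {
	case s.queue <- batch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Consumer side (Run-like): drain the queue on a dedicated goroutine.
func (s *asyncStorage[B]) run(ctx context.Context, write func(B) error) error {
	for {
		select {
		case batch := <-s.queue:
			if err := write(batch); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```

The real executionClientStorage also tracks a sync.WaitGroup so Flush can wait for in-flight batches before returning.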