Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/sync: fix issues found during testing #9873

Merged
merged 2 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 51 additions & 33 deletions polygon/sync/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"sync"
"time"
Expand All @@ -22,7 +21,6 @@ import (
)

const (
blockDownloaderLogPrefix = "BlockDownloader"
notEnoughPeersBackOffDuration = time.Minute

// conservative over-estimation: 1 MB block size x 1024 blocks per waypoint
Expand Down Expand Up @@ -110,20 +108,37 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
return nil, nil
}

d.logger.Debug(
syncLogPrefix("downloading blocks using waypoints"),
"waypointsLen", len(waypoints),
"start", waypoints[0].StartBlock().Uint64(),
"end", waypoints[len(waypoints)-1].EndBlock().Uint64(),
"kind", reflect.TypeOf(waypoints[0]),
)

// waypoint rootHash->[blocks part of waypoint]
waypointBlocksMemo, err := lru.New[common.Hash, []*types.Block](d.p2pService.MaxPeers())
if err != nil {
return nil, err
}

progressLogTicker := time.NewTicker(30 * time.Second)
defer progressLogTicker.Stop()

var lastBlock *types.Block
lastBlockNum := waypoints[len(waypoints)-1].EndBlock().Uint64()
for len(waypoints) > 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// carry-on
}

endBlockNum := waypoints[len(waypoints)-1].EndBlock().Uint64()
peers := d.p2pService.ListPeersMayHaveBlockNum(endBlockNum)
if len(peers) == 0 {
d.logger.Warn(
fmt.Sprintf("[%s] can't use any peers to sync, will try again", blockDownloaderLogPrefix),
syncLogPrefix("can't use any peers to download blocks, will try again in a bit"),
"start", waypoints[0].StartBlock(),
"end", endBlockNum,
"sleepSeconds", d.notEnoughPeersBackOffDuration.Seconds(),
Expand All @@ -136,21 +151,26 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
numWorkers := cmp.Min(cmp.Min(d.maxWorkers, len(peers)), len(waypoints))
waypointsBatch := waypoints[:numWorkers]

d.logger.Info(
fmt.Sprintf("[%s] downloading blocks", blockDownloaderLogPrefix),
"waypointsBatchLength", len(waypointsBatch),
"startBlockNum", waypointsBatch[0].StartBlock(),
"endBlockNum", waypointsBatch[len(waypointsBatch)-1].EndBlock(),
"kind", reflect.TypeOf(waypointsBatch[0]),
"peerCount", len(peers),
"maxWorkers", d.maxWorkers,
)
select {
case <-progressLogTicker.C:
d.logger.Info(
syncLogPrefix("downloading blocks progress"),
"waypointsBatchLength", len(waypointsBatch),
"startBlockNum", waypointsBatch[0].StartBlock(),
"endBlockNum", waypointsBatch[len(waypointsBatch)-1].EndBlock(),
"kind", reflect.TypeOf(waypointsBatch[0]),
"peerCount", len(peers),
"maxWorkers", d.maxWorkers,
)
default:
// carry on
}

blockBatches := make([][]*types.Block, len(waypointsBatch))
maxWaypointLength := float64(0)
maxWaypointLength := uint64(0)
wg := sync.WaitGroup{}
for i, waypoint := range waypointsBatch {
maxWaypointLength = math.Max(float64(waypoint.Length()), maxWaypointLength)
maxWaypointLength = cmp.Max(waypoint.Length(), maxWaypointLength)
wg.Add(1)
go func(i int, waypoint heimdall.Waypoint, peerId *p2p.PeerId) {
defer wg.Done()
Expand All @@ -163,7 +183,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
blocks, err := d.fetchVerifiedBlocks(ctx, waypoint, peerId)
if err != nil {
d.logger.Debug(
fmt.Sprintf("[%s] issue downloading waypoint blocks - will try again", blockDownloaderLogPrefix),
syncLogPrefix("issue downloading waypoint blocks - will try again"),
"err", err,
"start", waypoint.StartBlock(),
"end", waypoint.EndBlock(),
Expand All @@ -186,7 +206,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
for i, blockBatch := range blockBatches {
if len(blockBatch) == 0 {
d.logger.Debug(
fmt.Sprintf("[%s] no blocks - will try again", blockDownloaderLogPrefix),
syncLogPrefix("no blocks - will try again"),
"start", waypointsBatch[i].StartBlock(),
"end", waypointsBatch[i].EndBlock(),
"rootHash", waypointsBatch[i].RootHash(),
Expand All @@ -197,6 +217,11 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
break
}

if blockBatch[0].Number().Uint64() == 0 {
// we do not want to insert block 0 (genesis)
blockBatch = blockBatch[1:]
}

blocks = append(blocks, blockBatch...)
}

Expand All @@ -206,26 +231,19 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp
waypoints = waypoints[len(waypointsBatch):]
}

if err := d.storage.InsertBlocks(ctx, blocks); err != nil {
return nil, err
if len(blocks) == 0 {
continue
}

flushStartTime := time.Now()
if err := d.storage.Flush(ctx); err != nil {
if err := d.storage.InsertBlocks(ctx, blocks); err != nil {
return nil, err
}

d.logger.Debug(
fmt.Sprintf("[%s] stored blocks", blockDownloaderLogPrefix),
"len", len(blocks),
"duration", time.Since(flushStartTime),
)

if (endBlockNum == lastBlockNum) && (len(blocks) > 0) {
lastBlock = blocks[len(blocks)-1]
}
lastBlock = blocks[len(blocks)-1]
}

d.logger.Debug(syncLogPrefix("finished downloading blocks using waypoints"))

return lastBlock.Header(), nil
}

Expand All @@ -244,7 +262,7 @@ func (d *blockDownloader) fetchVerifiedBlocks(

// 2. Verify headers match waypoint root hash
if err = d.headersVerifier(waypoint, headers); err != nil {
d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err)
d.logger.Debug(syncLogPrefix("penalizing peer - invalid headers"), "peerId", peerId, "err", err)

if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
Expand All @@ -257,7 +275,7 @@ func (d *blockDownloader) fetchVerifiedBlocks(
bodies, err := d.p2pService.FetchBodies(ctx, headers, peerId)
if err != nil {
if errors.Is(err, &p2p.ErrMissingBodies{}) {
d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err)
d.logger.Debug(syncLogPrefix("penalizing peer - missing bodies"), "peerId", peerId, "err", err)

if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
Expand All @@ -275,7 +293,7 @@ func (d *blockDownloader) fetchVerifiedBlocks(

// 5. Verify blocks
if err = d.blocksVerifier(blocks); err != nil {
d.logger.Debug(fmt.Sprintf("[%s] penalizing peer", blockDownloaderLogPrefix), "peerId", peerId, "err", err)
d.logger.Debug(syncLogPrefix("penalizing peer - invalid blocks"), "peerId", peerId, "err", err)

if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
Expand Down
28 changes: 0 additions & 28 deletions polygon/sync/block_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,6 @@ func TestBlockDownloaderDownloadBlocksUsingMilestones(t *testing.T) {
InsertBlocks(gomock.Any(), gomock.Any()).
DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
Times(1)
test.storage.EXPECT().
Flush(gomock.Any()).
Return(nil).
Times(1)

tip, err := test.blockDownloader.DownloadBlocksUsingMilestones(context.Background(), 1)
require.NoError(t, err)
Expand Down Expand Up @@ -250,10 +246,6 @@ func TestBlockDownloaderDownloadBlocksUsingCheckpoints(t *testing.T) {
InsertBlocks(gomock.Any(), gomock.Any()).
DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
Times(4)
test.storage.EXPECT().
Flush(gomock.Any()).
Return(nil).
Times(4)

tip, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
require.NoError(t, err)
Expand Down Expand Up @@ -328,10 +320,6 @@ func TestBlockDownloaderDownloadBlocksWhenInvalidHeadersThenPenalizePeerAndReDow
DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
Times(3),
)
test.storage.EXPECT().
Flush(gomock.Any()).
Return(nil).
Times(4)

_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
require.NoError(t, err)
Expand All @@ -358,10 +346,6 @@ func TestBlockDownloaderDownloadBlocksWhenZeroPeersTriesAgain(t *testing.T) {
InsertBlocks(gomock.Any(), gomock.Any()).
DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
Times(4)
test.storage.EXPECT().
Flush(gomock.Any()).
Return(nil).
Times(4)
gomock.InOrder(
// first time, no peers at all
test.p2pService.EXPECT().
Expand Down Expand Up @@ -439,10 +423,6 @@ func TestBlockDownloaderDownloadBlocksWhenInvalidBodiesThenPenalizePeerAndReDown
DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
Times(3),
)
test.storage.EXPECT().
Flush(gomock.Any()).
Return(nil).
Times(4)

_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
require.NoError(t, err)
Expand Down Expand Up @@ -504,10 +484,6 @@ func TestBlockDownloaderDownloadBlocksWhenMissingBodiesThenPenalizePeerAndReDown
DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
Times(3),
)
test.storage.EXPECT().
Flush(gomock.Any()).
Return(nil).
Times(4)

_, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 1)
require.NoError(t, err)
Expand Down Expand Up @@ -546,10 +522,6 @@ func TestBlockDownloaderDownloadBlocksRespectsMaxWorkers(t *testing.T) {
DoAndReturn(test.defaultInsertBlocksMock(&blocksBatch2)).
Times(1),
)
test.storage.EXPECT().
Flush(gomock.Any()).
Return(nil).
Times(2)

// max 1 worker
// 100 peers
Expand Down
13 changes: 7 additions & 6 deletions polygon/sync/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package sync
import (
"context"

"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
executionclient "github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/core/types"
)

Expand All @@ -14,20 +14,21 @@ type ExecutionClient interface {
}

type executionClient struct {
engine execution_client.ExecutionEngine
engine executionclient.ExecutionEngine
}

func NewExecutionClient(engine execution_client.ExecutionEngine) ExecutionClient {
func NewExecutionClient(engine executionclient.ExecutionEngine) ExecutionClient {
return &executionClient{engine}
}

// InsertBlocks forwards the given blocks to the wrapped execution engine for
// insertion. The final argument is hardcoded to true — presumably a
// wait/sync flag on the engine's InsertBlocks contract; confirm against
// execution_client.ExecutionEngine.
func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Block) error {
	return e.engine.InsertBlocks(ctx, blocks, true)
}

func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Header, finalizedHeader *types.Header) error {
_, err := e.engine.ForkChoiceUpdate(ctx, finalizedHeader.Hash(), tip.Hash(), nil)
return err
func (e *executionClient) UpdateForkChoice(_ context.Context, _ *types.Header, _ *types.Header) error {
// TODO - not ready for execution - missing state sync event and span data - uncomment once ready
//return e.engine.ForkChoiceUpdate(ctx, finalizedHeader.Hash(), tip.Hash())
return nil
}

func (e *executionClient) CurrentHeader(ctx context.Context) (*types.Header, error) {
Expand Down
7 changes: 7 additions & 0 deletions polygon/sync/log_prefix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package sync

import "fmt"

// syncLogPrefix tags the given message with the "[sync]" prefix used by all
// log lines emitted from the polygon/sync package, so they can be filtered
// consistently in the node logs.
//
// NOTE(review): this span contained embedded review-comment UI residue from
// the page scrape; removed — only the function itself belongs here.
func syncLogPrefix(message string) string {
	return fmt.Sprintf("[sync] %s", message)
}
4 changes: 2 additions & 2 deletions polygon/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewService(
) Service {
borConfig := chainConfig.Bor.(*borcfg.BorConfig)
execution := NewExecutionClient(executionEngine)
storage := NewStorage(execution, maxPeers)
storage := NewStorage(logger, execution, maxPeers)
headersVerifier := VerifyAccumulatedHeaders
blocksVerifier := VerifyBlocks
p2pService := p2p.NewService(maxPeers, logger, sentryClient, statusDataProvider.GetStatusData)
Expand Down Expand Up @@ -76,7 +76,7 @@ func NewService(
headerValidator,
spansCache)
}
events := NewTipEvents(p2pService, heimdallService)
events := NewTipEvents(logger, p2pService, heimdallService)
sync := NewSync(
storage,
execution,
Expand Down
8 changes: 7 additions & 1 deletion polygon/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"

"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/core/types"
)

Expand All @@ -18,13 +20,15 @@ type Storage interface {
}

type executionClientStorage struct {
logger log.Logger
execution ExecutionClient
queue chan []*types.Block
waitGroup sync.WaitGroup
}

func NewStorage(execution ExecutionClient, queueCapacity int) Storage {
func NewStorage(logger log.Logger, execution ExecutionClient, queueCapacity int) Storage {
return &executionClientStorage{
logger: logger,
execution: execution,
queue: make(chan []*types.Block, queueCapacity),
}
Expand Down Expand Up @@ -54,6 +58,8 @@ func (s *executionClientStorage) Flush(ctx context.Context) error {
}

func (s *executionClientStorage) Run(ctx context.Context) error {
s.logger.Debug(syncLogPrefix("running execution client storage component"))

for {
select {
case blocks := <-s.queue:
Expand Down
Loading
Loading