From 1fae68363390cebd065b91dc855d46a21d7ba2c6 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Sat, 6 Apr 2024 13:41:59 +0200 Subject: [PATCH 1/2] polygon/p2p: fix issues found during testing --- polygon/p2p/fetcher_base.go | 39 +++++++------- polygon/p2p/fetcher_base_test.go | 71 ++++++++++---------------- polygon/p2p/fetcher_errors.go | 2 - polygon/p2p/fetcher_penalizing.go | 8 ++- polygon/p2p/fetcher_penalizing_test.go | 32 ------------ polygon/p2p/message_listener.go | 8 ++- polygon/p2p/peer_penalizer.go | 29 ++++++++--- polygon/p2p/service.go | 4 +- polygon/sync/sync.go | 4 +- 9 files changed, 87 insertions(+), 110 deletions(-) diff --git a/polygon/p2p/fetcher_base.go b/polygon/p2p/fetcher_base.go index 2bb76448e29..629b2591bae 100644 --- a/polygon/p2p/fetcher_base.go +++ b/polygon/p2p/fetcher_base.go @@ -4,10 +4,10 @@ import ( "context" "errors" "fmt" + "reflect" "time" "github.com/cenkalti/backoff/v4" - "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/cmp" @@ -35,24 +35,21 @@ type Fetcher interface { func NewFetcher( config FetcherConfig, - logger log.Logger, messageListener MessageListener, messageSender MessageSender, requestIdGenerator RequestIdGenerator, ) Fetcher { - return newFetcher(config, logger, messageListener, messageSender, requestIdGenerator) + return newFetcher(config, messageListener, messageSender, requestIdGenerator) } func newFetcher( config FetcherConfig, - logger log.Logger, messageListener MessageListener, messageSender MessageSender, requestIdGenerator RequestIdGenerator, ) *fetcher { return &fetcher{ config: config, - logger: logger, messageListener: messageListener, messageSender: messageSender, requestIdGenerator: requestIdGenerator, @@ -61,7 +58,6 @@ func newFetcher( type fetcher struct { config FetcherConfig - logger log.Logger messageListener MessageListener messageSender MessageSender requestIdGenerator RequestIdGenerator @@ -91,14 +87,23 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ { chunkStart := start + chunkNum*eth.MaxHeadersServe chunkEnd := cmp.Min(end, chunkStart+eth.MaxHeadersServe) - headersChunk, err := fetchWithRetry(f.config, func() ([]*types.Header, error) { - return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId) - }) - if err != nil { - return nil, err - } + for chunkStart < chunkEnd { + // a node may not respond with all MaxHeadersServe in 1 response, + // so we keep on consuming from last received number (akin to consuming a paging api) + // until we have all headers of the chunk or the peer stopped returning headers + headersChunk, err := fetchWithRetry(f.config, func() ([]*types.Header, error) { + return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId) + }) + if err != nil { + return nil, err + } + if len(headersChunk) == 0 { + break + } - headers = append(headers, headersChunk...) + headers = append(headers, headersChunk...) + chunkStart += uint64(len(headersChunk)) + } } if err := f.validateHeadersResponse(headers, start, amount); err != nil { @@ -281,12 +286,6 @@ func (f *fetcher) validateBodies(bodies []*types.Body, headers []*types.Header) } } - for _, body := range bodies { - if len(body.Transactions) == 0 && len(body.Withdrawals) == 0 && len(body.Uncles) == 0 { - return ErrEmptyBody - } - } - return nil } @@ -327,7 +326,7 @@ func awaitResponse[TPacket any]( select { case <-ctx.Done(): var nilPacket TPacket - return nilPacket, fmt.Errorf("await response interrupted: %w", ctx.Err()) + return nilPacket, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err()) case message := <-messages: if filter(message) { continue diff --git a/polygon/p2p/fetcher_base_test.go b/polygon/p2p/fetcher_base_test.go index 4a5408040df..5a0469c289a 100644 --- a/polygon/p2p/fetcher_base_test.go +++ b/polygon/p2p/fetcher_base_test.go @@ -72,7 +72,7 @@ func TestFetcherFetchHeadersWithChunking(t *testing.T) { Id: sentry.MessageId_BLOCK_HEADERS_66, PeerId: peerId.H512(), // 1024 headers in first response - Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1025]), + Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1024]), }, } mockRequestResponse1 := requestResponseMock{ @@ -88,7 +88,7 @@ func TestFetcherFetchHeadersWithChunking(t *testing.T) { Id: sentry.MessageId_BLOCK_HEADERS_66, PeerId: peerId.H512(), // remaining 975 headers in second response - Data: blockHeadersPacket66Bytes(t, requestId2, mockHeaders[1025:]), + Data: blockHeadersPacket66Bytes(t, requestId2, mockHeaders[1024:]), }, } mockRequestResponse2 := requestResponseMock{ @@ -171,7 +171,7 @@ func TestFetcherFetchHeadersResponseTimeoutRetrySuccess(t *testing.T) { Id: sentry.MessageId_BLOCK_HEADERS_66, PeerId: peerId.H512(), // 1024 headers in first response - Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1025]), + Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1024]), }, } mockRequestResponse1 := requestResponseMock{ @@ -205,7 +205,7 @@ func TestFetcherFetchHeadersResponseTimeoutRetrySuccess(t *testing.T) { Id: sentry.MessageId_BLOCK_HEADERS_66, PeerId: peerId.H512(), // remaining 975 headers in third response - Data: blockHeadersPacket66Bytes(t, requestId3, mockHeaders[1025:]), + Data: blockHeadersPacket66Bytes(t, requestId3, mockHeaders[1024:]), }, } mockRequestResponse3 := requestResponseMock{ @@ -246,24 +246,39 @@ func TestFetcherFetchHeadersErrIncompleteResponse(t *testing.T) { t.Parallel() peerId := PeerIdFromUint64(1) - requestId := uint64(1234) - mockInboundMessages := []*sentry.InboundMessage{ + requestId1 := uint64(1234) + requestId2 := uint64(1235) + mockInboundMessages1 := []*sentry.InboundMessage{ { Id: sentry.MessageId_BLOCK_HEADERS_66, PeerId: peerId.H512(), - Data: newMockBlockHeadersPacket66Bytes(t, requestId, 2), + Data: newMockBlockHeadersPacket66Bytes(t, requestId1, 2), }, } - mockRequestResponse := requestResponseMock{ - requestId: requestId, - mockResponseInboundMessages: mockInboundMessages, + mockInboundMessages2 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: newMockBlockHeadersPacket66Bytes(t, requestId2, 0), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, wantRequestPeerId: peerId, wantRequestOriginNumber: 1, wantRequestAmount: 3, } + mockRequestResponse2 := requestResponseMock{ + requestId: requestId2, + mockResponseInboundMessages: mockInboundMessages2, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 3, + wantRequestAmount: 1, + } - test := newFetcherTest(t, newMockRequestGenerator(requestId)) - test.mockSentryStreams(mockRequestResponse) + test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2)) + test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2) test.run(func(ctx context.Context, t *testing.T) { var errIncompleteHeaders *ErrIncompleteHeaders headers, err := test.fetcher.FetchHeaders(ctx, 1, 4, peerId) @@ -452,36 +467,6 @@ func TestFetcherFetchBodiesResponseTimeoutRetrySuccess(t *testing.T) { }) } -func TestFetcherFetchBodiesErrEmptyBody(t *testing.T) { - t.Parallel() - - peerId := PeerIdFromUint64(1) - requestId := uint64(1234) - mockHeaders := []*types.Header{{Number: big.NewInt(1)}} - mockHashes := []common.Hash{mockHeaders[0].Hash()} - mockInboundMessages := []*sentry.InboundMessage{ - { - Id: sentry.MessageId_BLOCK_BODIES_66, - PeerId: peerId.H512(), - Data: newMockBlockBodiesPacketBytes(t, requestId, &types.Body{}), - }, - } - mockRequestResponse := requestResponseMock{ - requestId: requestId, - mockResponseInboundMessages: mockInboundMessages, - wantRequestPeerId: peerId, - wantRequestHashes: mockHashes, - } - - test := newFetcherTest(t, newMockRequestGenerator(requestId)) - test.mockSentryStreams(mockRequestResponse) - test.run(func(ctx context.Context, t *testing.T) { - bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId) - require.ErrorIs(t, err, ErrEmptyBody) - require.Nil(t, bodies) - }) -} - func TestFetcherFetchBodiesErrMissingBodies(t *testing.T) { t.Parallel() @@ -525,7 +510,7 @@ func newFetcherTest(t *testing.T, requestIdGenerator RequestIdGenerator) *fetche messageListenerTest := newMessageListenerTest(t) messageListener := messageListenerTest.messageListener messageSender := NewMessageSender(messageListenerTest.sentryClient) - fetcher := newFetcher(fetcherConfig, messageListenerTest.logger, messageListener, messageSender, requestIdGenerator) + fetcher := newFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator) return &fetcherTest{ messageListenerTest: messageListenerTest, fetcher: fetcher, diff --git a/polygon/p2p/fetcher_errors.go b/polygon/p2p/fetcher_errors.go index e2a8074298c..8dd66126857 100644 --- a/polygon/p2p/fetcher_errors.go +++ b/polygon/p2p/fetcher_errors.go @@ -8,8 +8,6 @@ import ( "github.com/ledgerwatch/erigon/core/types" ) -var ErrEmptyBody = errors.New("empty body") - type ErrInvalidFetchHeadersRange struct { start uint64 end uint64 diff --git a/polygon/p2p/fetcher_penalizing.go b/polygon/p2p/fetcher_penalizing.go index 7c43a4d610a..32d3ce62fc8 100644 --- a/polygon/p2p/fetcher_penalizing.go +++ b/polygon/p2p/fetcher_penalizing.go @@ -40,7 +40,7 @@ func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) { bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId) if err != nil { - return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{}, ErrEmptyBody) + return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{}) } return bodies, nil @@ -56,7 +56,11 @@ func (pf *penalizingFetcher) maybePenalize(ctx context.Context, peerId *PeerId, } if shouldPenalize { - pf.logger.Debug("penalizing peer - penalize-able fetcher issue", "peerId", peerId, "err", err) + pf.logger.Debug( + "[p2p.penalizing.fetcher] penalizing peer - penalize-able fetcher issue", + "peerId", peerId, + "err", err, + ) if penalizeErr := pf.peerPenalizer.Penalize(ctx, peerId); penalizeErr != nil { err = fmt.Errorf("%w: %w", penalizeErr, err) diff --git a/polygon/p2p/fetcher_penalizing_test.go b/polygon/p2p/fetcher_penalizing_test.go index 00cacf94f8f..72b7ae156c2 100644 --- a/polygon/p2p/fetcher_penalizing_test.go +++ b/polygon/p2p/fetcher_penalizing_test.go @@ -123,38 +123,6 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenIncorrectOrigin(t *t }) } -func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrEmptyBody(t *testing.T) { - t.Parallel() - - peerId := PeerIdFromUint64(1) - requestId := uint64(1234) - headers := []*types.Header{{Number: big.NewInt(1)}} - hashes := []common.Hash{headers[0].Hash()} - mockInboundMessages := []*sentry.InboundMessage{ - { - Id: sentry.MessageId_BLOCK_BODIES_66, - PeerId: peerId.H512(), - Data: newMockBlockBodiesPacketBytes(t, requestId, &types.Body{}), - }, - } - mockRequestResponse := requestResponseMock{ - requestId: requestId, - mockResponseInboundMessages: mockInboundMessages, - wantRequestPeerId: peerId, - wantRequestHashes: hashes, - } - - test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) - test.mockSentryStreams(mockRequestResponse) - // setup expectation that peer should be penalized - mockExpectPenalizePeer(t, test.sentryClient, peerId) - test.run(func(ctx context.Context, t *testing.T) { - bodies, err := test.penalizingFetcher.FetchBodies(ctx, headers, peerId) - require.ErrorIs(t, err, ErrEmptyBody) - require.Nil(t, bodies) - }) -} - func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrTooManyBodies(t *testing.T) { t.Parallel() diff --git a/polygon/p2p/message_listener.go b/polygon/p2p/message_listener.go index e99cbb969cd..2bd4d44d493 100644 --- a/polygon/p2p/message_listener.go +++ b/polygon/p2p/message_listener.go @@ -79,6 +79,8 @@ type messageListener struct { } func (ml *messageListener) Run(ctx context.Context) { + ml.logger.Info(messageListenerLogPrefix("running p2p message listener component")) + backgroundLoops := []func(ctx context.Context){ ml.listenInboundMessages, ml.listenPeerEvents, @@ -238,7 +240,7 @@ func notifyInboundMessageObservers[TPacket any]( var decodedData TPacket if err := rlp.DecodeBytes(message.Data, &decodedData); err != nil { if rlp.IsInvalidRLPError(err) { - logger.Debug("penalizing peer - invalid rlp", "peerId", peerId, "err", err) + logger.Debug(messageListenerLogPrefix("penalizing peer - invalid rlp"), "peerId", peerId, "err", err) if penalizeErr := peerPenalizer.Penalize(ctx, peerId); penalizeErr != nil { err = fmt.Errorf("%w: %w", penalizeErr, err) @@ -262,3 +264,7 @@ func notifyObservers[TMessage any](observers map[uint64]MessageObserver[TMessage go observer(message) } } + +func messageListenerLogPrefix(message string) string { + return fmt.Sprintf("[p2p.message.listener] %s", message) +} diff --git a/polygon/p2p/peer_penalizer.go b/polygon/p2p/peer_penalizer.go index 234c6be1ea2..72ed9598689 100644 --- a/polygon/p2p/peer_penalizer.go +++ b/polygon/p2p/peer_penalizer.go @@ -7,22 +7,39 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" ) +type PeerPenalizer interface { + Penalize(ctx context.Context, peerId *PeerId) error +} + +func NewTrackingPeerPenalizer(peerPenalizer PeerPenalizer, peerTracker PeerTracker) PeerPenalizer { + return &trackingPeerPenalizer{ + PeerPenalizer: peerPenalizer, + peerTracker: peerTracker, + } +} + +type trackingPeerPenalizer struct { + PeerPenalizer + peerTracker PeerTracker +} + +func (p *trackingPeerPenalizer) Penalize(ctx context.Context, peerId *PeerId) error { + p.peerTracker.PeerDisconnected(peerId) + return p.PeerPenalizer.Penalize(ctx, peerId) +} + func NewPeerPenalizer(sentryClient direct.SentryClient) PeerPenalizer { return &peerPenalizer{ sentryClient: sentryClient, } } -type PeerPenalizer interface { - Penalize(ctx context.Context, peerId *PeerId) error -} - type peerPenalizer struct { sentryClient direct.SentryClient } -func (pp *peerPenalizer) Penalize(ctx context.Context, peerId *PeerId) error { - _, err := pp.sentryClient.PenalizePeer(ctx, &sentry.PenalizePeerRequest{ +func (p *peerPenalizer) Penalize(ctx context.Context, peerId *PeerId) error { + _, err := p.sentryClient.PenalizePeer(ctx, &sentry.PenalizePeerRequest{ PeerId: peerId.H512(), Penalty: sentry.PenaltyKind_Kick, }) diff --git a/polygon/p2p/service.go b/polygon/p2p/service.go index 1386bd0726e..b8f3108b0b6 100644 --- a/polygon/p2p/service.go +++ b/polygon/p2p/service.go @@ -43,11 +43,11 @@ func newService( requestIdGenerator RequestIdGenerator, ) *service { peerTracker := NewPeerTracker() - peerPenalizer := NewPeerPenalizer(sentryClient) + peerPenalizer := NewTrackingPeerPenalizer(NewPeerPenalizer(sentryClient), peerTracker) messageListener := NewMessageListener(logger, sentryClient, statusDataFactory, peerPenalizer) messageListener.RegisterPeerEventObserver(NewPeerEventObserver(peerTracker)) messageSender := NewMessageSender(sentryClient) - fetcher := NewFetcher(fetcherConfig, logger, messageListener, messageSender, requestIdGenerator) + fetcher := NewFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator) fetcher = NewPenalizingFetcher(logger, fetcher, peerPenalizer) fetcher = NewTrackingFetcher(fetcher, peerTracker) return &service{ diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index 65f2d7637a9..5766a6856ce 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -127,7 +127,7 @@ func (s *Sync) onNewBlockEvent( } else { newBlocks, err = s.p2pService.FetchBlocks(ctx, rootNum, newBlockHeaderNum+1, event.PeerId) if err != nil { - if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) || errors.Is(err, p2p.ErrEmptyBody) { + if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) { s.logger.Debug("sync.Sync.onNewBlockEvent: failed to fetch complete blocks, ignoring event", "err", err, "peerId", event.PeerId, "lastBlockNum", newBlockHeaderNum) return nil @@ -183,7 +183,7 @@ func (s *Sync) onNewBlockHashesEvent( newBlocks, err := s.p2pService.FetchBlocks(ctx, headerHashNum.Number, headerHashNum.Number+1, event.PeerId) if err != nil { - if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) || errors.Is(err, p2p.ErrEmptyBody) { + if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) { s.logger.Debug("sync.Sync.onNewBlockHashesEvent: failed to fetch complete blocks, ignoring event", "err", err, "peerId", event.PeerId, "lastBlockNum", headerHashNum.Number) continue From efacf73f67a69c890de95b5129db2238f3549816 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Sat, 6 Apr 2024 13:57:31 +0200 Subject: [PATCH 2/2] polygon/p2p: fix issues found during testing --- polygon/sync/sync.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index 5766a6856ce..02c5cb215ac 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -127,11 +127,12 @@ func (s *Sync) onNewBlockEvent( } else { newBlocks, err = s.p2pService.FetchBlocks(ctx, rootNum, newBlockHeaderNum+1, event.PeerId) if err != nil { - if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) { + if errors.Is(err, &p2p.ErrIncompleteHeaders{}) || errors.Is(err, &p2p.ErrMissingBodies{}) { s.logger.Debug("sync.Sync.onNewBlockEvent: failed to fetch complete blocks, ignoring event", "err", err, "peerId", event.PeerId, "lastBlockNum", newBlockHeaderNum) return nil } + return err } } @@ -183,11 +184,12 @@ func (s *Sync) onNewBlockHashesEvent( newBlocks, err := s.p2pService.FetchBlocks(ctx, headerHashNum.Number, headerHashNum.Number+1, event.PeerId) if err != nil { - if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) { + if errors.Is(err, &p2p.ErrIncompleteHeaders{}) || errors.Is(err, &p2p.ErrMissingBodies{}) { s.logger.Debug("sync.Sync.onNewBlockHashesEvent: failed to fetch complete blocks, ignoring event", "err", err, "peerId", event.PeerId, "lastBlockNum", headerHashNum.Number) continue } + return err }