Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/fetcher: fix blob transaction propagation #30125

Merged
merged 5 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,16 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
if code, _, err := conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err: %v", err)
} else if code != discMsg {
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
if code == protoOffset(ethProto)+eth.NewPooledTransactionHashesMsg {
// sometimes we'll get a blob transaction hashes announcement before the disconnect
// because blob transactions are scheduled to be fetched right away.
if code, _, err = conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err on second read: %v", err)
}
}
if code != discMsg {
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
}
}
conn.Close()
}
Expand Down
177 changes: 104 additions & 73 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package fetcher

import (
"bytes"
"errors"
"fmt"
"math"
Expand All @@ -35,7 +34,7 @@ import (
)

const (
// maxTxAnnounces is the maximum number of unique transaction a peer
// maxTxAnnounces is the maximum number of unique transactions a peer
// can announce in a short time.
maxTxAnnounces = 4096

Expand Down Expand Up @@ -114,16 +113,23 @@ var errTerminated = errors.New("terminated")
type txAnnounce struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced
metas []*txMetadata // Batch of metadata associated with the hashes
metas []txMetadata // Batch of metadata associated with the hashes
}

// txMetadata is a set of extra data transmitted along the announcement for better
// fetch scheduling.
// txMetadata provides the extra data transmitted along with the announcement
// for better fetch scheduling.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
}

// txMetadataWithSeq is a wrapper of transaction metadata with an extra field
// tracking the transaction sequence number.
type txMetadataWithSeq struct {
txMetadata
seq uint64
}

// txRequest represents an in-flight transaction retrieval request destined to
// a specific peers.
type txRequest struct {
Expand Down Expand Up @@ -159,7 +165,7 @@ type txDrop struct {
// The invariants of the fetcher are:
// - Each tracked transaction (hash) must only be present in one of the
// three stages. This ensures that the fetcher operates akin to a finite
// state automata and there's do data leak.
// state automata and there's no data leak.
// - Each peer that announced transactions may be scheduled retrievals, but
// only ever one concurrently. This ensures we can immediately know what is
// missing from a reply and reschedule it.
Expand All @@ -169,18 +175,19 @@ type TxFetcher struct {
drop chan *txDrop
quit chan struct{}

txSeq uint64 // Unique transaction sequence number
underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
Expand Down Expand Up @@ -218,8 +225,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
Expand Down Expand Up @@ -247,7 +254,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
// loop, so anything caught here is time saved internally.
var (
unknownHashes = make([]common.Hash, 0, len(hashes))
unknownMetas = make([]*txMetadata, 0, len(hashes))
unknownMetas = make([]txMetadata, 0, len(hashes))

duplicate int64
underpriced int64
Expand All @@ -264,7 +271,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
// Transaction metadata has been available since eth68, and all
// legacy eth protocols (prior to eth68) have been deprecated.
// Therefore, metadata is always expected in the announcement.
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]})
}
}
txAnnounceKnownMeter.Mark(duplicate)
Expand Down Expand Up @@ -431,9 +438,19 @@ func (f *TxFetcher) loop() {
ann.metas = ann.metas[:want-maxTxAnnounces]
}
// All is well, schedule the remainder of the transactions
idleWait := len(f.waittime) == 0
_, oldPeer := f.announces[ann.origin]

var (
idleWait = len(f.waittime) == 0
_, oldPeer = f.announces[ann.origin]
hasBlob bool

// nextSeq returns the next available sequence number for tagging
// transaction announcement and also bump it internally.
nextSeq = func() uint64 {
seq := f.txSeq
f.txSeq++
return seq
}
)
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
Expand All @@ -443,9 +460,17 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
continue
}
Expand All @@ -456,9 +481,17 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
continue
}
Expand All @@ -475,24 +508,47 @@ func (f *TxFetcher) loop() {
f.waitlist[hash][ann.origin] = struct{}{}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i]
waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
continue
}
// Transaction unknown to the fetcher, insert it into the waiting list
f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
f.waittime[hash] = f.clock.Now()

// Assign the current timestamp as the wait time, but for blob transactions,
// skip the wait time since they are only announced.
if ann.metas[i].kind != types.BlobTxType {
f.waittime[hash] = f.clock.Now()
} else {
hasBlob = true
f.waittime[hash] = f.clock.Now() - mclock.AbsTime(txArriveTimeout)
}
if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i]
waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
if idleWait && len(f.waittime) > 0 {
if hasBlob || (idleWait && len(f.waittime) > 0) {
f.rescheduleWait(waitTimer, waitTrigger)
}
// If this peer is new and announced something already queued, maybe
Expand All @@ -516,7 +572,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
Expand Down Expand Up @@ -873,7 +929,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
hashes = make([]common.Hash, 0, maxTxRetrievals)
bytes uint64
)
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta txMetadata) bool {
// If the transaction is already fetching, skip to the next one
if _, ok := f.fetching[hash]; ok {
return true
Expand Down Expand Up @@ -938,28 +994,26 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
}
}

// forEachAnnounce does a range loop over a map of announcements in production,
// but during testing it does a deterministic sorted random to allow reproducing
// issues.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash, meta := range announces {
if !do(hash, meta) {
return
}
}
return
// forEachAnnounce loops over the given announcements in arrival order, invoking
// the do function for each until it returns false. We enforce an arrival
// ordering to minimize the chances of transaction nonce-gaps, which result in
// transactions being rejected by the txpool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) {
type announcement struct {
hash common.Hash
meta txMetadata
seq uint64
}
// We're running the test suite, make iteration deterministic
list := make([]common.Hash, 0, len(announces))
for hash := range announces {
list = append(list, hash)
// Process announcements by their arrival order
list := make([]announcement, 0, len(announces))
for hash, entry := range announces {
list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq})
}
sortHashes(list)
rotateHashes(list, f.rand.Intn(len(list)))
for _, hash := range list {
if !do(hash, announces[hash]) {
sort.Slice(list, func(i, j int) bool {
return list[i].seq < list[j].seq
})
for i := range list {
if !do(list[i].hash, list[i].meta) {
return
}
}
Expand All @@ -975,26 +1029,3 @@ func rotateStrings(slice []string, n int) {
slice[i] = orig[(i+n)%len(orig)]
}
}

// sortHashes sorts a slice of hashes. This method is only used in tests in order
// to simulate random map iteration but keep it deterministic.
func sortHashes(slice []common.Hash) {
for i := 0; i < len(slice); i++ {
for j := i + 1; j < len(slice); j++ {
if bytes.Compare(slice[i][:], slice[j][:]) > 0 {
slice[i], slice[j] = slice[j], slice[i]
}
}
}
}

// rotateHashes rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateHashes(slice []common.Hash, n int) {
orig := make([]common.Hash, len(slice))
copy(orig, slice)

for i := 0; i < len(orig); i++ {
slice[i] = orig[(i+n)%len(orig)]
}
}
Loading