Skip to content

Commit

Permalink
EIP-1559 tx pool support (ethereum#22898)
Browse files Browse the repository at this point in the history
  • Loading branch information
gzliudan committed Aug 25, 2024
1 parent 3a29838 commit e72eb12
Show file tree
Hide file tree
Showing 5 changed files with 759 additions and 157 deletions.
204 changes: 133 additions & 71 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/types"
Expand Down Expand Up @@ -284,15 +285,23 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
return false, nil
}
if old != nil {
// threshold = oldGP * (100 + priceBump) / 100
if old.FeeCapCmp(tx) >= 0 || old.TipCmp(tx) >= 0 {
return false, nil
}
// thresholdFeeCap = oldFC * (100 + priceBump) / 100
a := big.NewInt(100 + int64(priceBump))
a = a.Mul(a, old.GasPrice())
aFeeCap := new(big.Int).Mul(a, old.FeeCap())
aTip := a.Mul(a, old.Tip())

// thresholdTip = oldTip * (100 + priceBump) / 100
b := big.NewInt(100)
threshold := a.Div(a, b)
// Have to ensure that the new gas price is higher than the old gas
// price as well as checking the percentage threshold to ensure that
thresholdFeeCap := aFeeCap.Div(aFeeCap, b)
thresholdTip := aTip.Div(aTip, b)

// Have to ensure that either the new fee cap or tip is higher than the
// old ones as well as checking the percentage threshold to ensure that
// this is accurate for low (Wei-level) gas price replacements
if old.GasPriceCmp(tx) >= 0 || tx.GasPriceIntCmp(threshold) < 0 {
if tx.FeeCapIntCmp(thresholdFeeCap) < 0 || tx.TipIntCmp(thresholdTip) < 0 {
return false, nil
}
}
Expand Down Expand Up @@ -417,52 +426,85 @@ func (l *txList) LastElement() *types.Transaction {
}

// priceHeap is a heap.Interface implementation over transactions for retrieving
// price-sorted transactions to discard when the pool fills up.
type priceHeap []*types.Transaction
// price-sorted transactions to discard when the pool fills up. If baseFee is set
// then the heap is sorted based on the effective tip based on the given base fee.
// If baseFee is nil then the sorting is based on feeCap.
type priceHeap struct {
baseFee *big.Int // heap should always be re-sorted after baseFee is changed
list []*types.Transaction
}

func (h priceHeap) Len() int { return len(h) }
func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *priceHeap) Len() int { return len(h.list) }
func (h *priceHeap) Swap(i, j int) { h.list[i], h.list[j] = h.list[j], h.list[i] }

func (h priceHeap) Less(i, j int) bool {
// Sort primarily by price, returning the cheaper one
switch h[i].GasPriceCmp(h[j]) {
func (h *priceHeap) Less(i, j int) bool {
switch h.cmp(h.list[i], h.list[j]) {
case -1:
return true
case 1:
return false
default:
return h.list[i].Nonce() > h.list[j].Nonce()
}
}

func (h *priceHeap) cmp(a, b *types.Transaction) int {
if h.baseFee != nil {
// Compare effective tips if baseFee is specified
if c := a.EffectiveTipCmp(b, h.baseFee); c != 0 {
return c
}
}
// Compare fee caps if baseFee is not specified or effective tips are equal
if c := a.FeeCapCmp(b); c != 0 {
return c
}
// If the prices match, stabilize via nonces (high nonce is worse)
return h[i].Nonce() > h[j].Nonce()
// Compare tips if effective tips and fee caps are equal
return a.TipCmp(b)
}

func (h *priceHeap) Push(x interface{}) {
*h = append(*h, x.(*types.Transaction))
tx := x.(*types.Transaction)
h.list = append(h.list, tx)
}

func (h *priceHeap) Pop() interface{} {
old := *h
old := h.list
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
old[n-1] = nil
h.list = old[0 : n-1]
return x
}

// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way. It's built opon the all transactions
// in txpool but only interested in the remote part. It means only remote transactions
// will be considered for tracking, sorting, eviction, etc.
//
// Two heaps are used for sorting: the urgent heap (based on effective tip in the next
// block) and the floating heap (based on feeCap). Always the bigger heap is chosen for
// eviction. Transactions evicted from the urgent heap are first demoted into the floating heap.
// In some cases (during a congestion, when blocks are full) the urgent heap can provide
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
// the floating heap is better. When baseFee is decreasing they behave similarly.
type txPricedList struct {
all *txLookup // Pointer to the map of all transactions
remotes *priceHeap // Heap of prices of all the stored **remote** transactions
stales int64 // Number of stale price points to (re-heap trigger)
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
all *txLookup // Pointer to the map of all transactions
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
stales int64 // Number of stale price points to (re-heap trigger)
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
}

const (
// urgentRatio : floatingRatio is the capacity ratio of the two queues
urgentRatio = 4
floatingRatio = 1
)

// newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *txLookup) *txPricedList {
return &txPricedList{
all: all,
remotes: new(priceHeap),
all: all,
}
}

Expand All @@ -471,7 +513,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
if local {
return
}
heap.Push(l.remotes, tx)
// Insert every new transaction to the urgent heap first; Discard will balance the heaps
heap.Push(&l.urgent, tx)
}

// Removed notifies the prices transaction list that an old transaction dropped
Expand All @@ -480,58 +523,43 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
stales := atomic.AddInt64(&l.stales, int64(count))
if int(stales) <= len(*l.remotes)/4 {
if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 {
return
}
// Seems we've reached a critical number of stale transactions, reheap
l.Reheap()
}

// Cap finds all the transactions below the given price threshold, drops them
// from the priced list and returns them for further removal from the entire pool.
//
// Note: only remote transactions will be considered for eviction.
func (l *txPricedList) Cap(threshold *big.Int) types.Transactions {
drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
for len(*l.remotes) > 0 {
// Discard stale transactions if found during cleanup
cheapest := (*l.remotes)[0]
if l.all.GetRemote(cheapest.Hash()) == nil { // Removed or migrated
heap.Pop(l.remotes)
l.stales--
continue
}
// Stop the discards if we've reached the threshold
if cheapest.GasPriceIntCmp(threshold) >= 0 {
break
}
heap.Pop(l.remotes)
drop = append(drop, cheapest)
}
return drop
}

// Underpriced checks whether a transaction is cheaper than (or as cheap as) the
// lowest priced (remote) transaction currently being tracked.
func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
// Note: with two queues, being underpriced is defined as being worse than the worst item
// in all non-empty queues if there is any. If both queues are empty then nothing is underpriced.
return (l.underpricedFor(&l.urgent, tx) || len(l.urgent.list) == 0) &&
(l.underpricedFor(&l.floating, tx) || len(l.floating.list) == 0) &&
(len(l.urgent.list) != 0 || len(l.floating.list) != 0)
}

// underpricedFor checks whether a transaction is cheaper than (or as cheap as) the
// lowest priced (remote) transaction in the given heap.
func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
// Discard stale price points if found at the heap start
for len(*l.remotes) > 0 {
head := []*types.Transaction(*l.remotes)[0]
for len(h.list) > 0 {
head := h.list[0]
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
heap.Pop(l.remotes)
heap.Pop(h)
continue
}
break
}
// Check if the transaction is underpriced or not
if len(*l.remotes) == 0 {
if len(h.list) == 0 {
return false // There is no remote transaction at all.
}
// If the remote transaction is even cheaper than the
// cheapest one tracked locally, reject it.
cheapest := []*types.Transaction(*l.remotes)[0]
return cheapest.GasPriceCmp(tx) >= 0
return h.cmp(h.list[0], tx) >= 0
}

// Discard finds a number of most underpriced transactions, removes them from the
Expand All @@ -540,21 +568,36 @@ func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
// Note local transaction won't be considered for eviction.
func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
for len(*l.remotes) > 0 && slots > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.remotes).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
continue
for slots > 0 {
if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio || floatingRatio == 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.urgent).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
continue
}
// Non stale transaction found, move to floating heap
heap.Push(&l.floating, tx)
} else {
if len(l.floating.list) == 0 {
// Stop if both heaps are empty
break
}
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.floating).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
continue
}
// Non stale transaction found, discard it
drop = append(drop, tx)
slots -= numSlots(tx)
}
// Non stale transaction found, discard it
drop = append(drop, tx)
slots -= numSlots(tx)
}
// If we still can't make enough room for the new transaction
if slots > 0 && !force {
for _, tx := range drop {
heap.Push(l.remotes, tx)
heap.Push(&l.urgent, tx)
}
return nil, false
}
Expand All @@ -565,13 +608,32 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
func (l *txPricedList) Reheap() {
l.reheapMu.Lock()
defer l.reheapMu.Unlock()
reheap := make(priceHeap, 0, l.all.RemoteCount())

start := time.Now()
atomic.StoreInt64(&l.stales, 0)
l.remotes = &reheap
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
*l.remotes = append(*l.remotes, tx)
l.urgent.list = append(l.urgent.list, tx)
return true
}, false, true) // Only iterate remotes
heap.Init(l.remotes)
heap.Init(&l.urgent)

// balance out the two heaps by moving the worse half of transactions into the
// floating heap
// Note: Discard would also do this before the first eviction but Reheap can do
// is more efficiently. Also, Underpriced would work suboptimally the first time
// if the floating queue was empty.
floatingCount := len(l.urgent.list) * floatingRatio / (urgentRatio + floatingRatio)
l.floating.list = make([]*types.Transaction, floatingCount)
for i := 0; i < floatingCount; i++ {
l.floating.list[i] = heap.Pop(&l.urgent).(*types.Transaction)
}
heap.Init(&l.floating)
reheapTimer.Update(time.Since(start))
}

// SetBaseFee updates the base fee and triggers a re-heap. Note that Removed is not
// necessary to call right before SetBaseFee when processing a new block.
func (l *txPricedList) SetBaseFee(baseFee *big.Int) {
l.urgent.baseFee = baseFee
l.Reheap()
}
Loading

0 comments on commit e72eb12

Please sign in to comment.