Skip to content

Commit

Permalink
core: fix race conditions in txpool (ethereum#23474)
Browse files Browse the repository at this point in the history
  • Loading branch information
gzliudan committed Jul 5, 2024
1 parent df2963e commit cdbe388
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
23 changes: 14 additions & 9 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"math"
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/common"
Expand Down Expand Up @@ -487,9 +489,10 @@ func (h *priceHeap) Pop() interface{} {
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
// the floating heap is better. When baseFee is decreasing they behave similarly.
type txPricedList struct {
all *txLookup // Pointer to the map of all transactions
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
stales int // Number of stale price points to (re-heap trigger)
all *txLookup // Pointer to the map of all transactions
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
stales int64 // Number of stale price points to (re-heap trigger)
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
}

const (
Expand Down Expand Up @@ -519,8 +522,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
// the heap if a large enough ratio of transactions go stale.
func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
l.stales += count
if l.stales <= (len(l.urgent.list)+len(l.floating.list))/4 {
stales := atomic.AddInt64(&l.stales, int64(count))
if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 {
return
}
// Seems we've reached a critical number of stale transactions, reheap
Expand All @@ -544,7 +547,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool
for len(h.list) > 0 {
head := h.list[0]
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
l.stales--
atomic.AddInt64(&l.stales, -1)
heap.Pop(h)
continue
}
Expand All @@ -570,7 +573,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.urgent).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales--
atomic.AddInt64(&l.stales, -1)
continue
}
// Non stale transaction found, move to floating heap
Expand All @@ -583,7 +586,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.floating).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales--
atomic.AddInt64(&l.stales, -1)
continue
}
// Non stale transaction found, discard it
Expand All @@ -603,8 +606,10 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)

// Reheap forcibly rebuilds the heap based on the current remote transaction set.
func (l *txPricedList) Reheap() {
l.reheapMu.Lock()
defer l.reheapMu.Unlock()
start := time.Now()
l.stales = 0
atomic.StoreInt64(&l.stales, 0)
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
l.urgent.list = append(l.urgent.list, tx)
Expand Down
7 changes: 6 additions & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/common"
Expand Down Expand Up @@ -296,6 +297,7 @@ type TxPool struct {
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

Expand Down Expand Up @@ -329,6 +331,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
trc21FeeCapacity: map[common.Address]*big.Int{},
}
Expand Down Expand Up @@ -383,6 +386,8 @@ func (pool *TxPool) loop() {
defer evict.Stop()
defer journal.Stop()

// Notify tests that the init phase is done
close(pool.initDoneCh)
for {
select {
// Handle ChainHeadEvent
Expand All @@ -401,8 +406,8 @@ func (pool *TxPool) loop() {
case <-report.C:
pool.mu.RLock()
pending, queued := pool.stats()
stales := pool.priced.stales
pool.mu.RUnlock()
stales := int(atomic.LoadInt64(&pool.priced.stales))

if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
Expand Down
7 changes: 5 additions & 2 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/big"
"math/rand"
"os"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -79,7 +80,7 @@ func (bc *testBlockChain) Config() *params.ChainConfig {

func (bc *testBlockChain) CurrentBlock() *types.Block {
return types.NewBlock(&types.Header{
GasLimit: bc.gasLimit,
GasLimit: atomic.LoadUint64(&bc.gasLimit),
}, nil, nil, nil)
}

Expand Down Expand Up @@ -139,6 +140,8 @@ func setupTxPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateK
key, _ := crypto.GenerateKey()
pool := NewTxPool(testTxPoolConfig, config, blockchain)

// wait for the pool to initialize
<-pool.initDoneCh
return pool, key
}

Expand Down Expand Up @@ -646,7 +649,7 @@ func TestTransactionDropping(t *testing.T) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
}
// Reduce the block gas limit, check that invalidated transactions are dropped
pool.chain.(*testBlockChain).gasLimit = 100
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
<-pool.requestReset(nil, nil)

if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
Expand Down

0 comments on commit cdbe388

Please sign in to comment.