From 40303b08e5034d173212b34306159137cd6e63ab Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Thu, 26 Aug 2021 12:14:49 +0200 Subject: [PATCH 1/6] core: fix race conditions in txpool --- core/tx_list.go | 36 +++++++++++++++++++++---------- core/tx_pool.go | 51 +++++++++++++++++++++++--------------------- core/tx_pool_test.go | 10 +++++++-- 3 files changed, 60 insertions(+), 37 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index 607838ba37c9..ec16effc0d70 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -21,6 +21,8 @@ import ( "math" "math/big" "sort" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -478,9 +480,10 @@ func (h *priceHeap) Pop() interface{} { // better candidates for inclusion while in other cases (at the top of the baseFee peak) // the floating heap is better. When baseFee is decreasing they behave similarly. type txPricedList struct { - all *txLookup // Pointer to the map of all transactions - urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions - stales int // Number of stale price points to (re-heap trigger) + all *txLookup // Pointer to the map of all transactions + urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions + stales int64 // Number of stale price points to (re-heap trigger) + mu sync.Mutex // Mutex asserts that only one routine is reheaping the list } const ( @@ -509,13 +512,15 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) { // from the pool. The list will just keep a counter of stale objects and update // the heap if a large enough ratio of transactions go stale. func (l *txPricedList) Removed(count int) { + l.mu.Lock() + defer l.mu.Unlock() // Bump the stale counter, but exit if still too low (< 25%) - l.stales += count - if l.stales <= (len(l.urgent.list)+len(l.floating.list))/4 { + stales := atomic.AddInt64(&l.stales, int64(count)) + if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 { return } // Seems we've reached a critical number of stale transactions, reheap - l.Reheap() + l.reheap() } // Underpriced checks whether a transaction is cheaper than (or as cheap as) the @@ -535,7 +540,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool for len(h.list) > 0 { head := h.list[0] if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated - l.stales-- + atomic.AddInt64(&l.stales, -1) heap.Pop(h) continue } @@ -561,7 +566,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Discard stale transactions if found during cleanup tx := heap.Pop(&l.urgent).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - l.stales-- + atomic.AddInt64(&l.stales, -1) continue } // Non stale transaction found, move to floating heap @@ -574,7 +579,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Discard stale transactions if found during cleanup tx := heap.Pop(&l.floating).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - l.stales-- + atomic.AddInt64(&l.stales, -1) continue } // Non stale transaction found, discard it @@ -594,8 +599,15 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Reheap forcibly rebuilds the heap based on the current remote transaction set. func (l *txPricedList) Reheap() { + l.mu.Lock() + defer l.mu.Unlock() +} + +// reheap forcibly rebuilds the heap based on the current remote transaction set. +// Expects the reheap mutex to be held +func (l *txPricedList) reheap() { start := time.Now() - l.stales = 0 + atomic.StoreInt64(&l.stales, 0) l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { l.urgent.list = append(l.urgent.list, tx) @@ -621,5 +633,7 @@ func (l *txPricedList) Reheap() { // necessary to call right before SetBaseFee when processing a new block. func (l *txPricedList) SetBaseFee(baseFee *big.Int) { l.urgent.baseFee = baseFee - l.Reheap() + l.mu.Lock() + defer l.mu.Unlock() + l.reheap() } diff --git a/core/tx_pool.go b/core/tx_pool.go index ee56dae888d7..9b657eae6fd8 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -22,6 +22,7 @@ import ( "math/big" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -256,14 +257,15 @@ type TxPool struct { all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - reqResetCh chan *txpoolResetRequest - reqPromoteCh chan *accountSet - queueTxEventCh chan *types.Transaction - reorgDoneCh chan chan struct{} - reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop - wg sync.WaitGroup // tracks loop, scheduleReorgLoop + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + reqResetCh chan *txpoolResetRequest + reqPromoteCh chan *accountSet + updateBlockchainCh chan blockChain + queueTxEventCh chan *types.Transaction + reorgDoneCh chan chan struct{} + reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop + wg sync.WaitGroup // tracks loop, scheduleReorgLoop changesSinceReorg int // A counter for how many drops we've performed in-between reorg. } @@ -280,21 +282,22 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.LatestSigner(chainconfig), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - reqResetCh: make(chan *txpoolResetRequest), - reqPromoteCh: make(chan *accountSet), - queueTxEventCh: make(chan *types.Transaction), - reorgDoneCh: make(chan chan struct{}), - reorgShutdownCh: make(chan struct{}), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.LatestSigner(chainconfig), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + reqResetCh: make(chan *txpoolResetRequest), + reqPromoteCh: make(chan *accountSet), + queueTxEventCh: make(chan *types.Transaction), + reorgDoneCh: make(chan chan struct{}), + reorgShutdownCh: make(chan struct{}), + updateBlockchainCh: make(chan blockChain), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -365,8 +368,8 @@ func (pool *TxPool) loop() { case <-report.C: pool.mu.RLock() pending, queued := pool.stats() - stales := pool.priced.stales pool.mu.RUnlock() + stales := int(atomic.LoadInt64(&pool.priced.stales)) if pending != prevPending || queued != prevQueued || stales != prevStales { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index f86f64bf2564..f0347e3c4d05 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -24,6 +24,7 @@ import ( "math/big" "math/rand" "os" + "sync/atomic" "testing" "time" @@ -64,7 +65,7 @@ type testBlockChain struct { func (bc *testBlockChain) CurrentBlock() *types.Block { return types.NewBlock(&types.Header{ - GasLimit: bc.gasLimit, + GasLimit: atomic.LoadUint64(&bc.gasLimit), }, nil, nil, nil, trie.NewStackTrie(nil)) } @@ -123,6 +124,7 @@ func setupTxPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateK key, _ := crypto.GenerateKey() pool := NewTxPool(testTxPoolConfig, config, blockchain) + pool.chainHeadCh <- ChainHeadEvent{} return pool, key } @@ -426,7 +428,9 @@ func TestTransactionChainFork(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) statedb.AddBalance(addr, big.NewInt(100000000000000)) + pool.mu.Lock() pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} + pool.mu.Unlock() <-pool.requestReset(nil, nil) } resetState() @@ -455,7 +459,9 @@ func TestTransactionDoubleNonce(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) statedb.AddBalance(addr, big.NewInt(100000000000000)) + pool.mu.Lock() pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} + pool.mu.Unlock() <-pool.requestReset(nil, nil) } resetState() @@ -625,7 +631,7 @@ func TestTransactionDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - pool.chain.(*testBlockChain).gasLimit = 100 + atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100) <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { From 1caff8fad7f6606cfe19965687ff8a6bb39875c0 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Thu, 26 Aug 2021 18:36:14 +0200 Subject: [PATCH 2/6] core: fixed races in the txpool --- core/tx_list.go | 1 + core/tx_pool.go | 4 ++++ core/tx_pool_test.go | 7 ++----- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index ec16effc0d70..c1c29fb7b528 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -601,6 +601,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) func (l *txPricedList) Reheap() { l.mu.Lock() defer l.mu.Unlock() + l.reheap() } // reheap forcibly rebuilds the heap based on the current remote transaction set. diff --git a/core/tx_pool.go b/core/tx_pool.go index 9b657eae6fd8..49c9f45e4420 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -266,6 +266,7 @@ type TxPool struct { reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop wg sync.WaitGroup // tracks loop, scheduleReorgLoop + initDoneCh chan struct{} // is closed once the pool is initialized (for tests) changesSinceReorg int // A counter for how many drops we've performed in-between reorg. } @@ -297,6 +298,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), updateBlockchainCh: make(chan blockChain), + initDoneCh: make(chan struct{}), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) @@ -350,6 +352,8 @@ func (pool *TxPool) loop() { defer evict.Stop() defer journal.Stop() + // Notify tests that the init phase is done + close(pool.initDoneCh) for { select { // Handle ChainHeadEvent diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index f0347e3c4d05..ce2fe7755009 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -124,7 +124,8 @@ func setupTxPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateK key, _ := crypto.GenerateKey() pool := NewTxPool(testTxPoolConfig, config, blockchain) - pool.chainHeadCh <- ChainHeadEvent{} + // wait for the pool to initialize + <-pool.initDoneCh return pool, key } @@ -428,9 +429,7 @@ func TestTransactionChainFork(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) statedb.AddBalance(addr, big.NewInt(100000000000000)) - pool.mu.Lock() pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} - pool.mu.Unlock() <-pool.requestReset(nil, nil) } resetState() @@ -459,9 +458,7 @@ func TestTransactionDoubleNonce(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) statedb.AddBalance(addr, big.NewInt(100000000000000)) - pool.mu.Lock() pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} - pool.mu.Unlock() <-pool.requestReset(nil, nil) } resetState() From 8906d9c2885ecf74f6cabdb83bc9d67fd9505a89 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 27 Aug 2021 11:10:20 +0200 Subject: [PATCH 3/6] core: rebased on master --- core/tx_pool.go | 52 ++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 49c9f45e4420..5c70cf170578 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -257,16 +257,15 @@ type TxPool struct { all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - reqResetCh chan *txpoolResetRequest - reqPromoteCh chan *accountSet - updateBlockchainCh chan blockChain - queueTxEventCh chan *types.Transaction - reorgDoneCh chan chan struct{} - reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop - wg sync.WaitGroup // tracks loop, scheduleReorgLoop - initDoneCh chan struct{} // is closed once the pool is initialized (for tests) + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + reqResetCh chan *txpoolResetRequest + reqPromoteCh chan *accountSet + queueTxEventCh chan *types.Transaction + reorgDoneCh chan chan struct{} + reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop + wg sync.WaitGroup // tracks loop, scheduleReorgLoop + initDoneCh chan struct{} // is closed once the pool is initialized (for tests) changesSinceReorg int // A counter for how many drops we've performed in-between reorg. } @@ -283,23 +282,22 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.LatestSigner(chainconfig), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - reqResetCh: make(chan *txpoolResetRequest), - reqPromoteCh: make(chan *accountSet), - queueTxEventCh: make(chan *types.Transaction), - reorgDoneCh: make(chan chan struct{}), - reorgShutdownCh: make(chan struct{}), - updateBlockchainCh: make(chan blockChain), - initDoneCh: make(chan struct{}), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.LatestSigner(chainconfig), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + reqResetCh: make(chan *txpoolResetRequest), + reqPromoteCh: make(chan *accountSet), + queueTxEventCh: make(chan *types.Transaction), + reorgDoneCh: make(chan chan struct{}), + reorgShutdownCh: make(chan struct{}), + initDoneCh: make(chan struct{}), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { From 4d2e3a648b64633baadaa9104cbea57601a2c5a9 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 27 Aug 2021 11:21:18 +0200 Subject: [PATCH 4/6] core: move reheap mutex --- core/tx_list.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index c1c29fb7b528..2123615602a7 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -512,8 +512,6 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) { // from the pool. The list will just keep a counter of stale objects and update // the heap if a large enough ratio of transactions go stale. func (l *txPricedList) Removed(count int) { - l.mu.Lock() - defer l.mu.Unlock() // Bump the stale counter, but exit if still too low (< 25%) stales := atomic.AddInt64(&l.stales, int64(count)) if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 { @@ -599,14 +597,14 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Reheap forcibly rebuilds the heap based on the current remote transaction set. func (l *txPricedList) Reheap() { - l.mu.Lock() - defer l.mu.Unlock() l.reheap() } // reheap forcibly rebuilds the heap based on the current remote transaction set. // Expects the reheap mutex to be held func (l *txPricedList) reheap() { + l.mu.Lock() + defer l.mu.Unlock() start := time.Now() atomic.StoreInt64(&l.stales, 0) l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) @@ -634,7 +632,5 @@ func (l *txPricedList) reheap() { // necessary to call right before SetBaseFee when processing a new block. func (l *txPricedList) SetBaseFee(baseFee *big.Int) { l.urgent.baseFee = baseFee - l.mu.Lock() - defer l.mu.Unlock() l.reheap() } From dcdfb7cf1a570f7f425ca436b1e4695d2dcd8b82 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 27 Aug 2021 11:52:08 +0200 Subject: [PATCH 5/6] core: renamed mutex --- core/tx_list.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index 2123615602a7..482f33db9652 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -483,7 +483,7 @@ type txPricedList struct { all *txLookup // Pointer to the map of all transactions urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions stales int64 // Number of stale price points to (re-heap trigger) - mu sync.Mutex // Mutex asserts that only one routine is reheaping the list + reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list } const ( @@ -603,8 +603,8 @@ func (l *txPricedList) Reheap() { // reheap forcibly rebuilds the heap based on the current remote transaction set. // Expects the reheap mutex to be held func (l *txPricedList) reheap() { - l.mu.Lock() - defer l.mu.Unlock() + l.reheapMu.Lock() + defer l.reheapMu.Unlock() start := time.Now() atomic.StoreInt64(&l.stales, 0) l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) From 506794e3d9dd4d431cd61705c0eee2203b209017 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 27 Aug 2021 14:27:34 +0200 Subject: [PATCH 6/6] core: revert Reheap changes --- core/tx_list.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index 482f33db9652..1d5c336a1c28 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -518,7 +518,7 @@ func (l *txPricedList) Removed(count int) { return } // Seems we've reached a critical number of stale transactions, reheap - l.reheap() + l.Reheap() } // Underpriced checks whether a transaction is cheaper than (or as cheap as) the @@ -597,12 +597,6 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Reheap forcibly rebuilds the heap based on the current remote transaction set. func (l *txPricedList) Reheap() { - l.reheap() -} - -// reheap forcibly rebuilds the heap based on the current remote transaction set. -// Expects the reheap mutex to be held -func (l *txPricedList) reheap() { l.reheapMu.Lock() defer l.reheapMu.Unlock() start := time.Now() @@ -632,5 +626,5 @@ func (l *txPricedList) reheap() { // necessary to call right before SetBaseFee when processing a new block. func (l *txPricedList) SetBaseFee(baseFee *big.Int) { l.urgent.baseFee = baseFee - l.reheap() + l.Reheap() }