diff --git a/core/tx_list.go b/core/tx_list.go index 607838ba37c9..1d5c336a1c28 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -21,6 +21,8 @@ import ( "math" "math/big" "sort" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -478,9 +480,10 @@ func (h *priceHeap) Pop() interface{} { // better candidates for inclusion while in other cases (at the top of the baseFee peak) // the floating heap is better. When baseFee is decreasing they behave similarly. type txPricedList struct { - all *txLookup // Pointer to the map of all transactions - urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions - stales int // Number of stale price points to (re-heap trigger) + all *txLookup // Pointer to the map of all transactions + urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions + stales int64 // Number of stale price points to (re-heap trigger) + reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list } const ( @@ -510,8 +513,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) { // the heap if a large enough ratio of transactions go stale. func (l *txPricedList) Removed(count int) { // Bump the stale counter, but exit if still too low (< 25%) - l.stales += count - if l.stales <= (len(l.urgent.list)+len(l.floating.list))/4 { + stales := atomic.AddInt64(&l.stales, int64(count)) + if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 { return } // Seems we've reached a critical number of stale transactions, reheap @@ -535,7 +538,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool for len(h.list) > 0 { head := h.list[0] if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated - l.stales-- + atomic.AddInt64(&l.stales, -1) heap.Pop(h) continue } @@ -561,7 +564,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Discard stale transactions if found during cleanup tx := heap.Pop(&l.urgent).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - l.stales-- + atomic.AddInt64(&l.stales, -1) continue } // Non stale transaction found, move to floating heap @@ -574,7 +577,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Discard stale transactions if found during cleanup tx := heap.Pop(&l.floating).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - l.stales-- + atomic.AddInt64(&l.stales, -1) continue } // Non stale transaction found, discard it @@ -594,8 +597,10 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Reheap forcibly rebuilds the heap based on the current remote transaction set. func (l *txPricedList) Reheap() { + l.reheapMu.Lock() + defer l.reheapMu.Unlock() start := time.Now() - l.stales = 0 + atomic.StoreInt64(&l.stales, 0) l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { l.urgent.list = append(l.urgent.list, tx) diff --git a/core/tx_pool.go b/core/tx_pool.go index 4903e2ef28bb..629e56d93aa4 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -22,6 +22,7 @@ import ( "math/big" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -282,6 +283,7 @@ type TxPool struct { reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop wg sync.WaitGroup // tracks loop, scheduleReorgLoop + initDoneCh chan struct{} // is closed once the pool is initialized (for tests) changesSinceReorg int // A counter for how many drops we've performed in-between reorg. } @@ -312,6 +314,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block queueTxEventCh: make(chan *types.Transaction), reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), + initDoneCh: make(chan struct{}), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.jamIndexer = newTxJamIndexer(config.JamConfig, pool) @@ -372,6 +375,8 @@ func (pool *TxPool) loop() { defer evict.Stop() defer journal.Stop() + // Notify tests that the init phase is done + close(pool.initDoneCh) for { select { // Handle ChainHeadEvent @@ -391,8 +396,8 @@ func (pool *TxPool) loop() { case <-report.C: pool.mu.RLock() pending, queued := pool.stats() - stales := pool.priced.stales pool.mu.RUnlock() + stales := int(atomic.LoadInt64(&pool.priced.stales)) if pending != prevPending || queued != prevQueued || stales != prevStales { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index f86f64bf2564..ce2fe7755009 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -24,6 +24,7 @@ import ( "math/big" "math/rand" "os" + "sync/atomic" "testing" "time" @@ -64,7 +65,7 @@ type testBlockChain struct { func (bc *testBlockChain) CurrentBlock() *types.Block { return types.NewBlock(&types.Header{ - GasLimit: bc.gasLimit, + GasLimit: atomic.LoadUint64(&bc.gasLimit), }, nil, nil, nil, trie.NewStackTrie(nil)) } @@ -123,6 +124,8 @@ func setupTxPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateK key, _ := crypto.GenerateKey() pool := NewTxPool(testTxPoolConfig, config, blockchain) + // wait for the pool to initialize + <-pool.initDoneCh return pool, key } @@ -625,7 +628,7 @@ func TestTransactionDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - pool.chain.(*testBlockChain).gasLimit = 100 + atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100) <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {