diff --git a/core/tx_list.go b/core/tx_list.go index 5f623806d609..3e746ff3518e 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -21,6 +21,8 @@ import ( "math" "math/big" "sort" + "sync" + "sync/atomic" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core/types" @@ -450,9 +452,10 @@ func (h *priceHeap) Pop() interface{} { // in txpool but only interested in the remote part. It means only remote transactions // will be considered for tracking, sorting, eviction, etc. type txPricedList struct { - all *txLookup // Pointer to the map of all transactions - remotes *priceHeap // Heap of prices of all the stored **remote** transactions - stales int // Number of stale price points to (re-heap trigger) + all *txLookup // Pointer to the map of all transactions + remotes *priceHeap // Heap of prices of all the stored **remote** transactions + stales int64 // Number of stale price points to (re-heap trigger) + reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list } // newTxPricedList creates a new price-sorted transaction heap. @@ -476,8 +479,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) { // the heap if a large enough ratio of transactions go stale. func (l *txPricedList) Removed(count int) { // Bump the stale counter, but exit if still too low (< 25%) - l.stales += count - if l.stales <= len(*l.remotes)/4 { + stales := atomic.AddInt64(&l.stales, int64(count)) + if int(stales) <= len(*l.remotes)/4 { return } // Seems we've reached a critical number of stale transactions, reheap @@ -515,7 +518,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction) bool { for len(*l.remotes) > 0 { head := []*types.Transaction(*l.remotes)[0] if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated - l.stales-- + atomic.AddInt64(&l.stales, -1) heap.Pop(l.remotes) continue } @@ -541,7 +544,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Discard stale transactions if found during cleanup tx := heap.Pop(l.remotes).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - l.stales-- + atomic.AddInt64(&l.stales, -1) continue } // Non stale transaction found, discard it @@ -560,9 +563,12 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) // Reheap forcibly rebuilds the heap based on the current remote transaction set. func (l *txPricedList) Reheap() { + l.reheapMu.Lock() + defer l.reheapMu.Unlock() reheap := make(priceHeap, 0, l.all.RemoteCount()) - l.stales, l.remotes = 0, &reheap + atomic.StoreInt64(&l.stales, 0) + l.remotes = &reheap l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { *l.remotes = append(*l.remotes, tx) return true diff --git a/core/tx_pool.go b/core/tx_pool.go index 64359a972bbc..7a3aa8cd1efe 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -23,6 +23,7 @@ import ( "math/big" "sort" "sync" + "sync/atomic" "time" "github.com/XinFinOrg/XDPoSChain/common" @@ -282,6 +283,7 @@ type TxPool struct { reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop wg sync.WaitGroup // tracks loop, scheduleReorgLoop + initDoneCh chan struct{} // is closed once the pool is initialized (for tests) eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions. IsSigner func(address common.Address) bool @@ -314,6 +316,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block queueTxEventCh: make(chan *types.Transaction), reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), + initDoneCh: make(chan struct{}), gasPrice: new(big.Int).SetUint64(config.PriceLimit), trc21FeeCapacity: map[common.Address]*big.Int{}, } @@ -368,6 +371,8 @@ func (pool *TxPool) loop() { defer evict.Stop() defer journal.Stop() + // Notify tests that the init phase is done + close(pool.initDoneCh) for { select { // Handle ChainHeadEvent @@ -386,8 +391,8 @@ func (pool *TxPool) loop() { case <-report.C: pool.mu.RLock() pending, queued := pool.stats() - stales := pool.priced.stales pool.mu.RUnlock() + stales := int(atomic.LoadInt64(&pool.priced.stales)) if pending != prevPending || queued != prevQueued || stales != prevStales { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index dbbdd24baa83..62e1d70a61da 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -22,13 +22,13 @@ import ( "math/big" "math/rand" "os" + "sync/atomic" "testing" "time" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus" "github.com/XinFinOrg/XDPoSChain/core/rawdb" - - "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/crypto" @@ -69,7 +69,7 @@ func (bc *testBlockChain) Config() *params.ChainConfig { func (bc *testBlockChain) CurrentBlock() *types.Block { return types.NewBlock(&types.Header{ - GasLimit: bc.gasLimit, + GasLimit: atomic.LoadUint64(&bc.gasLimit), }, nil, nil, nil) } @@ -110,6 +110,8 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { key, _ := crypto.GenerateKey() pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + // wait for the pool to initialize + <-pool.initDoneCh return pool, key } @@ -572,7 +574,7 @@ func TestTransactionDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - pool.chain.(*testBlockChain).gasLimit = 100 + atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100) <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {