diff --git a/txpool/pool.go b/txpool/pool.go index d4ec6a99a..8d66310d5 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -289,14 +289,14 @@ func calcProtocolBaseFee(baseFee uint64) uint64 { type TxPool struct { _chainDB kv.RoDB // remote db - use it wisely _stateCache kvcache.Cache - lock *sync.RWMutex + lock *sync.Mutex recentlyConnectedPeers *recentlyConnectedPeers // all txs will be propagated to this peers eventually, and clear list senders *sendersBatch // batch processing of remote transactions // handling works fast without batching, but batching allow: // - reduce amount of _chainDB transactions // - batch notifications about new txs (reduce P2P spam to other nodes about txs propagation) - // - and as a result reducing pool.RWLock contention + // - and as a result reducing lock contention unprocessedRemoteTxs *types.TxSlots unprocessedRemoteByHash map[string]int // to reject duplicates byHash map[string]*metaTx // tx_hash => tx : only not committed to db yet records @@ -337,7 +337,7 @@ func New(newTxs chan types.Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cac tracedSenders[sender] = struct{}{} } return &TxPool{ - lock: &sync.RWMutex{}, + lock: &sync.Mutex{}, byHash: map[string]*metaTx{}, isLocalLRU: localsHistory, discardReasonsLRU: discardHistory, @@ -539,14 +539,14 @@ func (p *TxPool) getRlpLocked(tx kv.Tx, hash []byte) (rlpTxn []byte, sender []by return v[20:], v[:20], txn != nil && txn.subPool&IsLocal > 0, nil } func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() rlpTx, _, _, err := p.getRlpLocked(tx, hash) return common.Copy(rlpTx), err } func (p *TxPool) AppendLocalHashes(buf []byte) []byte { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() for hash, txn := range p.byHash { if txn.subPool&IsLocal == 0 { continue @@ -556,8 +556,8 @@ func (p *TxPool) AppendLocalHashes(buf []byte) []byte { return buf } func (p *TxPool) AppendRemoteHashes(buf []byte) []byte { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() for hash, txn := range p.byHash { if txn.subPool&IsLocal != 0 { @@ -576,8 +576,8 @@ func (p *TxPool) AppendAllHashes(buf []byte) []byte { return buf } func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() if _, ok := p.discardReasonsLRU.Get(string(hash)); ok { return true, nil } @@ -590,8 +590,8 @@ func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { return tx.Has(kv.PoolTransaction, hash) } func (p *TxPool) IsLocal(idHash []byte) bool { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() return p.isLocalLRU.Contains(string(idHash)) } func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } @@ -610,8 +610,8 @@ func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf uint64) (bo var toRemove []*metaTx success, err := func() (bool, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() best := p.pending.best for i := 0; j < int(n) && i < len(best.ms); i++ { @@ -647,8 +647,8 @@ func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf uint64) (bo } func (p *TxPool) CountContent() (int, int, int) { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() return p.pending.Len(), p.baseFee.Len(), p.queued.Len() } func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) { @@ -871,14 +871,14 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, } func (p *TxPool) coreDB() kv.RoDB { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() return p._chainDB } func (p *TxPool) cache() kvcache.Cache { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() return p._stateCache } @@ -1072,7 +1072,7 @@ func (p *TxPool) addLocked(mt *metaTx) DiscardReason { if replaced := p.all.replaceOrInsert(mt); replaced != nil { if ASSERT { - panic("must neve happen") + panic("must never happen") } } @@ -1094,8 +1094,8 @@ func (p *TxPool) discardLocked(mt *metaTx, reason DiscardReason) { } func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool) { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() senderID, found := p.senders.getID(addr[:]) if !found { return 0, false @@ -1700,8 +1700,8 @@ func (p *TxPool) logStats() { return } - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() var m runtime.MemStats common.ReadMemStats(&m) @@ -1724,8 +1724,8 @@ func (p *TxPool) logStats() { // Deprecated need switch to streaming-like func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) { - p.lock.RLock() - defer p.lock.RUnlock() + p.lock.Lock() + defer p.lock.Unlock() p.all.ascendAll(func(mt *metaTx) bool { slot := mt.Tx slotRlp := slot.Rlp @@ -1794,7 +1794,7 @@ var PoolPendingBaseFeeKey = []byte("pending_base_fee") // it doesn't track if peer disconnected, it's fine type recentlyConnectedPeers struct { peers []types.PeerID - lock sync.RWMutex + lock sync.Mutex } func (l *recentlyConnectedPeers) AddPeer(p types.PeerID) {