Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

txpool: sync.Mutex instead of RWMutex #735

Merged
merged 3 commits into from
Nov 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@ func calcProtocolBaseFee(baseFee uint64) uint64 {
type TxPool struct {
_chainDB kv.RoDB // remote db - use it wisely
_stateCache kvcache.Cache
lock *sync.RWMutex
lock *sync.Mutex
recentlyConnectedPeers *recentlyConnectedPeers // all txs will be propagated to this peers eventually, and clear list
senders *sendersBatch
// batch processing of remote transactions
// handling works fast without batching, but batching allow:
// - reduce amount of _chainDB transactions
// - batch notifications about new txs (reduce P2P spam to other nodes about txs propagation)
// - and as a result reducing pool.RWLock contention
// - and as a result reducing lock contention
unprocessedRemoteTxs *types.TxSlots
unprocessedRemoteByHash map[string]int // to reject duplicates
byHash map[string]*metaTx // tx_hash => tx : only not committed to db yet records
Expand Down Expand Up @@ -337,7 +337,7 @@ func New(newTxs chan types.Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cac
tracedSenders[sender] = struct{}{}
}
return &TxPool{
lock: &sync.RWMutex{},
lock: &sync.Mutex{},
byHash: map[string]*metaTx{},
isLocalLRU: localsHistory,
discardReasonsLRU: discardHistory,
Expand Down Expand Up @@ -539,14 +539,14 @@ func (p *TxPool) getRlpLocked(tx kv.Tx, hash []byte) (rlpTxn []byte, sender []by
return v[20:], v[:20], txn != nil && txn.subPool&IsLocal > 0, nil
}
func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
rlpTx, _, _, err := p.getRlpLocked(tx, hash)
return common.Copy(rlpTx), err
}
func (p *TxPool) AppendLocalHashes(buf []byte) []byte {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
for hash, txn := range p.byHash {
if txn.subPool&IsLocal == 0 {
continue
Expand All @@ -556,8 +556,8 @@ func (p *TxPool) AppendLocalHashes(buf []byte) []byte {
return buf
}
func (p *TxPool) AppendRemoteHashes(buf []byte) []byte {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()

for hash, txn := range p.byHash {
if txn.subPool&IsLocal != 0 {
Expand All @@ -576,8 +576,8 @@ func (p *TxPool) AppendAllHashes(buf []byte) []byte {
return buf
}
func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.discardReasonsLRU.Get(string(hash)); ok {
return true, nil
}
Expand All @@ -590,8 +590,8 @@ func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) {
return tx.Has(kv.PoolTransaction, hash)
}
func (p *TxPool) IsLocal(idHash []byte) bool {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
return p.isLocalLRU.Contains(string(idHash))
}
func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
Expand All @@ -610,8 +610,8 @@ func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf uint64) (bo
var toRemove []*metaTx

success, err := func() (bool, error) {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()

best := p.pending.best
for i := 0; j < int(n) && i < len(best.ms); i++ {
Expand Down Expand Up @@ -647,8 +647,8 @@ func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf uint64) (bo
}

func (p *TxPool) CountContent() (int, int, int) {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
return p.pending.Len(), p.baseFee.Len(), p.queued.Len()
}
func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) {
Expand Down Expand Up @@ -871,14 +871,14 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
}

func (p *TxPool) coreDB() kv.RoDB {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
return p._chainDB
}

func (p *TxPool) cache() kvcache.Cache {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
return p._stateCache
}

Expand Down Expand Up @@ -1072,7 +1072,7 @@ func (p *TxPool) addLocked(mt *metaTx) DiscardReason {

if replaced := p.all.replaceOrInsert(mt); replaced != nil {
if ASSERT {
panic("must neve happen")
panic("must never happen")
}
}

Expand All @@ -1094,8 +1094,8 @@ func (p *TxPool) discardLocked(mt *metaTx, reason DiscardReason) {
}

func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool) {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
senderID, found := p.senders.getID(addr[:])
if !found {
return 0, false
Expand Down Expand Up @@ -1700,8 +1700,8 @@ func (p *TxPool) logStats() {
return
}

p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()

var m runtime.MemStats
common.ReadMemStats(&m)
Expand All @@ -1724,8 +1724,8 @@ func (p *TxPool) logStats() {

// Deprecated need switch to streaming-like
func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) {
p.lock.RLock()
defer p.lock.RUnlock()
p.lock.Lock()
defer p.lock.Unlock()
p.all.ascendAll(func(mt *metaTx) bool {
slot := mt.Tx
slotRlp := slot.Rlp
Expand Down Expand Up @@ -1794,7 +1794,7 @@ var PoolPendingBaseFeeKey = []byte("pending_base_fee")
// it doesn't track if peer disconnected, it's fine
type recentlyConnectedPeers struct {
peers []types.PeerID
lock sync.RWMutex
lock sync.Mutex
}

func (l *recentlyConnectedPeers) AddPeer(p types.PeerID) {
Expand Down