From 4f97d6739c48f7adf0dcab458e98ad6ba6963d12 Mon Sep 17 00:00:00 2001 From: "xingqiang.yuan" Date: Thu, 4 Jul 2024 16:28:43 +0800 Subject: [PATCH 1/8] deny new transactions if txpool is full --- zk/txpool/pool.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/zk/txpool/pool.go b/zk/txpool/pool.go index eb6b1387334..362a8e37823 100644 --- a/zk/txpool/pool.go +++ b/zk/txpool/pool.go @@ -1119,6 +1119,10 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) Disca } p.discardLocked(found, ReplacedByHigherTip) + } else if p.queued.IsFull() { + // always accept a tx if it would replace an old tx + // otherwise it is denied when queued pool is full + return QueuedPoolOverflow } p.byHash[string(mt.Tx.IDHash[:])] = mt @@ -1233,8 +1237,8 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint } } - // Promote best transactions from base fee pool to pending pool while they qualify - for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() { + // Promote best transactions from base fee pool to pending pool while they qualify and pending pool is not full + for best := baseFee.Best(); baseFee.Len() > 0 && !pending.IsFull() && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() { tx := baseFee.PopBest() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) pending.Add(tx) @@ -1249,13 +1253,14 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint } } - // Promote best transactions from the queued pool to either pending or base fee pool, while they qualify + // Promote best transactions from the queued pool to either pending or base fee pool, while they qualify. + // But just leave them in the queued pool if both pending pool and base fee pool are full. for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() { - if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 { + if !pending.IsFull() && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 { tx := queued.PopBest() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) pending.Add(tx) - } else { + } else if !baseFee.IsFull() && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0 { baseFee.Add(queued.PopBest()) } } @@ -1265,20 +1270,8 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint discard(queued.PopWorst(), FeeTooLow) } - // Discard worst transactions from pending pool until it is within capacity limit - for pending.Len() > pending.limit { - discard(pending.PopWorst(), PendingPoolOverflow) - } - - // Discard worst transactions from pending sub pool until it is within capacity limits - for baseFee.Len() > baseFee.limit { - discard(baseFee.PopWorst(), BaseFeePoolOverflow) - } - - // Discard worst transactions from the queued sub pool until it is within its capacity limits - for _ = queued.Worst(); queued.Len() > queued.limit; _ = queued.Worst() { - discard(queued.PopWorst(), QueuedPoolOverflow) - } + // Never drop any pending transaction even though it exceeds the capacity. + // It is safe because we don't accept any more transactions if the queued sub pool is full. } // MainLoop - does: @@ -2092,6 +2085,8 @@ func (p *PendingPool) Updated(mt *metaTx) { } func (p *PendingPool) Len() int { return len(p.best.ms) } +func (p *PendingPool) IsFull() bool { return p.Len() >= p.limit } + func (p *PendingPool) Remove(i *metaTx) { if i.worstIndex >= 0 { heap.Remove(p.worst, i.worstIndex) @@ -2156,7 +2151,8 @@ func (p *SubPool) PopWorst() *metaTx { //nolint heap.Remove(p.best, i.bestIndex) return i } -func (p *SubPool) Len() int { return p.best.Len() } +func (p *SubPool) IsFull() bool { return p.Len() >= p.limit } +func (p *SubPool) Len() int { return p.best.Len() } func (p *SubPool) Add(i *metaTx) { if i.Tx.Traced { log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID)) From 4abadf810c6737dcf21df86953293493d2e32b10 Mon Sep 17 00:00:00 2001 From: "xingqiang.yuan" Date: Thu, 4 Jul 2024 17:07:46 +0800 Subject: [PATCH 2/8] promote and resort immediately after deleting overflow zkcounter transactions --- zk/txpool/pool.go | 63 +++++++++++++++++++++++++++++--------------- zk/txpool/pool_zk.go | 15 +---------- 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/zk/txpool/pool.go b/zk/txpool/pool.go index 362a8e37823..b7fa131a03e 100644 --- a/zk/txpool/pool.go +++ b/zk/txpool/pool.go @@ -214,18 +214,17 @@ func (r DiscardReason) String() string { // metaTx holds transaction and some metadata type metaTx struct { - Tx *types.TxSlot - minFeeCap uint256.Int - nonceDistance uint64 // how far their nonces are from the state's nonce for the sender - cumulativeBalanceDistance uint64 // how far their cumulativeRequiredBalance are from the state's balance for the sender - minTip uint64 - bestIndex int - worstIndex int - timestamp uint64 // when it was added to pool - subPool SubPoolMarker - currentSubPool SubPoolType - alreadyYielded bool - overflowZkCountersDuringExecution bool + Tx *types.TxSlot + minFeeCap uint256.Int + nonceDistance uint64 // how far their nonces are from the state's nonce for the sender + cumulativeBalanceDistance uint64 // how far their cumulativeRequiredBalance are from the state's balance for the sender + minTip uint64 + bestIndex int + worstIndex int + timestamp uint64 // when it was added to pool + subPool SubPoolMarker + currentSubPool SubPoolType + alreadyYielded bool } func newMetaTx(slot *types.TxSlot, isLocal bool, timestmap uint64) *metaTx { @@ -302,7 +301,8 @@ type TxPool struct { isLocalLRU *simplelru.LRU[string, struct{}] // tx_hash => is_local : to restore isLocal flag of unwinded transactions newPendingTxs chan types.Announcements // notifications about new txs in Pending sub-pool all *BySenderAndNonce // senderID => (sorted map of tx nonce => *metaTx) - deletedTxs []*metaTx // list of discarded txs since last db commit + overflowZkCounters []*metaTx + deletedTxs []*metaTx // list of discarded txs since last db commit promoted types.Announcements cfg txpoolcfg.Config wbCfg WBConfig @@ -457,14 +457,14 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang announcements, err := addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs, pendingBaseFee, stateChanges.BlockGasLimit, - p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked) + p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked) if err != nil { return err } + p.overflowZkCounters = p.overflowZkCounters[:0] p.pending.EnforceWorstInvariants() p.baseFee.EnforceInvariants() p.queued.EnforceInvariants() - promoteZk(p.pending, p.baseFee, p.queued, pendingBaseFee, p.discardLocked, &announcements) p.pending.EnforceBestInvariants() p.promoted.Reset() p.promoted.AppendOther(announcements) @@ -521,7 +521,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { } announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, - p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true) + p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, true) if err != nil { return err } @@ -909,7 +909,7 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, } announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, - p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true) + p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, true) if err == nil { for i, reason := range addReasons { if reason != NotSet { @@ -956,7 +956,8 @@ func (p *TxPool) cache() kvcache.Cache { func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason), collect bool) (types.Announcements, []DiscardReason, error) { + byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters []*metaTx, + add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason), collect bool) (types.Announcements, []DiscardReason, error) { protocolBaseFee := calcProtocolBaseFee(pendingBaseFee) if assert.Enable { for _, txn := range newTxs.Txs { @@ -998,6 +999,14 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, sendersWithChangedState[mt.Tx.SenderID] = struct{}{} } + // Discard a metaTx from the best pending pool if it has overflow the zk-counters during execution + // We must delete them and re-tag the related transactions before transaction sort + for _, tx := range overflowZkCounters { + pending.Remove(tx) + discard(tx, OverflowZkCounters) + sendersWithChangedState[tx.Tx.SenderID] = struct{}{} + } + for senderID := range sendersWithChangedState { nonce, balance, err := senders.info(cacheView, senderID) if err != nil { @@ -1007,7 +1016,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard) } - promoteZk(pending, baseFee, queued, pendingBaseFee, discard, &announcements) + promote(pending, baseFee, queued, pendingBaseFee, discard, &announcements) pending.EnforceBestInvariants() return announcements, discardReasons, nil @@ -1015,7 +1024,8 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason)) (types.Announcements, error) { + byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters []*metaTx, + add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason)) (types.Announcements, error) { protocolBaseFee := calcProtocolBaseFee(pendingBaseFee) if assert.Enable { for _, txn := range newTxs.Txs { @@ -1046,6 +1056,7 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges } sendersWithChangedState[mt.Tx.SenderID] = struct{}{} } + // add senders changed in state to `sendersWithChangedState` list for _, changesList := range stateChanges.ChangeBatch { for _, change := range changesList.Changes { @@ -1064,6 +1075,14 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges } } + // We must delete them first and then re-tag the related transactions + // The new tags will be used to sort transactions + for _, tx := range overflowZkCounters { + pending.Remove(tx) + discard(tx, OverflowZkCounters) + sendersWithChangedState[tx.Tx.SenderID] = struct{}{} + } + for senderID := range sendersWithChangedState { nonce, balance, err := senders.info(cacheView, senderID) if err != nil { @@ -1073,6 +1092,8 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard) } + promote(pending, baseFee, queued, pendingBaseFee, discard, &announcements) + return announcements, nil } @@ -1596,7 +1617,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { return err } if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, - pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false); err != nil { + pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, false); err != nil { return err } p.pendingBaseFee.Store(pendingBaseFee) diff --git a/zk/txpool/pool_zk.go b/zk/txpool/pool_zk.go index c3366065804..ca363d7ae99 100644 --- a/zk/txpool/pool_zk.go +++ b/zk/txpool/pool_zk.go @@ -244,21 +244,8 @@ func (p *TxPool) MarkForDiscardFromPendingBest(txHash libcommon.Hash) { for i := 0; i < len(best.ms); i++ { mt := best.ms[i] if bytes.Equal(mt.Tx.IDHash[:], txHash[:]) { - mt.overflowZkCountersDuringExecution = true + p.overflowZkCounters = append(p.overflowZkCounters, mt) break } } } - -// Discard a metaTx from the best pending pool if it has overflow the zk-counters during execution -func promoteZk(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, discard func(*metaTx, DiscardReason), announcements *types.Announcements) { - for i := 0; i < len(pending.best.ms); i++ { - mt := pending.best.ms[i] - if mt.overflowZkCountersDuringExecution { - pending.Remove(mt) - discard(mt, OverflowZkCounters) - } - } - - promote(pending, baseFee, queued, pendingBaseFee, discard, announcements) -} From 34aad817b32663b2ee322184dc6fd589bf395d8f Mon Sep 17 00:00:00 2001 From: "xingqiang.yuan" Date: Thu, 4 Jul 2024 17:58:24 +0800 Subject: [PATCH 3/8] optimize EnforceBestInvariants --- zk/stages/stage_sequence_execute_utils.go | 2 +- zk/txpool/pool.go | 16 ++++++++++++++-- zk/txpool/pool_zk.go | 2 ++ 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/zk/stages/stage_sequence_execute_utils.go b/zk/stages/stage_sequence_execute_utils.go index ecdb2b68bbe..6dbe616352b 100644 --- a/zk/stages/stage_sequence_execute_utils.go +++ b/zk/stages/stage_sequence_execute_utils.go @@ -44,7 +44,7 @@ const ( transactionGasLimit = 30000000 - yieldSize = 100 // arbitrary number defining how many transactions to yield from the pool at once + yieldSize = 1000 // arbitrary number defining how many transactions to yield from the pool at once ) var ( diff --git a/zk/txpool/pool.go b/zk/txpool/pool.go index b7fa131a03e..033ad3ac8a8 100644 --- a/zk/txpool/pool.go +++ b/zk/txpool/pool.go @@ -419,6 +419,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee) // Update pendingBase for all pool queues and slices if baseFeeChanged { + p.pending.sorted = false // `pending.best` need to be resort if base fee changed p.pending.best.pendingBaseFee = pendingBaseFee p.pending.worst.pendingBaseFee = pendingBaseFee p.baseFee.best.pendingBastFee = pendingBaseFee @@ -1017,7 +1018,6 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, } promote(pending, baseFee, queued, pendingBaseFee, discard, &announcements) - pending.EnforceBestInvariants() return announcements, discardReasons, nil } @@ -2043,6 +2043,8 @@ type PendingPool struct { worst *WorstQueue limit int t SubPoolType + + sorted bool // means `PendingPool.best` is sorted or not } func NewPendingSubPool(t SubPoolType, limit int) *PendingPool { @@ -2079,7 +2081,10 @@ func (p *PendingPool) EnforceWorstInvariants() { heap.Init(p.worst) } func (p *PendingPool) EnforceBestInvariants() { - sort.Sort(p.best) + if !p.sorted { + sort.Sort(p.best) + p.sorted = true + } } func (p *PendingPool) Best() *metaTx { //nolint @@ -2096,6 +2101,9 @@ func (p *PendingPool) Worst() *metaTx { //nolint } func (p *PendingPool) PopWorst() *metaTx { //nolint i := heap.Pop(p.worst).(*metaTx) + if i.bestIndex != p.Len()-1 { // which should never happen + p.sorted = false + } if i.bestIndex >= 0 { p.best.UnsafeRemove(i) } @@ -2112,6 +2120,9 @@ func (p *PendingPool) Remove(i *metaTx) { if i.worstIndex >= 0 { heap.Remove(p.worst, i.worstIndex) } + if i.bestIndex != p.Len()-1 { + p.sorted = false + } if i.bestIndex >= 0 { p.best.UnsafeRemove(i) } @@ -2124,6 +2135,7 @@ func (p *PendingPool) Add(i *metaTx) { } i.currentSubPool = p.t heap.Push(p.worst, i) + p.sorted = false p.best.UnsafeAdd(i) } func (p *PendingPool) DebugPrint(prefix string) { diff --git a/zk/txpool/pool_zk.go b/zk/txpool/pool_zk.go index ca363d7ae99..bec06c1b52a 100644 --- a/zk/txpool/pool_zk.go +++ b/zk/txpool/pool_zk.go @@ -158,6 +158,8 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG isShanghai := p.isShanghai() isLondon := p.isLondon() _ = isLondon + + p.pending.EnforceBestInvariants() // it costs about 50ms when pending size reached one million best := p.pending.best txs.Resize(uint(cmp.Min(int(n), len(best.ms)))) From 70c17a2f9bba6af366a3ddc582bd416f1a32c866 Mon Sep 17 00:00:00 2001 From: "xingqiang.yuan" Date: Thu, 4 Jul 2024 19:25:09 +0800 Subject: [PATCH 4/8] fix bug --- zk/txpool/pool.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zk/txpool/pool.go b/zk/txpool/pool.go index 033ad3ac8a8..18b4032de7d 100644 --- a/zk/txpool/pool.go +++ b/zk/txpool/pool.go @@ -1283,6 +1283,8 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint pending.Add(tx) } else if !baseFee.IsFull() && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0 { baseFee.Add(queued.PopBest()) + } else { + break } } From 21ab2e253fea0947b35071f09c038dce2129c70b Mon Sep 17 00:00:00 2001 From: "xingqiang.yuan" Date: Tue, 9 Jul 2024 09:36:07 +0800 Subject: [PATCH 5/8] fix --- zk/txpool/pool.go | 56 ++++++++++++++++++++++++---------------- zk/txpool/pool_xlayer.go | 4 +++ zk/txpool/pool_zk.go | 2 +- 3 files changed, 39 insertions(+), 23 deletions(-) diff --git a/zk/txpool/pool.go b/zk/txpool/pool.go index 18b4032de7d..120fa4ae4e4 100644 --- a/zk/txpool/pool.go +++ b/zk/txpool/pool.go @@ -301,7 +301,7 @@ type TxPool struct { isLocalLRU *simplelru.LRU[string, struct{}] // tx_hash => is_local : to restore isLocal flag of unwinded transactions newPendingTxs chan types.Announcements // notifications about new txs in Pending sub-pool all *BySenderAndNonce // senderID => (sorted map of tx nonce => *metaTx) - overflowZkCounters []*metaTx + overflowZkCounters map[*metaTx]struct{} deletedTxs []*metaTx // list of discarded txs since last db commit promoted types.Announcements cfg txpoolcfg.Config @@ -349,6 +349,7 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, isLocalLRU: localsHistory, discardReasonsLRU: discardHistory, all: byNonce, + overflowZkCounters: make(map[*metaTx]struct{}), recentlyConnectedPeers: &recentlyConnectedPeers{}, pending: NewPendingSubPool(PendingSubPool, cfg.PendingSubPoolLimit), baseFee: NewSubPool(BaseFeeSubPool, cfg.BaseFeeSubPoolLimit), @@ -462,7 +463,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang if err != nil { return err } - p.overflowZkCounters = p.overflowZkCounters[:0] + p.pending.EnforceWorstInvariants() p.baseFee.EnforceInvariants() p.queued.EnforceInvariants() @@ -957,7 +958,7 @@ func (p *TxPool) cache() kvcache.Cache { func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters []*metaTx, + byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters map[*metaTx]struct{}, add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason), collect bool) (types.Announcements, []DiscardReason, error) { protocolBaseFee := calcProtocolBaseFee(pendingBaseFee) if assert.Enable { @@ -1002,10 +1003,11 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, // Discard a metaTx from the best pending pool if it has overflow the zk-counters during execution // We must delete them and re-tag the related transactions before transaction sort - for _, tx := range overflowZkCounters { + for tx := range overflowZkCounters { pending.Remove(tx) discard(tx, OverflowZkCounters) sendersWithChangedState[tx.Tx.SenderID] = struct{}{} + delete(overflowZkCounters, tx) // clear overflowZkCounters } for senderID := range sendersWithChangedState { @@ -1024,7 +1026,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters []*metaTx, + byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters map[*metaTx]struct{}, add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason)) (types.Announcements, error) { protocolBaseFee := calcProtocolBaseFee(pendingBaseFee) if assert.Enable { @@ -1077,10 +1079,11 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges // We must delete them first and then re-tag the related transactions // The new tags will be used to sort transactions - for _, tx := range overflowZkCounters { + for tx := range overflowZkCounters { pending.Remove(tx) discard(tx, OverflowZkCounters) sendersWithChangedState[tx.Tx.SenderID] = struct{}{} + delete(overflowZkCounters, tx) // clear overflowZkCounters } for senderID := range sendersWithChangedState { @@ -1140,10 +1143,10 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) Disca } p.discardLocked(found, ReplacedByHigherTip) - } else if p.queued.IsFull() { + } else if p.pending.IsFull() { // always accept a tx if it would replace an old tx - // otherwise it is denied when queued pool is full - return QueuedPoolOverflow + // otherwise it is denied when pending pool is full + return PendingPoolOverflow } p.byHash[string(mt.Tx.IDHash[:])] = mt @@ -1258,8 +1261,8 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint } } - // Promote best transactions from base fee pool to pending pool while they qualify and pending pool is not full - for best := baseFee.Best(); baseFee.Len() > 0 && !pending.IsFull() && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() { + // Promote best transactions from base fee pool to pending pool while they qualify + for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() { tx := baseFee.PopBest() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) pending.Add(tx) @@ -1277,14 +1280,12 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint // Promote best transactions from the queued pool to either pending or base fee pool, while they qualify. // But just leave them in the queued pool if both pending pool and base fee pool are full. for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() { - if !pending.IsFull() && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 { + if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 { tx := queued.PopBest() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) pending.Add(tx) - } else if !baseFee.IsFull() && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0 { - baseFee.Add(queued.PopBest()) } else { - break + baseFee.Add(queued.PopBest()) } } @@ -1293,8 +1294,18 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint discard(queued.PopWorst(), FeeTooLow) } - // Never drop any pending transaction even though it exceeds the capacity. - // It is safe because we don't accept any more transactions if the queued sub pool is full. + // Never drop any pending transaction in pending pool + // It is safe because we don't accept any more transactions if tx pool is full. + + // Discard worst transactions from the baseFee sub pool until it is within capacity limits + for baseFee.Len() > baseFee.limit { + discard(baseFee.PopWorst(), BaseFeePoolOverflow) + } + + // Discard worst transactions from the queued sub pool until it is within its capacity limits + for queued.Len() > queued.limit { + discard(queued.PopWorst(), QueuedPoolOverflow) + } } // MainLoop - does: @@ -1587,7 +1598,6 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { log.Warn("[txpool] fromDB: parseTransaction", "err", err) continue } - txn.Rlp = nil // means that we don't need store it in db anymore txn.SenderID, txn.Traced = p.senders.getOrCreateID(addr) binary.BigEndian.Uint64(v) @@ -1597,6 +1607,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if reason := p.validateTx(txn, isLocalTx, cacheView, addr); reason != NotSet && reason != Success { return nil } + txn.Rlp = nil // means that we don't need store it in db anymore txs.Resize(uint(i + 1)) txs.Txs[i] = txn txs.IsLocal[i] = isLocalTx @@ -2086,6 +2097,9 @@ func (p *PendingPool) EnforceBestInvariants() { if !p.sorted { sort.Sort(p.best) p.sorted = true + } else if !sort.IsSorted(p.best) { + sort.Sort(p.best) + log.Error("EnforceBestInvariants, unsorted best") } } @@ -2114,8 +2128,7 @@ func (p *PendingPool) PopWorst() *metaTx { //nolint func (p *PendingPool) Updated(mt *metaTx) { heap.Fix(p.worst, mt.worstIndex) } -func (p *PendingPool) Len() int { return len(p.best.ms) } - +func (p *PendingPool) Len() int { return len(p.best.ms) } func (p *PendingPool) IsFull() bool { return p.Len() >= p.limit } func (p *PendingPool) Remove(i *metaTx) { @@ -2186,8 +2199,7 @@ func (p *SubPool) PopWorst() *metaTx { //nolint heap.Remove(p.best, i.bestIndex) return i } -func (p *SubPool) IsFull() bool { return p.Len() >= p.limit } -func (p *SubPool) Len() int { return p.best.Len() } +func (p *SubPool) Len() int { return p.best.Len() } func (p *SubPool) Add(i *metaTx) { if i.Tx.Traced { log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID)) diff --git a/zk/txpool/pool_xlayer.go b/zk/txpool/pool_xlayer.go index 21cf07ea992..26cf0b06a56 100644 --- a/zk/txpool/pool_xlayer.go +++ b/zk/txpool/pool_xlayer.go @@ -31,3 +31,7 @@ func (p *TxPool) checkWhiteAddr(addr common.Address) bool { } return false } + +func (p *TxPool) isFull() bool { + return p.pending.Len()+p.baseFee.Len()+p.queued.Len() >= p.pending.limit+p.baseFee.limit+p.queued.limit +} diff --git a/zk/txpool/pool_zk.go b/zk/txpool/pool_zk.go index bec06c1b52a..50c327bf38d 100644 --- a/zk/txpool/pool_zk.go +++ b/zk/txpool/pool_zk.go @@ -246,7 +246,7 @@ func (p *TxPool) MarkForDiscardFromPendingBest(txHash libcommon.Hash) { for i := 0; i < len(best.ms); i++ { mt := best.ms[i] if bytes.Equal(mt.Tx.IDHash[:], txHash[:]) { - p.overflowZkCounters = append(p.overflowZkCounters, mt) + p.overflowZkCounters[mt] = struct{}{} break } } From 74ca6f715048735a3286b47163058f71fe6f4553 Mon Sep 17 00:00:00 2001 From: "xingqiang.yuan" Date: Tue, 9 Jul 2024 09:58:31 +0800 Subject: [PATCH 6/8] update --- zk/txpool/pool.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/zk/txpool/pool.go b/zk/txpool/pool.go index a76c44a0d4c..73507c87f6e 100644 --- a/zk/txpool/pool.go +++ b/zk/txpool/pool.go @@ -1310,8 +1310,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint } } - // Promote best transactions from the queued pool to either pending or base fee pool, while they qualify. - // But just leave them in the queued pool if both pending pool and base fee pool are full. + // Promote best transactions from the queued pool to either pending or base fee pool, while they qualify for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() { if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 { tx := queued.PopBest() @@ -1327,10 +1326,12 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint discard(queued.PopWorst(), FeeTooLow) } - // Never drop any pending transaction in pending pool - // It is safe because we don't accept any more transactions if tx pool is full. + // Discard worst transactions from pending pool until it is within capacity limit + for pending.Len() > pending.limit { + discard(pending.PopWorst(), PendingPoolOverflow) + } - // Discard worst transactions from the baseFee sub pool until it is within capacity limits + // Discard worst transactions from pending sub pool until it is within capacity limits for baseFee.Len() > baseFee.limit { discard(baseFee.PopWorst(), BaseFeePoolOverflow) } From ccea7416f3e516979851c516dbeb7f59e24a57fa Mon Sep 17 00:00:00 2001 From: "xingqiang.yuan" Date: Tue, 9 Jul 2024 11:07:09 +0800 Subject: [PATCH 7/8] update --- zk/txpool/pool.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/zk/txpool/pool.go b/zk/txpool/pool.go index 73507c87f6e..51254fe47b6 100644 --- a/zk/txpool/pool.go +++ b/zk/txpool/pool.go @@ -2138,9 +2138,6 @@ func (p *PendingPool) EnforceBestInvariants() { if !p.sorted { sort.Sort(p.best) p.sorted = true - } else if !sort.IsSorted(p.best) { - sort.Sort(p.best) - log.Error("EnforceBestInvariants, unsorted best") } } @@ -2158,9 +2155,6 @@ func (p *PendingPool) Worst() *metaTx { //nolint } func (p *PendingPool) PopWorst() *metaTx { //nolint i := heap.Pop(p.worst).(*metaTx) - if i.bestIndex != p.Len()-1 { // which should never happen - p.sorted = false - } if i.bestIndex >= 0 { p.best.UnsafeRemove(i) } From fca251e126618cc334514991649f7cb13a90c92d Mon Sep 17 00:00:00 2001 From: "xingqiang.yuan" Date: Tue, 9 Jul 2024 15:53:00 +0800 Subject: [PATCH 8/8] update --- zk/txpool/pool.go | 44 +++++++++++++++++++++----------------------- zk/txpool/pool_zk.go | 4 ++-- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/zk/txpool/pool.go b/zk/txpool/pool.go index 51254fe47b6..0397976bc6d 100644 --- a/zk/txpool/pool.go +++ b/zk/txpool/pool.go @@ -305,8 +305,8 @@ type TxPool struct { isLocalLRU *simplelru.LRU[string, struct{}] // tx_hash => is_local : to restore isLocal flag of unwinded transactions newPendingTxs chan types.Announcements // notifications about new txs in Pending sub-pool all *BySenderAndNonce // senderID => (sorted map of tx nonce => *metaTx) - overflowZkCounters map[*metaTx]struct{} - deletedTxs []*metaTx // list of discarded txs since last db commit + deletedTxs []*metaTx // list of discarded txs since last db commit + overflowZkCounters []*metaTx promoted types.Announcements cfg txpoolcfg.Config chainID uint256.Int @@ -359,7 +359,6 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, isLocalLRU: localsHistory, discardReasonsLRU: discardHistory, all: byNonce, - overflowZkCounters: make(map[*metaTx]struct{}), recentlyConnectedPeers: &recentlyConnectedPeers{}, pending: NewPendingSubPool(PendingSubPool, cfg.PendingSubPoolLimit), baseFee: NewSubPool(BaseFeeSubPool, cfg.BaseFeeSubPoolLimit), @@ -468,9 +467,9 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight) - announcements, err := addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs, + announcements, err := p.addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs, pendingBaseFee, stateChanges.BlockGasLimit, - p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked) + p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked) if err != nil { return err } @@ -533,8 +532,8 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return err } - announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, - p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, true) + announcements, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, + p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true) if err != nil { return err } @@ -943,8 +942,8 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, return nil, err } - announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, - p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, true) + announcements, addReasons, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, + p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true) if err == nil { for i, reason := range addReasons { if reason != NotSet { @@ -988,11 +987,10 @@ func (p *TxPool) cache() kvcache.Cache { return p._stateCache } -func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, +func (p *TxPool) addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters map[*metaTx]struct{}, - add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason), collect bool) (types.Announcements, []DiscardReason, error) { + byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason), collect bool) (types.Announcements, []DiscardReason, error) { protocolBaseFee := calcProtocolBaseFee(pendingBaseFee) if assert.Enable { for _, txn := range newTxs.Txs { @@ -1036,19 +1034,20 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, // Discard a metaTx from the best pending pool if it has overflow the zk-counters during execution // We must delete them and re-tag the related transactions before transaction sort - for tx := range overflowZkCounters { + for _, tx := range p.overflowZkCounters { pending.Remove(tx) discard(tx, OverflowZkCounters) sendersWithChangedState[tx.Tx.SenderID] = struct{}{} - delete(overflowZkCounters, tx) // clear overflowZkCounters } + p.overflowZkCounters = p.overflowZkCounters[:0] // clear overflowZkCounters + for senderID := range sendersWithChangedState { nonce, balance, err := senders.info(cacheView, senderID) if err != nil { return announcements, discardReasons, err } - onSenderStateChange(senderID, nonce, balance, byNonce, + p.onSenderStateChange(senderID, nonce, balance, byNonce, protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard) } @@ -1056,11 +1055,10 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, return announcements, discardReasons, nil } -func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, +func (p *TxPool) addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters map[*metaTx]struct{}, - add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason)) (types.Announcements, error) { + byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason)) (types.Announcements, error) { protocolBaseFee := calcProtocolBaseFee(pendingBaseFee) if assert.Enable { for _, txn := range newTxs.Txs { @@ -1112,19 +1110,19 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges // We must delete them first and then re-tag the related transactions // The new tags will be used to sort transactions - for tx := range overflowZkCounters { + for _, tx := range p.overflowZkCounters { pending.Remove(tx) discard(tx, OverflowZkCounters) sendersWithChangedState[tx.Tx.SenderID] = struct{}{} - delete(overflowZkCounters, tx) // clear overflowZkCounters } + p.overflowZkCounters = p.overflowZkCounters[:0] // clear overflowZkCounters for senderID := range sendersWithChangedState { nonce, balance, err := senders.info(cacheView, senderID) if err != nil { return announcements, err } - onSenderStateChange(senderID, nonce, balance, byNonce, + p.onSenderStateChange(senderID, nonce, balance, byNonce, protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard) } @@ -1670,8 +1668,8 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if err != nil { return err } - if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, - pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, false); err != nil { + if _, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, + pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false); err != nil { return err } p.pendingBaseFee.Store(pendingBaseFee) diff --git a/zk/txpool/pool_zk.go b/zk/txpool/pool_zk.go index 50c327bf38d..073398f1a9f 100644 --- a/zk/txpool/pool_zk.go +++ b/zk/txpool/pool_zk.go @@ -32,7 +32,7 @@ func calcProtocolBaseFee(baseFee uint64) uint64 { // which sub pool they will need to go to. Sice this depends on other transactions from the same sender by with lower // nonces, and also affect other transactions from the same sender with higher nonce, it loops through all transactions // for a given senderID -func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *BySenderAndNonce, +func (p *TxPool) onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *BySenderAndNonce, protocolBaseFee, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, DiscardReason)) { noGapsNonce := senderNonce cumulativeRequiredBalance := uint256.NewInt(0) @@ -246,7 +246,7 @@ func (p *TxPool) MarkForDiscardFromPendingBest(txHash libcommon.Hash) { for i := 0; i < len(best.ms); i++ { mt := best.ms[i] if bytes.Equal(mt.Tx.IDHash[:], txHash[:]) { - p.overflowZkCounters[mt] = struct{}{} + p.overflowZkCounters = append(p.overflowZkCounters, mt) break } }