Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize txpool #51

Merged
merged 9 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion zk/stages/stage_sequence_execute_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (

transactionGasLimit = 30000000

yieldSize = 100 // arbitrary number defining how many transactions to yield from the pool at once
yieldSize = 1000 // arbitrary number defining how many transactions to yield from the pool at once
yann-sjtu marked this conversation as resolved.
Show resolved Hide resolved
)

var (
Expand Down
117 changes: 74 additions & 43 deletions zk/txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,17 @@ func (r DiscardReason) String() string {

// metaTx holds transaction and some metadata
type metaTx struct {
Tx *types.TxSlot
minFeeCap uint256.Int
nonceDistance uint64 // how far their nonces are from the state's nonce for the sender
cumulativeBalanceDistance uint64 // how far their cumulativeRequiredBalance are from the state's balance for the sender
minTip uint64
bestIndex int
worstIndex int
timestamp uint64 // when it was added to pool
subPool SubPoolMarker
currentSubPool SubPoolType
alreadyYielded bool
overflowZkCountersDuringExecution bool
Tx *types.TxSlot
minFeeCap uint256.Int
nonceDistance uint64 // how far their nonces are from the state's nonce for the sender
cumulativeBalanceDistance uint64 // how far their cumulativeRequiredBalance are from the state's balance for the sender
minTip uint64
bestIndex int
worstIndex int
timestamp uint64 // when it was added to pool
subPool SubPoolMarker
currentSubPool SubPoolType
alreadyYielded bool
}

func newMetaTx(slot *types.TxSlot, isLocal bool, timestmap uint64) *metaTx {
Expand Down Expand Up @@ -302,7 +301,8 @@ type TxPool struct {
isLocalLRU *simplelru.LRU[string, struct{}] // tx_hash => is_local : to restore isLocal flag of unwinded transactions
newPendingTxs chan types.Announcements // notifications about new txs in Pending sub-pool
all *BySenderAndNonce // senderID => (sorted map of tx nonce => *metaTx)
deletedTxs []*metaTx // list of discarded txs since last db commit
overflowZkCounters []*metaTx
deletedTxs []*metaTx // list of discarded txs since last db commit
promoted types.Announcements
cfg txpoolcfg.Config
wbCfg WBConfig
Expand Down Expand Up @@ -419,6 +419,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee)
// Update pendingBase for all pool queues and slices
if baseFeeChanged {
p.pending.sorted = false // `pending.best` needs to be re-sorted if the base fee changed
p.pending.best.pendingBaseFee = pendingBaseFee
p.pending.worst.pendingBaseFee = pendingBaseFee
p.baseFee.best.pendingBastFee = pendingBaseFee
Expand Down Expand Up @@ -457,14 +458,14 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang

announcements, err := addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs,
pendingBaseFee, stateChanges.BlockGasLimit,
p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked)
p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked)
if err != nil {
return err
}
p.overflowZkCounters = p.overflowZkCounters[:0]
p.pending.EnforceWorstInvariants()
p.baseFee.EnforceInvariants()
p.queued.EnforceInvariants()
promoteZk(p.pending, p.baseFee, p.queued, pendingBaseFee, p.discardLocked, &announcements)
p.pending.EnforceBestInvariants()
p.promoted.Reset()
p.promoted.AppendOther(announcements)
Expand Down Expand Up @@ -521,7 +522,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
}

announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true)
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -909,7 +910,7 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
}

announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true)
p.pendingBaseFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, true)
if err == nil {
for i, reason := range addReasons {
if reason != NotSet {
Expand Down Expand Up @@ -956,7 +957,8 @@ func (p *TxPool) cache() kvcache.Cache {
func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64,
pending *PendingPool, baseFee, queued *SubPool,
byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason), collect bool) (types.Announcements, []DiscardReason, error) {
byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters []*metaTx,
add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason), collect bool) (types.Announcements, []DiscardReason, error) {
protocolBaseFee := calcProtocolBaseFee(pendingBaseFee)
if assert.Enable {
for _, txn := range newTxs.Txs {
Expand Down Expand Up @@ -998,6 +1000,14 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
sendersWithChangedState[mt.Tx.SenderID] = struct{}{}
}

// Discard a metaTx from the best pending pool if it has overflowed the zk-counters during execution.
// We must delete these and re-tag the related transactions before sorting the transactions.
for _, tx := range overflowZkCounters {
pending.Remove(tx)
discard(tx, OverflowZkCounters)
sendersWithChangedState[tx.Tx.SenderID] = struct{}{}
}

for senderID := range sendersWithChangedState {
nonce, balance, err := senders.info(cacheView, senderID)
if err != nil {
Expand All @@ -1007,15 +1017,15 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard)
}

promoteZk(pending, baseFee, queued, pendingBaseFee, discard, &announcements)
pending.EnforceBestInvariants()
promote(pending, baseFee, queued, pendingBaseFee, discard, &announcements)

return announcements, discardReasons, nil
}
func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch,
senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64,
pending *PendingPool, baseFee, queued *SubPool,
byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason)) (types.Announcements, error) {
byNonce *BySenderAndNonce, byHash map[string]*metaTx, overflowZkCounters []*metaTx,
add func(*metaTx, *types.Announcements) DiscardReason, discard func(*metaTx, DiscardReason)) (types.Announcements, error) {
protocolBaseFee := calcProtocolBaseFee(pendingBaseFee)
if assert.Enable {
for _, txn := range newTxs.Txs {
Expand Down Expand Up @@ -1046,6 +1056,7 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges
}
sendersWithChangedState[mt.Tx.SenderID] = struct{}{}
}

// add senders changed in state to `sendersWithChangedState` list
for _, changesList := range stateChanges.ChangeBatch {
for _, change := range changesList.Changes {
Expand All @@ -1064,6 +1075,14 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges
}
}

// We must delete them first and then re-tag the related transactions
// The new tags will be used to sort transactions
for _, tx := range overflowZkCounters {
pending.Remove(tx)
discard(tx, OverflowZkCounters)
sendersWithChangedState[tx.Tx.SenderID] = struct{}{}
}

for senderID := range sendersWithChangedState {
nonce, balance, err := senders.info(cacheView, senderID)
if err != nil {
Expand All @@ -1073,6 +1092,8 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges
protocolBaseFee, blockGasLimit, pending, baseFee, queued, discard)
}

promote(pending, baseFee, queued, pendingBaseFee, discard, &announcements)

return announcements, nil
}

Expand Down Expand Up @@ -1119,6 +1140,10 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) Disca
}

p.discardLocked(found, ReplacedByHigherTip)
} else if p.queued.IsFull() {
// always accept a tx if it would replace an old tx
// otherwise it is denied when queued pool is full
return QueuedPoolOverflow
}

p.byHash[string(mt.Tx.IDHash[:])] = mt
Expand Down Expand Up @@ -1233,8 +1258,8 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
}
}

// Promote best transactions from base fee pool to pending pool while they qualify
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() {
// Promote best transactions from base fee pool to pending pool while they qualify and the pending pool is not full
for best := baseFee.Best(); baseFee.Len() > 0 && !pending.IsFull() && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() {
tx := baseFee.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
pending.Add(tx)
yann-sjtu marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -1249,14 +1274,17 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
}
}

// Promote best transactions from the queued pool to either pending or base fee pool, while they qualify
// Promote best transactions from the queued pool to either pending or base fee pool, while they qualify.
// But just leave them in the queued pool if both pending pool and base fee pool are full.
for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() {
if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 {
if !pending.IsFull() && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 {
tx := queued.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
pending.Add(tx)
} else {
} else if !baseFee.IsFull() && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0 {
baseFee.Add(queued.PopBest())
} else {
break
}
}

Expand All @@ -1265,20 +1293,8 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
discard(queued.PopWorst(), FeeTooLow)
}

// Discard worst transactions from pending pool until it is within capacity limit
for pending.Len() > pending.limit {
discard(pending.PopWorst(), PendingPoolOverflow)
}

// Discard worst transactions from pending sub pool until it is within capacity limits
for baseFee.Len() > baseFee.limit {
discard(baseFee.PopWorst(), BaseFeePoolOverflow)
}

// Discard worst transactions from the queued sub pool until it is within its capacity limits
for _ = queued.Worst(); queued.Len() > queued.limit; _ = queued.Worst() {
discard(queued.PopWorst(), QueuedPoolOverflow)
}
// Never drop any pending transaction, even if it exceeds capacity.
// This is safe because we stop accepting new transactions once the queued sub pool is full.
}

// MainLoop - does:
Expand Down Expand Up @@ -1603,7 +1619,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
return err
}
if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs,
pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false); err != nil {
pendingBaseFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.overflowZkCounters, p.addLocked, p.discardLocked, false); err != nil {
return err
}
p.pendingBaseFee.Store(pendingBaseFee)
Expand Down Expand Up @@ -2029,6 +2045,8 @@ type PendingPool struct {
worst *WorstQueue
limit int
t SubPoolType

sorted bool // means `PendingPool.best` is sorted or not
}

func NewPendingSubPool(t SubPoolType, limit int) *PendingPool {
Expand Down Expand Up @@ -2065,7 +2083,10 @@ func (p *PendingPool) EnforceWorstInvariants() {
heap.Init(p.worst)
}
func (p *PendingPool) EnforceBestInvariants() {
sort.Sort(p.best)
if !p.sorted {
sort.Sort(p.best)
p.sorted = true
}
}

func (p *PendingPool) Best() *metaTx { //nolint
Expand All @@ -2082,6 +2103,9 @@ func (p *PendingPool) Worst() *metaTx { //nolint
}
func (p *PendingPool) PopWorst() *metaTx { //nolint
i := heap.Pop(p.worst).(*metaTx)
if i.bestIndex != p.Len()-1 { // which should never happen
yann-sjtu marked this conversation as resolved.
Show resolved Hide resolved
p.sorted = false
}
if i.bestIndex >= 0 {
p.best.UnsafeRemove(i)
}
Expand All @@ -2092,10 +2116,15 @@ func (p *PendingPool) Updated(mt *metaTx) {
}
func (p *PendingPool) Len() int { return len(p.best.ms) }

func (p *PendingPool) IsFull() bool { return p.Len() >= p.limit }

func (p *PendingPool) Remove(i *metaTx) {
if i.worstIndex >= 0 {
heap.Remove(p.worst, i.worstIndex)
}
if i.bestIndex != p.Len()-1 {
p.sorted = false
}
if i.bestIndex >= 0 {
p.best.UnsafeRemove(i)
}
Expand All @@ -2108,6 +2137,7 @@ func (p *PendingPool) Add(i *metaTx) {
}
i.currentSubPool = p.t
heap.Push(p.worst, i)
p.sorted = false
p.best.UnsafeAdd(i)
}
func (p *PendingPool) DebugPrint(prefix string) {
Expand Down Expand Up @@ -2156,7 +2186,8 @@ func (p *SubPool) PopWorst() *metaTx { //nolint
heap.Remove(p.best, i.bestIndex)
return i
}
func (p *SubPool) Len() int { return p.best.Len() }
func (p *SubPool) IsFull() bool { return p.Len() >= p.limit }
func (p *SubPool) Len() int { return p.best.Len() }
func (p *SubPool) Add(i *metaTx) {
if i.Tx.Traced {
log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID))
Expand Down
17 changes: 3 additions & 14 deletions zk/txpool/pool_zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
isShanghai := p.isShanghai()
isLondon := p.isLondon()
_ = isLondon

p.pending.EnforceBestInvariants() // costs about 50ms when the pending pool size reaches one million
yann-sjtu marked this conversation as resolved.
Show resolved Hide resolved
best := p.pending.best

txs.Resize(uint(cmp.Min(int(n), len(best.ms))))
Expand Down Expand Up @@ -244,21 +246,8 @@ func (p *TxPool) MarkForDiscardFromPendingBest(txHash libcommon.Hash) {
for i := 0; i < len(best.ms); i++ {
mt := best.ms[i]
if bytes.Equal(mt.Tx.IDHash[:], txHash[:]) {
mt.overflowZkCountersDuringExecution = true
p.overflowZkCounters = append(p.overflowZkCounters, mt)
break
}
}
}

// Discard a metaTx from the best pending pool if it has overflowed the zk-counters during execution
func promoteZk(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, discard func(*metaTx, DiscardReason), announcements *types.Announcements) {
for i := 0; i < len(pending.best.ms); i++ {
mt := pending.best.ms[i]
if mt.overflowZkCountersDuringExecution {
pending.Remove(mt)
discard(mt, OverflowZkCounters)
}
}

promote(pending, baseFee, queued, pendingBaseFee, discard, announcements)
}
Loading