Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/catalyst: ensure period zero mode leaves no pending txs in pool #30264

Merged
merged 10 commits into from
Aug 19, 2024
21 changes: 4 additions & 17 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai"))
}
}
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false)
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1)
}

// ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload
Expand All @@ -207,7 +207,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called with paris and shanghai payloads"))
}
}
return api.forkchoiceUpdated(update, params, engine.PayloadV2, false)
return api.forkchoiceUpdated(update, params, engine.PayloadV2)
}

// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root
Expand All @@ -228,10 +228,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV3, false)
return api.forkchoiceUpdated(update, params, engine.PayloadV3)
}

func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, simulatorMode bool) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion) (engine.ForkChoiceResponse, error) {
api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()

Expand Down Expand Up @@ -374,19 +374,6 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if api.localBlocks.has(id) {
return valid(&id), nil
}
// If the beacon chain is ran by a simulator, then transaction insertion,
// block insertion and block production will happen without any timing
// delay between them. This will cause flaky simulator executions due to
// the transaction pool running its internal reset operation on a back-
// ground thread. To avoid the racey behavior - in simulator mode - the
// pool will be explicitly blocked on its reset before continuing to the
// block production below.
if simulatorMode {
if err := api.eth.TxPool().Sync(); err != nil {
log.Error("Failed to sync transaction pool", "err", err)
return valid(nil), engine.InvalidPayloadAttributes.With(err)
}
}
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil {
log.Error("Failed to build payload", "err", err)
Expand Down
87 changes: 51 additions & 36 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"math/big"
"sync"
"time"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
Expand All @@ -41,36 +43,46 @@ const devEpochLength = 32
// withdrawalQueue implements a FIFO queue which holds withdrawals that are
// pending inclusion.
type withdrawalQueue struct {
pending chan *types.Withdrawal
pending types.Withdrawals
mu sync.Mutex
feed event.Feed
subs event.SubscriptionScope
}

type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals }

// add queues a withdrawal for future inclusion.
func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error {
select {
case w.pending <- withdrawal:
break
default:
return errors.New("withdrawal queue full")
}
w.mu.Lock()
w.pending = append(w.pending, withdrawal)
w.mu.Unlock()

w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}})
return nil
}

// gatherPending returns a number of queued withdrawals up to a maximum count.
func (w *withdrawalQueue) gatherPending(maxCount int) []*types.Withdrawal {
withdrawals := []*types.Withdrawal{}
for {
select {
case withdrawal := <-w.pending:
withdrawals = append(withdrawals, withdrawal)
if len(withdrawals) == maxCount {
return withdrawals
}
default:
return withdrawals
}
}
// pop dequeues the specified number of withdrawals from the queue.
func (w *withdrawalQueue) pop(count int) types.Withdrawals {
w.mu.Lock()
defer w.mu.Unlock()

count = min(count, len(w.pending))
popped := w.pending[0:count]
w.pending = w.pending[count:]

return popped
}

// subscribe allows a listener to be updated when new withdrawals are added to
// the queue.
func (w *withdrawalQueue) subscribe(ch chan<- newWithdrawalsEvent) event.Subscription {
sub := w.feed.Subscribe(ch)
return w.subs.Track(sub)
}

// SimulatedBeacon drives an Ethereum instance as if it were a real beacon
// client. It can run in period mode where it mines a new block every period
// (seconds) or on every transaction via Commit, Fork and AdjustTime.
type SimulatedBeacon struct {
shutdownCh chan struct{}
eth *eth.Ethereum
Expand All @@ -86,10 +98,6 @@ type SimulatedBeacon struct {
}

// NewSimulatedBeacon constructs a new simulated beacon chain.
// Period sets the period in which blocks should be produced.
//
// - If period is set to 0, a block is produced on every transaction.
// via Commit, Fork and AdjustTime.
func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, error) {
block := eth.BlockChain().CurrentBlock()
current := engine.ForkchoiceStateV1{
Expand All @@ -112,7 +120,6 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err
engineAPI: engineAPI,
lastBlockTime: block.Time,
curForkchoiceState: current,
withdrawals: withdrawalQueue{make(chan *types.Withdrawal, 20)},
}, nil
}

Expand Down Expand Up @@ -156,6 +163,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
c.setCurrentState(header.Hash(), *finalizedHash)
}

// Because transaction insertion, block insertion, and block production will
// happen without any timing delay between them in simulator mode and the
// transaction pool will be running its internal reset operation on a
// background thread, flaky executions can happen. To avoid the racey
// behavior, the pool will be explicitly blocked on its reset before
// continuing to the block production below.
if err := c.eth.APIBackend.TxPool().Sync(); err != nil {
jwasinger marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("failed to sync txpool: %w", err)
}

var random [32]byte
rand.Read(random[:])
fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{
Expand All @@ -164,13 +181,14 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
Withdrawals: withdrawals,
Random: random,
BeaconRoot: &common.Hash{},
}, engine.PayloadV3, true)
}, engine.PayloadV3)
if err != nil {
return err
}
if fcResponse == engine.STATUS_SYNCING {
return errors.New("chain rewind prevented invocation of payload creation")
}

envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true)
if err != nil {
return err
Expand Down Expand Up @@ -223,8 +241,7 @@ func (c *SimulatedBeacon) loop() {
case <-c.shutdownCh:
return
case <-timer.C:
withdrawals := c.withdrawals.gatherPending(10)
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
if err := c.sealBlock(c.withdrawals.pop(10), uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
} else {
timer.Reset(time.Second * time.Duration(c.period))
Expand Down Expand Up @@ -260,7 +277,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) {

// Commit seals a block on demand.
func (c *SimulatedBeacon) Commit() common.Hash {
withdrawals := c.withdrawals.gatherPending(10)
withdrawals := c.withdrawals.pop(10)
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
}
Expand Down Expand Up @@ -301,16 +318,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if parent == nil {
return errors.New("parent not found")
}
withdrawals := c.withdrawals.gatherPending(10)
withdrawals := c.withdrawals.pop(10)
return c.sealBlock(withdrawals, parent.Time+uint64(adjustment/time.Second))
}

// RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the
// stack.
func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) {
api := &api{sim}
if sim.period == 0 {
// mine on demand if period is set to 0
go api.loop()
}
api := newSimulatedBeaconAPI(sim)
stack.RegisterAPIs([]rpc.API{
{
Namespace: "dev",
Expand Down
72 changes: 58 additions & 14 deletions eth/catalyst/simulated_beacon_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,88 @@ package catalyst

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

type api struct {
// simulatedBeaconAPI provides a RPC API for SimulatedBeacon.
type simulatedBeaconAPI struct {
sim *SimulatedBeacon
}

func (a *api) loop() {
// newSimulatedBeaconAPI returns an instance of simulatedBeaconAPI with a
// buffered commit channel. If period is zero, it starts a goroutine to handle
// new tx events.
func newSimulatedBeaconAPI(sim *SimulatedBeacon) *simulatedBeaconAPI {
api := &simulatedBeaconAPI{sim: sim}
if sim.period == 0 {
// mine on demand if period is set to 0
go api.loop()
}
return api
}

// loop is the main loop for the API when it's running in period = 0 mode. It
// ensures that block production is triggered as soon as a new withdrawal or
// transaction is received.
func (a *simulatedBeaconAPI) loop() {
var (
newTxs = make(chan core.NewTxsEvent)
sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
newTxs = make(chan core.NewTxsEvent)
newWxs = make(chan newWithdrawalsEvent)
newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
newWxsSub = a.sim.withdrawals.subscribe(newWxs)
doCommit = make(chan struct{}, 1)
)
defer sub.Unsubscribe()
defer newTxsSub.Unsubscribe()
defer newWxsSub.Unsubscribe()

// A background thread which signals to the simulator when to commit
// based on messages over doCommit.
go func() {
for _ = range doCommit {
a.sim.Commit()
a.sim.eth.TxPool().Sync()

// It's worth noting that in case a tx ends up in the pool listed as
// "executable", but for whatever reason the miner does not include it in
// a block -- maybe the miner is enforcing a higher tip than the pool --
// this code will spinloop.
for {
if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 {
break
}
a.sim.Commit()
}
}
}()

for {
select {
case <-a.sim.shutdownCh:
close(doCommit)
return
case w := <-a.sim.withdrawals.pending:
withdrawals := append(a.sim.withdrawals.gatherPending(9), w)
if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
case <-newWxs:
select {
case doCommit <- struct{}{}:
default:
}
case <-newTxs:
a.sim.Commit()
select {
case doCommit <- struct{}{}:
default:
}
}
}
}

func (a *api) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
// AddWithdrawal adds a withdrawal to the pending queue.
func (a *simulatedBeaconAPI) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
return a.sim.withdrawals.add(withdrawal)
}

func (a *api) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
// SetFeeRecipient sets the fee recipient for block building purposes.
func (a *simulatedBeaconAPI) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
a.sim.setFeeRecipient(feeRecipient)
}
Loading