From c37221427c845245bf76339b4c191d711d5215f5 Mon Sep 17 00:00:00 2001 From: lightclient Date: Sun, 4 Aug 2024 15:53:17 -0600 Subject: [PATCH 01/10] eth/catalyst: ensure period zero mode leaves no pending txs in pool --- eth/catalyst/simulated_beacon.go | 73 +++++++++++++++------------ eth/catalyst/simulated_beacon_api.go | 65 ++++++++++++++++++------ eth/catalyst/simulated_beacon_test.go | 73 +++++++++++++++++++++++++-- 3 files changed, 159 insertions(+), 52 deletions(-) diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index 8bdf94b80e81..052dc7e23797 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" @@ -41,36 +42,47 @@ const devEpochLength = 32 // withdrawalQueue implements a FIFO queue which holds withdrawals that are // pending inclusion. type withdrawalQueue struct { - pending chan *types.Withdrawal + pending types.Withdrawals + mu sync.Mutex + feed event.Feed + subs event.SubscriptionScope } +type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals } + // add queues a withdrawal for future inclusion. -func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error { - select { - case w.pending <- withdrawal: - break - default: - return errors.New("withdrawal queue full") - } +func (w *withdrawalQueue) Add(withdrawal *types.Withdrawal) error { + w.mu.Lock() + defer w.mu.Unlock() + + w.pending = append(w.pending, withdrawal) + w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}}) + return nil } -// gatherPending returns a number of queued withdrawals up to a maximum count. -func (w *withdrawalQueue) gatherPending(maxCount int) []*types.Withdrawal { - withdrawals := []*types.Withdrawal{} - for { - select { - case withdrawal := <-w.pending: - withdrawals = append(withdrawals, withdrawal) - if len(withdrawals) == maxCount { - return withdrawals - } - default: - return withdrawals - } - } +// pop dequeues the specified number of withdrawals from the queue. +func (w *withdrawalQueue) Pop(count int) types.Withdrawals { + w.mu.Lock() + defer w.mu.Unlock() + + count = min(count, len(w.pending)) + popped := w.pending[0:count] + w.pending = w.pending[count:] + + return popped } +// subscribe allows a listener to be updated when new withdrawals are added to +// the queue. +func (w *withdrawalQueue) Subscribe(ch chan<- newWithdrawalsEvent) event.Subscription { + sub := w.feed.Subscribe(ch) + return w.subs.Track(sub) +} + +// SimulatedBeacon drives an Ethereum instance as if it were a real beacon +// client. It can run in period mode where it mines a new block every period +// (seconds) or on every transaction via Commit, Fork and AdjustTime. type SimulatedBeacon struct { shutdownCh chan struct{} eth *eth.Ethereum @@ -86,10 +98,6 @@ type SimulatedBeacon struct { } // NewSimulatedBeacon constructs a new simulated beacon chain. -// Period sets the period in which blocks should be produced. -// -// - If period is set to 0, a block is produced on every transaction. -// via Commit, Fork and AdjustTime. func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, error) { block := eth.BlockChain().CurrentBlock() current := engine.ForkchoiceStateV1{ @@ -112,7 +120,6 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err engineAPI: engineAPI, lastBlockTime: block.Time, curForkchoiceState: current, - withdrawals: withdrawalQueue{make(chan *types.Withdrawal, 20)}, }, nil } @@ -171,6 +178,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u if fcResponse == engine.STATUS_SYNCING { return errors.New("chain rewind prevented invocation of payload creation") } + envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true) if err != nil { return err @@ -223,8 +231,7 @@ func (c *SimulatedBeacon) loop() { case <-c.shutdownCh: return case <-timer.C: - withdrawals := c.withdrawals.gatherPending(10) - if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil { + if err := c.sealBlock(c.withdrawals.Pop(10), uint64(time.Now().Unix())); err != nil { log.Warn("Error performing sealing work", "err", err) } else { timer.Reset(time.Second * time.Duration(c.period)) @@ -260,7 +267,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) { // Commit seals a block on demand. func (c *SimulatedBeacon) Commit() common.Hash { - withdrawals := c.withdrawals.gatherPending(10) + withdrawals := c.withdrawals.Pop(10) if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil { log.Warn("Error performing sealing work", "err", err) } @@ -301,12 +308,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error { if parent == nil { return errors.New("parent not found") } - withdrawals := c.withdrawals.gatherPending(10) + withdrawals := c.withdrawals.Pop(10) return c.sealBlock(withdrawals, parent.Time+uint64(adjustment/time.Second)) } +// RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the +// stack. func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) { - api := &api{sim} + api := &simulatedBeaconAPI{sim: sim, doCommit: make(chan struct{}, 1)} if sim.period == 0 { // mine on demand if period is set to 0 go api.loop() diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index 73d0a5921d83..b26316f8585d 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -18,44 +18,77 @@ package catalyst import ( "context" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" ) -type api struct { - sim *SimulatedBeacon +// simulatedBeaconAPI provides a RPC API for SimulatedBeacon. +type simulatedBeaconAPI struct { + sim *SimulatedBeacon + doCommit chan struct{} } -func (a *api) loop() { +// loop is the main loop for the API when it's running in period = 0 mode. It +// ensures that block production is triggered as soon as a new withdrawal or +// transaction is received. +func (a *simulatedBeaconAPI) loop() { var ( - newTxs = make(chan core.NewTxsEvent) - sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) + newTxs = make(chan core.NewTxsEvent) + newWxs = make(chan newWithdrawalsEvent) + newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) + newWxsSub = a.sim.withdrawals.Subscribe(newWxs) ) - defer sub.Unsubscribe() + defer newTxsSub.Unsubscribe() + defer newWxsSub.Unsubscribe() + + go a.worker() for { select { case <-a.sim.shutdownCh: return - case w := <-a.sim.withdrawals.pending: - withdrawals := append(a.sim.withdrawals.gatherPending(9), w) - if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil { - log.Warn("Error performing sealing work", "err", err) - } + case <-newWxs: + a.commit() case <-newTxs: + a.commit() + } + } +} + +// commit is a non-blocking method to initate Commit() on the simulator. +func (a *simulatedBeaconAPI) commit() { + select { + case a.doCommit <- struct{}{}: + default: + } +} + +// worker runs in the background and signals to the simulator when to commit +// based on messages over doCommit. +func (a *simulatedBeaconAPI) worker() { + for { + select { + case <-a.sim.shutdownCh: + return + case <-a.doCommit: a.sim.Commit() + a.sim.eth.TxPool().Sync() + executable, _ := a.sim.eth.TxPool().Stats() + if executable != 0 { + a.commit() + } } } } -func (a *api) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error { - return a.sim.withdrawals.add(withdrawal) +// AddWithdrawal adds a withdrawal to the pending queue. +func (a *simulatedBeaconAPI) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error { + return a.sim.withdrawals.Add(withdrawal) } -func (a *api) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) { +// SetFeeRecipient sets the fee recipient for block building purposes. +func (a *simulatedBeaconAPI) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) { a.sim.setFeeRecipient(feeRecipient) } diff --git a/eth/catalyst/simulated_beacon_test.go b/eth/catalyst/simulated_beacon_test.go index bb10938c359d..723338a1af5d 100644 --- a/eth/catalyst/simulated_beacon_test.go +++ b/eth/catalyst/simulated_beacon_test.go @@ -35,7 +35,7 @@ import ( "github.com/ethereum/go-ethereum/params" ) -func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node.Node, *eth.Ethereum, *SimulatedBeacon) { +func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis, period uint64) (*node.Node, *eth.Ethereum, *SimulatedBeacon) { t.Helper() n, err := node.New(&node.Config{ @@ -55,7 +55,7 @@ func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node. t.Fatal("can't create eth service:", err) } - simBeacon, err := NewSimulatedBeacon(1, ethservice) + simBeacon, err := NewSimulatedBeacon(period, ethservice) if err != nil { t.Fatal("can't create simulated beacon:", err) } @@ -87,7 +87,7 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { // short period (1 second) for testing purposes var gasLimit uint64 = 10_000_000 genesis := core.DeveloperGenesisBlock(gasLimit, &testAddr) - node, ethService, mock := startSimulatedBeaconEthService(t, genesis) + node, ethService, mock := startSimulatedBeaconEthService(t, genesis, 1) _ = mock defer node.Close() @@ -98,7 +98,7 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { // generate some withdrawals for i := 0; i < 20; i++ { withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) - if err := mock.withdrawals.add(&withdrawals[i]); err != nil { + if err := mock.withdrawals.Add(&withdrawals[i]); err != nil { t.Fatal("addWithdrawal failed", err) } } @@ -140,3 +140,68 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { } } } + +// Tests that zero-period dev mode can handle a lot of simultaneous +// transactions/withdrawals +func TestOnDemandSpam(t *testing.T) { + var ( + withdrawals []types.Withdrawal + txs = make(map[common.Hash]*types.Transaction) + testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testAddr = crypto.PubkeyToAddress(testKey.PublicKey) + gasLimit uint64 = 10_000_000 + genesis = core.DeveloperGenesisBlock(gasLimit, &testAddr) + node, eth, mock = startSimulatedBeaconEthService(t, genesis, 0) + signer = types.LatestSigner(eth.BlockChain().Config()) + chainHeadCh = make(chan core.ChainHeadEvent, 100) + sub = eth.BlockChain().SubscribeChainHeadEvent(chainHeadCh) + ) + defer node.Close() + defer sub.Unsubscribe() + + // start simulated beacon + api := &simulatedBeaconAPI{sim: mock, doCommit: make(chan struct{}, 1)} + go api.loop() + + // generate some withdrawals + for i := 0; i < 20; i++ { + withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) + if err := mock.withdrawals.Add(&withdrawals[i]); err != nil { + t.Fatal("addWithdrawal failed", err) + } + } + + // generate a bunch of transactions + for i := 0; i < 20000; i++ { + tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey) + if err != nil { + t.Fatal("error signing transaction", err) + } + txs[tx.Hash()] = tx + if err := eth.APIBackend.SendTx(context.Background(), tx); err != nil { + t.Fatal("error adding txs to pool", err) + } + } + + var ( + includedTxs = make(map[common.Hash]struct{}) + includedWxs []uint64 + ) + for { + select { + case evt := <-chainHeadCh: + for _, itx := range evt.Block.Transactions() { + includedTxs[itx.Hash()] = struct{}{} + } + for _, iwx := range evt.Block.Withdrawals() { + includedWxs = append(includedWxs, iwx.Index) + } + // ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10 + if len(includedTxs) == len(txs) && len(includedWxs) == len(withdrawals) { + return + } + case <-time.After(10 * time.Second): + t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", len(includedTxs), len(txs), len(includedWxs), len(withdrawals)) + } + } +} From 1eb342e30a6e3746237e831e61b4e51a1ad7eaeb Mon Sep 17 00:00:00 2001 From: lightclient Date: Sun, 4 Aug 2024 18:03:39 -0600 Subject: [PATCH 02/10] eth/catalyst: move txpool.Sync from fcu into simulated beacon --- eth/catalyst/api.go | 21 ++++----------------- eth/catalyst/simulated_beacon.go | 13 ++++++++++++- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 00cce259c861..36a332d92264 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -184,7 +184,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai")) } } - return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false) + return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1) } // ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload @@ -207,7 +207,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called with paris and shanghai payloads")) } } - return api.forkchoiceUpdated(update, params, engine.PayloadV2, false) + return api.forkchoiceUpdated(update, params, engine.PayloadV2) } // ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root @@ -228,10 +228,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa // hash, even if params are wrong. To do this we need to split up // forkchoiceUpdate into a function that only updates the head and then a // function that kicks off block construction. - return api.forkchoiceUpdated(update, params, engine.PayloadV3, false) + return api.forkchoiceUpdated(update, params, engine.PayloadV3) } -func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, simulatorMode bool) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion) (engine.ForkChoiceResponse, error) { api.forkchoiceLock.Lock() defer api.forkchoiceLock.Unlock() @@ -374,19 +374,6 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl if api.localBlocks.has(id) { return valid(&id), nil } - // If the beacon chain is ran by a simulator, then transaction insertion, - // block insertion and block production will happen without any timing - // delay between them. This will cause flaky simulator executions due to - // the transaction pool running its internal reset operation on a back- - // ground thread. To avoid the racey behavior - in simulator mode - the - // pool will be explicitly blocked on its reset before continuing to the - // block production below. - if simulatorMode { - if err := api.eth.TxPool().Sync(); err != nil { - log.Error("Failed to sync transaction pool", "err", err) - return valid(nil), engine.InvalidPayloadAttributes.With(err) - } - } payload, err := api.eth.Miner().BuildPayload(args) if err != nil { log.Error("Failed to build payload", "err", err) diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index 052dc7e23797..da32a6c9485b 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -20,6 +20,7 @@ import ( "crypto/rand" "crypto/sha256" "errors" + "fmt" "math/big" "sync" "time" @@ -163,6 +164,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u c.setCurrentState(header.Hash(), *finalizedHash) } + // Because transaction insertion, block insertion and block production will + // happen without any timing delay between them in simulator mode and the + // transaction pool will be running its internal reset operation on a + // background thread, flaky executions can happen. To avoid the racey + // behavior, the pool will be explicitly blocked on its reset before + // continuing to the block production below. + if err := c.eth.APIBackend.TxPool().Sync(); err != nil { + return fmt.Errorf("failed to sync txpool: %w", err) + } + var random [32]byte rand.Read(random[:]) fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{ @@ -171,7 +182,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u Withdrawals: withdrawals, Random: random, BeaconRoot: &common.Hash{}, - }, engine.PayloadV3, true) + }, engine.PayloadV3) if err != nil { return err } From 16fad83071a8e7e22dc04430f319f08032de7a02 Mon Sep 17 00:00:00 2001 From: lightclient Date: Thu, 8 Aug 2024 07:23:03 -0600 Subject: [PATCH 03/10] eth/catalyst: fix pr comments --- eth/catalyst/simulated_beacon.go | 20 ++++++++------------ eth/catalyst/simulated_beacon_api.go | 16 ++++++++++++++-- eth/catalyst/simulated_beacon_test.go | 9 +++------ 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index da32a6c9485b..de6ab8e6bd43 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -52,7 +52,7 @@ type withdrawalQueue struct { type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals } // add queues a withdrawal for future inclusion. -func (w *withdrawalQueue) Add(withdrawal *types.Withdrawal) error { +func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error { w.mu.Lock() defer w.mu.Unlock() @@ -63,7 +63,7 @@ func (w *withdrawalQueue) Add(withdrawal *types.Withdrawal) error { } // pop dequeues the specified number of withdrawals from the queue. -func (w *withdrawalQueue) Pop(count int) types.Withdrawals { +func (w *withdrawalQueue) pop(count int) types.Withdrawals { w.mu.Lock() defer w.mu.Unlock() @@ -76,7 +76,7 @@ func (w *withdrawalQueue) Pop(count int) types.Withdrawals { // subscribe allows a listener to be updated when new withdrawals are added to // the queue. -func (w *withdrawalQueue) Subscribe(ch chan<- newWithdrawalsEvent) event.Subscription { +func (w *withdrawalQueue) subscribe(ch chan<- newWithdrawalsEvent) event.Subscription { sub := w.feed.Subscribe(ch) return w.subs.Track(sub) } @@ -164,7 +164,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u c.setCurrentState(header.Hash(), *finalizedHash) } - // Because transaction insertion, block insertion and block production will + // Because transaction insertion, block insertion, and block production will // happen without any timing delay between them in simulator mode and the // transaction pool will be running its internal reset operation on a // background thread, flaky executions can happen. To avoid the racey @@ -242,7 +242,7 @@ func (c *SimulatedBeacon) loop() { case <-c.shutdownCh: return case <-timer.C: - if err := c.sealBlock(c.withdrawals.Pop(10), uint64(time.Now().Unix())); err != nil { + if err := c.sealBlock(c.withdrawals.pop(10), uint64(time.Now().Unix())); err != nil { log.Warn("Error performing sealing work", "err", err) } else { timer.Reset(time.Second * time.Duration(c.period)) @@ -278,7 +278,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) { // Commit seals a block on demand. func (c *SimulatedBeacon) Commit() common.Hash { - withdrawals := c.withdrawals.Pop(10) + withdrawals := c.withdrawals.pop(10) if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil { log.Warn("Error performing sealing work", "err", err) } @@ -319,18 +319,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error { if parent == nil { return errors.New("parent not found") } - withdrawals := c.withdrawals.Pop(10) + withdrawals := c.withdrawals.pop(10) return c.sealBlock(withdrawals, parent.Time+uint64(adjustment/time.Second)) } // RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the // stack. func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) { - api := &simulatedBeaconAPI{sim: sim, doCommit: make(chan struct{}, 1)} - if sim.period == 0 { - // mine on demand if period is set to 0 - go api.loop() - } + api := newSimulatedBeaconAPI(sim) stack.RegisterAPIs([]rpc.API{ { Namespace: "dev", diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index b26316f8585d..86b489817e7f 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -30,6 +30,18 @@ type simulatedBeaconAPI struct { doCommit chan struct{} } +// newSimulatedBeaconAPI returns an instance of simulatedBeaconAPI with a +// buffered commit channel. If period is zero, it starts a goroutine to handle +// new tx events. +func newSimulatedBeaconAPI(sim *SimulatedBeacon) *simulatedBeaconAPI { + api := &simulatedBeaconAPI{sim: sim, doCommit: make(chan struct{}, 1)} + if sim.period == 0 { + // mine on demand if period is set to 0 + go api.loop() + } + return api +} + // loop is the main loop for the API when it's running in period = 0 mode. It // ensures that block production is triggered as soon as a new withdrawal or // transaction is received. @@ -38,7 +50,7 @@ func (a *simulatedBeaconAPI) loop() { newTxs = make(chan core.NewTxsEvent) newWxs = make(chan newWithdrawalsEvent) newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) - newWxsSub = a.sim.withdrawals.Subscribe(newWxs) + newWxsSub = a.sim.withdrawals.subscribe(newWxs) ) defer newTxsSub.Unsubscribe() defer newWxsSub.Unsubscribe() @@ -85,7 +97,7 @@ func (a *simulatedBeaconAPI) worker() { // AddWithdrawal adds a withdrawal to the pending queue. func (a *simulatedBeaconAPI) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error { - return a.sim.withdrawals.Add(withdrawal) + return a.sim.withdrawals.add(withdrawal) } // SetFeeRecipient sets the fee recipient for block building purposes. diff --git a/eth/catalyst/simulated_beacon_test.go b/eth/catalyst/simulated_beacon_test.go index 723338a1af5d..711e8f1d60f3 100644 --- a/eth/catalyst/simulated_beacon_test.go +++ b/eth/catalyst/simulated_beacon_test.go @@ -98,7 +98,7 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { // generate some withdrawals for i := 0; i < 20; i++ { withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) - if err := mock.withdrawals.Add(&withdrawals[i]); err != nil { + if err := mock.withdrawals.add(&withdrawals[i]); err != nil { t.Fatal("addWithdrawal failed", err) } } @@ -152,6 +152,7 @@ func TestOnDemandSpam(t *testing.T) { gasLimit uint64 = 10_000_000 genesis = core.DeveloperGenesisBlock(gasLimit, &testAddr) node, eth, mock = startSimulatedBeaconEthService(t, genesis, 0) + _ = newSimulatedBeaconAPI(mock) signer = types.LatestSigner(eth.BlockChain().Config()) chainHeadCh = make(chan core.ChainHeadEvent, 100) sub = eth.BlockChain().SubscribeChainHeadEvent(chainHeadCh) @@ -159,14 +160,10 @@ func TestOnDemandSpam(t *testing.T) { defer node.Close() defer sub.Unsubscribe() - // start simulated beacon - api := &simulatedBeaconAPI{sim: mock, doCommit: make(chan struct{}, 1)} - go api.loop() - // generate some withdrawals for i := 0; i < 20; i++ { withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) - if err := mock.withdrawals.Add(&withdrawals[i]); err != nil { + if err := mock.withdrawals.add(&withdrawals[i]); err != nil { t.Fatal("addWithdrawal failed", err) } } From 2e96810a68cb6c296d0cd2a6fffa87731d74d5f7 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 19 Aug 2024 14:12:51 +0200 Subject: [PATCH 04/10] eth/catalyst: minor refactoring --- eth/catalyst/simulated_beacon_api.go | 55 ++++++++++++---------------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index 86b489817e7f..3877fcb226ea 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -26,15 +26,14 @@ import ( // simulatedBeaconAPI provides a RPC API for SimulatedBeacon. type simulatedBeaconAPI struct { - sim *SimulatedBeacon - doCommit chan struct{} + sim *SimulatedBeacon } // newSimulatedBeaconAPI returns an instance of simulatedBeaconAPI with a // buffered commit channel. If period is zero, it starts a goroutine to handle // new tx events. func newSimulatedBeaconAPI(sim *SimulatedBeacon) *simulatedBeaconAPI { - api := &simulatedBeaconAPI{sim: sim, doCommit: make(chan struct{}, 1)} + api := &simulatedBeaconAPI{sim: sim} if sim.period == 0 { // mine on demand if period is set to 0 go api.loop() @@ -51,47 +50,41 @@ func (a *simulatedBeaconAPI) loop() { newWxs = make(chan newWithdrawalsEvent) newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) newWxsSub = a.sim.withdrawals.subscribe(newWxs) + doCommit = make(chan struct{}, 1) ) defer newTxsSub.Unsubscribe() defer newWxsSub.Unsubscribe() - go a.worker() - - for { + // commit is a non-blocking method to initate Commit() on the simulator. + commit := func() { select { - case <-a.sim.shutdownCh: - return - case <-newWxs: - a.commit() - case <-newTxs: - a.commit() + case doCommit <- struct{}{}: + default: } } -} - -// commit is a non-blocking method to initate Commit() on the simulator. -func (a *simulatedBeaconAPI) commit() { - select { - case a.doCommit <- struct{}{}: - default: - } -} - -// worker runs in the background and signals to the simulator when to commit -// based on messages over doCommit. -func (a *simulatedBeaconAPI) worker() { - for { - select { - case <-a.sim.shutdownCh: - return - case <-a.doCommit: + // a background thread which signals to the simulator when to commit + // based on messages over doCommit. + go func() { + for _ = range doCommit { a.sim.Commit() a.sim.eth.TxPool().Sync() executable, _ := a.sim.eth.TxPool().Stats() if executable != 0 { - a.commit() + commit() } } + }() + + for { + select { + case <-a.sim.shutdownCh: + close(doCommit) + return + case <-newWxs: + commit() + case <-newTxs: + commit() + } } } From 191d4665288444ee473d8fd97410ad43268ef1fb Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 19 Aug 2024 14:41:00 +0200 Subject: [PATCH 05/10] eth/catalyst: refactor simulated beacon --- eth/catalyst/simulated_beacon_api.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index 3877fcb226ea..20cf8325ba4d 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -54,23 +54,14 @@ func (a *simulatedBeaconAPI) loop() { ) defer newTxsSub.Unsubscribe() defer newWxsSub.Unsubscribe() - - // commit is a non-blocking method to initate Commit() on the simulator. - commit := func() { - select { - case doCommit <- struct{}{}: - default: - } - } // a background thread which signals to the simulator when to commit // based on messages over doCommit. go func() { for _ = range doCommit { a.sim.Commit() a.sim.eth.TxPool().Sync() - executable, _ := a.sim.eth.TxPool().Stats() - if executable != 0 { - commit() + if executable, _ := a.sim.eth.TxPool().Stats(); executable > 0 { + a.sim.Commit() } } }() @@ -81,9 +72,15 @@ func (a *simulatedBeaconAPI) loop() { close(doCommit) return case <-newWxs: - commit() + select { + case doCommit <- struct{}{}: + default: + } case <-newTxs: - commit() + select { + case doCommit <- struct{}{}: + default: + } } } } From 4cd6aa2809fcb45d0a0303fa2d56d053ed0ab1a4 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 19 Aug 2024 14:59:10 +0200 Subject: [PATCH 06/10] eth/catalyst: ensure all txs are handled --- eth/catalyst/simulated_beacon_api.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index 20cf8325ba4d..da95be18d3dc 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -60,7 +60,10 @@ func (a *simulatedBeaconAPI) loop() { for _ = range doCommit { a.sim.Commit() a.sim.eth.TxPool().Sync() - if executable, _ := a.sim.eth.TxPool().Stats(); executable > 0 { + for { + if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 { + break + } a.sim.Commit() } } From bc9f01c3a53c870ece8830a3da4e08bffb788d40 Mon Sep 17 00:00:00 2001 From: lightclient Date: Mon, 19 Aug 2024 07:18:31 -0600 Subject: [PATCH 07/10] eth/catalyst: pull feed Send out of mutex lock --- eth/catalyst/simulated_beacon.go | 5 ++--- eth/catalyst/simulated_beacon_api.go | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index de6ab8e6bd43..86355a153387 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -54,11 +54,10 @@ type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals } // add queues a withdrawal for future inclusion. func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error { w.mu.Lock() - defer w.mu.Unlock() - w.pending = append(w.pending, withdrawal) - w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}}) + w.mu.Unlock() + w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}}) return nil } diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index da95be18d3dc..79a0121b5bbd 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -54,7 +54,8 @@ func (a *simulatedBeaconAPI) loop() { ) defer newTxsSub.Unsubscribe() defer newWxsSub.Unsubscribe() - // a background thread which signals to the simulator when to commit + + // A background thread which signals to the simulator when to commit // based on messages over doCommit. go func() { for _ = range doCommit { From 972c05531d8772e12cfee592c3b762cfc1614d14 Mon Sep 17 00:00:00 2001 From: lightclient Date: Mon, 19 Aug 2024 07:21:59 -0600 Subject: [PATCH 08/10] eth/catalyst: add comment about spin loop --- eth/catalyst/simulated_beacon_api.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index 79a0121b5bbd..cd48f23d4ece 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -61,6 +61,11 @@ func (a *simulatedBeaconAPI) loop() { for _ = range doCommit { a.sim.Commit() a.sim.eth.TxPool().Sync() + + // It's worth noting that in case a tx ends up in the pool listed as + // "executable", but for whatever reason the miner does not include it in + // a block -- maybe the miner is enforcing a higher tip than the pool -- + // this code will spinloop. for { if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 { break From 201e1bdfd5a55b5d0607d6398eeb8305b288e322 Mon Sep 17 00:00:00 2001 From: lightclient Date: Mon, 19 Aug 2024 09:12:28 -0600 Subject: [PATCH 09/10] eth/catalyst: fix lint --- eth/catalyst/simulated_beacon_api.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index cd48f23d4ece..59ec3bcc91de 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -58,14 +58,14 @@ func (a *simulatedBeaconAPI) loop() { // A background thread which signals to the simulator when to commit // based on messages over doCommit. go func() { - for _ = range doCommit { + for range doCommit { a.sim.Commit() a.sim.eth.TxPool().Sync() // It's worth noting that in case a tx ends up in the pool listed as // "executable", but for whatever reason the miner does not include it in // a block -- maybe the miner is enforcing a higher tip than the pool -- - // this code will spinloop. + // this code will spinloopk. for { if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 { break From 3ad818e6e0fb7c7bdf35080df8565322da11f783 Mon Sep 17 00:00:00 2001 From: lightclient Date: Mon, 19 Aug 2024 12:25:17 -0600 Subject: [PATCH 10/10] eth/catalyst: typo --- eth/catalyst/simulated_beacon_api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index 59ec3bcc91de..668780531501 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -65,7 +65,7 @@ func (a *simulatedBeaconAPI) loop() { // It's worth noting that in case a tx ends up in the pool listed as // "executable", but for whatever reason the miner does not include it in // a block -- maybe the miner is enforcing a higher tip than the pool -- - // this code will spinloopk. + // this code will spinloop. for { if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 { break