Skip to content

Commit

Permalink
Fix startup sync for txpool processing for bor block production (#9219)
Browse files Browse the repository at this point in the history
When the sync loop first runs it suppresses block sync events both in
the initial loop and when the blocks being processed are greater than
1000.

This fix removed the first check, because otherwise the first block
received by the process ends up not getting sent to the tx pool. Which
means it won't produce new block for polygon.

As well as this fix - I have also moved the gas initialization to the
txpool start method rather than prompting it with a 'synthetic block
event'

As the txpool start has access to the core & tx DB's it can find the
current block and chain config internally so that it doesn't need to be
externally activated it can just do this itself on start up. This has
the advantage of making the txpool more self contained.
  • Loading branch information
mh0lt authored Jan 13, 2024
1 parent 2b0fd6d commit d8b91c4
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 57 deletions.
5 changes: 3 additions & 2 deletions cmd/txpool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest"
common2 "github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -163,7 +164,7 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
newTxs := make(chan types.Announcements, 1024)
defer close(newTxs)
txPoolDB, txPool, fetch, send, txpoolGrpcServer, err := txpooluitl.AllComponents(ctx, cfg,
kvcache.New(cacheConfig), newTxs, coreDB, sentryClients, kvClient, logger)
kvcache.New(cacheConfig), newTxs, coreDB, sentryClients, kvClient, misc.Eip1559FeeCalculator, logger)
if err != nil {
return err
}
Expand All @@ -178,7 +179,7 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
}

notifyMiner := func() {}
txpool.MainLoop(ctx, txPoolDB, coreDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner)
txpool.MainLoop(ctx, txPoolDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner)

grpcServer.GracefulStop()
return nil
Expand Down
39 changes: 39 additions & 0 deletions consensus/misc/eip1559.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/polygon/bor/borcfg"

"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/params"
)
Expand Down Expand Up @@ -56,6 +59,42 @@ func VerifyEip1559Header(config *chain.Config, parent, header *types.Header, ski
return nil
}

var Eip1559FeeCalculator eip1559Calculator

type eip1559Calculator struct{}

func (f eip1559Calculator) CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice uint64, err error) {
hash := rawdb.ReadHeadHeaderHash(db)

if hash == (libcommon.Hash{}) {
return 0, 0, 0, fmt.Errorf("can't get head header hash")
}

currentHeader, err := rawdb.ReadHeaderByHash(db, hash)

if err != nil {
return 0, 0, 0, err
}

if chainConfig != nil {
if currentHeader.BaseFee != nil {
baseFee = CalcBaseFee(chainConfig, currentHeader).Uint64()
}

if currentHeader.ExcessBlobGas != nil {
excessBlobGas := CalcExcessBlobGas(chainConfig, currentHeader)
b, err := GetBlobGasPrice(chainConfig, excessBlobGas)
if err == nil {
blobFee = b.Uint64()
}
}
}

minBlobGasPrice = chainConfig.GetMinBlobGasPrice()

return baseFee, blobFee, minBlobGasPrice, nil
}

// CalcBaseFee calculates the basefee of the header.
func CalcBaseFee(config *chain.Config, parent *types.Header) *big.Int {
// If the current block is the first EIP-1559 block, return the InitialBaseFee.
Expand Down
8 changes: 4 additions & 4 deletions erigon-lib/chain/chain_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,28 +233,28 @@ func (c *Config) GetBurntContract(num uint64) *common.Address {
}

func (c *Config) GetMinBlobGasPrice() uint64 {
if c.MinBlobGasPrice != nil {
if c != nil && c.MinBlobGasPrice != nil {
return *c.MinBlobGasPrice
}
return 1 // MIN_BLOB_GASPRICE (EIP-4844)
}

func (c *Config) GetMaxBlobGasPerBlock() uint64 {
if c.MaxBlobGasPerBlock != nil {
if c != nil && c.MaxBlobGasPerBlock != nil {
return *c.MaxBlobGasPerBlock
}
return 786432 // MAX_BLOB_GAS_PER_BLOCK (EIP-4844)
}

func (c *Config) GetTargetBlobGasPerBlock() uint64 {
if c.TargetBlobGasPerBlock != nil {
if c != nil && c.TargetBlobGasPerBlock != nil {
return *c.TargetBlobGasPerBlock
}
return 393216 // TARGET_BLOB_GAS_PER_BLOCK (EIP-4844)
}

func (c *Config) GetBlobGasPriceUpdateFraction() uint64 {
if c.BlobGasPriceUpdateFraction != nil {
if c != nil && c.BlobGasPriceUpdateFraction != nil {
return *c.BlobGasPriceUpdateFraction
}
return 3338477 // BLOB_GASPRICE_UPDATE_FRACTION (EIP-4844)
Expand Down
31 changes: 24 additions & 7 deletions erigon-lib/txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,17 @@ type TxPool struct {
cancunTime *uint64
isPostCancun atomic.Bool
maxBlobsPerBlock uint64
feeCalculator FeeCalculator
logger log.Logger
}

type FeeCalculator interface {
CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice uint64, err error)
}

func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, cache kvcache.Cache,
chainID uint256.Int, shanghaiTime, agraBlock, cancunTime *big.Int, maxBlobsPerBlock uint64, logger log.Logger,
chainID uint256.Int, shanghaiTime, agraBlock, cancunTime *big.Int, maxBlobsPerBlock uint64,
feeCalculator FeeCalculator, logger log.Logger,
) (*TxPool, error) {
localsHistory, err := simplelru.NewLRU[string, struct{}](10_000, nil)
if err != nil {
Expand Down Expand Up @@ -275,6 +281,7 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config,
minedBlobTxsByBlock: map[uint64][]*metaTx{},
minedBlobTxsByHash: map[string]*metaTx{},
maxBlobsPerBlock: maxBlobsPerBlock,
feeCalculator: feeCalculator,
logger: logger,
}

Expand Down Expand Up @@ -331,7 +338,6 @@ func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error {
}

func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error {

defer newBlockTimer.ObserveDuration(time.Now())
//t := time.Now()

Expand Down Expand Up @@ -1700,7 +1706,7 @@ const txMaxBroadcastSize = 4 * 1024
//
// promote/demote transactions
// reorgs
func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
func MainLoop(ctx context.Context, db kv.RwDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
syncToNewPeersEvery := time.NewTicker(p.cfg.SyncToNewPeersEvery)
defer syncToNewPeersEvery.Stop()
processRemoteTxsEvery := time.NewTicker(p.cfg.ProcessRemoteTxsEvery)
Expand Down Expand Up @@ -2072,8 +2078,15 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
i++
}

var pendingBaseFee uint64
{
var pendingBaseFee, pendingBlobFee, minBlobGasPrice uint64

if p.feeCalculator != nil {
if chainConfig, _ := ChainConfig(tx); chainConfig != nil {
pendingBaseFee, pendingBlobFee, minBlobGasPrice, _ = p.feeCalculator.CurrentFees(chainConfig, coreTx)
}
}

if pendingBaseFee == 0 {
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBaseFeeKey)
if err != nil {
return err
Expand All @@ -2082,8 +2095,8 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
pendingBaseFee = binary.BigEndian.Uint64(v)
}
}
var pendingBlobFee uint64 = 1 // MIN_BLOB_GAS_PRICE A/EIP-4844
{

if pendingBlobFee == 0 {
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBlobFeeKey)
if err != nil {
return err
Expand All @@ -2093,6 +2106,10 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
}
}

if pendingBlobFee == 0 {
pendingBlobFee = minBlobGasPrice
}

err = p.senders.registerNewSenders(&txs, p.logger)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/txpool/pool_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func FuzzOnNewBlocks(f *testing.F) {

cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)

err = pool.Start(ctx, db)
Expand Down Expand Up @@ -540,7 +540,7 @@ func FuzzOnNewBlocks(f *testing.F) {
check(p2pReceived, types.TxSlots{}, "after_flush")
checkNotify(p2pReceived, types.TxSlots{}, "after_flush")

p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)

p2.senders = pool.senders // senders are not persisted
Expand Down
14 changes: 7 additions & 7 deletions erigon-lib/txpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestNonceFromAddress(t *testing.T) {

cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestReplaceWithHigherFee(t *testing.T) {

cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.NotEqual(nil, pool)
ctx := context.Background()
Expand Down Expand Up @@ -290,7 +290,7 @@ func TestReverseNonces(t *testing.T) {

cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestTxPoke(t *testing.T) {

cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
Expand Down Expand Up @@ -682,7 +682,7 @@ func TestShanghaiValidateTx(t *testing.T) {
}

cache := &kvcache.DummyCache{}
pool, err := New(ch, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, fixedgas.DefaultMaxBlobsPerBlock, logger)
pool, err := New(ch, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, fixedgas.DefaultMaxBlobsPerBlock, nil, logger)
asrt.NoError(err)
ctx := context.Background()
tx, err := coreDB.BeginRw(ctx)
Expand Down Expand Up @@ -728,7 +728,7 @@ func TestBlobTxReplacement(t *testing.T) {
db, coreDB := memdb.NewTestPoolDB(t), memdb.NewTestDB(t)
cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, fixedgas.DefaultMaxBlobsPerBlock, log.New())
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
Expand Down Expand Up @@ -953,7 +953,7 @@ func TestDropRemoteAtNoGossip(t *testing.T) {
logger := log.New()
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)

txPool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, fixedgas.DefaultMaxBlobsPerBlock, logger)
txPool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, fixedgas.DefaultMaxBlobsPerBlock, nil, logger)
assert.NoError(err)
require.True(txPool != nil)

Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/txpool/txpooluitl/all_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func SaveChainConfigIfNeed(ctx context.Context, coreDB kv.RoDB, txPoolDB kv.RwDB
}

func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cache, newTxs chan types.Announcements, chainDB kv.RoDB,
sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, feeCalculator txpool.FeeCalculator, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
opts := mdbx.NewMDBX(logger).Label(kv.TxPoolDB).Path(cfg.DBDir).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).
WriteMergeThreshold(3 * 8192).
Expand Down Expand Up @@ -144,7 +144,7 @@ func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cach
cancunTime = cfg.OverrideCancunTime
}

txPool, err := txpool.New(newTxs, chainDB, cfg, cache, *chainID, shanghaiTime, agraBlock, cancunTime, maxBlobsPerBlock, logger)
txPool, err := txpool.New(newTxs, chainDB, cfg, cache, *chainID, shanghaiTime, agraBlock, cancunTime, maxBlobsPerBlock, feeCalculator, logger)
if err != nil {
return nil, nil, nil, nil, nil, err
}
Expand Down
23 changes: 3 additions & 20 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.newTxs = make(chan types2.Announcements, 1024)
//defer close(newTxs)
backend.txPoolDB, backend.txPool, backend.txPoolFetch, backend.txPoolSend, backend.txPoolGrpcServer, err = txpooluitl.AllComponents(
ctx, config.TxPool, kvcache.NewDummy(), backend.newTxs, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient, logger,
ctx, config.TxPool, kvcache.NewDummy(), backend.newTxs, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient, misc.Eip1559FeeCalculator, logger,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -708,23 +708,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
// 1) Hive tests requires us to do so and starting it from eth_sendRawTransaction is not viable as we have not enough data
// to initialize it properly.
// 2) we cannot propose for block 1 regardless.
go func() {
time.Sleep(10 * time.Millisecond)
baseFee := uint64(0)
if currentBlock.BaseFee() != nil {
baseFee = misc.CalcBaseFee(chainConfig, currentBlock.Header()).Uint64()
}
blobFee := chainConfig.GetMinBlobGasPrice()
if currentBlock.Header().ExcessBlobGas != nil {
excessBlobGas := misc.CalcExcessBlobGas(chainConfig, currentBlock.Header())
b, err := misc.GetBlobGasPrice(chainConfig, excessBlobGas)
if err == nil {
blobFee = b.Uint64()
}
}
backend.notifications.Accumulator.StartChange(currentBlock.NumberU64(), currentBlock.Hash(), nil, false)
backend.notifications.Accumulator.SendAndReset(ctx, backend.notifications.StateChangesConsumer, baseFee, blobFee, currentBlock.GasLimit(), 0)
}()

if !config.DeprecatedTxPool.Disable {
backend.txPoolFetch.ConnectCore()
Expand All @@ -734,15 +717,15 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
newTxsBroadcaster = casted.NewSlotsStreams
}
go txpool.MainLoop(backend.sentryCtx,
backend.txPoolDB, backend.chainDB,
backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster,
backend.txPoolDB, backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster,
func() {
select {
case backend.notifyMiningAboutNewTxs <- struct{}{}:
default:
}
})
}

go func() {
defer debug.LogPanic()
for {
Expand Down
Loading

0 comments on commit d8b91c4

Please sign in to comment.