diff --git a/cmd/devp2p/internal/ethtest/suite_test.go b/cmd/devp2p/internal/ethtest/suite_test.go index 8a2b132fa3..382842029d 100644 --- a/cmd/devp2p/internal/ethtest/suite_test.go +++ b/cmd/devp2p/internal/ethtest/suite_test.go @@ -117,7 +117,9 @@ func setupGeth(stack *node.Node) error { TrieCleanCacheRejournal: 60 * time.Minute, TrieDirtyCache: 16, TrieTimeout: 60 * time.Minute, + TriesInMemory: 128, SnapshotCache: 10, + Preimages: true, }) if err != nil { return err diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 96999075a3..30fe09008b 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -468,7 +468,7 @@ func dump(ctx *cli.Context) error { config := &trie.Config{ Preimages: true, // always enable preimage lookup } - state, err := state.New(root, state.NewDatabaseWithConfig(db, config), nil) + state, err := state.New(root, state.NewDatabaseWithConfigAndCache(db, config), nil) if err != nil { return err } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index b1d23fdd19..5dcc52bbb5 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -92,6 +92,7 @@ var ( utils.SyncTargetFlag, utils.ExitWhenSyncedFlag, utils.GCModeFlag, + utils.TriesInMemoryFlag, utils.SnapshotFlag, utils.TxLookupLimitFlag, utils.LightServeFlag, diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index 0759341fd9..11636d4732 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -52,6 +52,7 @@ var ( Flags: flags.Merge([]cli.Flag{ utils.CacheTrieJournalFlag, utils.BloomFilterSizeFlag, + utils.TriesInMemoryFlag, }, utils.NetworkFlags, utils.DatabasePathFlags), Description: ` geth snapshot prune-state @@ -73,7 +74,7 @@ the trie clean cache with default directory will be deleted. Usage: "Recalculate state hash based on the snapshot for verification", ArgsUsage: "", Action: verifyState, - Flags: flags.Merge(utils.NetworkFlags, utils.DatabasePathFlags), + Flags: flags.Merge(utils.NetworkFlags, utils.DatabasePathFlags, []cli.Flag{utils.TriesInMemoryFlag}), Description: ` geth snapshot verify-state will traverse the whole accounts and storages set based on the specified @@ -144,6 +145,7 @@ It's also usable without snapshot enabled. 
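Both commands above receive the new --triesInMemory flag and thread it through functional options rather than extra positional parameters, so untouched call sites keep compiling. A minimal sketch of the calling pattern (the value 256 is illustrative, not a default taken from this patch):

	// Hypothetical caller: retain 256 recent tries instead of the default 128.
	p, err := pruner.NewPruner(chaindb, prunerconfig,
		pruner.WithTriesInMemory(256))
	if err != nil {
		return err
	}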
utils.ExcludeStorageFlag, utils.StartKeyFlag, utils.DumpLimitFlag, + utils.TriesInMemoryFlag, }, utils.NetworkFlags, utils.DatabasePathFlags), Description: ` This command is semantically equivalent to 'geth dump', but uses the snapshots @@ -171,7 +173,8 @@ func pruneState(ctx *cli.Context) error { Cachedir: stack.ResolvePath(config.Eth.TrieCleanCacheJournal), BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name), } - pruner, err := pruner.NewPruner(chaindb, prunerconfig) + pruner, err := pruner.NewPruner(chaindb, prunerconfig, + pruner.WithTriesInMemory(ctx.Uint64(utils.TriesInMemoryFlag.Name))) if err != nil { log.Error("Failed to open snapshot tree", "err", err) return err @@ -213,7 +216,8 @@ func verifyState(ctx *cli.Context) error { NoBuild: true, AsyncBuild: false, } - snaptree, err := snapshot.New(snapconfig, chaindb, trie.NewDatabase(chaindb), headBlock.Root()) + snaptree, err := snapshot.New(snapconfig, chaindb, trie.NewDatabase(chaindb), headBlock.Root(), + snapshot.SetCapLimit(int(ctx.Uint64(utils.TriesInMemoryFlag.Name)))) if err != nil { log.Error("Failed to open snapshot tree", "err", err) return err @@ -496,7 +500,8 @@ func dumpState(ctx *cli.Context) error { NoBuild: true, AsyncBuild: false, } - snaptree, err := snapshot.New(snapConfig, db, trie.NewDatabase(db), root) + snaptree, err := snapshot.New(snapConfig, db, trie.NewDatabase(db), root, + snapshot.SetCapLimit(int(ctx.Uint64(utils.TriesInMemoryFlag.Name)))) if err != nil { return err } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7b17390d97..377fe5b58a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -242,6 +242,11 @@ var ( Value: true, Category: flags.EthCategory, } + TriesInMemoryFlag = &cli.Uint64Flag{ + Name: "triesInMemory", + Usage: "Number of recent tries (state trie roots) to keep in memory", + Value: 128, + } TxLookupLimitFlag = &cli.Uint64Flag{ Name: "txlookuplimit", Usage: "Number of recent blocks to maintain transactions index for (default = about one year, 0 = entire chain)", @@ -465,7 +470,7 @@ var ( CacheDatabaseFlag = &cli.IntFlag{ Name: "cache.database", Usage: "Percentage of cache memory allowance to use for database io", - Value: 50, + Value: 40, Category: flags.PerfCategory, } CacheTrieFlag = &cli.IntFlag{ @@ -1852,6 +1857,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheGCFlag.Name) { cfg.TrieDirtyCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheGCFlag.Name) / 100 } + if ctx.IsSet(TriesInMemoryFlag.Name) { + cfg.TriesInMemory = ctx.Uint64(TriesInMemoryFlag.Name) + } if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) { cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100 } @@ -2296,6 +2304,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache, TrieDirtyDisabled: ctx.String(GCModeFlag.Name) == "archive", TrieTimeLimit: ethconfig.Defaults.TrieTimeout, + TriesInMemory: ethconfig.Defaults.TriesInMemory, SnapshotLimit: ethconfig.Defaults.SnapshotCache, Preimages: ctx.Bool(CachePreimagesFlag.Name), } diff --git a/common/gopool/pool.go b/common/gopool/pool.go new file mode 100644 index 0000000000..2c6647aa76 --- /dev/null +++ b/common/gopool/pool.go @@ -0,0 +1,60 @@ +package gopool + +import ( + "time" + + "github.com/panjf2000/ants/v2" +) + +const ( + // defaultGoroutineExpireDuration is how long an idle goroutine is kept in the pool before being reclaimed.
+ defaultGoroutineExpireDuration = 10 * time.Second ) var ( // defaultPool is the default ants pool for the gopool package. defaultPool *ants.Pool ) func init() { // Init an instance pool when importing ants. pool, err := ants.NewPool( ants.DefaultAntsPoolSize, ants.WithExpiryDuration(defaultGoroutineExpireDuration), ) if err != nil { panic(err) } defaultPool = pool } // Submit submits a task to the pool. func Submit(task func()) error { return defaultPool.Submit(task) } // Running returns the number of currently running goroutines. func Running() int { return defaultPool.Running() } // Cap returns the capacity of this default pool. func Cap() int { return defaultPool.Cap() } // Free returns the number of available goroutines. func Free() int { return defaultPool.Free() } // Release closes the default pool. func Release() { defaultPool.Release() } // Reboot reboots the default pool. func Reboot() { defaultPool.Reboot() } diff --git a/common/lru/lru.go b/common/lru/lru.go index 45965adb0d..5afa7b98ab 100644 --- a/common/lru/lru.go +++ b/common/lru/lru.go @@ -93,3 +93,11 @@ func (c *Cache[K, V]) Keys() []K { return c.cache.Keys() } + +// GetOldest returns the oldest entry in the cache. +func (c *Cache[K, V]) GetOldest() (key K, value V, ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + + return c.cache.GetOldest() +} diff --git a/core/asm/lexer.go b/core/asm/lexer.go index d1b79a1fb9..fb705848d2 100644 --- a/core/asm/lexer.go +++ b/core/asm/lexer.go @@ -22,6 +22,8 @@ import ( "strings" "unicode" "unicode/utf8" + + "github.com/ethereum/go-ethereum/common/gopool" ) // stateFn is used through the lifetime of the @@ -103,14 +105,14 @@ func Lex(source []byte, debug bool) <-chan token { state: lexLine, debug: debug, } - go func() { + gopool.Submit(func() { l.emit(lineStart) for l.state != nil { l.state = l.state(l) } l.emit(eof) close(l.tokens) - }() + }) return ch } diff --git a/core/block_validator.go b/core/block_validator.go index bcb228830d..de598b58ab 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -19,6 +19,7 @@ package core import ( "fmt" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -64,29 +65,54 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash { return fmt.Errorf("uncle root hash mismatch (header value %x, calculated %x)", header.UncleHash, hash) } - if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash { - return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash) + + validateFuns := []func() error{ + func() error { + if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash { + return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash) + } + return nil + }, + func() error { + // Withdrawals are present after the Shanghai fork. + if header.WithdrawalsHash != nil { + // Withdrawals list must be present in body after Shanghai.
+ if block.Withdrawals() == nil { + return fmt.Errorf("missing withdrawals in block body") + } + if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash { + return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash) + } + } else if block.Withdrawals() != nil { + // Withdrawals are not allowed prior to shanghai fork + return fmt.Errorf("withdrawals present in block body") + } + return nil + }, + func() error { + if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { + if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) { + return consensus.ErrUnknownAncestor + } + return consensus.ErrPrunedAncestor + } + return nil + }, } - // Withdrawals are present after the Shanghai fork. - if header.WithdrawalsHash != nil { - // Withdrawals list must be present in body after Shanghai. - if block.Withdrawals() == nil { - return fmt.Errorf("missing withdrawals in block body") - } - if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash { - return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash) - } - } else if block.Withdrawals() != nil { - // Withdrawals are not allowed prior to shanghai fork - return fmt.Errorf("withdrawals present in block body") + validateRes := make(chan error, len(validateFuns)) + for _, f := range validateFuns { + tmpFunc := f + gopool.Submit(func() { + validateRes <- tmpFunc() + }) } - - if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { - if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) { - return consensus.ErrUnknownAncestor + for i := 0; i < len(validateFuns); i++ { + err := <-validateRes + if err != nil { + return err } - return consensus.ErrPrunedAncestor } + return nil } diff --git a/core/blockchain.go b/core/blockchain.go index 1709f9cbae..9915d0c768 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -30,6 +30,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" @@ -82,8 +83,7 @@ var ( blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) - blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) - blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) errInsertionInterrupted = errors.New("insertion is interrupted") errChainStopped = errors.New("blockchain is stopped") @@ -96,7 +96,7 @@ const ( txLookupCacheLimit = 1024 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 - TriesInMemory = 128 + TriesInMemory = 128 // default number of recent trie roots to keep in memory // BlockChainVersion ensures that an incompatible database forces a resync from scratch. 
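The restructured ValidateBody above is a fan-out/fan-in: each independent check is submitted to the shared goroutine pool and the first non-nil error wins. The same idiom in isolation (runChecks is a hypothetical helper, not part of the patch); the result channel is buffered to len(checks), so workers that finish after an early error never block:

	// runChecks runs every check on the shared pool and returns the first error.
	func runChecks(checks []func() error) error {
		results := make(chan error, len(checks)) // buffered: stragglers never block
		for _, check := range checks {
			check := check // capture the loop variable for the closure
			gopool.Submit(func() { results <- check() })
		}
		for range checks {
			if err := <-results; err != nil {
				return err // late results simply land in the buffer
			}
		}
		return nil
	}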
// @@ -134,6 +134,7 @@ type CacheConfig struct { TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node) TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + TriesInMemory uint64 // Number of recent tries to keep in memory SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory Preimages bool // Whether to store preimage of trie key to the disk @@ -147,6 +148,7 @@ var defaultCacheConfig = &CacheConfig{ TrieCleanLimit: 256, TrieDirtyLimit: 256, TrieTimeLimit: 5 * time.Minute, + TriesInMemory: TriesInMemory, SnapshotLimit: 256, SnapshotWait: true, } @@ -217,6 +219,7 @@ type BlockChain struct { quit chan struct{} // shutdown signal, closed in Stop. running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing + commitLock sync.Mutex // commitLock protects the trie commit and garbage-collection path from running concurrently engine consensus.Engine validator Validator // Block and state validator interface @@ -233,6 +236,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if cacheConfig == nil { cacheConfig = defaultCacheConfig } + if cacheConfig.TriesInMemory == 0 { + cacheConfig.TriesInMemory = TriesInMemory + } // Open trie database with provided config triedb := trie.NewDatabaseWithConfig(db, &trie.Config{ @@ -278,9 +284,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis vmConfig: vmConfig, } bc.forker = NewForkChoice(bc, shouldPreserve) - bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb) + bc.stateCache = state.NewDatabaseWithNodeDBAndCache(bc.db, bc.triedb) bc.validator = NewBlockValidator(chainConfig, bc, engine) - bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) + bc.prefetcher = NewStatePrefetcher(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error @@ -407,7 +413,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis NoBuild: bc.cacheConfig.SnapshotNoBuild, AsyncBuild: !bc.cacheConfig.SnapshotWait, } - bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root) + bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root, snapshot.SetCapLimit(int(bc.cacheConfig.TriesInMemory))) } // Start future block processor. @@ -963,7 +969,7 @@ func (bc *BlockChain) Stop() { if !bc.cacheConfig.TrieDirtyDisabled { triedb := bc.triedb - for _, offset := range []uint64{0, 1, TriesInMemory - 1} { + for _, offset := range []uint64{0, 1, bc.cacheConfig.TriesInMemory - 1} { if number := bc.CurrentBlock().Number.Uint64(); number > offset { recent := bc.GetBlockByNumber(number - offset) @@ -1341,80 +1347,117 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) if ptd == nil { + state.StopPrefetcher() return consensus.ErrUnknownAncestor } - // Make sure no inconsistent state is leaked during insertion - externTd := new(big.Int).Add(block.Difficulty(), ptd) - // Irrelevant of the canonical status, write the block itself to the database. - // - // Note all the components of block(td, hash->number map, header, body, receipts) - // should be written atomically. BlockBatch is used for containing all components.
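Since a caller-built CacheConfig may leave TriesInMemory at zero, NewBlockChain backfills the default, as shown above. The retention rule that the rest of the change enforces reduces to one line of arithmetic (a sketch; n stands for cacheConfig.TriesInMemory):

	// Tries for blocks in (current-n, current] stay in memory; the next
	// candidate for flushing to disk is therefore block current-n.
	chosen := current - n // e.g. current = 1000, n = 128 -> chosen = 872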
- blockBatch := bc.db.NewBatch() - rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) - rawdb.WriteBlock(blockBatch, block) - rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) - rawdb.WritePreimages(blockBatch, state.Preimages()) - if err := blockBatch.Write(); err != nil { - log.Crit("Failed to write block into disk", "err", err) - } - // Commit all cached state changes into underlying memory database. - root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) - if err != nil { - return err - } - // If we're running an archive node, always flush - if bc.cacheConfig.TrieDirtyDisabled { - return bc.triedb.Commit(root, false) - } - // Full but not archive node, do proper garbage collection - bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive - bc.triegc.Push(root, -int64(block.NumberU64())) + postCommitFuncs := []func() error{ + func() error { + bc.commitLock.Lock() + defer bc.commitLock.Unlock() - current := block.NumberU64() - // Flush limits are not considered for the first TriesInMemory blocks. - if current <= TriesInMemory { - return nil - } - // If we exceeded our memory allowance, flush matured singleton nodes to disk - var ( - nodes, imgs = bc.triedb.Size() - limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 - ) - if nodes > limit || imgs > 4*1024*1024 { - bc.triedb.Cap(limit - ethdb.IdealBatchSize) - } - // Find the next state trie we need to commit - chosen := current - TriesInMemory - flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval)) - // If we exceeded time allowance, flush an entire trie to disk - if bc.gcproc > flushInterval { - // If the header is missing (canonical chain behind), we're reorging a low - // diff sidechain. Suspend committing until this operation is completed. - header := bc.GetHeaderByNumber(chosen) - if header == nil { - log.Warn("Reorg in progress, trie commit postponed", "number", chosen) - } else { - // If we're exceeding limits but haven't reached a large enough memory gap, - // warn the user that the system is becoming unstable. - if chosen < bc.lastWrite+TriesInMemory && bc.gcproc >= 2*flushInterval { - log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory) + root := block.Root() + // If we're running an archive node, always flush + if bc.cacheConfig.TrieDirtyDisabled { + return bc.triedb.Commit(root, false) } - // Flush an entire trie and restart the counters - bc.triedb.Commit(header.Root, true) - bc.lastWrite = chosen - bc.gcproc = 0 - } + // Full but not archive node, do proper garbage collection + bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive + bc.triegc.Push(root, -int64(block.NumberU64())) + + current := block.NumberU64() + // Flush limits are not considered for the first TriesInMemory blocks. 
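Trie garbage collection now runs as a post-commit callback on a pooled goroutine, concurrently with the block-batch write in the other commit function, which is what the new bc.commitLock guards against. A reduced sketch of the guarded region (gcAfterCommit is a hypothetical helper; the elided flush logic is exactly the hook shown here):

	// gcAfterCommit serializes all trie reference/GC work behind commitLock.
	func (bc *BlockChain) gcAfterCommit(root common.Hash, number uint64) error {
		bc.commitLock.Lock()
		defer bc.commitLock.Unlock()
		bc.triedb.Reference(root, common.Hash{}) // metadata reference keeps the trie alive
		bc.triegc.Push(root, -int64(number))     // queue the root for future GC
		// ...flush or dereference roots older than TriesInMemory blocks...
		return nil
	}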
+ if current <= bc.cacheConfig.TriesInMemory { + return nil + } + // If we exceeded our memory allowance, flush matured singleton nodes to disk + var ( + nodes, imgs = bc.triedb.Size() + limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 + ) + if nodes > limit || imgs > 4*1024*1024 { + bc.triedb.Cap(limit - ethdb.IdealBatchSize) + } + // Find the next state trie we need to commit + chosen := current - bc.cacheConfig.TriesInMemory + flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval)) + // If we exceeded time allowance, flush an entire trie to disk + if bc.gcproc > flushInterval { + // If the header is missing (canonical chain behind), we're reorging a low + // diff sidechain. Suspend committing until this operation is completed. + header := bc.GetHeaderByNumber(chosen) + if header == nil { + log.Warn("Reorg in progress, trie commit postponed", "number", chosen) + } else { + // If we're exceeding limits but haven't reached a large enough memory gap, + // warn the user that the system is becoming unstable. + if chosen < bc.lastWrite+bc.cacheConfig.TriesInMemory && bc.gcproc >= 2*flushInterval { + log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/float64(bc.cacheConfig.TriesInMemory)) + } + // Flush an entire trie and restart the counters + bc.triedb.Commit(header.Root, true) + bc.lastWrite = chosen + bc.gcproc = 0 + } + } + // Garbage collect anything below our required write retention + for !bc.triegc.Empty() { + root, number := bc.triegc.Pop() + if uint64(-number) > chosen { + bc.triegc.Push(root, number) + break + } + bc.triedb.Dereference(root) + } + return nil + }, + } + + commitFuncs := []func() error{ + func() error { + // Make sure no inconsistent state is leaked during insertion + externTd := new(big.Int).Add(block.Difficulty(), ptd) + + // Irrelevant of the canonical status, write the block itself to the database. + // + // Note all the components of block(td, hash->number map, header, body, receipts) + // should be written atomically. BlockBatch is used for containing all components. + blockBatch := bc.db.NewBatch() + rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) + rawdb.WriteBlock(blockBatch, block) + rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) + rawdb.WritePreimages(blockBatch, state.Preimages()) + if err := blockBatch.Write(); err != nil { + log.Crit("Failed to write block into disk", "err", err) + } + return nil + }, + func() error { + // Commit all cached state changes into underlying memory database. + _, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()), postCommitFuncs...)
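state.Commit gains a variadic hook parameter so the garbage-collection callback only ever observes state that has actually been committed. The ordering contract in isolation (commitThen is a hypothetical reduction, not the real signature):

	// commitThen runs hooks strictly after a successful commit, never on failure.
	func commitThen(commit func() error, hooks ...func() error) error {
		if err := commit(); err != nil {
			return err
		}
		for _, hook := range hooks {
			if err := hook(); err != nil {
				return err
			}
		}
		return nil
	}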
+ if err != nil { + return err + } + return nil + }, } - // Garbage collect anything below our required write retention - for !bc.triegc.Empty() { - root, number := bc.triegc.Pop() - if uint64(-number) > chosen { - bc.triegc.Push(root, number) - break + + // commit all the data to the database + commitRes := make(chan error, len(commitFuncs)) + for i := 0; i < len(commitFuncs); i++ { + commitFunc := commitFuncs[i] + gopool.Submit(func() { + commitRes <- commitFunc() + }) + } + // wait for the block to be written to disk and state to be committed + for i := 0; i < len(commitFuncs); i++ { + err := <-commitRes + if err != nil { + return err } - bc.triedb.Dereference(root) } + return nil } @@ -1728,7 +1771,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } - statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) + statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps) if err != nil { return it.index, err } @@ -1739,18 +1782,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. - var followupInterrupt uint32 + interruptCh := make(chan struct{}) if !bc.cacheConfig.TrieCleanNoPrefetch { if followup, err := it.peek(); followup != nil && err == nil { throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) go func(start time.Time, followup *types.Block, throwaway *state.StateDB) { - bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) + bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, interruptCh) blockPrefetchExecuteTimer.Update(time.Since(start)) - if atomic.LoadUint32(&followupInterrupt) == 1 { - blockPrefetchInterruptMeter.Mark(1) - } }(time.Now(), followup, throwaway) } } @@ -1760,7 +1800,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) - atomic.StoreUint32(&followupInterrupt, 1) + close(interruptCh) return it.index, err } ptime := time.Since(pstart) @@ -1768,7 +1808,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) vstart := time.Now() if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) - atomic.StoreUint32(&followupInterrupt, 1) + close(interruptCh) return it.index, err } vtime := time.Since(vstart) @@ -1801,7 +1841,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) } else { status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false) } - atomic.StoreUint32(&followupInterrupt, 1) + close(interruptCh) if err != nil { return it.index, err } diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index 6a4a9c9d22..bf260a02a9 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -1777,6 +1777,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { TrieDirtyLimit: 256, TrieTimeLimit: 5 * time.Minute, SnapshotLimit: 0, // Disable snapshot by default + TriesInMemory: 128, } ) defer engine.Close() diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 0d2f6f950d..b3f63b134d 100644 --- a/core/bloombits/matcher.go +++ 
b/core/bloombits/matcher.go @@ -27,6 +27,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/crypto" ) @@ -164,7 +165,7 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin // Read the output from the result sink and deliver to the user session.pend.Add(1) - go func() { + gopool.Submit(func() { defer session.pend.Done() defer close(results) @@ -210,7 +211,7 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin } } } - }() + }) return session, nil } @@ -226,7 +227,7 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch source := make(chan *partialMatches, buffer) session.pend.Add(1) - go func() { + gopool.Submit(func() { defer session.pend.Done() defer close(source) @@ -237,7 +238,7 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}: } } - }() + }) // Assemble the daisy-chained filtering pipeline next := source dist := make(chan *request, buffer) @@ -247,7 +248,7 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch } // Start the request distribution session.pend.Add(1) - go m.distributor(dist, session) + gopool.Submit(func() { m.distributor(dist, session) }) return next } @@ -273,7 +274,7 @@ func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloo results := make(chan *partialMatches, cap(source)) session.pend.Add(2) - go func() { + gopool.Submit(func() { // Tear down the goroutine and terminate all source channels defer session.pend.Done() defer close(process) @@ -314,9 +315,9 @@ func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloo } } } - }() + }) - go func() { + gopool.Submit(func() { // Tear down the goroutine and terminate the final sink channel defer session.pend.Done() defer close(results) @@ -372,7 +373,7 @@ func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloo } } } - }() + }) return results } diff --git a/core/bloombits/scheduler.go b/core/bloombits/scheduler.go index 6449c7465a..bd78a8e91b 100644 --- a/core/bloombits/scheduler.go +++ b/core/bloombits/scheduler.go @@ -18,6 +18,8 @@ package bloombits import ( "sync" + + "github.com/ethereum/go-ethereum/common/gopool" ) // request represents a bloom retrieval task to prioritize and pull from the local @@ -63,8 +65,8 @@ func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []by // Start the pipeline schedulers to forward between user -> distributor -> user wg.Add(2) - go s.scheduleRequests(sections, dist, pend, quit, wg) - go s.scheduleDeliveries(pend, done, quit, wg) + gopool.Submit(func() { s.scheduleRequests(sections, dist, pend, quit, wg) }) + gopool.Submit(func() { s.scheduleDeliveries(pend, done, quit, wg) }) } // reset cleans up any leftovers from previous runs. 
This is required before a diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 102943516e..82cc2abaf3 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -164,7 +165,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool } go lookup() // start the sequential db accessor for i := 0; i < int(threads); i++ { - go process() + gopool.Submit(func() { process() }) } return hashesCh } diff --git a/core/sender_cacher.go b/core/sender_cacher.go index 4be53619eb..18bcc4ffb4 100644 --- a/core/sender_cacher.go +++ b/core/sender_cacher.go @@ -19,6 +19,7 @@ package core import ( "runtime" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/types" ) @@ -52,7 +53,7 @@ func newTxSenderCacher(threads int) *txSenderCacher { threads: threads, } for i := 0; i < threads; i++ { - go cacher.cache() + gopool.Submit(func() { cacher.cache() }) } return cacher } diff --git a/core/state/database.go b/core/state/database.go index d3c36c10ac..da26719816 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -19,6 +19,7 @@ package state import ( "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" @@ -34,6 +35,18 @@ const ( // Cache size granted for caching clean code. codeCacheSize = 64 * 1024 * 1024 + + // Maximum number of account tries kept in the cache + accountTrieCacheSize = 32 + + // Maximum number of accounts whose storage tries are kept in the cache + storageTrieCacheSize = 2000 + + // Interval at which the purge loop inspects the caches + purgeInterval = 600 * time.Second + + // Size threshold of a single cached account trie beyond which the caches are purged + maxAccountTrieSize = 1024 * 1024 ) // Database wraps access to tries and contract code. @@ -58,6 +71,15 @@ type Database interface { // TrieDB retrieves the low level trie database used for data storage. TrieDB() *trie.Database + + // CacheAccount caches the given account trie for later reuse + CacheAccount(root common.Hash, t Trie) + + // CacheStorage caches the given contract storage trie for later reuse + CacheStorage(addrHash common.Hash, root common.Hash, t Trie) + + // Purge drops all cached tries + Purge() } // Trie is a Ethereum Merkle Patricia trie. @@ -144,6 +166,21 @@ func NewDatabaseWithConfig(db ethdb.Database, config *trie.Config) Database { } } +// NewDatabaseWithConfigAndCache creates a backing store for state. The returned database +// is safe for concurrent use and retains a lot of collapsed RLP trie nodes in a +// large memory cache. +func NewDatabaseWithConfigAndCache(db ethdb.Database, config *trie.Config) Database { + database := &cachingDB{ + disk: db, + codeSizeCache: lru.NewCache[common.Hash, int](codeSizeCacheSize), + codeCache: lru.NewSizeConstrainedCache[common.Hash, []byte](codeCacheSize), + triedb: trie.NewDatabaseWithConfig(db, config), + accountTrieCache: lru.NewCache[common.Hash, Trie](accountTrieCacheSize), + storageTrieCache: lru.NewCache[common.Hash, [3]*triePair](storageTrieCacheSize), + } + go database.purgeLoop() + return database +} + // NewDatabaseWithNodeDB creates a state database with an already initialized node database.
func NewDatabaseWithNodeDB(db ethdb.Database, triedb *trie.Database) Database { return &cachingDB{ @@ -154,11 +191,49 @@ +func NewDatabaseWithNodeDBAndCache(db ethdb.Database, triedb *trie.Database) Database { + database := &cachingDB{ + disk: db, + codeSizeCache: lru.NewCache[common.Hash, int](codeSizeCacheSize), + codeCache: lru.NewSizeConstrainedCache[common.Hash, []byte](codeCacheSize), + triedb: triedb, + accountTrieCache: lru.NewCache[common.Hash, Trie](accountTrieCacheSize), + storageTrieCache: lru.NewCache[common.Hash, [3]*triePair](storageTrieCacheSize), + } + go database.purgeLoop() + return database +} + type cachingDB struct { disk ethdb.KeyValueStore codeSizeCache *lru.Cache[common.Hash, int] codeCache *lru.SizeConstrainedCache[common.Hash, []byte] triedb *trie.Database + + accountTrieCache *lru.Cache[common.Hash, Trie] + storageTrieCache *lru.Cache[common.Hash, [3]*triePair] +} + +type triePair struct { + root common.Hash + trie Trie +} + +// purgeLoop runs on its own goroutine: it periodically inspects the oldest cached account trie and, once that trie has grown beyond maxAccountTrieSize, drops all cached tries to bound memory usage. +func (db *cachingDB) purgeLoop() { + ticker := time.NewTicker(purgeInterval) + defer ticker.Stop() + for range ticker.C { + _, accounts, ok := db.accountTrieCache.GetOldest() + if !ok { + continue + } + tr := accounts.(*trie.SecureTrie).GetRawTrie() + if tr.Size() > maxAccountTrieSize { + db.Purge() + } + } +} // OpenTrie opens the main account trie at a specific root hash. @@ -239,3 +314,48 @@ func (db *cachingDB) DiskDB() ethdb.KeyValueStore { func (db *cachingDB) TrieDB() *trie.Database { return db.triedb } + +// CacheAccount caches the given trie in memory for future access. +func (db *cachingDB) CacheAccount(root common.Hash, t Trie) { + if db.accountTrieCache == nil { + return + } + tr, ok := t.(*trie.SecureTrie) + if !ok { + return + } + db.accountTrieCache.Add(root, tr.ResetCopy()) +} + +// CacheStorage caches the given trie in memory for future access. +func (db *cachingDB) CacheStorage(addrHash common.Hash, root common.Hash, t Trie) { + if db.storageTrieCache == nil { + return + } + tr, ok := t.(*trie.SecureTrie) + if !ok { + return + } + if tries, exist := db.storageTrieCache.Get(addrHash); exist { + newTriesArray := [3]*triePair{ + {root: root, trie: tr.ResetCopy()}, + tries[0], + tries[1], + } + db.storageTrieCache.Add(addrHash, newTriesArray) + return + } + + triesArray := [3]*triePair{{root: root, trie: tr.ResetCopy()}, nil, nil} + db.storageTrieCache.Add(addrHash, triesArray) +} + +// Purge drops all cached account and storage tries. +func (db *cachingDB) Purge() { + if db.storageTrieCache != nil { + db.storageTrieCache.Purge() + } + if db.accountTrieCache != nil { + db.accountTrieCache.Purge() + } +} diff --git a/core/state/journal.go b/core/state/journal.go index 1722fb4c02..3e1a9a405a 100644 --- a/core/state/journal.go +++ b/core/state/journal.go @@ -43,7 +43,8 @@ type journal struct { // newJournal creates a new initialized journal. func newJournal() *journal { return &journal{ - dirties: make(map[common.Address]int), + dirties: make(map[common.Address]int, defaultNumOfSlots), + entries: make([]journalEntry, 0, defaultNumOfSlots), } } @@ -266,7 +267,9 @@ func (ch accessListAddAccountChange) revert(s *StateDB) { (addr) at this point, since no storage adds can remain when come upon a single (addr) change.
*/ - s.accessList.DeleteAddress(*ch.address) + if s.accessList != nil { + s.accessList.DeleteAddress(*ch.address) + } } func (ch accessListAddAccountChange) dirtied() *common.Address { @@ -274,7 +277,9 @@ func (ch accessListAddAccountChange) dirtied() *common.Address { } func (ch accessListAddSlotChange) revert(s *StateDB) { - s.accessList.DeleteSlot(*ch.address, *ch.slot) + if s.accessList != nil { + s.accessList.DeleteSlot(*ch.address, *ch.slot) + } } func (ch accessListAddSlotChange) dirtied() *common.Address { diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 2bd5658e0a..d50033fcc8 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -28,6 +28,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" @@ -61,6 +62,16 @@ type Config struct { BloomSize uint64 // The Megabytes of memory allocated to bloom-filter } +// PrunerOption is the option for pruner. +type PrunerOption func(*Pruner) + +// WithTriesInMemory sets the number of tries to keep in memory. +func WithTriesInMemory(triesInMemory uint64) PrunerOption { + return func(p *Pruner) { + p.triesInMemory = triesInMemory + } +} + // Pruner is an offline tool to prune the stale state with the // help of the snapshot. The workflow of pruner is very simple: // @@ -73,15 +84,16 @@ type Config struct { // periodically in order to release the disk usage and improve the // disk read performance to some extent. type Pruner struct { - config Config - chainHeader *types.Header - db ethdb.Database - stateBloom *stateBloom - snaptree *snapshot.Tree + config Config + chainHeader *types.Header + db ethdb.Database + stateBloom *stateBloom + snaptree *snapshot.Tree + triesInMemory uint64 } // NewPruner creates the pruner instance. -func NewPruner(db ethdb.Database, config Config) (*Pruner, error) { +func NewPruner(db ethdb.Database, config Config, opts ...PrunerOption) (*Pruner, error) { headBlock := rawdb.ReadHeadBlock(db) if headBlock == nil { return nil, errors.New("failed to load head block") @@ -105,13 +117,20 @@ func NewPruner(db ethdb.Database, config Config) (*Pruner, error) { if err != nil { return nil, err } - return &Pruner{ - config: config, - chainHeader: headBlock.Header(), - db: db, - stateBloom: stateBloom, - snaptree: snaptree, - }, nil + pruner := &Pruner{ + config: config, + chainHeader: headBlock.Header(), + db: db, + stateBloom: stateBloom, + snaptree: snaptree, + triesInMemory: core.TriesInMemory, + } + for _, opt := range opts { + if opt != nil { + opt(pruner) + } + } + return pruner, nil } func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, middleStateRoots map[common.Hash]struct{}, start time.Time) error { @@ -243,23 +262,23 @@ func (p *Pruner) Prune(root common.Hash) error { return err } if stateBloomRoot != (common.Hash{}) { - return RecoverPruning(p.config.Datadir, p.db, p.config.Cachedir) + return RecoverPruning(p.config.Datadir, p.db, p.config.Cachedir, p.triesInMemory) } - // If the target state root is not specified, use the HEAD-127 as the + // If the target state root is not specified, use the HEAD-(n-1) as the // target. 
The reason for picking it is: // - in most of the normal cases, the related state is available // - the probability of this layer being reorg is very low var layers []snapshot.Snapshot if root == (common.Hash{}) { // Retrieve all snapshot layers from the current HEAD. - // In theory there are 128 difflayers + 1 disk layer present, - // so 128 diff layers are expected to be returned. - layers = p.snaptree.Snapshots(p.chainHeader.Root, 128, true) - if len(layers) != 128 { - // Reject if the accumulated diff layers are less than 128. It + // In theory there are n difflayers + 1 disk layer present, + // so n diff layers are expected to be returned. + layers = p.snaptree.Snapshots(p.chainHeader.Root, int(p.triesInMemory), true) + if len(layers) != int(p.triesInMemory) { + // Reject if the accumulated diff layers are less than n. It // means in most of normal cases, there is no associated state // with bottom-most diff layer. - return fmt.Errorf("snapshot not old enough yet: need %d more blocks", 128-len(layers)) + return fmt.Errorf("snapshot not old enough yet: need %d more blocks", int(p.triesInMemory)-len(layers)) } // Use the bottom-most diff layer as the target root = layers[len(layers)-1].Root() @@ -271,8 +290,8 @@ func (p *Pruner) Prune(root common.Hash) error { // The special case is for clique based networks(rinkeby, goerli // and some other private networks), it's possible that two // consecutive blocks will have same root. In this case snapshot - // difflayer won't be created. So HEAD-(n-1) may not be paired with - // head-(n-1) layer. Instead the paired layer is higher than the + // difflayer won't be created. So HEAD-(n-1) may not be paired with + // head-(n-1) layer. Instead the paired layer is higher than the // bottom-most diff layer. Try to find the bottom-most snapshot // layer with state available. // @@ -344,7 +363,7 @@ // pruning can be resumed. What's more if the bloom filter is constructed, the // pruning **has to be resumed**. Otherwise a lot of dangling nodes may be left // in the disk. -func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) error { +func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string, triesInMemory uint64) error { stateBloomPath, stateBloomRoot, err := findBloomFilter(datadir) if err != nil { return err } @@ -370,7 +389,7 @@ NoBuild: true, AsyncBuild: false, } - snaptree, err := snapshot.New(snapconfig, db, trie.NewDatabase(db), headBlock.Root()) + snaptree, err := snapshot.New(snapconfig, db, trie.NewDatabase(db), headBlock.Root(), snapshot.SetCapLimit(int(triesInMemory))) if err != nil { return err // The relevant snapshot(s) might not exist } @@ -390,7 +409,7 @@ // otherwise the dangling state will be left.
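The retention arithmetic behind these pruner hunks, spelled out (pruneTarget is an illustrative helper, with n = triesInMemory):

	// n diff layers sit on top of one persistent disk layer, so the
	// bottom-most diff layer, which is the pruning target, holds the state of:
	func pruneTarget(head, triesInMemory uint64) uint64 {
		return head - (triesInMemory - 1) // head-127 for the default n = 128
	}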
var ( found bool - layers = snaptree.Snapshots(headBlock.Root(), 128, true) + layers = snaptree.Snapshots(headBlock.Root(), int(triesInMemory), true) middleRoots = make(map[common.Hash]struct{}) ) for _, layer := range layers { diff --git a/core/state/shared_pool.go b/core/state/shared_pool.go new file mode 100644 index 0000000000..ba96c2c27d --- /dev/null +++ b/core/state/shared_pool.go @@ -0,0 +1,39 @@ +package state + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" +) + +// sharedPool is used to store maps of originStorage of stateObjects +type StoragePool struct { + sync.RWMutex + sharedMap map[common.Address]*sync.Map +} + +func NewStoragePool() *StoragePool { + sharedMap := make(map[common.Address]*sync.Map) + return &StoragePool{ + sync.RWMutex{}, + sharedMap, + } +} + +// getStorage Check whether the storage exist in pool, +// new one if not exist, the content of storage will be fetched in stateObjects.GetCommittedState() +func (s *StoragePool) getStorage(address common.Address) *sync.Map { + s.RLock() + storageMap, ok := s.sharedMap[address] + s.RUnlock() + if !ok { + s.Lock() + defer s.Unlock() + if storageMap, ok = s.sharedMap[address]; !ok { + m := new(sync.Map) + s.sharedMap[address] = m + return m + } + } + return storageMap +} diff --git a/core/state/snapshot/conversion.go b/core/state/snapshot/conversion.go index ed7cb963ad..8187921b4f 100644 --- a/core/state/snapshot/conversion.go +++ b/core/state/snapshot/conversion.go @@ -27,6 +27,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -317,7 +318,8 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou if err != nil { return stop(err) } - go func(hash common.Hash) { + hash := it.Hash() + gopool.Submit(func() { subroot, err := leafCallback(db, hash, common.BytesToHash(account.CodeHash), stats) if err != nil { results <- err @@ -328,7 +330,7 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou return } results <- nil - }(it.Hash()) + }) fullData, err = rlp.EncodeToBytes(account) if err != nil { return stop(err) diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 0f3fa2c7a4..470c60a9cf 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -166,16 +166,27 @@ type Config struct { // storage data to avoid expensive multi-level trie lookups; and to allow sorted, // cheap iteration of the account/storage tries for sync aid. type Tree struct { - config Config // Snapshots configurations - diskdb ethdb.KeyValueStore // Persistent database to store the snapshot - triedb *trie.Database // In-memory cache to access the trie through - layers map[common.Hash]snapshot // Collection of all known layers - lock sync.RWMutex + config Config // Snapshots configurations + diskdb ethdb.KeyValueStore // Persistent database to store the snapshot + triedb *trie.Database // In-memory cache to access the trie through + layers map[common.Hash]snapshot // Collection of all known layers + lock sync.RWMutex + capLimit int // Maximum number of layers permitted to keep in memory // Test hooks onFlatten func() // Hook invoked when the bottom most diff layers are flattened } +// SnapshotOption is a function that can be passed to New to configure the snapshot. 
+type SnapshotOption func(*Tree) + +// SetCapLimit sets the maximum number of layers permitted to keep in memory. +func SetCapLimit(capLimit int) SnapshotOption { + return func(tree *Tree) { + tree.capLimit = capLimit + } +} + // New attempts to load an already existing snapshot from a persistent key-value // store (with a number of memory layers from a journal), ensuring that the head // of the snapshot matches the expected one. @@ -192,14 +203,21 @@ // state trie. // - otherwise, the entire snapshot is considered invalid and will be recreated on // a background thread. -func New(config Config, diskdb ethdb.KeyValueStore, triedb *trie.Database, root common.Hash) (*Tree, error) { +func New(config Config, diskdb ethdb.KeyValueStore, triedb *trie.Database, root common.Hash, opts ...SnapshotOption) (*Tree, error) { // Create a new, empty snapshot tree snap := &Tree{ - config: config, - diskdb: diskdb, - triedb: triedb, - layers: make(map[common.Hash]snapshot), + config: config, + diskdb: diskdb, + triedb: triedb, + capLimit: 128, + layers: make(map[common.Hash]snapshot), + } + for _, opt := range opts { + if opt != nil { + opt(snap) + } } + // Attempt to load a previously persisted snapshot and rebuild one if failed head, disabled, err := loadSnapshot(diskdb, triedb, root, config.CacheSize, config.Recovery, config.NoBuild) if disabled { @@ -852,3 +870,8 @@ func (t *Tree) DiskRoot() common.Hash { return t.diskRoot() } + +// CapLimit returns the cap limit of the snapshot. +func (t *Tree) CapLimit() int { + return t.capLimit +} diff --git a/core/state/state_object.go b/core/state/state_object.go index 7e34cba44a..2bf9dd2f4e 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "math/big" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -70,7 +71,9 @@ type stateObject struct { trie Trie // storage trie, which becomes non-nil on first access code Code // contract bytecode, which gets set when code is loaded - originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction + sharedOriginStorage *sync.Map // Shared cache of origin entries, pointing into the block-level StoragePool + originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction + pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block dirtyStorage Storage // Storage entries that have been modified in the current transaction execution @@ -98,14 +101,20 @@ func newObject(db *StateDB, address common.Address, data types.StateAccount) *st if data.Root == (common.Hash{}) { data.Root = types.EmptyRootHash } + var storageMap *sync.Map + // Check whether the shared storage map exists in the pool; a new one is created if not + if db != nil && db.storagePool != nil { + storageMap = db.GetStorage(address) + } return &stateObject{ - db: db, - address: address, - addrHash: crypto.Keccak256Hash(address[:]), - data: data, - originStorage: make(Storage), - pendingStorage: make(Storage), - dirtyStorage: make(Storage), + db: db, + address: address, + addrHash: crypto.Keccak256Hash(address[:]), + data: data, + sharedOriginStorage: storageMap, + originStorage: make(Storage), + pendingStorage: make(Storage), + dirtyStorage: make(Storage), } } @@ -163,13 +172,36 @@ func (s *stateObject) GetState(db Database, key common.Hash) common.Hash { return s.GetCommittedState(db, key) } +func (s *stateObject) getOriginStorage(key common.Hash) (common.Hash, bool) { + if value, cached := s.originStorage[key]; cached { + return value, true
+ } + // if L1 cache miss, try to get it from shared pool + if s.sharedOriginStorage != nil { + val, ok := s.sharedOriginStorage.Load(key) + if !ok { + return common.Hash{}, false + } + s.originStorage[key] = val.(common.Hash) + return val.(common.Hash), true + } + return common.Hash{}, false +} + +func (s *stateObject) setOriginStorage(key common.Hash, value common.Hash) { + if s.db.writeOnSharedStorage && s.sharedOriginStorage != nil { + s.sharedOriginStorage.Store(key, value) + } + s.originStorage[key] = value +} + // GetCommittedState retrieves a value from the committed account storage trie. func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Hash { // If we have a pending write or clean cached, return that if value, pending := s.pendingStorage[key]; pending { return value } - if value, cached := s.originStorage[key]; cached { + if value, cached := s.getOriginStorage(key); cached { return value } // If the object was destructed in *this* block (and potentially resurrected), @@ -218,7 +250,7 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has } value.SetBytes(content) } - s.originStorage[key] = value + s.setOriginStorage(key, value) return value } @@ -310,6 +342,7 @@ func (s *stateObject) updateTrie(db Database) (Trie, error) { } // If state snapshotting is active, cache the data til commit if s.db.snap != nil { + s.db.snapMux.Lock() if storage == nil { // Retrieve the old storage map, if available, create a new one otherwise if storage = s.db.snapStorage[s.addrHash]; storage == nil { @@ -318,6 +351,7 @@ func (s *stateObject) updateTrie(db Database) (Trie, error) { } } storage[crypto.HashData(hasher, key[:])] = v // v will be nil if it's deleted + s.db.snapMux.Unlock() } usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure } @@ -357,6 +391,9 @@ func (s *stateObject) commitTrie(db Database) (*trie.NodeSet, error) { } // If nothing changed, don't bother with committing anything if tr == nil { + if s.trie != nil && s.data.Root != types.EmptyRootHash { + db.CacheStorage(s.addrHash, s.data.Root, s.trie) + } return nil, nil } // Track the amount of time wasted on committing the storage trie @@ -365,6 +402,9 @@ func (s *stateObject) commitTrie(db Database) (*trie.NodeSet, error) { } root, nodes := tr.Commit(false) s.data.Root = root + if s.data.Root != types.EmptyRootHash { + db.CacheStorage(s.addrHash, s.data.Root, s.trie) + } return nodes, nil } diff --git a/core/state/statedb.go b/core/state/statedb.go index 54d5040451..b4296a9498 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -22,9 +22,11 @@ import ( "fmt" "math/big" "sort" + "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" @@ -36,6 +38,8 @@ import ( "github.com/ethereum/go-ethereum/trie" ) +const defaultNumOfSlots = 100 + type revision struct { id int journalIndex int @@ -67,6 +71,7 @@ type StateDB struct { // It will be updated when the Commit is called. 
originalRoot common.Hash + snapMux sync.Mutex snaps *snapshot.Tree snap snapshot.Snapshot snapAccounts map[common.Hash][]byte @@ -78,6 +83,8 @@ type StateDB struct { stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution stateObjectsDestruct map[common.Address]struct{} // State objects destructed in the block + storagePool *StoragePool // shared pool holding the origin storage maps of stateObjects + writeOnSharedStorage bool // Write to the shared origin storage of a stateObject while reading from the underlying storage layer. // DB error. // State objects are used by the consensus core and VM which are // unable to deal with database-level errors. Any error that occurs @@ -140,11 +147,11 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) trie: tr, originalRoot: root, snaps: snaps, - stateObjects: make(map[common.Address]*stateObject), - stateObjectsPending: make(map[common.Address]struct{}), - stateObjectsDirty: make(map[common.Address]struct{}), - stateObjectsDestruct: make(map[common.Address]struct{}), - logs: make(map[common.Hash][]*types.Log), + stateObjects: make(map[common.Address]*stateObject, defaultNumOfSlots), + stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots), + stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots), + stateObjectsDestruct: make(map[common.Address]struct{}, defaultNumOfSlots), + logs: make(map[common.Hash][]*types.Log, defaultNumOfSlots), preimages: make(map[common.Hash][]byte), journal: newJournal(), accessList: newAccessList(), @@ -160,6 +167,20 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return sdb, nil } +// NewWithSharedPool creates a new StateDB with a shared origin-storage pool that sits between the per-object cache and the database +func NewWithSharedPool(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { + statedb, err := New(root, db, snaps) + if err != nil { + return nil, err + } + statedb.storagePool = NewStoragePool() + return statedb, nil +} + +func (s *StateDB) EnableWriteOnSharedStorage() { + s.writeOnSharedStorage = true +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. @@ -696,6 +717,17 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common // Copy creates a deep, independent copy of the state. // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB { + return s.copyInternal(false) +} + +// CopyDoPrefetch creates a deep copy that keeps the trie prefetcher active; it is mainly used by the state prefetcher to do trie prefetching. +func (s *StateDB) CopyDoPrefetch() *StateDB { + return s.copyInternal(true) +} + +// copyInternal duplicates the StateDB. If doPrefetch is true, the copy reuses the active trie prefetcher and keeps prefetching; +// otherwise it receives an inactive copy of the prefetcher that can only read already-fetched data.
+func (s *StateDB) copyInternal(doPrefetch bool) *StateDB { // Copy all the basic fields, initialize the memory ones state := &StateDB{ db: s.db, @@ -705,6 +737,7 @@ func (s *StateDB) Copy() *StateDB { stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), stateObjectsDestruct: make(map[common.Address]struct{}, len(s.stateObjectsDestruct)), + storagePool: s.storagePool, refund: s.refund, logs: make(map[common.Hash][]*types.Log, len(s.logs)), logSize: s.logSize, @@ -771,7 +804,8 @@ func (s *StateDB) Copy() *StateDB { // If there's a prefetcher running, make an inactive copy of it that can // only access data but does not actively preload (since the user will not // know that they need to explicitly terminate an active copy). - if s.prefetcher != nil { + state.prefetcher = s.prefetcher + if s.prefetcher != nil && !doPrefetch { state.prefetcher = s.prefetcher.copy() } if s.snaps != nil { @@ -957,128 +991,160 @@ func (s *StateDB) clearJournalAndRefund() { } // Commit writes the state to the underlying in-memory trie database. -func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { +func (s *StateDB) Commit(deleteEmptyObjects bool, postCommitFuncs ...func() error) (common.Hash, error) { // Short circuit in case any database failure occurred earlier. if s.dbErr != nil { return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr) } // Finalize any pending changes and merge everything into the tries - s.IntermediateRoot(deleteEmptyObjects) - - // Commit objects to the trie, measuring the elapsed time - var ( - accountTrieNodesUpdated int - accountTrieNodesDeleted int - storageTrieNodesUpdated int - storageTrieNodesDeleted int - nodes = trie.NewMergedNodeSet() - codeWriter = s.db.DiskDB().NewBatch() - ) - for addr := range s.stateObjectsDirty { - if obj := s.stateObjects[addr]; !obj.deleted { - // Write any contract code associated with the state object - if obj.code != nil && obj.dirtyCode { - rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) - obj.dirtyCode = false + root := s.IntermediateRoot(deleteEmptyObjects) + + commitFuncs := []func() error{ + func() error { + // Commit objects to the trie, measuring the elapsed time + var ( + accountTrieNodesUpdated int + accountTrieNodesDeleted int + storageTrieNodesUpdated int + storageTrieNodesDeleted int + nodes = trie.NewMergedNodeSet() + codeWriter = s.db.DiskDB().NewBatch() + ) + for addr := range s.stateObjectsDirty { + if obj := s.stateObjects[addr]; !obj.deleted { + // Write any contract code associated with the state object + if obj.code != nil && obj.dirtyCode { + rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) + obj.dirtyCode = false + } + // Write any storage changes in the state object to its storage trie + set, err := obj.commitTrie(s.db) + if err != nil { + return err + } + // Merge the dirty nodes of storage trie into global set + if set != nil { + if err := nodes.Merge(set); err != nil { + return err + } + updates, deleted := set.Size() + storageTrieNodesUpdated += updates + storageTrieNodesDeleted += deleted + } + } + // If the contract is destructed, the storage is still left in the + // database as dangling data. 
Theoretically it should be wiped from
+			// database as well, but in hash-based-scheme it's extremely hard to
+			// determine whether the trie nodes are also referenced by other storage,
+			// and in path-based-scheme some technical challenges are still unsolved.
+			// Although it won't affect correctness, please fix it. TODO(rjl493456442)
			}
-			// Write any storage changes in the state object to its storage trie
-			set, err := obj.commitTrie(s.db)
-			if err != nil {
-				return common.Hash{}, err
+			if len(s.stateObjectsDirty) > 0 {
+				s.stateObjectsDirty = make(map[common.Address]struct{})
+			}
+			if codeWriter.ValueSize() > 0 {
+				if err := codeWriter.Write(); err != nil {
+					log.Crit("Failed to commit dirty codes", "error", err)
+				}
			}
-			// Merge the dirty nodes of storage trie into global set
+			// Write the account trie changes, measuring the amount of wasted time
+			var start time.Time
+			if metrics.EnabledExpensive {
+				start = time.Now()
+			}
+			root, set := s.trie.Commit(true)
+			// Merge the dirty nodes of account trie into global set
			if set != nil {
				if err := nodes.Merge(set); err != nil {
-					return common.Hash{}, err
+					return err
				}
-				updates, deleted := set.Size()
-				storageTrieNodesUpdated += updates
-				storageTrieNodesDeleted += deleted
+				accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size()
			}
-		}
-		// If the contract is destructed, the storage is still left in the
-		// database as dangling data. Theoretically it's should be wiped from
-		// database as well, but in hash-based-scheme it's extremely hard to
-		// determine that if the trie nodes are also referenced by other storage,
-		// and in path-based-scheme some technical challenges are still unsolved.
-		// Although it won't affect the correctness but please fix it TODO(rjl493456442).
-	}
-	if len(s.stateObjectsDirty) > 0 {
-		s.stateObjectsDirty = make(map[common.Address]struct{})
-	}
-	if codeWriter.ValueSize() > 0 {
-		if err := codeWriter.Write(); err != nil {
-			log.Crit("Failed to commit dirty codes", "error", err)
-		}
-	}
-	// Write the account trie changes, measuring the amount of wasted time
-	var start time.Time
-	if metrics.EnabledExpensive {
-		start = time.Now()
-	}
-	root, set := s.trie.Commit(true)
-	// Merge the dirty nodes of account trie into global set
-	if set != nil {
-		if err := nodes.Merge(set); err != nil {
-			return common.Hash{}, err
-		}
-		accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size()
-	}
-	if metrics.EnabledExpensive {
-		s.AccountCommits += time.Since(start)
-
-		accountUpdatedMeter.Mark(int64(s.AccountUpdated))
-		storageUpdatedMeter.Mark(int64(s.StorageUpdated))
-		accountDeletedMeter.Mark(int64(s.AccountDeleted))
-		storageDeletedMeter.Mark(int64(s.StorageDeleted))
-		accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
-		accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
-		storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
-		storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
-		s.AccountUpdated, s.AccountDeleted = 0, 0
-		s.StorageUpdated, s.StorageDeleted = 0, 0
-	}
-	// If snapshotting is enabled, update the snapshot tree with this new version
-	if s.snap != nil {
-		start := time.Now()
-		// Only update if there's a state transition (skip empty Clique blocks)
-		if parent := s.snap.Root(); parent != root {
-			if err := s.snaps.Update(root, parent, s.convertAccountSet(s.stateObjectsDestruct), s.snapAccounts, s.snapStorage); err != nil {
-				log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
+			if metrics.EnabledExpensive {
+				s.AccountCommits += time.Since(start)
+
+				accountUpdatedMeter.Mark(int64(s.AccountUpdated))
+				storageUpdatedMeter.Mark(int64(s.StorageUpdated))
+				accountDeletedMeter.Mark(int64(s.AccountDeleted))
+				storageDeletedMeter.Mark(int64(s.StorageDeleted))
+				accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
+				accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
+				storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
+				storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
+				s.AccountUpdated, s.AccountDeleted = 0, 0
+				s.StorageUpdated, s.StorageDeleted = 0, 0
			}
-		// Keep 128 diff layers in the memory, persistent layer is 129th.
-		// - head layer is paired with HEAD state
-		// - head-1 layer is paired with HEAD-1 state
-		// - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state
-		if err := s.snaps.Cap(root, 128); err != nil {
-			log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err)
+
+			if len(s.stateObjectsDestruct) > 0 {
+				s.stateObjectsDestruct = make(map[common.Address]struct{})
			}
-		if metrics.EnabledExpensive {
-			s.SnapshotCommits += time.Since(start)
-		}
-		s.snap, s.snapAccounts, s.snapStorage = nil, nil, nil
-	}
-	if len(s.stateObjectsDestruct) > 0 {
-		s.stateObjectsDestruct = make(map[common.Address]struct{})
-	}
-	if root == (common.Hash{}) {
-		root = types.EmptyRootHash
+			if root == (common.Hash{}) {
+				root = types.EmptyRootHash
+			}
+			origin := s.originalRoot
+			if origin == (common.Hash{}) {
+				origin = types.EmptyRootHash
+			}
+			if root != origin {
+				start := time.Now()
+				if err := s.db.TrieDB().Update(nodes); err != nil {
+					return err
+				}
+				s.originalRoot = root
+				if metrics.EnabledExpensive {
+					s.TrieDBCommits += time.Since(start)
+				}
+
+				s.db.CacheAccount(root, s.trie)
+			}
+			for _, postFunc := range postCommitFuncs {
+				err := postFunc()
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		},
+		func() error {
+			// If snapshotting is enabled, update the snapshot tree with this new version
+			if s.snap != nil {
+				start := time.Now()
+				// Only update if there's a state transition (skip empty Clique blocks)
+				if parent := s.snap.Root(); parent != root {
+					if err := s.snaps.Update(root, parent, s.convertAccountSet(s.stateObjectsDestruct), s.snapAccounts, s.snapStorage); err != nil {
+						log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
+					}
+					// Keep n diff layers in the memory, persistent layer is (n+1)th.
+					// - head layer is paired with HEAD state
+					// - head-1 layer is paired with HEAD-1 state
+					// - head-(n-1) layer(bottom-most diff layer) is paired with HEAD-(n-1) state
+					if err := s.snaps.Cap(root, s.snaps.CapLimit()); err != nil {
+						log.Warn("Failed to cap snapshot tree", "root", root, "layers", s.snaps.CapLimit(), "err", err)
+					}
+				}
+				if metrics.EnabledExpensive {
+					s.SnapshotCommits += time.Since(start)
+				}
+				s.snap, s.snapAccounts, s.snapStorage = nil, nil, nil
+			}
+			return nil
+		},
	}
-	origin := s.originalRoot
-	if origin == (common.Hash{}) {
-		origin = types.EmptyRootHash
+
+	commitRes := make(chan error, len(commitFuncs))
+	for i := 0; i < len(commitFuncs); i++ {
+		commitFunc := commitFuncs[i]
+		gopool.Submit(func() {
+			commitRes <- commitFunc()
+		})
	}
-	if root != origin {
-		start := time.Now()
-		if err := s.db.TrieDB().Update(nodes); err != nil {
+	for i := 0; i < len(commitFuncs); i++ {
+		err := <-commitRes
+		if err != nil {
			return common.Hash{}, err
		}
-		s.originalRoot = root
-		if metrics.EnabledExpensive {
-			s.TrieDBCommits += time.Since(start)
-		}
	}
+
	return root, nil
}

@@ -1171,3 +1237,7 @@ func (s *StateDB) convertAccountSet(set map[common.Address]struct{}) map[common.
	}
	return ret
}
+
+func (s *StateDB) GetStorage(address common.Address) *sync.Map {
+	return s.storagePool.getStorage(address)
+}
diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go
index 72cb249f22..5e7f52c701 100644
--- a/core/state_prefetcher.go
+++ b/core/state_prefetcher.go
@@ -17,8 +17,7 @@ package core

 import (
-	"sync/atomic"
-
+	"github.com/ethereum/go-ethereum/common/gopool"
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/core/state"
	"github.com/ethereum/go-ethereum/core/types"
@@ -26,6 +25,11 @@ import (
	"github.com/ethereum/go-ethereum/params"
 )

+const (
+	prefetchThread = 3
+	checkInterval  = 10
+)
+
// statePrefetcher is a basic Prefetcher, which blindly executes a block on top
// of an arbitrary state with the goal of prefetching potentially useful state
// data from disk before the main block processor start executing.
@@ -36,7 +40,7 @@ type statePrefetcher struct {
}

-// newStatePrefetcher initialises a new statePrefetcher.
-func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher {
+// NewStatePrefetcher initialises a new statePrefetcher.
+func NewStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher {
	return &statePrefetcher{
		config: config,
		bc:     bc,
@@ -47,39 +51,113 @@ func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
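One pattern worth calling out before the prefetcher changes below: the Commit rewrite above wraps its trie/code phase and its snapshot phase as commitFuncs, fans them out through the goroutine pool, and joins them on a buffered error channel. A self-contained sketch of that fan-out/join idiom, with plain goroutines standing in for gopool:

package main

import (
	"errors"
	"fmt"
)

// runConcurrently mirrors the commitFuncs/commitRes idiom: start every fn,
// then collect exactly one result per fn, reporting the first error seen.
func runConcurrently(fns ...func() error) error {
	res := make(chan error, len(fns)) // buffered: senders never block
	for _, fn := range fns {
		fn := fn // rebind before capture (pre-Go 1.22 loop semantics)
		go func() { res <- fn() }()
	}
	var firstErr error
	for range fns {
		if err := <-res; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	err := runConcurrently(
		func() error { return nil },                        // e.g. the trie/code phase
		func() error { return errors.New("snap failed") }, // e.g. the snapshot phase
	)
	fmt.Println(err) // snap failed
}

One difference from this sketch: Commit above returns on the first error without collecting the remaining results, which is safe only because commitRes is buffered for every sender.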
-func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
+func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}) {
	var (
-		header       = block.Header()
-		gaspool      = new(GasPool).AddGas(block.GasLimit())
-		blockContext = NewEVMBlockContext(header, p.bc, nil, p.config, statedb)
-		evm          = vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
-		signer       = types.MakeSigner(p.config, header.Number)
+		header = block.Header()
+		signer = types.MakeSigner(p.config, header.Number)
	)
-	// Iterate over and process the individual transactions
-	byzantium := p.config.IsByzantium(block.Number())
-	for i, tx := range block.Transactions() {
-		// If block precaching was interrupted, abort
-		if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
+	transactions := block.Transactions()
+	txChan := make(chan int, prefetchThread)
+	// Spin up the prefetch workers, each executing on its own copy of the state.
+	for i := 0; i < prefetchThread; i++ {
+		gopool.Submit(func() {
+			newStatedb := statedb.Copy()
+			newStatedb.EnableWriteOnSharedStorage()
+			gaspool := new(GasPool).AddGas(block.GasLimit())
+			blockContext := NewEVMBlockContext(header, p.bc, nil, p.config, newStatedb)
+			evm := vm.NewEVM(blockContext, vm.TxContext{}, newStatedb, p.config, cfg)
+			// Iterate over and process the individual transactions
+			for {
+				select {
+				case txIndex := <-txChan:
+					tx := transactions[txIndex]
+					// Convert the transaction into an executable message and pre-cache its sender
+					msg, err := TransactionToMessage(tx, signer, header.BaseFee)
+					if err != nil {
+						return // Also invalid block, bail out
+					}
+					newStatedb.SetTxContext(tx.Hash(), txIndex)
+					precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
+
+				case <-interruptCh:
+					// If block precaching was interrupted, abort
+					return
+				}
+			}
+		})
+	}
+
+	// No need to dispatch the first batch of transactions, since the main
+	// processor will execute them anyway; feeding the rest to the workers
+	// keeps the dispatch off the critical path.
+	for i := prefetchThread; i < len(transactions); i++ {
+		select {
+		case txChan <- i:
+		case <-interruptCh:
			return
		}
-		// Convert the transaction into an executable message and pre-cache its sender
-		msg, err := TransactionToMessage(tx, signer, header.BaseFee)
-		if err != nil {
-			return // Also invalid block, bail out
-		}
-		statedb.SetTxContext(tx.Hash(), i)
-		if err := precacheTransaction(msg, p.config, gaspool, statedb, header, evm); err != nil {
-			return // Ugh, something went horribly wrong, bail out
-		}
-		// If we're pre-byzantium, pre-load trie nodes for the intermediate root
-		if !byzantium {
-			statedb.IntermediateRoot(true)
-		}
	}
-	// If were post-byzantium, pre-load trie nodes for the final root hash
-	if byzantium {
-		statedb.IntermediateRoot(true)
+}
+
+// PrefetchMining processes the state changes according to the Ethereum rules by running
+// the transaction messages using the statedb, but any changes are discarded. The
+// only goal is to pre-cache transaction signatures and snapshot clean state. It is
+// only used in the mining stage.
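The Prefetch signature above also swaps the interrupt from an atomic *uint32 flag to a <-chan struct{}. The channel form matters once there are several workers: a single close broadcasts to all of them, where the old flag had to be polled. A standalone sketch of the idiom (nothing here is taken from the diff):

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, interrupt <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case j := <-jobs:
			fmt.Printf("worker %d processed job %d\n", id, j)
		case <-interrupt: // ready for every worker once the channel is closed
			fmt.Printf("worker %d interrupted\n", id)
			return
		}
	}
}

func main() {
	jobs := make(chan int)
	interrupt := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go worker(i, jobs, interrupt, &wg)
	}
	jobs <- 1
	jobs <- 2
	time.Sleep(10 * time.Millisecond)
	close(interrupt) // one close stops all three workers
	wg.Wait()
}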
+func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) {
+	var (
+		signer = types.MakeSigner(p.config, header.Number)
+	)
+
+	txCh := make(chan *types.Transaction, 2*prefetchThread)
+	for i := 0; i < prefetchThread; i++ {
+		go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) {
+			newStatedb := statedb.CopyDoPrefetch()
+			newStatedb.EnableWriteOnSharedStorage()
+			gaspool := new(GasPool).AddGas(gasLimit)
+			blockContext := NewEVMBlockContext(header, p.bc, nil, p.config, newStatedb)
+			evm := vm.NewEVM(blockContext, vm.TxContext{}, newStatedb, p.config, cfg)
+			rules := p.config.Rules(evm.Context.BlockNumber, evm.Context.Random != nil, evm.Context.Time)
+			// Iterate over and process the individual transactions
+			for {
+				select {
+				case tx := <-startCh:
+					// Convert the transaction into an executable message and pre-cache its sender
+					msg, err := ToMessageNoNonceCheck(tx, signer, header.BaseFee)
+					if err != nil {
+						return // Also invalid block, bail out
+					}
+
+					newStatedb.Prepare(rules, msg.From, evm.Context.Coinbase, msg.To, vm.ActivePrecompiles(rules), msg.AccessList)
+					precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
+					gaspool = new(GasPool).AddGas(gasLimit)
+				case <-stopCh:
+					return
+				}
+			}
+		}(txCh, interruptCh)
	}
+	go func(txset *types.TransactionsByPriceAndNonce) {
+		count := 0
+		for {
+			select {
+			case <-interruptCh:
+				return
+			default:
+				if count++; count%checkInterval == 0 {
+					txset.Forward(*txCurr)
+				}
+				tx := txset.Peek()
+				if tx == nil {
+					return
+				}
+
+				select {
+				case <-interruptCh:
+					return
+				case txCh <- tx:
+				}
+
+				txset.Shift()
+			}
+		}
+	}(txs)
}

// precacheTransaction attempts to apply a transaction to the given state database
diff --git a/core/state_processor.go b/core/state_processor.go
index 7a638190aa..0310af96fb 100644
--- a/core/state_processor.go
+++ b/core/state_processor.go
@@ -165,5 +165,9 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo
	// Create a new context to be used in the EVM environment
	blockContext := NewEVMBlockContext(header, bc, author, config, statedb)
	vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg)
+	defer func() {
+		vm.EVMInterpreterPool.Put(vmenv.Interpreter())
+		vm.EvmPool.Put(vmenv)
+	}()
	return applyTransaction(msg, config, gp, statedb, header.Number, header.Hash(), tx, usedGas, vmenv)
}
diff --git a/core/state_transition.go b/core/state_transition.go
index 232a79af7c..4d7ed25b65 100644
--- a/core/state_transition.go
+++ b/core/state_transition.go
@@ -175,6 +175,15 @@ func TransactionToMessage(tx *types.Transaction, s types.Signer, baseFee *big.In
	return msg, err
}

+// ToMessageNoNonceCheck converts the transaction into a Message and sets
+// SkipAccountChecks to true, so nonce and balance checks are skipped.
+func ToMessageNoNonceCheck(tx *types.Transaction, s types.Signer, baseFee *big.Int) (*Message, error) {
+	msg, err := TransactionToMessage(tx, s, baseFee)
+	if err == nil {
+		msg.SkipAccountChecks = true
+	}
+	return msg, err
+}
+
// ApplyMessage computes the new state by applying the given message
// against the old state within the environment.
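PrefetchMining's dispatch goroutine above peeks and shifts its own copy of the transaction set, and every checkInterval iterations calls Forward(*txCurr) to drop whatever the miner has already moved past. A simplified, self-contained sketch of that catch-up idiom over a plain int queue (all names hypothetical):

package main

import "fmt"

type queue struct{ items []int }

func (q *queue) peek() (int, bool) {
	if len(q.items) == 0 {
		return 0, false
	}
	return q.items[0], true
}

func (q *queue) shift() { q.items = q.items[1:] }

// forward drops everything up to and including cur, mirroring Forward's
// "advance to the element right after the given one" semantics.
func (q *queue) forward(cur int) {
	for i, v := range q.items {
		if v == cur {
			q.items = q.items[i+1:]
			return
		}
	}
}

func main() {
	prefetchQ := &queue{items: []int{1, 2, 3, 4, 5, 6}}
	minerCurrent := 3 // the item the "miner" is working on right now

	count := 0
	for {
		if count++; count%2 == 0 { // stand-in for checkInterval
			prefetchQ.forward(minerCurrent)
		}
		v, ok := prefetchQ.peek()
		if !ok {
			return
		}
		fmt.Println("prefetching", v) // prints 1, 4, 5, 6: items 2 and 3 are skipped
		prefetchQ.shift()
	}
}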
//
diff --git a/core/types.go b/core/types.go
index 4c5b74a498..72a99a65bb 100644
--- a/core/types.go
+++ b/core/types.go
@@ -39,7 +39,9 @@ type Prefetcher interface {
	// Prefetch processes the state changes according to the Ethereum rules by running
	// the transaction messages using the statedb, but any changes are discarded. The
	// only goal is to pre-cache transaction signatures and state trie nodes.
-	Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
+	Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt <-chan struct{})
+	// PrefetchMining is used for pre-caching transaction signatures and state trie nodes. It is only used in the mining stage.
+	PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction)
}

// Processor is an interface for processing blocks using a given initial state.
diff --git a/core/types/transaction.go b/core/types/transaction.go
index 6485d05169..84cab95884 100644
--- a/core/types/transaction.go
+++ b/core/types/transaction.go
@@ -669,6 +669,61 @@ func (t *TransactionsByPriceAndNonce) Pop() {
	heap.Pop(&t.heads)
}

+// Forward advances the iterator to the transaction right after the given tx.
+func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) {
+	if tx == nil {
+		if len(t.heads) > 0 {
+			t.heads = t.heads[0:0]
+		}
+		return
+	}
+	// check whether the target tx exists in t.heads
+	for _, head := range t.heads {
+		if tx == head.tx {
+			// shift t to the position right after tx
+			txTmp := t.Peek()
+			for txTmp != tx {
+				t.Shift()
+				txTmp = t.Peek()
+			}
+			t.Shift()
+			return
+		}
+	}
+	// get the sender address of tx
+	acc, _ := Sender(t.signer, tx)
+	// check whether the target tx exists in t.txs
+	if txs, ok := t.txs[acc]; ok {
+		for _, txTmp := range txs {
+			// found the same pointer in t.txs as tx, then shift t to the position right after tx
+			if txTmp == tx {
+				txTmp = t.Peek()
+				for txTmp != tx {
+					t.Shift()
+					txTmp = t.Peek()
+				}
+				t.Shift()
+				return
+			}
+		}
+	}
+}
+
+// Copy returns a new TransactionsByPriceAndNonce with the same transaction
+// pointers; the heads slice and the per-account lists are shallow copies.
+func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce {
+	heads := make([]*TxWithMinerFee, len(t.heads))
+	copy(heads, t.heads)
+	txs := make(map[common.Address]Transactions, len(t.txs))
+	for acc, txsTmp := range t.txs {
+		txs[acc] = txsTmp
+	}
+	return &TransactionsByPriceAndNonce{
+		heads:  heads,
+		txs:    txs,
+		signer: t.signer,
+	}
+}
+
// copyAddressPtr copies an address.
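The txCurr **types.Transaction parameter in the Prefetcher interface above is how the dispatch loop observes which transaction the miner is currently committing: the miner keeps reassigning the pointed-to variable, and the prefetcher dereferences it when calling Forward. The diff shares a plain double pointer and tolerates stale reads, since Forward is only an optimization. A race-free sketch of the same observe-progress idea, using atomic.Pointer (Go 1.19+) instead:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var cur atomic.Pointer[int] // race-free stand-in for the **types.Transaction
	done := make(chan struct{})

	go func() { // the prefetcher side: observe the miner's current item
		for {
			select {
			case <-done:
				return
			default:
				if p := cur.Load(); p != nil {
					fmt.Println("prefetcher sees:", *p) // may lag behind; that's fine
				}
				time.Sleep(5 * time.Millisecond)
			}
		}
	}()

	for i := 0; i < 5; i++ { // the miner side: advance the marker
		v := i
		cur.Store(&v)
		time.Sleep(10 * time.Millisecond)
	}
	close(done)
}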
func copyAddressPtr(a *common.Address) *common.Address { if a == nil { diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 4b96c6b91a..ebe9be86c9 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -405,6 +405,77 @@ func TestTransactionTimeSort(t *testing.T) { } } +func TestTransactionForward(t *testing.T) { + // Generate a batch of accounts to start with + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + } + signer := HomesteadSigner{} + + // Generate a batch of transactions with overlapping prices, but different creation times + groups := map[common.Address]Transactions{} + for start, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + + tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + + tx.time = time.Unix(0, int64(len(keys)-start)) + tx2.time = time.Unix(1, int64(len(keys)-start)) + + groups[addr] = append(groups[addr], tx) + groups[addr] = append(groups[addr], tx2) + + } + // Sort the transactions + txset := NewTransactionsByPriceAndNonce(signer, groups, common.Big0) + txsetCpy := txset.Copy() + txs := Transactions{} + for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() { + txs = append(txs, tx) + txsetCpy.Shift() + } + + tmp := txset.Copy() + for j := 0; j < 11; j++ { + txset = tmp.Copy() + txsetCpy = tmp.Copy() + i := 0 + for ; i < j; i++ { + txset.Shift() + } + tx := txset.Peek() + if tx == nil { + continue + } + txsetCpy.Forward(tx) + txCpy := txsetCpy.Peek() + if txCpy == nil { + if tx == nil { + continue + } + txset.Shift() + if txset.Peek() != nil { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } else { + continue + } + } + txset.Shift() + for ; i < len(txs)-1; i++ { + tx = txset.Peek() + txCpy = txsetCpy.Peek() + if txCpy != tx { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } + txsetCpy.Shift() + txset.Shift() + } + + } +} + // TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON. func TestTransactionCoding(t *testing.T) { key, err := crypto.GenerateKey() diff --git a/core/vm/evm.go b/core/vm/evm.go index 9ab030dbab..abc6d79525 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -18,6 +18,7 @@ package vm import ( "math/big" + "sync" "sync/atomic" "github.com/holiman/uint256" @@ -32,6 +33,13 @@ import ( // deployed contract addresses (relevant after the account abstraction). var emptyCodeHash = crypto.Keccak256Hash(nil) +// EvmPool is a pool of EVM instances +var EvmPool = sync.Pool{ + New: func() interface{} { + return &EVM{} + }, +} + type ( // CanTransferFunc is the signature of a transfer guard function CanTransferFunc func(StateDB, common.Address, *big.Int) bool @@ -129,14 +137,16 @@ type EVM struct { // NewEVM returns a new EVM. The returned EVM is not thread safe and should // only ever be used *once*. 
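NewEVM below, and NewEVMInterpreter in the following hunk, recycle instances through sync.Pool; that is why every mutable field (abort, callGasTemp, depth, readOnly, returnData) is explicitly reset after Get, since a pooled object arrives carrying whatever its previous user left behind. A generic sketch of the get-reset-put idiom:

package main

import (
	"fmt"
	"sync"
)

type scratch struct {
	buf   []byte
	depth int
}

var scratchPool = sync.Pool{
	New: func() interface{} { return new(scratch) },
}

func getScratch() *scratch {
	s := scratchPool.Get().(*scratch)
	// Reset everything the previous user may have left behind.
	s.buf = s.buf[:0]
	s.depth = 0
	return s
}

func main() {
	s := getScratch()
	s.buf = append(s.buf, "leftover state"...)
	s.depth = 7
	scratchPool.Put(s)

	s2 := getScratch() // possibly the same object, but fully reset
	fmt.Println(len(s2.buf), s2.depth) // 0 0
}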
func NewEVM(blockCtx BlockContext, txCtx TxContext, statedb StateDB, chainConfig *params.ChainConfig, config Config) *EVM { - evm := &EVM{ - Context: blockCtx, - TxContext: txCtx, - StateDB: statedb, - Config: config, - chainConfig: chainConfig, - chainRules: chainConfig.Rules(blockCtx.BlockNumber, blockCtx.Random != nil, blockCtx.Time), - } + evm := EvmPool.Get().(*EVM) + evm.Context = blockCtx + evm.TxContext = txCtx + evm.StateDB = statedb + evm.Config = config + evm.chainConfig = chainConfig + evm.chainRules = chainConfig.Rules(blockCtx.BlockNumber, blockCtx.Random != nil, blockCtx.Time) + evm.abort = 0 + evm.callGasTemp = 0 + evm.depth = 0 evm.interpreter = NewEVMInterpreter(evm) return evm } diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index 0ab520b90f..408d5fd299 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -17,12 +17,21 @@ package vm import ( + "sync" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" ) +// EVMInterpreterPool is a pool of EVMInterpreter instances +var EVMInterpreterPool = sync.Pool{ + New: func() interface{} { + return &EVMInterpreter{} + }, +} + // Config are the configuration options for the Interpreter type Config struct { Debug bool // Enables debugging @@ -94,7 +103,13 @@ func NewEVMInterpreter(evm *EVM) *EVMInterpreter { } } evm.Config.ExtraEips = extraEips - return &EVMInterpreter{evm: evm, table: table} + + evmInterpreter := EVMInterpreterPool.Get().(*EVMInterpreter) + evmInterpreter.evm = evm + evmInterpreter.table = table + evmInterpreter.readOnly = false + evmInterpreter.returnData = nil + return evmInterpreter } // Run loops and evaluates the contract's code with the given input data and returns diff --git a/crypto/crypto.go b/crypto/crypto.go index e51b63beca..361682c742 100644 --- a/crypto/crypto.go +++ b/crypto/crypto.go @@ -28,7 +28,9 @@ import ( "io" "math/big" "os" + "sync" + "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/rlp" @@ -47,6 +49,12 @@ const DigestLength = 32 var ( secp256k1N, _ = new(big.Int).SetString("fffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141", 16) secp256k1halfN = new(big.Int).Div(secp256k1N, big.NewInt(2)) + + keccakState256Cache = fastcache.New(100 * 1024 * 1024) + keccakState256Pool = sync.Pool{ + New: func() interface{} { + return NewKeccakState() + }} ) var errInvalidPubkey = errors.New("invalid secp256k1 public key") @@ -66,31 +74,55 @@ func NewKeccakState() KeccakState { // HashData hashes the provided data using the KeccakState and returns a 32 byte hash func HashData(kh KeccakState, data []byte) (h common.Hash) { + if hash, ok := keccakState256Cache.HasGet(nil, data); ok { + return common.BytesToHash(hash) + } kh.Reset() kh.Write(data) kh.Read(h[:]) + keccakState256Cache.Set(data, h.Bytes()) return h } // Keccak256 calculates and returns the Keccak256 hash of the input data. 
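A note on the caching strategy in the Keccak256 and Keccak256Hash bodies that follow: only single-slice inputs are memoized (len(data) == 1), because the raw input bytes double as the cache key. Caching a multi-part input under, say, its first slice would conflate Keccak256([]byte("ab")) with Keccak256([]byte("a"), []byte("b")). A dependency-free sketch of that guard, with sha256 standing in for Keccak and a plain map for fastcache:

package main

import (
	"crypto/sha256"
	"fmt"
)

var hashCache = map[string][32]byte{}

func cachedHash(data ...[]byte) [32]byte {
	if len(data) == 1 { // only a single slice is a safe cache key
		if h, ok := hashCache[string(data[0])]; ok {
			return h
		}
	}
	d := sha256.New()
	for _, b := range data {
		d.Write(b)
	}
	var out [32]byte
	copy(out[:], d.Sum(nil))
	if len(data) == 1 {
		hashCache[string(data[0])] = out
	}
	return out
}

func main() {
	a := cachedHash([]byte("ab"))
	b := cachedHash([]byte("a"), []byte("b"))
	fmt.Println(a == b) // true: same bytes hashed, but only the first call was cached
}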
func Keccak256(data ...[]byte) []byte { + if len(data) == 1 { + if hash, ok := keccakState256Cache.HasGet(nil, data[0]); ok { + return hash + } + } b := make([]byte, 32) - d := NewKeccakState() + d := keccakState256Pool.Get().(KeccakState) + defer keccakState256Pool.Put(d) + d.Reset() for _, b := range data { d.Write(b) } d.Read(b) + if len(data) == 1 { + keccakState256Cache.Set(data[0], b) + } return b } // Keccak256Hash calculates and returns the Keccak256 hash of the input data, // converting it to an internal Hash data structure. func Keccak256Hash(data ...[]byte) (h common.Hash) { - d := NewKeccakState() + if len(data) == 1 { + if hash, ok := keccakState256Cache.HasGet(nil, data[0]); ok { + return common.BytesToHash(hash) + } + } + d := keccakState256Pool.Get().(KeccakState) + defer keccakState256Pool.Put(d) + d.Reset() for _, b := range data { d.Write(b) } d.Read(h[:]) + if len(data) == 1 { + keccakState256Cache.Set(data[0], h.Bytes()) + } return h } diff --git a/eth/backend.go b/eth/backend.go index 0e3351ab8b..d55c80e4b4 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -129,6 +129,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } config.TrieDirtyCache = 0 } + if config.TriesInMemory == 0 { + config.TriesInMemory = core.TriesInMemory // point to default(128) + } log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) // Assemble the Ethereum object @@ -136,7 +139,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } - if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, stack.ResolvePath(config.TrieCleanCacheJournal)); err != nil { + if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, stack.ResolvePath(config.TrieCleanCacheJournal), config.TriesInMemory); err != nil { log.Error("Failed to recover state", "error", err) } // Transfer mining-related config to the ethash config. @@ -193,6 +196,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { TrieDirtyLimit: config.TrieDirtyCache, TrieDirtyDisabled: config.NoPruning, TrieTimeLimit: config.TrieTimeout, + TriesInMemory: config.TriesInMemory, SnapshotLimit: config.SnapshotCache, Preimages: config.Preimages, } diff --git a/eth/bloombits.go b/eth/bloombits.go index 0cb7050d23..314317ae4f 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -20,6 +20,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/rawdb" ) @@ -45,7 +46,7 @@ const ( // retrievals from possibly a range of filters and serving the data to satisfy. 
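From this point on, most go func() {...}() call sites in the patch become gopool.Submit(...). The gopool implementation is cut off in the excerpt above; a plausible minimal shape on top of panjf2000/ants — an assumption for illustration, not the verbatim common/gopool/pool.go:

package gopool

import (
	"time"

	"github.com/panjf2000/ants/v2"
)

// defaultPool is shared by every Submit call. With DefaultAntsPoolSize the
// worker cap is effectively unbounded; the gain is that idle workers are
// parked and reused for a while instead of exiting like plain goroutines.
var defaultPool, _ = ants.NewPool(
	ants.DefaultAntsPoolSize,
	ants.WithExpiryDuration(10*time.Second),
)

// Submit schedules task on the shared pool, reusing a parked goroutine when
// one is available instead of spawning a fresh one per call.
func Submit(task func()) error {
	return defaultPool.Submit(task)
}

Note that the call sites in this patch discard Submit's error, which is only safe as long as the pool is configured never to reject tasks.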
func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
	for i := 0; i < bloomServiceThreads; i++ {
-		go func() {
+		gopool.Submit(func() {
			for {
				select {
				case <-eth.closeBloomHandler:
@@ -69,6 +70,6 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
					request <- task
				}
			}
-		}()
+		})
	}
}
diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go
index fb6e6935ee..cc9f3467c8 100644
--- a/eth/catalyst/api_test.go
+++ b/eth/catalyst/api_test.go
@@ -440,7 +440,7 @@ func startEthService(t *testing.T, genesis *core.Genesis, blocks []*types.Block)
		t.Fatal("can't create node:", err)
	}

-	ethcfg := &ethconfig.Config{Genesis: genesis, Ethash: ethash.Config{PowMode: ethash.ModeFake}, SyncMode: downloader.FullSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256}
+	ethcfg := &ethconfig.Config{Genesis: genesis, Ethash: ethash.Config{PowMode: ethash.ModeFake}, SyncMode: downloader.FullSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256, TriesInMemory: 128}
	ethservice, err := eth.New(n, ethcfg)
	if err != nil {
		t.Fatal("can't create eth service:", err)
	}
diff --git a/eth/downloader/api.go b/eth/downloader/api.go
index b3f7113bcd..3d9e06eba7 100644
--- a/eth/downloader/api.go
+++ b/eth/downloader/api.go
@@ -21,6 +21,7 @@ import (
	"sync"

	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/common/gopool"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/rpc"
)
@@ -98,7 +99,7 @@ func (api *DownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error

	rpcSub := notifier.CreateSubscription()

-	go func() {
+	gopool.Submit(func() {
		statuses := make(chan interface{})
		sub := api.SubscribeSyncStatus(statuses)
@@ -114,7 +115,7 @@ func (api *DownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error
				return
			}
		}
-	}()
+	})

	return rpcSub, nil
}
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index 272433cd27..d188fe1413 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -83,6 +83,7 @@ var Defaults = Config{
	TrieCleanCacheRejournal: 60 * time.Minute,
	TrieDirtyCache:          256,
	TrieTimeout:             60 * time.Minute,
+	TriesInMemory:           128,
	SnapshotCache:           102,
	FilterLogCacheSize:      32,
	Miner:                   miner.DefaultConfig,
@@ -166,6 +167,7 @@ type Config struct {
	TrieCleanCacheRejournal time.Duration `toml:",omitempty"` // Time interval to regenerate the journal for clean cache
	TrieDirtyCache          int
	TrieTimeout             time.Duration
+	TriesInMemory           uint64 // How many tries to keep in memory
	SnapshotCache           int
	Preimages               bool
diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go
index b7255a242e..6dbcf0fac9 100644
--- a/eth/ethconfig/gen_config.go
+++ b/eth/ethconfig/gen_config.go
@@ -46,6 +46,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
		TrieCleanCacheRejournal time.Duration `toml:",omitempty"`
		TrieDirtyCache          int
		TrieTimeout             time.Duration
+		TriesInMemory           uint64 `toml:",omitempty"`
		SnapshotCache           int
		Preimages               bool
		FilterLogCacheSize      int
@@ -91,6 +92,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
	enc.TrieCleanCacheRejournal = c.TrieCleanCacheRejournal
	enc.TrieDirtyCache = c.TrieDirtyCache
	enc.TrieTimeout = c.TrieTimeout
+	enc.TriesInMemory = c.TriesInMemory
	enc.SnapshotCache = c.SnapshotCache
	enc.Preimages = c.Preimages
	enc.FilterLogCacheSize = c.FilterLogCacheSize
@@ -140,6 +142,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
		TrieCleanCacheRejournal *time.Duration `toml:",omitempty"`
		TrieDirtyCache          *int
		TrieTimeout             *time.Duration
+		TriesInMemory           *uint64
`toml:",omitempty"` SnapshotCache *int Preimages *bool FilterLogCacheSize *int @@ -244,6 +247,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.TrieTimeout != nil { c.TrieTimeout = *dec.TrieTimeout } + if dec.TriesInMemory != nil { + c.TriesInMemory = *dec.TriesInMemory + } if dec.SnapshotCache != nil { c.SnapshotCache = *dec.SnapshotCache } diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 35608031d9..7c6044a293 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -23,6 +23,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/types" @@ -461,17 +462,18 @@ func (f *BlockFetcher) loop() { log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes) // Create a closure of the fetch and schedule in on a new thread - fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes - go func(peer string) { + fetchHeader, hashes, peer := f.fetching[hashes[0]].fetchHeader, hashes, peer + gopool.Submit(func() { if f.fetchingHook != nil { f.fetchingHook(hashes) } for _, hash := range hashes { headerFetchMeter.Mark(1) - go func(hash common.Hash) { + copiedHash := hash // copy for closure + gopool.Submit(func() { resCh := make(chan *eth.Response) - req, err := fetchHeader(hash, resCh) + req, err := fetchHeader(copiedHash, resCh) if err != nil { return // Legacy code, yolo } @@ -492,9 +494,9 @@ func (f *BlockFetcher) loop() { // peer however, it's a protocol violation. f.dropPeer(peer) } - }(hash) + }) } - }(peer) + }) } // Schedule the next fetch if blocks are still pending f.rescheduleFetch(fetchTimer) @@ -525,7 +527,8 @@ func (f *BlockFetcher) loop() { fetchBodies := f.completing[hashes[0]].fetchBodies bodyFetchMeter.Mark(int64(len(hashes))) - go func(peer string, hashes []common.Hash) { + peer, hashes := peer, hashes + gopool.Submit(func() { resCh := make(chan *eth.Response) req, err := fetchBodies(hashes, resCh) @@ -551,7 +554,7 @@ func (f *BlockFetcher) loop() { // peer however, it's a protocol violation. f.dropPeer(peer) } - }(peer, hashes) + }) } // Schedule the next fetch if blocks are still pending f.rescheduleComplete(completeTimer) @@ -807,7 +810,7 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { hash := header.Hash() log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) - go func() { + gopool.Submit(func() { defer func() { f.done <- hash }() // If the parent's unknown, abort insertion parent := f.getHeader(header.ParentHash) @@ -830,7 +833,7 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { if f.importedHook != nil { f.importedHook(header, nil) } - }() + }) } // importBlocks spawns a new goroutine to run a block insertion into the chain. 
If the @@ -841,7 +844,7 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { // Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) - go func() { + gopool.Submit(func() { defer func() { f.done <- hash }() // If the parent's unknown, abort insertion @@ -879,7 +882,7 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { if f.importedHook != nil { f.importedHook(nil, block) } - }() + }) } // forgetHash removes all traces of a block announcement from the fetcher's diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 39727e0079..f01e683ff2 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -26,6 +26,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" @@ -812,14 +813,14 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()} txRequestOutMeter.Mark(int64(len(hashes))) - go func(peer string, hashes []common.Hash) { + gopool.Submit(func() { // Try to fetch the transactions, but in case of a request // failure (e.g. peer disconnected), reschedule the hashes. if err := f.fetchTxs(peer, hashes); err != nil { txRequestFailMeter.Mark(int64(len(hashes))) f.Drop(peer) } - }(peer, hashes) + }) } }) // If a new request was fired, schedule a timeout timer diff --git a/eth/filters/api.go b/eth/filters/api.go index f9ae70eba7..cafe7bee64 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -114,7 +115,7 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, fullTx: fullTx != nil && *fullTx, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub} api.filtersMu.Unlock() - go func() { + gopool.Submit(func() { for { select { case pTx := <-pendingTxs: @@ -130,7 +131,7 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { return } } - }() + }) return pendingTxSub.ID } @@ -146,7 +147,7 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) rpcSub := notifier.CreateSubscription() - go func() { + gopool.Submit(func() { txs := make(chan []*types.Transaction, 128) pendingTxSub := api.events.SubscribePendingTxs(txs) chainConfig := api.sys.backend.ChainConfig() @@ -173,7 +174,7 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) return } } - }() + }) return rpcSub, nil } @@ -190,7 +191,7 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID { api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub} api.filtersMu.Unlock() - go func() { + gopool.Submit(func() { for { select { case h := <-headers: @@ -206,7 +207,7 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID { return } } - }() + }) return headerSub.ID } @@ -220,7 +221,7 @@ func (api *FilterAPI) NewHeads(ctx 
context.Context) (*rpc.Subscription, error) { rpcSub := notifier.CreateSubscription() - go func() { + gopool.Submit(func() { headers := make(chan *types.Header) headersSub := api.events.SubscribeNewHeads(headers) @@ -236,7 +237,7 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { return } } - }() + }) return rpcSub, nil } @@ -258,7 +259,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc return nil, err } - go func() { + gopool.Submit(func() { for { select { case logs := <-matchedLogs: @@ -274,7 +275,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc return } } - }() + }) return rpcSub, nil } @@ -305,7 +306,7 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub} api.filtersMu.Unlock() - go func() { + gopool.Submit(func() { for { select { case l := <-logs: @@ -321,7 +322,7 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { return } } - }() + }) return logsSub.ID, nil } diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 3045303f22..bd07a9ba60 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -20,6 +20,7 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/types" ) @@ -91,14 +92,14 @@ func (p *Peer) broadcastTransactions() { // If there's anything available to transfer, fire up an async writer if len(txs) > 0 { done = make(chan struct{}) - go func() { + gopool.Submit(func() { if err := p.SendTransactions(txs); err != nil { fail <- err return } close(done) p.Log().Trace("Sent transactions", "count", len(txs)) - }() + }) } } // Transfer goroutine may or may not have been started, listen for events @@ -162,7 +163,7 @@ func (p *Peer) announceTransactions() { // If there's anything available to transfer, fire up an async writer if len(pending) > 0 { done = make(chan struct{}) - go func() { + gopool.Submit(func() { if p.version >= ETH68 { if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { fail <- err @@ -176,7 +177,7 @@ func (p *Peer) announceTransactions() { } close(done) p.Log().Trace("Sent transaction announcements", "count", len(pending)) - }() + }) } } // Transfer goroutine may or may not have been started, listen for events diff --git a/eth/protocols/eth/handshake.go b/eth/protocols/eth/handshake.go index 9a2769fa0d..cdfb6edf19 100644 --- a/eth/protocols/eth/handshake.go +++ b/eth/protocols/eth/handshake.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/p2p" ) @@ -40,7 +41,7 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis var status StatusPacket // safe to read after two values have been received from errc - go func() { + gopool.Submit(func() { errc <- p2p.Send(p.rw, StatusMsg, &StatusPacket{ ProtocolVersion: uint32(p.version), NetworkID: network, @@ -49,10 +50,10 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis Genesis: genesis, ForkID: forkID, }) - }() - go func() { + }) + gopool.Submit(func() { errc <- p.readStatus(network, &status, genesis, forkFilter) - }() + }) timeout := 
time.NewTimer(handshakeTimeout) defer timeout.Stop() for i := 0; i < 2; i++ { diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 13279fd96c..d44f68ee44 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -30,6 +30,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" @@ -999,7 +1000,8 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac delete(s.accountIdlers, idle) s.pend.Add(1) - go func(root common.Hash) { + root := s.root + gopool.Submit(func() { defer s.pend.Done() // Attempt to send the remote request and revert if it fails @@ -1013,7 +1015,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac peer.Log().Debug("Failed to request account range", "err", err) s.scheduleRevertAccountRequest(req) } - }(s.root) + }) // Inject the request into the task to block further assignments task.req = req @@ -1110,7 +1112,7 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * delete(s.bytecodeIdlers, idle) s.pend.Add(1) - go func() { + gopool.Submit(func() { defer s.pend.Done() // Attempt to send the remote request and revert if it fails @@ -1118,7 +1120,7 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * log.Debug("Failed to request bytecodes", "err", err) s.scheduleRevertBytecodeRequest(req) } - }() + }) } } @@ -1257,7 +1259,8 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st delete(s.storageIdlers, idle) s.pend.Add(1) - go func(root common.Hash) { + root := s.root + gopool.Submit(func() { defer s.pend.Done() // Attempt to send the remote request and revert if it fails @@ -1269,7 +1272,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st log.Debug("Failed to request storage", "err", err) s.scheduleRevertStorageRequest(req) } - }(s.root) + }) // Inject the request into the subtask to block further assignments if subtask != nil { @@ -1394,7 +1397,8 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai delete(s.trienodeHealIdlers, idle) s.pend.Add(1) - go func(root common.Hash) { + root := s.root + gopool.Submit(func() { defer s.pend.Done() // Attempt to send the remote request and revert if it fails @@ -1402,7 +1406,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai log.Debug("Failed to request trienode healers", "err", err) s.scheduleRevertTrienodeHealRequest(req) } - }(s.root) + }) } } @@ -1510,7 +1514,7 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai delete(s.bytecodeHealIdlers, idle) s.pend.Add(1) - go func() { + gopool.Submit(func() { defer s.pend.Done() // Attempt to send the remote request and revert if it fails @@ -1518,7 +1522,7 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai log.Debug("Failed to request bytecode healers", "err", err) s.scheduleRevertBytecodeHealRequest(req) } - }() + }) } } diff --git a/eth/sync.go b/eth/sync.go index 6d764ef482..6e06803e56 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -23,6 +23,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" 
"github.com/ethereum/go-ethereum/eth/downloader" @@ -228,7 +229,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { // startSync launches doSync in a new goroutine. func (cs *chainSyncer) startSync(op *chainSyncOp) { cs.doneCh = make(chan error, 1) - go func() { cs.doneCh <- cs.handler.doSync(op) }() + gopool.Submit(func() { cs.doneCh <- cs.handler.doSync(op) }) } // doSync synchronizes the local blockchain with a remote peer. diff --git a/eth/tracers/api.go b/eth/tracers/api.go index c32f038577..d1ed5ba8bf 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -30,6 +30,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" @@ -251,11 +252,11 @@ func (api *API) TraceChain(ctx context.Context, start, end rpc.BlockNumber, conf sub := notifier.CreateSubscription() resCh := api.traceChain(from, to, config, notifier.Closed()) - go func() { + gopool.Submit(func() { for result := range resCh { notifier.Notify(sub.ID, result) } - }() + }) return sub, nil } @@ -283,7 +284,7 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed ) for th := 0; th < threads; th++ { pend.Add(1) - go func() { + gopool.Submit(func() { defer pend.Done() // Fetch and execute the block trace taskCh @@ -323,10 +324,10 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed return } } - }() + }) } // Start a goroutine to feed all the blocks into the tracers - go func() { + gopool.Submit(func() { var ( logged time.Time begin = time.Now() @@ -417,11 +418,11 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed } traced += uint64(len(txs)) } - }() + }) // Keep reading the trace results and stream them to result channel. retCh := make(chan *blockTraceResult) - go func() { + gopool.Submit(func() { defer close(retCh) var ( next = start.NumberU64() + 1 @@ -449,7 +450,7 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed next++ } } - }() + }) return retCh } @@ -696,7 +697,7 @@ func (api *API) traceBlockParallel(ctx context.Context, block *types.Block, stat jobs := make(chan *txTraceTask, threads) for th := 0; th < threads; th++ { pend.Add(1) - go func() { + gopool.Submit(func() { defer pend.Done() // Fetch and execute the next transaction trace tasks for task := range jobs { @@ -715,7 +716,7 @@ func (api *API) traceBlockParallel(ctx context.Context, block *types.Block, stat } results[task.index] = &txTraceResult{Result: res} } - }() + }) } // Feed the transactions into the tracers and return @@ -1021,14 +1022,14 @@ func (api *API) traceTx(ctx context.Context, message *core.Message, txctx *Conte } } deadlineCtx, cancel := context.WithTimeout(ctx, timeout) - go func() { + gopool.Submit(func() { <-deadlineCtx.Done() if errors.Is(deadlineCtx.Err(), context.DeadlineExceeded) { tracer.Stop(errors.New("execution timeout")) // Stop evm execution. Note cancellation is not necessarily immediate. 
vmenv.Cancel() } - }() + }) defer cancel() // Call Prepare to clear out the statedb access list diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index a2d00f3edc..4479627903 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -158,6 +158,7 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i i TrieDirtyLimit: 256, TrieTimeLimit: 5 * time.Minute, SnapshotLimit: 0, + TriesInMemory: core.TriesInMemory, TrieDirtyDisabled: true, // Archive mode } chain, err := core.NewBlockChain(backend.chaindb, cacheConfig, gspec, nil, backend.engine, vm.Config{}, nil, nil) diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index 7e4fd7e5e7..88837f1666 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -244,6 +244,9 @@ func (b *batch) Write() error { b.db.lock.Lock() defer b.db.lock.Unlock() + if b.db.db == nil { + return nil + } for _, keyvalue := range b.writes { if keyvalue.delete { delete(b.db.db, string(keyvalue.key)) diff --git a/event/subscription.go b/event/subscription.go index 6c62874719..080985d1d4 100644 --- a/event/subscription.go +++ b/event/subscription.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/mclock" ) @@ -48,7 +49,7 @@ type Subscription interface { // error, it is sent on the subscription's error channel. func NewSubscription(producer func(<-chan struct{}) error) Subscription { s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)} - go func() { + gopool.Submit(func() { defer close(s.err) err := producer(s.unsub) s.mu.Lock() @@ -59,7 +60,7 @@ func NewSubscription(producer func(<-chan struct{}) error) Subscription { } s.unsubscribed = true } - }() + }) return s } @@ -171,11 +172,11 @@ func (s *resubscribeSub) subscribe() Subscription { for { s.lastTry = mclock.Now() ctx, cancel := context.WithCancel(context.Background()) - go func() { + gopool.Submit(func() { rsub, err := s.fn(ctx, s.lastSubErr) sub = rsub subscribed <- err - }() + }) select { case err := <-subscribed: cancel() diff --git a/go.mod b/go.mod index 430451fc33..2ded17b66c 100644 --- a/go.mod +++ b/go.mod @@ -105,6 +105,7 @@ require ( github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect github.com/opentracing/opentracing-go v1.1.0 // indirect + github.com/panjf2000/ants/v2 v2.7.3 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect diff --git a/go.sum b/go.sum index 449f638159..8a1cedb4ba 100644 --- a/go.sum +++ b/go.sum @@ -451,6 +451,8 @@ github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/panjf2000/ants/v2 v2.7.3 h1:rHQ0hH0DQvuNUqqlWIMJtkMcDuL1uQAfpX2mIhQ5/s0= +github.com/panjf2000/ants/v2 v2.7.3/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod 
h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
diff --git a/graphql/graphql_test.go b/graphql/graphql_test.go
index 46acd15293..e1f5f1fbf8 100644
--- a/graphql/graphql_test.go
+++ b/graphql/graphql_test.go
@@ -345,6 +345,7 @@ func newGQLService(t *testing.T, stack *node.Node, gspec *core.Genesis, genBlock
		TrieCleanCacheRejournal: 60 * time.Minute,
		TrieDirtyCache:          5,
		TrieTimeout:             60 * time.Minute,
+		TriesInMemory:           128,
		SnapshotCache:           5,
	}
	ethBackend, err := eth.New(stack, ethConf)
diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go
index 97575366f4..b33090c261 100644
--- a/internal/ethapi/api.go
+++ b/internal/ethapi/api.go
@@ -35,6 +35,7 @@ import (
	"github.com/ethereum/go-ethereum/accounts/keystore"
	"github.com/ethereum/go-ethereum/accounts/scwallet"
	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/gopool"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/common/math"
	"github.com/ethereum/go-ethereum/consensus/ethash"
@@ -1076,10 +1077,10 @@ func DoCall(ctx context.Context, b Backend, args TransactionArgs, blockNrOrHash
	}
	// Wait for the context to be done and cancel the evm. Even if the
	// EVM has finished, cancelling may be done (repeatedly)
-	go func() {
+	gopool.Submit(func() {
		<-ctx.Done()
		evm.Cancel()
-	}()
+	})

	// Execute the message.
	gp := new(core.GasPool).AddGas(math.MaxUint64)
diff --git a/les/catalyst/api_test.go b/les/catalyst/api_test.go
index 54757f61da..b319762318 100644
--- a/les/catalyst/api_test.go
+++ b/les/catalyst/api_test.go
@@ -228,6 +228,7 @@ func startLesService(t *testing.T, genesis *core.Genesis, headers []*types.Heade
		TrieDirtyCache: 256,
		TrieCleanCache: 256,
		LightPeers:     10,
+		TriesInMemory:  128,
	}
	lesService, err := les.New(n, ethcfg)
	if err != nil {
diff --git a/light/trie.go b/light/trie.go
index 0ccab1588d..78c532de6e 100644
--- a/light/trie.go
+++ b/light/trie.go
@@ -99,6 +99,12 @@ func (db *odrDatabase) DiskDB() ethdb.KeyValueStore {
	panic("not implemented")
}

+func (db *odrDatabase) CacheAccount(_ common.Hash, _ state.Trie) {}
+
+func (db *odrDatabase) CacheStorage(_ common.Hash, _ common.Hash, _ state.Trie) {}
+
+func (db *odrDatabase) Purge() {}
+
type odrTrie struct {
	db *odrDatabase
	id *TrieID
diff --git a/miner/worker.go b/miner/worker.go
index f09bc32058..37c5787aa4 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -198,6 +198,7 @@ type worker struct {
	engine     consensus.Engine
	eth        Backend
	chain      *core.BlockChain
+	prefetcher core.Prefetcher

	// Feeds
	pendingLogsFeed event.Feed
@@ -279,6 +280,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
		engine:       engine,
		eth:          eth,
		chain:        eth.BlockChain(),
+		prefetcher:   core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine),
		mux:          mux,
		isLocalBlock: isLocalBlock,
		localUncles:  make(map[common.Hash]*types.Block),
@@ -802,6 +804,7 @@ func (w *worker) makeEnv(parent *types.Header, header *types.Header, coinbase co
			parentBlock := w.eth.BlockChain().GetBlockByHash(parent.Hash())
			state, release, err = historicalBackend.StateAtBlock(context.Background(), parentBlock, ^uint64(0), nil, false, false)
			state = state.Copy()
+			state.EnableWriteOnSharedStorage()
			release()
		}
	}
@@ -895,6 +898,14 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
	}
	var coalescedLogs []*types.Log

+	stopPrefetchCh := make(chan struct{})
+	defer close(stopPrefetchCh)
+	// prefetch txs from all the pending txs
+	txsPrefetch := txs.Copy()
+	tx := txsPrefetch.Peek()
+	txCurr := &tx
+
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr) + for { // Check interruption signal and abort building if it's fired. if interrupt != nil { @@ -908,7 +919,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP break } // Retrieve the next transaction and abort if all done. - tx := txs.Peek() + tx = txs.Peek() if tx == nil { break } diff --git a/node/api.go b/node/api.go index 15892a270b..86fc445528 100644 --- a/node/api.go +++ b/node/api.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/internal/debug" @@ -132,7 +133,7 @@ func (api *adminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) } rpcSub := notifier.CreateSubscription() - go func() { + gopool.Submit(func() { events := make(chan *p2p.PeerEvent) sub := server.SubscribeEvents(events) defer sub.Unsubscribe() @@ -149,7 +150,7 @@ func (api *adminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) return } } - }() + }) return rpcSub, nil } diff --git a/p2p/dial.go b/p2p/dial.go index 134e6e2eae..49bf6d543a 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -27,6 +27,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" @@ -177,8 +178,8 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF d.lastStatsLog = d.clock.Now() d.ctx, d.cancel = context.WithCancel(context.Background()) d.wg.Add(2) - go d.readNodes(it) - go d.loop(it) + gopool.Submit(func() { d.readNodes(it) }) + gopool.Submit(func() { d.loop(it) }) return d } @@ -441,10 +442,10 @@ func (d *dialScheduler) startDial(task *dialTask) { hkey := string(task.dest.ID().Bytes()) d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration)) d.dialing[task.dest.ID()] = task - go func() { + gopool.Submit(func() { task.run(d) d.doneCh <- task - }() + }) } // A dialTask generated for each node that is dialed. diff --git a/p2p/discover/lookup.go b/p2p/discover/lookup.go index b8d97b44e1..15b93bb5a0 100644 --- a/p2p/discover/lookup.go +++ b/p2p/discover/lookup.go @@ -21,6 +21,7 @@ import ( "errors" "time" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -123,7 +124,7 @@ func (it *lookup) startQueries() bool { if !it.asked[n.ID()] { it.asked[n.ID()] = true it.queries++ - go it.query(n, it.replyCh) + gopool.Submit(func() { it.query(n, it.replyCh) }) } } // The lookup ends when no more nodes can be asked. diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 41d5ac6e34..0bcb540062 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -33,6 +33,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/netutil" @@ -229,7 +230,7 @@ func (tab *Table) loop() { defer copyNodes.Stop() // Start initial refresh. 
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 41d5ac6e34..0bcb540062 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -33,6 +33,7 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/netutil"
@@ -229,7 +230,7 @@ func (tab *Table) loop() {
 	defer copyNodes.Stop()
 
 	// Start initial refresh.
-	go tab.doRefresh(refreshDone)
+	gopool.Submit(func() { tab.doRefresh(refreshDone) })
 
loop:
 	for {
@@ -238,13 +239,13 @@ loop:
 			tab.seedRand()
 			if refreshDone == nil {
 				refreshDone = make(chan struct{})
-				go tab.doRefresh(refreshDone)
+				gopool.Submit(func() { tab.doRefresh(refreshDone) })
 			}
 		case req := <-tab.refreshReq:
 			waiting = append(waiting, req)
 			if refreshDone == nil {
 				refreshDone = make(chan struct{})
-				go tab.doRefresh(refreshDone)
+				gopool.Submit(func() { tab.doRefresh(refreshDone) })
 			}
 		case <-refreshDone:
 			for _, ch := range waiting {
@@ -253,12 +254,12 @@ loop:
 			waiting, refreshDone = nil, nil
 		case <-revalidate.C:
 			revalidateDone = make(chan struct{})
-			go tab.doRevalidate(revalidateDone)
+			gopool.Submit(func() { tab.doRevalidate(revalidateDone) })
 		case <-revalidateDone:
 			revalidate.Reset(tab.nextRevalidateTime())
 			revalidateDone = nil
 		case <-copyNodes.C:
-			go tab.copyLiveNodes()
+			gopool.Submit(func() { tab.copyLiveNodes() })
 		case <-tab.closeReq:
 			break loop
 		}
diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go
index 67cd2c004c..cbf396df3f 100644
--- a/p2p/discover/v4_udp.go
+++ b/p2p/discover/v4_udp.go
@@ -29,6 +29,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/discover/v4wire"
@@ -488,7 +489,7 @@ func (t *UDPv4) loop() {
 			if contTimeouts > ntpFailureThreshold {
 				if time.Since(ntpWarnTime) >= ntpWarningCooldown {
 					ntpWarnTime = time.Now()
-					go checkClockDrift()
+					gopool.Submit(func() { checkClockDrift() })
 				}
 				contTimeouts = 0
 			}
diff --git a/p2p/enode/iter.go b/p2p/enode/iter.go
index b8ab4a758a..364ac9bee7 100644
--- a/p2p/enode/iter.go
+++ b/p2p/enode/iter.go
@@ -19,6 +19,8 @@ package enode
 
 import (
 	"sync"
 	"time"
+
+	"github.com/ethereum/go-ethereum/common/gopool"
 )
 
 // Iterator represents a sequence of nodes. The Next method moves to the next node in the
@@ -177,7 +179,9 @@ func (m *FairMix) AddSource(it Iterator) {
 	m.wg.Add(1)
 	source := &mixSource{it, make(chan *Node), m.timeout}
 	m.sources = append(m.sources, source)
-	go m.runSource(m.closed, source)
+	gopool.Submit(func() {
+		m.runSource(m.closed, source)
+	})
 }
 
 // Close shuts down the mixer and all current sources.
diff --git a/p2p/enode/nodedb.go b/p2p/enode/nodedb.go
index 7e7fb69b29..da2140582a 100644
--- a/p2p/enode/nodedb.go
+++ b/p2p/enode/nodedb.go
@@ -26,6 +26,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/syndtr/goleveldb/leveldb"
 	"github.com/syndtr/goleveldb/leveldb/errors"
@@ -303,7 +304,7 @@ func deleteRange(db *leveldb.DB, prefix []byte) {
 // convergence, it's simpler to "ensure" the correct state when an appropriate
 // condition occurs (i.e. a successful bonding), and discard further events.
 func (db *DB) ensureExpirer() {
	db.runner.Do(func() { gopool.Submit(func() { db.expirer() }) })
 }
 
 // expirer should be started in a go routine, and is responsible for looping ad
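Note on these mechanical go-to-gopool.Submit conversions: `go f(x)` evaluates its arguments at spawn time, while a submitted nil-ary closure reads its variables whenever the pooled worker runs. The conversions above stay correct because each captured variable is either declared per iteration or only reassigned after synchronizing on the spawned work, but the difference is worth keeping in mind. A toy program, assuming only that gopool.Submit runs the closure asynchronously (it deliberately exhibits the hazard):

package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common/gopool"
)

func main() {
	x := 1
	go fmt.Println(x) // argument evaluated here: always prints 1
	gopool.Submit(func() {
		fmt.Println(x) // variable read when the worker runs: may print 1 or 2
	})
	x = 2
	time.Sleep(100 * time.Millisecond) // crude wait so both goroutines finish
}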
diff --git a/p2p/nat/nat.go b/p2p/nat/nat.go
index ad4c36582a..65424e5b7a 100644
--- a/p2p/nat/nat.go
+++ b/p2p/nat/nat.go
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/log"
 	natpmp "github.com/jackpal/go-nat-pmp"
 )
@@ -145,8 +146,8 @@ func Any() Interface {
 	// Internet-class address. Return ExtIP in this case.
 	return startautodisc("UPnP or NAT-PMP", func() Interface {
 		found := make(chan Interface, 2)
-		go func() { found <- discoverUPnP() }()
-		go func() { found <- discoverPMP() }()
+		gopool.Submit(func() { found <- discoverUPnP() })
+		gopool.Submit(func() { found <- discoverPMP() })
 		for i := 0; i < cap(found); i++ {
 			if c := <-found; c != nil {
 				return c
diff --git a/p2p/peer.go b/p2p/peer.go
index 469a1b7974..9978512b92 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/log"
@@ -410,7 +411,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error)
 			rw = newMsgEventer(rw, p.events, p.ID(), proto.Name, p.Info().Network.RemoteAddress, p.Info().Network.LocalAddress)
 		}
 		p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
-		go func() {
+		gopool.Submit(func() {
 			defer p.wg.Done()
 			err := proto.Run(p, rw)
 			if err == nil {
@@ -420,7 +421,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error)
 				p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
 			}
 			p.protoErr <- err
-		}()
+		})
 	}
 }
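Note: peer.go above preserves the established WaitGroup discipline: the Add happens before the task is handed to the pool, and Done runs inside it, so Wait cannot return while work is still queued. In miniature, combined with the explicit loop-variable copy this patch also uses in trie/sync.go further down (work() is a placeholder):

package main

import (
	"sync"

	"github.com/ethereum/go-ethereum/common/gopool"
)

func work(i int) { _ = i } // placeholder task

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		i := i    // copy the loop variable for the closure
		wg.Add(1) // Add before Submit, Done inside: Wait can't return early
		gopool.Submit(func() {
			defer wg.Done()
			work(i)
		})
	}
	wg.Wait()
}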
diff --git a/p2p/server.go b/p2p/server.go
index 610b82d784..9d186d42c4 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -30,6 +30,7 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/event"
@@ -526,12 +527,12 @@ func (srv *Server) setupLocalNode() error {
 		// Ask the router about the IP. This takes a while and blocks startup,
 		// do it in the background.
 		srv.loopWG.Add(1)
-		go func() {
+		gopool.Submit(func() {
 			defer srv.loopWG.Done()
 			if ip, err := srv.NAT.ExternalIP(); err == nil {
 				srv.localnode.SetStaticIP(ip)
 			}
-		}()
+		})
 	}
 	return nil
 }
@@ -574,10 +575,10 @@ func (srv *Server) setupDiscovery() error {
 	if srv.NAT != nil {
 		if !realaddr.IP.IsLoopback() {
 			srv.loopWG.Add(1)
-			go func() {
+			gopool.Submit(func() {
 				nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
 				srv.loopWG.Done()
-			}()
+			})
 		}
 	}
 	srv.localnode.SetFallbackUDP(realaddr.Port)
@@ -681,10 +682,10 @@ func (srv *Server) setupListening() error {
 		srv.localnode.Set(enr.TCP(tcp.Port))
 		if !tcp.IP.IsLoopback() && srv.NAT != nil {
 			srv.loopWG.Add(1)
-			go func() {
+			gopool.Submit(func() {
 				nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
 				srv.loopWG.Done()
-			}()
+			})
 		}
 	}
 
@@ -902,10 +903,10 @@ func (srv *Server) listenLoop() {
 			fd = newMeteredConn(fd, true, addr)
 			srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
 		}
-		go func() {
+		gopool.Submit(func() {
 			srv.SetupConn(fd, inboundConn, nil)
 			slots <- struct{}{}
-		}()
+		})
 	}
 }
diff --git a/p2p/simulations/examples/ping-pong.go b/p2p/simulations/examples/ping-pong.go
index d9b51dc09b..66b55be6cd 100644
--- a/p2p/simulations/examples/ping-pong.go
+++ b/p2p/simulations/examples/ping-pong.go
@@ -25,6 +25,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p"
@@ -140,7 +141,7 @@ func (p *pingPongService) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
 	log := p.log.New("peer.id", peer.ID())
 
 	errC := make(chan error, 1)
-	go func() {
+	gopool.Submit(func() {
 		for range time.Tick(10 * time.Second) {
 			log.Info("sending ping")
 			if err := p2p.Send(rw, pingMsgCode, "PING"); err != nil {
@@ -148,8 +149,8 @@ func (p *pingPongService) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
 				return
 			}
 		}
-	}()
-	go func() {
+	})
+	gopool.Submit(func() {
 		for {
 			msg, err := rw.ReadMsg()
 			if err != nil {
@@ -165,9 +166,9 @@ func (p *pingPongService) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
 			atomic.AddInt64(&p.received, 1)
 			if msg.Code == pingMsgCode {
 				log.Info("sending pong")
-				go p2p.Send(rw, pongMsgCode, "PONG")
+				gopool.Submit(func() { p2p.Send(rw, pongMsgCode, "PONG") })
 			}
 		}
-	}()
+	})
 
 	return <-errC
 }
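Note: listenLoop keeps its slot-based backpressure intact across the conversion: a buffered channel of empty structs bounds how many connection setups run at once, and the slot is only returned when the pooled task finishes. The pattern in isolation, with a hypothetical handle function standing in for srv.SetupConn:

package sketch

import (
	"net"

	"github.com/ethereum/go-ethereum/common/gopool"
)

func handle(c net.Conn) { c.Close() } // placeholder for real setup work

// acceptLoop bounds concurrent setups with a semaphore channel, exactly as
// the slots channel does in listenLoop above.
func acceptLoop(listener net.Listener, maxPending int) {
	slots := make(chan struct{}, maxPending)
	for i := 0; i < maxPending; i++ {
		slots <- struct{}{}
	}
	for {
		<-slots // block until a setup slot frees up
		conn, err := listener.Accept()
		if err != nil {
			return
		}
		gopool.Submit(func() {
			handle(conn)        // conn is declared per iteration, safe to capture
			slots <- struct{}{} // release the slot once setup finishes
		})
	}
}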
diff --git a/p2p/transport.go b/p2p/transport.go
index 4f6bb569bf..299e9db8dd 100644
--- a/p2p/transport.go
+++ b/p2p/transport.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/bitutil"
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/p2p/rlpx"
 	"github.com/ethereum/go-ethereum/rlp"
@@ -137,7 +138,7 @@ func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHands
 	// disconnects us early with a valid reason, we should return it
 	// as the error so it can be tracked elsewhere.
 	werr := make(chan error, 1)
-	go func() { werr <- Send(t, handshakeMsg, our) }()
+	gopool.Submit(func() { werr <- Send(t, handshakeMsg, our) })
 	if their, err = readProtocolHandshake(t); err != nil {
 		<-werr // make sure the write terminates too
 		return nil, err
diff --git a/rpc/handler.go b/rpc/handler.go
index c2e7d7dc08..63754c457c 100644
--- a/rpc/handler.go
+++ b/rpc/handler.go
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/log"
 )
 
@@ -341,12 +342,12 @@ func (h *handler) cancelServerSubscriptions(err error) {
 // startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
 func (h *handler) startCallProc(fn func(*callProc)) {
 	h.callWG.Add(1)
-	go func() {
+	gopool.Submit(func() {
 		ctx, cancel := context.WithCancel(h.rootCtx)
 		defer h.callWG.Done()
 		defer cancel()
 		fn(&callProc{ctx: ctx})
-	}()
+	})
 }
 
 // handleImmediate executes non-call messages. It returns false if the message is a
diff --git a/trie/database.go b/trie/database.go
index 200ed3674b..fcaeb3e9ae 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -87,6 +87,11 @@ type Database struct {
 	childrenSize common.StorageSize // Storage size of the external children tracking
 	preimages    *preimageStore     // The store for caching preimages
 
+	// Size metrics, maintained under a dedicated lightweight lock.
+	sizeLock           sync.RWMutex
+	roughPreimagesSize common.StorageSize
+	roughDirtiesSize   common.StorageSize
+
 	lock sync.RWMutex
 }
@@ -439,9 +444,20 @@ func (db *Database) Nodes() []common.Hash {
 // are referenced together by database itself.
 func (db *Database) Reference(child common.Hash, parent common.Hash) {
 	db.lock.Lock()
-	defer db.lock.Unlock()
 
 	db.reference(child, parent)
+	var roughDirtiesSize = common.StorageSize((len(db.dirties)-1)*cachedNodeSize) + db.dirtiesSize + db.childrenSize - common.StorageSize(len(db.dirties[common.Hash{}].children)*(common.HashLength+2))
+	var roughPreimagesSize = common.StorageSize(0)
+	if db.preimages != nil {
+		roughPreimagesSize = db.preimages.size()
+	}
+
+	db.lock.Unlock()
+
+	db.sizeLock.Lock()
+	db.roughDirtiesSize = roughDirtiesSize
+	db.roughPreimagesSize = roughPreimagesSize
+	db.sizeLock.Unlock()
 }
 
 // reference is the private locked version of Reference.
@@ -818,19 +834,10 @@ func (db *Database) Update(nodes *MergedNodeSet) error {
 // Size returns the current storage size of the memory cache in front of the
 // persistent database layer.
 func (db *Database) Size() (common.StorageSize, common.StorageSize) {
-	db.lock.RLock()
-	defer db.lock.RUnlock()
+	db.sizeLock.RLock()
+	defer db.sizeLock.RUnlock()
 
-	// db.dirtiesSize only contains the useful data in the cache, but when reporting
-	// the total memory consumption, the maintenance metadata is also needed to be
-	// counted.
-	var metadataSize = common.StorageSize((len(db.dirties) - 1) * cachedNodeSize)
-	var metarootRefs = common.StorageSize(len(db.dirties[common.Hash{}].children) * (common.HashLength + 2))
-	var preimageSize common.StorageSize
-	if db.preimages != nil {
-		preimageSize = db.preimages.size()
-	}
-	return db.dirtiesSize + db.childrenSize + metadataSize - metarootRefs, preimageSize
+	return db.roughDirtiesSize, db.roughPreimagesSize
 }
 
 // GetReader retrieves a node reader belonging to the given state root.
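Note: the trie/database.go change trades exactness for lock contention. The aggregates are recomputed on the write path (Reference), where db.lock is already held, and published under the small sizeLock; Size then becomes a cheap read that never blocks behind trie mutation, at the cost of reporting slightly stale ("rough") numbers. The shape of the optimization in a generic sketch:

package sketch

import "sync"

type sized struct {
	mu   sync.Mutex // protects data (stand-in for db.lock)
	data []int

	statMu sync.RWMutex // protects the cached stat only (stand-in for sizeLock)
	stat   int
}

func (s *sized) Append(v int) {
	s.mu.Lock()
	s.data = append(s.data, v)
	stat := len(s.data) // compute the aggregate while mu is already held
	s.mu.Unlock()

	s.statMu.Lock()
	s.stat = stat // publish under the cheap lock
	s.statMu.Unlock()
}

// Size never touches s.mu, so readers don't contend with writers;
// the value may lag Append by one publication.
func (s *sized) Size() int {
	s.statMu.RLock()
	defer s.statMu.RUnlock()
	return s.stat
}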
diff --git a/trie/secure_trie.go b/trie/secure_trie.go
index 83b92cebd2..4a26c4adf3 100644
--- a/trie/secure_trie.go
+++ b/trie/secure_trie.go
@@ -242,6 +242,19 @@ func (t *StateTrie) Copy() *StateTrie {
 	}
 }
 
+// ResetCopy returns a copy of StateTrie with the secKeyCache cleared.
+func (t *StateTrie) ResetCopy() *StateTrie {
+	cpy := *t
+	cpy.secKeyCacheOwner = nil
+	cpy.secKeyCache = nil
+	return &cpy
+}
+
+// GetRawTrie returns the underlying trie.
+func (t *StateTrie) GetRawTrie() Trie {
+	return t.trie
+}
+
 // NodeIterator returns an iterator that returns nodes of the underlying trie. Iteration
 // starts at the key after the given start key.
 func (t *StateTrie) NodeIterator(start []byte) NodeIterator {
diff --git a/trie/sync.go b/trie/sync.go
index 4f55845991..ddd32ef88c 100644
--- a/trie/sync.go
+++ b/trie/sync.go
@@ -22,6 +22,7 @@ import (
 	"sync"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/common/prque"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -466,26 +467,27 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
 		}
 		// Check the presence of children concurrently
 		pending.Add(1)
-		go func(child childNode) {
+		copiedChild := child // copy the loop variable for the closure
+		gopool.Submit(func() {
 			defer pending.Done()
 
 			// If database says duplicate, then at least the trie node is present
 			// and we hold the assumption that it's NOT legacy contract code.
 			var (
 				chash        = common.BytesToHash(node)
-				owner, inner = ResolvePath(child.path)
+				owner, inner = ResolvePath(copiedChild.path)
 			)
 			if rawdb.HasTrieNode(s.database, owner, inner, chash, s.scheme) {
 				return
 			}
 			// Locally unknown node, schedule for retrieval
 			missing <- &nodeRequest{
-				path:     child.path,
+				path:     copiedChild.path,
 				hash:     chash,
 				parent:   req,
 				callback: req.callback,
 			}
-		}(child)
+		})
 	}
 }
 pending.Wait()
diff --git a/trie/trie.go b/trie/trie.go
index 17bacba00f..d98fe992d5 100644
--- a/trie/trie.go
+++ b/trie/trie.go
@@ -608,3 +608,8 @@ func (t *Trie) Reset() {
 	t.unhashed = 0
 	t.tracer.reset()
 }
+
+// Size returns an estimate of the number of nodes in the trie.
+func (t *Trie) Size() int {
+	return estimateSize(t.root)
+}
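Note: estimateSize is not part of this hunk, so its exact semantics are not visible here. Assuming it walks the resolved, in-memory portion of the trie and counts nodes — which is what the comment on Size implies — it could look roughly like the sketch below; the name, cases, and weighting are assumptions for illustration, not code from this patch (written as if inside package trie, where node, shortNode, fullNode, hashNode and valueNode are defined):

// estimateSizeSketch counts resolved nodes reachable from n. An unresolved
// hash reference is counted as a single node, since expanding it would
// require touching the database.
func estimateSizeSketch(n node) int {
	switch n := n.(type) {
	case nil, valueNode:
		return 0 // leaves carry payload, not structure
	case hashNode:
		return 1 // unresolved reference: count it, don't expand it
	case *shortNode:
		return 1 + estimateSizeSketch(n.Val)
	case *fullNode:
		size := 1
		for _, child := range &n.Children {
			size += estimateSizeSketch(child)
		}
		return size
	default:
		return 1
	}
}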