Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream patch: add sharedStorage for prefetching to L1 #10

Open
wants to merge 1 commit into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}
Expand Down
39 changes: 39 additions & 0 deletions core/state/shared_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package state

import (
"sync"

"github.com/ethereum/go-ethereum/common"
)

// sharedPool is used to store maps of originStorage of stateObjects
type StoragePool struct {
sync.RWMutex
sharedMap map[common.Address]*sync.Map
}

func NewStoragePool() *StoragePool {
sharedMap := make(map[common.Address]*sync.Map)
return &StoragePool{
sync.RWMutex{},
sharedMap,
}
}

// getStorage Check whether the storage exist in pool,
// new one if not exist, the content of storage will be fetched in stateObjects.GetCommittedState()
func (s *StoragePool) getStorage(address common.Address) *sync.Map {
s.RLock()
storageMap, ok := s.sharedMap[address]
s.RUnlock()
if !ok {
s.Lock()
defer s.Unlock()
if storageMap, ok = s.sharedMap[address]; !ok {
m := new(sync.Map)
s.sharedMap[address] = m
return m
}
}
return storageMap
}
53 changes: 43 additions & 10 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -79,7 +80,9 @@ type StateObject struct {
trie Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded

originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
sharedOriginStorage *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction

pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
Expand Down Expand Up @@ -120,14 +123,21 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject {
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
var storageMap *sync.Map
// Check whether the storage exist in pool, new originStorage if not exist
if db != nil && db.storagePool != nil {
storageMap = db.GetStorage(address)
}

return &StateObject{
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
sharedOriginStorage: storageMap,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
}
}

Expand Down Expand Up @@ -194,6 +204,29 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash {
return s.GetCommittedState(db, key)
}

func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) {
if value, cached := s.originStorage[key]; cached {
return value, true
}
// if L1 cache miss, try to get it from shared pool
if s.sharedOriginStorage != nil {
val, ok := s.sharedOriginStorage.Load(key)
if !ok {
return common.Hash{}, false
}
s.originStorage[key] = val.(common.Hash)
return val.(common.Hash), true
}
return common.Hash{}, false
}

func (s *StateObject) setOriginStorage(key common.Hash, value common.Hash) {
if s.db.writeOnSharedStorage && s.sharedOriginStorage != nil {
s.sharedOriginStorage.Store(key, value)
}
s.originStorage[key] = value
}

// GetCommittedState retrieves a value from the committed account storage trie.
func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash {
// If the fake storage is set, only lookup the state here(in the debugging mode)
Expand All @@ -204,7 +237,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
if value, pending := s.pendingStorage[key]; pending {
return value
}
if value, cached := s.originStorage[key]; cached {
if value, cached := s.getOriginStorage(key); cached {
return value
}
// If no live objects are available, attempt to use snapshots
Expand Down Expand Up @@ -263,7 +296,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
value.SetBytes(content)
}
s.originStorage[key] = value
s.setOriginStorage(key, value)
return value
}

Expand Down
103 changes: 25 additions & 78 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ import (
"github.com/ethereum/go-ethereum/trie"
)

const (
preLoadLimit = 128
defaultNumOfSlots = 100
)
const defaultNumOfSlots = 100

type revision struct {
id int
Expand Down Expand Up @@ -101,6 +98,9 @@ type StateDB struct {
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution

storagePool *StoragePool // sharedPool to store L1 originStorage of stateObjects
writeOnSharedStorage bool // Write to the shared origin storage of a stateObject while reading from the underlying storage layer.

// DB error.
// State objects are used by the consensus core and VM which are
// unable to deal with database-level errors. Any error that occurs
Expand Down Expand Up @@ -147,6 +147,16 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
return newStateDB(root, db, snaps)
}

// NewWithSharedPool creates a new state with sharedStorge on layer 1.5
func NewWithSharedPool(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
statedb, err := newStateDB(root, db, snaps)
if err != nil {
return nil, err
}
statedb.storagePool = NewStoragePool()
return statedb, nil
}

func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
sdb := &StateDB{
db: db,
Expand Down Expand Up @@ -178,6 +188,10 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
return sdb, nil
}

func (s *StateDB) EnableWriteOnSharedStorage() {
s.writeOnSharedStorage = true
}

// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
Expand Down Expand Up @@ -592,78 +606,6 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject {
return nil
}

func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) {
accounts := make(map[common.Address]bool, block.Transactions().Len())
accountsSlice := make([]common.Address, 0, block.Transactions().Len())
for _, tx := range block.Transactions() {
from, err := types.Sender(signer, tx)
if err != nil {
break
}
accounts[from] = true
if tx.To() != nil {
accounts[*tx.To()] = true
}
}
for account := range accounts {
accountsSlice = append(accountsSlice, account)
}
if len(accountsSlice) >= preLoadLimit && len(accountsSlice) > runtime.NumCPU() {
objsChan := make(chan []*StateObject, runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
start := i * len(accountsSlice) / runtime.NumCPU()
end := (i + 1) * len(accountsSlice) / runtime.NumCPU()
if i+1 == runtime.NumCPU() {
end = len(accountsSlice)
}
go func(start, end int) {
objs := s.preloadStateObject(accountsSlice[start:end])
objsChan <- objs
}(start, end)
}
for i := 0; i < runtime.NumCPU(); i++ {
objs := <-objsChan
for _, obj := range objs {
s.SetStateObject(obj)
}
}
}
}

func (s *StateDB) preloadStateObject(address []common.Address) []*StateObject {
// Prefer live objects if any is available
if s.snap == nil {
return nil
}
hasher := crypto.NewKeccakState()
objs := make([]*StateObject, 0, len(address))
for _, addr := range address {
// If no live objects are available, attempt to use snapshots
if acc, err := s.snap.Account(crypto.HashData(hasher, addr.Bytes())); err == nil {
if acc == nil {
continue
}
data := &Account{
Nonce: acc.Nonce,
Balance: acc.Balance,
CodeHash: acc.CodeHash,
Root: common.BytesToHash(acc.Root),
}
if len(data.CodeHash) == 0 {
data.CodeHash = emptyCodeHash
}
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
// Insert into the live set
obj := newObject(s, addr, *data)
objs = append(objs, obj)
}
// Do not enable this feature when snapshot is not enabled.
}
return objs
}

// getDeletedStateObject is similar to getStateObject, but instead of returning
// nil for a deleted state object, it returns the actual object with the deleted
// flag set. This is needed by the state journal to revert to the correct s-
Expand Down Expand Up @@ -779,8 +721,8 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *StateObject)
// CreateAccount is called during the EVM CREATE operation. The situation might arise that
// a contract does the following:
//
// 1. sends funds to sha(account ++ (nonce + 1))
// 2. tx_create(sha(account ++ nonce)) (note that this gets the address of 1)
// 1. sends funds to sha(account ++ (nonce + 1))
// 2. tx_create(sha(account ++ nonce)) (note that this gets the address of 1)
//
// Carrying over the balance ensures that Ether doesn't disappear.
func (s *StateDB) CreateAccount(addr common.Address) {
Expand Down Expand Up @@ -829,6 +771,7 @@ func (s *StateDB) Copy() *StateDB {
stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)),
stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)),
stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)),
storagePool: s.storagePool,
refund: s.refund,
logs: make(map[common.Hash][]*types.Log, len(s.logs)),
logSize: s.logSize,
Expand Down Expand Up @@ -1634,3 +1577,7 @@ func (s *StateDB) GetDirtyAccounts() []common.Address {
}
return accounts
}

func (s *StateDB) GetStorage(address common.Address) *sync.Map {
return s.storagePool.getStorage(address)
}
1 change: 1 addition & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
for i := 0; i < prefetchThread; i++ {
go func(idx int) {
newStatedb := statedb.Copy()
newStatedb.EnableWriteOnSharedStorage()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
Expand Down
1 change: 0 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
gp = new(GasPool).AddGas(block.GasLimit())
)
signer := types.MakeSigner(p.bc.chainConfig, block.Number())
statedb.TryPreload(block, signer)
var receipts = make([]*types.Receipt, 0)

blockContext := NewEVMBlockContext(header, p.bc, nil)
Expand Down