Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]add sharedStorage for prefetching to L1 #792

Merged
merged 15 commits into from
Mar 28, 2022
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}
Expand Down
37 changes: 37 additions & 0 deletions core/state/shared_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about rename the file with: shared_storage_pool.go, shared_pool.go is a bit confusing, shared what?


import (
"sync"

"github.com/ethereum/go-ethereum/common"
)

// sharedPool is used to store maps of originStorage of stateObjects
type StoragePool struct {
sync.RWMutex
sharedMap map[common.Address]*sync.Map
}

func NewStoragePool() *StoragePool {
sharedMap := make(map[common.Address]*sync.Map, 500)
return &StoragePool{
sync.RWMutex{},
sharedMap,
}
}

// Check whether the storage exist in pool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to follow the commenting style used in go-ethereum, like, // Copy creates a deep, independent copy of the state. // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

// new one if not exist, it will be fetched in stateObjects.GetCommittedState()
func (s *StoragePool) getStorage(address common.Address) *sync.Map {
s.RLock()
storageMap, ok := s.sharedMap[address]
s.RUnlock()
if !ok {
m := new(sync.Map)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has race condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

s.Lock()
s.sharedMap[address] = m
s.Unlock()
return m
}
return storageMap
}
55 changes: 44 additions & 11 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -79,7 +80,9 @@ type StateObject struct {
trie Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded

originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
sharedOriginStorage *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Storage cache of original entries to dedup rewrites, reset for every transaction
the comment is for originStorage, pls add a new comment for sharedOriginStorage

originStorage Storage

pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
Expand Down Expand Up @@ -120,14 +123,21 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject {
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
var storageMap *sync.Map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename storageMap to sharedStorage, storage is a Map by itself.

// Check whether the storage exist in pool, new originStorage if not exist
if db != nil && db.storagePool != nil {
storageMap = db.GetStorage(address)
}

return &StateObject{
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
sharedOriginStorage: storageMap,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
}
}

Expand Down Expand Up @@ -194,6 +204,28 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash {
return s.GetCommittedState(db, key)
}

func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) {
if value, cached := s.originStorage[key]; cached {
return value, true
}
// if L1 cache miss, try to get it from shared pool
if s.sharedOriginStorage != nil {
val, ok := s.sharedOriginStorage.Load(key)
if !ok {
return common.Hash{}, false
}
setunapo marked this conversation as resolved.
Show resolved Hide resolved
return val.(common.Hash), true
}
return common.Hash{}, false
}

func (s *StateObject) setOriginStorage(key common.Hash, value common.Hash) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add a new APIsetOriginStorage either

if s.db.dbForSpeedup && s.sharedOriginStorage != nil {
s.sharedOriginStorage.Store(key, value)
}
s.originStorage[key] = value
}

// GetCommittedState retrieves a value from the committed account storage trie.
func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash {
// If the fake storage is set, only lookup the state here(in the debugging mode)
Expand All @@ -204,7 +236,8 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
if value, pending := s.pendingStorage[key]; pending {
return value
}
if value, cached := s.originStorage[key]; cached {

if value, cached := s.getOriginStorage(key); cached {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can leave it unchanged; the API: getOriginStorage can be removed;
// Try to get it from origin storage prefetch pool
if s.sharedOriginStorage != nil {
val, ok := s.sharedOriginStorage.Load(key)
if ok {
s.originStorage[key] = val.(common.Hash)
return val.(common.Hash), true
}
}

return value
}
// If no live objects are available, attempt to use snapshots
Expand Down Expand Up @@ -263,7 +296,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
value.SetBytes(content)
}
s.originStorage[key] = value
s.setOriginStorage(key, value)
return value
}

Expand Down Expand Up @@ -320,6 +353,7 @@ func (s *StateObject) finalise(prefetch bool) {
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
Expand Down Expand Up @@ -356,7 +390,6 @@ func (s *StateObject) updateTrie(db Database) Trie {
continue
}
s.originStorage[key] = value

var v []byte
if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
Expand Down
95 changes: 23 additions & 72 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type StateDB struct {
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution

storagePool *StoragePool // shared_pool to store L1 originStorage of stateObjects
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
dbForSpeedup bool
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
// DB error.
// State objects are used by the consensus core and VM which are
// unable to deal with database-level errors. Any error that occurs
Expand Down Expand Up @@ -147,6 +149,16 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
return newStateDB(root, db, snaps)
}

// NewWithSharedPool creates a new state with sharedStorge on layer 1.5
func NewWithSharedPool(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
statedb, err := newStateDB(root, db, snaps)
if err != nil {
return nil, err
}
statedb.storagePool = NewStoragePool()
return statedb, nil
}

func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
sdb := &StateDB{
db: db,
Expand All @@ -155,6 +167,8 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
stateObjects: make(map[common.Address]*StateObject, defaultNumOfSlots),
stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots),
stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots),
storagePool: nil,
dbForSpeedup: false,
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
logs: make(map[common.Hash][]*types.Log, defaultNumOfSlots),
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
Expand All @@ -178,6 +192,10 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
return sdb, nil
}

func (s *StateDB) SetPrefetchFlag() {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
s.dbForSpeedup = true
}

// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
Expand Down Expand Up @@ -591,78 +609,6 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject {
return nil
}

func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) {
accounts := make(map[common.Address]bool, block.Transactions().Len())
accountsSlice := make([]common.Address, 0, block.Transactions().Len())
for _, tx := range block.Transactions() {
from, err := types.Sender(signer, tx)
if err != nil {
break
}
accounts[from] = true
if tx.To() != nil {
accounts[*tx.To()] = true
}
}
for account := range accounts {
accountsSlice = append(accountsSlice, account)
}
if len(accountsSlice) >= preLoadLimit && len(accountsSlice) > runtime.NumCPU() {
objsChan := make(chan []*StateObject, runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
start := i * len(accountsSlice) / runtime.NumCPU()
end := (i + 1) * len(accountsSlice) / runtime.NumCPU()
if i+1 == runtime.NumCPU() {
end = len(accountsSlice)
}
go func(start, end int) {
objs := s.preloadStateObject(accountsSlice[start:end])
objsChan <- objs
}(start, end)
}
for i := 0; i < runtime.NumCPU(); i++ {
objs := <-objsChan
for _, obj := range objs {
s.SetStateObject(obj)
}
}
}
}

func (s *StateDB) preloadStateObject(address []common.Address) []*StateObject {
// Prefer live objects if any is available
if s.snap == nil {
return nil
}
hasher := crypto.NewKeccakState()
objs := make([]*StateObject, 0, len(address))
for _, addr := range address {
// If no live objects are available, attempt to use snapshots
if acc, err := s.snap.Account(crypto.HashData(hasher, addr.Bytes())); err == nil {
if acc == nil {
continue
}
data := &Account{
Nonce: acc.Nonce,
Balance: acc.Balance,
CodeHash: acc.CodeHash,
Root: common.BytesToHash(acc.Root),
}
if len(data.CodeHash) == 0 {
data.CodeHash = emptyCodeHash
}
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
// Insert into the live set
obj := newObject(s, addr, *data)
objs = append(objs, obj)
}
// Do not enable this feature when snapshot is not enabled.
}
return objs
}

// getDeletedStateObject is similar to getStateObject, but instead of returning
// nil for a deleted state object, it returns the actual object with the deleted
// flag set. This is needed by the state journal to revert to the correct s-
Expand Down Expand Up @@ -828,6 +774,7 @@ func (s *StateDB) Copy() *StateDB {
stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)),
stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)),
stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)),
storagePool: s.storagePool,
refund: s.refund,
logs: make(map[common.Hash][]*types.Log, len(s.logs)),
logSize: s.logSize,
Expand Down Expand Up @@ -1633,3 +1580,7 @@ func (s *StateDB) GetDirtyAccounts() []common.Address {
}
return accounts
}

func (s *StateDB) GetStorage(address common.Address) *sync.Map {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to addGetStorage;
GetStorage is paired with SetStorage, it is for fakeStorage.
Do not occupy the name.

return s.storagePool.getStorage(address)
}
1 change: 1 addition & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
for i := 0; i < prefetchThread; i++ {
go func(idx int) {
newStatedb := statedb.Copy()
newStatedb.SetPrefetchFlag()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
Expand Down
1 change: 0 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
gp = new(GasPool).AddGas(block.GasLimit())
)
signer := types.MakeSigner(p.bc.chainConfig, block.Number())
statedb.TryPreload(block, signer)
var receipts = make([]*types.Receipt, 0)
// Mutate the block and state according to any hard-fork specs
if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
Expand Down