Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]add sharedStorage for prefetching to L1 #792

Merged
merged 15 commits into from
Mar 28, 2022
4 changes: 2 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}
Expand All @@ -2112,7 +2112,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
var followupInterrupt uint32
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
throwaway := statedb.CopyWithSharedStorage()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
Expand Down
36 changes: 36 additions & 0 deletions core/state/shared_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming the file to shared_storage_pool.go? shared_pool.go is a bit confusing: shared what?


import (
"github.com/ethereum/go-ethereum/common"
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
"sync"
)

// sharedStorage is used to store maps of originStorage of stateObjects
type SharedStorage struct {
*sync.RWMutex
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
sharedMap map[common.Address]*sync.Map
}

func NewSharedStorage() *SharedStorage {
sharedMap := make(map[common.Address]*sync.Map, 1500)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
return &SharedStorage{
&sync.RWMutex{},
sharedMap,
}
}

// Check whether the storage exist in pool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to follow the commenting style used in go-ethereum, like, // Copy creates a deep, independent copy of the state. // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

// new one if not exist, it will be fetched in stateObjects.GetCommittedState()
func (storage *SharedStorage) getOrInertStorage(address common.Address) *sync.Map {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
storage.RLock()
storageMap, ok := storage.sharedMap[address]
storage.RUnlock()
if !ok {
m := new(sync.Map)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has race condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

storage.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concurrent race condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, newObjects called by main preoces or prefetcher will both call this function to check or update the sharedpool

storage.sharedMap[address] = m
storage.Unlock()
return m
}
return storageMap
}
63 changes: 49 additions & 14 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -79,7 +80,9 @@ type StateObject struct {
trie Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded

originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
sharedOriginMap *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
originStorage Storage

pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
Expand Down Expand Up @@ -120,14 +123,21 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject {
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
var storageMap *sync.Map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename storageMap to sharedStorage, storage is a Map by itself.

// Check whether the storage exist in pool, new originStorage if not exist
if db != nil && db.sharedStorage != nil {
storageMap = db.GetOrInsertStorage(address)
}

return &StateObject{
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
sharedOriginMap: storageMap,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
}
}

Expand Down Expand Up @@ -194,6 +204,28 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash {
return s.GetCommittedState(db, key)
}

func (s *StateObject) getStorageKey(key common.Hash) (common.Hash, bool) {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
if value, cached := s.originStorage[key]; cached {
return value, true
}
// if L1 cache miss, try to get it from shared pool
if s.sharedOriginMap != nil {
val, ok := s.sharedOriginMap.Load(key)
if !ok {
return common.Hash{}, false
}
setunapo marked this conversation as resolved.
Show resolved Hide resolved
return val.(common.Hash), ok
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
}
return common.Hash{}, false
}

func (s *StateObject) setStorgeKey(key common.Hash, value common.Hash) {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
if s.db.isPrefetchDb {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
s.sharedOriginMap.Store(key, value)
}
s.originStorage[key] = value
}

// GetCommittedState retrieves a value from the committed account storage trie.
func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash {
// If the fake storage is set, only lookup the state here(in the debugging mode)
Expand All @@ -204,7 +236,8 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
if value, pending := s.pendingStorage[key]; pending {
return value
}
if value, cached := s.originStorage[key]; cached {

if value, cached := s.getStorageKey(key); cached {
return value
}
// If no live objects are available, attempt to use snapshots
Expand Down Expand Up @@ -263,7 +296,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
value.SetBytes(content)
}
s.originStorage[key] = value
s.setStorgeKey(key, value)
return value
}

Expand Down Expand Up @@ -316,10 +349,12 @@ func (s *StateObject) finalise(prefetch bool) {
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
s.pendingStorage[key] = value
if value != s.originStorage[key] {
originValue, cached := s.getStorageKey(key)
if cached && value != originValue {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
Expand Down Expand Up @@ -352,10 +387,11 @@ func (s *StateObject) updateTrie(db Database) Trie {
usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
originValue, cached := s.getStorageKey(key)
if cached && value == originValue {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
continue
}
s.originStorage[key] = value
s.setStorgeKey(key, value)

var v []byte
if (value == common.Hash{}) {
Expand Down Expand Up @@ -478,7 +514,6 @@ func (s *StateObject) deepCopy(db *StateDB) *StateObject {
}
stateObject.code = s.code
stateObject.dirtyStorage = s.dirtyStorage.Copy()
stateObject.originStorage = s.originStorage.Copy()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
stateObject.pendingStorage = s.pendingStorage.Copy()
stateObject.suicided = s.suicided
stateObject.dirtyCode = s.dirtyCode
Expand Down
100 changes: 28 additions & 72 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type StateDB struct {
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution

// shared_pool to store L1 originStorage of stateObjects
sharedStorage *SharedStorage
isPrefetchDb bool
// DB error.
// State objects are used by the consensus core and VM which are
// unable to deal with database-level errors. Any error that occurs
Expand Down Expand Up @@ -147,6 +150,16 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
return newStateDB(root, db, snaps)
}

// NewWithSharedPool creates a new state with sharedStorge on layer 1.5
func NewWithSharedPool(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
statedb, err := newStateDB(root, db, snaps)
if err != nil {
return nil, err
}
statedb.sharedStorage = NewSharedStorage()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
return statedb, nil
}

func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
sdb := &StateDB{
db: db,
Expand All @@ -155,6 +168,8 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
stateObjects: make(map[common.Address]*StateObject, defaultNumOfSlots),
stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots),
stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots),
sharedStorage: nil,
isPrefetchDb: false,
logs: make(map[common.Hash][]*types.Log, defaultNumOfSlots),
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
Expand Down Expand Up @@ -591,78 +606,6 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject {
return nil
}

// TryPreload collects the sender and recipient addresses of the block's
// transactions and, when there are enough of them, loads their state objects
// in parallel through preloadStateObject and installs them with
// SetStateObject.
//
// Address collection stops at the first transaction whose sender cannot be
// recovered. Nothing is preloaded unless the number of distinct addresses is
// at least preLoadLimit and greater than runtime.NumCPU().
func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) {
	txs := block.Transactions()

	// Deduplicate the addresses touched by the transactions.
	seen := make(map[common.Address]bool, txs.Len())
	for _, tx := range txs {
		sender, err := types.Sender(signer, tx)
		if err != nil {
			break
		}
		seen[sender] = true
		if to := tx.To(); to != nil {
			seen[*to] = true
		}
	}
	addrs := make([]common.Address, 0, txs.Len())
	for addr := range seen {
		addrs = append(addrs, addr)
	}

	workers := runtime.NumCPU()
	if len(addrs) < preLoadLimit || len(addrs) <= workers {
		return
	}

	// Split the addresses into one contiguous chunk per worker.
	results := make(chan []*StateObject, workers)
	for w := 0; w < workers; w++ {
		lo := w * len(addrs) / workers
		hi := (w + 1) * len(addrs) / workers
		if w+1 == workers {
			hi = len(addrs)
		}
		go func(lo, hi int) {
			results <- s.preloadStateObject(addrs[lo:hi])
		}(lo, hi)
	}
	// Install the loaded objects on the calling goroutine, one batch per worker.
	for w := 0; w < workers; w++ {
		for _, obj := range <-results {
			s.SetStateObject(obj)
		}
	}
}

// preloadStateObject builds state objects for the given addresses from the
// snapshot. It returns nil when no snapshot is attached. Addresses whose
// snapshot lookup fails or yields no account are skipped.
func (s *StateDB) preloadStateObject(address []common.Address) []*StateObject {
	// This feature is only available when the snapshot is enabled.
	if s.snap == nil {
		return nil
	}
	hasher := crypto.NewKeccakState()
	loaded := make([]*StateObject, 0, len(address))
	for _, addr := range address {
		acc, err := s.snap.Account(crypto.HashData(hasher, addr.Bytes()))
		if err != nil || acc == nil {
			continue
		}
		account := Account{
			Nonce:    acc.Nonce,
			Balance:  acc.Balance,
			CodeHash: acc.CodeHash,
			Root:     common.BytesToHash(acc.Root),
		}
		// Normalise empty fields to the canonical empty values.
		if len(account.CodeHash) == 0 {
			account.CodeHash = emptyCodeHash
		}
		if account.Root == (common.Hash{}) {
			account.Root = emptyRoot
		}
		loaded = append(loaded, newObject(s, addr, account))
	}
	return loaded
}

// getDeletedStateObject is similar to getStateObject, but instead of returning
// nil for a deleted state object, it returns the actual object with the deleted
// flag set. This is needed by the state journal to revert to the correct s-
Expand Down Expand Up @@ -818,6 +761,14 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common
return nil
}

// Used by prefetcher
func (s *StateDB) CopyWithSharedStorage() *StateDB {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
state := s.Copy()
state.sharedStorage = s.sharedStorage
state.isPrefetchDb = true
return state
}

// Copy creates a deep, independent copy of the state.
// Snapshots of the copied state cannot be applied to the copy.
func (s *StateDB) Copy() *StateDB {
Expand All @@ -828,6 +779,7 @@ func (s *StateDB) Copy() *StateDB {
stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)),
stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)),
stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)),
sharedStorage: NewSharedStorage(),
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
refund: s.refund,
logs: make(map[common.Hash][]*types.Log, len(s.logs)),
logSize: s.logSize,
Expand Down Expand Up @@ -1633,3 +1585,7 @@ func (s *StateDB) GetDirtyAccounts() []common.Address {
}
return accounts
}

// GetOrInsertStorage returns the shared storage map for address from the
// state's sharedStorage, creating it if it does not exist yet. It returns nil
// when the state has no sharedStorage attached.
func (s *StateDB) GetOrInsertStorage(address common.Address) *sync.Map {
	// A StateDB built without a shared pool keeps sharedStorage nil; calling
	// through it would dereference a nil pointer.
	if s.sharedStorage == nil {
		return nil
	}
	return s.sharedStorage.getOrInertStorage(address)
}
2 changes: 1 addition & 1 deletion core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
// No need to execute the first batch, since the main processor will do it.
for i := 0; i < prefetchThread; i++ {
go func(idx int) {
newStatedb := statedb.Copy()
newStatedb := statedb.CopyWithSharedStorage()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
Expand Down
1 change: 0 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
gp = new(GasPool).AddGas(block.GasLimit())
)
signer := types.MakeSigner(p.bc.chainConfig, block.Number())
statedb.TryPreload(block, signer)
var receipts = make([]*types.Receipt, 0)
// Mutate the block and state according to any hard-fork specs
if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
Expand Down