Skip to content

Commit

Permalink
add sharedStorage for prefetching to L1
Browse files Browse the repository at this point in the history
  • Loading branch information
flywukong committed Mar 11, 2022
1 parent 21a3b11 commit 8bccd6f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 13 deletions.
37 changes: 37 additions & 0 deletions core/state/shared_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package state

import (
"github.com/ethereum/go-ethereum/common"
"sync"
)

// sharedStorage is used to store maps of originStorage of stateObjects
type SharedStorage struct {
poolLock *sync.RWMutex
shared_map map[common.Address]sync.Map
}

func NewSharedStorage() SharedStorage {
sharedMap := make(map[common.Address]sync.Map, 1000)
return SharedStorage{
poolLock: &sync.RWMutex{},
shared_map: sharedMap,
}
}

// Check whether the storage exist in pool,
// new one if not exist, it will be fetched in stateObjects.GetCommittedState()
func (srv *SharedStorage) GetOrInsertStorage(address common.Address) sync.Map {
srv.poolLock.RLock()
storageMap, ok := srv.shared_map[address]
srv.poolLock.RUnlock()

if !ok {
m := sync.Map{}
srv.poolLock.Lock()
srv.shared_map[address] = m
srv.poolLock.Unlock()
return srv.shared_map[address]
}
return storageMap
}
33 changes: 21 additions & 12 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -79,10 +80,10 @@ type StateObject struct {
trie Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded

originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
originStorage *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.

// Cache flags.
// When an object is marked suicided it will be delete from the trie
Expand Down Expand Up @@ -120,12 +121,15 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject {
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
// Check whether the storage exist in pool, new originStorage if not exist
storageMap := db.sharedStorage.GetOrInsertStorage(address)

return &StateObject{
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
originStorage: make(Storage),
originStorage: &storageMap,
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
}
Expand Down Expand Up @@ -204,8 +208,9 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
if value, pending := s.pendingStorage[key]; pending {
return value
}
if value, cached := s.originStorage[key]; cached {
return value

if value, cached := s.originStorage.Load(key); cached {
return value.(common.Hash)
}
// If no live objects are available, attempt to use snapshots
var (
Expand Down Expand Up @@ -263,7 +268,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
value.SetBytes(content)
}
s.originStorage[key] = value
s.originStorage.Store(key, value)
return value
}

Expand Down Expand Up @@ -316,7 +321,11 @@ func (s *StateObject) finalise(prefetch bool) {
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
s.pendingStorage[key] = value
if value != s.originStorage[key] {
}

for key, value := range s.dirtyStorage {
originValue, _ := s.originStorage.Load(key)
if value != originValue.(common.Hash) {
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}
Expand Down Expand Up @@ -352,10 +361,11 @@ func (s *StateObject) updateTrie(db Database) Trie {
usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
originValue, _ := s.originStorage.Load(key)
if value == originValue.(common.Hash) {
continue
}
s.originStorage[key] = value
s.originStorage.Store(key, value)

var v []byte
if (value == common.Hash{}) {
Expand Down Expand Up @@ -478,7 +488,6 @@ func (s *StateObject) deepCopy(db *StateDB) *StateObject {
}
stateObject.code = s.code
stateObject.dirtyStorage = s.dirtyStorage.Copy()
stateObject.originStorage = s.originStorage.Copy()
stateObject.pendingStorage = s.pendingStorage.Copy()
stateObject.suicided = s.suicided
stateObject.dirtyCode = s.dirtyCode
Expand Down
11 changes: 11 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type StateDB struct {
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution

// shared_pool to store L1 originStorage of stateObjects
sharedStorage SharedStorage

// DB error.
// State objects are used by the consensus core and VM which are
// unable to deal with database-level errors. Any error that occurs
Expand Down Expand Up @@ -155,6 +158,7 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
stateObjects: make(map[common.Address]*StateObject, defaultNumOfSlots),
stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots),
stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots),
sharedStorage: NewSharedStorage(),
logs: make(map[common.Hash][]*types.Log, defaultNumOfSlots),
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
Expand Down Expand Up @@ -820,6 +824,12 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common

// Copy creates a deep, independent copy of the state.
// Snapshots of the copied state cannot be applied to the copy.
func (s *StateDB) CopyWithSharedStorage() *StateDB {
state := s.Copy()
state.sharedStorage = s.sharedStorage
return state
}

func (s *StateDB) Copy() *StateDB {
// Copy all the basic fields, initialize the memory ones
state := &StateDB{
Expand All @@ -828,6 +838,7 @@ func (s *StateDB) Copy() *StateDB {
stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)),
stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)),
stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)),
sharedStorage: NewSharedStorage(),
refund: s.refund,
logs: make(map[common.Hash][]*types.Log, len(s.logs)),
logSize: s.logSize,
Expand Down
2 changes: 1 addition & 1 deletion core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
// No need to execute the first batch, since the main processor will do it.
for i := 0; i < prefetchThread; i++ {
go func(idx int) {
newStatedb := statedb.Copy()
newStatedb := statedb.CopyWithSharedStorage()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
Expand Down

0 comments on commit 8bccd6f

Please sign in to comment.