Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]add sharedStorage for prefetching to L1 #792

Merged
merged 15 commits into from
Mar 28, 2022
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2112,7 +2112,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
var followupInterrupt uint32
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
throwaway := statedb.CopyWithSharedStorage()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
Expand Down
36 changes: 36 additions & 0 deletions core/state/shared_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about rename the file with: shared_storage_pool.go, shared_pool.go is a bit confusing, shared what?


import (
"github.com/ethereum/go-ethereum/common"
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
"sync"
)

// sharedStorage is used to store maps of originStorage of stateObjects
type SharedStorage struct {
*sync.RWMutex
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
sharedMap map[common.Address]*sync.Map
}

func NewSharedStorage() *SharedStorage {
sharedMap := make(map[common.Address]*sync.Map, 1500)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
return &SharedStorage{
&sync.RWMutex{},
sharedMap,
}
}

// Check whether the storage exist in pool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to follow the commenting style used in go-ethereum, like, // Copy creates a deep, independent copy of the state. // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

// new one if not exist, it will be fetched in stateObjects.GetCommittedState()
func (storage *SharedStorage) getOrInertStorage(address common.Address) *sync.Map {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
storage.RLock()
storageMap, ok := storage.sharedMap[address]
storage.RUnlock()
if !ok {
m := new(sync.Map)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has race condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

storage.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concurrent race condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, newObjects called by main preoces or prefetcher will both call this function to check or update the sharedpool

storage.sharedMap[address] = m
storage.Unlock()
return m
}
return storageMap
}
36 changes: 24 additions & 12 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -79,10 +80,10 @@ type StateObject struct {
trie Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded

originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
originStorage *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.

// Cache flags.
// When an object is marked suicided it will be delete from the trie
Expand Down Expand Up @@ -120,12 +121,18 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject {
if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
var storageMap *sync.Map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename storageMap to sharedStorage, storage is a Map by itself.

// Check whether the storage exist in pool, new originStorage if not exist
if db != nil {
storageMap = db.GetOrInsertStorage(address)
}

return &StateObject{
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
data: data,
originStorage: make(Storage),
originStorage: storageMap,
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
}
Expand Down Expand Up @@ -204,8 +211,9 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
if value, pending := s.pendingStorage[key]; pending {
return value
}
if value, cached := s.originStorage[key]; cached {
return value

if value, cached := s.originStorage.Load(key); cached {
return value.(common.Hash)
}
// If no live objects are available, attempt to use snapshots
var (
Expand Down Expand Up @@ -263,7 +271,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
value.SetBytes(content)
}
s.originStorage[key] = value
s.originStorage.Store(key, value)
return value
}

Expand Down Expand Up @@ -316,7 +324,11 @@ func (s *StateObject) finalise(prefetch bool) {
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
s.pendingStorage[key] = value
if value != s.originStorage[key] {
}

for key, value := range s.dirtyStorage {
originValue, cached := s.originStorage.Load(key)
if cached && value != originValue.(common.Hash) {
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}
Expand Down Expand Up @@ -352,10 +364,11 @@ func (s *StateObject) updateTrie(db Database) Trie {
usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
originValue, cached := s.originStorage.Load(key)
if cached && value == originValue.(common.Hash) {
continue
}
s.originStorage[key] = value
s.originStorage.Store(key, value)

var v []byte
if (value == common.Hash{}) {
Expand Down Expand Up @@ -478,7 +491,6 @@ func (s *StateObject) deepCopy(db *StateDB) *StateObject {
}
stateObject.code = s.code
stateObject.dirtyStorage = s.dirtyStorage.Copy()
stateObject.originStorage = s.originStorage.Copy()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
stateObject.pendingStorage = s.pendingStorage.Copy()
stateObject.suicided = s.suicided
stateObject.dirtyCode = s.dirtyCode
Expand Down
15 changes: 15 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type StateDB struct {
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution

// shared_pool to store L1 originStorage of stateObjects
sharedStorage *SharedStorage

// DB error.
// State objects are used by the consensus core and VM which are
// unable to deal with database-level errors. Any error that occurs
Expand Down Expand Up @@ -155,6 +158,7 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB,
stateObjects: make(map[common.Address]*StateObject, defaultNumOfSlots),
stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots),
stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots),
sharedStorage: NewSharedStorage(),
logs: make(map[common.Hash][]*types.Log, defaultNumOfSlots),
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
Expand Down Expand Up @@ -820,6 +824,12 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common

// Copy creates a deep, independent copy of the state.
// Snapshots of the copied state cannot be applied to the copy.
func (s *StateDB) CopyWithSharedStorage() *StateDB {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
state := s.Copy()
state.sharedStorage = s.sharedStorage
return state
}

func (s *StateDB) Copy() *StateDB {
// Copy all the basic fields, initialize the memory ones
state := &StateDB{
Expand All @@ -828,6 +838,7 @@ func (s *StateDB) Copy() *StateDB {
stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)),
stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)),
stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)),
sharedStorage: NewSharedStorage(),
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
refund: s.refund,
logs: make(map[common.Hash][]*types.Log, len(s.logs)),
logSize: s.logSize,
Expand Down Expand Up @@ -1633,3 +1644,7 @@ func (s *StateDB) GetDirtyAccounts() []common.Address {
}
return accounts
}

func (s *StateDB) GetOrInsertStorage(address common.Address) *sync.Map {
return s.sharedStorage.getOrInertStorage(address)
}
2 changes: 1 addition & 1 deletion core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
// No need to execute the first batch, since the main processor will do it.
for i := 0; i < prefetchThread; i++ {
go func(idx int) {
newStatedb := statedb.Copy()
newStatedb := statedb.CopyWithSharedStorage()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
Expand Down