diff --git a/.github/release.env b/.github/release.env index 2034c1d3fa..62e94fa4bd 100644 --- a/.github/release.env +++ b/.github/release.env @@ -1,2 +1,2 @@ -MAINNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.7/mainnet.zip" -TESTNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.7/testnet.zip" +MAINNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.8/mainnet.zip" +TESTNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.8/testnet.zip" diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dfa55dfb8..9100249fa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,24 @@ # Changelog +## v1.1.9 + +IMPROVEMENT +* [\#792](https://github.com/binance-chain/bsc/pull/792) add shared storage for prefetching state data +* [\#795](https://github.com/binance-chain/bsc/pull/795) implement state verification pipeline in pipecommit +* [\#803](https://github.com/binance-chain/bsc/pull/803) prefetch state data during the mining process +* [\#812](https://github.com/bnb-chain/bsc/pull/812) skip verification on account storage root to tolerate with fastnode when doing diffsync +* [\#818](https://github.com/bnb-chain/bsc/pull/818) add shared storage to the prefetcher of miner +* [\#820](https://github.com/bnb-chain/bsc/pull/820) disable diffsync when pipecommit is enabled +* [\#830](https://github.com/bnb-chain/bsc/pull/830) change the number of prefetch threads + +BUGFIX +* [\#797](https://github.com/bnb-chain/bsc/pull/797) fix race condition on preimage in pipecommit +* [\#808](https://github.com/bnb-chain/bsc/pull/808) fix code of difflayer not assign when new smart contract created +* [\#817](https://github.com/bnb-chain/bsc/pull/817) fix bugs of prune block tool +* [\#834](https://github.com/bnb-chain/bsc/pull/834) fix deadlock when failed to verify state root in pipecommit +* [\#835](https://github.com/bnb-chain/bsc/pull/835) fix deadlock on miner module when failed to commit trie +* [\#842](https://github.com/bnb-chain/bsc/pull/842) fix invalid nil check of statedb in diffsync + ## v1.1.8 FEATURES * [\#668](https://github.com/binance-chain/bsc/pull/668) implement State Verification && Snapshot Commit pipeline diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index d829abbc2a..5405f42e43 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -308,7 +308,8 @@ func pruneBlock(ctx *cli.Context) error { var newAncientPath string oldAncientPath := ctx.GlobalString(utils.AncientFlag.Name) if !filepath.IsAbs(oldAncientPath) { - oldAncientPath = stack.ResolvePath(oldAncientPath) + // force absolute paths, which often fail due to the splicing of relative paths + return errors.New("datadir.ancient not abs path") } path, _ := filepath.Split(oldAncientPath) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 8d24d23d84..e1669fcb12 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -137,7 +137,7 @@ var ( } PipeCommitFlag = cli.BoolFlag{ Name: "pipecommit", - Usage: "Enable MPT pipeline commit, it will improve syncing performance. It is an experimental feature(default is false)", + Usage: "Enable MPT pipeline commit, it will improve syncing performance. It is an experimental feature(default is false), diffsync will be disable if pipeline commit is enabled", } RangeLimitFlag = cli.BoolFlag{ Name: "rangelimit", diff --git a/core/block_validator.go b/core/block_validator.go index 56f609f41a..1ce8cc70a7 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -115,7 +115,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { // transition, such as amount of used gas, the receipt roots and the state root // itself. ValidateState returns a database batch if the validation was a success // otherwise nil and an error is returned. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error { +func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { header := block.Header() if block.GasUsed() != usedGas { return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas) @@ -138,13 +138,15 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return nil }, } - if skipHeavyVerify { + if statedb.IsPipeCommit() { validateFuns = append(validateFuns, func() error { if err := statedb.WaitPipeVerification(); err != nil { return err } + statedb.CorrectAccountsRoot() statedb.Finalise(v.config.IsEIP158(header.Number)) - statedb.AccountsIntermediateRoot() + // State verification pipeline - accounts root are not calculated here, just populate needed fields for process + statedb.PopulateSnapAccountAndStorage() return nil }) } else { diff --git a/core/blockchain.go b/core/blockchain.go index 3097d9d537..221b9a707f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -873,6 +873,11 @@ func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) { return state.New(root, bc.stateCache, bc.snaps) } +// StateAtWithSharedPool returns a new mutable state based on a particular point in time with sharedStorage +func (bc *BlockChain) StateAtWithSharedPool(root common.Hash) (*state.StateDB, error) { + return state.NewWithSharedPool(root, bc.stateCache, bc.snaps) +} + // StateCache returns the caching database underpinning the blockchain instance. func (bc *BlockChain) StateCache() state.Database { return bc.stateCache @@ -2151,7 +2156,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } - statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) + statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps) if err != nil { return it.index, err } @@ -2196,7 +2201,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Validate the state using the default validator substart = time.Now() if !statedb.IsLightProcessed() { - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { log.Error("validate state failed", "error", err) bc.reportBlock(block, receipts, err) return it.index, err diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 8a7581016a..c9d959d1cb 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -209,7 +209,7 @@ func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *B blockchain.reportBlock(block, receipts, err) return err } - err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit) + err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas) if err != nil { blockchain.reportBlock(block, receipts, err) return err diff --git a/core/headerchain.go b/core/headerchain.go index fe4770a469..3e50c1eb07 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -250,6 +250,9 @@ func (hc *HeaderChain) writeHeaders(headers []*types.Header) (result *headerWrit headHeader = hc.GetHeader(headHash, headNumber) ) for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { + if frozen, _ := hc.chainDb.Ancients(); frozen == headNumber { + break + } rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber) headHash = headHeader.ParentHash headNumber = headHeader.Number.Uint64() - 1 diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 22d5188e91..883e17b782 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -95,7 +95,10 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool number uint64 rlp rlp.RawValue } - if to == from { + if offset := db.AncientOffSet(); offset > from { + from = offset + } + if to <= from { return nil } threads := to - from diff --git a/core/state/shared_pool.go b/core/state/shared_pool.go new file mode 100644 index 0000000000..ba96c2c27d --- /dev/null +++ b/core/state/shared_pool.go @@ -0,0 +1,39 @@ +package state + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" +) + +// sharedPool is used to store maps of originStorage of stateObjects +type StoragePool struct { + sync.RWMutex + sharedMap map[common.Address]*sync.Map +} + +func NewStoragePool() *StoragePool { + sharedMap := make(map[common.Address]*sync.Map) + return &StoragePool{ + sync.RWMutex{}, + sharedMap, + } +} + +// getStorage Check whether the storage exist in pool, +// new one if not exist, the content of storage will be fetched in stateObjects.GetCommittedState() +func (s *StoragePool) getStorage(address common.Address) *sync.Map { + s.RLock() + storageMap, ok := s.sharedMap[address] + s.RUnlock() + if !ok { + s.Lock() + defer s.Unlock() + if storageMap, ok = s.sharedMap[address]; !ok { + m := new(sync.Map) + s.sharedMap[address] = m + return m + } + } + return storageMap +} diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index 65b2729d9c..d2b1b2778b 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -118,8 +118,9 @@ type diffLayer struct { storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted) - verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed - valid bool // mark the difflayer is valid or not. + verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed + valid bool // mark the difflayer is valid or not. + accountCorrected bool // mark the accountData has been corrected ort not diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer @@ -182,6 +183,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s storageList: make(map[common.Hash][]common.Hash), verifiedCh: verified, } + switch parent := parent.(type) { case *diskLayer: dl.rebloom(parent) @@ -190,6 +192,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s default: panic("unknown parent type") } + // Sanity check that accounts or storage slots are never nil for accountHash, blob := range accounts { if blob == nil { @@ -286,6 +289,21 @@ func (dl *diffLayer) Verified() bool { } } +func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) { + dl.lock.Lock() + defer dl.lock.Unlock() + + dl.accountData = accounts + dl.accountCorrected = true +} + +func (dl *diffLayer) AccountsCorrected() bool { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.accountCorrected +} + // Parent returns the subsequent layer of a diff layer. func (dl *diffLayer) Parent() snapshot { return dl.parent @@ -314,6 +332,24 @@ func (dl *diffLayer) Account(hash common.Hash) (*Account, error) { return account, nil } +// Accounts directly retrieves all accounts in current snapshot in +// the snapshot slim data format. +func (dl *diffLayer) Accounts() (map[common.Hash]*Account, error) { + dl.lock.RLock() + defer dl.lock.RUnlock() + + accounts := make(map[common.Hash]*Account, len(dl.accountData)) + for hash, data := range dl.accountData { + account := new(Account) + if err := rlp.DecodeBytes(data, account); err != nil { + return nil, err + } + accounts[hash] = account + } + + return accounts, nil +} + // AccountRLP directly retrieves the account RLP associated with a particular // hash in the snapshot slim data format. // diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go index c1de41782c..6d46496a71 100644 --- a/core/state/snapshot/disklayer.go +++ b/core/state/snapshot/disklayer.go @@ -59,6 +59,13 @@ func (dl *diskLayer) Verified() bool { return true } +func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) { +} + +func (dl *diskLayer) AccountsCorrected() bool { + return true +} + // Parent always returns nil as there's no layer below the disk. func (dl *diskLayer) Parent() snapshot { return nil @@ -73,6 +80,12 @@ func (dl *diskLayer) Stale() bool { return dl.stale } +// Accounts directly retrieves all accounts in current snapshot in +// the snapshot slim data format. +func (dl *diskLayer) Accounts() (map[common.Hash]*Account, error) { + return nil, nil +} + // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. func (dl *diskLayer) Account(hash common.Hash) (*Account, error) { diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 3c18294cc1..aabbfd7e1b 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -293,6 +293,7 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { if dl.Stale() { return common.Hash{}, ErrSnapshotStale } + // Everything below was journalled, persist this layer too if err := rlp.Encode(buffer, dl.root); err != nil { return common.Hash{}, err diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 38f52acced..d28d2e295e 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -107,13 +107,23 @@ type Snapshot interface { // Verified returns whether the snapshot is verified Verified() bool - // Store the verification result + // MarkValid stores the verification result MarkValid() + // CorrectAccounts updates account data for storing the correct data during pipecommit + CorrectAccounts(map[common.Hash][]byte) + + // AccountsCorrected checks whether the account data has been corrected during pipecommit + AccountsCorrected() bool + // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. Account(hash common.Hash) (*Account, error) + // Accounts directly retrieves all accounts in current snapshot in + // the snapshot slim data format. + Accounts() (map[common.Hash]*Account, error) + // AccountRLP directly retrieves the account RLP associated with a particular // hash in the snapshot slim data format. AccountRLP(hash common.Hash) ([]byte, error) @@ -240,6 +250,11 @@ func (t *Tree) waitBuild() { } } +// Layers returns the number of layers +func (t *Tree) Layers() int { + return len(t.layers) +} + // Disable interrupts any pending snapshot generator, deletes all the snapshot // layers in memory and marks snapshots disabled globally. In order to resume // the snapshot functionality, the caller must invoke Rebuild. @@ -666,6 +681,11 @@ func (t *Tree) Journal(root common.Hash) (common.Hash, error) { if snap == nil { return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root) } + // Wait the snapshot(difflayer) is verified, it means the account data also been refreshed with the correct data + if !snap.WaitAndGetVerifyRes() { + return common.Hash{}, ErrSnapshotStale + } + // Run the journaling t.lock.Lock() defer t.lock.Unlock() diff --git a/core/state/state_object.go b/core/state/state_object.go index c86585a658..538d5e5da8 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "math/big" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -63,10 +64,11 @@ func (s Storage) Copy() Storage { // Account values can be accessed and modified through the object. // Finally, call CommitTrie to write the modified storage trie into a database. type StateObject struct { - address common.Address - addrHash common.Hash // hash of ethereum address of the account - data Account - db *StateDB + address common.Address + addrHash common.Hash // hash of ethereum address of the account + data Account + db *StateDB + rootCorrected bool // To indicate whether the root has been corrected in pipecommit mode // DB error. // State objects are used by the consensus core and VM which are @@ -79,7 +81,9 @@ type StateObject struct { trie Trie // storage trie, which becomes non-nil on first access code Code // contract bytecode, which gets set when code is loaded - originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction + sharedOriginStorage *sync.Map // Point to the entry of the stateObject in sharedPool + originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction + pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block dirtyStorage Storage // Storage entries that have been modified in the current transaction execution fakeStorage Storage // Fake storage which constructed by caller for debugging purpose. @@ -120,14 +124,21 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject { if data.Root == (common.Hash{}) { data.Root = emptyRoot } + var storageMap *sync.Map + // Check whether the storage exist in pool, new originStorage if not exist + if db != nil && db.storagePool != nil { + storageMap = db.GetStorage(address) + } + return &StateObject{ - db: db, - address: address, - addrHash: crypto.Keccak256Hash(address[:]), - data: data, - originStorage: make(Storage), - pendingStorage: make(Storage), - dirtyStorage: make(Storage), + db: db, + address: address, + addrHash: crypto.Keccak256Hash(address[:]), + data: data, + sharedOriginStorage: storageMap, + originStorage: make(Storage), + pendingStorage: make(Storage), + dirtyStorage: make(Storage), } } @@ -194,6 +205,29 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash { return s.GetCommittedState(db, key) } +func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) { + if value, cached := s.originStorage[key]; cached { + return value, true + } + // if L1 cache miss, try to get it from shared pool + if s.sharedOriginStorage != nil { + val, ok := s.sharedOriginStorage.Load(key) + if !ok { + return common.Hash{}, false + } + s.originStorage[key] = val.(common.Hash) + return val.(common.Hash), true + } + return common.Hash{}, false +} + +func (s *StateObject) setOriginStorage(key common.Hash, value common.Hash) { + if s.db.writeOnSharedStorage && s.sharedOriginStorage != nil { + s.sharedOriginStorage.Store(key, value) + } + s.originStorage[key] = value +} + // GetCommittedState retrieves a value from the committed account storage trie. func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash { // If the fake storage is set, only lookup the state here(in the debugging mode) @@ -204,7 +238,8 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has if value, pending := s.pendingStorage[key]; pending { return value } - if value, cached := s.originStorage[key]; cached { + + if value, cached := s.getOriginStorage(key); cached { return value } // If no live objects are available, attempt to use snapshots @@ -263,7 +298,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has } value.SetBytes(content) } - s.originStorage[key] = value + s.setOriginStorage(key, value) return value } @@ -320,7 +355,18 @@ func (s *StateObject) finalise(prefetch bool) { slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure } } - if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { + + // The account root need to be updated before prefetch, otherwise the account root is empty + if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() { + if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil { + if acc != nil && len(acc.Root) != 0 { + s.data.Root = common.BytesToHash(acc.Root) + s.rootCorrected = true + } + } + } + + if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot { s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash) } if len(s.dirtyStorage) > 0 { @@ -356,7 +402,6 @@ func (s *StateObject) updateTrie(db Database) Trie { continue } s.originStorage[key] = value - var v []byte if (value == common.Hash{}) { s.setError(tr.TryDelete(key[:])) diff --git a/core/state/statedb.go b/core/state/statedb.go index 384373c98e..ee7ae7020a 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -39,10 +39,7 @@ import ( "github.com/ethereum/go-ethereum/trie" ) -const ( - preLoadLimit = 128 - defaultNumOfSlots = 100 -) +const defaultNumOfSlots = 100 type revision struct { id int @@ -53,6 +50,10 @@ var ( // emptyRoot is the known root hash of an empty trie. emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + // dummyRoot is the dummy account root before corrected in pipecommit sync mode, + // the value is 542e5fc2709de84248e9bce43a9c0c8943a608029001360f8ab55bf113b23d28 + dummyRoot = crypto.Keccak256Hash([]byte("dummy_account_root")) + emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes()) ) @@ -103,6 +104,8 @@ type StateDB struct { stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution + storagePool *StoragePool // sharedPool to store L1 originStorage of stateObjects + writeOnSharedStorage bool // Write to the shared origin storage of a stateObject while reading from the underlying storage layer. // DB error. // State objects are used by the consensus core and VM which are // unable to deal with database-level errors. Any error that occurs @@ -149,6 +152,16 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return newStateDB(root, db, snaps) } +// NewWithSharedPool creates a new state with sharedStorge on layer 1.5 +func NewWithSharedPool(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { + statedb, err := newStateDB(root, db, snaps) + if err != nil { + return nil, err + } + statedb.storagePool = NewStoragePool() + return statedb, nil +} + func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { sdb := &StateDB{ db: db, @@ -182,6 +195,10 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, return sdb, nil } +func (s *StateDB) EnableWriteOnSharedStorage() { + s.writeOnSharedStorage = true +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. @@ -226,11 +243,16 @@ func (s *StateDB) MarkLightProcessed() { // Enable the pipeline commit function of statedb func (s *StateDB) EnablePipeCommit() { - if s.snap != nil { + if s.snap != nil && s.snaps.Layers() > 1 { s.pipeCommit = true } } +// IsPipeCommit checks whether pipecommit is enabled on the statedb or not +func (s *StateDB) IsPipeCommit() bool { + return s.pipeCommit +} + // Mark that the block is full processed func (s *StateDB) MarkFullProcessed() { s.fullProcessed = true @@ -615,78 +637,6 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject { return nil } -func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) { - accounts := make(map[common.Address]bool, block.Transactions().Len()) - accountsSlice := make([]common.Address, 0, block.Transactions().Len()) - for _, tx := range block.Transactions() { - from, err := types.Sender(signer, tx) - if err != nil { - break - } - accounts[from] = true - if tx.To() != nil { - accounts[*tx.To()] = true - } - } - for account := range accounts { - accountsSlice = append(accountsSlice, account) - } - if len(accountsSlice) >= preLoadLimit && len(accountsSlice) > runtime.NumCPU() { - objsChan := make(chan []*StateObject, runtime.NumCPU()) - for i := 0; i < runtime.NumCPU(); i++ { - start := i * len(accountsSlice) / runtime.NumCPU() - end := (i + 1) * len(accountsSlice) / runtime.NumCPU() - if i+1 == runtime.NumCPU() { - end = len(accountsSlice) - } - go func(start, end int) { - objs := s.preloadStateObject(accountsSlice[start:end]) - objsChan <- objs - }(start, end) - } - for i := 0; i < runtime.NumCPU(); i++ { - objs := <-objsChan - for _, obj := range objs { - s.SetStateObject(obj) - } - } - } -} - -func (s *StateDB) preloadStateObject(address []common.Address) []*StateObject { - // Prefer live objects if any is available - if s.snap == nil { - return nil - } - hasher := crypto.NewKeccakState() - objs := make([]*StateObject, 0, len(address)) - for _, addr := range address { - // If no live objects are available, attempt to use snapshots - if acc, err := s.snap.Account(crypto.HashData(hasher, addr.Bytes())); err == nil { - if acc == nil { - continue - } - data := &Account{ - Nonce: acc.Nonce, - Balance: acc.Balance, - CodeHash: acc.CodeHash, - Root: common.BytesToHash(acc.Root), - } - if len(data.CodeHash) == 0 { - data.CodeHash = emptyCodeHash - } - if data.Root == (common.Hash{}) { - data.Root = emptyRoot - } - // Insert into the live set - obj := newObject(s, addr, *data) - objs = append(objs, obj) - } - // Do not enable this feature when snapshot is not enabled. - } - return objs -} - // getDeletedStateObject is similar to getStateObject, but instead of returning // nil for a deleted state object, it returns the actual object with the deleted // flag set. This is needed by the state journal to revert to the correct s- @@ -852,6 +802,7 @@ func (s *StateDB) Copy() *StateDB { stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)), stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), + storagePool: s.storagePool, refund: s.refund, logs: make(map[common.Hash][]*types.Log, len(s.logs)), logSize: s.logSize, @@ -1048,6 +999,65 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { return s.StateIntermediateRoot() } +//CorrectAccountsRoot will fix account roots in pipecommit mode +func (s *StateDB) CorrectAccountsRoot() { + if accounts, err := s.snap.Accounts(); err == nil && accounts != nil { + for _, obj := range s.stateObjects { + if !obj.deleted && !obj.rootCorrected && obj.data.Root == dummyRoot { + if account, exist := accounts[crypto.Keccak256Hash(obj.address[:])]; exist && len(account.Root) != 0 { + obj.data.Root = common.BytesToHash(account.Root) + } + } + } + } +} + +//PopulateSnapAccountAndStorage tries to populate required accounts and storages for pipecommit +func (s *StateDB) PopulateSnapAccountAndStorage() { + for addr := range s.stateObjectsPending { + if obj := s.stateObjects[addr]; !obj.deleted { + if s.snap != nil && !obj.deleted { + root := obj.data.Root + storageChanged := s.populateSnapStorage(obj) + if storageChanged { + root = dummyRoot + } + s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, root, obj.data.CodeHash) + } + } + } +} + +//populateSnapStorage tries to populate required storages for pipecommit, and returns a flag to indicate whether the storage root changed or not +func (s *StateDB) populateSnapStorage(obj *StateObject) bool { + for key, value := range obj.dirtyStorage { + obj.pendingStorage[key] = value + } + if len(obj.pendingStorage) == 0 { + return false + } + var storage map[string][]byte + for key, value := range obj.pendingStorage { + var v []byte + if (value != common.Hash{}) { + // Encoding []byte cannot fail, ok to ignore the error. + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + } + // If state snapshotting is active, cache the data til commit + if obj.db.snap != nil { + if storage == nil { + // Retrieve the old storage map, if available, create a new one otherwise + if storage = obj.db.snapStorage[obj.address]; storage == nil { + storage = make(map[string][]byte) + obj.db.snapStorage[obj.address] = storage + } + } + storage[string(key[:])] = v // v will be nil if value is 0x00 + } + } + return true +} + func (s *StateDB) AccountsIntermediateRoot() { tasks := make(chan func()) finishCh := make(chan struct{}) @@ -1133,6 +1143,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { } s.trie = tr } + usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) if !s.noTrie { for addr := range s.stateObjectsPending { @@ -1147,6 +1158,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { prefetcher.used(s.originalRoot, usedAddrs) } } + if len(s.stateObjectsPending) > 0 { s.stateObjectsPending = make(map[common.Address]struct{}) } @@ -1327,6 +1339,7 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er var diffLayer *types.DiffLayer var verified chan struct{} var snapUpdated chan struct{} + if s.snap != nil { diffLayer = &types.DiffLayer{} } @@ -1338,9 +1351,24 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er commmitTrie := func() error { commitErr := func() error { + if s.pipeCommit { + <-snapUpdated + // Due to state verification pipeline, the accounts roots are not updated, leading to the data in the difflayer is not correct, capture the correct data here + s.AccountsIntermediateRoot() + if parent := s.snap.Root(); parent != s.expectedRoot { + accountData := make(map[common.Hash][]byte) + for k, v := range s.snapAccounts { + accountData[crypto.Keccak256Hash(k[:])] = v + } + s.snaps.Snapshot(s.expectedRoot).CorrectAccounts(accountData) + } + } + if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot { + log.Error("Invalid merkle root", "remote", s.expectedRoot, "local", s.stateRoot) return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot) } + tasks := make(chan func()) taskResults := make(chan error, len(s.stateObjectsDirty)) tasksNum := 0 @@ -1363,19 +1391,6 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er }() } - if s.snap != nil { - for addr := range s.stateObjectsDirty { - if obj := s.stateObjects[addr]; !obj.deleted { - if obj.code != nil && obj.dirtyCode { - diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{ - Hash: common.BytesToHash(obj.CodeHash()), - Code: obj.code, - }) - } - } - } - } - for addr := range s.stateObjectsDirty { if obj := s.stateObjects[addr]; !obj.deleted { // Write any contract code associated with the state object @@ -1384,6 +1399,7 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if !s.noTrie { if err := obj.CommitTrie(s.db); err != nil { taskResults <- err + return } } taskResults <- nil @@ -1434,17 +1450,16 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if s.pipeCommit { if commitErr == nil { - <-snapUpdated s.snaps.Snapshot(s.stateRoot).MarkValid() + close(verified) } else { // The blockchain will do the further rewind if write block not finish yet + close(verified) if failPostCommitFunc != nil { - <-snapUpdated failPostCommitFunc() } log.Error("state verification failed", "err", commitErr) } - close(verified) } return commitErr } @@ -1457,6 +1472,12 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if obj.code != nil && obj.dirtyCode { rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) obj.dirtyCode = false + if s.snap != nil { + diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{ + Hash: common.BytesToHash(obj.CodeHash()), + Code: obj.code, + }) + } if codeWriter.ValueSize() > ethdb.IdealBatchSize { if err := codeWriter.Write(); err != nil { return err @@ -1483,11 +1504,15 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if s.pipeCommit { defer close(snapUpdated) } + diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer() // Only update if there's a state transition (skip empty Clique blocks) if parent := s.snap.Root(); parent != s.expectedRoot { - if err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified); err != nil { + err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified) + + if err != nil { log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err) } + // Keep n diff layers in the memory // - head layer is paired with HEAD state // - head-1 layer is paired with HEAD-1 state @@ -1501,12 +1526,6 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er } return nil }, - func() error { - if s.snap != nil { - diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer() - } - return nil - }, } if s.pipeCommit { go commmitTrie() @@ -1747,3 +1766,7 @@ func (s *StateDB) GetDirtyAccounts() []common.Address { } return accounts } + +func (s *StateDB) GetStorage(address common.Address) *sync.Map { + return s.storagePool.getStorage(address) +} diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index d559a03a0f..bf0a6b80c6 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -26,7 +26,8 @@ import ( "github.com/ethereum/go-ethereum/params" ) -const prefetchThread = 2 +const prefetchThread = 3 +const checkInterval = 10 // statePrefetcher is a basic Prefetcher, which blindly executes a block on top // of an arbitrary state with the goal of prefetching potentially useful state @@ -67,6 +68,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c for i := 0; i < prefetchThread; i++ { go func(idx int) { newStatedb := statedb.Copy() + newStatedb.EnableWriteOnSharedStorage() gaspool := new(GasPool).AddGas(block.GasLimit()) blockContext := NewEVMBlockContext(header, p.bc, nil) evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) @@ -88,6 +90,64 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c } } +// PrefetchMining processes the state changes according to the Ethereum rules by running +// the transaction messages using the statedb, but any changes are discarded. The +// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage +func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) { + var signer = types.MakeSigner(p.config, header.Number) + + txCh := make(chan *types.Transaction, 2*prefetchThread) + for i := 0; i < prefetchThread; i++ { + go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) { + idx := 0 + newStatedb := statedb.Copy() + newStatedb.EnableWriteOnSharedStorage() + gaspool := new(GasPool).AddGas(gasLimit) + blockContext := NewEVMBlockContext(header, p.bc, nil) + evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) + // Iterate over and process the individual transactions + for { + select { + case tx := <-startCh: + // Convert the transaction into an executable message and pre-cache its sender + msg, err := tx.AsMessageNoNonceCheck(signer) + if err != nil { + return // Also invalid block, bail out + } + idx++ + newStatedb.Prepare(tx.Hash(), header.Hash(), idx) + precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) + gaspool = new(GasPool).AddGas(gasLimit) + case <-stopCh: + return + } + } + }(txCh, interruptCh) + } + go func(txset *types.TransactionsByPriceAndNonce) { + count := 0 + for { + tx := txset.Peek() + if tx == nil { + return + } + select { + case <-interruptCh: + return + default: + } + if count++; count%checkInterval == 0 { + if *txCurr == nil { + return + } + txset.Forward(*txCurr) + } + txCh <- tx + txset.Shift() + } + }(txs) +} + // precacheTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. The goal is not to execute // the transaction successfully, rather to warm up touched data slots. diff --git a/core/state_processor.go b/core/state_processor.go index 16f6eab63b..12bedd65a2 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -123,13 +123,13 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB statedb.StopPrefetcher() parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1) statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps) + if err != nil { + return statedb, nil, nil, 0, err + } statedb.SetExpectedStateRoot(block.Root()) if p.bc.pipeCommit { statedb.EnablePipeCommit() } - if err != nil { - return statedb, nil, nil, 0, err - } // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") } @@ -337,7 +337,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty } // Do validate in advance so that we can fall back to full process - if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil { + if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil { log.Error("validate state failed during diff sync", "error", err) return nil, nil, 0, err } @@ -385,7 +385,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg gp = new(GasPool).AddGas(block.GasLimit()) ) signer := types.MakeSigner(p.bc.chainConfig, block.Number()) - statedb.TryPreload(block, signer) var receipts = make([]*types.Receipt, 0) // Mutate the block and state according to any hard-fork specs if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { diff --git a/core/types.go b/core/types.go index 0a0633103e..db8d7a854f 100644 --- a/core/types.go +++ b/core/types.go @@ -31,7 +31,7 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. - ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error + ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error // RemoteVerifyManager return remoteVerifyManager of validator. RemoteVerifyManager() *remoteVerifyManager } @@ -42,6 +42,8 @@ type Prefetcher interface { // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) + // PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage. + PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) } // Processor is an interface for processing blocks using a given initial state. diff --git a/core/types/transaction.go b/core/types/transaction.go index 74c011544b..e95cec25a6 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa } } +// Copy copys a new TransactionsPriceAndNonce with the same *transaction +func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce { + heads := make([]*Transaction, len(t.heads)) + copy(heads, t.heads) + txs := make(map[common.Address]Transactions, len(t.txs)) + for acc, txsTmp := range t.txs { + txs[acc] = txsTmp + } + return &TransactionsByPriceAndNonce{ + heads: heads, + txs: txs, + signer: t.signer, + } +} + // Peek returns the next transaction by price. func (t *TransactionsByPriceAndNonce) Peek() *Transaction { if len(t.heads) == 0 { @@ -488,6 +503,44 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int { return len(t.heads) } +//Forward moves current transaction to be the one which is one index after tx +func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) { + if tx == nil { + t.heads = t.heads[0:0] + return + } + //check whether target tx exists in t.heads + for _, head := range t.heads { + if tx == head { + //shift t to the position one after tx + txTmp := t.Peek() + for txTmp != tx { + t.Shift() + txTmp = t.Peek() + } + t.Shift() + return + } + } + //get the sender address of tx + acc, _ := Sender(t.signer, tx) + //check whether target tx exists in t.txs + if txs, ok := t.txs[acc]; ok { + for _, txTmp := range txs { + //found the same pointer in t.txs as tx and then shift t to the position one after tx + if txTmp == tx { + txTmp = t.Peek() + for txTmp != tx { + t.Shift() + txTmp = t.Peek() + } + t.Shift() + return + } + } + } +} + // Message is a fully derived transaction and implements core.Message // // NOTE: In a future PR this will be removed. @@ -535,6 +588,15 @@ func (tx *Transaction) AsMessage(s Signer) (Message, error) { return msg, err } +// AsMessageNoNonceCheck returns the transaction with checkNonce field set to be false. +func (tx *Transaction) AsMessageNoNonceCheck(s Signer) (Message, error) { + msg, err := tx.AsMessage(s) + if err == nil { + msg.checkNonce = false + } + return msg, err +} + func (m Message) From() common.Address { return m.from } func (m Message) To() *common.Address { return m.to } func (m Message) GasPrice() *big.Int { return m.gasPrice } diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 3cece9c235..c81b7b8647 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) { } } +func TestTransactionForward(t *testing.T) { + // Generate a batch of accounts to start with + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + } + signer := HomesteadSigner{} + + // Generate a batch of transactions with overlapping prices, but different creation times + groups := map[common.Address]Transactions{} + for start, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + + tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + + tx.time = time.Unix(0, int64(len(keys)-start)) + tx2.time = time.Unix(1, int64(len(keys)-start)) + + groups[addr] = append(groups[addr], tx) + groups[addr] = append(groups[addr], tx2) + + } + // Sort the transactions + txset := NewTransactionsByPriceAndNonce(signer, groups) + txsetCpy := txset.Copy() + + txs := Transactions{} + for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() { + txs = append(txs, tx) + txsetCpy.Shift() + } + + tmp := txset.Copy() + for j := 0; j < 10; j++ { + txset = tmp.Copy() + txsetCpy = tmp.Copy() + i := 0 + for ; i < j; i++ { + txset.Shift() + } + tx := txset.Peek() + txsetCpy.Forward(tx) + txCpy := txsetCpy.Peek() + if txCpy == nil { + if tx == nil { + continue + } + txset.Shift() + if txset.Peek() != nil { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } else { + continue + } + } + txset.Shift() + for ; i < len(txs)-1; i++ { + tx = txset.Peek() + txCpy = txsetCpy.Peek() + if txCpy != tx { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } + txsetCpy.Shift() + txset.Shift() + } + + } +} + // TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON. func TestTransactionCoding(t *testing.T) { key, err := crypto.GenerateKey() diff --git a/eth/backend.go b/eth/backend.go index a062dd834d..b988fcb086 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -206,7 +206,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } ) bcOps := make([]core.BlockChainOption, 0) - if config.DiffSync && config.TriesVerifyMode == core.LocalVerify { + if config.DiffSync && !config.PipeCommit && config.TriesVerifyMode == core.LocalVerify { bcOps = append(bcOps, core.EnableLightProcessor) } if config.PipeCommit { diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 24a0e776f6..156e3f1daf 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -74,6 +74,9 @@ func (eth *Ethereum) stateAtBlock(block *types.Block, reexec uint64, base *state // The optional base statedb is given, mark the start point as parent block statedb, database, report = base, base.Database(), false current = eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) + if current == nil { + return nil, fmt.Errorf("missing parent block %v %d", block.ParentHash(), block.NumberU64()-1) + } } else { // Otherwise try to reexec blocks until we find a state or reach our limit current = block diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 3f3122a37e..db16a05338 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1778,10 +1778,16 @@ func (s *PublicTransactionPoolAPI) GetTransactionReceiptsByBlockNumber(ctx conte if err != nil { return nil, err } + if receipts == nil { + return nil, fmt.Errorf("block %d receipts not found", blockNumber) + } block, err := s.b.BlockByHash(ctx, blockHash) if err != nil { return nil, err } + if block == nil { + return nil, fmt.Errorf("block %d not found", blockNumber) + } txs := block.Transactions() if len(txs) != len(receipts) { return nil, fmt.Errorf("txs length doesn't equal to receipts' length") diff --git a/miner/worker.go b/miner/worker.go index 28ef170e40..6715213e94 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -130,6 +130,7 @@ type intervalAdjust struct { // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { + prefetcher core.Prefetcher config *Config chainConfig *params.ChainConfig engine consensus.Engine @@ -196,6 +197,7 @@ type worker struct { func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker { worker := &worker{ + prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine), config: config, chainConfig: chainConfig, engine: engine, @@ -659,7 +661,7 @@ func (w *worker) resultLoop() { func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { // Retrieve the parent state to execute on top and start a prefetcher for // the miner to speed block sealing up a bit - state, err := w.chain.StateAt(parent.Root()) + state, err := w.chain.StateAtWithSharedPool(parent.Root()) if err != nil { return err } @@ -778,6 +780,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin } bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity) + interruptCh := make(chan struct{}) + defer close(interruptCh) + //prefetch txs from all pending txs + txsPrefetch := txs.Copy() + tx := txsPrefetch.Peek() + txCurr := &tx + w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr) + LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. @@ -814,7 +824,7 @@ LOOP: } } // Retrieve the next transaction and abort if all done - tx := txs.Peek() + tx = txs.Peek() if tx == nil { break } diff --git a/params/version.go b/params/version.go index 8faa4bb644..634f527876 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 1 // Minor version component of the current release - VersionPatch = 8 // Patch version component of the current release + VersionPatch = 9 // Patch version component of the current release VersionMeta = "" // Version metadata to append to the version string ) diff --git a/trie/database.go b/trie/database.go index fa8b746c58..85797cc2e4 100644 --- a/trie/database.go +++ b/trie/database.go @@ -723,17 +723,18 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H batch := db.diskdb.NewBatch() // Move all of the accumulated preimages into a write batch + db.lock.RLock() if db.preimages != nil { rawdb.WritePreimages(batch, db.preimages) // Since we're going to replay trie node writes into the clean cache, flush out // any batched pre-images before continuing. if err := batch.Write(); err != nil { + db.lock.RUnlock() return err } batch.Reset() } // Move the trie itself into the batch, flushing if enough data is accumulated - db.lock.RLock() nodes, storage := len(db.dirties), db.dirtiesSize db.lock.RUnlock()