From 302d7332e89151fdc07ee868090973a30a4d9131 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 10 May 2021 20:44:29 +0200 Subject: [PATCH 01/14] core: don't write blocks after insertion is stopped --- core/blockchain.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/blockchain.go b/core/blockchain.go index 66bc395c9dc4..68df1166b776 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1372,6 +1372,9 @@ var lastWrite uint64 // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) { + if bc.insertStopped() { + return errInsertionInterrupted + } bc.wg.Add(1) defer bc.wg.Done() @@ -1387,6 +1390,9 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e // writeKnownBlock updates the head block flag with a known block // and introduces chain reorg if necessary. func (bc *BlockChain) writeKnownBlock(block *types.Block) error { + if bc.insertStopped() { + return errInsertionInterrupted + } bc.wg.Add(1) defer bc.wg.Done() From f74cb45f76a03e09e224386cc5cdeba588ab7046 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 11 May 2021 08:56:41 +0200 Subject: [PATCH 02/14] core: fix check at wrong place --- core/blockchain.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 68df1166b776..b80e9d7e4762 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1390,9 +1390,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e // writeKnownBlock updates the head block flag with a known block // and introduces chain reorg if necessary. func (bc *BlockChain) writeKnownBlock(block *types.Block) error { - if bc.insertStopped() { - return errInsertionInterrupted - } bc.wg.Add(1) defer bc.wg.Done() @@ -1417,6 +1414,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { + if bc.insertStopped() { + return NonStatTy, errInsertionInterrupted + } bc.wg.Add(1) defer bc.wg.Done() From 2f19b6e186738ef26404b0acbacf10afbc9bb257 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Thu, 20 May 2021 11:36:13 +0200 Subject: [PATCH 03/14] core: remove bad use of waitgroup in blockchain --- core/blockchain.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index b80e9d7e4762..2ea4763cf394 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -995,6 +995,8 @@ func (bc *BlockChain) Stop() { bc.scope.Close() close(bc.quit) bc.StopInsert() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() bc.wg.Wait() // Ensure that the entirety of the state snapshot is journalled to disk. @@ -1375,8 +1377,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e if bc.insertStopped() { return errInsertionInterrupted } - bc.wg.Add(1) - defer bc.wg.Done() batch := bc.db.NewBatch() rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) @@ -1390,9 +1390,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e // writeKnownBlock updates the head block flag with a known block // and introduces chain reorg if necessary. func (bc *BlockChain) writeKnownBlock(block *types.Block) error { - bc.wg.Add(1) - defer bc.wg.Done() - current := bc.CurrentBlock() if block.ParentHash() != current.Hash() { if err := bc.reorg(current, block); err != nil { @@ -1417,8 +1414,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. if bc.insertStopped() { return NonStatTy, errInsertionInterrupted } - bc.wg.Add(1) - defer bc.wg.Done() // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) @@ -1600,11 +1595,9 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { } } // Pre-checks passed, start the full block imports - bc.wg.Add(1) bc.chainmu.Lock() n, err := bc.insertChain(chain, true) bc.chainmu.Unlock() - bc.wg.Done() return n, err } @@ -1616,11 +1609,9 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in defer bc.blockProcFeed.Send(false) // Pre-checks passed, start the full block imports - bc.wg.Add(1) bc.chainmu.Lock() n, err := bc.insertChain(types.Blocks([]*types.Block{block}), false) bc.chainmu.Unlock() - bc.wg.Done() return n, err } @@ -2246,7 +2237,6 @@ func (bc *BlockChain) update() { // the extra indices. func (bc *BlockChain) maintainTxIndex(ancients uint64) { defer bc.wg.Done() - // Before starting the actual maintenance, we need to handle a special case, // where user might init Geth with an external ancient database. If so, we // need to reindex all necessary transactions before starting to process any @@ -2361,8 +2351,6 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i bc.chainmu.Lock() defer bc.chainmu.Unlock() - bc.wg.Add(1) - defer bc.wg.Done() _, err := bc.hc.InsertHeaderChain(chain, start) return 0, err } From c9e2f0c58f75bd3ff05008b79c8fc59edcdb7df1 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 6 Oct 2021 12:59:05 +0200 Subject: [PATCH 04/14] core: track future blocks loop in BlockChain.wg --- core/blockchain.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 2ea4763cf394..4b562a6a3763 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -278,6 +278,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par if err := bc.loadLastState(); err != nil { return nil, err } + // Make sure the state associated with the block is available head := bc.CurrentBlock() if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil { @@ -306,6 +307,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } } + // Ensure that a previous crash in SetHead doesn't leave extra ancients if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { var ( @@ -357,6 +359,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } } + // Load any existing snapshot, regenerating it if loading failed if bc.cacheConfig.SnapshotLimit > 0 { // If the chain was rewound past the snapshot persistent layer (causing @@ -372,14 +375,19 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, true, recover) } - // Take ownership of this particular state - go bc.update() + + // Start future block processor. + bc.wg.Add(1) + go bc.futureBlocksLoop() + + // Start tx indexer/unindexer. if txLookupLimit != nil { bc.txLookupLimit = *txLookupLimit bc.wg.Add(1) go bc.maintainTxIndex(txIndexBlock) } + // If periodic cache journal is required, spin it up. if bc.cacheConfig.TrieCleanRejournal > 0 { if bc.cacheConfig.TrieCleanRejournal < time.Minute { @@ -2212,7 +2220,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } -func (bc *BlockChain) update() { +// futureBlocksLoop processes the 'future block' queue. +func (bc *BlockChain) futureBlocksLoop() { + defer bc.wg.Done() + futureTimer := time.NewTicker(5 * time.Second) defer futureTimer.Stop() for { From fc984b46c5edc14a44677ab28ebabb380df4dd38 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 6 Oct 2021 12:59:39 +0200 Subject: [PATCH 05/14] core: add some blank lines in maintainTxIndex --- core/blockchain.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/blockchain.go b/core/blockchain.go index 4b562a6a3763..32335e2debdd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2248,6 +2248,7 @@ func (bc *BlockChain) futureBlocksLoop() { // the extra indices. func (bc *BlockChain) maintainTxIndex(ancients uint64) { defer bc.wg.Done() + // Before starting the actual maintenance, we need to handle a special case, // where user might init Geth with an external ancient database. If so, we // need to reindex all necessary transactions before starting to process any @@ -2259,6 +2260,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { } rawdb.IndexTransactions(bc.db, from, ancients, bc.quit) } + // indexBlocks reindexes or unindexes transactions depending on user configuration indexBlocks := func(tail *uint64, head uint64, done chan struct{}) { defer func() { done <- struct{}{} }() @@ -2291,6 +2293,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit) } } + // Any reindexing done, start listening to chain events and moving the index window var ( done chan struct{} // Non-nil if background unindexing or reindexing routine is active. From 15823b6ca0887b54e333504178a7e45d95c9d492 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 6 Oct 2021 13:04:34 +0200 Subject: [PATCH 06/14] core: add some blank lines in insertChain --- core/blockchain.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index 32335e2debdd..408e342d1d40 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1796,9 +1796,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er lastCanon = block continue } + // Retrieve the parent block and it's state to execute on top start := time.Now() - parent := it.previous() if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) @@ -1807,6 +1807,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err != nil { return it.index, err } + // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") activeState = statedb @@ -1828,6 +1829,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er }(time.Now(), followup, throwaway, &followupInterrupt) } } + // Process block using the parent state as reference point substart := time.Now() receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) @@ -1836,6 +1838,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } + // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them @@ -1911,6 +1914,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er dirty, _ := bc.stateCache.TrieDB().Size() stats.report(chain, it.index, dirty) } + // Any blocks remaining here? The only ones we care about are the future ones if block != nil && errors.Is(err, consensus.ErrFutureBlock) { if err := bc.addFutureBlock(block); err != nil { From 8530bc7af3f5167b37d0a5b3accb9119a8a3a5fb Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 6 Oct 2021 16:06:35 +0200 Subject: [PATCH 07/14] core: implement and use closable mutex for chain stop --- core/blockchain.go | 99 +++++++++++++++++++++++++----------------- internal/syncx/lock.go | 55 +++++++++++++++++++++++ 2 files changed, 113 insertions(+), 41 deletions(-) create mode 100644 internal/syncx/lock.go diff --git a/core/blockchain.go b/core/blockchain.go index 408e342d1d40..3f33fc82a6b5 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -39,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/internal/syncx" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -80,6 +81,7 @@ var ( blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) errInsertionInterrupted = errors.New("insertion is interrupted") + errChainStopped = errors.New("blockchain is stopped") ) const ( @@ -183,7 +185,7 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - chainmu sync.RWMutex // blockchain insertion lock + chainmu *syncx.ClosableMutex // blockchain insertion lock currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) @@ -235,6 +237,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par Preimages: cacheConfig.Preimages, }), quit: make(chan struct{}), + chainmu: syncx.NewClosableMutex(), shouldPreserve: shouldPreserve, bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -496,7 +499,9 @@ func (bc *BlockChain) SetHead(head uint64) error { // // The method returns the block number where the requested root cap was found. func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) { - bc.chainmu.Lock() + if !bc.chainmu.Lock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() // Track the block number of the requested root hash @@ -641,8 +646,11 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil { return err } - // If all checks out, manually set the head block - bc.chainmu.Lock() + + // If all checks out, manually set the head block. + if !bc.chainmu.Lock() { + return errChainStopped + } bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() @@ -745,8 +753,10 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { - bc.chainmu.RLock() - defer bc.chainmu.RUnlock() + if !bc.chainmu.Lock() { + return fmt.Errorf("chain is stopped") + } + defer bc.chainmu.Unlock() if first > last { return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) @@ -999,12 +1009,16 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } - // Unsubscribe all subscriptions registered from blockchain + + // Unsubscribe all subscriptions registered from blockchain. bc.scope.Close() + + // Signal shutdown to all goroutines. close(bc.quit) bc.StopInsert() - bc.chainmu.Lock() - defer bc.chainmu.Unlock() + + // Grab the lock and wait for all goroutines to come down. + bc.chainmu.Close() bc.wg.Wait() // Ensure that the entirety of the state snapshot is journalled to disk. @@ -1015,6 +1029,7 @@ func (bc *BlockChain) Stop() { log.Error("Failed to journal state snapshot", "err", err) } } + // Ensure the state of a recent block is also stored to disk before exiting. // We're writing three different states to catch different restart scenarios: // - HEAD: So we don't need to reprocess any blocks in the general case @@ -1138,7 +1153,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // updateHead updates the head fast sync block if the inserted blocks are better // and returns an indicator whether the inserted blocks are canonical. updateHead := func(head *types.Block) bool { - bc.chainmu.Lock() + if !bc.chainmu.Lock() { + return false + } defer bc.chainmu.Unlock() // Rewind may have occurred, skip in that case. @@ -1410,9 +1427,10 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { // WriteBlockWithState writes the block and all associated state to the database. func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { - bc.chainmu.Lock() + if !bc.chainmu.Lock() { + return NonStatTy, errInsertionInterrupted + } defer bc.chainmu.Unlock() - return bc.writeBlockWithState(block, receipts, logs, state, emitHeadEvent) } @@ -1585,29 +1603,28 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { bc.blockProcFeed.Send(true) defer bc.blockProcFeed.Send(false) - // Remove already known canon-blocks - var ( - block, prev *types.Block - ) - // Do a sanity check that the provided chain is actually ordered and linked + // Do a sanity check that the provided chain is actually ordered and linked. for i := 1; i < len(chain); i++ { - block = chain[i] - prev = chain[i-1] + block, prev := chain[i], chain[i-1] if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() { - // Chain broke ancestry, log a message (programming error) and skip insertion - log.Error("Non contiguous block insert", "number", block.Number(), "hash", block.Hash(), - "parent", block.ParentHash(), "prevnumber", prev.Number(), "prevhash", prev.Hash()) - + log.Error("Non contiguous block insert", + "number", block.Number(), + "hash", block.Hash(), + "parent", block.ParentHash(), + "prevnumber", prev.Number(), + "prevhash", prev.Hash(), + ) return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, prev.NumberU64(), prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4]) } } - // Pre-checks passed, start the full block imports - bc.chainmu.Lock() - n, err := bc.insertChain(chain, true) - bc.chainmu.Unlock() - return n, err + // Pre-check passed, start the full block imports. + if !bc.chainmu.Lock() { + return 0, errChainStopped + } + defer bc.chainmu.Unlock() + return bc.insertChain(chain, true) } // InsertChainWithoutSealVerification works exactly the same @@ -1616,12 +1633,11 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in bc.blockProcFeed.Send(true) defer bc.blockProcFeed.Send(false) - // Pre-checks passed, start the full block imports - bc.chainmu.Lock() - n, err := bc.insertChain(types.Blocks([]*types.Block{block}), false) - bc.chainmu.Unlock() - - return n, err + if !bc.chainmu.Lock() { + return 0, errChainStopped + } + defer bc.chainmu.Unlock() + return bc.insertChain(types.Blocks([]*types.Block{block}), false) } // insertChain is the internal implementation of InsertChain, which assumes that @@ -1633,10 +1649,11 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) { - // If the chain is terminating, don't even bother starting up - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + // If the chain is terminating, don't even bother starting up. + if bc.insertStopped() { return 0, nil } + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) @@ -1671,8 +1688,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // First block (and state) is known // 1. We did a roll-back, and should now do a re-import // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot - // from the canonical chain, which has not been verified. - // Skip all known blocks that are behind us + // from the canonical chain, which has not been verified. + // Skip all known blocks that are behind us. var ( current = bc.CurrentBlock() localTd = bc.GetTd(current.Hash(), current.NumberU64()) @@ -2365,10 +2382,10 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i return i, err } - // Make sure only one thread manipulates the chain at once - bc.chainmu.Lock() + if !bc.chainmu.Lock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() - _, err := bc.hc.InsertHeaderChain(chain, start) return 0, err } diff --git a/internal/syncx/lock.go b/internal/syncx/lock.go new file mode 100644 index 000000000000..e02c914be594 --- /dev/null +++ b/internal/syncx/lock.go @@ -0,0 +1,55 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package syncx contains specialized synchronization primitives. +package syncx + +type ClosableMutex struct { + ch chan struct{} +} + +func NewClosableMutex() *ClosableMutex { + ch := make(chan struct{}, 1) + ch <- struct{}{} + return &ClosableMutex{ch} +} + +// Lock attempts to lock cm. +// +// If the mutex is closed, Lock returns false. +func (cm *ClosableMutex) Lock() bool { + _, ok := <-cm.ch + return ok +} + +// Unlock unlocks cm. +func (cm *ClosableMutex) Unlock() { + select { + case cm.ch <- struct{}{}: + default: + panic("Unlock of already-unlocked ClosableMutex") + } +} + +// Close locks the mutex, then closes it. +// When Close has returned, the mutex cannot be taken again. +func (cm *ClosableMutex) Close() { + _, ok := <-cm.ch + if !ok { + panic("Close of already-closed ClosableMutex") + } + close(cm.ch) +} From f171df5a20000d554842f9d1b71447f1384d2625 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 6 Oct 2021 16:12:50 +0200 Subject: [PATCH 08/14] internal/syncx: update copyright year --- internal/syncx/lock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/syncx/lock.go b/internal/syncx/lock.go index e02c914be594..de908b339c6f 100644 --- a/internal/syncx/lock.go +++ b/internal/syncx/lock.go @@ -1,4 +1,4 @@ -// Copyright 2014 The go-ethereum Authors +// Copyright 2021 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify From 182d10683f59af41aea0912de48426bf3c31604f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 6 Oct 2021 16:19:08 +0200 Subject: [PATCH 09/14] core: update comment in Stop --- core/blockchain.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index 3f33fc82a6b5..34e6b37c4d86 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1017,7 +1017,12 @@ func (bc *BlockChain) Stop() { close(bc.quit) bc.StopInsert() - // Grab the lock and wait for all goroutines to come down. + // Now wait for all chain modifications to end and persistent goroutines to exit. + // + // Note: Close waits for the mutex to become available, i.e. any running chain + // modification will have exited when Close returns. Since we also called StopInsert, + // the mutex should become available quickly. It cannot be taken again after Close has + // returned. bc.chainmu.Close() bc.wg.Wait() From c02fea52c01e009c07582006083d5653dc7824d9 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 6 Oct 2021 16:20:38 +0200 Subject: [PATCH 10/14] core: use errChainStopped in ExportN --- core/blockchain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index 34e6b37c4d86..ea7df413d094 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -754,7 +754,7 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { if !bc.chainmu.Lock() { - return fmt.Errorf("chain is stopped") + return errChainStopped } defer bc.chainmu.Unlock() From aed6b62b4824df9ab39502bde24951ec1e01ab1e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 7 Oct 2021 13:51:34 +0200 Subject: [PATCH 11/14] core: add close handling in ResetWithGenesisBlock --- core/blockchain.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index ea7df413d094..ab60c2dea992 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -723,7 +723,9 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.SetHead(0); err != nil { return err } - bc.chainmu.Lock() + if !bc.chainmu.Lock() { + return errChainStopped + } defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain From fe0c75927b5d05477f26cecfb3977f0d62a79a84 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 7 Oct 2021 14:05:41 +0200 Subject: [PATCH 12/14] internal/syncx: improve docs/API --- internal/syncx/{lock.go => mutex.go} | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) rename internal/syncx/{lock.go => mutex.go} (75%) diff --git a/internal/syncx/lock.go b/internal/syncx/mutex.go similarity index 75% rename from internal/syncx/lock.go rename to internal/syncx/mutex.go index de908b339c6f..96a21986c60c 100644 --- a/internal/syncx/lock.go +++ b/internal/syncx/mutex.go @@ -14,9 +14,11 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package syncx contains specialized synchronization primitives. +// Package syncx contains exotic synchronization primitives. package syncx +// ClosableMutex is a mutex that can also be closed. +// Once closed, it can never be taken again. type ClosableMutex struct { ch chan struct{} } @@ -27,14 +29,22 @@ func NewClosableMutex() *ClosableMutex { return &ClosableMutex{ch} } -// Lock attempts to lock cm. -// -// If the mutex is closed, Lock returns false. -func (cm *ClosableMutex) Lock() bool { +// TryLock attempts to lock cm. +// If the mutex is closed, TryLock returns false. +func (cm *ClosableMutex) TryLock() bool { _, ok := <-cm.ch return ok } +// MustLock locks cm. +// If the mutex is closed, MustLock panics. +func (cm *ClosableMutex) MustLock() { + _, ok := <-cm.ch + if !ok { + panic("mutex closed") + } +} + // Unlock unlocks cm. func (cm *ClosableMutex) Unlock() { select { @@ -45,7 +55,6 @@ func (cm *ClosableMutex) Unlock() { } // Close locks the mutex, then closes it. -// When Close has returned, the mutex cannot be taken again. func (cm *ClosableMutex) Close() { _, ok := <-cm.ch if !ok { From 8aad28158bce01b76cd6e0398e9e511e72b8b949 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 7 Oct 2021 14:06:12 +0200 Subject: [PATCH 13/14] core: update for new ClosableMutex API --- core/blockchain.go | 18 +++++++++--------- core/blockchain_test.go | 5 +++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index ab60c2dea992..aebfef077dc5 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -499,7 +499,7 @@ func (bc *BlockChain) SetHead(head uint64) error { // // The method returns the block number where the requested root cap was found. func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) { - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return 0, errChainStopped } defer bc.chainmu.Unlock() @@ -648,7 +648,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { } // If all checks out, manually set the head block. - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return errChainStopped } bc.currentBlock.Store(block) @@ -723,7 +723,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.SetHead(0); err != nil { return err } - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return errChainStopped } defer bc.chainmu.Unlock() @@ -755,7 +755,7 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return errChainStopped } defer bc.chainmu.Unlock() @@ -1160,7 +1160,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // updateHead updates the head fast sync block if the inserted blocks are better // and returns an indicator whether the inserted blocks are canonical. updateHead := func(head *types.Block) bool { - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return false } defer bc.chainmu.Unlock() @@ -1434,7 +1434,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { // WriteBlockWithState writes the block and all associated state to the database. func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return NonStatTy, errInsertionInterrupted } defer bc.chainmu.Unlock() @@ -1627,7 +1627,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { } // Pre-check passed, start the full block imports. - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return 0, errChainStopped } defer bc.chainmu.Unlock() @@ -1640,7 +1640,7 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in bc.blockProcFeed.Send(true) defer bc.blockProcFeed.Send(false) - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return 0, errChainStopped } defer bc.chainmu.Unlock() @@ -2389,7 +2389,7 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i return i, err } - if !bc.chainmu.Lock() { + if !bc.chainmu.TryLock() { return 0, errChainStopped } defer bc.chainmu.Unlock() diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 8d94f17aabae..4c3291dc32ce 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -163,7 +163,8 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - blockchain.chainmu.Lock() + + blockchain.chainmu.MustLock() rawdb.WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) rawdb.WriteBlock(blockchain.db, block) statedb.Commit(false) @@ -181,7 +182,7 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return err } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) - blockchain.chainmu.Lock() + blockchain.chainmu.MustLock() rawdb.WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) rawdb.WriteHeader(blockchain.db, header) blockchain.chainmu.Unlock() From c00e3ca9c61fe560a3de8017acb7d1cafba47a47 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 7 Oct 2021 14:54:41 +0200 Subject: [PATCH 14/14] core: update BlockChain field comments --- core/blockchain.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index aebfef077dc5..559800a06861 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -185,7 +185,9 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - chainmu *syncx.ClosableMutex // blockchain insertion lock + // This mutex synchronizes chain write operations. + // Readers don't need to take it, they can just read the database. + chainmu *syncx.ClosableMutex currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) @@ -198,8 +200,8 @@ type BlockChain struct { txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing - quit chan struct{} // blockchain quit channel - wg sync.WaitGroup // chain processing wait group for shutting down + wg sync.WaitGroup // + quit chan struct{} // shutdown signal, closed in Stop. running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing