From d9873bbf381967e8a7b8ffe1609093313fdc3564 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 23 Oct 2023 23:31:56 +0800 Subject: [PATCH] core, eth, trie: filter out boundary nodes and remove dangling nodes in (#28327) --- eth/protocols/snap/metrics.go | 28 ++++++++++ eth/protocols/snap/sync.go | 99 +++++++++++++++++++++++++++++++++-- trie/stacktrie.go | 83 ++++++++++++++++++++++++++--- trie/stacktrie_test.go | 87 ++++++++++++++++++++++++++++++ trie/sync.go | 26 +++++++++ 5 files changed, 312 insertions(+), 11 deletions(-) diff --git a/eth/protocols/snap/metrics.go b/eth/protocols/snap/metrics.go index a9f35ca4471b..a7d071953f67 100644 --- a/eth/protocols/snap/metrics.go +++ b/eth/protocols/snap/metrics.go @@ -26,4 +26,32 @@ var ( IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil) EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil) + + // deletionGauge is the metric to track how many trie node deletions + // are performed in total during the sync process. + deletionGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/delete", nil) + + // lookupGauge is the metric to track how many trie node lookups are + // performed to determine if node needs to be deleted. + lookupGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/lookup", nil) + + // boundaryAccountNodesGauge is the metric to track how many boundary trie + // nodes in account trie are met. + boundaryAccountNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/account", nil) + + // boundaryAccountNodesGauge is the metric to track how many boundary trie + // nodes in storage tries are met. + boundaryStorageNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/storage", nil) + + // smallStorageGauge is the metric to track how many storages are small enough + // to retrieved in one or two request. + smallStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/small", nil) + + // largeStorageGauge is the metric to track how many storages are large enough + // to retrieved concurrently. + largeStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/large", nil) + + // skipStorageHealingGauge is the metric to track how many storages are retrieved + // in multiple requests but healing is not necessary. + skipStorageHealingGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/noheal", nil) ) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index eafa948de2fc..fa628a108e9f 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -717,6 +717,19 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { } } +// cleanPath is used to remove the dangling nodes in the stackTrie. +func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) { + if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) { + rawdb.DeleteAccountTrieNode(batch, path) + deletionGauge.Inc(1) + } + if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) { + rawdb.DeleteStorageTrieNode(batch, owner, path) + deletionGauge.Inc(1) + } + lookupGauge.Inc(1) +} + // loadSyncStatus retrieves a previously aborted sync status from the database, // or generates a fresh one if none is available. func (s *Syncer) loadSyncStatus() { @@ -743,6 +756,17 @@ func (s *Syncer) loadSyncStatus() { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner and also filter out boundary nodes + // only in the context of the path scheme. Deletion is forbidden in the + // hash scheme, as it can disrupt state completeness. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(task.genBatch, common.Hash{}, path) + }) + // Skip the left boundary if it's not the first range. + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(task.Next != (common.Hash{}), task.Last != common.MaxHash, boundaryAccountNodesGauge) + } task.genTrie = trie.NewStackTrie(options) for accountHash, subtasks := range task.SubTasks { for _, subtask := range subtasks { @@ -759,6 +783,17 @@ func (s *Syncer) loadSyncStatus() { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner and also filter out boundary nodes + // only in the context of the path scheme. Deletion is forbidden in the + // hash scheme, as it can disrupt state completeness. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(subtask.genBatch, owner, path) + }) + // Skip the left boundary if it's not the first range. + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(subtask.Next != common.Hash{}, subtask.Last != common.MaxHash, boundaryStorageNodesGauge) + } subtask.genTrie = trie.NewStackTrie(options) } } @@ -815,6 +850,17 @@ func (s *Syncer) loadSyncStatus() { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner and also filter out boundary nodes + // only in the context of the path scheme. Deletion is forbidden in the + // hash scheme, as it can disrupt state completeness. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(batch, common.Hash{}, path) + }) + // Skip the left boundary if it's not the first range. + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(next != common.Hash{}, last != common.MaxHash, boundaryAccountNodesGauge) + } s.tasks = append(s.tasks, &accountTask{ Next: next, Last: last, @@ -1972,6 +2018,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) { res.mainTask.needState[j] = false res.mainTask.pend-- + smallStorageGauge.Inc(1) } // If the last contract was chunked, mark it as needing healing // to avoid writing it out to disk prematurely. @@ -2007,7 +2054,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks) } r := newHashRange(lastKey, chunks) - + if chunks == 1 { + smallStorageGauge.Inc(1) + } else { + largeStorageGauge.Inc(1) + } // Our first task is the one that was just filled by this response. batch := ethdb.HookedBatch{ Batch: s.db.NewBatch(), @@ -2020,6 +2071,14 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + options = options.WithCleaner(func(path []byte) { + s.cleanPath(batch, owner, path) + }) + // Keep the left boundary as it's the first range. + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(false, r.End() != common.MaxHash, boundaryStorageNodesGauge) + } tasks = append(tasks, &storageTask{ Next: common.Hash{}, Last: r.End(), @@ -2038,6 +2097,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner and also filter out boundary nodes + // only in the context of the path scheme. Deletion is forbidden in the + // hash scheme, as it can disrupt state completeness. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(batch, owner, path) + }) + // Skip the left boundary as it's not the first range + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(true, r.End() != common.MaxHash, boundaryStorageNodesGauge) + } tasks = append(tasks, &storageTask{ Next: r.Start(), Last: r.End(), @@ -2093,6 +2163,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner only in the context of the + // path scheme. Deletion is forbidden in the hash scheme, as it can + // disrupt state completeness. + // + // Notably, boundary nodes can be also kept because the whole storage + // trie is complete. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(batch, account, path) + }) + } tr := trie.NewStackTrie(options) for j := 0; j < len(res.hashes[i]); j++ { tr.Update(res.hashes[i][j][:], res.slots[i][j]) @@ -2116,16 +2197,24 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if res.subTask != nil { if res.subTask.done { root := res.subTask.genTrie.Commit() - if root == res.subTask.root { - // If the chunk's root is an overflown but full delivery, clear the heal request + if err := res.subTask.genBatch.Write(); err != nil { + log.Error("Failed to persist stack slots", "err", err) + } + res.subTask.genBatch.Reset() + + // If the chunk's root is an overflown but full delivery, + // clear the heal request. + accountHash := res.accounts[len(res.accounts)-1] + if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) { for i, account := range res.mainTask.res.hashes { - if account == res.accounts[len(res.accounts)-1] { + if account == accountHash { res.mainTask.needHeal[i] = false + skipStorageHealingGauge.Inc(1) } } } } - if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done { + if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize { if err := res.subTask.genBatch.Write(); err != nil { log.Error("Failed to persist stack slots", "err", err) } diff --git a/trie/stacktrie.go b/trie/stacktrie.go index 09c3402663bb..423afdec8816 100644 --- a/trie/stacktrie.go +++ b/trie/stacktrie.go @@ -17,11 +17,13 @@ package trie import ( + "bytes" "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" ) var ( @@ -31,7 +33,12 @@ var ( // StackTrieOptions contains the configured options for manipulating the stackTrie. type StackTrieOptions struct { - Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes + Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes + Cleaner func(path []byte) // The function to clean up dangling nodes + + SkipLeftBoundary bool // Flag whether the nodes on the left boundary are skipped for committing + SkipRightBoundary bool // Flag whether the nodes on the right boundary are skipped for committing + boundaryGauge metrics.Gauge // Gauge to track how many boundary nodes are met } // NewStackTrieOptions initializes an empty options for stackTrie. @@ -43,6 +50,22 @@ func (o *StackTrieOptions) WithWriter(writer func(path []byte, hash common.Hash, return o } +// WithCleaner configures the cleaner in the option for removing dangling nodes. +func (o *StackTrieOptions) WithCleaner(cleaner func(path []byte)) *StackTrieOptions { + o.Cleaner = cleaner + return o +} + +// WithSkipBoundary configures whether the left and right boundary nodes are +// filtered for committing, along with a gauge metrics to track how many +// boundary nodes are met. +func (o *StackTrieOptions) WithSkipBoundary(skipLeft, skipRight bool, gauge metrics.Gauge) *StackTrieOptions { + o.SkipLeftBoundary = skipLeft + o.SkipRightBoundary = skipRight + o.boundaryGauge = gauge + return o +} + // StackTrie is a trie implementation that expects keys to be inserted // in order. Once it determines that a subtree will no longer be inserted // into, it will hash it and free up the memory it uses. @@ -50,6 +73,9 @@ type StackTrie struct { options *StackTrieOptions root *stNode h *hasher + + first []byte // The (hex-encoded without terminator) key of first inserted entry, tracked as left boundary. + last []byte // The (hex-encoded without terminator) key of last inserted entry, tracked as right boundary. } // NewStackTrie allocates and initializes an empty trie. @@ -72,6 +98,15 @@ func (t *StackTrie) Update(key, value []byte) error { } k = k[:len(k)-1] // chop the termination flag + // track the first and last inserted entries. + if t.first == nil { + t.first = append([]byte{}, k...) + } + if t.last == nil { + t.last = append([]byte{}, k...) // allocate key slice + } else { + t.last = append(t.last[:0], k...) // reuse key slice + } t.insert(t.root, k, value, nil) return nil } @@ -88,6 +123,8 @@ func (t *StackTrie) MustUpdate(key, value []byte) { func (t *StackTrie) Reset() { t.options = NewStackTrieOptions() t.root = stPool.Get().(*stNode) + t.first = nil + t.last = nil } // stNode represents a node within a StackTrie @@ -306,8 +343,10 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) { // // This method also sets 'st.type' to hashedNode, and clears 'st.key'. func (t *StackTrie) hash(st *stNode, path []byte) { - var blob []byte // RLP-encoded node blob - + var ( + blob []byte // RLP-encoded node blob + internal [][]byte // List of node paths covered by the extension node + ) switch st.typ { case hashedNode: return @@ -342,6 +381,15 @@ func (t *StackTrie) hash(st *stNode, path []byte) { // recursively hash and commit child as the first step t.hash(st.children[0], append(path, st.key...)) + // Collect the path of internal nodes between shortNode and its **in disk** + // child. This is essential in the case of path mode scheme to avoid leaving + // danging nodes within the range of this internal path on disk, which would + // break the guarantee for state healing. + if len(st.children[0].val) >= 32 && t.options.Cleaner != nil { + for i := 1; i < len(st.key); i++ { + internal = append(internal, append(path, st.key[:i]...)) + } + } // encode the extension node n := shortNode{Key: hexToCompactInPlace(st.key)} if len(st.children[0].val) < 32 { @@ -378,10 +426,33 @@ func (t *StackTrie) hash(st *stNode, path []byte) { // input values. st.val = t.h.hashData(blob) - // Commit the trie node if the writer is configured. - if t.options.Writer != nil { - t.options.Writer(path, common.BytesToHash(st.val), blob) + // Short circuit if the stack trie is not configured for writing. + if t.options.Writer == nil { + return + } + // Skip committing if the node is on the left boundary and stackTrie is + // configured to filter the boundary. + if t.options.SkipLeftBoundary && bytes.HasPrefix(t.first, path) { + if t.options.boundaryGauge != nil { + t.options.boundaryGauge.Inc(1) + } + return + } + // Skip committing if the node is on the right boundary and stackTrie is + // configured to filter the boundary. + if t.options.SkipRightBoundary && bytes.HasPrefix(t.last, path) { + if t.options.boundaryGauge != nil { + t.options.boundaryGauge.Inc(1) + } + return + } + // Clean up the internal dangling nodes covered by the extension node. + // This should be done before writing the node to adhere to the committing + // order from bottom to top. + for _, path := range internal { + t.options.Cleaner(path) } + t.options.Writer(path, common.BytesToHash(st.val), blob) } // Hash will firstly hash the entire trie if it's still not hashed and then commit diff --git a/trie/stacktrie_test.go b/trie/stacktrie_test.go index b52c38183be4..802d472e36e6 100644 --- a/trie/stacktrie_test.go +++ b/trie/stacktrie_test.go @@ -19,11 +19,14 @@ package trie import ( "bytes" "math/big" + "math/rand" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/trie/testutil" + "golang.org/x/exp/slices" ) func TestStackTrieInsertAndHash(t *testing.T) { @@ -344,3 +347,87 @@ func TestStacktrieNotModifyValues(t *testing.T) { } } } + +func buildPartialTree(entries []*kv, t *testing.T) map[string]common.Hash { + var ( + options = NewStackTrieOptions() + nodes = make(map[string]common.Hash) + ) + var ( + first int + last = len(entries) - 1 + + noLeft bool + noRight bool + ) + // Enter split mode if there are at least two elements + if rand.Intn(5) != 0 { + for { + first = rand.Intn(len(entries)) + last = rand.Intn(len(entries)) + if first <= last { + break + } + } + if first != 0 { + noLeft = true + } + if last != len(entries)-1 { + noRight = true + } + } + options = options.WithSkipBoundary(noLeft, noRight, nil) + options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { + nodes[string(path)] = hash + }) + tr := NewStackTrie(options) + + for i := first; i <= last; i++ { + tr.MustUpdate(entries[i].k, entries[i].v) + } + tr.Commit() + return nodes +} + +func TestPartialStackTrie(t *testing.T) { + for round := 0; round < 100; round++ { + var ( + n = rand.Intn(100) + 1 + entries []*kv + ) + for i := 0; i < n; i++ { + var val []byte + if rand.Intn(3) == 0 { + val = testutil.RandBytes(3) + } else { + val = testutil.RandBytes(32) + } + entries = append(entries, &kv{ + k: testutil.RandBytes(32), + v: val, + }) + } + slices.SortFunc(entries, (*kv).cmp) + + var ( + nodes = make(map[string]common.Hash) + options = NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) { + nodes[string(path)] = hash + }) + ) + tr := NewStackTrie(options) + + for i := 0; i < len(entries); i++ { + tr.MustUpdate(entries[i].k, entries[i].v) + } + tr.Commit() + + for j := 0; j < 100; j++ { + for path, hash := range buildPartialTree(entries, t) { + if nodes[path] != hash { + t.Errorf("%v, want %x, got %x", []byte(path), nodes[path], hash) + } + } + } + } +} diff --git a/trie/sync.go b/trie/sync.go index 9da07060759a..e5af26ff91fb 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -51,6 +51,18 @@ var ( // lookupGauge is the metric to track how many trie node lookups are // performed to determine if node needs to be deleted. lookupGauge = metrics.NewRegisteredGauge("trie/sync/lookup", nil) + + // accountNodeSyncedGauge is the metric to track how many account trie + // node are written during the sync. + accountNodeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/nodes/account", nil) + + // storageNodeSyncedGauge is the metric to track how many account trie + // node are written during the sync. + storageNodeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/nodes/storage", nil) + + // codeSyncedGauge is the metric to track how many contract codes are + // written during the sync. + codeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/codes", nil) ) // SyncPath is a path tuple identifying a particular trie node either in a single @@ -362,10 +374,22 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error { // storage, returning any occurred error. func (s *Sync) Commit(dbw ethdb.Batch) error { // Flush the pending node writes into database batch. + var ( + account int + storage int + ) for path, value := range s.membatch.nodes { owner, inner := ResolvePath([]byte(path)) + if owner == (common.Hash{}) { + account += 1 + } else { + storage += 1 + } rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme) } + accountNodeSyncedGauge.Inc(int64(account)) + storageNodeSyncedGauge.Inc(int64(storage)) + // Flush the pending node deletes into the database batch. // Please note that each written and deleted node has a // unique path, ensuring no duplication occurs. @@ -377,6 +401,8 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { for hash, value := range s.membatch.codes { rawdb.WriteCode(dbw, hash, value) } + codeSyncedGauge.Inc(int64(len(s.membatch.codes))) + s.membatch = newSyncMemBatch() // reset the batch return nil }