Skip to content
This repository has been archived by the owner on Jan 19, 2021. It is now read-only.

Refactor WalkTrie #135

Merged
merged 8 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/checkpointing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const iterTest = async (numOfIter: number): Promise<Array<number>> => {
}
await trie.commit()
}
return process.hrtime(hrstart)
}

const go = async () => {
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/random.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const run = async (): Promise<void> => {
}
}

// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (SYMMETRIC) {
await trie.put(key, key)
genRoot()
Expand All @@ -36,6 +37,7 @@ const run = async (): Promise<void> => {

const go = async () => {
const testName = `benchmarks/random.ts | rounds: ${ROUNDS}, ERA_SIZE: ${ERA_SIZE}, ${
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
SYMMETRIC ? 'sys' : 'rand'
}`
console.time(testName)
Expand Down
142 changes: 38 additions & 104 deletions src/baseTrie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { LevelUp } from 'levelup'
import { keccak, KECCAK256_RLP } from 'ethereumjs-util'
import { DB, BatchDBOp, PutBatch } from './db'
import { TrieReadStream as ReadStream } from './readStream'
import { PrioritizedTaskExecutor } from './prioritizedTaskExecutor'
import { bufferToNibbles, matchingNibbleLength, doKeysMatch } from './util/nibbles'
import WalkStrategy from './util/walkStrategy'
import {
TrieNode,
decodeNode,
Expand All @@ -26,11 +26,11 @@ interface Path {
stack: TrieNode[]
}

type FoundNodeFunction = (
export type FoundNodeFunction = (
nodeRef: Buffer,
node: TrieNode,
key: Nibbles,
walkController: any
walkController: WalkStrategy
ryanio marked this conversation as resolved.
Show resolved Hide resolved
) => void

/**
Expand Down Expand Up @@ -117,6 +117,7 @@ export class Trie {
*/
async put(key: Buffer, value: Buffer): Promise<void> {
// If value is empty, delete
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!value || value.toString() === '') {
return await this.del(key)
}
Expand Down Expand Up @@ -175,7 +176,8 @@ export class Trie {
resolve({ node: null, remaining: keyRemainder, stack })
} else {
// node found, continuing search
await walkController.only(branchIndex)
// this can be optimized as this calls getBranch again.
await walkController.onlyBranchIndex(node, keyProgress, branchIndex)
}
}
} else if (node instanceof LeafNode) {
Expand All @@ -193,13 +195,13 @@ export class Trie {
resolve({ node: null, remaining: keyRemainder, stack })
} else {
// keys match, continue search
await walkController.next()
walkController.allChildren(node, keyProgress)
}
}
}

// walk trie and process nodes
await this._walkTrie(this.root, onFound)
await this.walkTrie(this.root, onFound)

// Resolve if walkTrie finishes without finding any nodes
resolve({ node: null, remaining: [], stack })
Expand All @@ -208,102 +210,22 @@ export class Trie {

/**
* Walks a trie until finished.
* @private
* @param root
* @param onFound - callback to call when a node is found
* @param onFound - callback to call when a node is found. This schedules new tasks. If no tasks are available, the Promise resolves.
jochem-brouwer marked this conversation as resolved.
Show resolved Hide resolved
* @returns Resolves when finished walking trie.
*/
async _walkTrie(root: Buffer, onFound: FoundNodeFunction): Promise<void> {
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve) => {
const self = this
root = root || this.root

if (root.equals(KECCAK256_RLP)) {
return resolve()
}

// The maximum pool size should be high enough to utilize
// the parallelizability of reading nodes from disk and
// low enough to utilize the prioritisation of node lookup.
const maxPoolSize = 500
const taskExecutor = new PrioritizedTaskExecutor(maxPoolSize)

const processNode = async (
nodeRef: Buffer,
node: TrieNode,
key: Nibbles = []
): Promise<void> => {
const walkController = {
next: async () => {
if (node instanceof LeafNode) {
if (taskExecutor.finished()) {
resolve()
}
return
}
let children
if (node instanceof ExtensionNode) {
children = [[node.key, node.value]]
} else if (node instanceof BranchNode) {
children = node.getChildren().map((b) => [[b[0]], b[1]])
}
if (!children) {
// Node has no children
return resolve()
}
for (const child of children) {
const keyExtension = child[0] as Nibbles
const childRef = child[1] as Buffer
const childKey = key.concat(keyExtension)
const priority = childKey.length
taskExecutor.execute(priority, async (taskCallback: Function) => {
const childNode = await self._lookupNode(childRef)
taskCallback()
if (childNode) {
await processNode(childRef, childNode as TrieNode, childKey)
}
})
}
},
only: async (childIndex: number) => {
if (!(node instanceof BranchNode)) {
throw new Error('Expected branch node')
}
const childRef = node.getBranch(childIndex)
if (!childRef) {
throw new Error('Could not get branch of childIndex')
}
const childKey = key.slice()
childKey.push(childIndex)
const priority = childKey.length
taskExecutor.execute(priority, async (taskCallback: Function) => {
const childNode = await self._lookupNode(childRef)
taskCallback()
if (childNode) {
await processNode(childRef as Buffer, childNode, childKey)
} else {
// could not find child node
resolve()
}
})
},
}

if (node) {
onFound(nodeRef, node, key, walkController)
} else {
resolve()
}
}
async walkTrie(root: Buffer, onFound: FoundNodeFunction): Promise<void> {
await WalkStrategy.newWalk(onFound, this, root)
}

const node = await this._lookupNode(root)
if (node) {
await processNode(root, node as TrieNode, [])
} else {
resolve()
}
})
/**
* @hidden
* Backwards compatibility
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate more here? It's a little confusing that an underscore-prefixed method would need to be kept around for backwards compatibility. Do you think others are using this, such that removing it would be a breaking change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can drop it, but this changes the function signature of the library and would thus imply a major release? It was not officially marked as private. But I doubt any external user is using it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense thanks, we can see what @holgerd77 thinks, i'm fine either way 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it might make sense to keep this for now, judging from our own loose usage pattern of private methods. (We might actually want to give this topic some more structured thought. At the very least, whenever one stumbles upon a private method being used, it makes sense to raise the question: why are we actually doing this, and can we a) achieve the same goal using the public API, or b) is the public API of the library insufficient, so that we should additionally expose something there? This doesn't need to be solved "on site"; it can also very well lead to a new issue.)

For now I've opened a new v5 planning issue #136 where we can collect breaking-change wishes as well as deprecation tasks, and I have added the _walkTrie() removal as a TODO. This is one step towards being a bit more structured in our major release planning.

* @param root -
* @param onFound -
*/
async _walkTrie(root: Buffer, onFound: FoundNodeFunction): Promise<void> {
await this.walkTrie(root, onFound)
}

/**
Expand All @@ -318,9 +240,8 @@ export class Trie {

/**
* Retrieves a node from db by hash.
* @private
*/
async _lookupNode(node: Buffer | Buffer[]): Promise<TrieNode | null> {
async lookupNode(node: Buffer | Buffer[]): Promise<TrieNode | null> {
if (isRawNode(node)) {
return decodeRawNode(node as Buffer[])
}
Expand All @@ -334,6 +255,15 @@ export class Trie {
return foundNode
}

/**
* @hidden
* Backwards compatibility
* @param node The node hash to look up from the DB
*/
async _lookupNode(node: Buffer | Buffer[]): Promise<TrieNode | null> {
return this.lookupNode(node)
}

jochem-brouwer marked this conversation as resolved.
Show resolved Hide resolved
/**
* Updates a node.
* @private
Expand Down Expand Up @@ -454,8 +384,10 @@ export class Trie {
stack: TrieNode[]
) => {
// branchNode is the node ON the branch node not THE branch node
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!parentNode || parentNode instanceof BranchNode) {
// branch->?
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (parentNode) {
stack.push(parentNode)
}
Expand Down Expand Up @@ -659,10 +591,12 @@ export class Trie {
async batch(ops: BatchDBOp[]): Promise<void> {
for (const op of ops) {
if (op.type === 'put') {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!op.value) {
throw new Error('Invalid batch db operation')
}
await this.put(op.key, op.value)
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (op.type === 'del') {
await this.del(op.key)
}
Expand Down Expand Up @@ -760,12 +694,12 @@ export class Trie {
async _findDbNodes(onFound: FoundNodeFunction): Promise<void> {
const outerOnFound: FoundNodeFunction = async (nodeRef, node, key, walkController) => {
if (isRawNode(nodeRef)) {
await walkController.next()
walkController.allChildren(node, key)
} else {
onFound(nodeRef, node, key, walkController)
}
}
await this._walkTrie(this.root, outerOnFound)
await this.walkTrie(this.root, outerOnFound)
}

/**
Expand All @@ -786,9 +720,9 @@ export class Trie {
onFound(nodeRef, node, fullKey, walkController)
} else {
// keep looking for value nodes
await walkController.next()
walkController.allChildren(node, key)
}
}
await this._walkTrie(this.root, outerOnFound)
await this.walkTrie(this.root, outerOnFound)
}
}
3 changes: 3 additions & 0 deletions src/checkpointTrie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class CheckpointTrie extends BaseTrie {
await this.lock.wait()

this._checkpoints.pop()
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!this.isCheckpoint) {
await this._exitCpMode(true)
}
Expand All @@ -71,8 +72,10 @@ export class CheckpointTrie extends BaseTrie {
*/
async revert(): Promise<void> {
await this.lock.wait()
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.isCheckpoint) {
this.root = this._checkpoints.pop()!
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!this.isCheckpoint) {
await this._exitCpMode(false)
}
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export { CheckpointTrie } from './checkpointTrie'
export { SecureTrie } from './secure'
export { Trie as BaseTrie } from './baseTrie'
import WalkStrategy from './util/walkStrategy'
export { WalkStrategy }
jochem-brouwer marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion src/readStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class TrieReadStream extends Readable {
key: nibblesToBuffer(key),
value: node.value,
})
await walkController.next()
walkController.allChildren(node, key)
})
this.push(null)
}
Expand Down
1 change: 1 addition & 0 deletions src/scratch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export class ScratchDB extends DB {
}

// If not found, try searching upstream db
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!value && this._upstream._leveldb) {
try {
value = await this._upstream._leveldb.get(key, ENCODING_OPTS)
Expand Down
2 changes: 1 addition & 1 deletion src/scratchReadStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class ScratchReadStream extends Readable {
key: nodeRef,
value: node.serialize(),
})
await walkController.next()
walkController.allChildren(node, key)
})
this.push(null)
}
Expand Down
1 change: 1 addition & 0 deletions src/secure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class SecureTrie extends CheckpointTrie {
* @param value
*/
async put(key: Buffer, val: Buffer): Promise<void> {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!val || val.toString() === '') {
await this.del(key)
} else {
Expand Down
Loading