Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rln-relay): gracefully handle chain forks #1623

Merged
merged 5 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ else:
{.push raises: [].}

import
std/[options, osproc, streams, strutils],
std/[options, osproc, streams, strutils, tables],
stew/[results, byteutils],
stew/shims/net as stewNet,
testutils/unittests,
Expand Down Expand Up @@ -246,17 +246,20 @@ suite "Onchain group manager":

asyncTest "startGroupSync: should fetch history correctly":
let manager = await setup()
let credentials = generateCredentials(manager.rlnInstance, 5)
const credentialCount = 6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The const keyword should be only used at "package level". Here, a let is sufficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used const since it can be evaluated at compile time, is that okay?

let credentials = generateCredentials(manager.rlnInstance, credentialCount)
await manager.init()

let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot()
require:
merkleRootBeforeRes.isOk()
let merkleRootBefore = merkleRootBeforeRes.get()

var futures = [newFuture[void](), newFuture[void](), newFuture[void](), newFuture[void](), newFuture[void]()]

proc generateCallback(futs: array[0..4, Future[system.void]], credentials: seq[IdentityCredential]): OnRegisterCallback =
type VoidFuturesArray = array[0..credentialCount - 1, Future[void]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather avoid defining types in the middle of a test. I think the right place for type aliases and new types is the "package level".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type would be dependant on the credentialCount, which is set at the top of the test. Would you just prefer I hardcode it, or not use type aliases altogether?

var futures: array[0..credentialCount - 1, Future[void]]
for i in 0 ..< futures.len():
futures[i] = newFuture[void]()
proc generateCallback(futs: VoidFuturesArray, credentials: seq[IdentityCredential]): OnRegisterCallback =
var futureIndex = 0
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
Expand All @@ -281,6 +284,7 @@ suite "Onchain group manager":

check:
merkleRootBefore != merkleRootAfter
manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize

asyncTest "register: should guard against uninitialized state":
let manager = await setup()
Expand Down Expand Up @@ -477,6 +481,54 @@ suite "Onchain group manager":
check:
verifiedRes.get() == false

asyncTest "backfillRootQueue: should backfill roots in event of chain reorg":
let manager = await setup()
const credentialCount = 6
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
await manager.init()

type VoidFuturesArray = array[0..credentialCount - 1, Future[void]]
var futures: array[0..credentialCount - 1, Future[void]]
for i in 0 ..< futures.len():
futures[i] = newFuture[void]()

proc generateCallback(futs: VoidFuturesArray, credentials: seq[IdentityCredential]): OnRegisterCallback =
var futureIndex = 0
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
registrations[0].idCommitment == credentials[futureIndex].idCommitment and
registrations[0].index == MembershipIndex(futureIndex + 1):
futs[futureIndex].complete()
futureIndex += 1
return callback

manager.onRegister(generateCallback(futures, credentials))
await manager.startGroupSync()

for i in 0 ..< credentials.len():
await manager.register(credentials[i])

await allFutures(futures)

# At this point, we should have a full root queue, 5 roots, and partial buffer of 1 root
require:
manager.validRoots.len() == credentialCount - 1
manager.validRootBuffer.len() == 1

# We can now simulate a chain reorg by calling backfillRootQueue
var blockTable = default(BlockTable)
blockTable[1.uint] = @[Membership(idCommitment: credentials[4].idCommitment, index: 4.uint)]

let expectedLastRoot = manager.validRootBuffer[0]
await manager.backfillRootQueue(blockTable)

# We should now have 5 roots in the queue, and no partial buffer
check:
manager.validRoots.len() == credentialCount - 1
manager.validRootBuffer.len() == 0
manager.validRoots[credentialCount - 2] == expectedLastRoot


################################
## Terminating/removing Ganache
################################
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import
../protocol_types,
../constants,
../rln
import
options,
Expand Down Expand Up @@ -87,18 +88,18 @@ method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretH
method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =
g.withdrawCb = some(cb)

# Acceptable roots for merkle root validation of incoming messages
const AcceptableRootWindowSize* = 5

proc updateValidRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): void =
proc slideRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): seq[MerkleNode] =
## updates the root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached
let overflowCount = rootQueue.len() - AcceptableRootWindowSize
if overflowCount >= 0:
# Delete the oldest `overflowCount` elements in the deque (index 0..`overflowCount`)
for i in 0..overflowCount:
rootQueue.popFirst()
let overflowCount = rootQueue.len() - AcceptableRootWindowSize + 1
var overflowedRoots = newSeq[MerkleNode]()
if overflowCount > 0:
# Delete the oldest `overflowCount` roots in the deque (index 0..`overflowCount`)
# insert into overflowedRoots seq and return
for i in 0 ..< overflowCount:
overFlowedRoots.add(rootQueue.popFirst())
# Push the next root into the queue
rootQueue.addLast(root)
return overFlowedRoots

method indexOfRoot*(g: GroupManager, root: MerkleNode): int {.base,gcsafe,raises:[].} =
## returns the index of the root in the merkle tree.
Expand All @@ -112,12 +113,18 @@ method validateRoot*(g: GroupManager, root: MerkleNode): bool {.base,gcsafe,rais
return true
return false

template updateValidRootQueue*(g: GroupManager) =
template slideRootQueue*(g: GroupManager): untyped =
let rootRes = g.rlnInstance.getMerkleRoot()
if rootRes.isErr():
raise newException(ValueError, "failed to get merkle root")
let rootAfterUpdate = rootRes.get()
updateValidRootQueue(g.validRoots, rootAfterUpdate)

var rootBuffer: Deque[MerkleNode]
let overflowedRoots = slideRootQueue(g.validRoots, rootAfterUpdate)
if overflowedRoots.len > 0:
for root in overflowedRoots:
discard rootBuffer.slideRootQueue(root)
rootBuffer

method verifyProof*(g: GroupManager,
input: openArray[byte],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type
keystorePassword*: Option[string]
saveKeystore*: bool
registrationHandler*: Option[RegistrationHandler]
# this buffer exists to backfill appropriate roots for the merkle tree,
# in event of a reorg. we store 5 in the buffer. Maybe need to revisit this,
# because the average reorg depth is 1 to 2 blocks.
validRootBuffer*: Deque[MerkleNode]

const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"
Expand All @@ -70,7 +74,7 @@ method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[voi
if g.registerCb.isSome():
await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)])

g.updateValidRootQueue()
g.validRootBuffer = g.slideRootQueue()

g.latestIndex += 1

Expand All @@ -92,7 +96,7 @@ method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]):
membersSeq.add(member)
await g.registerCb.get()(membersSeq)

g.updateValidRootQueue()
g.validRootBuffer = g.slideRootQueue()

g.latestIndex += MembershipIndex(idCommitments.len())

Expand Down Expand Up @@ -175,6 +179,22 @@ proc parseEvent*(event: type MemberRegistered,

type BlockTable* = OrderedTable[BlockNumber, seq[Membership]]

proc backfillRootQueue*(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
if blocktable.len() > 0:
for blockNumber, members in blocktable.pairs():
let deletionSuccess = g.rlnInstance.removeMembers(members.mapIt(it.index))
debug "deleting members to reconcile state"
if not deletionSuccess:
error "failed to delete members from the tree", success=deletionSuccess
raise newException(ValueError, "failed to delete member from the tree, tree is inconsistent")
# backfill the tree's acceptable roots
for i in 0..blocktable.len()-1:
# remove the last root
g.validRoots.popLast()
for i in 0..blockTable.len()-1:
# add the backfilled root
g.validRoots.addLast(g.validRootBuffer.popLast())

proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
rymnc marked this conversation as resolved.
Show resolved Hide resolved
initializedGuard(g)

Expand All @@ -192,19 +212,32 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[
normalizedToBlock = fromBlock

var blockTable = default(BlockTable)
var toRemoveBlockTable = default(BlockTable)

let events = await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), toBlock = some(normalizedToBlock.blockId()))
if events.len == 0:
debug "no events found"
return blockTable

for event in events:
let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint
let removed = event["removed"].getBool()
let parsedEventRes = parseEvent(MemberRegistered, event)
if parsedEventRes.isErr():
error "failed to parse the MemberRegistered event", error=parsedEventRes.error()
raise newException(ValueError, "failed to parse the MemberRegistered event")
let parsedEvent = parsedEventRes.get()

if removed:
# remove the registration from the tree, per block
warn "member removed from the tree as per canonical chain", index=parsedEvent.index
if toRemoveBlockTable.hasKey(blockNumber):
toRemoveBlockTable[blockNumber].add(parsedEvent)
else:
toRemoveBlockTable[blockNumber] = @[parsedEvent]

await g.backfillRootQueue(toRemoveBlockTable)

if blockTable.hasKey(blockNumber):
blockTable[blockNumber].add(parsedEvent)
else:
Expand All @@ -220,8 +253,8 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu
let startingIndex = members[0].index
try:
await g.registerBatch(members.mapIt(it.idCommitment))
except:
error "failed to insert members into the tree"
except CatchableError:
error "failed to insert members into the tree", error=getCurrentExceptionMsg()
raise newException(ValueError, "failed to insert members into the tree")
debug "new members added to the Merkle tree", commitments=members.mapIt(it.idCommitment.inHex()) , startingIndex=startingIndex
let lastIndex = startingIndex + members.len.uint - 1
Expand All @@ -234,7 +267,9 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu

return

proc getEventsAndSeedIntoTree*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
proc getEventsAndSeedIntoTree*(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
initializedGuard(g)

let events = await g.getEvents(fromBlock, toBlock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ method init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} =
if not membersInserted:
raise newException(ValueError, "Failed to insert members into the merkle tree")

g.updateValidRootQueue()
discard g.slideRootQueue()

g.latestIndex += MembershipIndex(idCommitments.len() - 1)

Expand All @@ -56,7 +56,7 @@ method register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void
if not memberInserted:
raise newException(ValueError, "Failed to insert member into the merkle tree")

g.updateValidRootQueue()
discard g.slideRootQueue()

g.latestIndex += 1

Expand All @@ -77,7 +77,7 @@ method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]):
memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i)))
await g.registerCb.get()(memberSeq)

g.updateValidRootQueue()
discard g.slideRootQueue()

g.latestIndex += MembershipIndex(idCommitments.len() - 1)

Expand Down
7 changes: 7 additions & 0 deletions waku/v2/protocol/waku_rln_relay/rln/wrappers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@ proc removeMember*(rlnInstance: ptr RLN, index: MembershipIndex): bool =
let deletion_success = delete_member(rlnInstance, index)
return deletion_success

proc removeMembers*(rlnInstance: ptr RLN, indices: seq[MembershipIndex]): bool =
for index in indices:
let deletion_success = delete_member(rlnInstance, index)
if not deletion_success:
return false
return true
Comment on lines +256 to +261
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be an atomic operation in zerokit soon


proc getMerkleRoot*(rlnInstance: ptr RLN): MerkleNodeResult =
# read the Merkle Tree root after insertion
var
Expand Down