Skip to content

Commit

Permalink
fix(rln-relay): scope of getEvents (#1672)
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc authored Apr 18, 2023
1 parent 9616253 commit b62193e
Showing 1 changed file with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type
const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"

template initializedGuard*(g: OnchainGroupManager): untyped =
template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(ValueError, "OnchainGroupManager is not initialized")

Expand Down Expand Up @@ -156,7 +156,7 @@ method withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]):

# TODO: after slashing is enabled on the contract

proc parseEvent*(event: type MemberRegistered,
proc parseEvent(event: type MemberRegistered,
log: JsonNode): GroupManagerResult[Membership] =
## parses the `data` parameter of the `MemberRegistered` event `log`
## returns an error if it cannot parse the `data` parameter
Expand Down Expand Up @@ -196,7 +196,16 @@ proc backfillRootQueue*(g: OnchainGroupManager, blockTable: BlockTable): Future[
# add the backfilled root
g.validRoots.addLast(g.validRootBuffer.popLast())

proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Membership) =
if blockTable.hasKeyOrPut(blockNumber, @[member]):
try:
blockTable[blockNumber].add(member)
except KeyError: # qed
error "could not insert member into block table", blockNumber=blockNumber, member=member

proc getRawEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[JsonNode] {.async.} =
initializedGuard(g)

let ethRpc = g.ethRpc.get()
Expand All @@ -212,13 +221,24 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[
else:
normalizedToBlock = fromBlock

let events = await rlnContract.getJsonLogs(MemberRegistered,
fromBlock = some(fromBlock.blockId()),
toBlock = some(normalizedToBlock.blockId()))
return events

proc getBlockTables(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[(BlockTable, BlockTable)] {.async.} =
initializedGuard(g)

var blockTable = default(BlockTable)
var toRemoveBlockTable = default(BlockTable)

let events = await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), toBlock = some(normalizedToBlock.blockId()))
let events = await g.getRawEvents(fromBlock, toBlock)

if events.len == 0:
debug "no events found"
return blockTable
return (blockTable, toRemoveBlockTable)

for event in events:
let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint
Expand All @@ -232,21 +252,13 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[
if removed:
# remove the registration from the tree, per block
warn "member removed from the tree as per canonical chain", index=parsedEvent.index
if toRemoveBlockTable.hasKey(blockNumber):
toRemoveBlockTable[blockNumber].add(parsedEvent)
else:
toRemoveBlockTable[blockNumber] = @[parsedEvent]

await g.backfillRootQueue(toRemoveBlockTable)

if blockTable.hasKey(blockNumber):
blockTable[blockNumber].add(parsedEvent)
toRemoveBlockTable.insert(blockNumber, parsedEvent)
else:
blockTable[blockNumber] = @[parsedEvent]
blockTable.insert(blockNumber, parsedEvent)

return blockTable
return (blockTable, toRemoveBlockTable)

proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
proc handleValidEvents(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
initializedGuard(g)

for blockNumber, members in blockTable.pairs():
Expand All @@ -268,30 +280,36 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu

return

proc getEventsAndSeedIntoTree*(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
proc handleRemovedEvents(g: OnchainGroupManager, toRemoveBlockTable: BlockTable): Future[void] {.async.} =
initializedGuard(g)

await g.backfillRootQueue(toRemoveBlockTable)

proc getAndHandleEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
initializedGuard(g)

let events = await g.getEvents(fromBlock, toBlock)
await g.seedBlockTableIntoTree(events)
let (validEvents, removedEvents) = await g.getBlockTables(fromBlock, toBlock)
await g.handleRemovedEvents(removedEvents)
await g.handleValidEvents(validEvents)
return

proc getNewHeadCallback*(g: OnchainGroupManager): BlockHeaderHandler =
proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
let latestBlock = blockheader.number.uint
debug "block received", blockNumber = latestBlock
# get logs from the last block
try:
asyncSpawn g.getEventsAndSeedIntoTree(latestBlock)
asyncSpawn g.getAndHandleEvents(latestBlock)
except CatchableError:
warn "failed to handle log: ", error=getCurrentExceptionMsg()
return newHeadCallback

proc newHeadErrCallback(error: CatchableError) =
warn "failed to get new head", error=error.msg

proc startListeningToEvents*(g: OnchainGroupManager): Future[void] {.async.} =
proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} =
initializedGuard(g)

let ethRpc = g.ethRpc.get()
Expand All @@ -301,11 +319,11 @@ proc startListeningToEvents*(g: OnchainGroupManager): Future[void] {.async.} =
except CatchableError:
raise newException(ValueError, "failed to subscribe to block headers: " & getCurrentExceptionMsg())

proc startOnchainSync*(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNumber(0)): Future[void] {.async.} =
proc startOnchainSync(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNumber(0)): Future[void] {.async.} =
initializedGuard(g)

try:
await g.getEventsAndSeedIntoTree(fromBlock, some(fromBlock))
await g.getAndHandleEvents(fromBlock, some(fromBlock))
except CatchableError:
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())

Expand All @@ -315,7 +333,7 @@ proc startOnchainSync*(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNum
except CatchableError:
raise newException(ValueError, "failed to start listening to events: " & getCurrentExceptionMsg())

proc persistCredentials*(g: OnchainGroupManager): GroupManagerResult[void] =
proc persistCredentials(g: OnchainGroupManager): GroupManagerResult[void] =
if not g.saveKeystore:
return ok()
if g.idCredentials.isNone():
Expand Down

0 comments on commit b62193e

Please sign in to comment.