Skip to content

Commit

Permalink
fix(rln-relay): sync from deployed block number (#1955)
Browse files Browse the repository at this point in the history
* fix(rln-relay): sync from deployed block number

* fix(rln-relay): remove option usage for ints

* fix: unnecessary decl
  • Loading branch information
rymnc authored Aug 29, 2023
1 parent 04df6d1 commit bd3be21
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 23 deletions.
1 change: 1 addition & 0 deletions tests/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ suite "Onchain group manager":
manager.rlnContract.isSome()
manager.membershipFee.isSome()
manager.initialized
manager.rlnContractDeployedBlockNumber > 0

asyncTest "should error on initialization when loaded metadata does not match":
let manager = await setup()
Expand Down
54 changes: 31 additions & 23 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ contract(RlnStorage):
proc MEMBERSHIP_DEPOSIT(): Uint256 {.pure.}
proc members(idCommitment: Uint256): Uint256 {.view.}
proc idCommitmentIndex(): Uint256 {.view.}
proc deployedBlockNumber(): Uint256 {.view.}

type
RegistryContractWithSender = Sender[WakuRlnRegistry]
Expand All @@ -49,10 +50,11 @@ type
ethContractAddress*: string
ethRpc*: Option[Web3]
rlnContract*: Option[RlnContractWithSender]
rlnContractDeployedBlockNumber*: BlockNumber
registryContract*: Option[RegistryContractWithSender]
usingStorageIndex: Option[Uint16]
membershipFee*: Option[Uint256]
latestProcessedBlock*: Option[BlockNumber]
latestProcessedBlock*: BlockNumber
registrationTxHash*: Option[TxHash]
chainId*: Option[Quantity]
keystorePath*: Option[string]
Expand All @@ -68,20 +70,15 @@ type
const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"

const DecayFactor* = 1.2
const DefaultChunkSize* = 1000

template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(ValueError, "OnchainGroupManager is not initialized")


proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
if g.latestProcessedBlock.isNone():
return err("latest processed block is not set")
try:
let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata(
lastProcessedBlock: g.latestProcessedBlock.get(),
lastProcessedBlock: g.latestProcessedBlock,
chainId: uint64(g.chainId.get()),
contractAddress: g.ethContractAddress,
validRoots: g.validRoots.toSeq()))
Expand Down Expand Up @@ -314,15 +311,14 @@ proc getAndHandleEvents(g: OnchainGroupManager,
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)

let latestProcessedBlock = if toBlock.isSome(): toBlock.get()
g.latestProcessedBlock = if toBlock.isSome(): toBlock.get()
else: fromBlock
g.latestProcessedBlock = some(latestProcessedBlock)
let metadataSetRes = g.setMetadata()
if metadataSetRes.isErr():
# this is not a fatal error, hence we don't raise an exception
warn "failed to persist rln metadata", error=metadataSetRes.error()
else:
trace "rln metadata persisted", blockNumber = latestProcessedBlock
trace "rln metadata persisted", blockNumber = g.latestProcessedBlock

proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
Expand Down Expand Up @@ -353,16 +349,15 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =

let ethRpc = g.ethRpc.get()

# the block chunk size decays exponentially with the number of blocks
# the minimum chunk size is 100
var blockChunkSize = 1_000_000
# static block chunk size
let blockChunkSize = 2_000

var fromBlock = if g.latestProcessedBlock.isSome():
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get()
g.latestProcessedBlock.get() + 1
var fromBlock = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock
g.latestProcessedBlock + 1
else:
info "starting onchain sync from scratch"
BlockNumber(0)
info "starting onchain sync from deployed block number", deployedBlockNumber = g.rlnContractDeployedBlockNumber
g.rlnContractDeployedBlockNumber

let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
try:
Expand All @@ -378,8 +373,6 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
fromBlock = toBlock + 1
if fromBlock >= currentLatestBlock:
break
let newChunkSize = float(blockChunkSize) / DecayFactor
blockChunkSize = max(int(newChunkSize), DefaultChunkSize)
else:
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
except CatchableError:
Expand Down Expand Up @@ -457,15 +450,15 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
let metadataGetRes = g.rlnInstance.getMetadata()
if metadataGetRes.isErr():
warn "could not initialize with persisted rln metadata"
g.latestProcessedBlock = some(BlockNumber(0))
g.latestProcessedBlock = BlockNumber(0)
else:
let metadata = metadataGetRes.get()
if metadata.chainId != uint64(g.chainId.get()):
raise newException(ValueError, "persisted data: chain id mismatch")

if metadata.contractAddress != g.ethContractAddress.toLower():
raise newException(ValueError, "persisted data: contract address mismatch")
g.latestProcessedBlock = some(metadata.lastProcessedBlock)
g.latestProcessedBlock = metadata.lastProcessedBlock
g.validRoots = metadata.validRoots.toDeque()

# check if the contract exists by calling a static function
Expand All @@ -477,10 +470,25 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
"could not get the membership deposit: " & getCurrentExceptionMsg())
g.membershipFee = some(membershipFee)

var deployedBlockNumber: Uint256
try:
deployedBlockNumber = await rlnContract.deployedBlockNumber().call()
except CatchableError:
raise newException(ValueError,
"could not get the deployed block number: " & getCurrentExceptionMsg())
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)

ethRpc.ondisconnect = proc() =
error "Ethereum client disconnected"
let fromBlock = g.latestProcessedBlock.get()
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
try:
let newEthRpc = waitFor newWeb3(g.ethClientUrl)
newEthRpc.ondisconnect = ethRpc.ondisconnect
g.ethRpc = some(newEthRpc)
except CatchableError:
error "failed to reconnect with the Ethereum client", error = getCurrentExceptionMsg()
return
try:
asyncSpawn g.startOnchainSync()
except CatchableError:
Expand Down

0 comments on commit bd3be21

Please sign in to comment.