From b7c71b9c81706a0564fe8cd908ca5c6b887b6505 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 4 Sep 2023 18:43:17 +0530 Subject: [PATCH 1/5] chore(rln-relay): add isReady check --- waku/node/waku_node.nim | 12 ++++++++++-- .../group_manager/group_manager_base.nim | 1 + .../group_manager/on_chain/group_manager.nim | 4 ++++ .../group_manager/static/group_manager.nim | 1 + waku/waku_rln_relay/rln_relay.nim | 8 ++++++++ 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 5df6200a19..c7724c6c86 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -732,8 +732,8 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message when defined(rln): proc mountRlnRelay*(node: WakuNode, rlnConf: WakuRlnConfig, - spamHandler: Option[SpamHandler] = none(SpamHandler), - registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)) {.async.} = + spamHandler = none(SpamHandler), + registrationHandler = none(RegistrationHandler)) {.async.} = info "mounting rln relay" if node.wakuRelay.isNil(): @@ -903,3 +903,11 @@ proc stop*(node: WakuNode) {.async.} = await node.wakuRlnRelay.stop() node.started = false + +proc isReady*(node: WakuNode): bool = + when defined(rln): + if node.wakuRlnRelay == nil: + return false + return node.wakuRlnRelay.isReady() + ## TODO: add other protocol `isReady` checks + return true diff --git a/waku/waku_rln_relay/group_manager/group_manager_base.nim b/waku/waku_rln_relay/group_manager/group_manager_base.nim index 0dc1ceb307..79bf2835bb 100644 --- a/waku/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/waku_rln_relay/group_manager/group_manager_base.nim @@ -41,6 +41,7 @@ type initialized*: bool latestIndex*: MembershipIndex validRoots*: Deque[MerkleNode] + isReady*: bool # This proc is used to initialize the group manager # Any initialization logic should be implemented here diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 56c82cb27b..b155db5bca 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -388,6 +388,8 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = # listen to blockheaders and contract events try: await g.startListeningToEvents() + # rln is ready now + g.isReady = true except CatchableError: raise newException(ValueError, "failed to start listening to events: " & getCurrentExceptionMsg()) @@ -502,6 +504,7 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) ethRpc.ondisconnect = proc() = + g.isReady = false error "Ethereum client disconnected" let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock @@ -528,3 +531,4 @@ method stop*(g: OnchainGroupManager): Future[void] {.async.} = error "failed to flush to the tree db" g.initialized = false + g.isReady = false diff --git a/waku/waku_rln_relay/group_manager/static/group_manager.nim b/waku/waku_rln_relay/group_manager/static/group_manager.nim index c2ccae8c21..0b0af9cec3 100644 --- a/waku/waku_rln_relay/group_manager/static/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/static/group_manager.nim @@ -36,6 +36,7 @@ method init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} = g.latestIndex += MembershipIndex(idCommitments.len() - 1) g.initialized = true + g.isReady = true return diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 887e28d00f..6f20ce36f7 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -400,6 +400,14 @@ proc mount(conf: WakuRlnConfig, return WakuRLNRelay(groupManager: groupManager, messageBucket: messageBucket) +proc isReady*(rlnPeer: WakuRLNRelay): bool = + ## returns true if the rln-relay protocol is ready to relay messages + ## returns false otherwise + + # could be nil during startup + if rlnPeer.groupManager == nil: + return false + return rlnPeer.groupManager.isReady proc new*(T: type WakuRlnRelay, conf: WakuRlnConfig, From cbb266ae49a217c037243703e061ccf037987fa1 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 5 Sep 2023 18:55:34 +0530 Subject: [PATCH 2/5] fix(rln-relay): multiple parameters for checking if node is in sync --- .../test_rln_group_manager_onchain.nim | 30 ++++++++++++++ waku/node/waku_node.nim | 4 +- .../group_manager/group_manager_base.nim | 4 +- .../group_manager/on_chain/group_manager.nim | 39 ++++++++++++++++--- .../group_manager/static/group_manager.nim | 7 +++- waku/waku_rln_relay/rln_relay.nim | 8 +++- 6 files changed, 80 insertions(+), 12 deletions(-) diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index 8e63513ebb..68f9f5ab36 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -555,6 +555,36 @@ suite "Onchain group manager": manager.validRootBuffer.len() == 0 manager.validRoots[credentialCount - 2] == expectedLastRoot + asyncTest "isReady should return false if ethRpc is none": + var manager = await setup() + await manager.init() + + manager.ethRpc = none(Web3) + + check: + (await manager.isReady()) == false + + asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed": + var manager = await setup() + await manager.init() + + manager.lastSeenBlockHead = 10 + manager.latestProcessedBlock = 5 + + check: + (await manager.isReady()) == false + + asyncTest "isReady should return true if ethRpc is not syncing": + # not syncing implies the node is ready + var manager = await setup() + await manager.init() + + manager.latestProcessedBlock = 10 + manager.lastSeenBlockHead = 10 + + check: + (await manager.isReady()) == true + ################################ ## Terminating/removing Ganache diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c7724c6c86..1b127902b6 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -904,10 +904,10 @@ proc stop*(node: WakuNode) {.async.} = node.started = false -proc isReady*(node: WakuNode): bool = +proc isReady*(node: WakuNode): Future[bool] {.async.} = when defined(rln): if node.wakuRlnRelay == nil: return false - return node.wakuRlnRelay.isReady() + return await node.wakuRlnRelay.isReady() ## TODO: add other protocol `isReady` checks return true diff --git a/waku/waku_rln_relay/group_manager/group_manager_base.nim b/waku/waku_rln_relay/group_manager/group_manager_base.nim index 79bf2835bb..e54606e7f5 100644 --- a/waku/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/waku_rln_relay/group_manager/group_manager_base.nim @@ -41,7 +41,6 @@ type initialized*: bool latestIndex*: MembershipIndex validRoots*: Deque[MerkleNode] - isReady*: bool # This proc is used to initialize the group manager # Any initialization logic should be implemented here @@ -163,3 +162,6 @@ method generateProof*(g: GroupManager, if proofGenRes.isErr(): return err("proof generation failed: " & $proofGenRes.error()) return ok(proofGenRes.value()) + +method isReady*(g: GroupManager): Future[bool] {.base,gcsafe.} = + raise newException(CatchableError, "isReady proc for " & $g.type & " is not implemented yet") diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index b155db5bca..4673096e90 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -73,6 +73,8 @@ type # in event of a reorg. we store 5 in the buffer. Maybe need to revisit this, # because the average reorg depth is 1 to 2 blocks. validRootBuffer*: Deque[MerkleNode] + # this variable tracks the last seen head + lastSeenBlockHead*: BlockNumber const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePassword* = "password" @@ -313,13 +315,18 @@ proc getAndHandleEvents(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} = initializedGuard(g) + proc getLatestBlockNumber(): BlockNumber = + if toBlock.isSome(): + return toBlock.get() + return fromBlock let blockTable = await g.getBlockTable(fromBlock, toBlock) + if blockTable.len() > 0: + g.lastSeenBlockHead = getLatestBlockNumber() await g.handleEvents(blockTable) await g.handleRemovedEvents(blockTable) - g.latestProcessedBlock = if toBlock.isSome(): toBlock.get() - else: fromBlock + g.latestProcessedBlock = getLatestBlockNumber() let metadataSetRes = g.setMetadata() if metadataSetRes.isErr(): # this is not a fatal error, hence we don't raise an exception @@ -388,8 +395,6 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = # listen to blockheaders and contract events try: await g.startListeningToEvents() - # rln is ready now - g.isReady = true except CatchableError: raise newException(ValueError, "failed to start listening to events: " & getCurrentExceptionMsg()) @@ -504,7 +509,6 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) ethRpc.ondisconnect = proc() = - g.isReady = false error "Ethereum client disconnected" let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock @@ -531,4 +535,27 @@ method stop*(g: OnchainGroupManager): Future[void] {.async.} = error "failed to flush to the tree db" g.initialized = false - g.isReady = false + +proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = + let ethRpc = g.ethRpc.get() + + try: + let syncing = await ethRpc.provider.eth_syncing() + return syncing.getBool() + except CatchableError: + error "failed to get the syncing status", error = getCurrentExceptionMsg() + return false + +method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = + initializedGuard(g) + + if g.ethRpc.isNone(): + return false + + if g.lastSeenBlockHead == 0: + return false + + if g.latestProcessedBlock < g.lastSeenBlockHead: + return false + + return not (await g.isSyncing()) \ No newline at end of file diff --git a/waku/waku_rln_relay/group_manager/static/group_manager.nim b/waku/waku_rln_relay/group_manager/static/group_manager.nim index 0b0af9cec3..9aa81d952e 100644 --- a/waku/waku_rln_relay/group_manager/static/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/static/group_manager.nim @@ -36,7 +36,6 @@ method init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} = g.latestIndex += MembershipIndex(idCommitments.len() - 1) g.initialized = true - g.isReady = true return @@ -110,3 +109,9 @@ method stop*(g: StaticGroupManager): Future[void] = var retFut = newFuture[void]("StaticGroupManager.stop") retFut.complete() return retFut + +method isReady*(g: StaticGroupManager): Future[bool] {.gcsafe.} = + initializedGuard(g) + var retFut = newFuture[bool]("StaticGroupManager.isReady") + retFut.complete(true) + return retFut diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 6f20ce36f7..e38a234752 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -400,14 +400,18 @@ proc mount(conf: WakuRlnConfig, return WakuRLNRelay(groupManager: groupManager, messageBucket: messageBucket) -proc isReady*(rlnPeer: WakuRLNRelay): bool = +proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async.} = ## returns true if the rln-relay protocol is ready to relay messages ## returns false otherwise # could be nil during startup if rlnPeer.groupManager == nil: return false - return rlnPeer.groupManager.isReady + try: + return await rlnPeer.groupManager.isReady() + except CatchableError: + error "could not check if the rln-relay protocol is ready", err = getCurrentExceptionMsg() + return false proc new*(T: type WakuRlnRelay, conf: WakuRlnConfig, From 3c9d4bef4e4bc869a5f54c37f3e605a01552003b Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 5 Sep 2023 20:53:41 +0530 Subject: [PATCH 3/5] fix: set latesthead in newHeadCallback --- waku/waku_rln_relay/group_manager/on_chain/group_manager.nim | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 4673096e90..cbdec4d2f0 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -320,9 +320,7 @@ proc getAndHandleEvents(g: OnchainGroupManager, return toBlock.get() return fromBlock - let blockTable = await g.getBlockTable(fromBlock, toBlock) - if blockTable.len() > 0: - g.lastSeenBlockHead = getLatestBlockNumber() + let blockTable = await g.getBlockTable(fromBlock, toBlock) await g.handleEvents(blockTable) await g.handleRemovedEvents(blockTable) @@ -337,6 +335,7 @@ proc getAndHandleEvents(g: OnchainGroupManager, proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = let latestBlock = blockheader.number.uint + g.lastSeenBlockHead = latestBlock trace "block received", blockNumber = latestBlock # get logs from the last block try: From 15bfde443e43bca894d3c951d404f549ab64b39d Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 6 Sep 2023 13:13:06 +0530 Subject: [PATCH 4/5] fix: explicit rpc call --- .../test_rln_group_manager_onchain.nim | 11 +++------- .../group_manager/on_chain/group_manager.nim | 20 +++++++++++-------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index 68f9f5ab36..600b3e1b72 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -568,20 +568,15 @@ suite "Onchain group manager": var manager = await setup() await manager.init() - manager.lastSeenBlockHead = 10 - manager.latestProcessedBlock = 5 - check: (await manager.isReady()) == false - asyncTest "isReady should return true if ethRpc is not syncing": - # not syncing implies the node is ready + asyncTest "isReady should return true if ethRpc is ready": var manager = await setup() await manager.init() + # node can only be ready after group sync is done + await manager.startGroupSync() - manager.latestProcessedBlock = 10 - manager.lastSeenBlockHead = 10 - check: (await manager.isReady()) == true diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index cbdec4d2f0..b16c7024aa 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -73,8 +73,6 @@ type # in event of a reorg. we store 5 in the buffer. Maybe need to revisit this, # because the average reorg depth is 1 to 2 blocks. validRootBuffer*: Deque[MerkleNode] - # this variable tracks the last seen head - lastSeenBlockHead*: BlockNumber const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePassword* = "password" @@ -317,7 +315,12 @@ proc getAndHandleEvents(g: OnchainGroupManager, initializedGuard(g) proc getLatestBlockNumber(): BlockNumber = if toBlock.isSome(): - return toBlock.get() + # if toBlock = 0, that implies the latest block + # which is the case when we are syncing block-by-block + # therefore, toBlock = fromBlock + 1 + # if toBlock != 0, then we are chunking blocks + # therefore, toBlock = fromBlock + blockChunkSize (which is handled) + return max(fromBlock + 1, toBlock.get()) return fromBlock let blockTable = await g.getBlockTable(fromBlock, toBlock) @@ -335,7 +338,6 @@ proc getAndHandleEvents(g: OnchainGroupManager, proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = let latestBlock = blockheader.number.uint - g.lastSeenBlockHead = latestBlock trace "block received", blockNumber = latestBlock # get logs from the last block try: @@ -479,7 +481,6 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = let metadataGetRes = g.rlnInstance.getMetadata() if metadataGetRes.isErr(): warn "could not initialize with persisted rln metadata" - g.latestProcessedBlock = BlockNumber(0) else: let metadata = metadataGetRes.get() if metadata.chainId != uint64(g.chainId.get()): @@ -506,6 +507,7 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = raise newException(ValueError, "could not get the deployed block number: " & getCurrentExceptionMsg()) g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) + g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) ethRpc.ondisconnect = proc() = error "Ethereum client disconnected" @@ -551,10 +553,12 @@ method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = if g.ethRpc.isNone(): return false - if g.lastSeenBlockHead == 0: - return false + let currentBlock = cast[BlockNumber](await g.ethRpc + .get() + .provider + .eth_blockNumber()) - if g.latestProcessedBlock < g.lastSeenBlockHead: + if g.latestProcessedBlock < currentBlock: return false return not (await g.isSyncing()) \ No newline at end of file From b058c97fc745835893b1d13d82d11e88e167f71a Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 6 Sep 2023 13:46:07 +0530 Subject: [PATCH 5/5] fix: unhandled exception --- .../group_manager/on_chain/group_manager.nim | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index b16c7024aa..f5375f0066 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -553,10 +553,15 @@ method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = if g.ethRpc.isNone(): return false - let currentBlock = cast[BlockNumber](await g.ethRpc - .get() - .provider - .eth_blockNumber()) + var currentBlock: BlockNumber + try: + currentBlock = cast[BlockNumber](await g.ethRpc + .get() + .provider + .eth_blockNumber()) + except CatchableError: + error "failed to get the current block number", error = getCurrentExceptionMsg() + return false if g.latestProcessedBlock < currentBlock: return false