From 9aacfa130e8d5c4b93e8ef4c887f4f9e8e30c665 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 22 Jan 2024 14:57:30 +0530 Subject: [PATCH 1/3] chore(rln-relay): remove websocket from OnchainGroupManager --- .../test_rln_group_manager_onchain.nim | 19 ++--- waku/waku_rln_relay/constants.nim | 2 +- .../group_manager/on_chain/group_manager.nim | 82 +++++++++++++------ .../group_manager/on_chain/retry_wrapper.nim | 5 +- 4 files changed, 69 insertions(+), 39 deletions(-) diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index fd697cd3ad..2dc70277cf 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -232,9 +232,9 @@ suite "Onchain group manager": try: await manager.startGroupSync() - except ValueError: + except CatchableError: assert true - except Exception, CatchableError: + except Exception: assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() await manager.stop() @@ -330,9 +330,9 @@ suite "Onchain group manager": try: await manager.register(dummyCommitment) - except ValueError: + except CatchableError: assert true - except Exception, CatchableError: + except Exception: assert false, "exception raised: " & getCurrentExceptionMsg() await manager.stop() @@ -399,9 +399,9 @@ suite "Onchain group manager": try: await manager.withdraw(idSecretHash) - except ValueError: + except CatchableError: assert true - except Exception, CatchableError: + except Exception: assert false, "exception raised: " & getCurrentExceptionMsg() await manager.stop() @@ -627,7 +627,7 @@ suite "Onchain group manager": await manager.stop() asyncTest "isReady should return false if ethRpc is none": - var manager = await setup() + let manager = await setup() await manager.init() manager.ethRpc = none(Web3) @@ -644,7 +644,7 @@ suite "Onchain group manager": await manager.stop() asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed": - var manager = await setup() + let manager = await setup() await manager.init() var isReady = true @@ -659,14 +659,13 @@ suite "Onchain group manager": await manager.stop() asyncTest "isReady should return true if ethRpc is ready": - var manager = await setup() + let manager = await setup() await manager.init() # node can only be ready after group sync is done try: await manager.startGroupSync() except Exception, CatchableError: assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() - var isReady = false try: isReady = await manager.isReady() diff --git a/waku/waku_rln_relay/constants.nim b/waku/waku_rln_relay/constants.nim index 8994c6df47..81cfe94490 100644 --- a/waku/waku_rln_relay/constants.nim +++ b/waku/waku_rln_relay/constants.nim @@ -18,7 +18,7 @@ const MembershipFee* = 1000000000000000.u256 # the current implementation of the rln lib supports a circuit for Merkle tree with depth 20 MerkleTreeDepth* = 20 - EthClient* = "ws://127.0.0.1:8540" + EthClient* = "http://127.0.0.1:8540" const # the size of poseidon hash output in bits diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 67c9c61876..e54b22ac37 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -74,13 +74,17 @@ type # in event of a reorg. we store 5 in the buffer. Maybe need to revisit this, # because the average reorg depth is 1 to 2 blocks. validRootBuffer*: Deque[MerkleNode] + # interval loop to shut down gracefully + blockFetchingActive*: bool const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePassword* = "password" +const DefaultBlockPollRate* = 1.seconds + template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: - raise newException(ValueError, "OnchainGroupManager is not initialized") + raise newException(CatchableError, "OnchainGroupManager is not initialized") proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] = @@ -316,12 +320,15 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable): proc getAndHandleEvents(g: OnchainGroupManager, fromBlock: BlockNumber, - toBlock: BlockNumber): Future[void] {.async: (raises: [Exception]).} = + toBlock: BlockNumber): Future[bool] {.async: (raises: [Exception]).} = initializedGuard(g) - let blockTable = await g.getBlockTable(fromBlock, toBlock) - await g.handleEvents(blockTable) - await g.handleRemovedEvents(blockTable) + try: + await g.handleEvents(blockTable) + await g.handleRemovedEvents(blockTable) + except CatchableError: + error "failed to handle events", error=getCurrentExceptionMsg() + raise newException(ValueError, "failed to handle events") g.latestProcessedBlock = toBlock let metadataSetRes = g.setMetadata() @@ -330,32 +337,49 @@ proc getAndHandleEvents(g: OnchainGroupManager, warn "failed to persist rln metadata", error=metadataSetRes.error() else: trace "rln metadata persisted", blockNumber = g.latestProcessedBlock + + return true + +proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void = + g.blockFetchingActive = false -proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = - proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = - let latestBlock = BlockNumber(blockheader.number) - trace "block received", blockNumber = latestBlock - # get logs from the last block - try: - # inc by 1 to prevent double processing - let fromBlock = g.latestProcessedBlock + 1 - asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock) - except CatchableError: - warn "failed to handle log: ", error=getCurrentExceptionMsg() - return newHeadCallback - -proc newHeadErrCallback(error: CatchableError) = - warn "failed to get new head", error=error.msg + proc runIntervalLoop() {.async, gcsafe.} = + g.blockFetchingActive = true + + while g.blockFetchingActive: + var retCb: bool + retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"): + await cb() + await sleepAsync(interval) + + asyncSpawn runIntervalLoop() + + +proc getNewBlockCallback(g: OnchainGroupManager): proc = + let ethRpc = g.ethRpc.get() + proc wrappedCb(): Future[bool] {.async, gcsafe.} = + var latestBlock: BlockNumber + retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"): + cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + + if latestBlock <= g.latestProcessedBlock: + return + # get logs from the last block + # inc by 1 to prevent double processing + let fromBlock = g.latestProcessedBlock + 1 + var handleBlockRes: bool + retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"): + await g.getAndHandleEvents(fromBlock, latestBlock) + return true + return wrappedCb proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async: (raises: [Exception]).} = initializedGuard(g) let ethRpc = g.ethRpc.get() - let newHeadCallback = g.getNewHeadCallback() - var blockHeaderSub: Subscription - retryWrapper(blockHeaderSub, RetryStrategy.new(), "Failed to subscribe to block headers"): - await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback) + let newBlockCallback = g.getNewBlockCallback() + g.runInInterval(newBlockCallback, DefaultBlockPollRate) proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async: (raises: [Exception]).} = @@ -385,7 +409,9 @@ proc startOnchainSync(g: OnchainGroupManager): let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) debug "fetching events", fromBlock = fromBlock, toBlock = toBlock - await g.getAndHandleEvents(fromBlock, toBlock) + var handleBlockRes: bool + retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"): + await g.getAndHandleEvents(fromBlock, toBlock) fromBlock = toBlock + 1 except CatchableError: @@ -523,13 +549,15 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = error "failed to restart group sync", error = getCurrentExceptionMsg() ethRpc.ondisconnect = proc() = - asyncCheck onDisconnect() + asyncSpawn onDisconnect() waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) g.initialized = true -method stop*(g: OnchainGroupManager): Future[void] {.async.} = +method stop*(g: OnchainGroupManager): Future[void] {.async,gcsafe.} = + g.blockFetchingActive = false + if g.ethRpc.isSome(): g.ethRpc.get().ondisconnect = nil await g.ethRpc.get().close() diff --git a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim index f4daa5ff2a..d0a621dc50 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim @@ -21,12 +21,15 @@ template retryWrapper*(res: auto, body: untyped): auto = var retryCount = retryStrategy.retryCount var shouldRetry = retryStrategy.shouldRetry + var exceptionMessage = "" + while shouldRetry and retryCount > 0: try: res = body shouldRetry = false except: retryCount -= 1 + exceptionMessage = getCurrentExceptionMsg() await sleepAsync(retryStrategy.retryDelay) if shouldRetry: - raise newException(CatchableError, errStr & ": " & $getCurrentExceptionMsg()) + raise newException(CatchableError, errStr & ": " & exceptionMessage) From 1058cc116efaeec83b8e553fa7623cd297a7284f Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 23 Jan 2024 16:28:19 +0530 Subject: [PATCH 2/3] fix: swap ws for http --- apps/chat2/config_chat2.nim | 4 ++-- apps/wakunode2/external_config.nim | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/chat2/config_chat2.nim b/apps/chat2/config_chat2.nim index 2865a19242..b513ecb81c 100644 --- a/apps/chat2/config_chat2.nim +++ b/apps/chat2/config_chat2.nim @@ -252,8 +252,8 @@ type name: "rln-relay-id-commitment-key" }: string rlnRelayEthClientAddress* {. - desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/", - defaultValue: "ws://localhost:8540/" + desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/", + defaultValue: "http://localhost:8540/" name: "rln-relay-eth-client-address" }: string rlnRelayEthContractAddress* {. diff --git a/apps/wakunode2/external_config.nim b/apps/wakunode2/external_config.nim index 22235af69c..6ebb48a01d 100644 --- a/apps/wakunode2/external_config.nim +++ b/apps/wakunode2/external_config.nim @@ -61,8 +61,8 @@ type name: "rln-relay-cred-path" }: string rlnRelayEthClientAddress* {. - desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/", - defaultValue: "ws://localhost:8540/", + desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/", + defaultValue: "http://localhost:8540/", name: "rln-relay-eth-client-address" }: string rlnRelayEthContractAddress* {. From dbd6cc3617634314295c11f4b19ab064de5eb25a Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 23 Jan 2024 22:44:03 +0530 Subject: [PATCH 3/3] fix(rln-relay): update block poll rate --- waku/waku_rln_relay/group_manager/on_chain/group_manager.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index e54b22ac37..53059251da 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -80,7 +80,7 @@ type const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePassword* = "password" -const DefaultBlockPollRate* = 1.seconds +const DefaultBlockPollRate* = 6.seconds template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: