Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rln-relay): close db connection appropriately #1858

Merged
merged 1 commit into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions waku/v2/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -828,4 +828,8 @@ proc stop*(node: WakuNode) {.async.} =
await node.switch.stop()
node.peerManager.stop()

when defined(rln):
if not node.wakuRlnRelay.isNil():
await node.wakuRlnRelay.stop()

node.started = false
3 changes: 3 additions & 0 deletions waku/v2/waku_rln_relay/group_manager/group_manager_base.nim
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretH
method atomicBatch*(g: GroupManager, idCommitments: seq[IDCommitment], toRemoveIndices: seq[MembershipIndex]): Future[void] {.base,gcsafe.} =
raise newException(CatchableError, "atomicBatch proc for " & $g.type & " is not implemented yet")

method stop*(g: GroupManager): Future[void] {.base,gcsafe.} =
raise newException(CatchableError, "stop proc for " & $g.type & " is not implemented yet")

# This proc is used to set a callback that will be called when an identity commitment is withdrawn
# The callback may be called multiple times, and should be used to for any post processing
method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =
Expand Down
24 changes: 20 additions & 4 deletions waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type
const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"

const BlockChunkSize* = 100'u64
const DecayFactor* = 1.2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this decay for block chunk size added by mistake to this PR? Does not seem to be really related to "close db connection" (or am I missing something?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately, while I was testing this fix, I discovered a bug related to the chunking of blocks, so I included it in this PR to ensure some stability - can remove if you'd like!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine by me, just wanted to make sure this was intentional:)

const DefaultChunkSize* = 1000

template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
Expand Down Expand Up @@ -320,28 +321,33 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =

let ethRpc = g.ethRpc.get()

# the block chunk size decays exponentially with the number of blocks
# the minimum chunk size is 100
var blockChunkSize = 1_000_000

var fromBlock = if g.latestProcessedBlock.isSome():
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get()
g.latestProcessedBlock.get()
else:
info "starting onchain sync from scratch"
# chunk size is 1000 blocks
BlockNumber(0)

let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
try:
# we always want to sync from last processed block => latest
if fromBlock == BlockNumber(0) or
fromBlock + BlockNumber(BlockChunkSize) < latestBlock:
fromBlock + BlockNumber(blockChunkSize) < latestBlock:
# chunk events
while true:
let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber())
let toBlock = min(fromBlock + BlockNumber(BlockChunkSize), currentLatestBlock)
let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
info "chunking events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, some(toBlock))
fromBlock = toBlock + 1
if fromBlock >= currentLatestBlock:
break
let newChunkSize = float(blockChunkSize) / DecayFactor
blockChunkSize = max(int(newChunkSize), DefaultChunkSize)
else:
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
except CatchableError:
Expand Down Expand Up @@ -496,3 +502,13 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
error "failed to restart group sync", error = getCurrentExceptionMsg()

g.initialized = true

method stop*(g: OnchainGroupManager): Future[void] {.async.} =
if g.ethRpc.isSome():
g.ethRpc.get().ondisconnect = nil
await g.ethRpc.get().close()
let flushed = g.rlnInstance.flush()
if not flushed:
error "failed to flush to the tree db"

g.initialized = false
7 changes: 7 additions & 0 deletions waku/v2/waku_rln_relay/group_manager/static/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,10 @@ method onRegister*(g: StaticGroupManager, cb: OnRegisterCallback) {.gcsafe.} =

method onWithdraw*(g: StaticGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
g.withdrawCb = some(cb)

method stop*(g: StaticGroupManager): Future[void] =
initializedGuard(g)
# No-op
var retFut = newFuture[void]("StaticGroupManager.stop")
retFut.complete()
return retFut
5 changes: 5 additions & 0 deletions waku/v2/waku_rln_relay/rln/rln_interface.nim
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,8 @@ proc get_metadata*(ctx: ptr RLN, output_buffer: ptr Buffer): bool {.importc: "ge
## gets the metadata stored by ctx and populates the passed pointer output_buffer with it
## the output_buffer holds the metadata as a byte seq
## the return bool value indicates the success or failure of the operation

proc flush*(ctx: ptr RLN): bool {.importc: "flush".}
## flushes the write buffer to the database
## the return bool value indicates the success or failure of the operation
## This allows more robust and graceful handling of the database connection
7 changes: 7 additions & 0 deletions waku/v2/waku_rln_relay/rln_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ type WakuRLNRelay* = ref object of RootObj
groupManager*: GroupManager
messageBucket*: Option[TokenBucket]

method stop*(rlnPeer: WakuRLNRelay) {.async.} =
## stops the rln-relay protocol
## Throws an error if it cannot stop the rln-relay protocol

# stop the group sync, and flush data to tree db
await rlnPeer.groupManager.stop()

proc hasDuplicate*(rlnPeer: WakuRLNRelay,
proofMetadata: ProofMetadata): RlnRelayResult[bool] =
## returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same
Expand Down