feat(postgresql): 1st commit to async sql (waku_archive/driver...) #1755
Thanks for this! I'm about ready to approve this PR (good work), but have added a couple of comments below.
else:
  return ok(pageRes.value)
A bit nitpicky, but I prefer having the "default" return case be the last statement, as before, and not just a branch of the conditional. That way the last statement is your expected happy-path return and the other returns are either failure conditions or quick successes.
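A minimal sketch of the shape suggested above, assuming the nim-results package (`import results`; older code imports it as `stew/results`). `Page`, `fetchPage` and `findMessages` are hypothetical stand-ins, not names from this PR:

```nim
import results  # assumption: nim-results package

type Page = seq[string]  # hypothetical stand-in for the real page type

proc fetchPage(fail: bool): Result[Page, string] =
  ## Hypothetical helper standing in for the lower-level query.
  if fail:
    return err("query failed")
  ok(@["msg-1", "msg-2"])

proc findMessages(fail: bool): Result[Page, string] =
  let pageRes = fetchPage(fail)

  # Failure conditions exit early ...
  if pageRes.isErr():
    return err(pageRes.error)

  # ... so the final statement is the expected happy-path return.
  return ok(pageRes.value)
```

The behavior is the same as with an `else:` branch; only the reading order changes.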
## Inserts a message into the store

- let res = s.insertStmt.exec((
+ return s.insertStmt.exec((
I'd still assign this to a `res` variable here to keep the final `return` a bit cleaner.
let cursor = cursor.map(toDbCursor)

- let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
+ return s.db.selectMessagesByHistoryQueryWithLimit(
As above, I'd keep the indirection here with assigning this to a variable first to have the SQLite-specific failure and return cleanly separated within the SqliteDriver module.
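As an illustration of that indirection, here is a small sketch using the `?` operator from nim-results (assumed to be available as `import results`). `selectRows` and `getMessages` are hypothetical and synchronous; they are not the driver's real procs:

```nim
import results  # assumption: nim-results package

proc selectRows(fail: bool): Result[seq[int], string] =
  ## Hypothetical stand-in for the database query.
  if fail:
    return err("sqlite: query failed")
  ok(@[1, 2, 3])

proc getMessages(fail: bool): Result[seq[int], string] =
  # `?` returns early with the driver-specific error,
  # otherwise it unwraps the value into `rows`.
  let rows = ?selectRows(fail)

  # The success path is a separate, final statement.
  return ok(rows)
```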
method execute*(p: RetentionPolicy, store: ArchiveDriver):
  Future[Result[void, string]] {.base.} = discard
- Future[Result[void, string]] {.base.} = discard
+ Future[RetentionPolicyResult[void]] {.base.} = discard
Or am I missing something?
method execute*(p: RetentionPolicy, store: ArchiveDriver):
  Future[Result[void, string]] {.base.} = discard
Any reason `.base.` methods aren't annotated with the `async` pragma too?
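For reference, a sketch of what an `async`-annotated base method could look like, assuming chronos and nim-results. The `store` parameter is omitted for brevity, `NoopPolicy` is hypothetical, and the base body returns an explicit error rather than `discard`, so a caller that reaches the base implementation gets a completed future carrying an error:

```nim
import chronos
import results  # assumption: nim-results package

type
  RetentionPolicyResult[T] = Result[T, string]
  RetentionPolicy = ref object of RootObj
  NoopPolicy = ref object of RetentionPolicy  # hypothetical subclass

method execute(p: RetentionPolicy):
    Future[RetentionPolicyResult[void]] {.base, async.} =
  # Base implementation: completes the future with an error.
  return err("not implemented")

method execute(p: NoopPolicy):
    Future[RetentionPolicyResult[void]] {.async.} =
  await sleepAsync(1.milliseconds)  # simulated asynchronous work
  return ok()

# Dynamic dispatch picks the NoopPolicy override.
let policy: RetentionPolicy = NoopPolicy()
doAssert (waitFor policy.execute()).isOk()
```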
else:
  return ok()
Similar to elsewhere, I'd keep this as a statement on its own to signify that it's the expected exit for the method.
waku/v2/node/waku_node.nim
Outdated
asyncSpawn node.wakuArchive.executeMessageRetentionPolicy()
asyncSpawn node.wakuArchive.reportStoredMessagesMetric()
I'm not sure of the impact, but since this is a timed task, do we want to spawn more futures here? Shouldn't this function block until the retention policy has finished executing, as it did before (i.e. use `waitFor` rather than `asyncSpawn`)?
Agree. Actually, as it is, the metrics updated by `reportStoredMessagesMetric` won't be accurate, since `reportStoredMessagesMetric` will complete before `executeMessageRetentionPolicy`, I guess?
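The ordering concern can be sketched with chronos as follows. The two procs below are simplified stand-ins that only record when they finish (the real ones are called on `node.wakuArchive`), and the 20 ms delay is an arbitrary assumption to simulate a slow retention run:

```nim
import chronos

type EventLog = ref object
  entries: seq[string]  # completion order

proc executeRetentionPolicy(log: EventLog) {.async.} =
  await sleepAsync(20.milliseconds)  # simulated slow deletion
  log.entries.add("retention done")

proc reportStoredMessagesMetric(log: EventLog) {.async.} =
  log.entries.add("metric reported")

proc spawnedTick(log: EventLog) =
  # Both futures are started; neither is waited for.
  asyncSpawn executeRetentionPolicy(log)
  asyncSpawn reportStoredMessagesMetric(log)

proc sequentialTick(log: EventLog) {.async.} =
  # The metric is only reported after retention has finished.
  await executeRetentionPolicy(log)
  await reportStoredMessagesMetric(log)

let spawned = EventLog()
spawnedTick(spawned)
waitFor sleepAsync(100.milliseconds)  # let the spawned futures finish

let sequential = EventLog()
waitFor sequentialTick(sequential)
```

In this sketch the spawned variant records the metric first, while the sequential variant records it after retention has completed. Blocking with `waitFor` in the timer callback, as suggested above, gives the same ordering as the sequential variant.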
let archiveA = block:
  let driver = newTestArchiveDriver()

  for msg in msgListA:
    let msg_digest = waku_archive.computeDigest(msg)
    require (await driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk()

  driver
Why did this move into each test? If it's about the `await`, wouldn't `waitFor` and blocking solve it?
Yes, you are right. I moved it due to the `await`. I'm moving it back and will use `waitFor` instead.
let archiveA = block:
  let
    driver = newTestArchiveDriver()
    archive = newTestWakuArchive(driver)

  for msg in msgListA:
    require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()

  archive
Perhaps if you use `waitFor` rather than `await` this wouldn't need to move into the async block?
waku/v2/waku_archive/driver.nim
Outdated
message: WakuMessage,
digest: MessageDigest,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.base.} = discard
Any reason not to annotate `base` methods as `async`?
Nothing to add to the existing comments, thanks for the PR!
Thanks so much for the comments! It can be re-reviewed :)
Thanks! LGTM.
One thing, though it's up to you whether you want to revert: apologies if I was unclear in my previous feedback, but in general there is no issue with using `asyncTest` and `await` rather than `waitFor`. In fact, this is what we prefer. The suggestion to use `waitFor` was simply for creating the test archive drivers outside the (async) test blocks, whereas the code in the test blocks can continue using `await`. Still, this does not have to be part of this PR and only affects the tests. :)
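A rough sketch of that split, assuming chronos. `TestDriver`, `put`, `count` and `testBody` are hypothetical stand-ins rather than the real test helpers, and a plain async proc stands in for an `asyncTest` body:

```nim
import chronos

type TestDriver = ref object  # hypothetical in-memory stand-in
  stored: seq[string]

proc put(d: TestDriver, msg: string): Future[bool] {.async.} =
  d.stored.add(msg)
  return true

proc count(d: TestDriver): Future[int] {.async.} =
  return d.stored.len

# Shared setup outside any async block: waitFor runs the event
# loop until each `put` future has completed.
let driver = block:
  let d = TestDriver()
  for msg in ["a", "b", "c"]:
    doAssert (waitFor d.put(msg))
  d

# Inside an async test body, `await` remains the natural choice.
proc testBody(d: TestDriver) {.async.} =
  doAssert (await d.count()) == 3

waitFor testBody(driver)
```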
Description
This PR is a very first step towards asynchronous calls for any CRUD archive operation.

It is important to note that this PR doesn't enable async calls per se. For example, in waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim, the method getMessages() will still be sync because the call to s.db.selectMessagesByHistoryQueryWithLimit(..) will still be blocking.

Instead, this PR is a reorganization to adopt the asynchronous patterns for waku archive and to adapt the code for the upcoming PRs, where we will incorporate @LNSD's PR and, therefore, start adding what is needed to allow async interaction with a PostgreSQL database.

Sorry for the large number of changes. These are the most relevant:

Changes
waku_archive/driver.nim methods

Issue
#1604