Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: per limit split of PostgreSQL queries #3008

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 53 additions & 24 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.push raises: [].}

import
std/[nre, options, sequtils, strutils, strformat, times],
std/[nre, options, sequtils, strutils, strformat, times, sugar],
stew/[byteutils, arrayops],
results,
chronos,
Expand Down Expand Up @@ -128,7 +128,9 @@ const SelectCursorByHashDef =
"""SELECT timestamp FROM messages
WHERE messageHash = $1"""

const DefaultMaxNumConns = 50
const
DefaultMaxNumConns = 50
MaxHashesPerQuery = 100

proc new*(
T: type PostgresDriver,
Expand Down Expand Up @@ -815,38 +817,35 @@ proc getMessagesByMessageHashes(
debug "end of getMessagesByMessageHashes"
return ok(rows)

method getMessages*(
s: PostgresDriver,
includeData = true,
contentTopics = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId = "",
proc getMessagesWithinLimits(
self: PostgresDriver,
includeData: bool,
contentTopics: seq[ContentTopic],
pubsubTopic: Option[PubsubTopic],
cursor: Option[ArchiveCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
maxPageSize: uint,
ascendingOrder: bool,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
debug "beginning of getMessages"

const MAX_ALLOWED_HASHES = 100
if hashes.len > MAX_ALLOWED_HASHES:
return err(fmt"can not attend queries with more than {MAX_ALLOWED_HASHES} hashes")
if hashes.len > MaxHashesPerQuery:
return err(fmt"can not attend queries with more than {MaxHashesPerQuery} hashes")

let hexHashes = hashes.mapIt(toHex(it))

if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and
startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
return await s.getMessagesByMessageHashes(
return await self.getMessagesByMessageHashes(
"'" & hexHashes.join("','") & "'", maxPageSize, requestId
)

if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
startTime.isSome() and endTime.isSome():
## Considered the most common query. Therefore, we use prepared statements to optimize it.
if includeData:
return await s.getMessagesPreparedStmt(
return await self.getMessagesPreparedStmt(
contentTopics.join(","),
PubsubTopic(pubsubTopic.get()),
cursor,
Expand All @@ -858,7 +857,7 @@ method getMessages*(
requestId,
)
else:
return await s.getMessageHashesPreparedStmt(
return await self.getMessageHashesPreparedStmt(
contentTopics.join(","),
PubsubTopic(pubsubTopic.get()),
cursor,
Expand All @@ -872,16 +871,46 @@ method getMessages*(
else:
if includeData:
## We will run an atypical query. In this case we don't use prepared statements
return await s.getMessagesArbitraryQuery(
return await self.getMessagesArbitraryQuery(
contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
ascendingOrder, requestId,
)
else:
return await s.getMessageHashesArbitraryQuery(
return await self.getMessageHashesArbitraryQuery(
contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
ascendingOrder, requestId,
)

method getMessages*(
    s: PostgresDriver,
    includeData = true,
    contentTopics = newSeq[ContentTopic](0),
    pubsubTopic = none(PubsubTopic),
    cursor = none(ArchiveCursor),
    startTime = none(Timestamp),
    endTime = none(Timestamp),
    hashes = newSeq[WakuMessageHash](0),
    maxPageSize = DefaultPageSize,
    ascendingOrder = true,
    requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Public archive-query entry point. Splits `hashes` into batches of at
  ## most `MaxHashesPerQuery` and delegates each batch to
  ## `getMessagesWithinLimits`, concatenating the per-batch rows into a
  ## single result. Returns an error if any batch query fails.
  debug "beginning of getMessages"

  # `sequtils.distribute(s, num)` splits `s` into exactly `num` groups — it
  # does NOT chunk by a maximum group size. Passing `MaxHashesPerQuery`
  # directly would always create 100 groups, including empty ones whenever
  # hashes.len < 100; every empty group would then run an *unfiltered*
  # query, duplicating unrelated rows (and with no hashes at all, 100 such
  # queries). So: compute the group count with a ceiling division, and when
  # there are no hashes run the (non-hash) query exactly once.
  let hashGroups =
    if hashes.len == 0:
      @[hashes]
    else:
      hashes.distribute((hashes.len + MaxHashesPerQuery - 1) div MaxHashesPerQuery)

  # NOTE(review): each batch may return up to `maxPageSize` rows, so the
  # combined result can exceed `maxPageSize` when more than one batch runs —
  # confirm callers tolerate this before relying on a strict page size.
  let rows = collect(newSeq):
    for batch in hashGroups:
      let subRows =
        ?await s.getMessagesWithinLimits(
          includeData, contentTopics, pubsubTopic, cursor, startTime, endTime, batch,
          maxPageSize, ascendingOrder, requestId,
        )

      for row in subRows:
        row

  return ok(rows)

proc getStr(
s: PostgresDriver, query: string
): Future[ArchiveDriverResult[string]] {.async.} =
Expand Down
Loading