Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Optimize postgres - use of rowCallback approach #2171

Merged
merged 5 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions waku/common/databases/db_postgres/dbconn.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import

include db_postgres

type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe.}

## Connection management

proc check*(db: DbConn): Result[void, string] =
Expand Down Expand Up @@ -43,11 +45,11 @@ proc open*(connString: string):

ok(conn)

proc rows*(db: DbConn,
query: SqlQuery,
args: seq[string]):
Future[Result[seq[Row], string]] {.async.} =
## Runs the SQL getting results.
proc sendQuery(db: DbConn,
query: SqlQuery,
args: seq[string]):
Future[Result[void, string]] {.async.} =
## This proc can be used directly for queries that don't retrieve values back.

if db.status != CONNECTION_OK:
let checkRes = db.check()
Expand All @@ -71,7 +73,13 @@ proc rows*(db: DbConn,

return err("failed pqsendQuery: unknown reason")

var ret = newSeq[Row](0)
return ok()

proc waitQueryToFinish(db: DbConn,
rowCallback: DataProc = nil):
Future[Result[void, string]] {.async.} =
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
## For other queries, like "INSERT", 'rowCallback' should be nil.

while true:

Expand All @@ -84,22 +92,33 @@ proc rows*(db: DbConn,
return err("failed pqconsumeInput: unknown reason")

if db.pqisBusy() == 1:
await sleepAsync(0.milliseconds) # Do not block the async runtime
await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
continue

var pqResult = db.pqgetResult()
let pqResult = db.pqgetResult()
if pqResult == nil:
# Check if its a real error or just end of results
let checkRes = db.check()
if checkRes.isErr():
return err("error in rows: " & checkRes.error)

return ok(ret) # reached the end of the results
return ok() # reached the end of the results

var cols = pqResult.pqnfields()
var row = cols.newRow()
for i in 0'i32 .. pqResult.pqNtuples() - 1:
pqResult.setRow(row, i, cols) # puts the value in the row
ret.add(row)
if not rowCallback.isNil():
rowCallback(pqResult)

pqclear(pqResult)

proc dbConnQuery*(db: DbConn,
query: SqlQuery,
args: seq[string],
rowCallback: DataProc):
Future[Result[void, string]] {.async, gcsafe.} =

(await db.sendQuery(query, args)).isOkOr:
return err("error in dbConnQuery calling sendQuery: " & $error)

(await db.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)

return ok()
79 changes: 33 additions & 46 deletions waku/common/databases/db_postgres/pgasyncpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@ else:
import
std/[sequtils,nre, strformat],
stew/results,
chronicles,
chronos
import
./dbconn,
../common

logScope:
topics = "postgres asyncpool"
Comment on lines -17 to -18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't log anything anymore? Or was it not used.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! It was not used. In fact, all the exposed procs return Result so it is pretty likely that we won't need many longs in this file.


type PgAsyncPoolState {.pure.} = enum
Closed,
Live,
Expand Down Expand Up @@ -107,6 +103,16 @@ proc close*(pool: PgAsyncPool):

return ok()

proc getFirstFreeConnIndex(pool: PgAsyncPool):
DatabaseResult[int] =
for index in 0..<pool.conns.len:
if pool.conns[index].busy:
continue

## Pick up the first free connection and set it busy
pool.conns[index].busy = true
return ok(index)

proc getConnIndex(pool: PgAsyncPool):
Future[DatabaseResult[int]] {.async.} =
## Waits for a free connection or create if max connections limits have not been reached.
Expand All @@ -115,8 +121,20 @@ proc getConnIndex(pool: PgAsyncPool):
if not pool.isLive():
return err("pool is not live")

# stablish new connections if we are under the limit
if pool.isBusy() and pool.conns.len < pool.maxConnections:
if not pool.isBusy():
return pool.getFirstFreeConnIndex()

## Pool is busy then

if pool.conns.len == pool.maxConnections:
## Can't create more connections. Wait for a free connection without blocking the async runtime.
while pool.isBusy():
await sleepAsync(0.milliseconds)

return pool.getFirstFreeConnIndex()

elif pool.conns.len < pool.maxConnections:
## stablish a new connection
let connRes = dbconn.open(pool.connString)
if connRes.isOk():
let conn = connRes.get()
Expand All @@ -125,17 +143,6 @@ proc getConnIndex(pool: PgAsyncPool):
else:
return err("failed to stablish a new connection: " & connRes.error)

# wait for a free connection without blocking the async runtime
while pool.isBusy():
await sleepAsync(0.milliseconds)

for index in 0..<pool.conns.len:
if pool.conns[index].busy:
continue

pool.conns[index].busy = true
return ok(index)

proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
## Forces closing the connection pool.
## This proc is intended to be called when the connection with the database
Expand All @@ -156,12 +163,13 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
if pool.conns[i].dbConn == conn:
pool.conns[i].busy = false

proc query*(pool: PgAsyncPool,
query: string,
args: seq[string] = newSeq[string](0)):
Future[DatabaseResult[seq[Row]]] {.async.} =
## Runs the SQL query getting results.
## Retrieves info from the database.
proc pgQuery*(pool: PgAsyncPool,
query: string,
args: seq[string] = newSeq[string](0),
rowCallback: DataProc = nil):
Future[DatabaseResult[void]] {.async.} =
## rowCallback != nil when it is expected to retrieve info from the database.
## rowCallback == nil for queries that change the database state.

let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
Expand All @@ -170,29 +178,8 @@ proc query*(pool: PgAsyncPool,
let conn = pool.conns[connIndexRes.value].dbConn
defer: pool.releaseConn(conn)

let rowsRes = await conn.rows(sql(query), args)
if rowsRes.isErr():
return err("error in asyncpool query: " & rowsRes.error)

return ok(rowsRes.get())

proc exec*(pool: PgAsyncPool,
query: string,
args: seq[string] = newSeq[string](0)):
Future[DatabaseResult[void]] {.async.} =
## Runs the SQL query without results.
## Alters the database state.

let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
return err("connRes is err in exec: " & connIndexRes.error)

let conn = pool.conns[connIndexRes.value].dbConn
defer: pool.releaseConn(conn)

let rowsRes = await conn.rows(sql(query), args)
if rowsRes.isErr():
return err("rowsRes is err in exec: " & rowsRes.error)
(await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr:
return err("error in asyncpool query: " & $error)

return ok()

Expand Down
Loading
Loading