Skip to content

Commit

Permalink
chore: Minor Postgres optimizations (#2166)
Browse files Browse the repository at this point in the history
* postgres_healthcheck: validate once per minute instead of 30 sec
* postgres_driver.nim: change MaxNumCons from 5 to 50
* postgres_driver.nim: split connPool into writeConPool and readConPool
  This aims to avoid clashes in insert and select queries
  because the inserts and selects can happen concurrently
  in relay and store events, respectively.
  • Loading branch information
Ivansete-status authored Oct 30, 2023
1 parent cc01bb0 commit 282c2e8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
52 changes: 33 additions & 19 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import
export postgres_driver

type PostgresDriver* = ref object of ArchiveDriver
connPool: PgAsyncPool
## Establish a separate pools for read/write operations
writeConnPool: PgAsyncPool
readConnPool: PgAsyncPool

proc dropTableQuery(): string =
"DROP TABLE messages"
Expand All @@ -40,29 +42,33 @@ proc insertRow(): string =
"""INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""

const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?)
const MaxNumConns = 50 #TODO: we may need to set that from app args (maybe?)

proc new*(T: type PostgresDriver,
dbUrl: string,
maxConnections: int = MaxNumConns,
onErrAction: OnErrHandler = nil):
ArchiveDriverResult[T] =

let connPoolRes = PgAsyncPool.new(dbUrl, maxConnections)
if connPoolRes.isErr():
return err("error creating PgAsyncPool: " & connPoolRes.error)
let readConnPool = PgAsyncPool.new(dbUrl, maxConnections).valueOr:
return err("error creating read conn pool PgAsyncPool")

let connPool = connPoolRes.get()
let writeConnPool = PgAsyncPool.new(dbUrl, maxConnections).valueOr:
return err("error creating write conn pool PgAsyncPool")

if not isNil(onErrAction):
asyncSpawn checkConnectivity(connPool, onErrAction)
asyncSpawn checkConnectivity(readConnPool, onErrAction)

return ok(PostgresDriver(connPool: connPool))
if not isNil(onErrAction):
asyncSpawn checkConnectivity(writeConnPool, onErrAction)

return ok(PostgresDriver(writeConnPool: writeConnPool,
readConnPool: readConnPool))

proc createMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(createTableQuery())
let execRes = await s.writeConnPool.exec(createTableQuery())
if execRes.isErr():
return err("error in createMessageTable: " & execRes.error)

Expand All @@ -71,7 +77,7 @@ proc createMessageTable*(s: PostgresDriver):
proc deleteMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(dropTableQuery())
let execRes = await s.writeConnPool.exec(dropTableQuery())
if execRes.isErr():
return err("error in deleteMessageTable: " & execRes.error)

Expand All @@ -97,7 +103,7 @@ method put*(s: PostgresDriver,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =

let ret = await s.connPool.runStmt(insertRow(),
let ret = await s.writeConnPool.runStmt(insertRow(),
@[toHex(digest.data),
$receivedTime,
message.contentTopic,
Expand Down Expand Up @@ -144,7 +150,7 @@ method getAllMessages*(s: PostgresDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.

let rowsRes = await s.connPool.query("""SELECT storedAt, contentTopic,
let rowsRes = await s.readConnPool.query("""SELECT storedAt, contentTopic,
payload, pubsubTopic, version, timestamp,
id FROM messages ORDER BY storedAt ASC""",
newSeq[string](0))
Expand Down Expand Up @@ -214,7 +220,7 @@ method getMessages*(s: PostgresDriver,
query &= " LIMIT ?"
args.add($maxPageSize)

let rowsRes = await s.connPool.query(query, args)
let rowsRes = await s.readConnPool.query(query, args)
if rowsRes.isErr():
return err("failed to run query: " & rowsRes.error)

Expand All @@ -233,7 +239,7 @@ proc getInt(s: PostgresDriver,
Future[ArchiveDriverResult[int64]] {.async.} =
# Performs a query that is expected to return a single numeric value (int64)

let rowsRes = await s.connPool.query(query)
let rowsRes = await s.readConnPool.query(query)
if rowsRes.isErr():
return err("failed in getRow: " & rowsRes.error)

Expand Down Expand Up @@ -286,7 +292,7 @@ method deleteMessagesOlderThanTimestamp*(
ts: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(
let execRes = await s.writeConnPool.exec(
"DELETE FROM messages WHERE storedAt < " & $ts)
if execRes.isErr():
return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)
Expand All @@ -298,7 +304,7 @@ method deleteOldestMessagesNotWithinLimit*(
limit: int):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(
let execRes = await s.writeConnPool.exec(
"""DELETE FROM messages WHERE id NOT IN
(
SELECT id FROM messages ORDER BY storedAt DESC LIMIT ?
Expand All @@ -312,8 +318,16 @@ method deleteOldestMessagesNotWithinLimit*(
method close*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
## Close the database connection
let result = await s.connPool.close()
return result
let writeCloseRes = await s.writeConnPool.close()
let readCloseRes = await s.readConnPool.close()

writeCloseRes.isOkOr:
return err("error closing write pool: " & $error)

readCloseRes.isOkOr:
return err("error closing read pool: " & $error)

return ok()

proc sleep*(s: PostgresDriver, seconds: int):
Future[ArchiveDriverResult[void]] {.async.} =
Expand All @@ -322,7 +336,7 @@ proc sleep*(s: PostgresDriver, seconds: int):
# database for the amount of seconds given as a parameter.
try:
let params = @[$seconds]
let sleepRes = await s.connPool.query("SELECT pg_sleep(?)", params)
let sleepRes = await s.writeConnPool.query("SELECT pg_sleep(?)", params)
if sleepRes.isErr():
return err("error in postgres_driver sleep: " & sleepRes.error)
except DbError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import

## Simple query to validate that the postgres is working and attending requests
const HealthCheckQuery = "SELECT version();"
const CheckConnectivityInterval = 30.seconds
const CheckConnectivityInterval = 60.seconds
const MaxNumTrials = 20
const TrialInterval = 1.seconds

Expand Down

0 comments on commit 282c2e8

Please sign in to comment.