Skip to content

Commit

Permalink
Adding healtcheck and reconnection mechanism to the postgres archive …
Browse files Browse the repository at this point in the history
…driver (#1997)

It starts an asynchronous infinite task that checks the connectivity
with the database. In case of error, the postgres_healthcheck task
tries to reconnect for a while, and if it determines that the connection
cannot be resumed, then it invokes a callback indicating that
situation. For the case of the `wakunode2` app, this callback
quits the application itself and adds a log trace indicating
the connectivity issue with the database.
  • Loading branch information
Ivansete-status committed Sep 6, 2023
1 parent 5638bd0 commit 1fb13b0
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 6 deletions.
9 changes: 8 additions & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,17 @@ proc setupProtocols(node: WakuNode,
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

if conf.store:
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration)
conf.storeMessageDbMigration,
onErrAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

Expand Down
14 changes: 14 additions & 0 deletions waku/common/databases/db_postgres/pgasyncpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ proc getConnIndex(pool: PgAsyncPool):
pool.conns[index].busy = true
return ok(index)

proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
## Forces closing the connection pool.
## This proc is intended to be called when the connection with the database
## got interrupted from the database side or a connectivity problem happened.

for i in 0..<pool.conns.len:
pool.conns[i].busy = false

(await pool.close()).isOkOr:
return err("error in resetConnPool: " & error)

pool.state = PgAsyncPoolState.Live
return ok()

proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
## Marks the connection as released.
for i in 0..<pool.conns.len:
Expand Down
1 change: 1 addition & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const DefaultPageSize*: uint = 25
type
ArchiveDriverResult*[T] = Result[T, string]
ArchiveDriver* = ref object of RootObj
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure.}

type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)

Expand Down
9 changes: 7 additions & 2 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ export
proc new*(T: type ArchiveDriver,
url: string,
vacuum: bool,
migrate: bool):
migrate: bool,
onErrAction: OnErrHandler):
Result[T, string] =
## url - string that defines the database
## vacuum - if true, a cleanup operation will be applied to the database
## migrate - if true, the database schema will be updated
## onErrAction - called if, e.g., the connection with db got lost forever

let dbUrlValidationRes = dburl.validateDbUrl(url)
if dbUrlValidationRes.isErr():
Expand Down Expand Up @@ -74,7 +79,7 @@ proc new*(T: type ArchiveDriver,

of "postgres":
const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?)
let res = PostgresDriver.new(url, MaxNumConns)
let res = PostgresDriver.new(url, MaxNumConns, onErrAction)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)

Expand Down
13 changes: 10 additions & 3 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import
../../../waku_core,
../../common,
../../driver,
../../../common/databases/db_postgres as waku_postgres
../../../common/databases/db_postgres as waku_postgres,
./postgres_healthcheck

export postgres_driver

Expand Down Expand Up @@ -43,14 +44,20 @@ const DefaultMaxConnections = 5

proc new*(T: type PostgresDriver,
dbUrl: string,
maxConnections: int = DefaultMaxConnections):
maxConnections: int = DefaultMaxConnections,
onErrAction: OnErrHandler = nil):
ArchiveDriverResult[T] =

let connPoolRes = PgAsyncPool.new(dbUrl, maxConnections)
if connPoolRes.isErr():
return err("error creating PgAsyncPool: " & connPoolRes.error)

return ok(PostgresDriver(connPool: connPoolRes.get()))
let connPool = connPoolRes.get()

if not isNil(onErrAction):
asyncSpawn checkConnectivity(connPool, onErrAction)

return ok(PostgresDriver(connPool: connPool))

proc createMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
Expand Down
47 changes: 47 additions & 0 deletions waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
chronos,
stew/results
import
../../driver,
../../../common/databases/db_postgres

## Simple query to validate that the postgres is working and attending requests
const HealthCheckQuery = "SELECT version();"
const CheckConnectivityInterval = 30.seconds
const MaxNumTrials = 20
const TrialInterval = 1.seconds

proc checkConnectivity*(connPool: PgAsyncPool,
onErrAction: OnErrHandler) {.async.} =

while true:

(await connPool.exec(HealthCheckQuery)).isOkOr:

## The connection failed once. Let's try reconnecting for a while.
## Notice that the 'exec' proc tries to establish a new connection.

block errorBlock:
## Force close all the opened connections. No need to close gracefully.
(await connPool.resetConnPool()).isOkOr:
onErrAction("checkConnectivity resetConnPool error: " & error)

var numTrial = 0
while numTrial < MaxNumTrials:
let res = await connPool.exec(HealthCheckQuery)
if res.isOk():
## Connection resumed. Let's go back to the normal healthcheck.
break errorBlock

await sleepAsync(TrialInterval)
numTrial.inc()

## The connection couldn't be resumed. Let's inform the upper layers.
onErrAction("postgres health check error: " & error)

await sleepAsync(CheckConnectivityInterval)

0 comments on commit 1fb13b0

Please sign in to comment.