Skip to content

Commit

Permalink
Refactoring the Waku Archive. Simplifying the app.nim
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Jun 22, 2023
1 parent a44d4bf commit cbab1f4
Show file tree
Hide file tree
Showing 19 changed files with 329 additions and 433 deletions.
191 changes: 12 additions & 179 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ import
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/v2/waku_archive,
../../waku/v2/waku_archive/driver/queue_driver,
../../waku/v2/waku_archive/driver/sqlite_driver,
../../waku/v2/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations,
../../waku/v2/waku_archive/retention_policy,
../../waku/v2/waku_archive/retention_policy/retention_policy_capacity,
../../waku/v2/waku_archive/retention_policy/retention_policy_time,
../../waku/v2/waku_dnsdisc,
../../waku/v2/waku_enr,
../../waku/v2/waku_discv5,
Expand Down Expand Up @@ -72,8 +66,6 @@ type

rng: ref HmacDrbgContext
peerStore: Option[WakuPeerStorage]
archiveDriver: Option[ArchiveDriver]
archiveRetentionPolicy: Option[RetentionPolicy]
dynamicBootstrapNodes: seq[RemotePeerInfo]

node: WakuNode
Expand All @@ -98,44 +90,14 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
App(version: git_version, conf: conf, rng: rng, node: nil)


## SQLite database

proc setupDatabaseConnection(dbUrl: string): AppResult[Option[SqliteDatabase]] =
## dbUrl mimics SQLAlchemy Database URL schema
## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
if dbUrl == "" or dbUrl == "none":
return ok(none(SqliteDatabase))

let dbUrlParts = dbUrl.split("://", 1)
let
engine = dbUrlParts[0]
path = dbUrlParts[1]

let connRes = case engine
of "sqlite":
# SQLite engine
# See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite
SqliteDatabase.new(path)

else:
return err("unknown database engine")

if connRes.isErr():
return err("failed to init database connection: " & connRes.error)

ok(some(connRes.value))


## Peer persistence

const PeerPersistenceDbUrl = "sqlite://peers.db"

proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] =
let db = ?setupDatabaseConnection(PeerPersistenceDbUrl)
let db = ? SqliteDatabase.new("peers.db")

?peer_store_sqlite_migrations.migrate(db.get())
? peer_store_sqlite_migrations.migrate(db)

let res = WakuPeerStorage.new(db.get())
let res = WakuPeerStorage.new(db)
if res.isErr():
return err("failed to init peer store" & res.error)

Expand All @@ -153,127 +115,6 @@ proc setupPeerPersistence*(app: var App): AppResult[void] =

ok()


## Waku archive

proc gatherSqlitePageStats(db: SqliteDatabase): AppResult[(int64, int64, int64)] =
let
pageSize = ?db.getPageSize()
pageCount = ?db.getPageCount()
freelistCount = ?db.getFreelistCount()

ok((pageSize, pageCount, freelistCount))

proc performSqliteVacuum(db: SqliteDatabase): AppResult[void] =
## SQLite database vacuuming
# TODO: Run vacuuming conditionally based on database page stats
# if (pageCount > 0 and freelistCount > 0):

debug "starting sqlite database vacuuming"

let resVacuum = db.vacuum()
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)

debug "finished sqlite database vacuuming"

proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): AppResult[Option[RetentionPolicy]] =
if retentionPolicy == "" or retentionPolicy == "none":
return ok(none(RetentionPolicy))

let rententionPolicyParts = retentionPolicy.split(":", 1)
let
policy = rententionPolicyParts[0]
policyArgs = rententionPolicyParts[1]


if policy == "time":
var retentionTimeSeconds: int64
try:
retentionTimeSeconds = parseInt(policyArgs)
except ValueError:
return err("invalid time retention policy argument")

let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
return ok(some(retPolicy))

elif policy == "capacity":
var retentionCapacity: int
try:
retentionCapacity = parseInt(policyArgs)
except ValueError:
return err("invalid capacity retention policy argument")

let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
return ok(some(retPolicy))

else:
return err("unknown retention policy")

proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): AppResult[ArchiveDriver] =
let db = ?setupDatabaseConnection(dbUrl)

if db.isSome():
# SQLite vacuum
# TODO: Run this only if the database engine is SQLite
let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get())
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount

if vacuum and (pageCount > 0 and freelistCount > 0):
?performSqliteVacuum(db.get())

# Database migration
if migrate:
?archive_driver_sqlite_migrations.migrate(db.get())

if db.isSome():
debug "setting up sqlite waku archive driver"
let res = SqliteDriver.new(db.get())
if res.isErr():
return err("failed to init sqlite archive driver: " & res.error)

ok(res.value)

else:
debug "setting up in-memory waku archive driver"
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
ok(driver)

proc setupWakuArchive*(app: var App): AppResult[void] =
if not app.conf.store:
return ok()

# Message storage
let dbUrlValidationRes = validateDbUrl(app.conf.storeMessageDbUrl)
if dbUrlValidationRes.isErr():
return err("failed to configure the message store database connection: " & dbUrlValidationRes.error)

let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(),
vacuum = app.conf.storeMessageDbVacuum,
migrate = app.conf.storeMessageDbMigration)
if archiveDriverRes.isOk():
app.archiveDriver = some(archiveDriverRes.get())
else:
return err("failed to configure archive driver: " & archiveDriverRes.error)

# Message store retention policy
let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(app.conf.storeMessageRetentionPolicy)
if storeMessageRetentionPolicyRes.isErr():
return err("failed to configure the message retention policy: " & storeMessageRetentionPolicyRes.error)

let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get())
if archiveRetentionPolicyRes.isOk():
app.archiveRetentionPolicy = archiveRetentionPolicyRes.get()
else:
return err("failed to configure the message retention policy: " & archiveRetentionPolicyRes.error)

# TODO: Move retention policy execution here
# if archiveRetentionPolicy.isSome():
# executeMessageRetentionPolicy(node)
# startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)

ok()

## Retrieve dynamic bootstrap nodes (DNS discovery)

proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): AppResult[seq[RemotePeerInfo]] =
Expand Down Expand Up @@ -477,9 +318,8 @@ proc setupWakuNode*(app: var App): AppResult[void] =

## Mount protocols

proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
archiveDriver: Option[ArchiveDriver],
archiveRetentionPolicy: Option[RetentionPolicy]): Future[AppResult[void]] {.async.} =
proc setupProtocols(node: WakuNode, conf: WakuNodeConf):
Future[AppResult[void]] {.async.} =
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
## No protocols are started yet.
Expand Down Expand Up @@ -554,20 +394,19 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,

if conf.store:
# Archive setup
let messageValidator: MessageValidator = DefaultMessageValidator()
mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy)
let mountArcRes = node.mountArchive(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration,
conf.storeMessageRetentionPolicy)
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

# Store setup
try:
await mountStore(node)
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

# TODO: Move this to storage setup phase
if archiveRetentionPolicy.isSome():
executeMessageRetentionPolicy(node)
startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)

mountStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
Expand Down Expand Up @@ -623,13 +462,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
return ok()

proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} =
return await setupProtocols(
app.node,
app.conf,
app.archiveDriver,
app.archiveRetentionPolicy
)

return await setupProtocols(app.node, app.conf)

## Start node

Expand Down
7 changes: 0 additions & 7 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import
logScope:
topics = "wakunode main"


{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule:
## Node setup happens in 6 phases:
Expand Down Expand Up @@ -65,12 +64,6 @@ when isMainModule:
error "1/7 Setting up storage failed", error=res1.error
quit(QuitFailure)

## Waku archive
let res2 = wakunode2.setupWakuArchive()
if res2.isErr():
error "1/7 Setting up storage failed (waku archive)", error=res2.error
quit(QuitFailure)

debug "2/7 Retrieve dynamic bootstrap nodes"

let res3 = wakunode2.setupDyamicBootstrapNodes()
Expand Down
Loading

0 comments on commit cbab1f4

Please sign in to comment.