Skip to content

Commit

Permalink
refactor(archive): Moving waku archive logic from app.nim to the arch…
Browse files Browse the repository at this point in the history
…ive module (#1817)

* Refactoring the Waku Archive. Simplifying the app.nim

This change is needed to accommodate the further PRs where we will integrate Postgres in `wakunode2`.
  • Loading branch information
Ivansete-status authored Jun 27, 2023
1 parent 71c4ac1 commit 52894a8
Show file tree
Hide file tree
Showing 18 changed files with 300 additions and 285 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci-experimental.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ jobs:

- name: Run tests
run: |
if [ ${{ runner.os }} == "macOS" ]; then
brew unlink postgresql@14
brew link libpq --force
fi
if [ ${{ runner.os }} == "Linux" ]; then
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine
fi
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ jobs:

- name: Run tests
run: |
if [ ${{ runner.os }} == "macOS" ]; then
brew unlink postgresql@14
brew link libpq --force
fi
if [ ${{ runner.os }} == "Linux" ]; then
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine
fi
Expand Down
199 changes: 23 additions & 176 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@ import
import
../../waku/common/utils/nat,
../../waku/common/databases/db_sqlite,
../../waku/v2/waku_archive/driver/builder,
../../waku/v2/waku_archive/retention_policy/builder,
../../waku/v2/waku_core,
../../waku/v2/waku_node,
../../waku/v2/node/waku_metrics,
../../waku/v2/node/peer_manager,
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/v2/waku_archive,
../../waku/v2/waku_archive/driver/queue_driver,
../../waku/v2/waku_archive/driver/sqlite_driver,
../../waku/v2/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations,
../../waku/v2/waku_archive/retention_policy,
../../waku/v2/waku_archive/retention_policy/retention_policy_capacity,
../../waku/v2/waku_archive/retention_policy/retention_policy_time,
../../waku/v2/waku_dnsdisc,
../../waku/v2/waku_enr,
../../waku/v2/waku_discv5,
Expand Down Expand Up @@ -76,8 +72,6 @@ type
record: Record

peerStore: Option[WakuPeerStorage]
archiveDriver: Option[ArchiveDriver]
archiveRetentionPolicy: Option[RetentionPolicy]
dynamicBootstrapNodes: seq[RemotePeerInfo]

node: WakuNode
Expand Down Expand Up @@ -136,44 +130,15 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
)


## SQLite database

proc setupDatabaseConnection(dbUrl: string): AppResult[Option[SqliteDatabase]] =
## dbUrl mimics SQLAlchemy Database URL schema
## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
if dbUrl == "" or dbUrl == "none":
return ok(none(SqliteDatabase))

let dbUrlParts = dbUrl.split("://", 1)
let
engine = dbUrlParts[0]
path = dbUrlParts[1]

let connRes = case engine
of "sqlite":
# SQLite engine
# See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite
SqliteDatabase.new(path)

else:
return err("unknown database engine")

if connRes.isErr():
return err("failed to init database connection: " & connRes.error)

ok(some(connRes.value))


## Peer persistence

const PeerPersistenceDbUrl = "sqlite://peers.db"

const PeerPersistenceDbUrl = "peers.db"
proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] =
let db = ?setupDatabaseConnection(PeerPersistenceDbUrl)
let db = ? SqliteDatabase.new(PeerPersistenceDbUrl)

?peer_store_sqlite_migrations.migrate(db.get())
? peer_store_sqlite_migrations.migrate(db)

let res = WakuPeerStorage.new(db.get())
let res = WakuPeerStorage.new(db)
if res.isErr():
return err("failed to init peer store" & res.error)

Expand All @@ -191,127 +156,6 @@ proc setupPeerPersistence*(app: var App): AppResult[void] =

ok()


## Waku archive

proc gatherSqlitePageStats(db: SqliteDatabase): AppResult[(int64, int64, int64)] =
let
pageSize = ?db.getPageSize()
pageCount = ?db.getPageCount()
freelistCount = ?db.getFreelistCount()

ok((pageSize, pageCount, freelistCount))

proc performSqliteVacuum(db: SqliteDatabase): AppResult[void] =
## SQLite database vacuuming
# TODO: Run vacuuming conditionally based on database page stats
# if (pageCount > 0 and freelistCount > 0):

debug "starting sqlite database vacuuming"

let resVacuum = db.vacuum()
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)

debug "finished sqlite database vacuuming"

proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): AppResult[Option[RetentionPolicy]] =
if retentionPolicy == "" or retentionPolicy == "none":
return ok(none(RetentionPolicy))

let rententionPolicyParts = retentionPolicy.split(":", 1)
let
policy = rententionPolicyParts[0]
policyArgs = rententionPolicyParts[1]


if policy == "time":
var retentionTimeSeconds: int64
try:
retentionTimeSeconds = parseInt(policyArgs)
except ValueError:
return err("invalid time retention policy argument")

let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
return ok(some(retPolicy))

elif policy == "capacity":
var retentionCapacity: int
try:
retentionCapacity = parseInt(policyArgs)
except ValueError:
return err("invalid capacity retention policy argument")

let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
return ok(some(retPolicy))

else:
return err("unknown retention policy")

proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): AppResult[ArchiveDriver] =
let db = ?setupDatabaseConnection(dbUrl)

if db.isSome():
# SQLite vacuum
# TODO: Run this only if the database engine is SQLite
let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get())
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount

if vacuum and (pageCount > 0 and freelistCount > 0):
?performSqliteVacuum(db.get())

# Database migration
if migrate:
?archive_driver_sqlite_migrations.migrate(db.get())

if db.isSome():
debug "setting up sqlite waku archive driver"
let res = SqliteDriver.new(db.get())
if res.isErr():
return err("failed to init sqlite archive driver: " & res.error)

ok(res.value)

else:
debug "setting up in-memory waku archive driver"
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
ok(driver)

proc setupWakuArchive*(app: var App): AppResult[void] =
if not app.conf.store:
return ok()

# Message storage
let dbUrlValidationRes = validateDbUrl(app.conf.storeMessageDbUrl)
if dbUrlValidationRes.isErr():
return err("failed to configure the message store database connection: " & dbUrlValidationRes.error)

let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(),
vacuum = app.conf.storeMessageDbVacuum,
migrate = app.conf.storeMessageDbMigration)
if archiveDriverRes.isOk():
app.archiveDriver = some(archiveDriverRes.get())
else:
return err("failed to configure archive driver: " & archiveDriverRes.error)

# Message store retention policy
let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(app.conf.storeMessageRetentionPolicy)
if storeMessageRetentionPolicyRes.isErr():
return err("failed to configure the message retention policy: " & storeMessageRetentionPolicyRes.error)

let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get())
if archiveRetentionPolicyRes.isOk():
app.archiveRetentionPolicy = archiveRetentionPolicyRes.get()
else:
return err("failed to configure the message retention policy: " & archiveRetentionPolicyRes.error)

# TODO: Move retention policy execution here
# if archiveRetentionPolicy.isSome():
# executeMessageRetentionPolicy(node)
# startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)

ok()

## Retrieve dynamic bootstrap nodes (DNS discovery)

proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): AppResult[seq[RemotePeerInfo]] =
Expand Down Expand Up @@ -450,9 +294,8 @@ proc setupWakuNode*(app: var App): AppResult[void] =

proc setupProtocols(node: WakuNode,
conf: WakuNodeConf,
nodeKey: crypto.PrivateKey,
archiveDriver: Option[ArchiveDriver],
archiveRetentionPolicy: Option[RetentionPolicy]): Future[AppResult[void]] {.async.} =
nodeKey: crypto.PrivateKey):
Future[AppResult[void]] {.async.} =
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
## No protocols are started yet.
Expand Down Expand Up @@ -527,20 +370,27 @@ proc setupProtocols(node: WakuNode,

if conf.store:
# Archive setup
let messageValidator: MessageValidator = DefaultMessageValidator()
mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy)
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

let retPolicyRes = RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes = node.mountArchive(archiveDriverRes.get(),
retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

# Store setup
try:
await mountStore(node)
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

# TODO: Move this to storage setup phase
if archiveRetentionPolicy.isSome():
executeMessageRetentionPolicy(node)
startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)

mountStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
Expand Down Expand Up @@ -599,12 +449,9 @@ proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} =
return await setupProtocols(
app.node,
app.conf,
app.key,
app.archiveDriver,
app.archiveRetentionPolicy
app.key
)


## Start node

proc startNode(node: WakuNode, conf: WakuNodeConf,
Expand Down
11 changes: 0 additions & 11 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -537,17 +537,6 @@ proc validateDbUrl*(val: string): ConfResult[string] =
else:
err("invalid 'db url' option format: " & val)


let StoreMessageRetentionPolicyRegex = re"^\w+:\w+$"

proc validateStoreMessageRetentionPolicy*(val: string): ConfResult[string] =
let val = val.strip()

if val == "" or val == "none" or val.match(StoreMessageRetentionPolicyRegex):
ok(val)
else:
err("invalid 'store message retention policy' option format: " & val)

proc validateExtMultiAddrs*(vals: seq[string]): ConfResult[seq[MultiAddress]] =
var multiaddrs: seq[MultiAddress]
for val in vals:
Expand Down
7 changes: 0 additions & 7 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import
logScope:
topics = "wakunode main"


{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule:
## Node setup happens in 6 phases:
Expand Down Expand Up @@ -65,12 +64,6 @@ when isMainModule:
error "1/7 Setting up storage failed", error=res1.error
quit(QuitFailure)

## Waku archive
let res2 = wakunode2.setupWakuArchive()
if res2.isErr():
error "1/7 Setting up storage failed (waku archive)", error=res2.error
quit(QuitFailure)

debug "2/7 Retrieve dynamic bootstrap nodes"

let res3 = wakunode2.setupDyamicBootstrapNodes()
Expand Down
3 changes: 1 addition & 2 deletions tests/v2/waku_archive/test_waku_archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ proc newTestArchiveDriver(): ArchiveDriver =
SqliteDriver.new(db).tryGet()

proc newTestWakuArchive(driver: ArchiveDriver): WakuArchive =
let validator: MessageValidator = DefaultMessageValidator()
WakuArchive.new(driver, validator=some(validator))
WakuArchive.new(driver).get()

proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
ArchiveCursor(
Expand Down
17 changes: 13 additions & 4 deletions tests/v2/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ procSuite "WakuNode - Store":

waitFor allFutures(client.start(), server.start())

server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore()

client.mountStoreClient()
Expand Down Expand Up @@ -104,7 +106,9 @@ procSuite "WakuNode - Store":

waitFor allFutures(client.start(), server.start())

server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore()

client.mountStoreClient()
Expand Down Expand Up @@ -153,7 +157,9 @@ procSuite "WakuNode - Store":

waitFor allFutures(client.start(), server.start())

server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore()

client.mountStoreClient()
Expand Down Expand Up @@ -207,7 +213,10 @@ procSuite "WakuNode - Store":

waitFor filterSource.mountFilter()
let driver = newTestArchiveDriver()
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))

let mountArchiveRes = server.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore()
waitFor server.mountFilterClient()
client.mountStoreClient()
Expand Down
4 changes: 3 additions & 1 deletion tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ procSuite "Waku v2 JSON-RPC API - Admin":
await node.mountFilter()
await node.mountFilterClient()
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error

await node.mountStore()
node.mountStoreClient()

Expand Down
Loading

0 comments on commit 52894a8

Please sign in to comment.