Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: extended Postgres code to support retention policy + refactoring #2244

Merged
merged 5 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ suite "Waku Archive - Retention policy":
test "size retention policy - windowed message deletion":
## Given
let
# in megabytes
sizeLimit:float = 0.05
# in bytes
sizeLimit:int64 = 52428
excess = 325

let driver = newTestArchiveDriver()
Expand Down Expand Up @@ -92,9 +92,7 @@ suite "Waku Archive - Retention policy":

## Then
# calculate the current database size
let pageSize = (waitFor driver.getPagesSize()).tryGet()
let pageCount = (waitFor driver.getPagesCount()).tryGet()
let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)
let sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet())

# NOTE: since vacuumin is done manually, this needs to be revisited if vacuuming done automatically

Expand All @@ -105,7 +103,7 @@ suite "Waku Archive - Retention policy":
require (sizeDB >= sizeLimit)
require (waitFor retentionPolicy.execute(driver)).isOk()

# get the number or rows from DB
# get the number or rows from database
let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet()

check:
Expand Down
14 changes: 14 additions & 0 deletions waku/common/databases/db_sqlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,20 @@ proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] =

return ok(count)

proc getDatabaseSize*(db: SqliteDatabase): DatabaseResult[int64] =
# get the database page size in bytes
var pageSize: int64 = ?db.getPageSize()

if pageSize == 0:
return err("failed to get page size ")

# get the database page count
let pageCount = ?db.getPageCount()

let databaseSize = (pageSize * pageCount)

return ok(databaseSize)

proc gatherSqlitePageStats*(db: SqliteDatabase):
DatabaseResult[(int64, int64, int64)] =
let
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ method getPagesCount*(driver: ArchiveDriver):
method getPagesSize*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard

method getDatabaseSize*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard

method performVacuum*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

Expand Down
9 changes: 9 additions & 0 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,15 @@ proc getInt(s: PostgresDriver,

return ok(retInt)

method getDatabaseSize*(s: PostgresDriver):
Future[ArchiveDriverResult[int64]] {.async.} =

let intRes = (await s.getInt("SELECT pg_database_size(current_database())")).valueOr:
return err("error in getDatabaseSize: " & error)

let databaseSize: int64 = int64(intRes)
return ok(databaseSize)

method getMessagesCount*(s: PostgresDriver):
Future[ArchiveDriverResult[int64]] {.async.} =

Expand Down
4 changes: 4 additions & 0 deletions waku/waku_archive/driver/queue_driver/queue_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ method getPagesSize*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))

method getDatabasesSize*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))

method performVacuum*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return err("interface method not implemented")
Expand Down
4 changes: 4 additions & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ method getPagesSize*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getPageSize()

method getDatabaseSize*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getDatabaseSize()

method performVacuum*(s: SqliteDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return s.db.performSqliteVacuum()
Expand Down
15 changes: 9 additions & 6 deletions waku/waku_archive/retention_policy/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,27 @@ proc new*(T: type RetentionPolicy,
var retentionSize: string
retentionSize = policyArgs

# captures the size unit such as Gb or Mb
# captures the size unit such as GB or MB
let sizeUnit = retentionSize.substr(retentionSize.len-2)
# captures the string type number data of the size provided
let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3)
# to hold the numeric value data of size
var sizeQuantity: float
var inptSizeQuantity: float
var sizeQuantity: int64

if sizeUnit in ["gb", "Gb", "GB", "gB"]:
# parse the actual value into integer type var
try:
sizeQuantity = parseFloat(sizeQuantityStr)
inptSizeQuantity = parseFloat(sizeQuantityStr)
except ValueError:
return err("invalid size retention policy argument: " & getCurrentExceptionMsg())
# Gb data is converted into Mb for uniform processing
sizeQuantity = sizeQuantity * 1024
# GB data is converted into bytes for uniform processing
sizeQuantity = int64(inptSizeQuantity * 1024.0 * 1024.0 * 1024.0)
SionoiS marked this conversation as resolved.
Show resolved Hide resolved
elif sizeUnit in ["mb", "Mb", "MB", "mB"]:
try:
sizeQuantity = parseFloat(sizeQuantityStr)
inptSizeQuantity = parseFloat(sizeQuantityStr)
# MB data is converted into bytes for uniform processing
sizeQuantity = int64(inptSizeQuantity * 1024.0 * 1024.0)
ABresting marked this conversation as resolved.
Show resolved Hide resolved
except ValueError:
return err("invalid size retention policy argument")
else:
Expand Down
27 changes: 9 additions & 18 deletions waku/waku_archive/retention_policy/retention_policy_size.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ import
logScope:
topics = "waku archive retention_policy"

# default size is 30 Gb
const DefaultRetentionSize*: float = 30_720
# default size is 30 GiB or 32212254720.0 in bytes
const DefaultRetentionSize*: int64 = 32212254720

# to remove 20% of the outdated data from database
const DeleteLimit = 0.80

type
# SizeRetentionPolicy implements auto delete as follows:
# - sizeLimit is the size in megabytes (Mbs) the database can grow upto
# - sizeLimit is the size in bytes the database can grow upto
# to reduce the size of the databases, remove the rows/number-of-messages
# DeleteLimit is the total number of messages to delete beyond this limit
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
# upon deletion process the fragmented space is retrieve back using Vacuum process.
SizeRetentionPolicy* = ref object of RetentionPolicy
sizeLimit: float
sizeLimit: int64

proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
SizeRetentionPolicy(
Expand All @@ -42,21 +42,12 @@ method execute*(p: SizeRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =
## when db size overshoots the database limit, shread 20% of outdated messages
# get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
return err("failed to get database size: " & $error)

# get page size of database
let pageSizeRes = await driver.getPagesSize()
let pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024)

if pageSize == 0:
return err("failed to get Page size: " & pageSizeRes.error)

# to get the size of the database, pageCount and PageSize is required
# get page count in "messages" database
let pageCount = (await driver.getPagesCount()).valueOr:
return err("failed to get Pages count: " & $error)

# database size in megabytes (Mb)
let totalSizeOfDB: float = float(pageSize * pageCount)/1024.0
# database size in bytes
let totalSizeOfDB: int64 = int64(dbSize)

if totalSizeOfDB < p.sizeLimit:
return ok()
Expand Down
Loading