Commit

archive: simplify the logs we had due to duplicate row insertion errors
Ivansete-status committed Mar 15, 2024
1 parent a3537fa commit b7637b6
Showing 1 changed file with 58 additions and 73 deletions.
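For context, the change described by the commit message comes down to how an insert failure is logged. Below is a minimal, self-contained sketch of that pattern; the proc name logInsertFailure and the echo-based "levels" are illustrative only (the real handleMessage uses chronicles' trace and debug), but the substring match is the one visible in the diff: a duplicate-key violation, which is expected when several nodes write to the same database, gets low-visibility logging, while any other insert failure keeps normal visibility.

import std/strutils

proc logInsertFailure(errMsg: string) =
  # Expected failure when multiple nodes share one database: keep it quiet.
  if "duplicate key value violates unique constraint" in errMsg:
    echo "TRACE failed to insert message: ", errMsg
  else:
    # Anything else is worth normal visibility.
    echo "DEBUG failed to insert message: ", errMsg

when isMainModule:
  logInsertFailure("duplicate key value violates unique constraint")
  logInsertFailure("connection to the database was lost")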
waku/waku_archive/archive.nim (131 changes: 58 additions & 73 deletions)
@@ -25,18 +25,17 @@ const
DefaultPageSize*: uint = 20
MaxPageSize*: uint = 100

# Retention policy
# Retention policy
WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)

# Metrics reporting
# Metrics reporting
WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1)

# Message validation
# 20 seconds maximum allowable sender timestamp "drift"
MaxMessageTimestampVariance* = getNanoSecondTime(20)
# Message validation
# 20 seconds maximum allowable sender timestamp "drift"
MaxMessageTimestampVariance* = getNanoSecondTime(20)

type MessageValidator* =
proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].}
type MessageValidator* = proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].}

## Archive

@@ -54,7 +53,7 @@ proc validate*(msg: WakuMessage): Result[void, string] =
if msg.ephemeral:
# Ephemeral message, do not store
return

if msg.timestamp == 0:
return ok()

@@ -71,89 +70,78 @@ proc validate*(msg: WakuMessage): Result[void, string] =

return ok()

proc new*(
T: type WakuArchive,
driver: ArchiveDriver,
validator: MessageValidator = validate,
retentionPolicy = none(RetentionPolicy),
): Result[T, string] =
proc new*(T: type WakuArchive,
driver: ArchiveDriver,
validator: MessageValidator = validate,
retentionPolicy = none(RetentionPolicy)):
Result[T, string] =
if driver.isNil():
return err("archive driver is Nil")

let archive =
WakuArchive(driver: driver, validator: validator, retentionPolicy: retentionPolicy)
WakuArchive(
driver: driver,
validator: validator,
retentionPolicy: retentionPolicy,
)

return ok(archive)

proc handleMessage*(
self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async.} =
proc handleMessage*(self: WakuArchive,
pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async.} =
self.validator(msg).isOkOr:
waku_archive_errors.inc(labelValues = [error])
return

let
msgDigest = computeDigest(msg)
msgHash = computeMessageHash(pubsubTopic, msg)
msgTimestamp =
if msg.timestamp > 0:
msg.timestamp
else:
getNanosecondTime(getTime().toUnixFloat())
msgTimestamp = if msg.timestamp > 0: msg.timestamp
else: getNanosecondTime(getTime().toUnixFloat())

trace "handling message",
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
msgTimestamp = msg.timestamp,
usedTimestamp = msgTimestamp,
digest = toHex(msgDigest.data),
messageHash = toHex(msgHash)

pubsubTopic=pubsubTopic,
contentTopic=msg.contentTopic,
msgTimestamp=msg.timestamp,
usedTimestamp=msgTimestamp,
digest=toHex(msgDigest.data),
messageHash=toHex(msgHash)
let insertStartTime = getTime().toUnixFloat()

(await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
# Prevent spamming the logs when multiple nodes are connected to the same database.
# In that case, the message cannot be inserted but is an expected "insert error"
# and therefore we reduce its visibility by having the log in trace level.
if "duplicate key value violates unique constraint" in error:
trace "failed to insert message", err = error
else:
debug "failed to insert message", err = error

debug "failed to insert message", err=error
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)

proc findMessages*(
self: WakuArchive, query: ArchiveQuery
): Future[ArchiveResult] {.async, gcsafe.} =
proc findMessages*(self: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} =
## Search the archive to return a single page of messages matching the query criteria

let maxPageSize =
if query.pageSize <= 0:
DefaultPageSize
else:
min(query.pageSize, MaxPageSize)

let isAscendingOrder = query.direction.into()

if query.contentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics"))

let queryStartTime = getTime().toUnixFloat()

let rows = (
await self.driver.getMessages(
contentTopic = query.contentTopics,
pubsubTopic = query.pubsubTopic,
cursor = query.cursor,
startTime = query.startTime,
endTime = query.endTime,
hashes = query.hashes,
maxPageSize = maxPageSize + 1,
ascendingOrder = isAscendingOrder,
)
).valueOr:
let rows = (await self.driver.getMessages(
contentTopic = query.contentTopics,
pubsubTopic = query.pubsubTopic,
cursor = query.cursor,
startTime = query.startTime,
endTime = query.endTime,
hashes = query.hashes,
maxPageSize = maxPageSize + 1,
ascendingOrder = isAscendingOrder
)).valueOr:
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))

let queryDuration = getTime().toUnixFloat() - queryStartTime
@@ -162,36 +150,34 @@ proc findMessages*(
var hashes = newSeq[WakuMessageHash]()
var messages = newSeq[WakuMessage]()
var cursor = none(ArchiveCursor)

if rows.len == 0:
return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))

## Messages
let pageSize = min(rows.len, int(maxPageSize))

#TODO once store v2 is removed, unzip instead of 2x map
messages = rows[0 ..< pageSize].mapIt(it[1])
hashes = rows[0 ..< pageSize].mapIt(it[4])
messages = rows[0..<pageSize].mapIt(it[1])
hashes = rows[0..<pageSize].mapIt(it[4])

## Cursor
if rows.len > int(maxPageSize):
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second last message in the rows list)

#TODO Once Store v2 is removed keep only message and hash
let (pubsubTopic, message, digest, storeTimestamp, hash) = rows[^2]

#TODO Once Store v2 is removed, the cursor becomes the hash of the last message
cursor = some(
ArchiveCursor(
digest: MessageDigest.fromBytes(digest),
storeTime: storeTimestamp,
sendertime: message.timestamp,
pubsubTopic: pubsubTopic,
hash: hash,
)
)
cursor = some(ArchiveCursor(
digest: MessageDigest.fromBytes(digest),
storeTime: storeTimestamp,
sendertime: message.timestamp,
pubsubTopic: pubsubTopic,
hash: hash,
))

# All messages MUST be returned in chronological order
if not isAscendingOrder:
@@ -208,16 +194,15 @@ proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
while true:
(await policy.execute(self.driver)).isOkOr:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error = error
error "failed execution of retention policy", error=error

await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)

proc periodicMetricReport(self: WakuArchive) {.async.} =
while true:
let countRes = (await self.driver.getMessagesCount())
if countRes.isErr():
error "loopReportStoredMessagesMetric failed to get messages count",
error = countRes.error
error "loopReportStoredMessagesMetric failed to get messages count", error=countRes.error
else:
let count = countRes.get()
waku_archive_messages.set(count, labelValues = ["stored"])
@@ -239,4 +224,4 @@ proc stopWait*(self: WakuArchive) {.async.} =
if not self.metricsHandle.isNil:
futures.add(self.metricsHandle.cancelAndWait())

await noCancel(allFutures(futures))
await noCancel(allFutures(futures))
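As an aside on findMessages above: it uses an over-fetch-by-one pagination trick, asking the driver for maxPageSize + 1 rows, returning at most maxPageSize of them, and building a cursor from the last row actually included only when the extra row proves another page exists. A minimal sketch of that idea follows, with assumed stand-in types (Row, Page, fetch) rather than the real driver API.

import std/options

type
  Row = tuple[timestamp: int64, payload: string]
  Page = object
    rows: seq[Row]
    cursor: Option[int64]   # stand-in cursor: timestamp of the last included row

proc fetch(store: seq[Row], limit: int): seq[Row] =
  ## Stand-in for the driver query: returns up to `limit` rows in order.
  store[0 ..< min(limit, store.len)]

proc getPage(store: seq[Row], pageSize: int): Page =
  let rows = fetch(store, pageSize + 1)     # over-fetch by one row
  let included = min(rows.len, pageSize)
  result.rows = rows[0 ..< included]
  if rows.len > pageSize:
    # The extra row proves a next page exists; the cursor points at the
    # last row that was included in this response.
    result.cursor = some(rows[included - 1].timestamp)

when isMainModule:
  let store: seq[Row] = @[(1'i64, "a"), (2'i64, "b"), (3'i64, "c")]
  let p1 = getPage(store, 2)
  echo p1.rows.len, " rows, next page: ", p1.cursor.isSome   # 2 rows, next page: true
  let p2 = getPage(store, 3)
  echo p2.rows.len, " rows, next page: ", p2.cursor.isSome   # 3 rows, next page: false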
