From b7637b6a7fb2f41a90ff38ab50fbdfe66e426a0d Mon Sep 17 00:00:00 2001
From: Ivan Folgueira Bande
Date: Fri, 15 Mar 2024 23:38:34 +0100
Subject: [PATCH] archive: simpler logs that we had due to duplicate rows
 insertion errors

---
 waku/waku_archive/archive.nim | 131 +++++++++++++++------------------
 1 file changed, 58 insertions(+), 73 deletions(-)

diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim
index 903852a2b4..3f9f5b346d 100644
--- a/waku/waku_archive/archive.nim
+++ b/waku/waku_archive/archive.nim
@@ -25,18 +25,17 @@ const
   DefaultPageSize*: uint = 20
   MaxPageSize*: uint = 100
 
-  # Retention policy
+# Retention policy
   WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)
 
-  # Metrics reporting
+# Metrics reporting
   WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1)
 
-  # Message validation
-  # 20 seconds maximum allowable sender timestamp "drift"
-  MaxMessageTimestampVariance* = getNanoSecondTime(20)
+# Message validation
+# 20 seconds maximum allowable sender timestamp "drift"
+  MaxMessageTimestampVariance* = getNanoSecondTime(20)
 
-type MessageValidator* =
-  proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].}
+type MessageValidator* = proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].}
 
 ## Archive
 
@@ -54,7 +53,7 @@ proc validate*(msg: WakuMessage): Result[void, string] =
   if msg.ephemeral:
     # Ephemeral message, do not store
     return
-
+  
   if msg.timestamp == 0:
     return ok()
 
@@ -71,23 +70,26 @@ proc validate*(msg: WakuMessage): Result[void, string] =
   return ok()
 
-proc new*(
-    T: type WakuArchive,
-    driver: ArchiveDriver,
-    validator: MessageValidator = validate,
-    retentionPolicy = none(RetentionPolicy),
-): Result[T, string] =
+proc new*(T: type WakuArchive,
+          driver: ArchiveDriver,
+          validator: MessageValidator = validate,
+          retentionPolicy = none(RetentionPolicy)):
+          Result[T, string] =
   if driver.isNil():
     return err("archive driver is Nil")
 
   let archive =
-    WakuArchive(driver: driver, validator: validator, retentionPolicy: retentionPolicy)
+    WakuArchive(
+      driver: driver,
+      validator: validator,
+      retentionPolicy: retentionPolicy,
+    )
 
   return ok(archive)
 
-proc handleMessage*(
-    self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage
-) {.async.} =
+proc handleMessage*(self: WakuArchive,
+                    pubsubTopic: PubsubTopic,
+                    msg: WakuMessage) {.async.} =
   self.validator(msg).isOkOr:
     waku_archive_errors.inc(labelValues = [error])
     return
 
@@ -95,46 +97,34 @@
   let
     msgDigest = computeDigest(msg)
     msgHash = computeMessageHash(pubsubTopic, msg)
-    msgTimestamp =
-      if msg.timestamp > 0:
-        msg.timestamp
-      else:
-        getNanosecondTime(getTime().toUnixFloat())
+    msgTimestamp = if msg.timestamp > 0: msg.timestamp
+                   else: getNanosecondTime(getTime().toUnixFloat())
 
   trace "handling message",
-    pubsubTopic = pubsubTopic,
-    contentTopic = msg.contentTopic,
-    msgTimestamp = msg.timestamp,
-    usedTimestamp = msgTimestamp,
-    digest = toHex(msgDigest.data),
-    messageHash = toHex(msgHash)
-
+    pubsubTopic=pubsubTopic,
+    contentTopic=msg.contentTopic,
+    msgTimestamp=msg.timestamp,
+    usedTimestamp=msgTimestamp,
+    digest=toHex(msgDigest.data),
+    messageHash=toHex(msgHash)
+  
   let insertStartTime = getTime().toUnixFloat()
 
   (await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
     waku_archive_errors.inc(labelValues = [insertFailure])
-    # Prevent spamming the logs when multiple nodes are connected to the same database.
-    # In that case, the message cannot be inserted but is an expected "insert error"
-    # and therefore we reduce its visibility by having the log in trace level.
-    if "duplicate key value violates unique constraint" in error:
-      trace "failed to insert message", err = error
-    else:
-      debug "failed to insert message", err = error
-
+    debug "failed to insert message", err=error
   let insertDuration = getTime().toUnixFloat() - insertStartTime
   waku_archive_insert_duration_seconds.observe(insertDuration)
 
-proc findMessages*(
-    self: WakuArchive, query: ArchiveQuery
-): Future[ArchiveResult] {.async, gcsafe.} =
+proc findMessages*(self: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} =
   ## Search the archive to return a single page of messages matching the query criteria
-
+  
   let maxPageSize =
     if query.pageSize <= 0:
      DefaultPageSize
    else:
      min(query.pageSize, MaxPageSize)
-
+  
   let isAscendingOrder = query.direction.into()
 
   if query.contentTopics.len > 10:
@@ -142,18 +132,16 @@
   let queryStartTime = getTime().toUnixFloat()
 
-  let rows = (
-    await self.driver.getMessages(
-      contentTopic = query.contentTopics,
-      pubsubTopic = query.pubsubTopic,
-      cursor = query.cursor,
-      startTime = query.startTime,
-      endTime = query.endTime,
-      hashes = query.hashes,
-      maxPageSize = maxPageSize + 1,
-      ascendingOrder = isAscendingOrder,
-    )
-  ).valueOr:
+  let rows = (await self.driver.getMessages(
+    contentTopic = query.contentTopics,
+    pubsubTopic = query.pubsubTopic,
+    cursor = query.cursor,
+    startTime = query.startTime,
+    endTime = query.endTime,
+    hashes = query.hashes,
+    maxPageSize = maxPageSize + 1,
+    ascendingOrder = isAscendingOrder
+  )).valueOr:
     return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))
 
   let queryDuration = getTime().toUnixFloat() - queryStartTime
 
@@ -162,36 +150,34 @@
   var hashes = newSeq[WakuMessageHash]()
   var messages = newSeq[WakuMessage]()
   var cursor = none(ArchiveCursor)
-
+  
   if rows.len == 0:
     return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))
 
   ## Messages
   let pageSize = min(rows.len, int(maxPageSize))
-
+  
   #TODO once store v2 is removed, unzip instead of 2x map
-  messages = rows[0 ..< pageSize].mapIt(it[1])
-  hashes = rows[0 ..< pageSize].mapIt(it[4])
+  messages = rows[0..<pageSize].mapIt(it[1])
+  hashes = rows[0..<pageSize].mapIt(it[4])
 
   ## Cursor
   if rows.len > int(maxPageSize):
     ## Build last message cursor
     ## The cursor is built from the last message INCLUDED in the response
     ## (i.e. the second last message in the rows list)
-
+    
     #TODO Once Store v2 is removed keep only message and hash
     let (pubsubTopic, message, digest, storeTimestamp, hash) = rows[^2]
 
     #TODO Once Store v2 is removed, the cursor becomes the hash of the last message
-    cursor = some(
-      ArchiveCursor(
-        digest: MessageDigest.fromBytes(digest),
-        storeTime: storeTimestamp,
-        sendertime: message.timestamp,
-        pubsubTopic: pubsubTopic,
-        hash: hash,
-      )
-    )
+    cursor = some(ArchiveCursor(
+      digest: MessageDigest.fromBytes(digest),
+      storeTime: storeTimestamp,
+      sendertime: message.timestamp,
+      pubsubTopic: pubsubTopic,
+      hash: hash,
+    ))
 
   # All messages MUST be returned in chronological order
   if not isAscendingOrder:
@@ -208,7 +194,7 @@ proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
   while true:
     (await policy.execute(self.driver)).isOkOr:
       waku_archive_errors.inc(labelValues = [retPolicyFailure])
-      error "failed execution of retention policy", error = error
+      error "failed execution of retention policy", error=error
 
     await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)
 
@@ -216,8 +202,7 @@ proc periodicMetricReport(self: WakuArchive) {.async.} =
   while true:
     let countRes = (await self.driver.getMessagesCount())
    if countRes.isErr():
-      error "loopReportStoredMessagesMetric failed to get messages count",
-        error = countRes.error
+      error "loopReportStoredMessagesMetric failed to get messages count", error=countRes.error
     else:
       let count = countRes.get()
       waku_archive_messages.set(count, labelValues = ["stored"])
@@ -239,4 +224,4 @@ proc stopWait*(self: WakuArchive) {.async.} =
   if not self.metricsHandle.isNil:
     futures.add(self.metricsHandle.cancelAndWait())
 
-  await noCancel(allFutures(futures))
+  await noCancel(allFutures(futures))
\ No newline at end of file
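
Note: the "Message validation" constant in the first hunk (MaxMessageTimestampVariance, 20 seconds) implies an acceptance window for sender timestamps around the node's current time; the patch shows only the constant, not the check itself. A minimal, self-contained sketch of such a window check, with illustrative names (isTimestampValid is not nwaku API):

import std/times

const MaxVarianceNs: int64 = 20_000_000_000 # 20 seconds, in nanoseconds

proc isTimestampValid(senderNs, nowNs: int64): bool =
  ## Accept sender timestamps within [now - variance, now + variance].
  senderNs >= nowNs - MaxVarianceNs and senderNs <= nowNs + MaxVarianceNs

when isMainModule:
  let nowNs = getTime().toUnix() * 1_000_000_000
  doAssert isTimestampValid(nowNs, nowNs)
  doAssert not isTimestampValid(nowNs + 25_000_000_000, nowNs)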
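Note: the functional change of this patch is in handleMessage. The removed lines demoted expected "duplicate key value violates unique constraint" failures to trace level, to avoid log spam when several nodes write to the same database, while the added line logs every insert failure at debug level. A self-contained sketch of the removed decision, with illustrative names (logLevelFor is not nwaku API):

import std/strutils

type LogLevel = enum
  llTrace, llDebug

proc logLevelFor(insertError: string): LogLevel =
  ## Duplicate-key violations are expected when multiple nodes share one
  ## database, so they were logged at trace; anything else at debug.
  if "duplicate key value violates unique constraint" in insertError:
    llTrace
  else:
    llDebug

when isMainModule:
  doAssert logLevelFor("duplicate key value violates unique constraint") == llTrace
  doAssert logLevelFor("connection refused") == llDebug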
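Note: the findMessages hunks keep the look-ahead pagination scheme intact: the driver is asked for maxPageSize + 1 rows, and when that extra row comes back, the next-page cursor is built from rows[^2], i.e. the last row actually included in the page. A self-contained sketch of the scheme (paginate is an illustrative helper, not nwaku API):

import std/options

proc paginate[T](rows: seq[T], limit: int): tuple[page: seq[T], cursor: Option[T]] =
  ## Rows were fetched with "limit + 1"; getting more than `limit` rows back
  ## means another page exists, and the cursor is the last row in this page.
  let pageSize = min(rows.len, limit)
  result.page = rows[0 ..< pageSize]
  if rows.len > limit:
    result.cursor = some(rows[^2]) # last row included in the page
  else:
    result.cursor = none(T)

when isMainModule:
  let (page, cursor) = paginate(@[1, 2, 3, 4, 5, 6], 5)
  doAssert page == @[1, 2, 3, 4, 5]
  doAssert cursor == some(5)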