Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Better postgres duplicate insert #2535

Merged
merged 4 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ suite "Postgres driver":
let msg1 = fakeWakuMessage(ts = now)
let msg2 = fakeWakuMessage(ts = now)

let initialNumMsgs = (await driver.getMessagesCount()).valueOr:
raiseAssert "could not get num mgs correctly: " & $error

var putRes = await driver.put(
DefaultPubsubTopic,
msg1,
Expand All @@ -173,11 +176,24 @@ suite "Postgres driver":
)
assert putRes.isOk(), putRes.error

var newNumMsgs = (await driver.getMessagesCount()).valueOr:
raiseAssert "could not get num mgs correctly: " & $error

assert newNumMsgs == (initialNumMsgs + 1.int64),
"wrong number of messages: " & $newNumMsgs

putRes = await driver.put(
DefaultPubsubTopic,
msg2,
computeDigest(msg2),
computeMessageHash(DefaultPubsubTopic, msg2),
msg2.timestamp,
)
assert not putRes.isOk()

assert putRes.isOk()

newNumMsgs = (await driver.getMessagesCount()).valueOr:
raiseAssert "could not get num mgs correctly: " & $error

assert newNumMsgs == (initialNumMsgs + 1.int64),
"wrong number of messages: " & $newNumMsgs
9 changes: 1 addition & 8 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,7 @@ proc handleMessage*(

(await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
# Prevent spamming the logs when multiple nodes are connected to the same database.
# In that case, the message cannot be inserted but is an expected "insert error"
# and therefore we reduce its visibility by having the log in trace level.
if "duplicate key value violates unique constraint" in error:
trace "failed to insert message", err = error
else:
debug "failed to insert message", err = error

debug "failed to insert message", err = error
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)

Expand Down
3 changes: 2 additions & 1 deletion waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type PostgresDriver* = ref object of ArchiveDriver
const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition = # TODO: get the sql queries from a file
"""INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic,
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8);"""
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING;"""

const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =
Expand Down Expand Up @@ -679,6 +679,7 @@ proc loopPartitionFactory(
debug "creating a new partition for the future"
## The current used partition is the last one that was created.
## Thus, let's create another partition for the future.

(
await self.addPartition(
newestPartition.getLastMoment(), PartitionsRangeInterval
Expand Down
Loading