Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store v3 return pubsub topics #2676

Merged
merged 4 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions tests/node/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ suite "Waku Store - End to End - Sorted Archive":
]
archiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -337,7 +339,9 @@ suite "Waku Store - End to End - Sorted Archive":
archiveMessages &
extraMessages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -392,7 +396,9 @@ suite "Waku Store - End to End - Sorted Archive":
archiveMessages &
extraMessages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -558,7 +564,9 @@ suite "Waku Store - End to End - Unsorted Archive":
]
unsortedArchiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -773,7 +781,9 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
]
unsortedArchiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -915,14 +925,18 @@ suite "Waku Store - End to End - Archive with Multiple Topics":

archiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

for i in 6 ..< 10:
archiveMessages[i].messagehash =
computeMessageHash(pubsubTopicB, archiveMessages[i].message.get())

archiveMessages[i].pubsubTopic = some(pubsubTopicB)

let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
Expand Down Expand Up @@ -1274,7 +1288,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics":

let voluminousArchiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down
18 changes: 15 additions & 3 deletions tests/waku_store/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,21 @@ suite "Store Client":
hash3 = computeMessageHash(DefaultPubsubTopic, message3)
messageSeq =
@[
WakuMessageKeyValue(messageHash: hash1, message: some(message1)),
WakuMessageKeyValue(messageHash: hash2, message: some(message2)),
WakuMessageKeyValue(messageHash: hash3, message: some(message3)),
WakuMessageKeyValue(
messageHash: hash1,
message: some(message1),
pubsubTopic: some(DefaultPubsubTopic),
),
WakuMessageKeyValue(
messageHash: hash2,
message: some(message2),
pubsubTopic: some(DefaultPubsubTopic),
),
WakuMessageKeyValue(
messageHash: hash3,
message: some(message3),
pubsubTopic: some(DefaultPubsubTopic),
),
]
handlerFuture = newHistoryFuture()
handler = proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.} =
Expand Down
4 changes: 3 additions & 1 deletion tests/waku_store/test_rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ procSuite "Waku Store - RPC codec":
let
message = fakeWakuMessage()
hash = computeMessageHash(DefaultPubsubTopic, message)
keyValue = WakuMessageKeyValue(messageHash: hash, message: some(message))
keyValue = WakuMessageKeyValue(
messageHash: hash, message: some(message), pubsubTopic: some(DefaultPubsubTopic)
)
res = StoreQueryResponse(
requestId: "1",
statusCode: 200,
Expand Down
4 changes: 3 additions & 1 deletion tests/waku_store/test_waku_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ suite "Waku Store - query handler":

let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
let kv = WakuMessageKeyValue(messageHash: hash, message: some(msg))
let kv = WakuMessageKeyValue(
messageHash: hash, message: some(msg), pubsubTopic: some(DefaultPubsubTopic)
)

var queryHandlerFut = newFuture[(StoreQueryRequest)]()

Expand Down
10 changes: 8 additions & 2 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ procSuite "WakuNode - Store":
let hashes = msgListA.mapIt(computeMessageHash(DefaultPubsubTopic, it))

let kvs = zip(hashes, msgListA).mapIt(
WakuMessageKeyValue(messageHash: it[0], message: some(it[1]))
WakuMessageKeyValue(
messageHash: it[0], message: some(it[1]), pubsubTopic: some(DefaultPubsubTopic)
)
)

let archiveA = block:
Expand Down Expand Up @@ -276,7 +278,11 @@ procSuite "WakuNode - Store":
check:
response.messages.len == 1
response.messages[0] ==
WakuMessageKeyValue(messageHash: hash, message: some(message))
WakuMessageKeyValue(
messageHash: hash,
message: some(message),
pubsubTopic: some(DefaultPubSubTopic),
)

let (handledPubsubTopic, handledMsg) = filterFut.read()
check:
Expand Down
4 changes: 2 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -838,13 +838,13 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult =
for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]

let kv =
store_common.WakuMessageKeyValue(messageHash: hash, message: none(WakuMessage))
let kv = store_common.WakuMessageKeyValue(messageHash: hash)

res.messages.add(kv)

for i in 0 ..< response.messages.len:
res.messages[i].message = some(response.messages[i])
res.messages[i].pubsubTopic = some(response.topics[i])

if response.cursor.isSome():
res.paginationCursor = some(response.cursor.get().hash)
Expand Down
9 changes: 7 additions & 2 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ proc findMessages*(

var hashes = newSeq[WakuMessageHash]()
var messages = newSeq[WakuMessage]()
var topics = newSeq[PubsubTopic]()
var cursor = none(ArchiveCursor)

if rows.len == 0:
Expand All @@ -180,6 +181,7 @@ proc findMessages*(
#TODO once store v2 is removed, unzip instead of 2x map
#TODO once store v2 is removed, update driver to not return messages when not needed
if query.includeData:
topics = rows[0 ..< pageSize].mapIt(it[0])
messages = rows[0 ..< pageSize].mapIt(it[1])

hashes = rows[0 ..< pageSize].mapIt(it[4])
Expand All @@ -206,10 +208,13 @@ proc findMessages*(

# All messages MUST be returned in chronological order
if not isAscendingOrder:
reverse(messages)
reverse(hashes)
reverse(messages)
reverse(topics)

return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))
return ok(
ArchiveResponse(hashes: hashes, messages: messages, topics: topics, cursor: cursor)
)

proc findMessagesV2*(
self: WakuArchive, query: ArchiveQuery
Expand Down
1 change: 1 addition & 0 deletions waku/waku_archive/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type
ArchiveResponse* = object
hashes*: seq[WakuMessageHash]
messages*: seq[WakuMessage]
topics*: seq[PubsubTopic]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm. Wouldn't it be better to return seq[(message, topic)] to keep the 1:1 association explicit?

Copy link
Contributor Author

@SionoiS SionoiS May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the archive driver lvl it's separate then you would use a tuple at the archive lvl then undo the tuple at the Store lvl.

I tried it. I does not make much sense.

cursor*: Option[ArchiveCursor]

ArchiveErrorKind* {.pure.} = enum
Expand Down
1 change: 1 addition & 0 deletions waku/waku_store/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type
WakuMessageKeyValue* = object
messageHash*: WakuMessageHash
message*: Option[WakuMessage]
pubsubTopic*: Option[PubsubTopic]

StoreQueryResponse* = object
requestId*: string
Expand Down
12 changes: 8 additions & 4 deletions waku/waku_store/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ proc encode*(keyValue: WakuMessageKeyValue): ProtoBuffer =

pb.write3(1, keyValue.messageHash)

if keyValue.message.isSome():
if keyValue.message.isSome() and keyValue.pubsubTopic.isSome():
pb.write3(2, keyValue.message.get().encode())
pb.write3(3, keyValue.pubsubTopic.get())

pb.finish3()

Expand Down Expand Up @@ -164,10 +165,13 @@ proc decode*(
keyValue.messagehash = hash

var proto: ProtoBuffer
if not ?pb.getField(2, proto):
keyValue.message = none(WakuMessage)
else:
var topic: string
if ?pb.getField(2, proto) and ?pb.getField(3, topic):
keyValue.message = some(?WakuMessage.decode(proto.buffer))
keyValue.pubsubTopic = some(topic)
else:
keyValue.message = none(WakuMessage)
keyValue.pubsubTopic = none(string)

return ok(keyValue)

Expand Down
Loading