Skip to content

Commit

Permalink
fix: store v3 validate cursor & remove messages (#2636)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed May 1, 2024
1 parent 5dd645c commit e03d116
Show file tree
Hide file tree
Showing 21 changed files with 220 additions and 154 deletions.
146 changes: 88 additions & 58 deletions tests/node/test_wakunode_store.nim

Large diffs are not rendered by default.

19 changes: 12 additions & 7 deletions tests/waku_archive/test_waku_archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ procSuite "Waku Archive - find messages":
waitFor archive.handleMessage("foo", msg2)

## Given
let req = ArchiveQuery(contentTopics: @[topic])
let req = ArchiveQuery(includeData: true, contentTopics: @[topic])

## When
let queryRes = waitFor archive.findMessages(req)
Expand Down Expand Up @@ -218,7 +218,7 @@ procSuite "Waku Archive - find messages":
waitFor archive.handleMessage("foo", msg3)

## Given
let req = ArchiveQuery(contentTopics: @[topic1, topic3])
let req = ArchiveQuery(includeData: true, contentTopics: @[topic1, topic3])

## When
let queryRes = waitFor archive.findMessages(req)
Expand Down Expand Up @@ -283,7 +283,9 @@ procSuite "Waku Archive - find messages":
## Given
# This query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
let req = ArchiveQuery(
pubsubTopic: some(pubsubTopic1), contentTopics: @[contentTopic1, contentTopic3]
includeData: true,
pubsubTopic: some(pubsubTopic1),
contentTopics: @[contentTopic1, contentTopic3],
)

## When
Expand Down Expand Up @@ -349,7 +351,7 @@ procSuite "Waku Archive - find messages":
waitFor archive.handleMessage(pubsubTopic, msg3)

## Given
let req = ArchiveQuery(pubsubTopic: some(pubsubTopic))
let req = ArchiveQuery(includeData: true, pubsubTopic: some(pubsubTopic))

## When
let res = waitFor archive.findMessages(req)
Expand All @@ -367,7 +369,8 @@ procSuite "Waku Archive - find messages":

test "handle query with forward pagination":
## Given
let req = ArchiveQuery(pageSize: 4, direction: PagingDirection.FORWARD)
let req =
ArchiveQuery(includeData: true, pageSize: 4, direction: PagingDirection.FORWARD)

## When
var nextReq = req # copy
Expand Down Expand Up @@ -400,7 +403,8 @@ procSuite "Waku Archive - find messages":

test "handle query with backward pagination":
## Given
let req = ArchiveQuery(pageSize: 4, direction: PagingDirection.BACKWARD)
let req =
ArchiveQuery(includeData: true, pageSize: 4, direction: PagingDirection.BACKWARD)

## When
var nextReq = req # copy
Expand Down Expand Up @@ -463,7 +467,7 @@ procSuite "Waku Archive - find messages":
).isOk()

## Given
let req = ArchiveQuery(contentTopics: @[DefaultContentTopic])
let req = ArchiveQuery(includeData: true, contentTopics: @[DefaultContentTopic])

## When
let res = waitFor archive.findMessages(req)
Expand All @@ -482,6 +486,7 @@ procSuite "Waku Archive - find messages":
test "handle temporal history query with a valid time window":
## Given
let req = ArchiveQuery(
includeData: true,
contentTopics: @[ContentTopic("1")],
startTime: some(ts(15, timeOrigin)),
endTime: some(ts(55, timeOrigin)),
Expand Down
6 changes: 3 additions & 3 deletions tests/waku_store/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ suite "Store Client":
hash3 = computeMessageHash(DefaultPubsubTopic, message3)
messageSeq =
@[
WakuMessageKeyValue(messageHash: hash1, message: message1),
WakuMessageKeyValue(messageHash: hash2, message: message2),
WakuMessageKeyValue(messageHash: hash3, message: message3),
WakuMessageKeyValue(messageHash: hash1, message: some(message1)),
WakuMessageKeyValue(messageHash: hash2, message: some(message2)),
WakuMessageKeyValue(messageHash: hash3, message: some(message3)),
]
handlerFuture = newHistoryFuture()
handler = proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.} =
Expand Down
4 changes: 2 additions & 2 deletions tests/waku_store/test_rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ procSuite "Waku Store - RPC codec":
## Given
let query = StoreQueryRequest(
requestId: "0",
includeData: false,
includeData: true,
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
startTime: some(Timestamp(10)),
Expand Down Expand Up @@ -58,7 +58,7 @@ procSuite "Waku Store - RPC codec":
let
message = fakeWakuMessage()
hash = computeMessageHash(DefaultPubsubTopic, message)
keyValue = WakuMessageKeyValue(messageHash: hash, message: message)
keyValue = WakuMessageKeyValue(messageHash: hash, message: some(message))
res = StoreQueryResponse(
requestId: "1",
statusCode: 200,
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_store/test_waku_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ suite "Waku Store - query handler":

let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
let kv = WakuMessageKeyValue(messageHash: hash, message: msg)
let kv = WakuMessageKeyValue(messageHash: hash, message: some(msg))

var queryHandlerFut = newFuture[(StoreQueryRequest)]()

Expand Down
32 changes: 20 additions & 12 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,19 @@ procSuite "WakuNode - Store":

let hashes = msgListA.mapIt(computeMessageHash(DefaultPubsubTopic, it))

let kvs =
zip(hashes, msgListA).mapIt(WakuMessageKeyValue(messageHash: it[0], message: it[1]))
let kvs = zip(hashes, msgListA).mapIt(
WakuMessageKeyValue(messageHash: it[0], message: some(it[1]))
)

let archiveA = block:
let driver = newSqliteArchiveDriver()

for kv in kvs:
let msg_digest = computeDigest(kv.message)
let message = kv.message.get()
let msg_digest = computeDigest(message)
require (
waitFor driver.put(
DefaultPubsubTopic, kv.message, msg_digest, kv.messageHash,
kv.message.timestamp,
DefaultPubsubTopic, message, msg_digest, kv.messageHash, message.timestamp
)
).isOk()

Expand All @@ -84,7 +85,8 @@ procSuite "WakuNode - Store":
client.mountStoreClient()

## Given
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let req =
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

## When
Expand Down Expand Up @@ -119,6 +121,7 @@ procSuite "WakuNode - Store":

## Given
let req = StoreQueryRequest(
includeData: true,
contentTopics: @[DefaultContentTopic],
paginationForward: PagingDirection.FORWARD,
paginationLimit: some(uint64(7)),
Expand Down Expand Up @@ -174,6 +177,7 @@ procSuite "WakuNode - Store":

## Given
let req = StoreQueryRequest(
includeData: true,
contentTopics: @[DefaultContentTopic],
paginationLimit: some(uint64(7)),
paginationForward: PagingDirection.BACKWARD,
Expand Down Expand Up @@ -261,7 +265,8 @@ procSuite "WakuNode - Store":
# Wait for the server filter to receive the push message
require waitFor filterFut.withTimeout(5.seconds)

let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let req =
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
let res = waitFor client.query(req, serverPeer)

## Then
Expand All @@ -270,7 +275,8 @@ procSuite "WakuNode - Store":
let response = res.get()
check:
response.messages.len == 1
response.messages[0] == WakuMessageKeyValue(messageHash: hash, message: message)
response.messages[0] ==
WakuMessageKeyValue(messageHash: hash, message: some(message))

let (handledPubsubTopic, handledMsg) = filterFut.read()
check:
Expand Down Expand Up @@ -341,7 +347,8 @@ procSuite "WakuNode - Store":
client.mountStoreClient()

## Given
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let req =
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let requestProc = proc() {.async.} =
Expand All @@ -351,7 +358,7 @@ procSuite "WakuNode - Store":

let response = queryRes.get()
check:
response.messages.mapIt(it.message) == msgListA
response.messages.mapIt(it.message.get()) == msgListA

for count in 0 ..< 4:
waitFor requestProc()
Expand Down Expand Up @@ -384,7 +391,8 @@ procSuite "WakuNode - Store":
client.mountStoreClient()

## Given
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let req =
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let successProc = proc() {.async.} =
Expand All @@ -393,7 +401,7 @@ procSuite "WakuNode - Store":
check queryRes.isOk()
let response = queryRes.get()
check:
response.messages.mapIt(it.message) == msgListA
response.messages.mapIt(it.message.get()) == msgListA

let failsProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)
Expand Down
30 changes: 17 additions & 13 deletions tests/wakunode_rest/test_rest_store.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, times],
std/[options, times, sugar],
stew/shims/net as stewNet,
chronicles,
testutils/unittests,
Expand Down Expand Up @@ -224,9 +224,10 @@ procSuite "Waku Rest API - Store v3":
"7", # page size. Empty implies default page size.
)

var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].message)
let wakuMessages = collect(newSeq):
for element in response.data.messages:
if element.message.isSome():
element.message.get()

pages[i] = wakuMessages

Expand Down Expand Up @@ -620,15 +621,16 @@ procSuite "Waku Rest API - Store v3":
let client = newRestHttpClient(initTAddress(restAddress, restPort))

# Filtering by a known pubsub topic.
var response =
await client.getStoreMessagesV3(pubsubTopic = encodeUrl(DefaultPubsubTopic))
var response = await client.getStoreMessagesV3(
includeData = "true", pubsubTopic = encodeUrl(DefaultPubsubTopic)
)

check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.messages.len == 1

let storeMessage = response.data.messages[0].message
let storeMessage = response.data.messages[0].message.get()

check:
storeMessage.payload == msg.payload
Expand Down Expand Up @@ -710,9 +712,10 @@ procSuite "Waku Rest API - Store v3":
"3", # page size. Empty implies default page size.
)

var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].message)
let wakuMessages = collect(newSeq):
for element in response.data.messages:
if element.message.isSome():
element.message.get()

pages[i] = wakuMessages

Expand Down Expand Up @@ -773,9 +776,10 @@ procSuite "Waku Rest API - Store v3":
response.status == 200
$response.contentType == $MIMETYPE_JSON

var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].message)
let wakuMessages = collect(newSeq):
for element in response.data.messages:
if element.message.isSome():
element.message.get()

check wakuMessages == msgList[6 .. 9]

Expand Down
2 changes: 0 additions & 2 deletions waku/common/protobuf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ proc write3*(proto: var ProtoBuffer, field: int, value: auto) =
when value is Option:
if value.isSome():
proto.write(field, value.get())
elif value is bool:
proto.write(field, zint(value))
else:
proto.write(field, value)

Expand Down
15 changes: 12 additions & 3 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ when defined(waku_exp_store_resume):
proc toArchiveQuery(request: StoreQueryRequest): ArchiveQuery =
var query = ArchiveQuery()

query.includeData = request.includeData
query.pubsubTopic = request.pubsubTopic
query.contentTopics = request.contentTopics
query.startTime = request.startTime
Expand All @@ -834,9 +835,17 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult =

res.statusCode = 200
res.statusDesc = "OK"
res.messages = response.hashes.zip(response.messages).mapIt(
WakuMessageKeyValue(messageHash: it[0], message: it[1])
)

for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]

let kv =
store_common.WakuMessageKeyValue(messageHash: hash, message: none(WakuMessage))

res.messages.add(kv)

for i in 0 ..< response.messages.len:
res.messages[i].message = some(response.messages[i])

if response.cursor.isSome():
res.paginationCursor = some(response.cursor.get().hash)
Expand Down
9 changes: 4 additions & 5 deletions waku/waku_api/rest/store/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ proc writeValue*(
writer.beginRecord()

writer.writeField("message_hash", value.messageHash)
writer.writeField("message", value.message)

if value.message.isSome():
writer.writeField("message", value.message.get())

writer.endRecord()

Expand Down Expand Up @@ -217,10 +219,7 @@ proc readValue*(
if messageHash.isNone():
reader.raiseUnexpectedValue("Field `message_hash` is missing")

if message.isNone():
reader.raiseUnexpectedValue("Field `message` is missing")

value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message.get())
value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message)

## StoreQueryResponse serde

Expand Down
11 changes: 9 additions & 2 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,14 @@ proc findMessages*(
if query.contentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics"))

if query.cursor.isSome() and query.cursor.get().hash.len != 32:
return err(ArchiveError.invalidQuery("invalid cursor hash length"))

let queryStartTime = getTime().toUnixFloat()

let rows = (
await self.driver.getMessages(
includeData = query.includeData,
contentTopic = query.contentTopics,
pubsubTopic = query.pubsubTopic,
cursor = query.cursor,
Expand All @@ -174,7 +178,10 @@ proc findMessages*(
let pageSize = min(rows.len, int(maxPageSize))

#TODO once store v2 is removed, unzip instead of 2x map
messages = rows[0 ..< pageSize].mapIt(it[1])
#TODO once store v2 is removed, update driver to not return messages when not needed
if query.includeData:
messages = rows[0 ..< pageSize].mapIt(it[1])

hashes = rows[0 ..< pageSize].mapIt(it[4])

## Cursor
Expand Down Expand Up @@ -206,7 +213,7 @@ proc findMessages*(

proc findMessagesV2*(
self: WakuArchive, query: ArchiveQuery
): Future[ArchiveResult] {.async, gcsafe.} =
): Future[ArchiveResult] {.async, deprecated, gcsafe.} =
## Search the archive to return a single page of messages matching the query criteria

let maxPageSize =
Expand Down
1 change: 1 addition & 0 deletions waku/waku_archive/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type
hash*: WakuMessageHash

ArchiveQuery* = object
includeData*: bool # indicate if messages should be returned in addition to hashes.
pubsubTopic*: Option[PubsubTopic]
contentTopics*: seq[ContentTopic]
cursor*: Option[ArchiveCursor]
Expand Down
Loading

0 comments on commit e03d116

Please sign in to comment.