From e03d1165e64e35e35073e78f755462cbdfdb080b Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Wed, 1 May 2024 14:47:06 -0400 Subject: [PATCH] fix: store v3 validate cursor & remove messages (#2636) --- tests/node/test_wakunode_store.nim | 146 +++++++++++------- tests/waku_archive/test_waku_archive.nim | 19 ++- tests/waku_store/test_client.nim | 6 +- tests/waku_store/test_rpc_codec.nim | 4 +- tests/waku_store/test_waku_store.nim | 2 +- tests/waku_store/test_wakunode_store.nim | 32 ++-- tests/wakunode_rest/test_rest_store.nim | 30 ++-- waku/common/protobuf.nim | 2 - waku/node/waku_node.nim | 15 +- waku/waku_api/rest/store/types.nim | 9 +- waku/waku_archive/archive.nim | 11 +- waku/waku_archive/common.nim | 1 + waku/waku_archive/driver.nim | 3 +- .../postgres_driver/postgres_driver.nim | 7 +- .../driver/queue_driver/queue_driver.nim | 1 + .../driver/sqlite_driver/queries.nim | 53 ++++--- .../driver/sqlite_driver/sqlite_driver.nim | 3 +- waku/waku_core/message/codec.nim | 4 +- waku/waku_store/common.nim | 2 +- waku/waku_store/protocol.nim | 10 +- waku/waku_store/rpc_codec.nim | 14 +- 21 files changed, 220 insertions(+), 154 deletions(-) diff --git a/tests/node/test_wakunode_store.nim b/tests/node/test_wakunode_store.nim index 47f5e63a68..b7e5651b29 100644 --- a/tests/node/test_wakunode_store.nim +++ b/tests/node/test_wakunode_store.nim @@ -59,10 +59,13 @@ suite "Waku Store - End to End - Sorted Archive": fakeWakuMessage(@[byte 09], ts = ts(90, timeOrigin)), ] archiveMessages = messages.mapIt( - WakuMessageKeyValue(messageHash: computeMessageHash(pubsubTopic, it), message: it) + WakuMessageKeyValue( + messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + ) ) storeQuery = StoreQueryRequest( + includeData: true, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, paginationForward: PagingDirection.Forward, @@ -102,6 +105,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query var otherHistoryQuery = StoreQueryRequest( + includeData: true, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, paginationCursor: queryResponse.get().paginationCursor, @@ -130,6 +134,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query var nextHistoryQuery = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -159,6 +164,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (2/5) let historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse1.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -175,6 +181,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (3/5) let historyQuery3 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse2.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -191,6 +198,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (4/5) let historyQuery4 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse3.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -207,6 +215,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (5/5) let historyQuery5 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse4.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -234,6 +243,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (2/2) let historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse1.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -272,6 +282,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (2/3) let historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse1.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -288,6 +299,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (3/3) let historyQuery3 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse2.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -310,7 +322,7 @@ suite "Waku Store - End to End - Sorted Archive": let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5 let lastMessageTimestamp = - archiveMessages[archiveMessages.len - 1].message.timestamp + archiveMessages[archiveMessages.len - 1].message.get().timestamp var extraMessages: seq[WakuMessage] = @[] for i in 0 ..< missingMessagesAmount: let @@ -325,7 +337,7 @@ suite "Waku Store - End to End - Sorted Archive": archiveMessages & extraMessages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: it + messageHash: computeMessageHash(pubsubTopic, it), message: some(it) ) ) @@ -341,6 +353,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (2/2) let historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse1.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -364,7 +377,7 @@ suite "Waku Store - End to End - Sorted Archive": let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5 let lastMessageTimestamp = - archiveMessages[archiveMessages.len - 1].message.timestamp + archiveMessages[archiveMessages.len - 1].message.get().timestamp var extraMessages: seq[WakuMessage] = @[] for i in 0 ..< missingMessagesAmount: let @@ -379,12 +392,13 @@ suite "Waku Store - End to End - Sorted Archive": archiveMessages & extraMessages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: it + messageHash: computeMessageHash(pubsubTopic, it), message: some(it) ) ) # Given a query with default page size (1/2) storeQuery = StoreQueryRequest( + includeData: true, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, paginationForward: PagingDirection.FORWARD, @@ -399,6 +413,7 @@ suite "Waku Store - End to End - Sorted Archive": # Given the next query (2/2) let historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -457,8 +472,9 @@ suite "Waku Store - End to End - Sorted Archive": asyncTest "Cursor Reusability Across Nodes": # Given a different server node with the same archive let - otherArchiveDriverWithMessages = - newArchiveDriverWithMessages(pubsubTopic, archiveMessages.mapIt(it.message)) + otherArchiveDriverWithMessages = newArchiveDriverWithMessages( + pubsubTopic, archiveMessages.mapIt(it.message.get()) + ) otherServerKey = generateSecp256k1Key() otherServer = newTestWakuNode(otherServerKey, ValidIpAddress.init("0.0.0.0"), Port(0)) @@ -483,6 +499,7 @@ suite "Waku Store - End to End - Sorted Archive": # When making a history query to the second server node let otherHistoryQuery = StoreQueryRequest( + includeData: true, paginationCursor: paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -518,6 +535,7 @@ suite "Waku Store - End to End - Unsorted Archive": contentTopicSeq = @[contentTopic] storeQuery = StoreQueryRequest( + includeData: true, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, paginationForward: PagingDirection.FORWARD, @@ -539,7 +557,9 @@ suite "Waku Store - End to End - Unsorted Archive": fakeWakuMessage(@[byte 05], ts = ts(20, timeOrigin)), ] unsortedArchiveMessages = messages.mapIt( - WakuMessageKeyValue(messageHash: computeMessageHash(pubsubTopic, it), message: it) + WakuMessageKeyValue( + messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + ) ) let @@ -575,17 +595,17 @@ suite "Waku Store - End to End - Unsorted Archive": check: queryResponse.get().messages.len == 5 - queryResponse.get().messages[0].message.timestamp == - queryResponse.get().messages[1].message.timestamp + queryResponse.get().messages[0].message.get().timestamp == + queryResponse.get().messages[1].message.get().timestamp - queryResponse.get().messages[1].message.timestamp == - queryResponse.get().messages[2].message.timestamp + queryResponse.get().messages[1].message.get().timestamp == + queryResponse.get().messages[2].message.get().timestamp - queryResponse.get().messages[2].message.timestamp < - queryResponse.get().messages[3].message.timestamp + queryResponse.get().messages[2].message.get().timestamp < + queryResponse.get().messages[3].message.get().timestamp - queryResponse.get().messages[3].message.timestamp == - queryResponse.get().messages[4].message.timestamp + queryResponse.get().messages[3].message.get().timestamp == + queryResponse.get().messages[4].message.get().timestamp toHex(queryResponse.get().messages[0].messageHash) < toHex(queryResponse.get().messages[1].messageHash) @@ -598,6 +618,7 @@ suite "Waku Store - End to End - Unsorted Archive": # Given the next query var historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -610,17 +631,17 @@ suite "Waku Store - End to End - Unsorted Archive": # Check the ordering check: - queryResponse2.get().messages[0].message.timestamp < - queryResponse2.get().messages[1].message.timestamp + queryResponse2.get().messages[0].message.get().timestamp < + queryResponse2.get().messages[1].message.get().timestamp - queryResponse2.get().messages[1].message.timestamp == - queryResponse2.get().messages[2].message.timestamp + queryResponse2.get().messages[1].message.get().timestamp == + queryResponse2.get().messages[2].message.get().timestamp - queryResponse2.get().messages[2].message.timestamp == - queryResponse2.get().messages[3].message.timestamp + queryResponse2.get().messages[2].message.get().timestamp == + queryResponse2.get().messages[3].message.get().timestamp - queryResponse2.get().messages[3].message.timestamp == - queryResponse2.get().messages[4].message.timestamp + queryResponse2.get().messages[3].message.get().timestamp == + queryResponse2.get().messages[4].message.get().timestamp toHex(queryResponse2.get().messages[1].messageHash) < toHex(queryResponse2.get().messages[2].messageHash) @@ -651,11 +672,11 @@ suite "Waku Store - End to End - Unsorted Archive": check: queryResponse.get().messages.len == 3 - queryResponse.get().messages[0].message.timestamp == - queryResponse.get().messages[1].message.timestamp + queryResponse.get().messages[0].message.get().timestamp == + queryResponse.get().messages[1].message.get().timestamp - queryResponse.get().messages[1].message.timestamp == - queryResponse.get().messages[2].message.timestamp + queryResponse.get().messages[1].message.get().timestamp == + queryResponse.get().messages[2].message.get().timestamp toHex(queryResponse.get().messages[0].messageHash) < toHex(queryResponse.get().messages[1].messageHash) @@ -684,20 +705,20 @@ suite "Waku Store - End to End - Unsorted Archive": check: queryResponse.get().messages.len == 6 - queryResponse.get().messages[0].message.timestamp == - queryResponse.get().messages[1].message.timestamp + queryResponse.get().messages[0].message.get().timestamp == + queryResponse.get().messages[1].message.get().timestamp - queryResponse.get().messages[1].message.timestamp < - queryResponse.get().messages[2].message.timestamp + queryResponse.get().messages[1].message.get().timestamp < + queryResponse.get().messages[2].message.get().timestamp - queryResponse.get().messages[2].message.timestamp == - queryResponse.get().messages[3].message.timestamp + queryResponse.get().messages[2].message.get().timestamp == + queryResponse.get().messages[3].message.get().timestamp - queryResponse.get().messages[3].message.timestamp == - queryResponse.get().messages[4].message.timestamp + queryResponse.get().messages[3].message.get().timestamp == + queryResponse.get().messages[4].message.get().timestamp - queryResponse.get().messages[4].message.timestamp == - queryResponse.get().messages[5].message.timestamp + queryResponse.get().messages[4].message.get().timestamp == + queryResponse.get().messages[5].message.get().timestamp toHex(queryResponse.get().messages[0].messageHash) < toHex(queryResponse.get().messages[1].messageHash) @@ -730,6 +751,7 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp": contentTopicSeq = @[contentTopic] storeQuery = StoreQueryRequest( + includeData: true, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, paginationForward: PagingDirection.FORWARD, @@ -750,7 +772,9 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp": fakeWakuMessage(@[byte 08]), ] unsortedArchiveMessages = messages.mapIt( - WakuMessageKeyValue(messageHash: computeMessageHash(pubsubTopic, it), message: it) + WakuMessageKeyValue( + messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + ) ) let @@ -785,20 +809,21 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp": check: queryResponse.get().messages.len == 5 - queryResponse.get().messages[0].message.timestamp <= - queryResponse.get().messages[1].message.timestamp + queryResponse.get().messages[0].message.get().timestamp <= + queryResponse.get().messages[1].message.get().timestamp - queryResponse.get().messages[1].message.timestamp <= - queryResponse.get().messages[2].message.timestamp + queryResponse.get().messages[1].message.get().timestamp <= + queryResponse.get().messages[2].message.get().timestamp - queryResponse.get().messages[2].message.timestamp <= - queryResponse.get().messages[3].message.timestamp + queryResponse.get().messages[2].message.get().timestamp <= + queryResponse.get().messages[3].message.get().timestamp - queryResponse.get().messages[3].message.timestamp <= - queryResponse.get().messages[4].message.timestamp + queryResponse.get().messages[3].message.get().timestamp <= + queryResponse.get().messages[4].message.get().timestamp # Given the next query var historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse.get().paginationCursor, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, @@ -820,17 +845,17 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp": queryResponse2.get().messages.len == 5 - queryResponse2.get().messages[0].message.timestamp <= - queryResponse2.get().messages[1].message.timestamp + queryResponse2.get().messages[0].message.get().timestamp <= + queryResponse2.get().messages[1].message.get().timestamp - queryResponse2.get().messages[1].message.timestamp <= - queryResponse2.get().messages[2].message.timestamp + queryResponse2.get().messages[1].message.get().timestamp <= + queryResponse2.get().messages[2].message.get().timestamp - queryResponse2.get().messages[2].message.timestamp <= - queryResponse2.get().messages[3].message.timestamp + queryResponse2.get().messages[2].message.get().timestamp <= + queryResponse2.get().messages[3].message.get().timestamp - queryResponse2.get().messages[3].message.timestamp <= - queryResponse2.get().messages[4].message.timestamp + queryResponse2.get().messages[3].message.get().timestamp <= + queryResponse2.get().messages[4].message.get().timestamp suite "Waku Store - End to End - Archive with Multiple Topics": var pubsubTopic {.threadvar.}: PubsubTopic @@ -861,6 +886,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics": @[contentTopic, contentTopicB, contentTopicC, contentTopicSpecials] storeQuery = StoreQueryRequest( + includeData: true, pubsubTopic: some(pubsubTopic), contentTopics: contentTopicSeq, paginationForward: PagingDirection.FORWARD, @@ -888,12 +914,14 @@ suite "Waku Store - End to End - Archive with Multiple Topics": ] archiveMessages = messages.mapIt( - WakuMessageKeyValue(messageHash: computeMessageHash(pubsubTopic, it), message: it) + WakuMessageKeyValue( + messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + ) ) for i in 6 ..< 10: archiveMessages[i].messagehash = - computeMessageHash(pubsubTopicB, archiveMessages[i].message) + computeMessageHash(pubsubTopicB, archiveMessages[i].message.get()) let serverKey = generateSecp256k1Key() @@ -961,6 +989,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics": # Given the next query let historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse.get().paginationCursor, pubsubTopic: none(PubsubTopic), contentTopics: contentTopicSeq, @@ -1028,6 +1057,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics": # Given the next query let historyQuery2 = StoreQueryRequest( + includeData: true, paginationCursor: queryResponse.get().paginationCursor, pubsubTopic: none(PubsubTopic), contentTopics: contentTopicSeq, @@ -1244,7 +1274,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics": let voluminousArchiveMessages = messages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: it + messageHash: computeMessageHash(pubsubTopic, it), message: some(it) ) ) diff --git a/tests/waku_archive/test_waku_archive.nim b/tests/waku_archive/test_waku_archive.nim index 45947c76ea..8408d3f3de 100644 --- a/tests/waku_archive/test_waku_archive.nim +++ b/tests/waku_archive/test_waku_archive.nim @@ -183,7 +183,7 @@ procSuite "Waku Archive - find messages": waitFor archive.handleMessage("foo", msg2) ## Given - let req = ArchiveQuery(contentTopics: @[topic]) + let req = ArchiveQuery(includeData: true, contentTopics: @[topic]) ## When let queryRes = waitFor archive.findMessages(req) @@ -218,7 +218,7 @@ procSuite "Waku Archive - find messages": waitFor archive.handleMessage("foo", msg3) ## Given - let req = ArchiveQuery(contentTopics: @[topic1, topic3]) + let req = ArchiveQuery(includeData: true, contentTopics: @[topic1, topic3]) ## When let queryRes = waitFor archive.findMessages(req) @@ -283,7 +283,9 @@ procSuite "Waku Archive - find messages": ## Given # This query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) let req = ArchiveQuery( - pubsubTopic: some(pubsubTopic1), contentTopics: @[contentTopic1, contentTopic3] + includeData: true, + pubsubTopic: some(pubsubTopic1), + contentTopics: @[contentTopic1, contentTopic3], ) ## When @@ -349,7 +351,7 @@ procSuite "Waku Archive - find messages": waitFor archive.handleMessage(pubsubTopic, msg3) ## Given - let req = ArchiveQuery(pubsubTopic: some(pubsubTopic)) + let req = ArchiveQuery(includeData: true, pubsubTopic: some(pubsubTopic)) ## When let res = waitFor archive.findMessages(req) @@ -367,7 +369,8 @@ procSuite "Waku Archive - find messages": test "handle query with forward pagination": ## Given - let req = ArchiveQuery(pageSize: 4, direction: PagingDirection.FORWARD) + let req = + ArchiveQuery(includeData: true, pageSize: 4, direction: PagingDirection.FORWARD) ## When var nextReq = req # copy @@ -400,7 +403,8 @@ procSuite "Waku Archive - find messages": test "handle query with backward pagination": ## Given - let req = ArchiveQuery(pageSize: 4, direction: PagingDirection.BACKWARD) + let req = + ArchiveQuery(includeData: true, pageSize: 4, direction: PagingDirection.BACKWARD) ## When var nextReq = req # copy @@ -463,7 +467,7 @@ procSuite "Waku Archive - find messages": ).isOk() ## Given - let req = ArchiveQuery(contentTopics: @[DefaultContentTopic]) + let req = ArchiveQuery(includeData: true, contentTopics: @[DefaultContentTopic]) ## When let res = waitFor archive.findMessages(req) @@ -482,6 +486,7 @@ procSuite "Waku Archive - find messages": test "handle temporal history query with a valid time window": ## Given let req = ArchiveQuery( + includeData: true, contentTopics: @[ContentTopic("1")], startTime: some(ts(15, timeOrigin)), endTime: some(ts(55, timeOrigin)), diff --git a/tests/waku_store/test_client.nim b/tests/waku_store/test_client.nim index bee963eb2a..f0dacb538b 100644 --- a/tests/waku_store/test_client.nim +++ b/tests/waku_store/test_client.nim @@ -38,9 +38,9 @@ suite "Store Client": hash3 = computeMessageHash(DefaultPubsubTopic, message3) messageSeq = @[ - WakuMessageKeyValue(messageHash: hash1, message: message1), - WakuMessageKeyValue(messageHash: hash2, message: message2), - WakuMessageKeyValue(messageHash: hash3, message: message3), + WakuMessageKeyValue(messageHash: hash1, message: some(message1)), + WakuMessageKeyValue(messageHash: hash2, message: some(message2)), + WakuMessageKeyValue(messageHash: hash3, message: some(message3)), ] handlerFuture = newHistoryFuture() handler = proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.} = diff --git a/tests/waku_store/test_rpc_codec.nim b/tests/waku_store/test_rpc_codec.nim index d88da84b1b..3d59badfe2 100644 --- a/tests/waku_store/test_rpc_codec.nim +++ b/tests/waku_store/test_rpc_codec.nim @@ -14,7 +14,7 @@ procSuite "Waku Store - RPC codec": ## Given let query = StoreQueryRequest( requestId: "0", - includeData: false, + includeData: true, pubsubTopic: some(DefaultPubsubTopic), contentTopics: @[DefaultContentTopic], startTime: some(Timestamp(10)), @@ -58,7 +58,7 @@ procSuite "Waku Store - RPC codec": let message = fakeWakuMessage() hash = computeMessageHash(DefaultPubsubTopic, message) - keyValue = WakuMessageKeyValue(messageHash: hash, message: message) + keyValue = WakuMessageKeyValue(messageHash: hash, message: some(message)) res = StoreQueryResponse( requestId: "1", statusCode: 200, diff --git a/tests/waku_store/test_waku_store.nim b/tests/waku_store/test_waku_store.nim index 56ca9a314f..b897196d82 100644 --- a/tests/waku_store/test_waku_store.nim +++ b/tests/waku_store/test_waku_store.nim @@ -29,7 +29,7 @@ suite "Waku Store - query handler": let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) let hash = computeMessageHash(DefaultPubsubTopic, msg) - let kv = WakuMessageKeyValue(messageHash: hash, message: msg) + let kv = WakuMessageKeyValue(messageHash: hash, message: some(msg)) var queryHandlerFut = newFuture[(StoreQueryRequest)]() diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index bce7bacb88..1071af19af 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -49,18 +49,19 @@ procSuite "WakuNode - Store": let hashes = msgListA.mapIt(computeMessageHash(DefaultPubsubTopic, it)) - let kvs = - zip(hashes, msgListA).mapIt(WakuMessageKeyValue(messageHash: it[0], message: it[1])) + let kvs = zip(hashes, msgListA).mapIt( + WakuMessageKeyValue(messageHash: it[0], message: some(it[1])) + ) let archiveA = block: let driver = newSqliteArchiveDriver() for kv in kvs: - let msg_digest = computeDigest(kv.message) + let message = kv.message.get() + let msg_digest = computeDigest(message) require ( waitFor driver.put( - DefaultPubsubTopic, kv.message, msg_digest, kv.messageHash, - kv.message.timestamp, + DefaultPubsubTopic, message, msg_digest, kv.messageHash, message.timestamp ) ).isOk() @@ -84,7 +85,8 @@ procSuite "WakuNode - Store": client.mountStoreClient() ## Given - let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic]) + let req = + StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic]) let serverPeer = server.peerInfo.toRemotePeerInfo() ## When @@ -119,6 +121,7 @@ procSuite "WakuNode - Store": ## Given let req = StoreQueryRequest( + includeData: true, contentTopics: @[DefaultContentTopic], paginationForward: PagingDirection.FORWARD, paginationLimit: some(uint64(7)), @@ -174,6 +177,7 @@ procSuite "WakuNode - Store": ## Given let req = StoreQueryRequest( + includeData: true, contentTopics: @[DefaultContentTopic], paginationLimit: some(uint64(7)), paginationForward: PagingDirection.BACKWARD, @@ -261,7 +265,8 @@ procSuite "WakuNode - Store": # Wait for the server filter to receive the push message require waitFor filterFut.withTimeout(5.seconds) - let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic]) + let req = + StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic]) let res = waitFor client.query(req, serverPeer) ## Then @@ -270,7 +275,8 @@ procSuite "WakuNode - Store": let response = res.get() check: response.messages.len == 1 - response.messages[0] == WakuMessageKeyValue(messageHash: hash, message: message) + response.messages[0] == + WakuMessageKeyValue(messageHash: hash, message: some(message)) let (handledPubsubTopic, handledMsg) = filterFut.read() check: @@ -341,7 +347,8 @@ procSuite "WakuNode - Store": client.mountStoreClient() ## Given - let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic]) + let req = + StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic]) let serverPeer = server.peerInfo.toRemotePeerInfo() let requestProc = proc() {.async.} = @@ -351,7 +358,7 @@ procSuite "WakuNode - Store": let response = queryRes.get() check: - response.messages.mapIt(it.message) == msgListA + response.messages.mapIt(it.message.get()) == msgListA for count in 0 ..< 4: waitFor requestProc() @@ -384,7 +391,8 @@ procSuite "WakuNode - Store": client.mountStoreClient() ## Given - let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic]) + let req = + StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic]) let serverPeer = server.peerInfo.toRemotePeerInfo() let successProc = proc() {.async.} = @@ -393,7 +401,7 @@ procSuite "WakuNode - Store": check queryRes.isOk() let response = queryRes.get() check: - response.messages.mapIt(it.message) == msgListA + response.messages.mapIt(it.message.get()) == msgListA let failsProc = proc() {.async.} = let queryRes = waitFor client.query(req, peer = serverPeer) diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index d5adabb7eb..345e35f497 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, times], + std/[options, times, sugar], stew/shims/net as stewNet, chronicles, testutils/unittests, @@ -224,9 +224,10 @@ procSuite "Waku Rest API - Store v3": "7", # page size. Empty implies default page size. ) - var wakuMessages = newSeq[WakuMessage](0) - for j in 0 ..< response.data.messages.len: - wakuMessages.add(response.data.messages[j].message) + let wakuMessages = collect(newSeq): + for element in response.data.messages: + if element.message.isSome(): + element.message.get() pages[i] = wakuMessages @@ -620,15 +621,16 @@ procSuite "Waku Rest API - Store v3": let client = newRestHttpClient(initTAddress(restAddress, restPort)) # Filtering by a known pubsub topic. - var response = - await client.getStoreMessagesV3(pubsubTopic = encodeUrl(DefaultPubsubTopic)) + var response = await client.getStoreMessagesV3( + includeData = "true", pubsubTopic = encodeUrl(DefaultPubsubTopic) + ) check: response.status == 200 $response.contentType == $MIMETYPE_JSON response.data.messages.len == 1 - let storeMessage = response.data.messages[0].message + let storeMessage = response.data.messages[0].message.get() check: storeMessage.payload == msg.payload @@ -710,9 +712,10 @@ procSuite "Waku Rest API - Store v3": "3", # page size. Empty implies default page size. ) - var wakuMessages = newSeq[WakuMessage](0) - for j in 0 ..< response.data.messages.len: - wakuMessages.add(response.data.messages[j].message) + let wakuMessages = collect(newSeq): + for element in response.data.messages: + if element.message.isSome(): + element.message.get() pages[i] = wakuMessages @@ -773,9 +776,10 @@ procSuite "Waku Rest API - Store v3": response.status == 200 $response.contentType == $MIMETYPE_JSON - var wakuMessages = newSeq[WakuMessage](0) - for j in 0 ..< response.data.messages.len: - wakuMessages.add(response.data.messages[j].message) + let wakuMessages = collect(newSeq): + for element in response.data.messages: + if element.message.isSome(): + element.message.get() check wakuMessages == msgList[6 .. 9] diff --git a/waku/common/protobuf.nim b/waku/common/protobuf.nim index 4086124de9..cafd83e650 100644 --- a/waku/common/protobuf.nim +++ b/waku/common/protobuf.nim @@ -45,8 +45,6 @@ proc write3*(proto: var ProtoBuffer, field: int, value: auto) = when value is Option: if value.isSome(): proto.write(field, value.get()) - elif value is bool: - proto.write(field, zint(value)) else: proto.write(field, value) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3dd514f6c4..32db2f03eb 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -808,6 +808,7 @@ when defined(waku_exp_store_resume): proc toArchiveQuery(request: StoreQueryRequest): ArchiveQuery = var query = ArchiveQuery() + query.includeData = request.includeData query.pubsubTopic = request.pubsubTopic query.contentTopics = request.contentTopics query.startTime = request.startTime @@ -834,9 +835,17 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult = res.statusCode = 200 res.statusDesc = "OK" - res.messages = response.hashes.zip(response.messages).mapIt( - WakuMessageKeyValue(messageHash: it[0], message: it[1]) - ) + + for i in 0 ..< response.hashes.len: + let hash = response.hashes[i] + + let kv = + store_common.WakuMessageKeyValue(messageHash: hash, message: none(WakuMessage)) + + res.messages.add(kv) + + for i in 0 ..< response.messages.len: + res.messages[i].message = some(response.messages[i]) if response.cursor.isSome(): res.paginationCursor = some(response.cursor.get().hash) diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim index 22c957c8da..1b11871087 100644 --- a/waku/waku_api/rest/store/types.nim +++ b/waku/waku_api/rest/store/types.nim @@ -186,7 +186,9 @@ proc writeValue*( writer.beginRecord() writer.writeField("message_hash", value.messageHash) - writer.writeField("message", value.message) + + if value.message.isSome(): + writer.writeField("message", value.message.get()) writer.endRecord() @@ -217,10 +219,7 @@ proc readValue*( if messageHash.isNone(): reader.raiseUnexpectedValue("Field `message_hash` is missing") - if message.isNone(): - reader.raiseUnexpectedValue("Field `message` is missing") - - value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message.get()) + value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message) ## StoreQueryResponse serde diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 1829ff0552..6d2cf6ee7d 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -144,10 +144,14 @@ proc findMessages*( if query.contentTopics.len > 10: return err(ArchiveError.invalidQuery("too many content topics")) + if query.cursor.isSome() and query.cursor.get().hash.len != 32: + return err(ArchiveError.invalidQuery("invalid cursor hash length")) + let queryStartTime = getTime().toUnixFloat() let rows = ( await self.driver.getMessages( + includeData = query.includeData, contentTopic = query.contentTopics, pubsubTopic = query.pubsubTopic, cursor = query.cursor, @@ -174,7 +178,10 @@ proc findMessages*( let pageSize = min(rows.len, int(maxPageSize)) #TODO once store v2 is removed, unzip instead of 2x map - messages = rows[0 ..< pageSize].mapIt(it[1]) + #TODO once store v2 is removed, update driver to not return messages when not needed + if query.includeData: + messages = rows[0 ..< pageSize].mapIt(it[1]) + hashes = rows[0 ..< pageSize].mapIt(it[4]) ## Cursor @@ -206,7 +213,7 @@ proc findMessages*( proc findMessagesV2*( self: WakuArchive, query: ArchiveQuery -): Future[ArchiveResult] {.async, gcsafe.} = +): Future[ArchiveResult] {.async, deprecated, gcsafe.} = ## Search the archive to return a single page of messages matching the query criteria let maxPageSize = diff --git a/waku/waku_archive/common.nim b/waku/waku_archive/common.nim index 0d469ce0ee..adc598941f 100644 --- a/waku/waku_archive/common.nim +++ b/waku/waku_archive/common.nim @@ -43,6 +43,7 @@ type hash*: WakuMessageHash ArchiveQuery* = object + includeData*: bool # indicate if messages should be returned in addition to hashes. pubsubTopic*: Option[PubsubTopic] contentTopics*: seq[ContentTopic] cursor*: Option[ArchiveCursor] diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 6b7f28fce2..235c4b8a7c 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -41,11 +41,12 @@ method getMessagesV2*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true, -): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, deprecated, async.} = discard method getMessages*( driver: ArchiveDriver, + includeData = false, contentTopic = newSeq[ContentTopic](0), pubsubTopic = none(PubsubTopic), cursor = none(ArchiveCursor), diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 0d38320e02..5336a8403b 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -377,7 +377,7 @@ proc getMessagesV2ArbitraryQuery( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true, -): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = ## This proc allows to handle atypical queries. We don't use prepared statements for those. var query = @@ -521,7 +521,7 @@ proc getMessagesV2PreparedStmt( endTime: Timestamp, maxPageSize = DefaultPageSize, ascOrder = true, -): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = ## This proc aims to run the most typical queries in a more performant way, i.e. by means of ## prepared statements. ## @@ -591,6 +591,7 @@ proc getMessagesV2PreparedStmt( method getMessages*( s: PostgresDriver, + includeData = false, contentTopicSeq = newSeq[ContentTopic](0), pubsubTopic = none(PubsubTopic), cursor = none(ArchiveCursor), @@ -631,7 +632,7 @@ method getMessagesV2*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true, -): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = if contentTopicSeq.len == 1 and pubsubTopic.isSome() and startTime.isSome() and endTime.isSome(): ## Considered the most common query. Therefore, we use prepared statements to optimize it. diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index bec9253d2e..dcc45f9700 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -258,6 +258,7 @@ method existsTable*( method getMessages*( driver: QueueDriver, + includeData = false, contentTopic: seq[ContentTopic] = @[], pubsubTopic = none(PubsubTopic), cursor = none(ArchiveCursor), diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index 30e8af02b2..5c837b3bcb 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -292,7 +292,7 @@ proc whereClausev2( startTime: Option[Timestamp], endTime: Option[Timestamp], ascending: bool, -): Option[string] = +): Option[string] {.deprecated.} = let cursorClause = if cursor: let comp = if ascending: ">" else: "<" @@ -336,7 +336,7 @@ proc whereClausev2( proc selectMessagesWithLimitQueryv2( table: string, where: Option[string], limit: uint, ascending = true, v3 = false -): SqlQueryStr = +): SqlQueryStr {.deprecated.} = let order = if ascending: "ASC" else: "DESC" var query: string @@ -369,7 +369,7 @@ proc execSelectMessagesV2WithLimitStmt( startTime: Option[Timestamp], endTime: Option[Timestamp], onRowCallback: DataProc, -): DatabaseResult[void] = +): DatabaseResult[void] {.deprecated.} = let s = RawStmtPtr(s) # Bind params @@ -416,29 +416,6 @@ proc execSelectMessagesV2WithLimitStmt( discard sqlite3_reset(s) # same return information as step discard sqlite3_clear_bindings(s) # no errors possible -proc execSelectMessageByHash( - s: SqliteStmt, hash: WakuMessageHash, onRowCallback: DataProc -): DatabaseResult[void] = - let s = RawStmtPtr(s) - - checkErr bindParam(s, 1, toSeq(hash)) - - try: - while true: - let v = sqlite3_step(s) - case v - of SQLITE_ROW: - onRowCallback(s) - of SQLITE_DONE: - return ok() - else: - return err($sqlite3_errstr(v)) - finally: - # release implicit transaction - discard sqlite3_reset(s) # same return information as step - discard sqlite3_clear_bindings(s) - # no errors possible - proc selectMessagesByHistoryQueryWithLimit*( db: SqliteDatabase, contentTopic: seq[ContentTopic], @@ -450,7 +427,7 @@ proc selectMessagesByHistoryQueryWithLimit*( ascending: bool, ): DatabaseResult[ seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] -] = +] {.deprecated.} = var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] = @[] @@ -483,6 +460,28 @@ proc selectMessagesByHistoryQueryWithLimit*( ### Store v3 ### +proc execSelectMessageByHash( + s: SqliteStmt, hash: WakuMessageHash, onRowCallback: DataProc +): DatabaseResult[void] = + let s = RawStmtPtr(s) + + checkErr bindParam(s, 1, toSeq(hash)) + + try: + while true: + let v = sqlite3_step(s) + case v + of SQLITE_ROW: + onRowCallback(s) + of SQLITE_DONE: + return ok() + else: + return err($sqlite3_errstr(v)) + finally: + # release implicit transaction + discard sqlite3_reset(s) # same return information as step + discard sqlite3_clear_bindings(s) # no errors possible + proc selectMessageByHashQuery(): SqlQueryStr = var query: string diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 5a67b5778e..05ec9c229c 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -92,7 +92,7 @@ method getMessagesV2*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true, -): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = echo "here" let cursor = cursor.map(toDbCursor) @@ -111,6 +111,7 @@ method getMessagesV2*( method getMessages*( s: SqliteDriver, + includeData = false, contentTopic = newSeq[ContentTopic](0), pubsubTopic = none(PubsubTopic), cursor = none(ArchiveCursor), diff --git a/waku/waku_core/message/codec.nim b/waku/waku_core/message/codec.nim index 323f1a8986..7b71d95382 100644 --- a/waku/waku_core/message/codec.nim +++ b/waku/waku_core/message/codec.nim @@ -18,7 +18,7 @@ proc encode*(message: WakuMessage): ProtoBuffer = buf.write3(10, zint64(message.timestamp)) buf.write3(11, message.meta) buf.write3(21, message.proof) - buf.write3(31, message.ephemeral) + buf.write3(31, uint32(message.ephemeral)) buf.finish3() buf @@ -67,7 +67,7 @@ proc decode*(T: type WakuMessage, buffer: seq[byte]): ProtobufResult[T] = else: msg.proof = proof - var ephemeral: uint + var ephemeral: uint32 if not ?pb.getField(31, ephemeral): msg.ephemeral = false else: diff --git a/waku/waku_store/common.nim b/waku/waku_store/common.nim index b078e5574e..6481b73f5a 100644 --- a/waku/waku_store/common.nim +++ b/waku/waku_store/common.nim @@ -37,7 +37,7 @@ type WakuMessageKeyValue* = object messageHash*: WakuMessageHash - message*: WakuMessage + message*: Option[WakuMessage] StoreQueryResponse* = object requestId*: string diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index ad43394a85..3d02f1e706 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -49,11 +49,11 @@ proc handleQueryRequest*( var res = StoreQueryResponse() let req = StoreQueryRequest.decode(raw_request).valueOr: - error "failed to decode rpc", peerId = requestor + error "failed to decode rpc", peerId = requestor, error = $error waku_store_errors.inc(labelValues = [decodeRpcFailure]) res.statusCode = uint32(ErrorCode.BAD_REQUEST) - res.statusDesc = "decode rpc failed" + res.statusDesc = "decoding rpc failed: " & $error return res.encode().buffer @@ -82,10 +82,10 @@ proc handleQueryRequest*( res = queryResult.valueOr: error "store query failed", - peerId = requestor, requestId = requestId, error = queryResult.error + peerId = requestor, requestId = requestId, error = $error - res.statusCode = uint32(queryResult.error.kind) - res.statusDesc = $queryResult.error + res.statusCode = uint32(error.kind) + res.statusDesc = $error return res.encode().buffer diff --git a/waku/waku_store/rpc_codec.nim b/waku/waku_store/rpc_codec.nim index f1f49ad908..6b5c905906 100644 --- a/waku/waku_store/rpc_codec.nim +++ b/waku/waku_store/rpc_codec.nim @@ -14,7 +14,7 @@ proc encode*(req: StoreQueryRequest): ProtoBuffer = var pb = initProtoBuffer() pb.write3(1, req.requestId) - pb.write3(2, req.includeData) + pb.write3(2, uint32(req.includeData)) pb.write3(10, req.pubsubTopic) @@ -56,11 +56,11 @@ proc decode*( if not ?pb.getField(1, req.requestId): return err(ProtobufError.missingRequiredField("request_id")) - var inclData: uint + var inclData: uint32 if not ?pb.getField(2, inclData): req.includeData = false else: - req.includeData = inclData == 1 + req.includeData = inclData > 0 var pubsubTopic: string if not ?pb.getField(10, pubsubTopic): @@ -124,7 +124,9 @@ proc encode*(keyValue: WakuMessageKeyValue): ProtoBuffer = var pb = initProtoBuffer() pb.write3(1, keyValue.messageHash) - pb.write3(2, keyValue.message.encode()) + + if keyValue.message.isSome(): + pb.write3(2, keyValue.message.get().encode()) pb.finish3() @@ -163,9 +165,9 @@ proc decode*( var proto: ProtoBuffer if not ?pb.getField(2, proto): - return err(ProtobufError.missingRequiredField("message")) + keyValue.message = none(WakuMessage) else: - keyValue.message = ?WakuMessage.decode(proto.buffer) + keyValue.message = some(?WakuMessage.decode(proto.buffer)) return ok(keyValue)