diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index af90dbbfc2..756d8adac4 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -229,11 +229,11 @@ when isMainModule: # Install enabled API handlers: if conf.relay: - let cache = MessageCache[string].init(capacity=30) + let cache = MessageCache.init(capacity=30) installRelayApiHandlers(node, rpcServer, cache) if conf.filter: - let messageCache = filter_api.MessageCache.init(capacity=30) + let messageCache = MessageCache.init(capacity=30) installFilterApiHandlers(node, rpcServer, messageCache) if conf.store: diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 3a2d156bf0..6308ea71c0 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -687,18 +687,17 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo ## Relay REST API if conf.relay: - let cache = MessageCache[string].init(capacity=conf.restRelayCacheCapacity) + let cache = MessageCache.init(int(conf.restRelayCacheCapacity)) let handler = messageCacheHandler(cache) - let autoHandler = autoMessageCacheHandler(cache) for pubsubTopic in conf.pubsubTopics: - cache.subscribe(pubsubTopic) + cache.pubsubSubscribe(pubsubTopic) app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) for contentTopic in conf.contentTopics: - cache.subscribe(contentTopic) - app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler)) + cache.contentSubscribe(contentTopic) + app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler)) installRelayApiHandlers(server.router, app.node, cache) else: @@ -709,10 +708,10 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo app.node.wakuFilterClient != nil and app.node.wakuFilterClientLegacy != nil: - let legacyFilterCache = rest_legacy_filter_api.MessageCache.init() + let legacyFilterCache = MessageCache.init() rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache) - let filterCache = rest_filter_api.MessageCache.init() + let filterCache = MessageCache.init() let filterDiscoHandler = if app.wakuDiscv5.isSome(): @@ -765,23 +764,22 @@ proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNod installDebugApiHandlers(app.node, server) if conf.relay: - let cache = MessageCache[string].init(capacity=30) + let cache = MessageCache.init(capacity=50) let handler = messageCacheHandler(cache) - let autoHandler = autoMessageCacheHandler(cache) for pubsubTopic in conf.pubsubTopics: - cache.subscribe(pubsubTopic) + cache.pubsubSubscribe(pubsubTopic) app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) for contentTopic in conf.contentTopics: - cache.subscribe(contentTopic) - app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler)) + cache.contentSubscribe(contentTopic) + app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler)) installRelayApiHandlers(app.node, server, cache) if conf.filternode != "": - let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30) + let filterMessageCache = MessageCache.init(capacity=50) installFilterApiHandlers(app.node, server, filterMessageCache) installStoreApiHandlers(app.node, server) diff --git a/tests/test_message_cache.nim b/tests/test_message_cache.nim index b9dc691838..98a0c997ce 100644 --- a/tests/test_message_cache.nim +++ b/tests/test_message_cache.nim @@ -1,153 +1,217 @@ {.used.} import + std/sets, stew/[results, byteutils], - testutils/unittests, - chronicles + testutils/unittests import ../../waku/waku_core, ../../waku/waku_api/message_cache, - ./testlib/common, ./testlib/wakucore - -type TestMessageCache = MessageCache[(PubsubTopic, ContentTopic)] - suite "MessageCache": - test "subscribe to topic": + setup: ## Given - let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) - let cache = TestMessageCache.init() + let capacity = 3 + let testPubsubTopic = DefaultPubsubTopic + let testContentTopic = DefaultContentTopic + let cache = MessageCache.init(capacity) + test "subscribe to topic": ## When - cache.subscribe(testTopic) + cache.pubsubSubscribe(testPubsubTopic) + cache.pubsubSubscribe(testPubsubTopic) + + # idempotence of subscribe is also tested + cache.contentSubscribe(testContentTopic) + cache.contentSubscribe(testContentTopic) ## Then check: - cache.isSubscribed(testTopic) - + cache.isPubsubSubscribed(testPubsubTopic) + cache.isContentSubscribed(testContentTopic) + cache.pubsubTopicCount() == 1 + cache.contentTopicCount() == 1 test "unsubscribe from topic": - ## Given - let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) - let cache = TestMessageCache.init() - # Init cache content - cache.subscribe(testTopic) + cache.pubsubSubscribe(testPubsubTopic) + cache.contentSubscribe(testContentTopic) + + cache.pubsubSubscribe("AnotherPubsubTopic") + cache.contentSubscribe("AnotherContentTopic") ## When - cache.unsubscribe(testTopic) + cache.pubsubUnsubscribe(testPubsubTopic) + cache.contentUnsubscribe(testContentTopic) ## Then check: - not cache.isSubscribed(testTopic) - + not cache.isPubsubSubscribed(testPubsubTopic) + not cache.isContentSubscribed(testContentTopic) + cache.pubsubTopicCount() == 1 + cache.contentTopicCount() == 1 test "get messages of a subscribed topic": ## Given - let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() - let cache = TestMessageCache.init() # Init cache content - cache.subscribe(testTopic) - cache.addMessage(testTopic, testMessage) + cache.pubsubSubscribe(testPubsubTopic) + cache.addMessage(testPubsubTopic, testMessage) ## When - let res = cache.getMessages(testTopic) + let res = cache.getMessages(testPubsubTopic) ## Then check: res.isOk() res.get() == @[testMessage] - test "get messages with clean flag shoud clear the messages cache": ## Given - let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() - let cache = TestMessageCache.init() # Init cache content - cache.subscribe(testTopic) - cache.addMessage(testTopic, testMessage) + cache.pubsubSubscribe(testPubsubTopic) + cache.addMessage(testPubsubTopic, testMessage) ## When - var res = cache.getMessages(testTopic, clear=true) + var res = cache.getMessages(testPubsubTopic, clear=true) require(res.isOk()) - res = cache.getMessages(testTopic) + res = cache.getMessages(testPubsubTopic) ## Then check: res.isOk() res.get().len == 0 - test "get messages of a non-subscribed topic": - ## Given - let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) - let cache = TestMessageCache.init() - ## When - let res = cache.getMessages(testTopic) + cache.pubsubSubscribe(PubsubTopic("dummyPubsub")) + let res = cache.getMessages(testPubsubTopic) ## Then check: res.isErr() - res.error() == "Not subscribed to topic" - + res.error() == "not subscribed to this pubsub topic" test "add messages to subscribed topic": ## Given - let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() - let cache = TestMessageCache.init() - cache.subscribe(testTopic) + cache.pubsubSubscribe(testPubsubTopic) ## When - cache.addMessage(testTopic, testMessage) + cache.addMessage(testPubsubTopic, testMessage) ## Then - let messages = cache.getMessages(testTopic).tryGet() + let messages = cache.getMessages(testPubsubTopic).tryGet() check: messages == @[testMessage] - test "add messages to non-subscribed topic": ## Given - let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() - let cache = TestMessageCache.init() ## When - cache.addMessage(testTopic, testMessage) + cache.addMessage(testPubsubTopic, testMessage) ## Then - let res = cache.getMessages(testTopic) + let res = cache.getMessages(testPubsubTopic) check: res.isErr() - res.error() == "Not subscribed to topic" - + res.error() == "not subscribed to any pubsub topics" test "add messages beyond the capacity": ## Given - let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) - let testMessages = @[ - fakeWakuMessage(toBytes("MSG-1")), - fakeWakuMessage(toBytes("MSG-2")), - fakeWakuMessage(toBytes("MSG-3")) - ] + var testMessages = @[fakeWakuMessage(toBytes("MSG-1"))] + + # Prevent duplicate messages timestamp + for i in 0..<5: + var msg = fakeWakuMessage(toBytes("MSG-1")) - let cache = TestMessageCache.init(capacity = 2) - cache.subscribe(testTopic) + while msg.timestamp <= testMessages[i].timestamp: + msg = fakeWakuMessage(toBytes("MSG-1")) + + testMessages.add(msg) + + cache.pubsubSubscribe(testPubsubTopic) ## When for msg in testMessages: - cache.addMessage(testTopic, msg) + cache.addMessage(testPubsubTopic, msg) ## Then - let messages = cache.getMessages(testTopic).tryGet() + let messages = cache.getMessages(testPubsubTopic).tryGet() + let messageSet = toHashSet(messages) + + let testSet = toHashSet(testMessages) + + check: + messageSet.len == capacity + messageSet < testSet + testMessages[0] notin messages + + test "get messages on pubsub via content topics": + cache.pubsubSubscribe(testPubsubTopic) + + let fakeMessage = fakeWakuMessage() + + cache.addMessage(testPubsubTopic, fakeMessage) + + let getRes = cache.getAutoMessages(DefaultContentTopic) + + check: + getRes.isOk + getRes.get() == @[fakeMessage] + + test "add same message twice": + cache.pubsubSubscribe(testPubsubTopic) + + let fakeMessage = fakeWakuMessage() + + cache.addMessage(testPubsubTopic, fakeMessage) + cache.addMessage(testPubsubTopic, fakeMessage) + + check: + cache.messagesCount() == 1 + + test "unsubscribing remove messages": + let topic0 = "PubsubTopic0" + let topic1 = "PubsubTopic1" + let topic2 = "PubsubTopic2" + + let fakeMessage0 = fakeWakuMessage(toBytes("MSG-0")) + let fakeMessage1 = fakeWakuMessage(toBytes("MSG-1")) + let fakeMessage2 = fakeWakuMessage(toBytes("MSG-2")) + + cache.pubsubSubscribe(topic0) + cache.pubsubSubscribe(topic1) + cache.pubsubSubscribe(topic2) + cache.contentSubscribe("ContentTopic0") + + cache.addMessage(topic0, fakeMessage0) + cache.addMessage(topic1, fakeMessage1) + cache.addMessage(topic2, fakeMessage2) + + cache.pubsubUnsubscribe(topic0) + + # at this point, fakeMessage0 is only ref by DefaultContentTopic + + let res = cache.getAutoMessages(DefaultContentTopic) + + check: + res.isOk() + res.get().len == 3 + cache.isPubsubSubscribed(topic0) == false + cache.isPubsubSubscribed(topic1) == true + cache.isPubsubSubscribed(topic2) == true + + cache.contentUnsubscribe(DefaultContentTopic) + + # msg0 was delete because no refs + check: - messages == testMessages[1..2] + cache.messagesCount() == 2 \ No newline at end of file diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim b/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim index 0c95626b08..b82d55407a 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim @@ -4,8 +4,6 @@ import std/options, stew/shims/net as stewNet, testutils/unittests, - chronicles, - libp2p/crypto/crypto, json_rpc/[rpcserver, rpcclient] import ../../../waku/waku_core, @@ -20,11 +18,6 @@ import ../testlib/wakucore, ../testlib/wakunode - -proc newTestMessageCache(): filter_api.MessageCache = - filter_api.MessageCache.init(capacity=30) - - procSuite "Waku v2 JSON-RPC API - Filter": let bindIp = ValidIpAddress.init("0.0.0.0") @@ -49,7 +42,8 @@ procSuite "Waku v2 JSON-RPC API - Filter": ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) - installFilterApiHandlers(node2, server, newTestMessageCache()) + let cache = MessageCache.init(capacity=30) + installFilterApiHandlers(node2, server, cache) server.start() let client = newRpcHttpClient() diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim b/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim index e99e2edaa8..08c94b8514 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim @@ -4,7 +4,6 @@ import std/[options, sequtils, tempfiles], stew/shims/net as stewNet, testutils/unittests, - chronicles, libp2p/crypto/crypto, json_rpc/[rpcserver, rpcclient] import @@ -15,10 +14,8 @@ import ../../../waku/waku_node, ../../../waku/waku_api/jsonrpc/relay/handlers as relay_api, ../../../waku/waku_api/jsonrpc/relay/client as relay_api_client, - ../../../waku/waku_core, ../../../waku/waku_relay, ../../../waku/waku_rln_relay, - ../testlib/common, ../testlib/wakucore, ../testlib/wakunode @@ -37,7 +34,7 @@ suite "Waku v2 JSON-RPC API - Relay": ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) server = newRpcHttpServer([ta]) - let cache = MessageCache[string].init(capacity=30) + let cache = MessageCache.init(capacity=30) installRelayApiHandlers(node, server, cache) server.start() @@ -111,7 +108,7 @@ suite "Waku v2 JSON-RPC API - Relay": ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) server = newRpcHttpServer([ta]) - let cache = MessageCache[string].init(capacity=30) + let cache = MessageCache.init(capacity=30) installRelayApiHandlers(srcNode, server, cache) server.start() @@ -181,7 +178,7 @@ suite "Waku v2 JSON-RPC API - Relay": ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) server = newRpcHttpServer([ta]) - let cache = MessageCache[string].init(capacity=30) + let cache = MessageCache.init(capacity=30) installRelayApiHandlers(dstNode, server, cache) server.start() @@ -244,7 +241,7 @@ suite "Waku v2 JSON-RPC API - Relay": ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) server = newRpcHttpServer([ta]) - let cache = MessageCache[string].init(capacity=30) + let cache = MessageCache.init(capacity=30) installRelayApiHandlers(dstNode, server, cache) server.start() diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 6170e9ce8e..d6ab93525d 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -1,7 +1,6 @@ {.used.} import - std/sequtils, stew/byteutils, stew/shims/net, testutils/unittests, @@ -9,7 +8,6 @@ import libp2p/crypto/crypto import ../../waku/waku_api/message_cache, - ../../waku/common/base64, ../../waku/waku_core, ../../waku/waku_node, ../../waku/node/peer_manager, @@ -44,7 +42,7 @@ type RestFilterTest = object subscriberNode: WakuNode restServer: RestServerRef restServerForService: RestServerRef - messageCache: filter_api.MessageCache + messageCache: MessageCache client: RestClientRef clientTwdServiceNode: RestClientRef @@ -70,10 +68,10 @@ proc init(T: type RestFilterTest): Future[T] {.async.} = testSetup.restServerForService = RestServerRef.init(restAddress, restPort2).tryGet() # through this one we will see if messages are pushed according to our content topic sub - testSetup.messageCache = filter_api.MessageCache.init() + testSetup.messageCache = MessageCache.init() installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache) - let topicCache = MessageCache[string].init() + let topicCache = MessageCache.init() installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache) testSetup.restServer.start() @@ -242,7 +240,7 @@ suite "Waku v2 Rest API - Filter V2": restFilterTest = await RestFilterTest.init() subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId - restFilterTest.messageCache.subscribe(DefaultPubsubTopic) + restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic) restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) # When @@ -272,7 +270,7 @@ suite "Waku v2 Rest API - Filter V2": toRelayWakuMessage(testMessage) ) # Then - let messages = restFilterTest.messageCache.getMessages("1").tryGet() + let messages = restFilterTest.messageCache.getAutoMessages("1").tryGet() check: postMsgResponse.status == 200 diff --git a/tests/wakunode_rest/test_rest_legacy_filter.nim b/tests/wakunode_rest/test_rest_legacy_filter.nim index b26dbef8ff..800c13b33d 100644 --- a/tests/wakunode_rest/test_rest_legacy_filter.nim +++ b/tests/wakunode_rest/test_rest_legacy_filter.nim @@ -39,7 +39,7 @@ type RestFilterTest = object filterNode: WakuNode clientNode: WakuNode restServer: RestServerRef - messageCache: filter_api.MessageCache + messageCache: MessageCache client: RestClientRef @@ -59,7 +59,7 @@ proc setupRestFilter(): Future[RestFilterTest] {.async.} = let restAddress = ValidIpAddress.init("0.0.0.0") result.restServer = RestServerRef.init(restAddress, restPort).tryGet() - result.messageCache = filter_api.MessageCache.init() + result.messageCache = MessageCache.init() installLegacyFilterRestApiHandlers(result.restServer.router ,result.clientNode ,result.messageCache) @@ -100,10 +100,10 @@ suite "Waku v2 Rest API - Filter": response.data == "OK" check: - restFilterTest.messageCache.isSubscribed(DefaultContentTopic) - restFilterTest.messageCache.isSubscribed("2") - restFilterTest.messageCache.isSubscribed("3") - restFilterTest.messageCache.isSubscribed("4") + restFilterTest.messageCache.isContentSubscribed(DefaultContentTopic) + restFilterTest.messageCache.isContentSubscribed("2") + restFilterTest.messageCache.isContentSubscribed("3") + restFilterTest.messageCache.isContentSubscribed("4") # When - error case let badRequestBody = FilterLegacySubscribeRequest(contentFilters: @[] @@ -125,10 +125,10 @@ suite "Waku v2 Rest API - Filter": restFilterTest: RestFilterTest = await setupRestFilter() # When - restFilterTest.messageCache.subscribe("1") - restFilterTest.messageCache.subscribe("2") - restFilterTest.messageCache.subscribe("3") - restFilterTest.messageCache.subscribe("4") + restFilterTest.messageCache.contentSubscribe("1") + restFilterTest.messageCache.contentSubscribe("2") + restFilterTest.messageCache.contentSubscribe("3") + restFilterTest.messageCache.contentSubscribe("4") let contentFilters = @[ContentTopic("1") ,ContentTopic("2") @@ -148,10 +148,10 @@ suite "Waku v2 Rest API - Filter": response.data == "OK" check: - not restFilterTest.messageCache.isSubscribed("1") - not restFilterTest.messageCache.isSubscribed("2") - not restFilterTest.messageCache.isSubscribed("3") - restFilterTest.messageCache.isSubscribed("4") + not restFilterTest.messageCache.isContentSubscribed("1") + not restFilterTest.messageCache.isContentSubscribed("2") + not restFilterTest.messageCache.isContentSubscribed("3") + restFilterTest.messageCache.isContentSubscribed("4") await restFilterTest.shutdown() @@ -164,15 +164,22 @@ suite "Waku v2 Rest API - Filter": let pubSubTopic = "/waku/2/default-waku/proto" let contentTopic = ContentTopic( "content-topic-x" ) - let messages = @[ - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + var messages = @[ + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) ] - restFilterTest.messageCache.subscribe(contentTopic) + # Prevent duplicate messages + for i in 0..<2: + var msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) + + while msg == messages[i]: + msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) + + messages.add(msg) + + restFilterTest.messageCache.contentSubscribe(contentTopic) for msg in messages: - restFilterTest.messageCache.addMessage(contentTopic, msg) + restFilterTest.messageCache.addMessage(pubSubTopic, msg) # When let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index e3fc2130bf..3554253aa1 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -44,7 +44,7 @@ suite "Waku v2 Rest API - Relay": let restAddress = ValidIpAddress.init("0.0.0.0") let restServer = RestServerRef.init(restAddress, restPort).tryGet() - let cache = MessageCache[string].init() + let cache = MessageCache.init() installRelayApiHandlers(restServer.router, node, cache) restServer.start() @@ -66,9 +66,9 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - cache.isSubscribed("pubsub-topic-1") - cache.isSubscribed("pubsub-topic-2") - cache.isSubscribed("pubsub-topic-3") + cache.isPubsubSubscribed("pubsub-topic-1") + cache.isPubsubSubscribed("pubsub-topic-2") + cache.isPubsubSubscribed("pubsub-topic-3") check: toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len @@ -92,11 +92,11 @@ suite "Waku v2 Rest API - Relay": let restAddress = ValidIpAddress.init("0.0.0.0") let restServer = RestServerRef.init(restAddress, restPort).tryGet() - let cache = MessageCache[string].init() - cache.subscribe("pubsub-topic-1") - cache.subscribe("pubsub-topic-2") - cache.subscribe("pubsub-topic-3") - cache.subscribe("pubsub-topic-x") + let cache = MessageCache.init() + cache.pubsubSubscribe("pubsub-topic-1") + cache.pubsubSubscribe("pubsub-topic-2") + cache.pubsubSubscribe("pubsub-topic-3") + cache.pubsubSubscribe("pubsub-topic-x") installRelayApiHandlers(restServer.router, node, cache) restServer.start() @@ -119,15 +119,15 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - not cache.isSubscribed("pubsub-topic-1") + not cache.isPubsubSubscribed("pubsub-topic-1") not node.wakuRelay.isSubscribed("pubsub-topic-1") - not cache.isSubscribed("pubsub-topic-2") + not cache.isPubsubSubscribed("pubsub-topic-2") not node.wakuRelay.isSubscribed("pubsub-topic-2") - not cache.isSubscribed("pubsub-topic-3") + not cache.isPubsubSubscribed("pubsub-topic-3") not node.wakuRelay.isSubscribed("pubsub-topic-3") - cache.isSubscribed("pubsub-topic-x") + cache.isPubsubSubscribed("pubsub-topic-x") node.wakuRelay.isSubscribed("pubsub-topic-x") - not cache.isSubscribed("pubsub-topic-y") + not cache.isPubsubSubscribed("pubsub-topic-y") not node.wakuRelay.isSubscribed("pubsub-topic-y") await restServer.stop() @@ -145,15 +145,23 @@ suite "Waku v2 Rest API - Relay": let restServer = RestServerRef.init(restAddress, restPort).tryGet() let pubSubTopic = "/waku/2/default-waku/proto" - let messages = @[ - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + + var messages = @[ + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) ] - let cache = MessageCache[string].init() + # Prevent duplicate messages + for i in 0..<2: + var msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) - cache.subscribe(pubSubTopic) + while msg == messages[i]: + msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) + + messages.add(msg) + + let cache = MessageCache.init() + + cache.pubsubSubscribe(pubSubTopic) for msg in messages: cache.addMessage(pubSubTopic, msg) @@ -177,7 +185,7 @@ suite "Waku v2 Rest API - Relay": check: - cache.isSubscribed(pubSubTopic) + cache.isPubsubSubscribed(pubSubTopic) cache.getMessages(pubSubTopic).tryGet().len == 0 await restServer.stop() @@ -199,7 +207,7 @@ suite "Waku v2 Rest API - Relay": let restAddress = ValidIpAddress.init("0.0.0.0") let restServer = RestServerRef.init(restAddress, restPort).tryGet() - let cache = MessageCache[string].init() + let cache = MessageCache.init() installRelayApiHandlers(restServer.router, node, cache) restServer.start() @@ -239,7 +247,7 @@ suite "Waku v2 Rest API - Relay": let restAddress = ValidIpAddress.init("0.0.0.0") let restServer = RestServerRef.init(restAddress, restPort).tryGet() - let cache = MessageCache[string].init() + let cache = MessageCache.init() installRelayApiHandlers(restServer.router, node, cache) restServer.start() @@ -263,9 +271,9 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - cache.isSubscribed(contentTopics[0]) - cache.isSubscribed(contentTopics[1]) - cache.isSubscribed(contentTopics[2]) + cache.isContentSubscribed(contentTopics[0]) + cache.isContentSubscribed(contentTopics[1]) + cache.isContentSubscribed(contentTopics[2]) check: # Node should be subscribed to all shards @@ -292,11 +300,11 @@ suite "Waku v2 Rest API - Relay": ContentTopic("/waku/2/default-contentX/proto") ] - let cache = MessageCache[string].init() - cache.subscribe(contentTopics[0]) - cache.subscribe(contentTopics[1]) - cache.subscribe(contentTopics[2]) - cache.subscribe("/waku/2/default-contentY/proto") + let cache = MessageCache.init() + cache.contentSubscribe(contentTopics[0]) + cache.contentSubscribe(contentTopics[1]) + cache.contentSubscribe(contentTopics[2]) + cache.contentSubscribe("/waku/2/default-contentY/proto") installRelayApiHandlers(restServer.router, node, cache) restServer.start() @@ -312,10 +320,10 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - not cache.isSubscribed(contentTopics[1]) - not cache.isSubscribed(contentTopics[2]) - not cache.isSubscribed(contentTopics[3]) - cache.isSubscribed("/waku/2/default-contentY/proto") + not cache.isContentSubscribed(contentTopics[1]) + not cache.isContentSubscribed(contentTopics[2]) + not cache.isContentSubscribed(contentTopics[3]) + cache.isContentSubscribed("/waku/2/default-contentY/proto") await restServer.stop() await restServer.closeWait() @@ -332,17 +340,25 @@ suite "Waku v2 Rest API - Relay": let restServer = RestServerRef.init(restAddress, restPort).tryGet() let contentTopic = DefaultContentTopic - let messages = @[ - fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")), + + var messages = @[ + fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")) ] - let cache = MessageCache[string].init() + # Prevent duplicate messages + for i in 0..<2: + var msg = fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")) + + while msg == messages[i]: + msg = fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")) + + messages.add(msg) + + let cache = MessageCache.init() - cache.subscribe(contentTopic) + cache.contentSubscribe(contentTopic) for msg in messages: - cache.addMessage(contentTopic, msg) + cache.addMessage(DefaultPubsubTopic, msg) installRelayApiHandlers(restServer.router, node, cache) restServer.start() @@ -363,8 +379,8 @@ suite "Waku v2 Rest API - Relay": msg.timestamp.get() != Timestamp(0) check: - cache.isSubscribed(contentTopic) - cache.getMessages(contentTopic).tryGet().len == 0 # The cache is cleared when getMessage is called + cache.isContentSubscribed(contentTopic) + cache.getAutoMessages(contentTopic).tryGet().len == 0 # The cache is cleared when getMessage is called await restServer.stop() await restServer.closeWait() @@ -385,7 +401,7 @@ suite "Waku v2 Rest API - Relay": let restAddress = ValidIpAddress.init("0.0.0.0") let restServer = RestServerRef.init(restAddress, restPort).tryGet() - let cache = MessageCache[string].init() + let cache = MessageCache.init() installRelayApiHandlers(restServer.router, node, cache) restServer.start() diff --git a/waku/waku_api/handlers.nim b/waku/waku_api/handlers.nim index 075c5959b2..dfbfca76d5 100644 --- a/waku/waku_api/handlers.nim +++ b/waku/waku_api/handlers.nim @@ -40,11 +40,6 @@ proc defaultDiscoveryHandler*(discv5: WakuDiscoveryV5, cap: Capabilities): Disco ### Message Cache -proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler = +proc messageCacheHandler*(cache: MessageCache): WakuRelayHandler = return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = - cache.addMessage(PubSubTopic(pubsubTopic), msg) - -proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler = - return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = - if cache.isSubscribed(msg.contentTopic): - cache.addMessage(msg.contentTopic, msg) \ No newline at end of file + cache.addMessage(pubsubTopic, msg) \ No newline at end of file diff --git a/waku/waku_api/jsonrpc/filter/handlers.nim b/waku/waku_api/jsonrpc/filter/handlers.nim index df7e459cfb..23f5aaa76c 100644 --- a/waku/waku_api/jsonrpc/filter/handlers.nim +++ b/waku/waku_api/jsonrpc/filter/handlers.nim @@ -24,11 +24,6 @@ logScope: const futTimeout* = 5.seconds # Max time to wait for futures - -type - MessageCache* = message_cache.MessageCache[ContentTopic] - - proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) = server.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], pubsubTopic: Option[PubsubTopic]) -> bool: @@ -42,7 +37,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = - cache.addMessage(msg.contentTopic, msg) + cache.addMessage(pubsubTopic, msg) let subFut = node.legacyFilterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) if not await subFut.withTimeout(futTimeout): @@ -50,7 +45,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message # Successfully subscribed to all content filters for cTopic in contentTopics: - cache.subscribe(cTopic) + cache.contentSubscribe(cTopic) return true @@ -69,7 +64,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message raise newException(ValueError, "Failed to unsubscribe from contentFilters") for cTopic in contentTopics: - cache.unsubscribe(cTopic) + cache.contentUnsubscribe(cTopic) return true @@ -78,7 +73,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message ## last time this method was called debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic - if not cache.isSubscribed(contentTopic): + if not cache.isContentSubscribed(contentTopic): raise newException(ValueError, "Not subscribed to topic: " & contentTopic) let msgRes = cache.getMessages(contentTopic, clear=true) diff --git a/waku/waku_api/jsonrpc/relay/handlers.nim b/waku/waku_api/jsonrpc/relay/handlers.nim index 15205f3bc8..e57518b598 100644 --- a/waku/waku_api/jsonrpc/relay/handlers.nim +++ b/waku/waku_api/jsonrpc/relay/handlers.nim @@ -32,7 +32,7 @@ const futTimeout* = 5.seconds # Max time to wait for futures ## Waku Relay JSON-RPC API -proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache[string]) = +proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) = server.rpc("post_waku_v2_relay_v1_subscriptions") do (pubsubTopics: seq[PubsubTopic]) -> bool: if pubsubTopics.len == 0: raise newException(ValueError, "No pubsub topic provided") @@ -41,13 +41,13 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC debug "post_waku_v2_relay_v1_subscriptions" # Subscribe to all requested topics - let newTopics = pubsubTopics.filterIt(not cache.isSubscribed(it)) + let newTopics = pubsubTopics.filterIt(not cache.isPubsubSubscribed(it)) for pubsubTopic in newTopics: if pubsubTopic == "": raise newException(ValueError, "Empty pubsub topic") - cache.subscribe(pubsubTopic) + cache.pubsubSubscribe(pubsubTopic) node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache))) return true @@ -60,13 +60,13 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC debug "delete_waku_v2_relay_v1_subscriptions" # Unsubscribe all handlers from requested topics - let subscribedTopics = pubsubTopics.filterIt(cache.isSubscribed(it)) + let subscribedTopics = pubsubTopics.filterIt(cache.isPubsubSubscribed(it)) for pubsubTopic in subscribedTopics: if pubsubTopic == "": raise newException(ValueError, "Empty pubsub topic") - cache.unsubscribe(pubsubTopic) + cache.pubsubUnsubscribe(pubsubTopic) node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic)) return true @@ -148,15 +148,15 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC ## Subscribes a node to a list of Content topics debug "post_waku_v2_relay_v1_auto_subscriptions" - let newTopics = contentTopics.filterIt(not cache.isSubscribed(it)) + let newTopics = contentTopics.filterIt(not cache.isContentSubscribed(it)) # Subscribe to all requested topics for contentTopic in newTopics: if contentTopic == "": raise newException(ValueError, "Empty content topic") - cache.subscribe(contentTopic) - node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache))) + cache.contentSubscribe(contentTopic) + node.subscribe((kind: ContentSub, topic: contentTopic), some(messageCacheHandler(cache))) return true @@ -167,14 +167,14 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC ## Unsubscribes a node from a list of Content topics debug "delete_waku_v2_relay_v1_auto_subscriptions" - let subscribedTopics = contentTopics.filterIt(cache.isSubscribed(it)) + let subscribedTopics = contentTopics.filterIt(cache.isContentSubscribed(it)) # Unsubscribe all handlers from requested topics for contentTopic in subscribedTopics: if contentTopic == "": raise newException(ValueError, "Empty content topic") - cache.unsubscribe(contentTopic) + cache.contentUnsubscribe(contentTopic) node.unsubscribe((kind: ContentUnsub, topic: contentTopic)) return true @@ -232,7 +232,7 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC ## last time this method was called debug "get_waku_v2_relay_v1_auto_messages", topic=contentTopic - let msgRes = cache.getMessages(contentTopic, clear=true) + let msgRes = cache.getAutoMessages(contentTopic, clear=true) if msgRes.isErr(): raise newException(ValueError, "Not subscribed to content topic: " & contentTopic) diff --git a/waku/waku_api/message_cache.nim b/waku/waku_api/message_cache.nim index dd0af17404..fe2bbd77c4 100644 --- a/waku/waku_api/message_cache.nim +++ b/waku/waku_api/message_cache.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[tables, sequtils], + std/[sequtils, sugar, algorithm, options], stew/results, chronicles, chronos, @@ -15,68 +15,274 @@ import logScope: topics = "waku node message_cache" -const DefaultMessageCacheCapacity*: uint = 30 # Max number of messages cached per topic @TODO make this configurable +const DefaultMessageCacheCapacity: int = 50 +type MessageCache* = ref object + pubsubTopics: seq[PubsubTopic] + contentTopics: seq[ContentTopic] -type MessageCacheResult*[T] = Result[T, cstring] + pubsubIndex: seq[tuple[pubsubIdx: int, msgIdx: int]] + contentIndex: seq[tuple[contentIdx: int, msgIdx: int]] -type MessageCache*[K] = ref object - capacity: uint - table: Table[K, seq[WakuMessage]] + messages: seq[WakuMessage] -func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T = - MessageCache[K]( - capacity: capacity, - table: initTable[K, seq[WakuMessage]]() + capacity: int + +func `$`*(self: MessageCache): string = + "Messages: " & $self.messages.len & + " \nPubsubTopics: " & $self.pubsubTopics & + " \nContentTopics: " & $self.contentTopics & + " \nPubsubIndex: " & $self.pubsubIndex & + " \nContentIndex: " & $self.contentIndex + +func init*(T: type MessageCache, capacity=DefaultMessageCacheCapacity): T = + MessageCache( + capacity: capacity ) +proc messagesCount*(self: MessageCache): int = + self.messages.len -proc isSubscribed*[K](t: MessageCache[K], topic: K): bool = - t.table.hasKey(topic) +proc pubsubTopicCount*(self: MessageCache): int = + self.pubsubTopics.len -proc subscribe*[K](t: MessageCache[K], topic: K) = - if t.isSubscribed(topic): - return - t.table[topic] = @[] +proc contentTopicCount*(self: MessageCache): int = + self.contentTopics.len -proc unsubscribe*[K](t: MessageCache[K], topic: K) = - if not t.isSubscribed(topic): - return - t.table.del(topic) +proc pubsubSearch(self: MessageCache, pubsubTopic: PubsubTopic): Option[int] = + # Return some with the index if found none otherwise. + + for i, topic in self.pubsubTopics: + if topic == pubsubTopic: + return some(i) -proc unsubscribeAll*[K](t: MessageCache[K]) = - t.table.clear() + return none(int) -proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) = - if not t.isSubscribed(topic): - return +proc contentSearch(self: MessageCache, contentTopic: ContentTopic): Option[int] = + # Return some with the index if found none otherwise. + + for i, topic in self.contentTopics: + if topic == contentTopic: + return some(i) + + return none(int) + +proc isPubsubSubscribed*(self: MessageCache, pubsubTopic: PubsubTopic): bool = + self.pubsubSearch(pubsubTopic).isSome() + +proc isContentSubscribed*(self: MessageCache, contentTopic: ContentTopic): bool = + self.contentSearch(contentTopic).isSome() + +proc pubsubSubscribe*(self: MessageCache, pubsubTopic: PubsubTopic) = + if self.pubsubSearch(pubsubTopic).isNone(): + self.pubsubTopics.add(pubsubTopic) + +proc contentSubscribe*(self: MessageCache, contentTopic: ContentTopic) = + if self.contentSearch(contentTopic).isNone(): + self.contentTopics.add(contentTopic) + +proc removeMessage(self: MessageCache, idx: int) = + # get last index because del() is a swap + let lastIndex = self.messages.high + + self.messages.del(idx) + + # update indices + var j = self.pubsubIndex.high + while j > -1: + let (pId, mId) = self.pubsubIndex[j] + + if mId == idx: + self.pubsubIndex.del(j) + elif mId == lastIndex: + self.pubsubIndex[j] = (pId, idx) + + dec(j) + + j = self.contentIndex.high + while j > -1: + let (cId, mId) = self.contentIndex[j] + + if mId == idx: + self.contentIndex.del(j) + elif mId == lastIndex: + self.contentIndex[j] = (cId, idx) + + dec(j) + +proc pubsubUnsubscribe*(self: MessageCache, pubsubTopic: PubsubTopic) = + let pubsubIdxOp = self.pubsubSearch(pubsubTopic) + + let pubsubIdx = + if pubsubIdxOp.isSome(): pubsubIdxOp.get() + else: return + + let lastIndex = self.pubsubTopics.high + self.pubsubTopics.del(pubsubIdx) + + var msgIndices = newSeq[int](0) + + var j = self.pubsubIndex.high + while j > -1: + let (pId, mId) = self.pubsubIndex[j] + + if pId == pubsubIdx: + # remove index for this topic + self.pubsubIndex.del(j) + msgIndices.add(mId) + elif pId == lastIndex: + # swap the index because pubsubTopics.del() is a swap + self.pubsubIndex[j] = (pubsubIdx, mId) + + dec(j) + + # check if messages on this pubsub topic are indexed by any content topic, if not remove them. + for mId in msgIndices: + if not self.contentIndex.anyIt(it.msgIdx == mId): + self.removeMessage(mId) + +proc contentUnsubscribe*(self: MessageCache, contentTopic: ContentTopic) = + let contentIdxOP = self.contentSearch(contentTopic) - # Make a copy of msgs for this topic to modify - var messages = t.table.getOrDefault(topic, @[]) + let contentIdx = + if contentIdxOP.isSome(): contentIdxOP.get() + else: return - if messages.len >= t.capacity.int: - trace "Topic cache capacity reached", topic=topic - # Message cache on this topic exceeds maximum. Delete oldest. - # TODO: this may become a bottle neck if called as the norm rather than - # exception when adding messages. Performance profile needed. - messages.delete(0,0) + let lastIndex = self.contentTopics.high + self.contentTopics.del(contentIdx) + + var msgIndices = newSeq[int](0) - messages.add(msg) + var j = self.contentIndex.high + while j > -1: + let (cId, mId) = self.contentIndex[j] - # Replace indexed entry with copy - t.table[topic] = messages + if cId == contentIdx: + # remove indices for this topic + self.contentIndex.del(j) + msgIndices.add(mId) + elif cId == lastIndex: + # swap the indices because contentTopics.del() is a swap + self.contentIndex[j] = (contentIdx, mId) + + dec(j) -proc clearMessages*[K](t: MessageCache[K], topic: K) = - if not t.isSubscribed(topic): + # check if messages on this content topic are indexed by any pubsub topic, if not remove them. + for mId in msgIndices: + if not self.pubsubIndex.anyIt(it.msgIdx == mId): + self.removeMessage(mId) + +proc reset*(self: MessageCache) = + self.messages.setLen(0) + self.pubsubTopics.setLen(0) + self.contentTopics.setLen(0) + self.pubsubIndex.setLen(0) + self.contentIndex.setLen(0) + +proc addMessage*( + self: MessageCache, + pubsubTopic: PubsubTopic, + msg: WakuMessage + ) = + ## Idempotent message addition. + + var oldestTime = int64.high + var oldestMsg = int.high + for i, message in self.messages.reversed: + if message == msg: + return + + if message.timestamp < oldestTime: + oldestTime = message.timestamp + oldestMsg = i + + # reverse index + oldestMsg = self.messages.high - oldestMsg + + var pubsubIdxOp = self.pubsubSearch(pubsubTopic) + var contentIdxOp = self.contentSearch(msg.contentTopic) + + if pubsubIdxOp.isNone() and contentIdxOp.isNone(): return - t.table[topic] = @[] -proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] = - if not t.isSubscribed(topic): - return err("Not subscribed to topic") + let pubsubIdx = + if pubsubIdxOp.isNone(): + self.pubsubTopics.add(pubsubTopic) + self.pubsubTopics.high + else: + pubsubIdxOp.get() + + let contentIdx = + if contentIdxOp.isNone(): + self.contentTopics.add(msg.contentTopic) + self.contentTopics.high + else: + contentIdxOp.get() + + # add the message, make space if needed + if self.messages.len >= self.capacity: + self.removeMessage(oldestMsg) + + let msgIdx = self.messages.len + self.messages.add(msg) + + self.pubsubIndex.add((pubsubIdx, msgIdx)) + self.contentIndex.add((contentIdx, msgIdx)) + +proc getMessages*( + self: MessageCache, + pubsubTopic: PubsubTopic, + clear=false + ): Result[seq[WakuMessage], string] = + ## Return all messages on this pubsub topic + + if self.pubsubTopics.len == 0: + return err("not subscribed to any pubsub topics") + + let pubsubIdxOp = self.pubsubSearch(pubsubTopic) + let pubsubIdx = + if pubsubIdxOp.isNone: + return err("not subscribed to this pubsub topic") + else: pubsubIdxOp.get() + + let msgIndices = collect: + for (pId, mId) in self.pubsubIndex: + if pId == pubsubIdx: + mId + + let messages = msgIndices.mapIt(self.messages[it]) - let messages = t.table.getOrDefault(topic, @[]) if clear: - t.clearMessages(topic) + for idx in msgIndices.reversed: + self.removeMessage(idx) + + return ok(messages) + +proc getAutoMessages*( + self: MessageCache, + contentTopic: ContentTopic, + clear=false + ): Result[seq[WakuMessage], string] = + ## Return all messages on this content topic - ok(messages) + if self.contentTopics.len == 0: + return err("not subscribed to any content topics") + + let contentIdxOp = self.contentSearch(contentTopic) + let contentIdx = + if contentIdxOp.isNone(): + return err("not subscribed to this content topic") + else: contentIdxOp.get() + + let msgIndices = collect: + for (cId, mId) in self.contentIndex: + if cId == contentIdx: + mId + + let messages = msgIndices.mapIt(self.messages[it]) + + if clear: + for idx in msgIndices.reversed: + self.removeMessage(idx) + + return ok(messages) \ No newline at end of file diff --git a/waku/waku_api/rest/filter/handlers.nim b/waku/waku_api/rest/filter/handlers.nim index 3b72d1b2d5..af2f77b257 100644 --- a/waku/waku_api/rest/filter/handlers.nim +++ b/waku/waku_api/rest/filter/handlers.nim @@ -40,10 +40,22 @@ const ROUTE_FILTER_SUBSCRIPTIONS* = "/filter/v2/subscriptions" const ROUTE_FILTER_ALL_SUBSCRIPTIONS* = "/filter/v2/subscriptions/all" -const filterMessageCacheDefaultCapacity* = 30 +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + if contentBody.isNone(): + return err(RestApiResponse.badRequest("Missing content body")) -type - MessageCache* = message_cache.MessageCache[ContentTopic] + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) + + let reqBodyData = contentBody.get().data + + let requestResult = decodeFromJsonBytes(T, reqBodyData) + if requestResult.isErr(): + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + $requestResult.error)) + + return ok(requestResult.get()) proc getErrorCause(err: filter_protocol_type.FilterSubscribeError): string = ## Retrieve proper error cause of FilterSubscribeError - due stringify make some parts of text double @@ -169,7 +181,7 @@ proc filterPostPutSubscriptionRequestHandler( # Successfully subscribed to all content filters for cTopic in req.contentFilters: - cache.subscribe(cTopic) + cache.contentSubscribe(cTopic) return makeRestResponse(req.requestId, subFut.read()) @@ -235,7 +247,7 @@ proc installFilterDeleteSubscriptionsHandler( # Successfully subscribed to all content filters for cTopic in req.contentFilters: - cache.unsubscribe(cTopic) + cache.contentUnsubscribe(cTopic) # Successfully unsubscribed from all requested contentTopics return makeRestResponse(req.requestId, unsubFut.read()) @@ -276,7 +288,7 @@ proc installFilterDeleteAllSubscriptionsHandler( FilterSubscribeError.serviceUnavailable( "Failed to unsubscribe from all contentFilters due to timeout!")) - cache.unsubscribeAll() + cache.reset() # Successfully unsubscribed from all requested contentTopics return makeRestResponse(req.requestId, unsubFut.read()) @@ -321,7 +333,7 @@ proc installFilterGetMessagesHandler(router: var RestRouter, let pushHandler : FilterPushHandler = proc (pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = - cache.addMessage(msg.contentTopic, msg) + cache.addMessage(pubsubTopic, msg) node.wakuFilterClient.registerPushHandler(pushHandler) @@ -336,7 +348,7 @@ proc installFilterGetMessagesHandler(router: var RestRouter, let contentTopic = contentTopic.get() - let msgRes = cache.getMessages(contentTopic, clear=true) + let msgRes = cache.getAutoMessages(contentTopic, clear=true) if msgRes.isErr(): return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic) diff --git a/waku/waku_api/rest/filter/legacy_handlers.nim b/waku/waku_api/rest/filter/legacy_handlers.nim index 86d51f0351..c1c2ae91c1 100644 --- a/waku/waku_api/rest/filter/legacy_handlers.nim +++ b/waku/waku_api/rest/filter/legacy_handlers.nim @@ -34,10 +34,22 @@ const futTimeoutForSubscriptionProcessing* = 5.seconds const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" -const filterMessageCacheDefaultCapacity* = 30 +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + if contentBody.isNone(): + return err(RestApiResponse.badRequest("Missing content body")) -type - MessageCache* = message_cache.MessageCache[ContentTopic] + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) + + let reqBodyData = contentBody.get().data + + let requestResult = decodeFromJsonBytes(T, reqBodyData) + if requestResult.isErr(): + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + $requestResult.error)) + + return ok(requestResult.get()) proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, @@ -45,7 +57,7 @@ proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter, let pushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = - cache.addMessage(msg.contentTopic, msg) + cache.addMessage(pubsubTopic, msg) router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Subscribes a node to a list of contentTopics of a pubsubTopic @@ -74,7 +86,7 @@ proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter, # Successfully subscribed to all content filters for cTopic in req.contentFilters: - cache.subscribe(cTopic) + cache.contentSubscribe(cTopic) return RestApiResponse.ok() @@ -103,7 +115,7 @@ proc installFilterV1DeleteSubscriptionsV1Handler*(router: var RestRouter, return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") for cTopic in req.contentFilters: - cache.unsubscribe(cTopic) + cache.contentUnsubscribe(cTopic) # Successfully unsubscribed from all requested contentTopics return RestApiResponse.ok() @@ -124,7 +136,7 @@ proc installFilterV1GetMessagesV1Handler*(router: var RestRouter, let contentTopic = contentTopic.get() - let msgRes = cache.getMessages(contentTopic, clear=true) + let msgRes = cache.getAutoMessages(contentTopic, clear=true) if msgRes.isErr(): return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic) diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index 999c5c7f1d..8456af37a6 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -5,7 +5,7 @@ else: import std/sequtils, - stew/byteutils, + stew/[byteutils, results], chronicles, json_serialization, json_serialization/std/options, @@ -26,19 +26,15 @@ import from std/times import getTime from std/times import toUnix - export types - logScope: topics = "waku node rest relay_api" - ##### Topic cache const futTimeout* = 5.seconds # Max time to wait for futures - #### Request handlers const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions" @@ -47,10 +43,11 @@ const ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1* = "/relay/v1/auto/subscriptions" const ROUTE_RELAY_AUTO_MESSAGESV1* = "/relay/v1/auto/messages/{contentTopic}" const ROUTE_RELAY_AUTO_MESSAGESV1_NO_TOPIC* = "/relay/v1/auto/messages" -proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: MessageCache[string]) = +proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: MessageCache) = router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: - # ## Subscribes a node to a list of PubSub topics - # debug "post_waku_v2_relay_v1_subscriptions" + ## Subscribes a node to a list of PubSub topics + + debug "post_waku_v2_relay_v1_subscriptions" # Check the request body if contentBody.isNone(): @@ -60,10 +57,10 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes return error # Only subscribe to topics for which we have no subscribed topic handlers yet - let newTopics = req.filterIt(not cache.isSubscribed(it)) + let newTopics = req.filterIt(not cache.isPubsubSubscribed(it)) for pubsubTopic in newTopics: - cache.subscribe(pubsubTopic) + cache.pubsubSubscribe(pubsubTopic) node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache))) return RestApiResponse.ok() @@ -81,8 +78,8 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes # Unsubscribe all handlers from requested topics for pubsubTopic in req: - node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic)) - cache.unsubscribe(pubsubTopic) + cache.pubsubUnsubscribe(pubsubTopic) + node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic)) # Successfully unsubscribed from all requested topics return RestApiResponse.ok() @@ -160,66 +157,54 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes # Autosharding API router.api(MethodPost, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: - # ## Subscribes a node to a list of content topics - # debug "post_waku_v2_relay_v1_auto_subscriptions" - - # Check the request body - if contentBody.isNone(): - return RestApiResponse.badRequest() + ## Subscribes a node to a list of content topics. + + debug "post_waku_v2_relay_v1_auto_subscriptions" let req: seq[ContentTopic] = decodeRequestBody[seq[ContentTopic]](contentBody).valueOr: return error # Only subscribe to topics for which we have no subscribed topic handlers yet - let newTopics = req.filterIt(not cache.isSubscribed(it)) + let newTopics = req.filterIt(not cache.isContentSubscribed(it)) for contentTopic in newTopics: - cache.subscribe(contentTopic) - node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache))) + cache.contentSubscribe(contentTopic) + node.subscribe((kind: ContentSub, topic: contentTopic), some(messageCacheHandler(cache))) return RestApiResponse.ok() router.api(MethodDelete, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: - # ## Subscribes a node to a list of content topics - # debug "delete_waku_v2_relay_v1_auto_subscriptions" - - # Check the request body - if contentBody.isNone(): - return RestApiResponse.badRequest() + ## Unsubscribes a node from a list of content topics. + + debug "delete_waku_v2_relay_v1_auto_subscriptions" let req: seq[ContentTopic] = decodeRequestBody[seq[ContentTopic]](contentBody).valueOr: return error - # Unsubscribe all handlers from requested topics for contentTopic in req: - cache.unsubscribe(contentTopic) + cache.contentUnsubscribe(contentTopic) node.unsubscribe((kind: ContentUnsub, topic: contentTopic)) - # Successfully unsubscribed from all requested topics return RestApiResponse.ok() router.api(MethodGet, ROUTE_RELAY_AUTO_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: - # ## Returns all WakuMessages received on a content topic since the - # ## last time this method was called - # ## TODO: ability to specify a return message limit - # debug "get_waku_v2_relay_v1_auto_messages", topic=topic + ## Returns all WakuMessages received on a content topic since the + ## last time this method was called. + + debug "get_waku_v2_relay_v1_auto_messages", contentTopic=contentTopic - if contentTopic.isErr(): - return RestApiResponse.badRequest() - let contentTopic = contentTopic.get() + let contentTopic = contentTopic.valueOr: + return RestApiResponse.badRequest($error) - let messages = cache.getMessages(contentTopic, clear=true) - if messages.isErr(): + let messages = cache.getAutoMessages(contentTopic, clear=true).valueOr: debug "Not subscribed to topic", topic=contentTopic - return RestApiResponse.notFound() + return RestApiResponse.notFound(contentTopic) - let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage)) - let resp = RestApiResponse.jsonResponse(data, status=Http200) - if resp.isErr(): - debug "An error ocurred while building the json respose", error=resp.error - return RestApiResponse.internalServerError() + let data = RelayGetMessagesResponse(messages.map(toRelayWakuMessage)) - return resp.get() + return RestApiResponse.jsonResponse(data, status=Http200).valueOr: + debug "An error ocurred while building the json respose", error = error + return RestApiResponse.internalServerError($error) router.api(MethodPost, ROUTE_RELAY_AUTO_MESSAGESV1_NO_TOPIC) do (contentBody: Option[ContentBody]) -> RestApiResponse: # Check the request body @@ -237,25 +222,21 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes # if RLN is mounted, append the proof to the message if not node.wakuRlnRelay.isNil(): - # append the proof to the message - let success = node.wakuRlnRelay.appendRLNProof(message, - float64(getTime().toUnix())) - if not success: - return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message") - - # validate the message before sending it - let result = node.wakuRlnRelay.validateMessageAndUpdateLog(message) - if result == MessageValidationResult.Invalid: - return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof") - elif result == MessageValidationResult.Spam: - return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later") - elif result == MessageValidationResult.Valid: - debug "RLN proof validated successfully", contentTopic=message.contentTopic - else: - return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result") + if not node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())): + return RestApiResponse.internalServerError( + "Failed to publish: error appending RLN proof to message") + + case node.wakuRlnRelay.validateMessage(message): + of MessageValidationResult.Invalid: + return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof") + of MessageValidationResult.Spam: + return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later") + of MessageValidationResult.Valid: + debug "RLN proof validated successfully", contentTopic=message.contentTopic # if we reach here its either a non-RLN message or a RLN message with a valid proof debug "Publishing message", contentTopic=message.contentTopic, rln=not node.wakuRlnRelay.isNil() + if not (waitFor node.publish(none(PubSubTopic), message).withTimeout(futTimeout)): error "Failed to publish message to topic", contentTopic=message.contentTopic return RestApiResponse.internalServerError("Failed to publish: timedout")