diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 6e8257048a..1203361b6f 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -212,7 +212,7 @@ proc publish(c: Chat, line: string) = if not c.node.wakuLightPush.isNil(): # Attempt lightpush - asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message) + asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message) else: asyncSpawn c.node.publish(DefaultPubsubTopic, message) @@ -267,7 +267,7 @@ proc writeAndPrint(c: Chat) {.async.} = if not c.node.wakuFilter.isNil(): echo "unsubscribing from content filters..." - await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic) + await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic) echo "quitting..." @@ -473,7 +473,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = trace "Hit filter handler", contentTopic=msg.contentTopic chat.printReceivedMessage(msg) - await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler) + await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler) else: error "Filter not mounted. Couldn't parse conf.filternode", @@ -488,7 +488,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = chat.printReceivedMessage(msg) let topic = DefaultPubsubTopic - node.subscribe(topic, handler) + await node.subscribe(some(topic), @[ContentTopic("")], handler) when defined(rln): if conf.rlnRelay: diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index cde2d3c524..3a877dfaa6 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -135,7 +135,7 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T = error "failed to parse content topic", error=res.error quit(QuitFailure) - let shardsRes = contentTopicsRes.mapIt(singleHighestWeigthShard(it.get())) + let shardsRes = contentTopicsRes.mapIt(getShard(it.get())) for res in shardsRes: if res.isErr(): @@ -363,7 +363,7 @@ proc setupProtocols(node: WakuNode, # TODO autoshard content topics only once. # Already checked for errors in app.init let contentTopics = conf.contentTopics.mapIt(NsContentTopic.parse(it).expect("Parsing")) - let shards = contentTopics.mapIt($(singleHighestWeigthShard(it).expect("Sharding"))) + let shards = contentTopics.mapIt($(getShard(it).expect("Sharding"))) let pubsubTopics = conf.topics & conf.pubsubTopics & shards try: diff --git a/tests/test_waku_lightpush.nim b/tests/test_waku_lightpush.nim index 9182a2958c..9aa8df52f9 100644 --- a/tests/test_waku_lightpush.nim +++ b/tests/test_waku_lightpush.nim @@ -115,4 +115,4 @@ suite "Waku Lightpush": requestError == error ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitch.stop()) + await allFutures(clientSwitch.stop(), serverSwitch.stop()) \ No newline at end of file diff --git a/tests/test_wakunode_filter.nim b/tests/test_wakunode_filter.nim index a1f3c7e9a4..4ea7750ecc 100644 --- a/tests/test_wakunode_filter.nim +++ b/tests/test_wakunode_filter.nim @@ -1,6 +1,7 @@ {.used.} import + std/options, stew/shims/net as stewNet, testutils/unittests, chronicles, @@ -43,7 +44,7 @@ suite "WakuNode - Filter": filterPushHandlerFut.complete((pubsubTopic, msg)) ## When - await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo) + await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo) # Wait for subscription to take effect waitFor sleepAsync(100.millis) diff --git a/tests/test_wakunode_lightpush.nim b/tests/test_wakunode_lightpush.nim index a4f717bd29..7a15175c1c 100644 --- a/tests/test_wakunode_lightpush.nim +++ b/tests/test_wakunode_lightpush.nim @@ -1,6 +1,7 @@ {.used.} import + std/options, stew/shims/net as stewNet, testutils/unittests, chronicles, @@ -54,7 +55,7 @@ suite "WakuNode - Lightpush": await sleepAsync(100.millis) ## When - await lightNode.lightpushPublish(DefaultPubsubTopic, message) + await lightNode.lightpushPublish(some(DefaultPubsubTopic), message) ## Then check await completionFutRelay.withTimeout(5.seconds) diff --git a/tests/waku_core/test_namespaced_topics.nim b/tests/waku_core/test_namespaced_topics.nim index 9cc9774562..cf4ac34755 100644 --- a/tests/waku_core/test_namespaced_topics.nim +++ b/tests/waku_core/test_namespaced_topics.nim @@ -13,7 +13,6 @@ suite "Waku Message - Content topics namespacing": ## Given var ns = NsContentTopic() ns.generation = none(int) - ns.bias = Unbiased ns.application = "toychat" ns.version = "2" ns.name = "huilong" @@ -39,7 +38,6 @@ suite "Waku Message - Content topics namespacing": let ns = nsRes.get() check: ns.generation == none(int) - ns.bias == Unbiased ns.application == "toychat" ns.version == "2" ns.name == "huilong" @@ -47,7 +45,7 @@ suite "Waku Message - Content topics namespacing": test "Parse content topic string - Valid string with sharding": ## Given - let topic = "/0/lower20/toychat/2/huilong/proto" + let topic = "/0/toychat/2/huilong/proto" ## When let nsRes = NsContentTopic.parse(topic) @@ -58,7 +56,6 @@ suite "Waku Message - Content topics namespacing": let ns = nsRes.get() check: ns.generation == some(0) - ns.bias == Lower20 ns.application == "toychat" ns.version == "2" ns.name == "huilong" @@ -122,11 +119,11 @@ suite "Waku Message - Content topics namespacing": let err = ns.tryError() check: err.kind == ParsingErrorKind.InvalidFormat - err.cause == "invalid topic structure" + err.cause == "generation should be a numeric value" test "Parse content topic string - Invalid string: non numeric generation": ## Given - let topic = "/first/unbiased/toychat/2/huilong/proto" + let topic = "/first/toychat/2/huilong/proto" ## When let ns = NsContentTopic.parse(topic) @@ -139,21 +136,6 @@ suite "Waku Message - Content topics namespacing": err.kind == ParsingErrorKind.InvalidFormat err.cause == "generation should be a numeric value" - test "Parse content topic string - Invalid string: invalid bias": - ## Given - let topic = "/0/no/toychat/2/huilong/proto" - - ## When - let ns = NsContentTopic.parse(topic) - - ## Then - assert ns.isErr(), $ns.get() - - let err = ns.tryError() - check: - err.kind == ParsingErrorKind.InvalidFormat - err.cause == "bias should be one of; unbiased, lower20 or higher80" - suite "Waku Message - Pub-sub topics namespacing": test "Stringify named sharding pub-sub topic": diff --git a/tests/waku_core/test_sharding.nim b/tests/waku_core/test_sharding.nim index 76e910cc5b..e67cb89d47 100644 --- a/tests/waku_core/test_sharding.nim +++ b/tests/waku_core/test_sharding.nim @@ -4,7 +4,6 @@ import std/options, std/strutils, std/sugar, - std/algorithm, std/random, stew/results, testutils/unittests @@ -34,88 +33,60 @@ suite "Waku Sharding": let enc = "cbor" - NsContentTopic.init(none(int), Unbiased, app, version, name, enc) + NsContentTopic.init(none(int), app, version, name, enc) test "Implicit content topic generation": ## Given let topic = "/toychat/2/huilong/proto" ## When - let ns = NsContentTopic.parse(topic).expect("Parsing") - - let paramRes = shardCount(ns) + let parseRes = NsContentTopic.parse(topic) ## Then - assert paramRes.isOk(), paramRes.error + assert parseRes.isOk(), $parseRes.error - let count = paramRes.get() + let nsTopic = parseRes.get() check: - count == GenerationZeroShardsCount - ns.bias == Unbiased + nsTopic.generation == none(int) test "Valid content topic": ## Given - let topic = "/0/lower20/toychat/2/huilong/proto" + let topic = "/0/toychat/2/huilong/proto" ## When - let ns = NsContentTopic.parse(topic).expect("Parsing") - - let paramRes = shardCount(ns) + let parseRes = NsContentTopic.parse(topic) ## Then - assert paramRes.isOk(), paramRes.error + assert parseRes.isOk(), $parseRes.error - let count = paramRes.get() + let nsTopic = parseRes.get() check: - count == GenerationZeroShardsCount - ns.bias == Lower20 + nsTopic.generation.get() == 0 test "Invalid content topic generation": ## Given - let topic = "/1/unbiased/toychat/2/huilong/proto" + let topic = "/1/toychat/2/huilong/proto" ## When let ns = NsContentTopic.parse(topic).expect("Parsing") - let paramRes = shardCount(ns) + let shardRes = getShard(ns) ## Then - assert paramRes.isErr(), $paramRes.get() + assert shardRes.isErr(), $shardRes.get() - let err = paramRes.error + let err = shardRes.error check: err == "Generation > 0 are not supported yet" - test "Weigths bias": - ## Given - let count = 5 - - ## When - let anonWeigths = biasedWeights(count, ShardingBias.Lower20) - let speedWeigths = biasedWeights(count, ShardingBias.Higher80) - - ## Then - check: - anonWeigths[0] == 2.0 - anonWeigths[1] == 1.0 - anonWeigths[2] == 1.0 - anonWeigths[3] == 1.0 - anonWeigths[4] == 1.0 - - speedWeigths[0] == 1.0 - speedWeigths[1] == 2.0 - speedWeigths[2] == 2.0 - speedWeigths[3] == 2.0 - speedWeigths[4] == 2.0 - - test "Sorted shard list": + #[ test "Sorted shard list": ## Given - let topic = "/0/unbiased/toychat/2/huilong/proto" + let topic = "/0/toychat/2/huilong/proto" ## When let contentTopic = NsContentTopic.parse(topic).expect("Parsing") let count = shardCount(contentTopic).expect("Valid parameters") - let weights = biasedWeights(count, contentTopic.bias) + let weights = repeat(1.0, count) let shardsRes = weightedShardList(contentTopic, count, weights) @@ -125,7 +96,7 @@ suite "Waku Sharding": let shards = shardsRes.get() check: shards.len == count - isSorted(shards, hashOrder) + isSorted(shards, hashOrder) ]# test "Shard Choice Reproducibility": ## Given @@ -134,15 +105,11 @@ suite "Waku Sharding": ## When let contentTopic = NsContentTopic.parse(topic).expect("Parsing") - let res = singleHighestWeigthShard(contentTopic) + let pubsub = getGenZeroShard(contentTopic, GenerationZeroShardsCount) ## Then - assert res.isOk(), res.error - - let pubsubTopic = res.get() - check: - pubsubTopic == NsPubsubTopic.staticSharding(ClusterIndex, 3) + pubsub == NsPubsubTopic.staticSharding(ClusterIndex, 3) test "Shard Choice Simulation": ## Given @@ -154,7 +121,7 @@ suite "Waku Sharding": ## When for topic in topics: - let pubsub = singleHighestWeigthShard(topic).expect("Valid Topic") + let pubsub = getShard(topic).expect("Valid Topic") counts[pubsub.shard] += 1 ## Then diff --git a/tests/waku_filter_v2/test_waku_filter.nim b/tests/waku_filter_v2/test_waku_filter.nim index 39eae66884..8d5ee06269 100644 --- a/tests/waku_filter_v2/test_waku_filter.nim +++ b/tests/waku_filter_v2/test_waku_filter.nim @@ -10,7 +10,6 @@ import ../../../waku/node/peer_manager, ../../../waku/waku_filter_v2, ../../../waku/waku_filter_v2/client, - ../../../waku/waku_filter_v2/rpc, ../../../waku/waku_core, ../testlib/common, ../testlib/wakucore diff --git a/tests/waku_store/test_waku_store.nim b/tests/waku_store/test_waku_store.nim index a2db4b4bd8..f751dbfd19 100644 --- a/tests/waku_store/test_waku_store.nim +++ b/tests/waku_store/test_waku_store.nim @@ -115,4 +115,4 @@ suite "Waku Store - query handler": error.kind == HistoryErrorKind.BAD_REQUEST ## Cleanup - await allFutures(serverSwitch.stop(), clientSwitch.stop()) + await allFutures(serverSwitch.stop(), clientSwitch.stop()) \ No newline at end of file diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index d1fcf92111..6d01c36eea 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -232,7 +232,7 @@ procSuite "WakuNode - Store": proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = filterFut.complete((pubsubTopic, msg)) - waitFor server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer) + waitFor server.filterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer) waitFor sleepAsync(100.millis) diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim b/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim index ebb5613cbe..17202df4ee 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim @@ -61,9 +61,9 @@ procSuite "Waku v2 JSON-RPC API - Filter": let contentFilters = @[ ContentFilter(contentTopic: DefaultContentTopic), - ContentFilter(contentTopic: ContentTopic("2")), - ContentFilter(contentTopic: ContentTopic("3")), - ContentFilter(contentTopic: ContentTopic("4")), + ContentFilter(contentTopic: ContentTopic("/waku/2/default-content2/proto")), + ContentFilter(contentTopic: ContentTopic("/waku/2/default-content3/proto")), + ContentFilter(contentTopic: ContentTopic("/waku/2/default-content4/proto")), ] var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic)) check: diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 0d6d08b347..e4276fa3b5 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -90,7 +90,7 @@ suite "Waku v2 Rest API - Filter": ] let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, - pubsubTopic: DefaultPubsubTopic) + pubsubTopic: some(DefaultPubsubTopic)) let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) # Then @@ -106,7 +106,7 @@ suite "Waku v2 Rest API - Filter": restFilterTest.messageCache.isSubscribed("4") # When - error case - let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "") + let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: none(string)) let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) check: @@ -137,7 +137,7 @@ suite "Waku v2 Rest API - Filter": # When let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, - pubsubTopic: DefaultPubsubTopic) + pubsubTopic: some(DefaultPubsubTopic)) let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) # Then diff --git a/waku/node/jsonrpc/filter/handlers.nim b/waku/node/jsonrpc/filter/handlers.nim index 900fd85fa6..43174c0e14 100644 --- a/waku/node/jsonrpc/filter/handlers.nim +++ b/waku/node/jsonrpc/filter/handlers.nim @@ -21,8 +21,6 @@ logScope: topics = "waku node jsonrpc filter_api" -const DefaultPubsubTopic: PubsubTopic = "/waku/2/default-waku/proto" - const futTimeout* = 5.seconds # Max time to wait for futures @@ -32,7 +30,7 @@ type proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) = - server.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[PubsubTopic]) -> bool: + server.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], pubsubTopic: Option[PubsubTopic]) -> bool: ## Subscribes a node to a list of content filters debug "post_waku_v2_filter_v1_subscription" @@ -40,9 +38,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message if peerOpt.isNone(): raise newException(ValueError, "no suitable remote filter peers") - let - pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic) - contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) + let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = cache.addMessage(msg.contentTopic, msg) @@ -57,13 +53,11 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message return true - server.rpc("delete_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[PubsubTopic]) -> bool: + server.rpc("delete_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], pubsubTopic: Option[PubsubTopic]) -> bool: ## Unsubscribes a node from a list of content filters debug "delete_waku_v2_filter_v1_subscription" - let - pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic) - contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) + let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) let unsubFut = node.unsubscribe(pubsubTopic, contentTopics) if not await unsubFut.withTimeout(futTimeout): diff --git a/waku/node/rest/filter/types.nim b/waku/node/rest/filter/types.nim index 6b8fdb8718..595c8b1808 100644 --- a/waku/node/rest/filter/types.nim +++ b/waku/node/rest/filter/types.nim @@ -25,7 +25,7 @@ type FilterWakuMessage* = object type FilterGetMessagesResponse* = seq[FilterWakuMessage] type FilterSubscriptionsRequest* = object - pubsubTopic*: PubSubTopic + pubsubTopic*: Option[PubSubTopic] contentFilters*: seq[ContentTopic] #### Type conversion @@ -146,6 +146,6 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions reader.raiseUnexpectedValue("Field `contentFilters` is empty") value = FilterSubscriptionsRequest( - pubsubTopic: pubsubTopic.get(), + pubsubTopic: if pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), contentFilters: contentFilters.get() ) \ No newline at end of file diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 6720e408df..f26dd82871 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[hashes, options, tables, strutils, sequtils, os], + std/[hashes, options, sugar, tables, strutils, sequtils, os], chronos, chronicles, metrics, stew/results, stew/byteutils, @@ -365,7 +365,7 @@ proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec)) -proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], +proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. if node.wakuFilterClientLegacy.isNil(): @@ -379,8 +379,6 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C let remotePeer = remotePeerRes.value - info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId - # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled # TODO: Move this logic to wakunode2 app let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = @@ -389,14 +387,44 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C await handler(pubsubTopic, message) - let subRes = await node.wakuFilterClientLegacy.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer) - if subRes.isOk(): - info "subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + if pubsubTopic.isSome(): + info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + + let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(), contentTopics, handlerWrapper, peer=remotePeer) + + if res.isOk(): + info "subscribed to topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics + else: + error "failed filter subscription", error=res.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) else: - error "failed filter subscription", error=subRes.error - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + let topicMapRes = parseSharding(pubsubTopic, contentTopics) + + let topicMap = + if topicMapRes.isErr(): + error "can't get shard", error=topicMapRes.error + return + else: topicMapRes.get() + + var futures = collect(newSeq): + for pubsub, topics in topicMap.pairs: + info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId + let content = topics.mapIt($it) + node.wakuFilterClientLegacy.subscribe($pubsub, content, handlerWrapper, peer=remotePeer) + + let finished = await allFinished(futures) + + for fut in finished: + let res = fut.read() + + if res.isErr(): + error "failed filter subscription", error=res.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + + for pubsub, topics in topicMap.pairs: + info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics -proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], +proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = ## Unsubscribe from a content filter. if node.wakuFilterClientLegacy.isNil(): @@ -410,17 +438,45 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: let remotePeer = remotePeerRes.value - info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId + if pubsubTopic.isSome(): + info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId - let unsubRes = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer) - if unsubRes.isOk(): - info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer) + + if res.isOk(): + info "unsubscribed from topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics + else: + error "failed filter unsubscription", error=res.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) else: - error "failed filter unsubscription", error=unsubRes.error - waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + let topicMapRes = parseSharding(pubsubTopic, contentTopics) + + let topicMap = + if topicMapRes.isErr(): + error "can't get shard", error = topicMapRes.error + return + else: topicMapRes.get() + + var futures = collect(newSeq): + for pubsub, topics in topicMap.pairs: + info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId + let content = topics.mapIt($it) + node.wakuFilterClientLegacy.unsubscribe($pubsub, content, peer=remotePeer) + + let finished = await allFinished(futures) + + for fut in finished: + let res = fut.read() + + if res.isErr(): + error "failed filter unsubscription", error=res.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + for pubsub, topics in topicMap.pairs: + info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics # TODO: Move to application module (e.g., wakunode2.nim) -proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe, +proc subscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe, deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} = ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. if node.wakuFilterClientLegacy.isNil(): @@ -435,7 +491,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get()) # TODO: Move to application module (e.g., wakunode2.nim) -proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe, +proc unsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe, deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} = ## Unsubscribe from a content filter. if node.wakuFilterClientLegacy.isNil(): @@ -623,7 +679,7 @@ proc mountLightPushClient*(node: WakuNode) = node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) -proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} = +proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} = ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. ## Returns whether relaying was successful or not. ## `WakuMessage` should contain a `contentTopic` field for light node @@ -631,12 +687,23 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe if node.wakuLightpushClient.isNil(): return err("waku lightpush client is nil") - debug "publishing message with lightpush", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, peer=peer.peerId + if pubsubTopic.isSome(): + debug "publishing message with lightpush", pubsubTopic=pubsubTopic.get(), contentTopic=message.contentTopic, peer=peer.peerId + return await node.wakuLightpushClient.publish(pubsubTopic.get(), message, peer) + + let topicMapRes = parseSharding(pubsubTopic, message.contentTopic) + + let topicMap = + if topicMapRes.isErr(): + return err(topicMapRes.error) + else: topicMapRes.get() - return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) + for pubsub, _ in topicMap.pairs: # There's only one pair anyway + debug "publishing message with lightpush", pubsubTopic=pubsub, contentTopic=message.contentTopic, peer=peer.peerId + return await node.wakuLightpushClient.publish($pubsub, message, peer) # TODO: Move to application module (e.g., wakunode2.nim) -proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.async, gcsafe, +proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage): Future[void] {.async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead".} = if node.wakuLightpushClient.isNil(): error "failed to publish message", error="waku lightpush client is nil" diff --git a/waku/waku_core/topics/content_topic.nim b/waku/waku_core/topics/content_topic.nim index 96cdd84bcf..16fbcf4b5a 100644 --- a/waku/waku_core/topics/content_topic.nim +++ b/waku/waku_core/topics/content_topic.nim @@ -26,25 +26,18 @@ const DefaultContentTopic* = ContentTopic("/waku/2/default-content/proto") ## Namespaced content topic -type ShardingBias* = enum - Unbiased = "unbiased" - Lower20 = "lower20" - Higher80 = "higher80" - type NsContentTopic* = object generation*: Option[int] - bias*: ShardingBias application*: string version*: string name*: string encoding*: string -proc init*(T: type NsContentTopic, generation: Option[int], bias: ShardingBias, +proc init*(T: type NsContentTopic, generation: Option[int], application: string, version: string, name: string, encoding: string): T = NsContentTopic( generation: generation, - bias: bias, application: application, version: version, name: name, @@ -56,16 +49,13 @@ proc init*(T: type NsContentTopic, generation: Option[int], bias: ShardingBias, proc `$`*(topic: NsContentTopic): string = ## Returns a string representation of a namespaced topic ## in the format `////` - ## Autosharding adds 2 optional prefixes `//bias + ## Autosharding adds 1 optional prefix `/ var formatted = "" if topic.generation.isSome(): formatted = formatted & "/" & $topic.generation.get() - if topic.bias != ShardingBias.Unbiased: - formatted = formatted & "/" & $topic.bias - formatted & "/" & topic.application & "/" & topic.version & "/" & topic.name & "/" & topic.encoding # Deserialization @@ -73,7 +63,7 @@ proc `$`*(topic: NsContentTopic): string = proc parse*(T: type NsContentTopic, topic: ContentTopic|string): ParsingResult[NsContentTopic] = ## Splits a namespaced topic string into its constituent parts. ## The topic string has to be in the format `////` - ## Autosharding adds 2 optional prefixes `//bias + ## Autosharding adds 1 optional prefix `/ if not topic.startsWith("/"): return err(ParsingError.invalidFormat("topic must start with slash")) @@ -98,8 +88,8 @@ proc parse*(T: type NsContentTopic, topic: ContentTopic|string): ParsingResult[N if enc.len == 0: return err(ParsingError.missingPart("encoding")) - return ok(NsContentTopic.init(none(int), Unbiased, app, ver, name, enc)) - of 6: + return ok(NsContentTopic.init(none(int), app, ver, name, enc)) + of 5: if parts[0].len == 0: return err(ParsingError.missingPart("generation")) @@ -108,31 +98,23 @@ proc parse*(T: type NsContentTopic, topic: ContentTopic|string): ParsingResult[N except ValueError: return err(ParsingError.invalidFormat("generation should be a numeric value")) - if parts[1].len == 0: - return err(ParsingError.missingPart("sharding-bias")) - - let bias = try: - parseEnum[ShardingBias](parts[1]) - except ValueError: - return err(ParsingError.invalidFormat("bias should be one of; unbiased, lower20 or higher80")) - - let app = parts[2] + let app = parts[1] if app.len == 0: return err(ParsingError.missingPart("appplication")) - let ver = parts[3] + let ver = parts[2] if ver.len == 0: return err(ParsingError.missingPart("version")) - let name = parts[4] + let name = parts[3] if name.len == 0: return err(ParsingError.missingPart("topic-name")) - let enc = parts[5] + let enc = parts[4] if enc.len == 0: return err(ParsingError.missingPart("encoding")) - return ok(NsContentTopic.init(some(gen), bias, app, ver, name, enc)) + return ok(NsContentTopic.init(some(gen), app, ver, name, enc)) else: return err(ParsingError.invalidFormat("invalid topic structure")) diff --git a/waku/waku_core/topics/sharding.nim b/waku/waku_core/topics/sharding.nim index e463b018fe..ddcdba0a38 100644 --- a/waku/waku_core/topics/sharding.nim +++ b/waku/waku_core/topics/sharding.nim @@ -10,9 +10,7 @@ else: import nimcrypto, std/options, - std/math, - std/sequtils, - std/algorithm, + std/tables, stew/endians2, stew/results, stew/byteutils @@ -22,14 +20,77 @@ import ./pubsub_topic ## For indices allocation and other magic numbers refer to RFC 51 -const ClusterIndex* = 49152 -const GenerationZeroShardsCount* = 5 - -type ShardsPriority = seq[tuple[topic: NsPubsubTopic, value: float64]] - -proc shardCount*(topic: NsContentTopic): Result[int, string] = - ## Returns the total shard count, sharding selection bias - ## and the shard name from the content topic. +const ClusterIndex* = 1 +const GenerationZeroShardsCount* = 8 + +proc getGenZeroShard*(topic: NsContentTopic, count: int): NsPubsubTopic = + let bytes = toBytes(topic.application) & toBytes(topic.version) + + let hash = sha256.digest(bytes) + + # We only use the last 64 bits of the hash as having more shards is unlikely. + let hashValue = uint64.fromBytesBE(hash.data[24..31]) + + # This is equilavent to modulo shard count but faster + let shard = hashValue and uint64((count - 1)) + + NsPubsubTopic.staticSharding(ClusterIndex, uint16(shard)) + +proc getShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] = + ## Compute the (pubsub topic) shard to use for this content topic. + + if topic.generation.isNone(): + ## Implicit generation # is 0 for all content topic + return ok(getGenZeroShard(topic, GenerationZeroShardsCount)) + + case topic.generation.get(): + of 0: return ok(getGenZeroShard(topic, GenerationZeroShardsCount)) + else: return err("Generation > 0 are not supported yet") + +proc parseSharding*(pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]): Result[Table[NsPubsubTopic, seq[NsContentTopic]], string] = + var topics: seq[ContentTopic] + when contentTopics is seq[ContentTopic]: + topics = contentTopics + else: + topics = @[contentTopics] + + var topicMap = initTable[NsPubsubTopic, seq[NsContentTopic]]() + for contentTopic in topics: + let parseRes = NsContentTopic.parse(contentTopic) + + let content = + if parseRes.isErr(): + return err("Cannot parse content topic: " & $parseRes.error) + else: parseRes.get() + + let pubsub = + if pubsubTopic.isSome(): + let parseRes = NsPubsubTopic.parse(pubsubTopic.get()) + + if parseRes.isErr(): + return err("Cannot parse pubsub topic: " & $parseRes.error) + else: parseRes.get() + else: + let shardsRes = getShard(content) + + if shardsRes.isErr(): + return err("Cannot autoshard content topic: " & $shardsRes.error) + else: shardsRes.get() + + if not topicMap.hasKey(pubsub): + topicMap[pubsub] = @[] + + try: + topicMap[pubsub].add(content) + except CatchableError: + return err(getCurrentExceptionMsg()) + + ok(topicMap) + +#type ShardsPriority = seq[tuple[topic: NsPubsubTopic, value: float64]] + +#[ proc shardCount*(topic: NsContentTopic): Result[int, string] = + ## Returns the total shard count from the content topic. let shardCount = if topic.generation.isNone(): ## Implicit generation # is 0 for all content topic @@ -41,34 +102,15 @@ proc shardCount*(topic: NsContentTopic): Result[int, string] = else: return err("Generation > 0 are not supported yet") - ok((shardCount)) - -proc biasedWeights*(shardCount: int, bias: ShardingBias): seq[float64] = - var weights = repeat(1.0, shardCount) - - case bias: - of Unbiased: - return weights - of Lower20: - # we choose the lower 20% of shards and double their weigths - let index = shardCount div 5 - for i in (0..