Skip to content

Commit

Permalink
chore: allow autosharding in any cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta committed Mar 12, 2024
1 parent 505479b commit 5921c68
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 106 deletions.
119 changes: 72 additions & 47 deletions tests/waku_core/topics/test_sharding.nim
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
import
std/[
options,
tables
],
testutils/unittests
import std/[options, tables], testutils/unittests

import ../../../../waku/waku_core/topics, ../../testlib/[wakucore, tables, testutils]

import
../../../../waku/waku_core/topics,
../../testlib/[
wakucore,
tables,
testutils
]

const GenerationZeroShardsCount = 8
const ClusterId = 1

suite "Autosharding":
const
Expand All @@ -22,131 +12,166 @@ suite "Autosharding":
contentTopicShort = "/toychat/2/huilong/proto"
contentTopicFull = "/0/toychat/2/huilong/proto"
contentTopicInvalid = "/1/toychat/2/huilong/proto"


suite "getGenZeroShard":
test "Generate Gen0 Shard":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# Given two valid topics
let
let
nsContentTopic1 = NsContentTopic.parse(contentTopicShort).value()
nsContentTopic2 = NsContentTopic.parse(contentTopicFull).value()

# When we generate a gen0 shard from them
let
nsPubsubTopic1 = getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount)
nsPubsubTopic2 = getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount)

let
nsPubsubTopic1 =
sharding.getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount)
nsPubsubTopic2 =
sharding.getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount)

# Then the generated shards are valid
check:
nsPubsubTopic1 == NsPubsubTopic.staticSharding(ClusterId, 3)
nsPubsubTopic2 == NsPubsubTopic.staticSharding(ClusterId, 3)

suite "getShard from NsContentTopic":
test "Generate Gen0 Shard with topic.generation==none":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)

# When we get a shard from a topic without generation
let nsPubsubTopic = getShard(contentTopicShort)
let nsPubsubTopic = sharding.getShard(contentTopicShort)

# Then the generated shard is valid
check:
nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3)

test "Generate Gen0 Shard with topic.generation==0":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When we get a shard from a gen0 topic
let nsPubsubTopic = getShard(contentTopicFull)
let nsPubsubTopic = sharding.getShard(contentTopicFull)

# Then the generated shard is valid
check:
nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3)

test "Generate Gen0 Shard with topic.generation==other":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When we get a shard from ain invalid content topic
let nsPubsubTopic = getShard(contentTopicInvalid)
let nsPubsubTopic = sharding.getShard(contentTopicInvalid)

# Then the generated shard is valid
check:
nsPubsubTopic.error() == "Generation > 0 are not supported yet"

suite "getShard from ContentTopic":
test "Generate Gen0 Shard with topic.generation==none":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When we get a shard from it
let nsPubsubTopic = getShard(contentTopicShort)
let nsPubsubTopic = sharding.getShard(contentTopicShort)

# Then the generated shard is valid
check:
nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3)

test "Generate Gen0 Shard with topic.generation==0":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When we get a shard from it
let nsPubsubTopic = getShard(contentTopicFull)
let nsPubsubTopic = sharding.getShard(contentTopicFull)

# Then the generated shard is valid
check:
nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3)

test "Generate Gen0 Shard with topic.generation==other":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When we get a shard from it
let nsPubsubTopic = getShard(contentTopicInvalid)
let nsPubsubTopic = sharding.getShard(contentTopicInvalid)

# Then the generated shard is valid
check:
nsPubsubTopic.error() == "Generation > 0 are not supported yet"

test "Generate Gen0 Shard invalid topic":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When we get a shard from it
let nsPubsubTopic = getShard("invalid")
let nsPubsubTopic = sharding.getShard("invalid")

# Then the generated shard is valid
check:
nsPubsubTopic.error() == "invalid format: topic must start with slash"

suite "parseSharding":
test "contentTopics is ContentTopic":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When calling with contentTopic as string
let topicMap = parseSharding(some(pubsubTopic04), contentTopicShort)
let topicMap = sharding.parseSharding(some(pubsubTopic04), contentTopicShort)

# Then the topicMap is valid
check:
topicMap.value() == {pubsubTopic04: @[contentTopicShort]}

test "contentTopics is seq[ContentTopic]":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When calling with contentTopic as string seq
let topicMap = parseSharding(some(pubsubTopic04), @[contentTopicShort, "/0/foo/1/bar/proto"])
let topicMap = sharding.parseSharding(
some(pubsubTopic04), @[contentTopicShort, "/0/foo/1/bar/proto"]
)

# Then the topicMap is valid
check:
topicMap.value() == {pubsubTopic04: @[contentTopicShort, "/0/foo/1/bar/proto"]}

test "pubsubTopic is none":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When calling with pubsubTopic as none
let topicMap = parseSharding(PubsubTopic.none(), contentTopicShort)
let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicShort)

# Then the topicMap is valid
check:
topicMap.value() == {pubsubTopic13: @[contentTopicShort]}

test "content parse error":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When calling with pubsubTopic as none with invalid content
let topicMap = parseSharding(PubsubTopic.none(), "invalid")
let topicMap = sharding.parseSharding(PubsubTopic.none(), "invalid")

# Then the topicMap is valid
check:
topicMap.error() == "Cannot parse content topic: invalid format: topic must start with slash"
topicMap.error() ==
"Cannot parse content topic: invalid format: topic must start with slash"

test "pubsubTopic parse error":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When calling with pubsubTopic as none with invalid content
let topicMap = parseSharding(some("invalid"), contentTopicShort)
let topicMap = sharding.parseSharding(some("invalid"), contentTopicShort)

# Then the topicMap is valid
check:
topicMap.error() == "Cannot parse pubsub topic: invalid format: must start with /waku/2"
topicMap.error() ==
"Cannot parse pubsub topic: invalid format: must start with /waku/2"

test "pubsubTopic getShard error":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# When calling with pubsubTopic as none with invalid content
let topicMap = parseSharding(PubsubTopic.none(), contentTopicInvalid)
let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicInvalid)

# Then the topicMap is valid
check:
topicMap.error() == "Cannot autoshard content topic: Generation > 0 are not supported yet"
topicMap.error() ==
"Cannot autoshard content topic: Generation > 0 are not supported yet"

xtest "catchable error on add to topicMap":
# TODO: Trigger a CatchableError or mock
Expand Down
11 changes: 6 additions & 5 deletions tests/waku_relay/test_wakunode_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,12 @@ suite "WakuNode - Relay":

await node.start()
await node.mountRelay()
require node.mountSharding(1, 1).isOk
echo node.wakuSharding.getShard(DefaultContentTopic)

## Given
let
shard = "/waku/2/rs/1/1"
shard = "/waku/2/rs/1/0"
contentTopicA = DefaultContentTopic
contentTopicB = ContentTopic("/waku/2/default-content1/proto")
contentTopicC = ContentTopic("/waku/2/default-content2/proto")
Expand All @@ -520,10 +522,9 @@ suite "WakuNode - Relay":
): Future[void] {.gcsafe, raises: [Defect].} =
discard pubsubTopic
discard message

assert shard == getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard"
assert shard == node.wakuSharding.getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard"
assert shard == node.wakuSharding.getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard"
assert shard == node.wakuSharding.getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard"

## When
node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler))
Expand Down
11 changes: 5 additions & 6 deletions tests/wakunode_rest/test_rest_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ suite "Waku v2 Rest API - Relay":
let node = testWakuNode()
await node.start()
await node.mountRelay()
require node.mountSharding(1, 8).isOk

var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
Expand All @@ -276,13 +277,11 @@ suite "Waku v2 Rest API - Relay":
restServer.start()

let contentTopics = @[
ContentTopic("/waku/2/default-content1/proto"),
ContentTopic("/waku/2/default-content2/proto"),
ContentTopic("/waku/2/default-content3/proto")
ContentTopic("/app-1/2/default-content/proto"),
ContentTopic("/app-2/2/default-content/proto"),
ContentTopic("/app-3/2/default-content/proto")
]

let shards = contentTopics.mapIt(getShard(it).expect("Valid Shard")).deduplicate()

# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.relayPostAutoSubscriptionsV1(contentTopics)
Expand All @@ -300,7 +299,7 @@ suite "Waku v2 Rest API - Relay":

check:
# Node should be subscribed to all shards
toSeq(node.wakuRelay.subscribedTopics).len == shards.len
node.wakuRelay.subscribedTopics == @["/waku/2/rs/1/7", "/waku/2/rs/1/2", "/waku/2/rs/1/5"]

await restServer.stop()
await restServer.closeWait()
Expand Down
26 changes: 8 additions & 18 deletions waku/factory/internal_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,14 @@ proc enrConfiguration*(conf: WakuNodeConf, netConfig: NetConfig, key: crypto.Pri

enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)

let topics =
if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0:
let shardsRes = conf.contentTopics.mapIt(getShard(it))
for res in shardsRes:
if res.isErr():
error "failed to shard content topic", error=res.error
return err($res.error)

let shards = shardsRes.mapIt(it.get())

conf.pubsubTopics & shards
else:
conf.topics

let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
error "failed to add sharded topics to ENR", error=addShardedTopics.error
return err($addShardedTopics.error)
let shards: seq[uint16] =
# no shards configured
if conf.shards.len == 0: toSeq(0..<conf.topics.len).mapIt(uint16(it))
# some shards configured
else: toSeq(conf.shards.mapIt(uint16(it)))

enrBuilder.withWakuRelaySharding(RelayShards(clusterId:uint16(conf.clusterId), shardIds: shards)).isOkOr():
return err("could not initialize ENR with shards")

let recordRes = enrBuilder.build()
let record =
Expand Down
5 changes: 4 additions & 1 deletion waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ proc setupProtocols*(node: WakuNode,
node.mountMetadata(conf.clusterId).isOkOr:
return err("failed to mount waku metadata protocol: " & error)

node.mountSharding(conf.clusterId, uint32(conf.pubsubTopics.len)).isOkOr:
return err("failed to mount waku sharding: " & error)

# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
if conf.relayPeerExchange:
Expand All @@ -163,7 +166,7 @@ proc setupProtocols*(node: WakuNode,
if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0:
# TODO autoshard content topics only once.
# Already checked for errors in app.init
let shards = conf.contentTopics.mapIt(getShard(it).expect("Valid Shard"))
let shards = conf.contentTopics.mapIt(node.wakuSharding.getShard(it).expect("Valid Shard"))
conf.pubsubTopics & shards
else:
conf.topics
Expand Down
Loading

0 comments on commit 5921c68

Please sign in to comment.