Skip to content

Commit

Permalink
chore(examples): add pubsub example with production env (#1333)
Browse files Browse the repository at this point in the history
* chore(examples): add pubsub example with production env

* chore(examples): fix comments 1/2

* chore(examples): fix comments 2/2
  • Loading branch information
alrevuelta authored Nov 4, 2022
1 parent 3c8fab7 commit 1244342
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 5 deletions.
34 changes: 34 additions & 0 deletions examples/v2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# basic2

TODO

# publisher/subscriber

Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publises messages to the default pubsub topic to a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives.

**Some notes:**
* These examples are meant to work even in if you are behind a firewall and you can't be discovered by discv5.
* You only need to provide a reachable bootstrap peer (see our [fleets](https://fleets.status.im/))
* The examples are meant to work out of the box.
* Note that both services wait for some time until a given minimum amount of connections are reached. This is to ensure messages are gossiped.

**Compile:**

Make all examples.
```console
make example2
```

**Run:**

Wait until the subscriber is ready.
```console
./build/subscriber
```

And run a publisher
```console
./build/publisher
```

See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations.
5 changes: 3 additions & 2 deletions examples/v2/basic2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import
std/[os,options],
confutils, chronicles, chronos,
stew/shims/net as stewNet,
stew/byteutils,
libp2p/crypto/[crypto,secp],
eth/keys,
json_rpc/[rpcclient, rpcserver],
Expand All @@ -29,15 +30,15 @@ proc runBackground() {.async.} =
await node.mountRelay()

# Subscribe to a topic
let topic = cast[PubsubTopic]("foobar")
let topic = PubsubTopic("foobar")
proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.init(data).value
let payload = cast[string](message.payload)
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
node.subscribe(topic, handler)

# Publish to a topic
let payload = cast[seq[byte]]("hello world")
let payload = toBytes("hello world")
let message = WakuMessage(payload: payload, contentTopic: ContentTopic("/waku/2/default-content/proto"))
await node.publish(topic, message)

Expand Down
88 changes: 88 additions & 0 deletions examples/v2/publisher.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import
std/[tables,times,sequtils],
stew/byteutils,
stew/shims/net,
chronicles,
chronicles/topics_registry,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr

import
../../../waku/v2/node/discv5/waku_discv5,
../../../waku/v2/node/peer_manager/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/utils/time,
../../../waku/v2/utils/wakuenr

proc now*(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())

# An accesible bootstrap node. See wakuv2.prod fleets.status.im
const bootstrapNodes = @["enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"]

# careful if running pub and sub in the same machine
const wakuPort = 60000
const discv5Port = 9000

proc setupAndPublish() {.async.} =
# use notice to filter all waku messaging
setLogLevel(LogLevel.NOTICE)
notice "starting publisher", wakuPort=wakuPort, discv5Port=discv5Port
let
nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
ip = ValidIpAddress.init("0.0.0.0")
node = WakuNode.new(nodeKey, ip, Port(wakuPort))
flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)

# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
bindIP = ip,
discv5UdpPort = Port(discv5Port),
bootstrapNodes = bootstrapNodes,
privateKey = keys.PrivateKey(nodeKey.skkey),
flags = flags,
enrFields = [],
rng = node.rng)

await node.start()
await node.mountRelay()
if not await node.startDiscv5():
error "failed to start discv5"
quit(1)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
if numConnectedPeers >= 6:
notice "publisher is ready", connectedPeers=numConnectedPeers, required=6
break
notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6
await sleepAsync(5000)

# Make sure it matches the publisher. Use default value
# see spec: https://rfc.vac.dev/spec/23/
let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto")

# any content topic can be chosen
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")

notice "publisher service started"
while true:
let text = "hi there i'm a publisher"
let message = WakuMessage(payload: toBytes(text), # content of the message
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp
await node.publish(pubSubTopic, message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
await sleepAsync(5000)

asyncSpawn setupAndPublish()
runForever()
84 changes: 84 additions & 0 deletions examples/v2/subscriber.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import
std/[tables, sequtils],
stew/byteutils,
stew/shims/net,
chronicles,
chronicles/topics_registry,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr

import
../../../waku/v2/node/discv5/waku_discv5,
../../../waku/v2/node/peer_manager/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/utils/wakuenr

# An accesible bootstrap node. See wakuv2.prod fleets.status.im
const bootstrapNodes = @["enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"]

# careful if running pub and sub in the same machine
const wakuPort = 50000
const discv5Port = 8000

proc setupAndSubscribe() {.async.} =
# use notice to filter all waku messaging
setLogLevel(LogLevel.NOTICE)
notice "starting subscriber", wakuPort=wakuPort, discv5Port=discv5Port
let
nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
ip = ValidIpAddress.init("0.0.0.0")
node = WakuNode.new(nodeKey, ip, Port(wakuPort))
flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)

# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
bindIP = ip,
discv5UdpPort = Port(discv5Port),
bootstrapNodes = bootstrapNodes,
privateKey = keys.PrivateKey(nodeKey.skkey),
flags = flags,
enrFields = [],
rng = node.rng)

await node.start()
await node.mountRelay()
if not await node.startDiscv5():
error "failed to start discv5"
quit(1)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
if numConnectedPeers >= 6:
notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6
break
notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6
await sleepAsync(5000)

# Make sure it matches the publisher. Use default value
# see spec: https://rfc.vac.dev/spec/23/
let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto")

# any content topic can be chosen. make sure it matches the publisher
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")

proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.init(data).value
let payloadStr = string.fromBytes(message.payload)
if message.contentTopic == contentTopic:
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp
node.subscribe(pubSubTopic, handler)

asyncSpawn setupAndSubscribe()

runForever()
6 changes: 3 additions & 3 deletions waku.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ task sim2, "Build Waku v2 simulation tools":
buildBinary "start_network2", "tools/simulation/", "-d:chronicles_log_level=TRACE"

task example2, "Build Waku v2 example":
let name = "basic2"
buildBinary name, "examples/v2/", "-d:chronicles_log_level=DEBUG"
buildBinary "basic2", "examples/v2/", "-d:chronicles_log_level=DEBUG"
buildBinary "publisher", "examples/v2/", "-d:chronicles_log_level=DEBUG"
buildBinary "subscriber", "examples/v2/", "-d:chronicles_log_level=DEBUG"

task scripts2, "Build Waku v2 scripts":
buildBinary "rpc_publish", "tools/scripts/", "-d:chronicles_log_level=DEBUG"
Expand All @@ -103,7 +104,6 @@ task chat2bridge, "Build chat2bridge":
let name = "chat2bridge"
buildBinary name, "apps/chat2bridge/", "-d:chronicles_log_level=TRACE"


### Waku Tooling
task wakucanary, "Build waku-canary tool":
let name = "wakucanary"
Expand Down

0 comments on commit 1244342

Please sign in to comment.