Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: example using filter and lightpush #1720

Merged
merged 3 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions examples/v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ TODO

# publisher/subscriber

Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publises messages to the default pubsub topic to a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives.
Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publishes messages to the default pubsub topic on a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives.

**Some notes:**
* These examples are meant to work even in if you are behind a firewall and you can't be discovered by discv5.
Expand All @@ -31,4 +31,34 @@ And run a publisher
./build/publisher
```

See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations.
See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations.

# resource-restricted publisher/subscriber (lightpush/filter)

To illustrate publishing and receiving messages on a resource-restricted client,
`examples/v2` also provides a `lightpush_publisher` and a `filter_subscriber`.
The `lightpush_publisher` continually publishes messages via a lightpush service node
to the default pubsub topic on a given content topic.
The `filter_subscriber` subscribes via a filter service node
to the same pubsub and content topic.
It runs forever, maintaining this subscription
and printing the content it receives.

**compile and run:**

Wait until the filter subscriber is ready.
```console
./env.sh bash
nim c -r examples/v2/filter_subscriber.nim
```

And run a lightpush publisher
```console
./env.sh bash
nim c -r examples/v2/lightpush_publisher.nim
```

See how the filter subscriber receives messages published by the lightpush publisher.
Neither the publisher nor the subscriber participates in `relay`,
but instead make use of service nodes to save resources.
Feel free to experiment from different machines in different locations.
82 changes: 82 additions & 0 deletions examples/v2/filter_subscriber.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
## Example showing how a resource restricted client may
## subscribe to messages without relay

import
chronicles,
chronos,
stew/byteutils,
stew/results
import
../../../waku/common/logging,
../../../waku/v2/node/peer_manager,
../../../waku/v2/waku_core,
../../../waku/v2/waku_filter_v2/client

const
FilterPeer = "/ip4/104.154.239.128/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS" # node-01.gc-us-central1-a.wakuv2.test.statusim.net on wakuv2.test
FilterPubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")

proc unsubscribe(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic) {.async.} =
notice "unsubscribing from filter"
let unsubscribeRes = await wfc.unsubscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])
if unsubscribeRes.isErr:
notice "unsubscribe request failed", err=unsubscribeRes.error
else:
notice "unsubscribe request successful"

proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) =
let payloadStr = string.fromBytes(message.payload)
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp

proc maintainSubscription(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic) {.async.} =
while true:
notice "maintaining subscription"
# First use filter-ping to check if we have an active subscription
let pingRes = await wfc.ping(filterPeer)
if pingRes.isErr:
jm-clius marked this conversation as resolved.
Show resolved Hide resolved
# No subscription found. Let's subscribe.
notice "no subscription found. Sending subscribe request"

let subscribeRes = await wfc.subscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])

if subscribeRes.isErr:
jm-clius marked this conversation as resolved.
Show resolved Hide resolved
notice "subscribe request failed. Quitting.", err=subscribeRes.error
break
else:
notice "subscribe request successful."
else:
notice "subscription found."

await sleepAsync(60.seconds) # Subscription maintenance interval

proc setupAndSubscribe(rng: ref HmacDrbgContext) =
let filterPeer = parsePeerInfo(FilterPeer).get()

setupLogLevel(logging.LogLevel.NOTICE)
notice "starting filter subscriber"

var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(rng, messagePushHandler, pm)

# Mount filter client protocol
switch.mount(wfc)

# Start maintaining subscription
asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic)

Comment on lines +77 to +78
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my understanding, in which cases we should use asyncSpawn ? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is to dispatch a future without awaiting its result. However, if you simply discard the future it may contain errors which gets "swallowed". For that reason, you should always use asyncSpawn, which will convert any error in the future to a Defect. See https://github.com/status-im/nim-chronos/blob/8563c936733a28bc1e773542e9741b5b55dfcaeb/chronos/asyncloop.nim#L130-L137

when isMainModule:
let rng = newRng()
setupAndSubscribe(rng)
runForever()
57 changes: 57 additions & 0 deletions examples/v2/lightpush_publisher.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
## Example showing how a resource restricted client may
## use lightpush to publish messages without relay

import
chronicles,
chronos,
stew/byteutils,
stew/results
import
../../../waku/common/logging,
../../../waku/v2/node/peer_manager,
../../../waku/v2/waku_core,
../../../waku/v2/waku_lightpush/client

const
LightpushPeer = "/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ" # node-01.do-ams3.wakuv2.test.statusim.net on wakuv2.test
LightpushPubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")

proc publishMessages(wlc: WakuLightpushClient,
lightpushPeer: RemotePeerInfo,
lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic) {.async.} =
while true:
let text = "hi there i'm a lightpush publisher"
let message = WakuMessage(payload: toBytes(text), # content of the message
contentTopic: lightpushContentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: getNowInNanosecondTime()) # current timestamp

let wlpRes = await wlc.publish(lightpushPubsubTopic, message, lightpushPeer)

if wlpRes.isOk():
notice "published message using lightpush", message=message
else:
notice "failed to publish message using lightpush", err=wlpRes.error()

await sleepAsync(5000) # Publish every 5 seconds

proc setupAndPublish(rng: ref HmacDrbgContext) =
let lightpushPeer = parsePeerInfo(LightpushPeer).get()

setupLogLevel(logging.LogLevel.NOTICE)
notice "starting lightpush publisher"

var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wlc = WakuLightpushClient.new(pm, rng)

# Start maintaining subscription
asyncSpawn publishMessages(wlc, lightpushPeer, LightpushPubsubTopic, LightpushContentTopic)

when isMainModule:
let rng = newRng()
setupAndPublish(rng)
runForever()