Skip to content

Commit

Permalink
Merge branch 'master' into dhi
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Oct 31, 2023
2 parents d693e49 + 36beb9d commit f25031f
Show file tree
Hide file tree
Showing 51 changed files with 646 additions and 499 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,13 @@ build-example-filter2:
build-example-c-bindings:
cd examples/c-bindings && $(MAKE)

build-example-noise:
cd examples/noise && $(MAKE)

build-example-rln:
cd examples/rln && $(MAKE)

build-example: build-example-basic2 build-example-chat-2 build-example-filter2 build-example-c-bindings build-example-rln
build-example: build-example-basic2 build-example-chat-2 build-example-filter2 build-example-c-bindings build-example-noise build-example-rln

static-library:
@echo "Building static library..."
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
return
}

_, err = r.node.Relay().PublishToTopic(req.Context(), message, strings.Replace(topic, "\n", "", -1))
_, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1)))
if err != nil {
r.log.Error("publishing message", zap.Error(err))
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/server/rpc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func TestFilterGetV1Messages(t *testing.T) {
// Wait for the subscription to be started
time.Sleep(1 * time.Second)

_, err = serviceA.node.Relay().PublishToTopic(
_, err = serviceA.node.Relay().Publish(
context.Background(),
&wpb.WakuMessage{ContentTopic: "ct"},
testTopic,
relay.WithPubSubTopic(testTopic),
)
require.NoError(t, err)
require.True(t, reply)
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
return err
}

_, err = r.node.Relay().PublishToTopic(req.Context(), msg, topic)
_, err = r.node.Relay().Publish(req.Context(), msg, relay.WithPubSubTopic(topic))
if err != nil {
r.log.Error("publishing message", zap.Error(err))
return err
Expand Down
17 changes: 6 additions & 11 deletions docs/api/lightpush.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,17 @@ if err != nil {
```


To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. The payload of the message is not limited to strings. Any kind of data that can be serialized
To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer.
The payload of the message is not limited to strings. Any kind of data that can be serialized
into a `[]byte` can be sent as long as it does not exceed the maximum length a message can have (~1MB)

The following functions can be used to publish a message:
- `wakuNode.Lightpush().Publish(ctx, msg, opts...)` - to send a message to the default waku pubsub topic
- `wakuNode.Lightpush().PublishToTopic(ctx, msg, topic, opts...)` - to send a message to a custom pubsub topic
`wakuNode.Lightpush().Publish(ctx, msg, opts...)` is used to publish a message. This function will return a message id on success, or an error if the message could not be published.

Both of these functions will return a message id on success, or an error if the message could not be published.

If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Lightpush. This behaviour can be controlled via options:
If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Lightpush and publish the message to a pubsub topic derived from the content topic of the message. This behaviour can be controlled via options:

### Options

- `lightpush.WithPubSubTopic(topic)` - broadcast the message using a custom pubsub topic
- `lightpush.WithDefaultPubsubTopic()` - broadcast the message to the default pubsub topic
- `lightpush.WithPeer(peerID)` - use an specific peer ID (which should be part of the node peerstore) to broadcast the message with
- `lightpush.WithAutomaticPeerSelection(host)` - automatically select a peer that supports lightpush protocol from the peerstore to broadcast the message with
- `lightpush.WithFastestPeerSelection(ctx)` - automatically select a peer based on its ping reply time



33 changes: 10 additions & 23 deletions docs/api/relay.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,34 +41,19 @@ One of these options must be specified when instantiating a node supporting the
## Receiving messages
```go
...
sub, err := wakuNode.Relay().Subscribe(context.Background())
contentFilter := protocol.NewContentFilter(relay.DefaultWakuTopic)
sub, err := wakuNode.Relay().Subscribe(context.Background, contentFilter) ([]*Subscription, error)
if err != nil {
fmt.Println(err)
return
}

for value := range sub.C {
for value := range sub[0].C {
fmt.Println("Received msg:", string(value.Message().Payload))
}
...
```
To receive messages sent via the relay protocol, you need to subscribe to a pubsub topic. This can be done via any of these functions:
- `wakuNode.Relay().Subscribe(ctx)` - subscribes to the default waku pubsub topic `/waku/2/default-waku/proto`
- `wakuNode.Relay().SubscribeToTopic(ctx, topic)` - subscribes to a custom pubsub topic

These functions return a `Subscription` struct containing a channel on which messages will be received. To stop receiving messages in this channel `sub.Unsubscribe()` can be executed which will close the channel (without unsubscribing from the pubsub topic)

> Pubsub topics should follow the [recommended usage](https://rfc.vac.dev/spec/23/) structure. For this purpose, the `NewPubsubTopic` helper function was created:
```go
import "github.com/waku-org/go-waku/waku/v2/protocol"

topic := protocol.NewPubsubTopic("the_topic_name", "the_encoding")
/*
fmt.Println(topic.String()) // => `/waku/2/the_topic_name/the_encoding`
*/
```


To receive messages sent via the relay protocol, you need to subscribe specifying a content filter with the function `Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error)`. This functions return a list of `Subscription` struct containing a channel on which messages will be received. To stop receiving messages `WakuRelay`'s `Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error` can be executed which will close the channel (without unsubscribing from the pubsub topic) which will make sure the subscription is stopped, and if no other subscriptions exist for underlying pubsub topic, the pubsub is also unsubscribed.

## Sending messages

Expand All @@ -95,11 +80,13 @@ if err != nil {
To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. The payload of the message is not limited to strings. Any kind of data that can be serialized
into a `[]byte` can be sent as long as it does not exceed the maximum length a message can have (~1MB)

The following functions can be used to publish a message:
- `wakuNode.Relay().Publish(ctx, msg)` - to send a message to the default waku pubsub topic
- `wakuNode.Relay().PublishToTopic(ctx, msg, topic)` - to send a message to a custom pubsub topic
`wakuNode.Relay().Publish(ctx, msg, opts...)` is used to publish a message. This function will return a message id on success, or an error if the message could not be published.

Both of these functions will return a message id on success, or an error if the message could not be published.
If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Relay and publish the message to a pubsub topic derived from the content topic of the message. This behaviour can be controlled via options:

### Options
- `relay.WithPubSubTopic(topic)` - broadcast the message using a custom pubsub topic
- `relay.WithDefaultPubsubTopic()` - broadcast the message to the default pubsub topic

> If `WithWakuRelayAndMinPeers` was used during the instantiation of the wakuNode, it should be possible to verify if there's enough peers for publishing to a topic with `wakuNode.Relay().EnoughPeersToPublish()` and `wakuNode.Relay().EnoughPeersToPublishToTopic(topic)`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,3 @@ fun ContentTopic(applicationName: String, applicationVersion: Long, contentTopic
return Gowaku.contentTopic(applicationName, applicationVersion, contentTopicName, encoding)
}

/**
* Create a pubsub topic string
* @param name
* @param encoding
* @return Pubsub topic string according to RFC 23
*/
fun PubsubTopic(name: String, encoding: String): String {
return Gowaku.pubsubTopic(name, encoding)
}
2 changes: 1 addition & 1 deletion examples/basic2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, contentTopic string, ms
Timestamp: timestamp,
}

_, err = wakuNode.Relay().Publish(ctx, msg)
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
if err != nil {
log.Error("Error sending a message", zap.Error(err))
}
Expand Down
10 changes: 5 additions & 5 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,18 +319,18 @@ func (c *Chat) publish(ctx context.Context, message string) error {
}

if c.options.LightPush.Enable {
var lightOpt lightpush.Option
lightOpt := []lightpush.Option{lightpush.WithDefaultPubsubTopic()}
var peerID peer.ID
peerID, err = options.LightPush.NodePeerID()
if err != nil {
lightOpt = lightpush.WithAutomaticPeerSelection()
lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection())
} else {
lightOpt = lightpush.WithPeer(peerID)
lightOpt = append(lightOpt, lightpush.WithPeer(peerID))
}

_, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt)
_, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...)
} else {
_, err = c.node.Relay().Publish(ctx, wakuMsg)
_, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic())
}

return err
Expand Down
6 changes: 4 additions & 2 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)

var log = logging.Logger("filter2")

var pubSubTopic = protocol.DefaultPubsubTopic()
var pubSubTopic = protocol.DefaultPubsubTopic{}

const contentTopic = "/filter2test/1/testTopic/proto"

Expand Down Expand Up @@ -98,6 +99,7 @@ func main() {

// Send FilterRequest from light node to full node
cf := protocol.ContentFilter{
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(contentTopic),
}

Expand Down Expand Up @@ -157,7 +159,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
Timestamp: timestamp,
}

_, err := wakuNode.Relay().Publish(ctx, msg)
_, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String()))
if err != nil {
log.Error("Error sending a message: ", err)
}
Expand Down
8 changes: 5 additions & 3 deletions examples/noise/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
n "github.com/waku-org/go-noise"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/noise"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
Expand Down Expand Up @@ -186,21 +188,21 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.P

msg.Timestamp = wakuNode.Timesource().Now().UnixNano()

_, err = wakuNode.Relay().Publish(ctx, msg)
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
if err != nil {
log.Error("Error sending a message", zap.Error(err))
}
}
}

func readLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.Pairing) {
sub, err := wakuNode.Relay().Subscribe(ctx)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}

for value := range sub.Ch {
for value := range sub[0].Ch {
if value.Message().ContentTopic != pairingObj.ContentTopic {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion examples/rln/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const credentialsPath = ""
const credentialsPassword = ""

var contentTopic = protocol.NewContentTopic("rln", 1, "test", "proto").String()
var pubsubTopic = protocol.DefaultPubsubTopic()
var pubsubTopic = protocol.DefaultPubsubTopic{}
```
The private key used here should contain enough Sepolia ETH to register on the contract (0.001 ETH). An ethereum client address is required as well. After updating these values, execute `make`

Expand Down
5 changes: 3 additions & 2 deletions examples/rln/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
Expand All @@ -31,7 +32,7 @@ var keystorePath = "./rlnKeystore.json"
var keystorePassword = "password"
var membershipIndex = uint(0)
var contentTopic, _ = protocol.NewContentTopic("rln", 1, "test", "proto")
var pubsubTopic = protocol.DefaultPubsubTopic()
var pubsubTopic = protocol.DefaultPubsubTopic{}

// ============================================================================

Expand Down Expand Up @@ -128,7 +129,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
log.Error("Error appending proof", zap.Error(err))
}

_, err = wakuNode.Relay().PublishToTopic(ctx, msg, pubsubTopic.String())
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic.String()))
if err != nil {
log.Error("Error sending a message", zap.Error(err))
}
Expand Down
14 changes: 0 additions & 14 deletions examples/waku-csharp/waku-csharp/Waku.Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,5 @@ public static string ContentTopic(string applicationName, uint applicationVersio
return Response.PtrToStringUtf8(ptr);
}

[DllImport(Constants.dllName)]
internal static extern IntPtr waku_pubsub_topic(string name, string encoding);

/// <summary>
/// Create a pubsub topic string
/// </summary>
/// <param name="name"></param>
/// <param name="encoding"></param>
/// <returns>Pubsub topic string according to RFC 23</returns>
public static string PubsubTopic(string name, string encoding)
{
IntPtr ptr = waku_pubsub_topic(name, encoding);
return Response.PtrToStringUtf8(ptr);
}
}
}
7 changes: 0 additions & 7 deletions library/c/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,6 @@ func waku_content_topic(applicationName *C.char, applicationVersion C.uint, cont
return onSuccesfulResponse(contentTopic.String(), cb, userData)
}

// Create a pubsub topic string according to RFC 23
//
//export waku_pubsub_topic
func waku_pubsub_topic(name *C.char, encoding *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
topic := library.PubsubTopic(C.GoString(name), C.GoString(encoding))
return onSuccesfulResponse(topic, cb, userData)
}

// Get the default pubsub topic used in waku2: /waku/2/default-waku/proto
//
Expand Down
2 changes: 1 addition & 1 deletion library/lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms
lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic))
}

hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, lpOptions...)
hash, err := wakuState.node.Lightpush().Publish(ctx, msg, lpOptions...)
return hexutil.Encode(hash), err
}

Expand Down
7 changes: 1 addition & 6 deletions library/mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,9 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa
return contentTopic.String()
}

// PubsubTopic creates a pubsub topic string according to RFC 23
func PubsubTopic(name string, encoding string) string {
return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String()
}

// DefaultPubsubTopic returns the default pubsub topic used in waku2: /waku/2/default-waku/proto
func DefaultPubsubTopic() string {
return protocol.DefaultPubsubTopic().String()
return library.DefaultPubsubTopic()
}

// Peers retrieves the list of peers known by the waku node
Expand Down
7 changes: 1 addition & 6 deletions library/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,9 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa
return contentTopic.String()
}

// PubsubTopic creates a pubsub topic string according to RFC 23
func PubsubTopic(name string, encoding string) string {
return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String()
}

// DefaultPubsubTopic returns the default pubsub topic used in waku2: /waku/2/default-waku/proto
func DefaultPubsubTopic() string {
return protocol.DefaultPubsubTopic().String()
return protocol.DefaultPubsubTopic{}.String()
}

type subscriptionMsg struct {
Expand Down
4 changes: 2 additions & 2 deletions library/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func RelayEnoughPeers(topic string) (bool, error) {
return false, errWakuNodeNotReady
}

topicToCheck := protocol.DefaultPubsubTopic().String()
topicToCheck := protocol.DefaultPubsubTopic{}.String()
if topic != "" {
topicToCheck = topic
}
Expand All @@ -39,7 +39,7 @@ func relayPublish(msg *pb.WakuMessage, pubsubTopic string, ms int) (string, erro
ctx = context.Background()
}

hash, err := wakuState.node.Relay().PublishToTopic(ctx, msg, pubsubTopic)
hash, err := wakuState.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic))
return hexutil.Encode(hash), err
}

Expand Down
2 changes: 1 addition & 1 deletion tests/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro
Timestamp: timestamp,
}

_, err = wakuNode.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic)
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
return err
}
Loading

0 comments on commit f25031f

Please sign in to comment.