Skip to content

Commit

Permalink
feat: append RLN proofs when posting messages in REST/RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Sep 7, 2023
1 parent 76b0071 commit 6f58393
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
35 changes: 26 additions & 9 deletions cmd/waku/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rest
import (
"context"
"encoding/json"
"errors"
"net/http"
"strings"
"sync"
Expand All @@ -15,9 +16,10 @@ import (
"go.uber.org/zap"
)

const ROUTE_RELAY_SUBSCRIPTIONSV1 = "/relay/v1/subscriptions"
const ROUTE_RELAY_MESSAGESV1 = "/relay/v1/messages/{topic}"
const routeRelayV1Subscriptions = "/relay/v1/subscriptions"
const routeRelayV1Messages = "/relay/v1/messages/{topic}"

// RelayService represents the REST service for WakuRelay
type RelayService struct {
node *node.WakuNode
cancel context.CancelFunc
Expand All @@ -31,6 +33,7 @@ type RelayService struct {
runner *runnerService
}

// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService {
s := &RelayService{
node: node,
Expand All @@ -41,10 +44,10 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za

s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)

m.Post(ROUTE_RELAY_SUBSCRIPTIONSV1, s.postV1Subscriptions)
m.Delete(ROUTE_RELAY_SUBSCRIPTIONSV1, s.deleteV1Subscriptions)
m.Get(ROUTE_RELAY_MESSAGESV1, s.getV1Messages)
m.Post(ROUTE_RELAY_MESSAGESV1, s.postV1Message)
m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions)
m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions)
m.Get(routeRelayV1Messages, s.getV1Messages)
m.Post(routeRelayV1Messages, s.postV1Message)

return s
}
Expand All @@ -65,6 +68,7 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}

// Start starts the RelayService
func (r *RelayService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
Expand All @@ -80,6 +84,7 @@ func (r *RelayService) Start(ctx context.Context) {
r.runner.Start(ctx)
}

// Stop stops the RelayService
func (r *RelayService) Stop() {
if r.cancel == nil {
return
Expand Down Expand Up @@ -187,11 +192,23 @@ func (d *RelayService) postV1Message(w http.ResponseWriter, r *http.Request) {

var err error
if topic == "" {
_, err = d.node.Relay().Publish(r.Context(), message)
} else {
_, err = d.node.Relay().PublishToTopic(r.Context(), message, strings.Replace(topic, "\n", "", -1))
topic = relay.DefaultWakuTopic
}

if !d.node.Relay().IsSubscribed(topic) {
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
return
}

if node.SupportsRLN {
err = d.node.RLNRelay().AppendRLNProof(message, d.node.Timesource().Now())
if err != nil {
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
return
}
}

_, err = d.node.Relay().PublishToTopic(r.Context(), message, strings.Replace(topic, "\n", "", -1))
if err != nil {
d.log.Error("publishing message", zap.Error(err))
}
Expand Down
34 changes: 30 additions & 4 deletions cmd/waku/rpc/relay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rpc

import (
"errors"
"fmt"
"net/http"
"sync"
Expand All @@ -12,6 +13,7 @@ import (
"go.uber.org/zap"
)

// RelayService represents the JSON RPC service for WakuRelay
type RelayService struct {
node *node.WakuNode

Expand All @@ -24,19 +26,23 @@ type RelayService struct {
runner *runnerService
}

// RelayMessageArgs represents the requests used for posting messages
type RelayMessageArgs struct {
Topic string `json:"topic,omitempty"`
Message *RPCWakuMessage `json:"message,omitempty"`
}

// TopicArgs represents the lists of topics to use when subscribing / unsubscribing
type TopicsArgs struct {
Topics []string `json:"topics,omitempty"`
}

// TopicArgs represents a request that contains a single topic
type TopicArgs struct {
Topic string `json:"topic,omitempty"`
}

// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *RelayService {
s := &RelayService{
node: node,
Expand Down Expand Up @@ -66,6 +72,7 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}

// Start starts the RelayService
func (r *RelayService) Start() {
r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
Expand All @@ -78,18 +85,34 @@ func (r *RelayService) Start() {
r.runner.Start()
}

// Stop stops the RelayService
func (r *RelayService) Stop() {
r.runner.Stop()
}

// PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
var err error

if args.Topic == "" {
_, err = r.node.Relay().Publish(req.Context(), args.Message.toProto())
} else {
_, err = r.node.Relay().PublishToTopic(req.Context(), args.Message.toProto(), args.Topic)
topic := relay.DefaultWakuTopic
if args.Topic != "" {
topic = args.Topic
}

if !r.node.Relay().IsSubscribed(topic) {
return errors.New("not subscribed to pubsubTopic")
}

msg := args.Message.toProto()

if node.SupportsRLN {
err = r.node.RLNRelay().AppendRLNProof(msg, r.node.Timesource().Now())
if err != nil {
return err
}
}

_, err = r.node.Relay().PublishToTopic(req.Context(), args.Message.toProto(), topic)
if err != nil {
r.log.Error("publishing message", zap.Error(err))
return err
Expand All @@ -99,6 +122,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
return nil
}

// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
Expand Down Expand Up @@ -129,6 +153,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
return nil
}

// DeleteV1Subscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_subscription method
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
Expand All @@ -145,6 +170,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,
return nil
}

// GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/node/wakunode2_no_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package node

import "context"

const SupportsRLN = false

// RLNRelay is used to access any operation related to Waku RLN protocol
func (w *WakuNode) RLNRelay() RLNRelay {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/node/wakunode2_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
r "github.com/waku-org/go-zerokit-rln/rln"
)

const SupportsRLN = true

// RLNRelay is used to access any operation related to Waku RLN protocol
func (w *WakuNode) RLNRelay() RLNRelay {
return w.rlnRelay
Expand Down

0 comments on commit 6f58393

Please sign in to comment.