Skip to content

Commit

Permalink
Decouple MLS messages from messagev1 (#333)
Browse files Browse the repository at this point in the history
* Separate MLS messages and implement service methods

* fix: group id and installation id are bytes

* fix: idempotent send group/welcome messages via uniquness in db

* fix: hex decode group id from mls validation service

* fix: s/Cursor/IdCursor

* fix: pass message data only in send group message request

* refactor: add mls {Group,Welcome}MessageInput types for send requests

* refactor: s/installation_id/installation_key in mls/api

* fix: clean up mls query page size logic

* feat: implement mls subscribe group/welcome messages

* Hex encode group ID

* fix: remove duplicate import

* fix: return grpc invalidargument on invalid group id

---------

Co-authored-by: Nicholas Molnar <[email protected]>
  • Loading branch information
2 people authored and Steven Normore committed Jan 23, 2024
1 parent 9e8ddc9 commit 77370ed
Show file tree
Hide file tree
Showing 28 changed files with 1,683 additions and 411 deletions.
2 changes: 2 additions & 0 deletions dev/generate
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
set -e

go generate ./...

mockgen -package api github.com/xmtp/proto/v3/go/mls/api/v1 MlsApi_SubscribeGroupMessagesServer,MlsApi_SubscribeWelcomeMessagesServer > pkg/mls/api/v1/mock.gen.go
4 changes: 4 additions & 0 deletions dev/migrate-mls
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
set -e

dev/run --create-mls-migration "$@"
2 changes: 2 additions & 0 deletions dev/run
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set -e

MESSAGE_DB_DSN="postgres://postgres:xmtp@localhost:15432/postgres?sslmode=disable"
MLS_DB_DSN="postgres://postgres:xmtp@localhost:15432/postgres?sslmode=disable"
AUTHZ_DB_DSN="postgres://postgres:xmtp@localhost:15432/postgres?sslmode=disable"
NODE_KEY="8a30dcb604b0b53627a5adc054dbf434b446628d4bd1eccc681d223f0550ce67"

Expand All @@ -13,6 +14,7 @@ go run cmd/xmtpd/main.go \
--store.db-connection-string "${MESSAGE_DB_DSN}" \
--store.reader-db-connection-string "${MESSAGE_DB_DSN}" \
--store.metrics-period 5s \
--mls-store.db-connection-string "${MESSAGE_DB_DSN}" \
--authz-db-connection-string "${AUTHZ_DB_DSN}" \
--go-profiling \
"$@"
2 changes: 1 addition & 1 deletion dev/up
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if ! which golangci-lint &>/dev/null; then brew install golangci-lint; fi
if ! which shellcheck &>/dev/null; then brew install shellcheck; fi
if ! which protoc &>/dev/null; then brew install protobuf; fi
if ! which protoc-gen-go &>/dev/null; then go install google.golang.org/protobuf/cmd/protoc-gen-go@latest; fi
if ! which mockgen &>/dev/null; then go install github.com/golang/mock/mockgen@latest; fi
if ! which mockgen &>/dev/null; then go install go.uber.org/mock/mockgen@latest; fi
if ! which protolint &>/dev/null; then go install github.com/yoheimuta/protolint/cmd/protolint@latest; fi

dev/generate
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.20

require (
github.com/ethereum/go-ethereum v1.10.26
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
Expand All @@ -30,8 +29,9 @@ require (
github.com/uptrace/bun/driver/pgdriver v1.1.16
github.com/waku-org/go-waku v0.8.0
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3
github.com/xmtp/proto/v3 v3.36.3-0.20240111132545-4e1d2b1b2399
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0
github.com/yoheimuta/protolint v0.39.0
go.uber.org/mock v0.4.0
go.uber.org/zap v1.24.0
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.53.0
Expand Down Expand Up @@ -75,6 +75,7 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1146,8 +1146,8 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 h1:wzUffJGCTBGXIDyNU+1UBu1fn2Nzo+OQzM1pLrheh58=
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3/go.mod h1:bJREWk+NDnZYjgLQdAi8SUWuq/5pkMme4GqiffEhUF4=
github.com/xmtp/proto/v3 v3.36.3-0.20240111132545-4e1d2b1b2399 h1:i5qynxHZRn7mIXQPt8M7c6ac0NBb+MEn2g2qKzvRTyM=
github.com/xmtp/proto/v3 v3.36.3-0.20240111132545-4e1d2b1b2399/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0 h1:eGNiXDTiXcXTf5ne4HACbqbHaQrVlRz2hwcn05E7v8U=
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/yoheimuta/go-protoparser/v4 v4.6.0 h1:uvz1e9/5Ihsm4Ku8AJeDImTpirKmIxubZdSn0QJNdnw=
github.com/yoheimuta/go-protoparser/v4 v4.6.0/go.mod h1:AHNNnSWnb0UoL4QgHPiOAg2BniQceFscPI5X/BZNHl8=
Expand Down Expand Up @@ -1182,6 +1182,8 @@ go.uber.org/fx v1.20.0 h1:ZMC/pnRvhsthOZh9MZjMq5U8Or3mA9zBSPaLnzs3ihQ=
go.uber.org/fx v1.20.0/go.mod h1:qCUj0btiR3/JnanEr1TYEePfSw6o/4qYJscgvzQ5Ub0=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down
18 changes: 1 addition & 17 deletions pkg/api/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func Test_AuthnNoToken(t *testing.T) {
})
}

func Test_AuthnNoTokenNonMLS(t *testing.T) {
func Test_AuthnNoTokenNonV0(t *testing.T) {
ctx := context.Background()
testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, server *Server) {
_, err := client.Publish(ctx, &messageV1.PublishRequest{
Expand All @@ -36,22 +36,6 @@ func Test_AuthnNoTokenNonMLS(t *testing.T) {
})
}

func Test_AuthnNoTokenMLS(t *testing.T) {
ctx := context.Background()
testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, server *Server) {
_, err := client.Publish(ctx, &messageV1.PublishRequest{
Envelopes: []*messageV1.Envelope{
{
ContentTopic: "/xmtp/mls/1/m-0x1234/proto",
TimestampNs: 0,
Message: []byte{},
},
},
})
require.NoError(t, err)
})
}

func Test_AuthnNoTokenMixedV0MLS(t *testing.T) {
ctx := context.Background()
testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, server *Server) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ type AuthnOptions struct {
Authenticated requests will be permitted according to the rules of the request type,
(i.e. you can't publish into other wallets' contact and private topics).
*/
Enable bool `long:"enable" description:"require client authentication via wallet tokens"`
EnableMLS bool `long:"enable-mls" description:"require client authentication for MLS"`
Enable bool `long:"enable" description:"require client authentication via wallet tokens"`
/*
Ratelimits enables request rate limiting.
Expand Down
18 changes: 2 additions & 16 deletions pkg/api/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,9 @@ func (wa *WalletAuthorizer) Stream() grpc.StreamServerInterceptor {
}
}

func (wa *WalletAuthorizer) isProtocolMLS(request *messagev1.PublishRequest) bool {
envelopes := request.Envelopes
if len(envelopes) == 0 {
return false
}
// If any of the envelopes are not for a v3 topic, then we treat the request as non-v3
for _, envelope := range envelopes {
if !strings.HasPrefix(envelope.ContentTopic, "/xmtp/mls/") {
return false
}
}
return true
}

func (wa *WalletAuthorizer) requiresAuthorization(req interface{}) bool {
publishRequest, isPublish := req.(*messagev1.PublishRequest)
return isPublish && (!wa.isProtocolMLS(publishRequest) || wa.AuthnConfig.EnableMLS)
_, isPublish := req.(*messagev1.PublishRequest)
return isPublish
}

func (wa *WalletAuthorizer) getWallet(ctx context.Context) (types.WalletAddr, error) {
Expand Down
67 changes: 24 additions & 43 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ import (
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
wakunode "github.com/waku-org/go-waku/waku/v2/node"
wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
wakurelay "github.com/waku-org/go-waku/waku/v2/protocol/relay"
proto "github.com/xmtp/proto/v3/go/message_api/v1"
apicontext "github.com/xmtp/xmtp-node-go/pkg/api/message/v1/context"
"github.com/xmtp/xmtp-node-go/pkg/logging"
"github.com/xmtp/xmtp-node-go/pkg/metrics"
"github.com/xmtp/xmtp-node-go/pkg/store"
"github.com/xmtp/xmtp-node-go/pkg/topic"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand All @@ -45,24 +42,24 @@ type Service struct {

// Configured as constructor options.
log *zap.Logger
waku *wakunode.WakuNode
store *store.Store

publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error

// Configured internally.
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
relaySub *wakurelay.Subscription

ns *server.Server
nc *nats.Conn
}

func NewService(node *wakunode.WakuNode, logger *zap.Logger, store *store.Store) (s *Service, err error) {
func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
s = &Service{
waku: node,
log: logger.Named("message/v1"),
store: store,
log: log.Named("message/v1"),
store: store,
publishToWakuRelay: publishToWakuRelay,
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

Expand All @@ -82,44 +79,11 @@ func NewService(node *wakunode.WakuNode, logger *zap.Logger, store *store.Store)
return nil, err
}

// Initialize waku relay subscription.
s.relaySub, err = s.waku.Relay().Subscribe(s.ctx)
if err != nil {
return nil, errors.Wrap(err, "subscribing to relay")
}
tracing.GoPanicWrap(s.ctx, &s.wg, "broadcast", func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case wakuEnv := <-s.relaySub.Ch:
if wakuEnv == nil {
continue
}
env := buildEnvelope(wakuEnv.Message())

envB, err := pb.Marshal(env)
if err != nil {
s.log.Error("marshalling envelope", zap.Error(err))
continue
}
err = s.nc.Publish(buildNatsSubject(env.ContentTopic), envB)
if err != nil {
s.log.Error("publishing envelope to local nats", zap.Error(err))
continue
}
}
}
})

return s, nil
}

func (s *Service) Close() {
s.log.Info("closing")
if s.relaySub != nil {
s.relaySub.Unsubscribe()
}

if s.ctxCancel != nil {
s.ctxCancel()
Expand All @@ -136,6 +100,22 @@ func (s *Service) Close() {
s.log.Info("closed")
}

func (s *Service) HandleIncomingWakuRelayMessage(msg *wakupb.WakuMessage) error {
env := buildEnvelope(msg)

envB, err := pb.Marshal(env)
if err != nil {
return err
}

err = s.nc.Publish(buildNatsSubject(env.ContentTopic), envB)
if err != nil {
return err
}

return nil
}

func (s *Service) Publish(ctx context.Context, req *proto.PublishRequest) (*proto.PublishResponse, error) {
for _, env := range req.Envelopes {
log := s.log.Named("publish").With(zap.String("content_topic", env.ContentTopic))
Expand All @@ -156,14 +136,15 @@ func (s *Service) Publish(ctx context.Context, req *proto.PublishRequest) (*prot
}
}

_, err := s.waku.Relay().Publish(ctx, &wakupb.WakuMessage{
err := s.publishToWakuRelay(ctx, &wakupb.WakuMessage{
ContentTopic: env.ContentTopic,
Timestamp: int64(env.TimestampNs),
Payload: env.Message,
})
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}

metrics.EmitPublishedEnvelope(ctx, log, env)
}
return &proto.PublishResponse{}, nil
Expand Down
Loading

0 comments on commit 77370ed

Please sign in to comment.