Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Roomserver producers package (#2546)
Browse files Browse the repository at this point in the history
* Give the roomserver a producers package

* Change init point

* Populate ACLs API

* Fix build issues

* `RoomEventProducer` naming
  • Loading branch information
neilalexander authored Jul 1, 2022
1 parent 89cd0e8 commit b50a24c
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 103 deletions.
3 changes: 1 addition & 2 deletions roomserver/internal/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
return err
}

err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
if err != nil {
return err
}

}
}

Expand Down
46 changes: 26 additions & 20 deletions roomserver/internal/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/perform"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
Expand Down Expand Up @@ -49,17 +51,21 @@ type RoomserverInternalAPI struct {
JetStream nats.JetStreamContext
Durable string
InputRoomEventTopic string // JetStream topic for new input room events
OutputRoomEventTopic string // JetStream topic for new output room events
OutputProducer *producers.RoomEventProducer
PerspectiveServerNames []gomatrixserverlib.ServerName
}

func NewRoomserverAPI(
processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
consumer nats.JetStreamContext, nc *nats.Conn,
inputRoomEventTopic, outputRoomEventTopic string,
js nats.JetStreamContext, nc *nats.Conn, inputRoomEventTopic string,
caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
) *RoomserverInternalAPI {
serverACLs := acls.NewServerACLs(roomserverDB)
producer := &producers.RoomEventProducer{
Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent)),
JetStream: js,
ACLs: serverACLs,
}
a := &RoomserverInternalAPI{
ProcessContext: processCtx,
DB: roomserverDB,
Expand All @@ -68,8 +74,8 @@ func NewRoomserverAPI(
ServerName: cfg.Matrix.ServerName,
PerspectiveServerNames: perspectiveServerNames,
InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic,
JetStream: consumer,
OutputProducer: producer,
JetStream: js,
NATSClient: nc,
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs,
Expand All @@ -92,19 +98,19 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
r.KeyRing = keyRing

r.Inputer = &input.Inputer{
Cfg: r.Cfg,
ProcessContext: r.ProcessContext,
DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic,
OutputRoomEventTopic: r.OutputRoomEventTopic,
JetStream: r.JetStream,
NATSClient: r.NATSClient,
Durable: nats.Durable(r.Durable),
ServerName: r.Cfg.Matrix.ServerName,
FSAPI: fsAPI,
KeyRing: keyRing,
ACLs: r.ServerACLs,
Queryer: r.Queryer,
Cfg: r.Cfg,
ProcessContext: r.ProcessContext,
DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic,
OutputProducer: r.OutputProducer,
JetStream: r.JetStream,
NATSClient: r.NATSClient,
Durable: nats.Durable(r.Durable),
ServerName: r.Cfg.Matrix.ServerName,
FSAPI: fsAPI,
KeyRing: keyRing,
ACLs: r.ServerACLs,
Queryer: r.Queryer,
}
r.Inviter = &perform.Inviter{
DB: r.DB,
Expand Down Expand Up @@ -199,7 +205,7 @@ func (r *RoomserverInternalAPI) PerformInvite(
if len(outputEvents) == 0 {
return nil
}
return r.WriteOutputEvents(req.Event.RoomID(), outputEvents)
return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents)
}

func (r *RoomserverInternalAPI) PerformLeave(
Expand All @@ -215,7 +221,7 @@ func (r *RoomserverInternalAPI) PerformLeave(
if len(outputEvents) == 0 {
return nil
}
return r.WriteOutputEvents(req.RoomID, outputEvents)
return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents)
}

func (r *RoomserverInternalAPI) PerformForget(
Expand Down
87 changes: 14 additions & 73 deletions roomserver/internal/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
Expand All @@ -37,16 +38,8 @@ import (
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)

var keyContentFields = map[string]string{
"m.room.join_rules": "join_rule",
"m.room.history_visibility": "history_visibility",
"m.room.member": "membership",
}

// Inputer is responsible for consuming from the roomserver input
// streams and processing the events. All input events are queued
// into a single NATS stream and the order is preserved strictly.
Expand Down Expand Up @@ -75,19 +68,19 @@ var keyContentFields = map[string]string{
// up, so they will do nothing until a new event comes in for B
// or C.
type Inputer struct {
Cfg *config.RoomServer
ProcessContext *process.ProcessContext
DB storage.Database
NATSClient *nats.Conn
JetStream nats.JetStreamContext
Durable nats.SubOpt
ServerName gomatrixserverlib.ServerName
FSAPI fedapi.RoomserverFederationAPI
KeyRing gomatrixserverlib.JSONVerifier
ACLs *acls.ServerACLs
InputRoomEventTopic string
OutputRoomEventTopic string
workers sync.Map // room ID -> *worker
Cfg *config.RoomServer
ProcessContext *process.ProcessContext
DB storage.Database
NATSClient *nats.Conn
JetStream nats.JetStreamContext
Durable nats.SubOpt
ServerName gomatrixserverlib.ServerName
FSAPI fedapi.RoomserverFederationAPI
KeyRing gomatrixserverlib.JSONVerifier
ACLs *acls.ServerACLs
InputRoomEventTopic string
OutputProducer *producers.RoomEventProducer
workers sync.Map // room ID -> *worker

Queryer *query.Queryer
}
Expand Down Expand Up @@ -370,58 +363,6 @@ func (r *Inputer) InputRoomEvents(
}
}

// WriteOutputEvents implements OutputRoomEventWriter
func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
var err error
for _, update := range updates {
msg := &nats.Msg{
Subject: r.OutputRoomEventTopic,
Header: nats.Header{},
}
msg.Header.Set(jetstream.RoomID, roomID)
msg.Data, err = json.Marshal(update)
if err != nil {
return err
}
logger := log.WithFields(log.Fields{
"room_id": roomID,
"type": update.Type,
})
if update.NewRoomEvent != nil {
eventType := update.NewRoomEvent.Event.Type()
logger = logger.WithFields(log.Fields{
"event_type": eventType,
"event_id": update.NewRoomEvent.Event.EventID(),
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
"send_as_server": update.NewRoomEvent.SendAsServer,
"sender": update.NewRoomEvent.Event.Sender(),
})
if update.NewRoomEvent.Event.StateKey() != nil {
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
}
contentKey := keyContentFields[eventType]
if contentKey != "" {
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
if value.Exists() {
logger = logger.WithField("content_value", value.String())
}
}

if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
ev := update.NewRoomEvent.Event.Unwrap()
defer r.ACLs.OnServerACLUpdate(ev)
}
}
logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic)
if _, err := r.JetStream.PublishMsg(msg); err != nil {
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err)
return err
}
}
return nil
}

var roomserverInputBackpressure = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dendrite",
Expand Down
4 changes: 2 additions & 2 deletions roomserver/internal/input/input_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent(
return fmt.Errorf("r.updateLatestEvents: %w", err)
}
case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
{
Type: api.OutputTypeOldRoomEvent,
OldRoomEvent: &api.OutputOldRoomEvent{
Expand All @@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent(
// so notify downstream components to redact this event - they should have it if they've
// been tracking our output log.
if redactedEventID != "" {
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
{
Type: api.OutputTypeRedactedEvent,
RedactedEvent: &api.OutputRedactedEvent{
Expand Down
2 changes: 1 addition & 1 deletion roomserver/internal/input/input_latest_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion roomserver/internal/perform/perform_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (r *Admin) PerformAdminEvacuateUser(
if len(outputEvents) == 0 {
continue
}
if err := r.Inputer.WriteOutputEvents(roomID, outputEvents); err != nil {
if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
Expand Down
2 changes: 1 addition & 1 deletion roomserver/internal/perform/perform_inbound_peek.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek(
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
}

err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{
{
Type: api.OutputTypeNewInboundPeek,
NewInboundPeek: &api.OutputNewInboundPeek{
Expand Down
2 changes: 1 addition & 1 deletion roomserver/internal/perform/perform_peek.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID(

// TODO: handle federated peeks

err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{
err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{
{
Type: api.OutputTypeNewPeek,
NewPeek: &api.OutputNewPeek{
Expand Down
2 changes: 1 addition & 1 deletion roomserver/internal/perform/perform_unpeek.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID(

// TODO: handle federated peeks

err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{
err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
{
Type: api.OutputTypeRetirePeek,
RetirePeek: &api.OutputRetirePeek{
Expand Down
89 changes: 89 additions & 0 deletions roomserver/producers/roomevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package producers

import (
"encoding/json"

"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)

var keyContentFields = map[string]string{
"m.room.join_rules": "join_rule",
"m.room.history_visibility": "history_visibility",
"m.room.member": "membership",
}

type RoomEventProducer struct {
Topic string
ACLs *acls.ServerACLs
JetStream nats.JetStreamContext
}

func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
var err error
for _, update := range updates {
msg := &nats.Msg{
Subject: r.Topic,
Header: nats.Header{},
}
msg.Header.Set(jetstream.RoomID, roomID)
msg.Data, err = json.Marshal(update)
if err != nil {
return err
}
logger := log.WithFields(log.Fields{
"room_id": roomID,
"type": update.Type,
})
if update.NewRoomEvent != nil {
eventType := update.NewRoomEvent.Event.Type()
logger = logger.WithFields(log.Fields{
"event_type": eventType,
"event_id": update.NewRoomEvent.Event.EventID(),
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
"send_as_server": update.NewRoomEvent.SendAsServer,
"sender": update.NewRoomEvent.Event.Sender(),
})
if update.NewRoomEvent.Event.StateKey() != nil {
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
}
contentKey := keyContentFields[eventType]
if contentKey != "" {
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
if value.Exists() {
logger = logger.WithField("content_value", value.String())
}
}

if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
ev := update.NewRoomEvent.Event.Unwrap()
defer r.ACLs.OnServerACLUpdate(ev)
}
}
logger.Tracef("Producing to topic '%s'", r.Topic)
if _, err := r.JetStream.PublishMsg(msg); err != nil {
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err)
return err
}
}
return nil
}
1 change: 0 additions & 1 deletion roomserver/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func NewInternalAPI(
return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc,
cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames,
)
}

0 comments on commit b50a24c

Please sign in to comment.