Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup invites in the poller, not the API #333

Merged
merged 6 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Accumulator struct {
eventsTable *EventTable
snapshotTable *SnapshotTable
spacesTable *SpacesTable
invitesTable *InvitesTable
entityName string
}

Expand All @@ -38,6 +39,7 @@ func NewAccumulator(db *sqlx.DB) *Accumulator {
eventsTable: NewEventTable(db),
snapshotTable: NewSnapshotsTable(db),
spacesTable: NewSpacesTable(db),
invitesTable: NewInvitesTable(db),
entityName: "server",
}
}
Expand Down Expand Up @@ -295,6 +297,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
}
}

if err = a.invitesTable.RemoveSupersededInvites(txn, roomID, events); err != nil {
return fmt.Errorf("RemoveSupersededInvites: %w", err)
}

if err = a.spacesTable.HandleSpaceUpdates(txn, events); err != nil {
return fmt.Errorf("HandleSpaceUpdates: %s", err)
}
Expand Down Expand Up @@ -431,7 +437,9 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
}

var latestNID int64
newEventsByID := make([]Event, 0, len(eventIDToNID))
// postInsertEvents matches newEvents, but a) it has NIDs, and b) state events have
// the IsState field hacked to true so we don't insert them into the timeline.
postInsertEvents := make([]Event, 0, len(eventIDToNID))
kegsay marked this conversation as resolved.
Show resolved Hide resolved
redactTheseEventIDs := make(map[string]*Event)
for i, ev := range newEvents {
nid, ok := eventIDToNID[ev.ID]
Expand Down Expand Up @@ -459,7 +467,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
redactTheseEventIDs[redactsEventID] = &newEvents[i]
}
}
newEventsByID = append(newEventsByID, ev)
postInsertEvents = append(postInsertEvents, ev)
result.TimelineNIDs = append(result.TimelineNIDs, ev.NID)
}
}
Expand All @@ -485,7 +493,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
}
}

for _, ev := range newEventsByID {
for _, ev := range postInsertEvents {
var replacesNID int64
// the snapshot ID we assign to this event is unaffected by whether /this/ event is state or not,
// as this is the before snapshot ID.
Expand Down Expand Up @@ -541,12 +549,16 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
result.RequiresReload = currentStateRedactions > 0
}

if err = a.spacesTable.HandleSpaceUpdates(txn, newEventsByID); err != nil {
if err = a.invitesTable.RemoveSupersededInvites(txn, roomID, postInsertEvents); err != nil {
return AccumulateResult{}, fmt.Errorf("RemoveSupersededInvites: %w", err)
}

if err = a.spacesTable.HandleSpaceUpdates(txn, postInsertEvents); err != nil {
return AccumulateResult{}, fmt.Errorf("HandleSpaceUpdates: %s", err)
}

// the last fetched snapshot ID is the current one
info := a.roomInfoDelta(roomID, newEventsByID)
info := a.roomInfoDelta(roomID, postInsertEvents)
if err = a.roomsTable.Upsert(txn, info, snapID, latestNID); err != nil {
return AccumulateResult{}, fmt.Errorf("failed to UpdateCurrentSnapshotID to %d: %w", snapID, err)
}
Expand Down
42 changes: 42 additions & 0 deletions state/invites_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package state
import (
"database/sql"
"encoding/json"
"github.com/lib/pq"

"github.com/jmoiron/sqlx"
)
Expand All @@ -14,14 +15,17 @@ import (
// correctly when the user joined the room.
// - The user could read room data in the room without being joined to the room e.g could pull
// `required_state` and `timeline` as they would be authorised by the invite to see this data.
//
// Instead, we now completely split out invites from the normal event flow. This fixes the issues
// outlined above but introduce more problems:
// - How do you sort the invite with rooms?
// - How do you calculate the room name when you lack heroes?
//
// For now, we say that invites:
// - are treated as a highlightable event for the purposes of sorting by highlight count.
// - are given the timestamp of when the invite arrived.
// - calculate the room name on a best-effort basis given the lack of heroes (same as element-web).
//
// When an invite is rejected, it appears in the `leave` section which then causes the invite to be
// removed from this table.
type InvitesTable struct {
Expand All @@ -47,6 +51,44 @@ func (t *InvitesTable) RemoveInvite(userID, roomID string) error {
return err
}

// RemoveSupersededInvites accepts a list of events in the given room. The events should
// either
// - contain at most one membership event per user, or else
// - be in timeline order (most recent last)
//
// (corresponding to an Accumulate and an Initialise call, respectively).
//
// The events are scanned in order for membership changes, to determine the "final"
// memberships. Users who final membership is not "invite" have their outstanding
// invites to this room deleted.
func (t *InvitesTable) RemoveSupersededInvites(txn *sqlx.Tx, roomID string, newEvents []Event) error {
memberships := map[string]string{} // user ID -> memberships
for _, ev := range newEvents {
if ev.Type != "m.room.member" {
continue
}
memberships[ev.StateKey] = ev.Membership
}

var usersToRemove []string
for userID, membership := range memberships {
if membership != "invite" && membership != "_invite" {
usersToRemove = append(usersToRemove, userID)
}
}

if len(usersToRemove) == 0 {
return nil
}

_, err := txn.Exec(`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if len(usersToRemove) == 0 { return nil }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh yes, thanks.

DELETE FROM syncv3_invites
WHERE user_id = ANY($1) AND room_id = $2
`, pq.StringArray(usersToRemove), roomID)

return err
}

func (t *InvitesTable) InsertInvite(userID, roomID string, inviteRoomState []json.RawMessage) error {
blob, err := json.Marshal(inviteRoomState)
if err != nil {
Expand Down
151 changes: 151 additions & 0 deletions state/invites_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package state

import (
"encoding/json"
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"
"reflect"
"testing"
)
Expand Down Expand Up @@ -128,6 +130,155 @@ func TestInviteTable(t *testing.T) {
}
}

func TestInviteTable_RemoveSupersededInvites(t *testing.T) {
db, close := connectToDB(t)
defer close()

alice := "@alice:localhost"
bob := "@bob:localhost"
roomA := "!a:localhost"
roomB := "!b:localhost"
inviteState := []json.RawMessage{[]byte(`{"foo":"bar"}`)}

table := NewInvitesTable(db)
t.Log("Invite Alice and Bob to both rooms.")

// Add some invites
if err := table.InsertInvite(alice, roomA, inviteState); err != nil {
t.Fatalf("failed to InsertInvite: %s", err)
}
if err := table.InsertInvite(bob, roomA, inviteState); err != nil {
t.Fatalf("failed to InsertInvite: %s", err)
}
if err := table.InsertInvite(alice, roomB, inviteState); err != nil {
t.Fatalf("failed to InsertInvite: %s", err)
}
if err := table.InsertInvite(bob, roomB, inviteState); err != nil {
t.Fatalf("failed to InsertInvite: %s", err)
}

t.Log("Alice joins room A. Remove her superseded invite.")
newEvents := []Event{
{
Type: "m.room.member",
StateKey: alice,
Membership: "join",
RoomID: roomA,
},
}
err := sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
return table.RemoveSupersededInvites(txn, roomA, newEvents)
})
if err != nil {
t.Fatalf("failed to RemoveSupersededInvites: %s", err)
}

t.Log("Alice should still be invited to room B.")
assertInvites(t, table, alice, map[string][]json.RawMessage{roomB: inviteState})
t.Log("Bob should still be invited to rooms A and B.")
assertInvites(t, table, bob, map[string][]json.RawMessage{roomA: inviteState, roomB: inviteState})

t.Log("Bob declines his invitation to room B.")
newEvents = []Event{
{
Type: "m.room.member",
StateKey: bob,
Membership: "leave",
RoomID: roomB,
},
}
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
return table.RemoveSupersededInvites(txn, roomB, newEvents)
})
if err != nil {
t.Fatalf("failed to RemoveSupersededInvites: %s", err)
}

t.Log("Alice should still be invited to room B.")
assertInvites(t, table, alice, map[string][]json.RawMessage{roomB: inviteState})
t.Log("Bob should still be invited to room A.")
assertInvites(t, table, bob, map[string][]json.RawMessage{roomA: inviteState})

// Now try multiple membership changes in one call.
t.Log("Alice joins, changes profile, leaves and is re-invited to room B.")
newEvents = []Event{
{
Type: "m.room.member",
StateKey: alice,
Membership: "join",
RoomID: roomB,
},
{
Type: "m.room.member",
StateKey: alice,
Membership: "_join",
RoomID: roomB,
},
{
Type: "m.room.member",
StateKey: alice,
Membership: "leave",
RoomID: roomB,
},
{
Type: "m.room.member",
StateKey: alice,
Membership: "invite",
RoomID: roomB,
},
}

err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
return table.RemoveSupersededInvites(txn, roomB, newEvents)
})
if err != nil {
t.Fatalf("failed to RemoveSupersededInvites: %s", err)
}

t.Log("Alice should still be invited to room B.")
assertInvites(t, table, alice, map[string][]json.RawMessage{roomB: inviteState})

t.Log("Bob declines, is reinvited to and joins room A.")
newEvents = []Event{
{
Type: "m.room.member",
StateKey: bob,
Membership: "leave",
RoomID: roomA,
},
{
Type: "m.room.member",
StateKey: bob,
Membership: "invite",
RoomID: roomA,
},
{
Type: "m.room.member",
StateKey: bob,
Membership: "join",
RoomID: roomA,
},
}

err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
return table.RemoveSupersededInvites(txn, roomA, newEvents)
})
if err != nil {
t.Fatalf("failed to RemoveSupersededInvites: %s", err)
}
assertInvites(t, table, bob, map[string][]json.RawMessage{})
}

func assertInvites(t *testing.T, table *InvitesTable, user string, expected map[string][]json.RawMessage) {
invites, err := table.SelectAllInvitesForUser(user)
if err != nil {
t.Fatalf("failed to SelectAllInvitesForUser: %s", err)
}
if !reflect.DeepEqual(invites, expected) {
t.Fatalf("got %v invites, want %v", invites, expected)
}
}

func jsonArrStr(a []json.RawMessage) (result string) {
for _, e := range a {
result += string(e) + "\n"
Expand Down
3 changes: 2 additions & 1 deletion state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func NewStorageWithDB(db *sqlx.DB, addPrometheusMetrics bool) *Storage {
eventsTable: NewEventTable(db),
snapshotTable: NewSnapshotsTable(db),
spacesTable: NewSpacesTable(db),
invitesTable: NewInvitesTable(db),
entityName: "server",
}

Expand All @@ -94,7 +95,7 @@ func NewStorageWithDB(db *sqlx.DB, addPrometheusMetrics bool) *Storage {
UnreadTable: NewUnreadTable(db),
EventsTable: acc.eventsTable,
AccountDataTable: NewAccountDataTable(db),
InvitesTable: NewInvitesTable(db),
InvitesTable: acc.invitesTable,
TransactionsTable: NewTransactionsTable(db),
DeviceDataTable: NewDeviceDataTable(db),
ReceiptTable: NewReceiptTable(db),
Expand Down
9 changes: 0 additions & 9 deletions sync3/caches/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,15 +345,6 @@ func (c *GlobalCache) OnNewEvent(
// remove this user as a hero
metadata.RemoveHero(*ed.StateKey)
}

if membership == "join" && eventJSON.Get("unsigned.prev_content.membership").Str == "invite" {
// invite -> join, retire any outstanding invites
err := c.store.InvitesTable.RemoveInvite(*ed.StateKey, ed.RoomID)
if err != nil {
logger.Err(err).Str("user", *ed.StateKey).Str("room", ed.RoomID).Msg("failed to remove accepted invite")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
}
}
if len(metadata.Heroes) < 6 && (membership == "join" || membership == "invite") {
// try to find the existing hero e.g they changed their display name
Expand Down
Loading