Skip to content

Commit

Permalink
Support switching federated rooms while reusing internal connections.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Jul 25, 2024
1 parent b2f5906 commit 4387080
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 78 deletions.
159 changes: 99 additions & 60 deletions federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,10 @@ type FederationClient struct {
session *ClientSession
message atomic.Pointer[ClientMessage]

roomId string
remoteRoomId string
changeRoomId bool
roomSessionId string
roomProperties atomic.Pointer[json.RawMessage]
federation *RoomFederationMessage
roomId atomic.Value
remoteRoomId atomic.Value
changeRoomId atomic.Bool
federation atomic.Pointer[RoomFederationMessage]

mu sync.Mutex
dialer *websocket.Dialer
Expand Down Expand Up @@ -110,18 +108,16 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession,
hub: hub,
session: session,

roomId: room.RoomId,
remoteRoomId: remoteRoomId,
changeRoomId: room.RoomId != remoteRoomId,
roomSessionId: room.SessionId,
federation: room.Federation,

reconnectDelay: initialFederationReconnectInterval,

dialer: &dialer,
url: url,
closer: NewCloser(),
}
result.roomId.Store(room.RoomId)
result.remoteRoomId.Store(remoteRoomId)
result.changeRoomId.Store(room.RoomId != remoteRoomId)
result.federation.Store(room.Federation)
result.message.Store(message)

if err := result.connect(ctx); err != nil {
Expand All @@ -139,26 +135,13 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession,
}

func (c *FederationClient) URL() string {
return c.federation.parsedSignalingUrl.String()
return c.federation.Load().parsedSignalingUrl.String()
}

func (c *FederationClient) IsSameRoom(room *RoomClientMessage) (string, json.RawMessage, bool) {
federation := room.Federation
remoteRoomId := federation.RoomId
if remoteRoomId == "" {
remoteRoomId = room.RoomId
}

if c.remoteRoomId != remoteRoomId || c.federation.NextcloudUrl != federation.NextcloudUrl {
return "", nil, false
}

var properties json.RawMessage
if roomProperties := c.roomProperties.Load(); roomProperties != nil {
properties = *roomProperties
}

return room.RoomId, properties, true
func (c *FederationClient) CanReuse(federation *RoomFederationMessage) bool {
fed := c.federation.Load()
return fed.NextcloudUrl == federation.NextcloudUrl &&
fed.SignalingUrl == federation.SignalingUrl
}

func (c *FederationClient) connect(ctx context.Context) error {
Expand Down Expand Up @@ -202,6 +185,17 @@ func (c *FederationClient) connect(ctx context.Context) error {
return nil
}

func (c *FederationClient) ChangeRoom(message *ClientMessage) error {
if message.Room == nil || message.Room.Federation == nil {
return fmt.Errorf("expected federation room message, got %+v", message)
} else if !c.CanReuse(message.Room.Federation) {
return fmt.Errorf("can't reuse federation client to join room in %+v", message)
}

c.message.Swap(message)
return c.joinRoom()
}

func (c *FederationClient) Leave(message *ClientMessage) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -433,7 +427,7 @@ func (c *FederationClient) sendHelloLocked(auth *FederationAuthParams) error {
} else {
msg.Hello.Auth = &HelloClientMessageAuth{
Type: HelloClientTypeFederation,
Url: c.federation.NextcloudUrl,
Url: c.federation.Load().NextcloudUrl,
Params: authData,
}
}
Expand All @@ -447,7 +441,7 @@ func (c *FederationClient) processWelcome(msg *ServerMessage) {
}

federationParams := &FederationAuthParams{
Token: c.federation.Token,
Token: c.federation.Load().Token,
}
if err := c.sendHello(federationParams); err != nil {
log.Printf("Error sending hello message to %s for %s: %s", c.URL(), c.session.PublicId(), err)
Expand Down Expand Up @@ -546,16 +540,24 @@ func (c *FederationClient) processHello(msg *ServerMessage) {
}

func (c *FederationClient) joinRoom() error {
var id string
if message := c.message.Swap(nil); message != nil {
id = message.Id
message := c.message.Load()
if message == nil {
// Should not happen as the connection has been closed with an error already.
return ErrNotConnected
}

room := message.Room
remoteRoomId := room.Federation.RoomId
if remoteRoomId == "" {
remoteRoomId = room.RoomId
}

return c.SendMessage(&ClientMessage{
Id: id,
Id: message.Id,
Type: "room",
Room: &RoomClientMessage{
RoomId: c.remoteRoomId,
SessionId: c.roomSessionId,
RoomId: remoteRoomId,
SessionId: room.SessionId,
},
})
}
Expand Down Expand Up @@ -597,6 +599,9 @@ func (c *FederationClient) processMessage(msg *ServerMessage) {
remoteSessionId = hello.SessionId
}

remoteRoomId := c.remoteRoomId.Load().(string)
roomId := c.roomId.Load().(string)

var doClose bool
switch msg.Type {
case "control":
Expand All @@ -607,23 +612,23 @@ func (c *FederationClient) processMessage(msg *ServerMessage) {
case "participants":
switch msg.Event.Type {
case "update":
if c.changeRoomId && msg.Event.Update.RoomId == c.remoteRoomId {
msg.Event.Update.RoomId = c.roomId
if c.changeRoomId.Load() && msg.Event.Update.RoomId == remoteRoomId {
msg.Event.Update.RoomId = roomId
}
if remoteSessionId != "" {
c.updateEventUsers(msg.Event.Update.Changed, localSessionId, remoteSessionId)
c.updateEventUsers(msg.Event.Update.Users, localSessionId, remoteSessionId)
}
case "flags":
if c.changeRoomId && msg.Event.Flags.RoomId == c.remoteRoomId {
msg.Event.Flags.RoomId = c.roomId
if c.changeRoomId.Load() && msg.Event.Flags.RoomId == remoteRoomId {
msg.Event.Flags.RoomId = roomId
}
if remoteSessionId != "" && msg.Event.Flags.SessionId == remoteSessionId {
msg.Event.Flags.SessionId = localSessionId
}
case "message":
if c.changeRoomId && msg.Event.Message.RoomId == c.remoteRoomId {
msg.Event.Message.RoomId = c.roomId
if c.changeRoomId.Load() && msg.Event.Message.RoomId == remoteRoomId {
msg.Event.Message.RoomId = roomId
}
}
case "room":
Expand All @@ -650,36 +655,66 @@ func (c *FederationClient) processMessage(msg *ServerMessage) {
}
}
case "message":
if c.changeRoomId && msg.Event.Message.RoomId == c.remoteRoomId {
msg.Event.Message.RoomId = c.roomId
if c.changeRoomId.Load() && msg.Event.Message.RoomId == remoteRoomId {
msg.Event.Message.RoomId = roomId
}
}
case "roomlist":
switch msg.Event.Type {
case "invite":
if c.changeRoomId && msg.Event.Invite.RoomId == c.remoteRoomId {
msg.Event.Invite.RoomId = c.roomId
if c.changeRoomId.Load() && msg.Event.Invite.RoomId == remoteRoomId {
msg.Event.Invite.RoomId = roomId
}
case "disinvite":
if c.changeRoomId && msg.Event.Disinvite.RoomId == c.remoteRoomId {
msg.Event.Disinvite.RoomId = c.roomId
if c.changeRoomId.Load() && msg.Event.Disinvite.RoomId == remoteRoomId {
msg.Event.Disinvite.RoomId = roomId
}
case "update":
if c.changeRoomId && msg.Event.Update.RoomId == c.remoteRoomId {
msg.Event.Update.RoomId = c.roomId
if c.changeRoomId.Load() && msg.Event.Update.RoomId == remoteRoomId {
msg.Event.Update.RoomId = roomId
}
}
}
case "error":
if c.changeRoomId.Load() && msg.Error.Code == "already_joined" {
if len(msg.Error.Details) > 0 {
var details RoomErrorDetails
if err := json.Unmarshal(msg.Error.Details, &details); err == nil && details.Room != nil {
if details.Room.RoomId == remoteRoomId {
details.Room.RoomId = roomId
if data, err := json.Marshal(details); err == nil {
msg.Error.Details = data
}
}
}
}
}
case "room":
if message := c.message.Load(); message != nil {
if msg.Id != "" && message.Id == msg.Id {
// Got response to initial join request, clear id so future join
// requests will not be mapped to any client callbacks.
message.Id = ""
c.message.Store(message)
}

room := message.Room
roomId = room.RoomId
remoteRoomId = room.Federation.RoomId
if remoteRoomId == "" {
remoteRoomId = room.RoomId
}

c.roomId.Store(room.RoomId)
c.remoteRoomId.Store(remoteRoomId)
c.changeRoomId.Store(room.RoomId != remoteRoomId)
c.federation.Store(room.Federation)
}

if msg.Room.RoomId == "" && c.closeOnLeave.Load() {
doClose = true
} else if c.changeRoomId && msg.Room.RoomId == c.remoteRoomId {
msg.Room.RoomId = c.roomId
}
if len(msg.Room.Properties) > 0 {
c.roomProperties.Store(&msg.Room.Properties)
} else {
c.roomProperties.Store(nil)
} else if c.changeRoomId.Load() && msg.Room.RoomId == remoteRoomId {
msg.Room.RoomId = roomId
}
case "message":
c.updateSessionRecipient(msg.Message.Recipient, localSessionId, remoteSessionId)
Expand Down Expand Up @@ -745,7 +780,11 @@ func (c *FederationClient) deferMessage(message *ClientMessage) {

func (c *FederationClient) sendMessageLocked(message *ClientMessage) error {
if c.conn == nil {
c.deferMessage(message)
if message.Type != "room" {
// Join requests will be automatically sent after the hello response has
// been received.
c.deferMessage(message)
}
return nil
}

Expand Down
108 changes: 108 additions & 0 deletions federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,114 @@ func Test_FederationJoinRoomTwice(t *testing.T) {
}
}

func Test_FederationChangeRoom(t *testing.T) {
CatchLogForTest(t)

assert := assert.New(t)
require := require.New(t)

hub1, hub2, server1, server2 := CreateClusteredHubsForTest(t)

client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
require.NoError(client1.SendHelloV2(testDefaultUserId + "1"))

client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
require.NoError(client2.SendHelloV2(testDefaultUserId + "2"))

ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

hello1, err := client1.RunUntilHello(ctx)
require.NoError(err)

hello2, err := client2.RunUntilHello(ctx)
require.NoError(err)

roomId := "test-room"
federatedRoomId := roomId + "@federated"
room1, err := client1.JoinRoom(ctx, roomId)
require.NoError(err)
require.Equal(roomId, room1.Room.RoomId)

assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))

now := time.Now()
token, err := client1.CreateHelloV2Token(testDefaultUserId+"2", now, now.Add(time.Minute))
require.NoError(err)

msg := &ClientMessage{
Id: "join-room-fed",
Type: "room",
Room: &RoomClientMessage{
RoomId: federatedRoomId,
SessionId: federatedRoomId + "-" + hello2.Hello.SessionId,
Federation: &RoomFederationMessage{
SignalingUrl: server1.URL,
NextcloudUrl: server1.URL,
RoomId: roomId,
Token: token,
},
},
}
require.NoError(client2.WriteJSON(msg))

if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal(msg.Id, message.Id)
require.Equal("room", message.Type)
require.Equal(federatedRoomId, message.Room.RoomId)
}

session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
fed := session2.GetFederationClient()
require.NotNil(fed)
localAddr := fed.conn.LocalAddr()

// The client1 will see the remote session id for client2.
var remoteSessionId string
if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.NoError(client1.checkSingleMessageJoined(message))
evt := message.Event.Join[0]
remoteSessionId = evt.SessionId
assert.NotEqual(hello2.Hello.SessionId, remoteSessionId)
assert.Equal(hello2.Hello.UserId, evt.UserId)
assert.True(evt.Federated)
}

// The client2 will see its own session id, not the one from the remote server.
assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))

roomId2 := roomId + "-2"
federatedRoomId2 := roomId2 + "@federated"
msg2 := &ClientMessage{
Id: "join-room-fed-2",
Type: "room",
Room: &RoomClientMessage{
RoomId: federatedRoomId2,
SessionId: federatedRoomId2 + "-" + hello2.Hello.SessionId,
Federation: &RoomFederationMessage{
SignalingUrl: server1.URL,
NextcloudUrl: server1.URL,
RoomId: roomId2,
Token: token,
},
},
}
require.NoError(client2.WriteJSON(msg2))

if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal(msg2.Id, message.Id)
require.Equal("room", message.Type)
require.Equal(federatedRoomId2, message.Room.RoomId)
}

fed2 := session2.GetFederationClient()
require.NotNil(fed2)
localAddr2 := fed2.conn.LocalAddr()
assert.Equal(localAddr, localAddr2)
}

func Test_FederationMedia(t *testing.T) {
CatchLogForTest(t)

Expand Down
Loading

0 comments on commit 4387080

Please sign in to comment.