Skip to content

Commit

Permalink
Make sure historical events aren't sent as AS transactions
Browse files Browse the repository at this point in the history
Regression tests for matrix-org/synapse#11241

Synapse changes: matrix-org/synapse#11265
  • Loading branch information
MadLittleMods committed Nov 6, 2021
1 parent 865848d commit 7ab71d4
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/b/hs_with_application_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var BlueprintHSWithApplicationService = MustValidate(Blueprint{
ApplicationServices: []ApplicationService{
{
ID: "my_as_id",
URL: "http://localhost:9000",
URL: "http://host.docker.internal:9111",
SenderLocalpart: "the-bridge-user",
RateLimited: false,
},
Expand Down
98 changes: 98 additions & 0 deletions tests/msc2716_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ package tests

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/tidwall/gjson"

"github.com/matrix-org/complement/internal/b"
Expand Down Expand Up @@ -307,6 +309,102 @@ func TestImportHistoricalMessages(t *testing.T) {
})
})

t.Run("Historical events from batch_send do not get pushed out as application service transactions", func(t *testing.T) {
t.Parallel()

// Create a listener and handler to stub an application service listening
// for transactions from a homeserver.
handler := mux.NewRouter()
// Application Service API: /_matrix/app/v1/transactions/{txnId}
waiter := NewWaiter()
var eventIDsWeSawOverTransactions []string
var eventIDAfterHistoricalImport string
handler.HandleFunc("/transactions/{txnId}", func(w http.ResponseWriter, req *http.Request) {
must.MatchRequest(t, req, match.HTTPRequest{
JSON: []match.JSON{
match.JSONArrayEach("events", func(r gjson.Result) error {
// Add to our running list of events
eventIDsWeSawOverTransactions = append(eventIDsWeSawOverTransactions, r.Get("event_id").Str)

// If we found the event that occurs after our batch send. we can
// probably safely assume the historical events won't come later.
if r.Get("event_id").Str != "" && r.Get("event_id").Str == eventIDAfterHistoricalImport {
defer waiter.Finish()
}

return nil
}),
},
})

// Acknowledge that we've seen the transaction
w.WriteHeader(200)
w.Write([]byte("{}"))
}).Methods("PUT")

srv := &http.Server{
Addr: ":9111",
Handler: handler,
}
go func() {
srv.ListenAndServe()
}()
defer srv.Shutdown(context.Background())
// ----------------------------------------------------------

// Create the room all of the action is going to happen in
roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to insert our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

// Import a historical event
batchSendRes := batchSendHistoricalMessages(
t,
as,
roomID,
eventIdBefore,
"",
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 1),
// Status
200,
)
batchSendResBody := client.ParseJSON(t, batchSendRes)
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
historicalStateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")

// This is just a dummy event we search for after the historicalEventIDs/historicalStateEventIDs
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1)
eventIDAfterHistoricalImport = eventIDsAfterHistoricalImport[0]

// Check if eventIDAfterHistoricalImport already came over `/transactions`.
if !includes(eventIDAfterHistoricalImport, eventIDsWeSawOverTransactions) {
// If not, wait 5 seconds for to see if it happens. The waiter will only
// resolve if we see eventIDAfterHistoricalImport, otherwise timeout
waiter.Wait(t, 5*time.Second)
}

// Now, that we know eventIDAfterHistoricalImport came over /transactions,
// we can probably safely assume the historical events won't come later.

// Check to make sure the historical events didn't come over /transactions
for _, historicalEventID := range historicalEventIDs {
if includes(historicalEventID, eventIDsWeSawOverTransactions) {
t.Fatalf("We should not see the %s historical event come over /transactions but it did", historicalEventID)
}
}
// Check to make sure the historical state events didn't come over /transactions
for _, historicalStateEventID := range historicalStateEventIDs {
if includes(historicalStateEventID, eventIDsWeSawOverTransactions) {
t.Fatalf("We should not see the %s historical state event come over /transactions but it did", historicalStateEventID)
}
}
})

t.Run("Batch send endpoint only returns state events that we passed in via state_events_at_start", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 7ab71d4

Please sign in to comment.