From 7ab71d45c08bc67c7d0166df4879a781bec25f5a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Sat, 6 Nov 2021 00:30:07 -0500 Subject: [PATCH] Make sure historical events aren't sent as AS transactions Regression tests for https://github.com/matrix-org/synapse/issues/11241 Synapse changes: https://github.com/matrix-org/synapse/pull/11265 --- internal/b/hs_with_application_service.go | 2 +- tests/msc2716_test.go | 98 +++++++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/internal/b/hs_with_application_service.go b/internal/b/hs_with_application_service.go index e3aa059b..fe20baf0 100644 --- a/internal/b/hs_with_application_service.go +++ b/internal/b/hs_with_application_service.go @@ -15,7 +15,7 @@ var BlueprintHSWithApplicationService = MustValidate(Blueprint{ ApplicationServices: []ApplicationService{ { ID: "my_as_id", - URL: "http://localhost:9000", + URL: "http://host.docker.internal:9111", SenderLocalpart: "the-bridge-user", RateLimited: false, }, diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 971b5e0e..69d5adab 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -8,6 +8,7 @@ package tests import ( "bytes" + "context" "fmt" "io/ioutil" "net/http" @@ -15,6 +16,7 @@ import ( "testing" "time" + "github.com/gorilla/mux" "github.com/tidwall/gjson" "github.com/matrix-org/complement/internal/b" @@ -307,6 +309,102 @@ func TestImportHistoricalMessages(t *testing.T) { }) }) + t.Run("Historical events from batch_send do not get pushed out as application service transactions", func(t *testing.T) { + t.Parallel() + + // Create a listener and handler to stub an application service listening + // for transactions from a homeserver. + handler := mux.NewRouter() + // Application Service API: /_matrix/app/v1/transactions/{txnId} + waiter := NewWaiter() + var eventIDsWeSawOverTransactions []string + var eventIDAfterHistoricalImport string + handler.HandleFunc("/transactions/{txnId}", func(w http.ResponseWriter, req *http.Request) { + must.MatchRequest(t, req, match.HTTPRequest{ + JSON: []match.JSON{ + match.JSONArrayEach("events", func(r gjson.Result) error { + // Add to our running list of events + eventIDsWeSawOverTransactions = append(eventIDsWeSawOverTransactions, r.Get("event_id").Str) + + // If we found the event that occurs after our batch send. we can + // probably safely assume the historical events won't come later. + if r.Get("event_id").Str != "" && r.Get("event_id").Str == eventIDAfterHistoricalImport { + defer waiter.Finish() + } + + return nil + }), + }, + }) + + // Acknowledge that we've seen the transaction + w.WriteHeader(200) + w.Write([]byte("{}")) + }).Methods("PUT") + + srv := &http.Server{ + Addr: ":9111", + Handler: handler, + } + go func() { + srv.ListenAndServe() + }() + defer srv.Shutdown(context.Background()) + // ---------------------------------------------------------- + + // Create the room all of the action is going to happen in + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) + + // Create the "live" event we are going to insert our historical events next to + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() + + // Import a historical event + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 1), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + historicalStateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids") + + // This is just a dummy event we search for after the historicalEventIDs/historicalStateEventIDs + eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1) + eventIDAfterHistoricalImport = eventIDsAfterHistoricalImport[0] + + // Check if eventIDAfterHistoricalImport already came over `/transactions`. + if !includes(eventIDAfterHistoricalImport, eventIDsWeSawOverTransactions) { + // If not, wait 5 seconds for to see if it happens. The waiter will only + // resolve if we see eventIDAfterHistoricalImport, otherwise timeout + waiter.Wait(t, 5*time.Second) + } + + // Now, that we know eventIDAfterHistoricalImport came over /transactions, + // we can probably safely assume the historical events won't come later. + + // Check to make sure the historical events didn't come over /transactions + for _, historicalEventID := range historicalEventIDs { + if includes(historicalEventID, eventIDsWeSawOverTransactions) { + t.Fatalf("We should not see the %s historical event come over /transactions but it did", historicalEventID) + } + } + // Check to make sure the historical state events didn't come over /transactions + for _, historicalStateEventID := range historicalStateEventIDs { + if includes(historicalStateEventID, eventIDsWeSawOverTransactions) { + t.Fatalf("We should not see the %s historical state event come over /transactions but it did", historicalStateEventID) + } + } + }) + t.Run("Batch send endpoint only returns state events that we passed in via state_events_at_start", func(t *testing.T) { t.Parallel()