Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions around threads #2331

Merged
merged 10 commits into from
May 3, 2022
4 changes: 1 addition & 3 deletions spec/unit/matrix-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -811,9 +811,7 @@ describe("MatrixClient", function() {
}
},
},
threads: {
get: jest.fn(),
},
getThread: jest.fn(),
addPendingEvent: jest.fn(),
updatePendingEvent: jest.fn(),
reEmitter: {
Expand Down
79 changes: 65 additions & 14 deletions spec/unit/room.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { RoomState } from "../../src/models/room-state";
import { UNSTABLE_ELEMENT_FUNCTIONAL_USERS } from "../../src/@types/event";
import { TestClient } from "../TestClient";
import { emitPromise } from "../test-utils/test-utils";
import { ThreadEvent } from "../../src/models/thread";
import { Thread, ThreadEvent } from "../../src/models/thread";

describe("Room", function() {
const roomId = "!foo:bar";
Expand Down Expand Up @@ -1914,7 +1914,7 @@ describe("Room", function() {
},
});

room.createThread(undefined, [eventWithoutARootEvent]);
room.createThread("$000", undefined, [eventWithoutARootEvent]);

const rootEvent = new MatrixEvent({
event_id: "$666",
Expand All @@ -1932,7 +1932,7 @@ describe("Room", function() {
},
});

expect(() => room.createThread(rootEvent, [])).not.toThrow();
expect(() => room.createThread(rootEvent.getId(), rootEvent, [])).not.toThrow();
});

it("Edits update the lastReply event", async () => {
Expand All @@ -1959,14 +1959,16 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([randomMessage, threadRoot, threadResponse]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread.replyToEvent).toBe(threadResponse);
expect(thread.replyToEvent.getContent().body).toBe(threadResponse.getContent().body);

prom = emitPromise(thread, ThreadEvent.Update);
room.addLiveEvents([threadResponseEdit]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread.replyToEvent.getContent().body).toBe(threadResponseEdit.getContent()["m.new_content"].body);
});

Expand All @@ -1993,15 +1995,17 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(thread, ThreadEvent.Update);
const threadResponse1Redaction = mkRedaction(threadResponse1);
room.addLiveEvents([threadResponse1Redaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(1);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
});
Expand Down Expand Up @@ -2030,15 +2034,17 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(thread, ThreadEvent.Update);
const threadResponse2ReactionRedaction = mkRedaction(threadResponse2Reaction);
room.addLiveEvents([threadResponse2ReactionRedaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
});
Expand Down Expand Up @@ -2067,15 +2073,17 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(room, ThreadEvent.Update);
const threadRootRedaction = mkRedaction(threadRoot);
room.addLiveEvents([threadRootRedaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(2);
});

Expand All @@ -2102,21 +2110,24 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(room, ThreadEvent.Update);
const threadResponse2Redaction = mkRedaction(threadResponse2);
room.addLiveEvents([threadResponse2Redaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(1);
expect(thread.replyToEvent.getId()).toBe(threadResponse1.getId());

prom = emitPromise(room, ThreadEvent.Update);
const threadResponse1Redaction = mkRedaction(threadResponse1);
room.addLiveEvents([threadResponse1Redaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(0);
expect(thread.replyToEvent.getId()).toBe(threadRoot.getId());
});
Expand Down Expand Up @@ -2234,5 +2245,45 @@ describe("Room", function() {
expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInRoom).toBeTruthy();
expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInThread).toBeFalsy();
});

it("should aggregate relations in thread event timeline set", () => {
Thread.setServerSideSupport(true, true);
const threadRoot = mkMessage();
const rootReaction = mkReaction(threadRoot);
const threadResponse = mkThreadResponse(threadRoot);
const threadReaction = mkReaction(threadResponse);

const events = [
threadRoot,
rootReaction,
threadResponse,
threadReaction,
];

room.addLiveEvents(events);

const thread = threadRoot.getThread();
expect(thread.rootEvent).toBe(threadRoot);

const rootRelations = thread.timelineSet.getRelationsForEvent(
threadRoot.getId(),
RelationType.Annotation,
EventType.Reaction,
).getSortedAnnotationsByKey();
expect(rootRelations).toHaveLength(1);
expect(rootRelations[0][0]).toEqual(rootReaction.getRelation().key);
expect(rootRelations[0][1].size).toEqual(1);
expect(rootRelations[0][1].has(rootReaction)).toBeTruthy();

const responseRelations = thread.timelineSet.getRelationsForEvent(
threadResponse.getId(),
RelationType.Annotation,
EventType.Reaction,
).getSortedAnnotationsByKey();
expect(responseRelations).toHaveLength(1);
expect(responseRelations[0][0]).toEqual(threadReaction.getRelation().key);
expect(responseRelations[0][1].size).toEqual(1);
expect(responseRelations[0][1].has(threadReaction)).toBeTruthy();
});
});
});
46 changes: 23 additions & 23 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ import { IRoomEncryption, RoomList } from './crypto/RoomList';
import { logger } from './logger';
import { SERVICE_TYPES } from './service-types';
import {
FileType, HttpApiEvent, HttpApiEventHandlerMap,
FileType,
HttpApiEvent,
HttpApiEventHandlerMap,
IHttpOpts,
IUpload,
MatrixError,
Expand Down Expand Up @@ -3741,7 +3743,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
"rel_type": THREAD_RELATION_TYPE.name,
"event_id": threadId,
};
const thread = this.getRoom(roomId)?.threads.get(threadId);
const thread = this.getRoom(roomId)?.getThread(threadId);
if (thread) {
content["m.relates_to"]["m.in_reply_to"] = {
"event_id": thread.lastReply((ev: MatrixEvent) => {
Expand Down Expand Up @@ -3790,7 +3792,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}));

const room = this.getRoom(roomId);
const thread = room?.threads.get(threadId);
const thread = room?.getThread(threadId);
if (thread) {
localEvent.setThread(thread);
}
Expand Down Expand Up @@ -5185,7 +5187,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
limit,
Direction.Backward,
);
}).then(async (res: IMessagesResponse) => {
}).then((res: IMessagesResponse) => {
const matrixEvents = res.chunk.map(this.getEventMapper());
if (res.state) {
const stateEvents = res.state.map(this.getEventMapper());
Expand All @@ -5196,7 +5198,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa

this.processBeaconEvents(room, timelineEvents);
room.addEventsToTimeline(timelineEvents, true, room.getLiveTimeline());
await this.processThreadEvents(room, threadedEvents, true);
this.processThreadEvents(room, threadedEvents, true);

room.oldState.paginationToken = res.end;
if (res.chunk.length === 0) {
Expand Down Expand Up @@ -5299,25 +5301,27 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
event.isRelation(THREAD_RELATION_TYPE.name)
) {
const [, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
const thread = await timelineSet.room.createThreadFetchRoot(event.threadRootId, threadedEvents, true);

let nextBatch: string;
const response = await thread.fetchInitialEvents();
if (response?.nextBatch) {
nextBatch = response.nextBatch;
let thread = timelineSet.room.getThread(event.threadRootId);
if (!thread) {
thread = timelineSet.room.createThread(event.threadRootId, undefined, threadedEvents, true);
}

const opts: IRelationsRequestOpts = {
direction: Direction.Backward,
limit: 50,
};

// Fetch events until we find the one we were asked for
await thread.fetchInitialEvents();
let nextBatch = thread.liveTimeline.getPaginationToken(Direction.Backward);

// Fetch events until we find the one we were asked for, or we run out of pages
while (!thread.findEventById(eventId)) {
if (nextBatch) {
opts.from = nextBatch;
}

({ nextBatch } = await thread.fetchEvents(opts));
if (!nextBatch) break;
}

return thread.liveTimeline;
Expand All @@ -5336,7 +5340,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
timelineSet.addEventsToTimeline(timelineEvents, true, timeline, res.start);
// The target event is not in a thread but process the contextual events, so we can show any threads around it.
await this.processThreadEvents(timelineSet.room, threadedEvents, true);
this.processThreadEvents(timelineSet.room, threadedEvents, true);
this.processBeaconEvents(timelineSet.room, timelineEvents);

// There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring
Expand Down Expand Up @@ -5493,7 +5497,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
opts.limit,
dir,
eventTimeline.getFilter(),
).then(async (res) => {
).then((res) => {
if (res.state) {
const roomState = eventTimeline.getState(dir);
const stateEvents = res.state.map(this.getEventMapper());
Expand All @@ -5506,7 +5510,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(matrixEvents);
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processBeaconEvents(timelineSet.room, timelineEvents);
await this.processThreadEvents(room, threadedEvents, backwards);
this.processThreadEvents(room, threadedEvents, backwards);

// if we've hit the end of the timeline, we need to stop trying to
// paginate. We need to keep the 'forwards' token though, to make sure
Expand Down Expand Up @@ -6663,7 +6667,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventId: string,
relationType?: RelationType | string | null,
eventType?: EventType | string | null,
opts: IRelationsRequestOpts = {},
opts: IRelationsRequestOpts = { direction: Direction.Backward },
): Promise<{
originalEvent: MatrixEvent;
events: MatrixEvent[];
Expand Down Expand Up @@ -7204,7 +7208,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventId: string,
relationType?: RelationType | string | null,
eventType?: EventType | string | null,
opts: IRelationsRequestOpts = {},
opts: IRelationsRequestOpts = { direction: Direction.Backward },
): Promise<IRelationsResponse> {
const queryString = utils.encodeParams(opts as Record<string, string | number>);

Expand Down Expand Up @@ -8916,12 +8920,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
/**
* @experimental
*/
public async processThreadEvents(
room: Room,
threadedEvents: MatrixEvent[],
toStartOfTimeline: boolean,
): Promise<void> {
await room.processThreadedEvents(threadedEvents, toStartOfTimeline);
public processThreadEvents(room: Room, threadedEvents: MatrixEvent[], toStartOfTimeline: boolean): void {
room.processThreadedEvents(threadedEvents, toStartOfTimeline);
}

public processBeaconEvents(
Expand Down
3 changes: 1 addition & 2 deletions src/models/event-timeline-set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -852,14 +852,13 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
}
let relationsWithEventType = relationsWithRelType[eventType];

let relatesToEvent: MatrixEvent;
if (!relationsWithEventType) {
relationsWithEventType = relationsWithRelType[eventType] = new Relations(
relationType,
eventType,
this.room,
);
relatesToEvent = this.findEventById(relatesToEventId) || this.room.getPendingEvent(relatesToEventId);
const relatesToEvent = this.findEventById(relatesToEventId) || this.room.getPendingEvent(relatesToEventId);
if (relatesToEvent) {
relationsWithEventType.setTargetEvent(relatesToEvent);
}
Expand Down
3 changes: 1 addition & 2 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1303,8 +1303,7 @@ export class MatrixEvent extends TypedEventEmitter<EmittedEvents, MatrixEventHan
public isRelation(relType: string = undefined): boolean {
// Relation info is lifted out of the encrypted content when sent to
// encrypted rooms, so we have to check `getWireContent` for this.
const content = this.getWireContent();
const relation = content && content["m.relates_to"];
const relation = this.getWireContent()?.["m.relates_to"];
return relation && relation.rel_type && relation.event_id &&
((relType && relation.rel_type === relType) || !relType);
}
Expand Down
Loading