Skip to content

Commit

Permalink
Fix race conditions around threads (#2331)
Browse files Browse the repository at this point in the history
  • Loading branch information
t3chguy committed May 3, 2022
1 parent 274d6a9 commit ac5fee0
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 194 deletions.
4 changes: 1 addition & 3 deletions spec/unit/matrix-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -811,9 +811,7 @@ describe("MatrixClient", function() {
}
},
},
threads: {
get: jest.fn(),
},
getThread: jest.fn(),
addPendingEvent: jest.fn(),
updatePendingEvent: jest.fn(),
reEmitter: {
Expand Down
79 changes: 65 additions & 14 deletions spec/unit/room.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { RoomState } from "../../src/models/room-state";
import { UNSTABLE_ELEMENT_FUNCTIONAL_USERS } from "../../src/@types/event";
import { TestClient } from "../TestClient";
import { emitPromise } from "../test-utils/test-utils";
import { ThreadEvent } from "../../src/models/thread";
import { Thread, ThreadEvent } from "../../src/models/thread";

describe("Room", function() {
const roomId = "!foo:bar";
Expand Down Expand Up @@ -1914,7 +1914,7 @@ describe("Room", function() {
},
});

room.createThread(undefined, [eventWithoutARootEvent]);
room.createThread("$000", undefined, [eventWithoutARootEvent]);

const rootEvent = new MatrixEvent({
event_id: "$666",
Expand All @@ -1932,7 +1932,7 @@ describe("Room", function() {
},
});

expect(() => room.createThread(rootEvent, [])).not.toThrow();
expect(() => room.createThread(rootEvent.getId(), rootEvent, [])).not.toThrow();
});

it("Edits update the lastReply event", async () => {
Expand All @@ -1959,14 +1959,16 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([randomMessage, threadRoot, threadResponse]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread.replyToEvent).toBe(threadResponse);
expect(thread.replyToEvent.getContent().body).toBe(threadResponse.getContent().body);

prom = emitPromise(thread, ThreadEvent.Update);
room.addLiveEvents([threadResponseEdit]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread.replyToEvent.getContent().body).toBe(threadResponseEdit.getContent()["m.new_content"].body);
});

Expand All @@ -1993,15 +1995,17 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(thread, ThreadEvent.Update);
const threadResponse1Redaction = mkRedaction(threadResponse1);
room.addLiveEvents([threadResponse1Redaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(1);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
});
Expand Down Expand Up @@ -2030,15 +2034,17 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(thread, ThreadEvent.Update);
const threadResponse2ReactionRedaction = mkRedaction(threadResponse2Reaction);
room.addLiveEvents([threadResponse2ReactionRedaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
});
Expand Down Expand Up @@ -2067,15 +2073,17 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(room, ThreadEvent.Update);
const threadRootRedaction = mkRedaction(threadRoot);
room.addLiveEvents([threadRootRedaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(2);
});

Expand All @@ -2102,21 +2110,24 @@ describe("Room", function() {
},
});

let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
const thread = await emitPromise(room, ThreadEvent.New);
const thread = await prom;

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(room, ThreadEvent.Update);
const threadResponse2Redaction = mkRedaction(threadResponse2);
room.addLiveEvents([threadResponse2Redaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(1);
expect(thread.replyToEvent.getId()).toBe(threadResponse1.getId());

prom = emitPromise(room, ThreadEvent.Update);
const threadResponse1Redaction = mkRedaction(threadResponse1);
room.addLiveEvents([threadResponse1Redaction]);
await emitPromise(thread, ThreadEvent.Update);
await prom;
expect(thread).toHaveLength(0);
expect(thread.replyToEvent.getId()).toBe(threadRoot.getId());
});
Expand Down Expand Up @@ -2234,5 +2245,45 @@ describe("Room", function() {
expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInRoom).toBeTruthy();
expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInThread).toBeFalsy();
});

it("should aggregate relations in thread event timeline set", () => {
Thread.setServerSideSupport(true, true);
const threadRoot = mkMessage();
const rootReaction = mkReaction(threadRoot);
const threadResponse = mkThreadResponse(threadRoot);
const threadReaction = mkReaction(threadResponse);

const events = [
threadRoot,
rootReaction,
threadResponse,
threadReaction,
];

room.addLiveEvents(events);

const thread = threadRoot.getThread();
expect(thread.rootEvent).toBe(threadRoot);

const rootRelations = thread.timelineSet.getRelationsForEvent(
threadRoot.getId(),
RelationType.Annotation,
EventType.Reaction,
).getSortedAnnotationsByKey();
expect(rootRelations).toHaveLength(1);
expect(rootRelations[0][0]).toEqual(rootReaction.getRelation().key);
expect(rootRelations[0][1].size).toEqual(1);
expect(rootRelations[0][1].has(rootReaction)).toBeTruthy();

const responseRelations = thread.timelineSet.getRelationsForEvent(
threadResponse.getId(),
RelationType.Annotation,
EventType.Reaction,
).getSortedAnnotationsByKey();
expect(responseRelations).toHaveLength(1);
expect(responseRelations[0][0]).toEqual(threadReaction.getRelation().key);
expect(responseRelations[0][1].size).toEqual(1);
expect(responseRelations[0][1].has(threadReaction)).toBeTruthy();
});
});
});
46 changes: 23 additions & 23 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ import { IRoomEncryption, RoomList } from './crypto/RoomList';
import { logger } from './logger';
import { SERVICE_TYPES } from './service-types';
import {
FileType, HttpApiEvent, HttpApiEventHandlerMap,
FileType,
HttpApiEvent,
HttpApiEventHandlerMap,
IHttpOpts,
IUpload,
MatrixError,
Expand Down Expand Up @@ -3741,7 +3743,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
"rel_type": THREAD_RELATION_TYPE.name,
"event_id": threadId,
};
const thread = this.getRoom(roomId)?.threads.get(threadId);
const thread = this.getRoom(roomId)?.getThread(threadId);
if (thread) {
content["m.relates_to"]["m.in_reply_to"] = {
"event_id": thread.lastReply((ev: MatrixEvent) => {
Expand Down Expand Up @@ -3790,7 +3792,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}));

const room = this.getRoom(roomId);
const thread = room?.threads.get(threadId);
const thread = room?.getThread(threadId);
if (thread) {
localEvent.setThread(thread);
}
Expand Down Expand Up @@ -5185,7 +5187,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
limit,
Direction.Backward,
);
}).then(async (res: IMessagesResponse) => {
}).then((res: IMessagesResponse) => {
const matrixEvents = res.chunk.map(this.getEventMapper());
if (res.state) {
const stateEvents = res.state.map(this.getEventMapper());
Expand All @@ -5196,7 +5198,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa

this.processBeaconEvents(room, timelineEvents);
room.addEventsToTimeline(timelineEvents, true, room.getLiveTimeline());
await this.processThreadEvents(room, threadedEvents, true);
this.processThreadEvents(room, threadedEvents, true);

room.oldState.paginationToken = res.end;
if (res.chunk.length === 0) {
Expand Down Expand Up @@ -5299,25 +5301,27 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
event.isRelation(THREAD_RELATION_TYPE.name)
) {
const [, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
const thread = await timelineSet.room.createThreadFetchRoot(event.threadRootId, threadedEvents, true);

let nextBatch: string;
const response = await thread.fetchInitialEvents();
if (response?.nextBatch) {
nextBatch = response.nextBatch;
let thread = timelineSet.room.getThread(event.threadRootId);
if (!thread) {
thread = timelineSet.room.createThread(event.threadRootId, undefined, threadedEvents, true);
}

const opts: IRelationsRequestOpts = {
direction: Direction.Backward,
limit: 50,
};

// Fetch events until we find the one we were asked for
await thread.fetchInitialEvents();
let nextBatch = thread.liveTimeline.getPaginationToken(Direction.Backward);

// Fetch events until we find the one we were asked for, or we run out of pages
while (!thread.findEventById(eventId)) {
if (nextBatch) {
opts.from = nextBatch;
}

({ nextBatch } = await thread.fetchEvents(opts));
if (!nextBatch) break;
}

return thread.liveTimeline;
Expand All @@ -5336,7 +5340,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
timelineSet.addEventsToTimeline(timelineEvents, true, timeline, res.start);
// The target event is not in a thread but process the contextual events, so we can show any threads around it.
await this.processThreadEvents(timelineSet.room, threadedEvents, true);
this.processThreadEvents(timelineSet.room, threadedEvents, true);
this.processBeaconEvents(timelineSet.room, timelineEvents);

// There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring
Expand Down Expand Up @@ -5493,7 +5497,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
opts.limit,
dir,
eventTimeline.getFilter(),
).then(async (res) => {
).then((res) => {
if (res.state) {
const roomState = eventTimeline.getState(dir);
const stateEvents = res.state.map(this.getEventMapper());
Expand All @@ -5506,7 +5510,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(matrixEvents);
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processBeaconEvents(timelineSet.room, timelineEvents);
await this.processThreadEvents(room, threadedEvents, backwards);
this.processThreadEvents(room, threadedEvents, backwards);

// if we've hit the end of the timeline, we need to stop trying to
// paginate. We need to keep the 'forwards' token though, to make sure
Expand Down Expand Up @@ -6663,7 +6667,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventId: string,
relationType?: RelationType | string | null,
eventType?: EventType | string | null,
opts: IRelationsRequestOpts = {},
opts: IRelationsRequestOpts = { direction: Direction.Backward },
): Promise<{
originalEvent: MatrixEvent;
events: MatrixEvent[];
Expand Down Expand Up @@ -7204,7 +7208,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventId: string,
relationType?: RelationType | string | null,
eventType?: EventType | string | null,
opts: IRelationsRequestOpts = {},
opts: IRelationsRequestOpts = { direction: Direction.Backward },
): Promise<IRelationsResponse> {
const queryString = utils.encodeParams(opts as Record<string, string | number>);

Expand Down Expand Up @@ -8916,12 +8920,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
/**
* @experimental
*/
public async processThreadEvents(
room: Room,
threadedEvents: MatrixEvent[],
toStartOfTimeline: boolean,
): Promise<void> {
await room.processThreadedEvents(threadedEvents, toStartOfTimeline);
public processThreadEvents(room: Room, threadedEvents: MatrixEvent[], toStartOfTimeline: boolean): void {
room.processThreadedEvents(threadedEvents, toStartOfTimeline);
}

public processBeaconEvents(
Expand Down
3 changes: 1 addition & 2 deletions src/models/event-timeline-set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -852,14 +852,13 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
}
let relationsWithEventType = relationsWithRelType[eventType];

let relatesToEvent: MatrixEvent;
if (!relationsWithEventType) {
relationsWithEventType = relationsWithRelType[eventType] = new Relations(
relationType,
eventType,
this.room,
);
relatesToEvent = this.findEventById(relatesToEventId) || this.room.getPendingEvent(relatesToEventId);
const relatesToEvent = this.findEventById(relatesToEventId) || this.room.getPendingEvent(relatesToEventId);
if (relatesToEvent) {
relationsWithEventType.setTargetEvent(relatesToEvent);
}
Expand Down
3 changes: 1 addition & 2 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1303,8 +1303,7 @@ export class MatrixEvent extends TypedEventEmitter<EmittedEvents, MatrixEventHan
public isRelation(relType: string = undefined): boolean {
// Relation info is lifted out of the encrypted content when sent to
// encrypted rooms, so we have to check `getWireContent` for this.
const content = this.getWireContent();
const relation = content && content["m.relates_to"];
const relation = this.getWireContent()?.["m.relates_to"];
return relation && relation.rel_type && relation.event_id &&
((relType && relation.rel_type === relType) || !relType);
}
Expand Down
Loading

0 comments on commit ac5fee0

Please sign in to comment.