Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support redactions and relations of/with unsent events. #947

Merged
merged 21 commits into from
Jun 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e222fb1
enqueue relations and redactions as well
bwindels Jun 6, 2019
c58db66
give the client a chance to run room.updatePendingEvent after sending
bwindels Jun 6, 2019
6eb229a
first look in pending event list for event being redacted
bwindels Jun 7, 2019
831aec6
emit remote id once received so enqueued relations have it when sent
bwindels Jun 7, 2019
7a10d50
emit Relations.redaction synchronously, timeout should not be needed
bwindels Jun 7, 2019
f1336a5
rename target id to related id and add jsdoc comments
bwindels Jun 7, 2019
3f917b3
fix lint
bwindels Jun 7, 2019
7d2f7fa
fix tests
bwindels Jun 7, 2019
624c6f0
get the txnId from the correct place to delete event after remote echo
bwindels Jun 11, 2019
6d9fba8
preserve (locally) redacted state after applying remote echo
bwindels Jun 11, 2019
930de64
don't add events from /sync that have been locally redacted
bwindels Jun 12, 2019
5602b94
make sure where not re-adding cancelled events when undoing local red.
bwindels Jun 12, 2019
a9f9e2c
comment typo
bwindels Jun 12, 2019
b005b75
comment typo
bwindels Jun 12, 2019
3ed9b00
clarify why we need to listen for remote echo of related event
bwindels Jun 13, 2019
4143a79
rename related id to associated id
bwindels Jun 13, 2019
4462f4b
add isRedaction helper on Event
bwindels Jun 13, 2019
811a98a
whitespace, newlines
bwindels Jun 13, 2019
3488fbe
expand comment why need to preserve redaction local echo on remote echo
bwindels Jun 13, 2019
2a0c85c
add hasAssociation helper
bwindels Jun 13, 2019
6059df1
move CANCELLED check deeper into aggregation path
bwindels Jun 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 39 additions & 27 deletions spec/unit/scheduler.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe("MatrixScheduler", function() {
clock.uninstall();
});

it("should process events in a queue in a FIFO manner", function(done) {
it("should process events in a queue in a FIFO manner", async function() {
retryFn = function() {
return 0;
};
Expand All @@ -57,28 +57,30 @@ describe("MatrixScheduler", function() {
};
const deferA = Promise.defer();
const deferB = Promise.defer();
let resolvedA = false;
let yieldedA = false;
scheduler.setProcessFunction(function(event) {
if (resolvedA) {
if (yieldedA) {
expect(event).toEqual(eventB);
return deferB.promise;
} else {
yieldedA = true;
expect(event).toEqual(eventA);
return deferA.promise;
}
});
scheduler.queueEvent(eventA);
scheduler.queueEvent(eventB).done(function() {
expect(resolvedA).toBe(true);
done();
});
deferA.resolve({});
resolvedA = true;
deferB.resolve({});
const abPromise = Promise.all([
scheduler.queueEvent(eventA),
scheduler.queueEvent(eventB),
]);
deferB.resolve({b: true});
deferA.resolve({a: true});
const [a, b] = await abPromise;
expect(a.a).toEqual(true);
expect(b.b).toEqual(true);
});

it("should invoke the retryFn on failure and wait the amount of time specified",
function(done) {
async function() {
const waitTimeMs = 1500;
const retryDefer = Promise.defer();
retryFn = function() {
Expand All @@ -97,33 +99,35 @@ describe("MatrixScheduler", function() {
return defer.promise;
} else if (procCount === 2) {
// don't care about this defer
return Promise.defer().promise;
return new Promise();
}
expect(procCount).toBeLessThan(3);
});

scheduler.queueEvent(eventA);
// as queueing doesn't start processing synchronously anymore (see commit bbdb5ac)
// wait just long enough before it does
bwindels marked this conversation as resolved.
Show resolved Hide resolved
await Promise.resolve();
expect(procCount).toEqual(1);
defer.reject({});
retryDefer.promise.done(function() {
expect(procCount).toEqual(1);
clock.tick(waitTimeMs);
expect(procCount).toEqual(2);
done();
});
await retryDefer.promise;
expect(procCount).toEqual(1);
clock.tick(waitTimeMs);
await Promise.resolve();
expect(procCount).toEqual(2);
});

it("should give up if the retryFn on failure returns -1 and try the next event",
function(done) {
async function() {
// Queue A & B.
// Reject A and return -1 on retry.
// Expect B to be tried next and the promise for A to be rejected.
retryFn = function() {
return -1;
};
queueFn = function() {
return "yep";
};
return "yep";
};

const deferA = Promise.defer();
const deferB = Promise.defer();
Expand All @@ -142,13 +146,17 @@ describe("MatrixScheduler", function() {

const globalA = scheduler.queueEvent(eventA);
scheduler.queueEvent(eventB);

// as queueing doesn't start processing synchronously anymore (see commit bbdb5ac)
// wait just long enough before it does
await Promise.resolve();
expect(procCount).toEqual(1);
deferA.reject({});
globalA.catch(function() {
try {
await globalA;
} catch(err) {
await Promise.resolve();
expect(procCount).toEqual(2);
done();
});
}
});

it("should treat each queue separately", function(done) {
Expand Down Expand Up @@ -300,7 +308,11 @@ describe("MatrixScheduler", function() {
expect(ev).toEqual(eventA);
return defer.promise;
});
expect(procCount).toEqual(1);
// as queueing doesn't start processing synchronously anymore (see commit bbdb5ac)
// wait just long enough before it does
Promise.resolve().then(() => {
expect(procCount).toEqual(1);
});
});

it("should not call the processFn if there are no queued events", function() {
Expand Down
23 changes: 18 additions & 5 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1719,20 +1719,33 @@ MatrixClient.prototype._sendCompleteEvent = function(roomId, eventObject, txnId,
txnId = this.makeTxnId();
}

// we always construct a MatrixEvent when sending because the store and
// scheduler use them. We'll extract the params back out if it turns out
// the client has no scheduler or store.
const localEvent = new MatrixEvent(Object.assign(eventObject, {
event_id: "~" + roomId + ":" + txnId,
user_id: this.credentials.userId,
room_id: roomId,
origin_server_ts: new Date().getTime(),
}));

const room = this.getRoom(roomId);
bwindels marked this conversation as resolved.
Show resolved Hide resolved

// if this is a relation or redaction of an event
// that hasn't been sent yet (e.g. with a local id starting with a ~)
// then listen for the remote echo of that event so that by the time
// this event does get sent, we have the correct event_id
const targetId = localEvent.getAssociatedId();
if (targetId && targetId.startsWith("~")) {
const target = room.getPendingEvents().find(e => e.getId() === targetId);
target.once("Event.localEventIdReplaced", () => {
localEvent.updateAssociatedId(target.getId());
});
}
bwindels marked this conversation as resolved.
Show resolved Hide resolved

const type = localEvent.getType();
logger.log(`sendEvent of type ${type} in ${roomId} with txnId ${txnId}`);

// we always construct a MatrixEvent when sending because the store and
// scheduler use them. We'll extract the params back out if it turns out
// the client has no scheduler or store.
const room = this.getRoom(roomId);
localEvent._txnId = txnId;
localEvent.setStatus(EventStatus.SENDING);

Expand Down Expand Up @@ -1886,7 +1899,7 @@ function _sendEventHttpRequest(client, event) {
pathTemplate = "/rooms/$roomId/state/$eventType/$stateKey";
}
path = utils.encodeUri(pathTemplate, pathParams);
} else if (event.getType() === "m.room.redaction") {
} else if (event.isRedaction()) {
const pathTemplate = `/rooms/$roomId/redact/$redactsEventId/$txnId`;
path = utils.encodeUri(pathTemplate, Object.assign({
$redactsEventId: event.event.redacts,
Expand Down
5 changes: 5 additions & 0 deletions src/models/event-timeline-set.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
const EventEmitter = require("events").EventEmitter;
const utils = require("../utils");
const EventTimeline = require("./event-timeline");
import {EventStatus} from "./event";
import logger from '../../src/logger';
import Relations from './relations';

Expand Down Expand Up @@ -749,6 +750,10 @@ EventTimelineSet.prototype.aggregateRelations = function(event) {
return;
}

if (event.isRedacted() || event.status === EventStatus.CANCELLED) {
return;
}

// If the event is currently encrypted, wait until it has been decrypted.
if (event.isBeingDecrypted()) {
event.once("Event.decrypted", () => {
Expand Down
71 changes: 71 additions & 0 deletions src/models/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,15 @@ utils.extend(module.exports.MatrixEvent.prototype, {
return Boolean(this.getUnsigned().redacted_because);
},

/**
* Check if this event is a redaction of another event
*
* @return {boolean} True if this event is a redaction
*/
isRedaction: function() {
return this.getType() === "m.room.redaction";
},

/**
* Get the push actions, if known, for this event
*
Expand All @@ -776,9 +785,26 @@ utils.extend(module.exports.MatrixEvent.prototype, {
* @param {Object} event the object to assign to the `event` property
*/
handleRemoteEcho: function(event) {
const oldUnsigned = this.getUnsigned();
const oldId = this.getId();
this.event = event;
// if this event was redacted before it was sent, it's locally marked as redacted.
// At this point, we've received the remote echo for the event, but not yet for
// the redaction that we are sending ourselves. Preserve the locally redacted
// state by copying over redacted_because so we don't get a flash of
// redacted, not-redacted, redacted as remote echos come in
if (oldUnsigned.redacted_because) {
if (!this.event.unsigned) {
bwindels marked this conversation as resolved.
Show resolved Hide resolved
this.event.unsigned = {};
}
this.event.unsigned.redacted_because = oldUnsigned.redacted_because;
}
// successfully sent.
this.setStatus(null);
if (this.getId() !== oldId) {
// emit the event if it changed
this.emit("Event.localEventIdReplaced", this);
}
},

/**
Expand All @@ -801,6 +827,11 @@ utils.extend(module.exports.MatrixEvent.prototype, {
this.emit("Event.status", this, status);
},

replaceLocalEventId(eventId) {
this.event.event_id = eventId;
this.emit("Event.localEventIdReplaced", this);
},

/**
* Get whether the event is a relation event, and of a given type if
* `relType` is passed in.
Expand Down Expand Up @@ -876,6 +907,46 @@ utils.extend(module.exports.MatrixEvent.prototype, {
return this._replacingEvent;
},

/**
* For relations and redactions, returns the event_id this event is referring to.
*
* @return {string?}
*/
getAssociatedId() {
const relation = this.getRelation();
if (relation) {
return relation.event_id;
} else if (this.isRedaction()) {
return this.event.redacts;
}
},

/**
* Checks if this event is associated with another event. See `getAssociatedId`.
*
* @return {bool}
*/
hasAssocation() {
return !!this.getAssociatedId();
},

/**
* Update the related id with a new one.
*
* Used to replace a local id with remote one before sending
* an event with a related id.
*
* @param {string} eventId the new event id
*/
updateAssociatedId(eventId) {
const relation = this.getRelation();
if (relation) {
relation.event_id = eventId;
} else if (this.isRedaction()) {
this.event.redacts = eventId;
}
},

/**
* Summarise the event as JSON for debugging. If encrypted, include both the
* decrypted and encrypted view of the event. This is named `toJSON` for use
Expand Down
7 changes: 1 addition & 6 deletions src/models/relations.js
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,7 @@ export default class Relations extends EventEmitter {

redactedEvent.removeListener("Event.beforeRedaction", this._onBeforeRedaction);

// Dispatch a redaction event on this collection. `setTimeout` is used
// to wait until the next event loop iteration by which time the event
// has actually been marked as redacted.
setTimeout(() => {
this.emit("Relations.redaction");
}, 0);
this.emit("Relations.redaction");
}

/**
Expand Down
18 changes: 11 additions & 7 deletions src/models/room.js
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ Room.prototype.removeFilteredTimelineSet = function(filter) {
* @private
*/
Room.prototype._addLiveEvent = function(event, duplicateStrategy) {
if (event.getType() === "m.room.redaction") {
if (event.isRedaction()) {
const redactId = event.event.redacts;

// if we know about this event, redact its contents now.
Expand Down Expand Up @@ -1141,9 +1141,13 @@ Room.prototype.addPendingEvent = function(event, txnId) {
this._aggregateNonLiveRelation(event);
}

if (event.getType() === "m.room.redaction") {
if (event.isRedaction()) {
const redactId = event.event.redacts;
const redactedEvent = this.getUnfilteredTimelineSet().findEventById(redactId);
let redactedEvent = this._pendingEventList &&
this._pendingEventList.find(e => e.getId() === redactId);
if (!redactedEvent) {
redactedEvent = this.getUnfilteredTimelineSet().findEventById(redactId);
}
if (redactedEvent) {
redactedEvent.markLocallyRedacted(event);
this.emit("Room.redaction", event, this);
Expand Down Expand Up @@ -1211,7 +1215,7 @@ Room.prototype._handleRemoteEcho = function(remoteEvent, localEvent) {
const oldStatus = localEvent.status;

// no longer pending
delete this._txnToEvent[remoteEvent.transaction_id];
delete this._txnToEvent[remoteEvent.getUnsigned().transaction_id];

// if it's in the pending list, remove it
if (this._pendingEventList) {
Expand Down Expand Up @@ -1315,7 +1319,7 @@ Room.prototype.updatePendingEvent = function(event, newStatus, newEventId) {

if (newStatus == EventStatus.SENT) {
// update the event id
event.event.event_id = newEventId;
event.replaceLocalEventId(newEventId);

// if the event was already in the timeline (which will be the case if
// opts.pendingEventOrdering==chronological), we need to update the
Expand All @@ -1329,7 +1333,7 @@ Room.prototype.updatePendingEvent = function(event, newStatus, newEventId) {
const idx = this._pendingEventList.findIndex(ev => ev.getId() === oldEventId);
if (idx !== -1) {
const [removedEvent] = this._pendingEventList.splice(idx, 1);
if (removedEvent.getType() === "m.room.redaction") {
if (removedEvent.isRedaction()) {
this._revertRedactionLocalEcho(removedEvent);
}
}
Expand Down Expand Up @@ -1435,7 +1439,7 @@ Room.prototype.removeEvent = function(eventId) {
for (let i = 0; i < this._timelineSets.length; i++) {
const removed = this._timelineSets[i].removeEvent(eventId);
if (removed) {
if (removed.getType() === "m.room.redaction") {
if (removed.isRedaction()) {
this._revertRedactionLocalEcho(removed);
}
removedAny = true;
Expand Down
Loading