diff --git a/lib/client.js b/lib/client.js index 2073ac47cf7..be453083e85 100644 --- a/lib/client.js +++ b/lib/client.js @@ -144,7 +144,10 @@ function MatrixClient(opts) { var self = this; this.scheduler.setProcessFunction(function(eventToSend) { var room = self.getRoom(eventToSend.getRoomId()); - _updateLocalEchoStatus(room, eventToSend, EventStatus.SENDING); + if (eventToSend.status !== EventStatus.SENDING) { + _updatePendingEventStatus(room, eventToSend, + EventStatus.SENDING); + } return _sendEventHttpRequest(self, eventToSend); }); } @@ -718,7 +721,7 @@ MatrixClient.prototype.joinRoom = function(roomIdOrAlias, opts, callback) { * @return {module:http-api.MatrixError} Rejects: with an error response. */ MatrixClient.prototype.resendEvent = function(event, room) { - _updateLocalEchoStatus(room, event, EventStatus.SENDING); + _updatePendingEventStatus(room, event, EventStatus.SENDING); return _sendEvent(this, room, event); }; @@ -928,11 +931,11 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId, content: content }); localEvent._txnId = txnId; + localEvent.status = EventStatus.SENDING; // add this event immediately to the local store as 'sending'. if (room) { - localEvent.status = EventStatus.SENDING; - room.addEventsToTimeline([localEvent]); + room.addPendingEvent(localEvent, txnId); } if (eventType === "m.room.message" && this.sessionStore && CRYPTO_ENABLED) { @@ -1140,11 +1143,6 @@ function _badEncryptedMessage(event, reason) { } function _sendEvent(client, room, event, callback) { - // cache the local event ID here because if /sync returns before /send then - // event.getId() will return a REAL event ID which we will then incorrectly - // remove! - var localEventId = event.getId(); - var defer = q.defer(); var promise; // this event may be queued @@ -1157,7 +1155,7 @@ function _sendEvent(client, room, event, callback) { if (promise && client.scheduler.getQueueForEvent(event).length > 1) { // event is processed FIFO so if the length is 2 or more we know // this event is stuck behind an earlier event. - _updateLocalEchoStatus(room, event, EventStatus.QUEUED); + _updatePendingEventStatus(room, event, EventStatus.QUEUED); } } @@ -1167,42 +1165,13 @@ function _sendEvent(client, room, event, callback) { promise.done(function(res) { // the request was sent OK if (room) { - var eventId = res.event_id; - - // FIXME: This manipulation of the room should probably be done - // inside the room class, not by the client. - var timeline = room.getTimelineForEvent(eventId); - if (!timeline) { - // we haven't yet received the event from the stream; we - // need to update the fake event with the right event id. - // - // best way to make sure the room timeline structures are updated - // correctly is to remove the event and add it again with the right - // ID. - // - // This will also make us synthesize our own read receipt for the - // sent message. - var oldStatus = event.status; - room.removeEvents([localEventId]); - event.event.event_id = res.event_id; - // TODO: at this point, we're still expecting the remote echo - // to come back and update the server-generated fields for - // us. We should probably set the status to some distinct value - // so that the client app can figure out what is going on. - event.status = null; - room.addEventsToTimeline([event]); - - // FIXME: doing this here is a horrible fudge, but this all - // needs unpicking, which will touch the crypto code. - room.emit("Room.localEchoUpdated", event, room, localEventId, - oldStatus); - } + room.updatePendingEvent(event, EventStatus.SENT, res.event_id); } _resolve(callback, defer, res); }, function(err) { // the request failed to send. - _updateLocalEchoStatus(room, event, EventStatus.NOT_SENT); + _updatePendingEventStatus(room, event, EventStatus.NOT_SENT); _reject(callback, defer, err); }); @@ -1210,9 +1179,9 @@ function _sendEvent(client, room, event, callback) { return defer.promise; } -function _updateLocalEchoStatus(room, event, newStatus) { +function _updatePendingEventStatus(room, event, newStatus) { if (room) { - room.updateLocalEchoStatus(event, newStatus); + room.updatePendingEvent(event, newStatus); } else { event.status = newStatus; } diff --git a/lib/models/event.js b/lib/models/event.js index 23c4147e5a7..de5287dcdab 100644 --- a/lib/models/event.js +++ b/lib/models/event.js @@ -32,7 +32,10 @@ module.exports.EventStatus = { /** The event is in the process of being sent. */ SENDING: "sending", /** The event is in a queue waiting to be sent. */ - QUEUED: "queued" + QUEUED: "queued", + /** The event has been sent to the server, but we have not yet received the + * echo. */ + SENT: "sent", }; /** diff --git a/lib/models/room.js b/lib/models/room.js index fb0790b80c4..dc4a9ed74db 100644 --- a/lib/models/room.js +++ b/lib/models/room.js @@ -369,9 +369,9 @@ Room.prototype.getAvatarUrl = function(baseUrl, width, height, resizeMethod, */ Room.prototype.addTimeline = function() { if (!this._timelineSupport) { - throw Error("timeline support is disabled. Set the 'timelineSupport'" + - " parameter to true when creating MatrixClient to enable" + - " it."); + throw new Error("timeline support is disabled. Set the 'timelineSupport'" + + " parameter to true when creating MatrixClient to enable" + + " it."); } var timeline = new EventTimeline(this.roomId); @@ -549,7 +549,7 @@ Room.prototype.addEventsToTimeline = function(events, toStartOfTimeline, }; /** - * Check for redactions, and otherwise add event to the given timeline. Assumes + * Add event to the given timeline, and emit Room.timeline. Assumes * we have already checked we don't know about this event. * * Will fire "Room.timeline" for each event added. @@ -592,53 +592,6 @@ Room.prototype._addLiveEvents = function(events) { var addLocalEchoToEnd = this._opts.pendingEventOrdering === "end"; for (var i = 0; i < events.length; i++) { - var isLocalEcho = ( - events[i].status === EventStatus.SENDING || - events[i].status === EventStatus.QUEUED - ); - - // FIXME: HORRIBLE ASSUMPTION THAT THIS PROP EXISTS - // Exists due to client.js:815 (MatrixClient.sendEvent) - // We should make txnId a first class citizen. - if (events[i]._txnId) { - // this is the outgoing copy of the event (ie, the local echo). - this._txnToEvent[events[i]._txnId] = events[i]; - } - else if (events[i].getUnsigned().transaction_id) { - // remote echo of an event we sent earlier - var existingEvent = this._txnToEvent[events[i].getUnsigned().transaction_id]; - if (existingEvent) { - var oldEventId = existingEvent.getId(); - var oldStatus = existingEvent.status; - - // no longer pending - delete this._txnToEvent[events[i].getUnsigned().transaction_id]; - - // update the timeline map, because the event id has changed - var existingTimeline = this._eventIdToTimeline[oldEventId]; - if (existingTimeline) { - delete this._eventIdToTimeline[oldEventId]; - this._eventIdToTimeline[events[i].getId()] = existingTimeline; - } - - // replace the event source, but preserve the original content - // and type in case it was encrypted (we won't be able to - // decrypt it, even though we sent it.) - var existingSource = existingEvent.event; - existingEvent.event = events[i].event; - existingEvent.event.content = existingSource.content; - existingEvent.event.type = existingSource.type; - - // successfully sent. - existingEvent.status = null; - - this.emit("Room.localEchoUpdated", existingEvent, this, oldEventId, - oldStatus); - continue; - } - } - - if (events[i].getType() === "m.room.redaction") { var redactId = events[i].event.redacts; @@ -660,21 +613,25 @@ Room.prototype._addLiveEvents = function(events) { // this may be needed to trigger an update. } - var spliceBeforeLocalEcho = !isLocalEcho && addLocalEchoToEnd; + if (events[i].getUnsigned().transaction_id) { + var existingEvent = this._txnToEvent[events[i].getUnsigned().transaction_id]; + if (existingEvent) { + // remote echo of an event we sent earlier + this._handleRemoteEcho(events[i], existingEvent); + continue; + } + } if (!this._eventIdToTimeline[events[i].getId()]) { // TODO: pass through filter to see if this should be added to the timeline. this._addEventToTimeline(events[i], this._liveTimeline, false, - spliceBeforeLocalEcho); + addLocalEchoToEnd); } // synthesize and inject implicit read receipts // Done after adding the event because otherwise the app would get a read receipt // pointing to an event that wasn't yet in the timeline - // - // (we don't do this for local echoes, as they have temporary event - // ids, which don't make much sense as RRs). - if (events[i].sender && !isLocalEcho) { + if (events[i].sender) { this.addReceipt(synthesizeReceipt( events[i].sender.userId, events[i], "m.read" ), true); @@ -684,29 +641,167 @@ Room.prototype._addLiveEvents = function(events) { /** - * Update the status field on a local echo, to reflect its transmission + * Add a pending outgoing event to this room. + * + *

This is an internal method, intended for use by MatrixClient. + * + * @param {module:models/event~MatrixEvent} event The event to add. + * + * @param {string} txnId Transaction id for this outgoing event + * + * @fires module:client~MatrixClient#event:"Room.localEchoUpdated" + * + * @throws if the event doesn't have status SENDING, or we aren't given a + * unique transaction id. + */ +Room.prototype.addPendingEvent = function(event, txnId) { + if (event.status !== EventStatus.SENDING) { + throw new Error("addPendingEvent called on an event with status " + + event.status); + } + + if (this._txnToEvent[txnId]) { + throw new Error("addPendingEvent called on an event with known txnId " + + txnId); + } + + // call setEventMetadata to set up event.sender etc + setEventMetadata( + event, + this._liveTimeline.getState(EventTimeline.FORWARDS), + false + ); + + this._txnToEvent[txnId] = event; + + this._addEventToTimeline(event, this._liveTimeline, false); + + this.emit("Room.localEchoUpdated", event, this, null, null); +}; + +/** + * Deal with the echo of a message we sent. + * + * @param {module:models/event~MatrixEvent} remoteEvent The event received from + * /sync + * @param {module:models/event~MatrixEvent} localEvent The local echo, which + * should already be in the timeline. + * + * @fires module:client~MatrixClient#event:"Room.localEchoUpdated" + * @private + */ +Room.prototype._handleRemoteEcho = function(remoteEvent, localEvent) { + var oldEventId = localEvent.getId(); + var newEventId = remoteEvent.getId(); + var oldStatus = localEvent.status; + + // no longer pending + delete this._txnToEvent[remoteEvent.transaction_id]; + + // replace the event source, but preserve the original content + // and type in case it was encrypted (we won't be able to + // decrypt it, even though we sent it.) + var existingSource = localEvent.event; + localEvent.event = remoteEvent.event; + localEvent.event.content = existingSource.content; + localEvent.event.type = existingSource.type; + + // successfully sent. + localEvent.status = null; + + // Update the timeline map. + var existingTimeline = this._eventIdToTimeline[oldEventId]; + if (existingTimeline) { + delete this._eventIdToTimeline[oldEventId]; + this._eventIdToTimeline[newEventId] = existingTimeline; + } + + this.emit("Room.localEchoUpdated", localEvent, this, + oldEventId, oldStatus); +}; + + +/* a map from current event status to a list of allowed next statuses + */ +var ALLOWED_TRANSITIONS = {}; + +ALLOWED_TRANSITIONS[EventStatus.SENDING] = + [EventStatus.QUEUED, EventStatus.NOT_SENT, EventStatus.SENT]; + +ALLOWED_TRANSITIONS[EventStatus.QUEUED] = + [EventStatus.SENDING]; + +ALLOWED_TRANSITIONS[EventStatus.SENT] = + []; + +ALLOWED_TRANSITIONS[EventStatus.NOT_SENT] = + [EventStatus.SENDING, EventStatus.QUEUED]; + +/** + * Update the status / event id on a pending event, to reflect its transmission * progress. * *

This is an internal method. * * @param {MatrixEvent} event local echo event * @param {EventStatus} newStatus status to assign + * @param {string} newEventId new event id to assign. Ignored unless + * newStatus == EventStatus.SENT. * @fires module:client~MatrixClient#event:"Room.localEchoUpdated" */ -Room.prototype.updateLocalEchoStatus = function(event, newStatus) { - if (!event.status) { - throw new Error("updateLocalEchoStatus called on an event which is " + - "not a local echo."); - } +Room.prototype.updatePendingEvent = function(event, newStatus, newEventId) { if (!this.getTimelineForEvent(event.getId())) { throw new Error("updateLocalEchoStatus called on an unknown event."); } + // if the message was sent, we expect an event id + if (newStatus == EventStatus.SENT && !newEventId) { + throw new Error("updatePendingEvent called with status=SENT, " + + "but no new event id"); + } + + // SENT races against /sync, so we have to special-case it. + if (newStatus == EventStatus.SENT) { + var timeline = this._eventIdToTimeline[newEventId]; + if (timeline) { + // we've already received the event via the event stream. + // nothing more to do here. + return; + } + } + var oldStatus = event.status; + var oldEventId = event.getId(); + + if (!oldStatus) { + throw new Error("updatePendingEventStatus called on an event which is " + + "not a local echo."); + } + + var allowed = ALLOWED_TRANSITIONS[oldStatus]; + if (!allowed || allowed.indexOf(newStatus) < 0) { + throw new Error("Invalid EventStatus transition " + oldStatus + "->" + + newStatus); + } + event.status = newStatus; + + if (newStatus == EventStatus.SENT) { + // update the event id + event.event.event_id = newEventId; + + // Update the timeline map + var existingTimeline = this._eventIdToTimeline[oldEventId]; + if (existingTimeline) { + delete this._eventIdToTimeline[oldEventId]; + this._eventIdToTimeline[newEventId] = existingTimeline; + } + } + this.emit("Room.localEchoUpdated", event, this, event.getId(), oldStatus); }; + /** * Add some events to this room. This can include state events, message * events and typing notifications. These events are treated as "live" so @@ -1382,14 +1477,13 @@ module.exports = Room; * *

Once the /send request completes, if the remote echo has not already * arrived, the event is updated with a new event id and the status is set to - * null. The server-generated fields are of course not updated yet. + * 'SENT'. The server-generated fields are of course not updated yet. * *

Finally, the /send might fail. In this case, the event's status is set to * 'NOT_SENT'. If it is later resent, the process starts again, setting the * status to 'SENDING'. * - *

This event is raised to reflect each of the transitions above (except the - * first send attempt). + *

This event is raised to reflect each of the transitions above. * * @event module:client~MatrixClient#"Room.localEchoUpdated" * diff --git a/spec/unit/room.spec.js b/spec/unit/room.spec.js index 1e3a6919b83..e7a5848fa98 100644 --- a/spec/unit/room.spec.js +++ b/spec/unit/room.spec.js @@ -344,22 +344,37 @@ describe("Room", function() { var remoteEventId = remoteEvent.getId(); var callCount = 0; - room.on("Room.localEchoUpdated", function(event, emitRoom, oldEventId) { - callCount += 1; - expect(event.getId()).toEqual(remoteEventId); - expect(emitRoom).toEqual(room); - expect(oldEventId).toEqual(localEventId); - }); + room.on("Room.localEchoUpdated", + function(event, emitRoom, oldEventId, oldStatus) { + switch (callCount) { + case 0: + expect(event.getId()).toEqual(localEventId); + expect(event.status).toEqual(EventStatus.SENDING); + expect(emitRoom).toEqual(room); + expect(oldEventId).toBe(null); + expect(oldStatus).toBe(null); + break; + case 1: + expect(event.getId()).toEqual(remoteEventId); + expect(event.status).toBe(null); + expect(emitRoom).toEqual(room); + expect(oldEventId).toEqual(localEventId); + expect(oldStatus).toBe(EventStatus.SENDING); + break; + } + callCount += 1; + } + ); - // first add the local echo to the timeline - room.addEventsToTimeline([localEvent]); + // first add the local echo + room.addPendingEvent(localEvent, "TXN_ID"); expect(room.timeline.length).toEqual(1); // then the remoteEvent room.addEventsToTimeline([remoteEvent]); expect(room.timeline.length).toEqual(1); - expect(callCount).toEqual(1); + expect(callCount).toEqual(2); }); });