From a2decbdd664508ff84bdbe5a6c4d5a4b7b7f73b2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 May 2016 13:31:22 +0100 Subject: [PATCH 01/37] Only load the last N joined room --- synapse/handlers/sync.py | 40 +++++++++++++++++++++++++++++---------- synapse/storage/stream.py | 20 ++++++++++++++++++++ 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 921215469fe8..4c5b935012d0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -258,19 +258,39 @@ def full_state_sync(self, sync_config, timeline_since_token): user_id = sync_config.user.to_string() + room_to_last_ts = {} + @defer.inlineCallbacks - def _generate_room_entry(event): + def _get_last_ts(event): + room_id = event.room_id if event.membership == Membership.JOIN: - room_result = yield self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, + ts = yield self.store.get_last_ts_for_room( + room_id, now_token.room_key ) - joined.append(room_result) + room_to_last_ts[room_id] = ts if ts else 0 + + logger.info("room_to_last_ts: %r", room_to_last_ts) + yield concurrently_execute(_get_last_ts, room_list, 10) + + joined_rooms_list = frozenset([ + room_id for room_id, _f in + sorted(room_to_last_ts.items(), key=lambda item: -item[1]) + ][:20]) + + @defer.inlineCallbacks + def _generate_room_entry(event): + if event.membership == Membership.JOIN: + if event.room_id in joined_rooms_list: + room_result = yield self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) + joined.append(room_result) elif event.membership == Membership.INVITE: if event.sender in ignored_users: return diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 95b12559a69f..60a8384fae99 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -529,6 +529,26 @@ def _set_before_and_after(events, rows, topo_order=True): int(stream), ) + def get_last_ts_for_room(self, room_id, token): + stream_ordering = RoomStreamToken.parse_stream_token(token).stream + + sql = ( + "SELECT origin_server_ts FROM events" + " WHERE room_id = ? AND stream_ordering <= ?" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT 1" + ) + + def f(txn): + txn.execute(sql, (room_id, stream_ordering)) + rows = txn.fetchall() + if rows: + return rows[0][0] + else: + return None + + return self.runInteraction("get_last_ts_for_room", f) + @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): """Retrieve events and pagination tokens around a given event in a From 32d476d4f1fd2a99c8ccbffe687293a26d5519d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 May 2016 16:59:18 +0100 Subject: [PATCH 02/37] Change token format --- synapse/handlers/sync.py | 87 ++++++++++++++++++---------- synapse/rest/client/v2_alpha/sync.py | 8 +-- synapse/types.py | 27 +++++++++ 3 files changed, 86 insertions(+), 36 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c5b935012d0..33c05950c513 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -24,6 +24,8 @@ from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client +from synapse.types import SyncNextBatchToken + from twisted.internet import defer import collections @@ -141,7 +143,7 @@ def __init__(self, hs): self.clock = hs.get_clock() self.response_cache = ResponseCache() - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, + def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then @@ -154,53 +156,68 @@ def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, since_token, timeout, full_state + sync_config, batch_token, timeout, full_state ) ) return result @defer.inlineCallbacks - def _wait_for_sync_for_user(self, sync_config, since_token, timeout, + def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, full_state): context = LoggingContext.current_context() if context: - if since_token is None: + if batch_token is None: context.tag = "initial_sync" elif full_state: context.tag = "full_state_sync" else: context.tag = "incremental_sync" - if timeout == 0 or since_token is None or full_state: + if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result = yield self.current_sync_for_user( - sync_config, since_token, full_state=full_state, + sync_config, batch_token, full_state=full_state, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, since_token) + return self.current_sync_for_user(sync_config, batch_token) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, - from_token=since_token, + from_token=batch_token.stream_token, ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None, + def current_sync_for_user(self, sync_config, batch_token=None, full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. """ - if since_token is None or full_state: - return self.full_state_sync(sync_config, since_token) + if batch_token is None or full_state: + return self.full_state_sync(sync_config, batch_token) else: - return self.incremental_sync_with_gap(sync_config, since_token) + return self.incremental_sync_with_gap(sync_config, batch_token) @defer.inlineCallbacks - def full_state_sync(self, sync_config, timeline_since_token): + def _get_room_timestamps_at_token(self, room_ids, token): + room_to_last_ts = {} + + @defer.inlineCallbacks + def _get_last_ts(room_id): + ts = yield self.store.get_last_ts_for_room( + room_id, token.room_key + ) + room_to_last_ts[room_id] = ts if ts else 0 + + logger.info("room_to_last_ts: %r", room_to_last_ts) + yield concurrently_execute(_get_last_ts, room_ids, 10) + defer.returnValue(room_to_last_ts) + + @defer.inlineCallbacks + def full_state_sync(self, sync_config, batch_token): """Get a sync for a client which is starting without any state. If a 'message_since_token' is given, only timeline events which have @@ -209,6 +226,11 @@ def full_state_sync(self, sync_config, timeline_since_token): Returns: A Deferred SyncResult. """ + if batch_token: + timeline_since_token = batch_token.stream_token + else: + timeline_since_token = None + now_token = yield self.event_sources.get_current_token() now_token, ephemeral_by_room = yield self.ephemeral_by_room( @@ -258,24 +280,22 @@ def full_state_sync(self, sync_config, timeline_since_token): user_id = sync_config.user.to_string() - room_to_last_ts = {} - - @defer.inlineCallbacks - def _get_last_ts(event): - room_id = event.room_id - if event.membership == Membership.JOIN: - ts = yield self.store.get_last_ts_for_room( - room_id, now_token.room_key - ) - room_to_last_ts[room_id] = ts if ts else 0 + pagination_limit = 20 + room_pagination_config = { + "l": pagination_limit, + "o": 0, + "t": now_token, + } - logger.info("room_to_last_ts: %r", room_to_last_ts) - yield concurrently_execute(_get_last_ts, room_list, 10) + room_to_last_ts = yield self._get_room_timestamps_at_token( + room_ids=[e.room_id for e in room_list if e.membership == Membership.JOIN], + token=now_token, + ) joined_rooms_list = frozenset([ room_id for room_id, _f in sorted(room_to_last_ts.items(), key=lambda item: -item[1]) - ][:20]) + ][:pagination_limit]) @defer.inlineCallbacks def _generate_room_entry(event): @@ -326,9 +346,7 @@ def _generate_room_entry(event): self.account_data_for_user(account_data) ) - presence = sync_config.filter_collection.filter_presence( - presence - ) + presence = sync_config.filter_collection.filter_presence(presence) defer.returnValue(SyncResult( presence=presence, @@ -336,7 +354,10 @@ def _generate_room_entry(event): joined=joined, invited=invited, archived=archived, - next_batch=now_token, + next_batch=SyncNextBatchToken( + stream_token=now_token, + pagination_config=room_pagination_config, + ), )) @defer.inlineCallbacks @@ -480,12 +501,14 @@ def full_state_sync_for_archived_room(self, room_id, sync_config, ) @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, since_token): + def incremental_sync_with_gap(self, sync_config, batch_token): """ Get the incremental delta needed to bring the client up to date with the server. Returns: A Deferred SyncResult. """ + since_token = batch_token.stream_token + now_token = yield self.event_sources.get_current_token() rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) @@ -693,7 +716,7 @@ def incremental_sync_with_gap(self, sync_config, since_token): joined=joined, invited=invited, archived=archived, - next_batch=now_token, + next_batch=batch_token.replace(stream_token=now_token), )) @defer.inlineCallbacks diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 60d3dc403020..37fd1539f687 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -19,7 +19,7 @@ RestServlet, parse_string, parse_integer, parse_boolean ) from synapse.handlers.sync import SyncConfig -from synapse.types import StreamToken +from synapse.types import SyncNextBatchToken from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_room_id, ) @@ -140,9 +140,9 @@ def on_GET(self, request): ) if since is not None: - since_token = StreamToken.from_string(since) + batch_token = SyncNextBatchToken.from_string(since) else: - since_token = None + batch_token = None affect_presence = set_presence != PresenceState.OFFLINE @@ -154,7 +154,7 @@ def on_GET(self, request): ) with context: sync_result = yield self.sync_handler.wait_for_sync_for_user( - sync_config, since_token=since_token, timeout=timeout, + sync_config, batch_token=batch_token, timeout=timeout, full_state=full_state ) diff --git a/synapse/types.py b/synapse/types.py index 42fd9c7204f3..b53d91747b83 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -17,6 +17,9 @@ from collections import namedtuple +from unpaddedbase64 import encode_base64, decode_base64 +import ujson as json + Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) @@ -112,6 +115,30 @@ class EventID(DomainSpecificString): SIGIL = "$" +class SyncNextBatchToken( + namedtuple("SyncNextBatchToken", ( + "stream_token", + "pagination_config", + )) +): + @classmethod + def from_string(cls, string): + try: + d = json.loads(decode_base64(string)) + return cls(StreamToken.from_string(d["t"]), d.get("pa", {})) + except: + raise SynapseError(400, "Invalid Token") + + def to_string(self): + return encode_base64(json.dumps({ + "t": self.stream_token.to_string(), + "pa": self.pagination_config, + })) + + def replace(self, **kwargs): + return self._replace(**kwargs) + + class StreamToken( namedtuple("Token", ( "room_key", From 64df83606745f8f8567a0db5920cd27cc744b098 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 May 2016 14:23:13 +0100 Subject: [PATCH 03/37] Correctly figure out which rooms we've sent down --- synapse/handlers/sync.py | 53 ++++++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 33c05950c513..a1a818e264ff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -284,18 +284,29 @@ def full_state_sync(self, sync_config, batch_token): room_pagination_config = { "l": pagination_limit, "o": 0, - "t": now_token, + "t": now_token.to_string(), } room_to_last_ts = yield self._get_room_timestamps_at_token( - room_ids=[e.room_id for e in room_list if e.membership == Membership.JOIN], + room_ids=[ + e.room_id for e in room_list if e.membership == Membership.JOIN + ], token=now_token, ) - joined_rooms_list = frozenset([ - room_id for room_id, _f in - sorted(room_to_last_ts.items(), key=lambda item: -item[1]) - ][:pagination_limit]) + if room_to_last_ts: + sorted_list = sorted( + room_to_last_ts.items(), + key=lambda item: -item[1] + )[:pagination_limit] + + _, bottom_ts = sorted_list[-1] + room_pagination_config["ts"] = bottom_ts + + joined_rooms_list = frozenset(room_id for room_id, _f in sorted_list) + else: + room_pagination_config = {} + joined_rooms_list = frozenset() @defer.inlineCallbacks def _generate_room_entry(event): @@ -500,6 +511,18 @@ def full_state_sync_for_archived_room(self, room_id, sync_config, account_data_by_room, full_state=True, leave_token=leave_token, ) + @defer.inlineCallbacks + def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts): + start_ts = yield self._get_room_timestamps_at_token(room_ids, since_token) + + missing_list = frozenset( + room_id for room_id, ts in + sorted(start_ts.items(), key=lambda item: -item[1]) + if ts < pa_ts + ) + + defer.returnValue(missing_list) + @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, batch_token): """ Get the incremental delta needed to bring the client up to @@ -508,6 +531,7 @@ def incremental_sync_with_gap(self, sync_config, batch_token): A Deferred SyncResult. """ since_token = batch_token.stream_token + room_pagination_config = batch_token.pagination_config now_token = yield self.event_sources.get_current_token() @@ -638,18 +662,30 @@ def incremental_sync_with_gap(self, sync_config, batch_token): limit=timeline_limit + 1, ) + p_room_token = room_pagination_config.get("t", None) + if p_room_token: + needing_full_state = yield self._get_rooms_that_need_full_state( + joined_room_ids, + since_token, + room_pagination_config.get("ts", 0), + ) + else: + needing_full_state = set() + joined = [] # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. for room_id in joined_room_ids: room_entry = room_to_events.get(room_id, None) + need_full_state = room_id in needing_full_state + if room_entry: events, start_key = room_entry prev_batch_token = now_token.copy_and_replace("room_key", start_key) - newly_joined_room = room_id in newly_joined_rooms + newly_joined_room = (room_id in newly_joined_rooms) or need_full_state full_state = newly_joined_room batch = yield self.load_filtered_recents( @@ -659,6 +695,9 @@ def incremental_sync_with_gap(self, sync_config, batch_token): newly_joined_room=newly_joined_room, ) else: + if need_full_state: + continue + batch = TimelineBatch( events=[], prev_batch=since_token, From d1e9655f759bf41a96e46ae7ebf0d83e6e60a8c1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 May 2016 15:37:48 +0100 Subject: [PATCH 04/37] Call get_last_ts less --- synapse/handlers/sync.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a1a818e264ff..497744b9af26 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -662,29 +662,28 @@ def incremental_sync_with_gap(self, sync_config, batch_token): limit=timeline_limit + 1, ) - p_room_token = room_pagination_config.get("t", None) - if p_room_token: - needing_full_state = yield self._get_rooms_that_need_full_state( - joined_room_ids, - since_token, - room_pagination_config.get("ts", 0), - ) - else: - needing_full_state = set() - joined = [] # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. for room_id in joined_room_ids: room_entry = room_to_events.get(room_id, None) - need_full_state = room_id in needing_full_state - if room_entry: events, start_key = room_entry prev_batch_token = now_token.copy_and_replace("room_key", start_key) + p_room_token = room_pagination_config.get("t", None) + if p_room_token: + needing_full_state = yield self._get_rooms_that_need_full_state( + [room_id], + since_token, + room_pagination_config.get("ts", 0), + ) + need_full_state = room_id in needing_full_state + else: + need_full_state = False + newly_joined_room = (room_id in newly_joined_rooms) or need_full_state full_state = newly_joined_room @@ -695,9 +694,6 @@ def incremental_sync_with_gap(self, sync_config, batch_token): newly_joined_room=newly_joined_room, ) else: - if need_full_state: - continue - batch = TimelineBatch( events=[], prev_batch=since_token, @@ -717,6 +713,16 @@ def incremental_sync_with_gap(self, sync_config, batch_token): full_state=full_state, ) if room_sync: + if not room_sync.timeline: + p_room_token = room_pagination_config.get("t", None) + if p_room_token: + needing_full_state = yield self._get_rooms_that_need_full_state( + [room_id], + since_token, + room_pagination_config.get("ts", 0), + ) + if room_id in needing_full_state: + continue joined.append(room_sync) # For each newly joined room, we want to send down presence of From b999adcaa21f1d51b87e2198d9f5f4c98d027f90 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 May 2016 11:28:26 +0100 Subject: [PATCH 05/37] Filter before ordering --- synapse/handlers/sync.py | 74 ++++++++++++++++++++++++++++++++++----- synapse/storage/stream.py | 6 ++-- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 497744b9af26..426a93b8dde9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -202,19 +202,64 @@ def current_sync_for_user(self, sync_config, batch_token=None, return self.incremental_sync_with_gap(sync_config, batch_token) @defer.inlineCallbacks - def _get_room_timestamps_at_token(self, room_ids, token): - room_to_last_ts = {} + def _get_room_timestamps_at_token(self, room_ids, token, sync_config, + limit): + room_to_entries = {} @defer.inlineCallbacks def _get_last_ts(room_id): - ts = yield self.store.get_last_ts_for_room( + entry = yield self.store.get_last_ts_for_room( room_id, token.room_key ) - room_to_last_ts[room_id] = ts if ts else 0 - logger.info("room_to_last_ts: %r", room_to_last_ts) + # TODO: Is this ever possible? + room_to_entries[room_id] = entry if entry else { + "origin_server_ts": 0, + } + yield concurrently_execute(_get_last_ts, room_ids, 10) - defer.returnValue(room_to_last_ts) + + if len(room_to_entries) <= limit: + defer.returnValue({ + room_id: entry["origin_server_ts"] + for room_id, entry in room_to_entries.items() + }) + + queued_events = sorted( + room_to_entries.items(), + key=lambda e: e[1]["origin_server_ts"] + ) + + to_return = {} + + while len(to_return) < limit and len(queued_events) > 0: + to_fetch = queued_events[:limit - len(to_return)] + event_to_q = { + e["event_id"]: (room_id, e) for room_id, e in to_fetch.items() + if "event_id" in e + } + + # Now we fetch each event to check if its been filtered out + event_map = yield self.store.get_events(event_to_q.keys()) + + recents = sync_config.filter_collection.filter_room_timeline( + event_map.values() + ) + recents = yield filter_events_for_client( + self.store, + sync_config.user.to_string(), + recents, + ) + + to_return.update({r.room_id: r.origin_server_ts for r in recents}) + + for ev_id in set(event_map.keys()) - set(r.event_id for r in recents): + queued_events.append(event_to_q[ev_id]) + + # FIXME: Need to refetch TS + queued_events.sort(key=lambda e: e[1]["origin_server_ts"]) + + defer.returnValue(to_return) @defer.inlineCallbacks def full_state_sync(self, sync_config, batch_token): @@ -292,6 +337,8 @@ def full_state_sync(self, sync_config, batch_token): e.room_id for e in room_list if e.membership == Membership.JOIN ], token=now_token, + sync_config=sync_config, + limit=pagination_limit, ) if room_to_last_ts: @@ -512,8 +559,13 @@ def full_state_sync_for_archived_room(self, room_id, sync_config, ) @defer.inlineCallbacks - def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts): - start_ts = yield self._get_room_timestamps_at_token(room_ids, since_token) + def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts, + sync_config, pagination_limit): + start_ts = yield self._get_room_timestamps_at_token( + room_ids, since_token, + sync_config=sync_config, + limit=pagination_limit, + ) missing_list = frozenset( room_id for room_id, ts in @@ -675,10 +727,13 @@ def incremental_sync_with_gap(self, sync_config, batch_token): p_room_token = room_pagination_config.get("t", None) if p_room_token: + pa_limit = room_pagination_config["l"] needing_full_state = yield self._get_rooms_that_need_full_state( [room_id], since_token, room_pagination_config.get("ts", 0), + sync_config=sync_config, + pagination_limit=pa_limit, ) need_full_state = room_id in needing_full_state else: @@ -716,10 +771,13 @@ def incremental_sync_with_gap(self, sync_config, batch_token): if not room_sync.timeline: p_room_token = room_pagination_config.get("t", None) if p_room_token: + pa_limit = room_pagination_config["l"] needing_full_state = yield self._get_rooms_that_need_full_state( [room_id], since_token, room_pagination_config.get("ts", 0), + sync_config=sync_config, + pagination_limit=pa_limit, ) if room_id in needing_full_state: continue diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 60a8384fae99..51f8b9e01720 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -533,7 +533,7 @@ def get_last_ts_for_room(self, room_id, token): stream_ordering = RoomStreamToken.parse_stream_token(token).stream sql = ( - "SELECT origin_server_ts FROM events" + "SELECT event_id, origin_server_ts FROM events" " WHERE room_id = ? AND stream_ordering <= ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT 1" @@ -541,9 +541,9 @@ def get_last_ts_for_room(self, room_id, token): def f(txn): txn.execute(sql, (room_id, stream_ordering)) - rows = txn.fetchall() + rows = self.cursor_to_dict(txn) if rows: - return rows[0][0] + return rows[0] else: return None From 39182c3594fc044c8ebe7b860d605349dbb9c667 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 May 2016 11:30:01 +0100 Subject: [PATCH 06/37] Typo --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 426a93b8dde9..fc432e2a52f8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -235,7 +235,7 @@ def _get_last_ts(room_id): while len(to_return) < limit and len(queued_events) > 0: to_fetch = queued_events[:limit - len(to_return)] event_to_q = { - e["event_id"]: (room_id, e) for room_id, e in to_fetch.items() + e["event_id"]: (room_id, e) for room_id, e in to_fetch if "event_id" in e } From 573e51cc0b51c62c745c1172a5e4fa54139d1858 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 May 2016 11:33:26 +0100 Subject: [PATCH 07/37] Correctly order recents --- synapse/handlers/sync.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index fc432e2a52f8..7aacc8f9195d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -227,7 +227,7 @@ def _get_last_ts(room_id): queued_events = sorted( room_to_entries.items(), - key=lambda e: e[1]["origin_server_ts"] + key=lambda e: -e[1]["origin_server_ts"] ) to_return = {} @@ -257,7 +257,7 @@ def _get_last_ts(room_id): queued_events.append(event_to_q[ev_id]) # FIXME: Need to refetch TS - queued_events.sort(key=lambda e: e[1]["origin_server_ts"]) + queued_events.sort(key=lambda e: -e[1]["origin_server_ts"]) defer.returnValue(to_return) @@ -325,7 +325,7 @@ def full_state_sync(self, sync_config, batch_token): user_id = sync_config.user.to_string() - pagination_limit = 20 + pagination_limit = 10 room_pagination_config = { "l": pagination_limit, "o": 0, From 99a7205093e82214f6ace5e4fc8acde1d7aeacf5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 May 2016 11:11:42 +0100 Subject: [PATCH 08/37] Change name --- synapse/handlers/sync.py | 2 +- synapse/storage/stream.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a32f48135e46..394fe8154f8a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -208,7 +208,7 @@ def _get_room_timestamps_at_token(self, room_ids, token, sync_config, @defer.inlineCallbacks def _get_last_ts(room_id): - entry = yield self.store.get_last_ts_for_room( + entry = yield self.store.get_last_event_id_ts_for_room( room_id, token.room_key ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 51f8b9e01720..a85cdac038c9 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -529,7 +529,7 @@ def _set_before_and_after(events, rows, topo_order=True): int(stream), ) - def get_last_ts_for_room(self, room_id, token): + def get_last_event_id_ts_for_room(self, room_id, token): stream_ordering = RoomStreamToken.parse_stream_token(token).stream sql = ( From 38d90e0d7db4372733016c8a121ad0843b38df6b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 May 2016 11:11:53 +0100 Subject: [PATCH 09/37] Add POST /sync API endpoint --- synapse/rest/client/v2_alpha/sync.py | 95 +++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 651ada6a507b..e1ece5d406c1 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -16,7 +16,8 @@ from twisted.internet import defer from synapse.http.servlet import ( - RestServlet, parse_string, parse_integer, parse_boolean + RestServlet, parse_string, parse_integer, parse_boolean, + parse_json_object_from_request, ) from synapse.handlers.sync import SyncConfig from synapse.types import SyncNextBatchToken @@ -84,6 +85,82 @@ def __init__(self, hs): self.filtering = hs.get_filtering() self.presence_handler = hs.get_presence_handler() + @defer.inlineCallbacks + def on_POST(self, request): + requester = yield self.auth.get_user_by_req( + request, allow_guest=True + ) + user = requester.user + + body = parse_json_object_from_request(request) + + timeout = body.get("timeout", 0) + + since = body.get("since", None) + + if "from" in body: + # /events used to use 'from', but /sync uses 'since'. + # Lets be helpful and whine if we see a 'from'. + raise SynapseError( + 400, "'from' is not a valid parameter. Did you mean 'since'?" + ) + + set_presence = body.get("set_presence", "online") + if set_presence not in self.ALLOWED_PRESENCE: + message = "Parameter 'set_presence' must be one of [%s]" % ( + ", ".join(repr(v) for v in self.ALLOWED_PRESENCE) + ) + raise SynapseError(400, message) + + full_state = body.get("full_state", False) + + filter_id = body.get("filter_id", None) + filter_dict = body.get("filter", None) + + if filter_dict is not None and filter_id is not None: + raise SynapseError( + 400, + "Can only specify one of `filter` and `filter_id` paramters" + ) + + if filter_id: + filter_collection = yield self.filtering.get_user_filter( + user.localpart, filter_id + ) + filter_key = filter_id + elif filter_dict: + self.filtering.check_valid_filter(filter_dict) + filter_collection = FilterCollection(filter_dict) + filter_key = json.dumps(filter_dict) + else: + filter_collection = DEFAULT_FILTER_COLLECTION + filter_key = None + + request_key = (user, timeout, since, filter_key, full_state) + + sync_config = SyncConfig( + user=user, + filter_collection=filter_collection, + is_guest=requester.is_guest, + request_key=request_key, + ) + + if since is not None: + batch_token = SyncNextBatchToken.from_string(since) + else: + batch_token = None + + sync_result = yield self._handle_sync( + requester=requester, + sync_config=sync_config, + batch_token=batch_token, + set_presence=set_presence, + full_state=full_state, + timeout=timeout, + ) + + defer.returnValue(sync_result) + @defer.inlineCallbacks def on_GET(self, request): if "from" in request.args: @@ -143,8 +220,24 @@ def on_GET(self, request): else: batch_token = None + sync_result = yield self._handle_sync( + requester=requester, + sync_config=sync_config, + batch_token=batch_token, + set_presence=set_presence, + full_state=full_state, + timeout=timeout, + ) + + defer.returnValue(sync_result) + + @defer.inlineCallbacks + def _handle_sync(self, requester, sync_config, batch_token, set_presence, + full_state, timeout): affect_presence = set_presence != PresenceState.OFFLINE + user = sync_config.user + if affect_presence: yield self.presence_handler.set_state(user, {"presence": set_presence}) From 26c7f08465a23c28fafbef4bf45d249b8404a300 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 May 2016 17:10:23 +0100 Subject: [PATCH 10/37] Implement basic pagination --- synapse/handlers/sync.py | 108 +++++++++++++++++++++++++-- synapse/rest/client/v2_alpha/sync.py | 8 +- synapse/storage/stream.py | 2 +- synapse/types.py | 16 ++-- 4 files changed, 118 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4ca9ff4dbc3e..a880845605aa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,7 +20,7 @@ from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client -from synapse.types import SyncNextBatchToken +from synapse.types import SyncNextBatchToken, SyncPaginationState from twisted.internet import defer @@ -36,9 +36,19 @@ "filter_collection", "is_guest", "request_key", + "pagination_config", ]) +SyncPaginationConfig = collections.namedtuple("SyncPaginationConfig", [ + "order", + "limit", +]) + +SYNC_PAGINATION_ORDER_TS = "o" +SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,) + + class TimelineBatch(collections.namedtuple("TimelineBatch", [ "prev_batch", "events", @@ -537,7 +547,7 @@ def generate_sync_result(self, sync_config, batch_token=None, full_state=False): archived=sync_result_builder.archived, next_batch=SyncNextBatchToken( stream_token=sync_result_builder.now_token, - pagination_config=batch_token.pagination_config if batch_token else None, + pagination_state=sync_result_builder.pagination_state, ) )) @@ -661,6 +671,7 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro `(newly_joined_rooms, newly_joined_users)` """ user_id = sync_result_builder.sync_config.user.to_string() + sync_config = sync_result_builder.sync_config now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_result_builder.sync_config, @@ -692,6 +703,59 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro tags_by_room = yield self.store.get_tags_for_user(user_id) + if sync_config.pagination_config: + pagination_config = sync_config.pagination_config + elif sync_result_builder.pagination_state: + pagination_config = SyncPaginationConfig( + order=sync_result_builder.pagination_state.order, + limit=sync_result_builder.pagination_state.limit, + ) + else: + pagination_config = None + + if sync_result_builder.pagination_state: + missing_state = yield self._get_rooms_that_need_full_state( + room_ids=[r.room_id for r in room_entries], + sync_config=sync_config, + since_token=sync_result_builder.since_token, + pagination_state=sync_result_builder.pagination_state, + ) + + if missing_state: + for r in room_entries: + if r.room_id in missing_state: + r.full_state = True + + new_pagination_state = None + if pagination_config: + room_ids = [r.room_id for r in room_entries] + pagination_limit = pagination_config.limit + + room_map = yield self._get_room_timestamps_at_token( + room_ids, sync_result_builder.now_token, sync_config, pagination_limit + ) + + if room_map: + sorted_list = sorted( + room_map.items(), + key=lambda item: -item[1] + )[:pagination_limit] + + _, bottom_ts = sorted_list[-1] + value = bottom_ts + + new_pagination_state = SyncPaginationState( + order=pagination_config.order, value=value, limit=pagination_limit, + ) + else: + new_pagination_state = None + + room_entries = [r for r in room_entries if r.room_id in room_map] + + sync_result_builder.pagination_state = new_pagination_state + + sync_result_builder.full_state |= sync_result_builder.since_token is None + def handle_room_entries(room_entry): return self._generate_room_entry( sync_result_builder, @@ -948,6 +1012,7 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, even if empty. """ newly_joined = room_builder.newly_joined + always_include = always_include or newly_joined or sync_result_builder.full_state full_state = ( room_builder.full_state or newly_joined @@ -956,7 +1021,7 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, events = room_builder.events # We want to shortcut out as early as possible. - if not (always_include or account_data or ephemeral or full_state): + if not (always_include or account_data or ephemeral): if events == [] and tags is None: return @@ -995,7 +1060,7 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) - if not (always_include or batch or account_data or ephemeral or full_state): + if not (always_include or batch or account_data or ephemeral): return state = yield self.compute_state_delta( @@ -1037,13 +1102,12 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, raise Exception("Unrecognized rtype: %r", room_builder.rtype) @defer.inlineCallbacks - def _get_room_timestamps_at_token(self, room_ids, token, sync_config, - limit): + def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit): room_to_entries = {} @defer.inlineCallbacks def _get_last_ts(room_id): - entry = yield self.store.get_last_ts_for_room( + entry = yield self.store.get_last_event_id_ts_for_room( room_id, token.room_key ) @@ -1096,6 +1160,23 @@ def _get_last_ts(room_id): defer.returnValue(to_return) + @defer.inlineCallbacks + def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token, + pagination_state): + start_ts = yield self._get_room_timestamps_at_token( + room_ids, since_token, + sync_config=sync_config, + limit=pagination_state.limit, + ) + + missing_list = frozenset( + room_id for room_id, ts in + sorted(start_ts.items(), key=lambda item: -item[1]) + if ts < pagination_state.value + ) + + defer.returnValue(missing_list) + def _action_has_highlight(actions): for action in actions: @@ -1147,6 +1228,12 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): "Used to help build up a new SyncResult for a user" + + __slots__ = ( + "sync_config", "full_state", "batch_token", "since_token", "pagination_state", + "now_token", "presence", "account_data", "joined", "invited", "archived", + ) + def __init__(self, sync_config, full_state, batch_token, now_token): """ Args: @@ -1159,6 +1246,7 @@ def __init__(self, sync_config, full_state, batch_token, now_token): self.full_state = full_state self.batch_token = batch_token self.since_token = batch_token.stream_token if batch_token else None + self.pagination_state = batch_token.pagination_state if batch_token else None self.now_token = now_token self.presence = [] @@ -1172,6 +1260,12 @@ class RoomSyncResultBuilder(object): """Stores information needed to create either a `JoinedSyncResult` or `ArchivedSyncResult`. """ + + __slots__ = ( + "room_id", "rtype", "events", "newly_joined", "full_state", "since_token", + "upto_token", + ) + def __init__(self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token): """ diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index e1ece5d406c1..3df9743132e6 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -19,7 +19,7 @@ RestServlet, parse_string, parse_integer, parse_boolean, parse_json_object_from_request, ) -from synapse.handlers.sync import SyncConfig +from synapse.handlers.sync import SyncConfig, SyncPaginationConfig from synapse.types import SyncNextBatchToken from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_room_id, @@ -116,6 +116,7 @@ def on_POST(self, request): filter_id = body.get("filter_id", None) filter_dict = body.get("filter", None) + pagination_config = body.get("pagination_config", None) if filter_dict is not None and filter_id is not None: raise SynapseError( @@ -143,6 +144,10 @@ def on_POST(self, request): filter_collection=filter_collection, is_guest=requester.is_guest, request_key=request_key, + pagination_config=SyncPaginationConfig( + order=pagination_config["order"], + limit=pagination_config["limit"], + ) if pagination_config else None, ) if since is not None: @@ -213,6 +218,7 @@ def on_GET(self, request): filter_collection=filter, is_guest=requester.is_guest, request_key=request_key, + pagination_config=None, ) if since is not None: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a85cdac038c9..ab991e877dba 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -547,7 +547,7 @@ def f(txn): else: return None - return self.runInteraction("get_last_ts_for_room", f) + return self.runInteraction("get_last_event_id_ts_for_room", f) @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): diff --git a/synapse/types.py b/synapse/types.py index 0c9efdfd00aa..fd7c0ffe7a5e 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -118,7 +118,7 @@ class EventID(DomainSpecificString): class SyncNextBatchToken( namedtuple("SyncNextBatchToken", ( "stream_token", - "pagination_config", + "pagination_state", )) ): @classmethod @@ -127,10 +127,10 @@ def from_string(cls, string): d = json.loads(decode_base64(string)) pa = d.get("pa", None) if pa: - pa = SyncPaginationConfig.from_dict(pa) + pa = SyncPaginationState.from_dict(pa) return cls( stream_token=StreamToken.from_string(d["t"]), - pagination_config=pa, + pagination_state=pa, ) except: raise SynapseError(400, "Invalid Token") @@ -138,23 +138,24 @@ def from_string(cls, string): def to_string(self): return encode_base64(json.dumps({ "t": self.stream_token.to_string(), - "pa": self.pagination_config.to_dict() if self.pagination_config else None, + "pa": self.pagination_state.to_dict() if self.pagination_state else None, })) def replace(self, **kwargs): return self._replace(**kwargs) -class SyncPaginationConfig( - namedtuple("SyncPaginationConfig", ( +class SyncPaginationState( + namedtuple("SyncPaginationState", ( "order", "value", + "limit", )) ): @classmethod def from_dict(cls, d): try: - return cls(d["o"], d["v"]) + return cls(d["o"], d["v"], d["l"]) except: raise SynapseError(400, "Invalid Token") @@ -162,6 +163,7 @@ def to_dict(self): return { "o": self.order, "v": self.value, + "l": self.limit, } def replace(self, **kwargs): From 43cbde4653ee988fc0b68c9a54a864a54f07d8e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2016 15:54:32 +0100 Subject: [PATCH 11/37] Basic extra include pagination impl --- synapse/handlers/sync.py | 92 ++++++++++++++++++---------- synapse/rest/client/v2_alpha/sync.py | 7 ++- synapse/types.py | 18 +++++- 3 files changed, 80 insertions(+), 37 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a880845605aa..fef81f5f9bf2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -49,6 +49,12 @@ SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,) +SyncExtras = collections.namedtuple("SyncExtras", [ + "paginate", + "rooms", +]) + + class TimelineBatch(collections.namedtuple("TimelineBatch", [ "prev_batch", "events", @@ -152,7 +158,7 @@ def __init__(self, hs): self.response_cache = ResponseCache() def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, - full_state=False): + full_state=False, extras=None): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. @@ -164,14 +170,14 @@ def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, batch_token, timeout, full_state + sync_config, batch_token, timeout, full_state, extras, ) ) return result @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, - full_state): + full_state, extras=None): context = LoggingContext.current_context() if context: if batch_token is None: @@ -184,13 +190,15 @@ def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. - result = yield self.current_sync_for_user( - sync_config, batch_token, full_state=full_state, + result = yield self.generate_sync_result( + sync_config, batch_token, full_state=full_state, extras=extras, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, batch_token) + return self.generate_sync_result( + sync_config, batch_token, full_state=False, extras=extras, + ) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, @@ -198,14 +206,6 @@ def current_sync_callback(before_token, after_token): ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, batch_token=None, - full_state=False): - """Get the sync for client needed to match what the server has now. - Returns: - A Deferred SyncResult. - """ - return self.generate_sync_result(sync_config, batch_token, full_state) - @defer.inlineCallbacks def push_rules_for_user(self, user): user_id = user.to_string() @@ -502,7 +502,8 @@ def unread_notifs_for_room_id(self, room_id, sync_config): defer.returnValue(None) @defer.inlineCallbacks - def generate_sync_result(self, sync_config, batch_token=None, full_state=False): + def generate_sync_result(self, sync_config, batch_token=None, full_state=False, + extras=None): """Generates a sync result. Args: @@ -531,7 +532,7 @@ def generate_sync_result(self, sync_config, batch_token=None, full_state=False): ) res = yield self._generate_sync_entry_for_rooms( - sync_result_builder, account_data_by_room + sync_result_builder, account_data_by_room, extras, ) newly_joined_rooms, newly_joined_users = res @@ -658,7 +659,8 @@ def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_ro sync_result_builder.presence = presence @defer.inlineCallbacks - def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room, + extras): """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. @@ -713,6 +715,8 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro else: pagination_config = None + include_map = extras.get("peek", {}) if extras else {} + if sync_result_builder.pagination_state: missing_state = yield self._get_rooms_that_need_full_state( room_ids=[r.room_id for r in room_entries], @@ -725,34 +729,54 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro for r in room_entries: if r.room_id in missing_state: r.full_state = True + if r.room_id in include_map: + r.always_include = True + r.events = None + r.since_token = None + r.upto_token = now_token - new_pagination_state = None if pagination_config: room_ids = [r.room_id for r in room_entries] pagination_limit = pagination_config.limit + extra_limit = extras.get("paginate", {}).get("limit", 0) if extras else 0 + room_map = yield self._get_room_timestamps_at_token( - room_ids, sync_result_builder.now_token, sync_config, pagination_limit + room_ids, sync_result_builder.now_token, sync_config, + pagination_limit + extra_limit, ) if room_map: sorted_list = sorted( room_map.items(), key=lambda item: -item[1] - )[:pagination_limit] + )[:pagination_limit + extra_limit] + + if sorted_list[pagination_limit:]: + new_room_ids = set(r[0] for r in sorted_list[pagination_limit:]) + for r in room_entries: + if r.room_id in new_room_ids: + r.full_state = True + r.always_include = True + r.since_token = None + r.upto_token = now_token + r.events = None _, bottom_ts = sorted_list[-1] value = bottom_ts - new_pagination_state = SyncPaginationState( - order=pagination_config.order, value=value, limit=pagination_limit, + sync_result_builder.pagination_state = SyncPaginationState( + order=pagination_config.order, value=value, + limit=pagination_limit + extra_limit, ) - else: - new_pagination_state = None - room_entries = [r for r in room_entries if r.room_id in room_map] + if len(room_map) == len(room_entries): + sync_result_builder.pagination_state = None - sync_result_builder.pagination_state = new_pagination_state + room_entries = [ + r for r in room_entries + if r.room_id in room_map or r.always_include + ] sync_result_builder.full_state |= sync_result_builder.since_token is None @@ -764,7 +788,6 @@ def handle_room_entries(room_entry): ephemeral=ephemeral_by_room.get(room_entry.room_id, []), tags=tags_by_room.get(room_entry.room_id), account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builder.full_state, ) yield concurrently_execute(handle_room_entries, room_entries, 10) @@ -995,8 +1018,7 @@ def _get_all_rooms(self, sync_result_builder, ignored_users): @defer.inlineCallbacks def _generate_room_entry(self, sync_result_builder, ignored_users, - room_builder, ephemeral, tags, account_data, - always_include=False): + room_builder, ephemeral, tags, account_data): """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. @@ -1012,7 +1034,11 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, even if empty. """ newly_joined = room_builder.newly_joined - always_include = always_include or newly_joined or sync_result_builder.full_state + always_include = ( + newly_joined + or sync_result_builder.full_state + or room_builder.always_include + ) full_state = ( room_builder.full_state or newly_joined @@ -1025,7 +1051,6 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, if events == [] and tags is None: return - since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config @@ -1166,7 +1191,7 @@ def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token, start_ts = yield self._get_room_timestamps_at_token( room_ids, since_token, sync_config=sync_config, - limit=pagination_state.limit, + limit=len(room_ids), ) missing_list = frozenset( @@ -1263,7 +1288,7 @@ class RoomSyncResultBuilder(object): __slots__ = ( "room_id", "rtype", "events", "newly_joined", "full_state", "since_token", - "upto_token", + "upto_token", "always_include", ) def __init__(self, room_id, rtype, events, newly_joined, full_state, @@ -1286,3 +1311,4 @@ def __init__(self, room_id, rtype, events, newly_joined, full_state, self.full_state = full_state self.since_token = since_token self.upto_token = upto_token + self.always_include = False diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 3df9743132e6..da94220f1e61 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -98,6 +98,8 @@ def on_POST(self, request): since = body.get("since", None) + extras = body.get("extras", None) + if "from" in body: # /events used to use 'from', but /sync uses 'since'. # Lets be helpful and whine if we see a 'from'. @@ -162,6 +164,7 @@ def on_POST(self, request): set_presence=set_presence, full_state=full_state, timeout=timeout, + extras=extras, ) defer.returnValue(sync_result) @@ -239,7 +242,7 @@ def on_GET(self, request): @defer.inlineCallbacks def _handle_sync(self, requester, sync_config, batch_token, set_presence, - full_state, timeout): + full_state, timeout, extras=None): affect_presence = set_presence != PresenceState.OFFLINE user = sync_config.user @@ -253,7 +256,7 @@ def _handle_sync(self, requester, sync_config, batch_token, set_presence, with context: sync_result = yield self.sync_handler.wait_for_sync_for_user( sync_config, batch_token=batch_token, timeout=timeout, - full_state=full_state + full_state=full_state, extras=extras, ) time_now = self.clock.time_msec() diff --git a/synapse/types.py b/synapse/types.py index fd7c0ffe7a5e..cf950a0c36a9 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -129,7 +129,7 @@ def from_string(cls, string): if pa: pa = SyncPaginationState.from_dict(pa) return cls( - stream_token=StreamToken.from_string(d["t"]), + stream_token=StreamToken.from_arr(d["t"]), pagination_state=pa, ) except: @@ -137,7 +137,7 @@ def from_string(cls, string): def to_string(self): return encode_base64(json.dumps({ - "t": self.stream_token.to_string(), + "t": self.stream_token.to_arr(), "pa": self.pagination_state.to_dict() if self.pagination_state else None, })) @@ -196,6 +196,20 @@ def from_string(cls, string): def to_string(self): return self._SEPARATOR.join([str(k) for k in self]) + @classmethod + def from_arr(cls, arr): + try: + keys = arr + while len(keys) < len(cls._fields): + # i.e. old token from before receipt_key + keys.append("0") + return cls(*keys) + except: + raise SynapseError(400, "Invalid Token") + + def to_arr(self): + return self + @property def room_stream_id(self): # TODO(markjh): Awful hack to work around hacks in the presence tests From e5b3034fc432c823f8a5426950d21771a9ee338c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2016 17:00:59 +0100 Subject: [PATCH 12/37] Indicate if /sync was limited or not --- synapse/handlers/sync.py | 30 ++++++++++++++++++++++------ synapse/rest/client/v2_alpha/sync.py | 3 +++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index fef81f5f9bf2..b47fefc13fff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -130,6 +130,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. + "pagination_info", ])): __slots__ = [] @@ -549,7 +550,8 @@ def generate_sync_result(self, sync_config, batch_token=None, full_state=False, next_batch=SyncNextBatchToken( stream_token=sync_result_builder.now_token, pagination_state=sync_result_builder.pagination_state, - ) + ), + pagination_info=sync_result_builder.pagination_info, )) @defer.inlineCallbacks @@ -707,13 +709,16 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro if sync_config.pagination_config: pagination_config = sync_config.pagination_config + old_pagination_value = 0 elif sync_result_builder.pagination_state: pagination_config = SyncPaginationConfig( order=sync_result_builder.pagination_state.order, limit=sync_result_builder.pagination_state.limit, ) + old_pagination_value = sync_result_builder.pagination_state.value else: pagination_config = None + old_pagination_value = 0 include_map = extras.get("peek", {}) if extras else {} @@ -743,17 +748,20 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro room_map = yield self._get_room_timestamps_at_token( room_ids, sync_result_builder.now_token, sync_config, - pagination_limit + extra_limit, + pagination_limit + extra_limit + 1, ) + limited = False if room_map: sorted_list = sorted( room_map.items(), key=lambda item: -item[1] - )[:pagination_limit + extra_limit] + ) - if sorted_list[pagination_limit:]: - new_room_ids = set(r[0] for r in sorted_list[pagination_limit:]) + cutoff_list = sorted_list[:pagination_limit + extra_limit] + + if cutoff_list[pagination_limit:]: + new_room_ids = set(r[0] for r in cutoff_list[pagination_limit:]) for r in room_entries: if r.room_id in new_room_ids: r.full_state = True @@ -762,14 +770,21 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro r.upto_token = now_token r.events = None - _, bottom_ts = sorted_list[-1] + _, bottom_ts = cutoff_list[-1] value = bottom_ts + limited = any( + old_pagination_value < r[1] < value + for r in sorted_list[pagination_limit + extra_limit:] + ) + sync_result_builder.pagination_state = SyncPaginationState( order=pagination_config.order, value=value, limit=pagination_limit + extra_limit, ) + sync_result_builder.pagination_info["limited"] = limited + if len(room_map) == len(room_entries): sync_result_builder.pagination_state = None @@ -1257,6 +1272,7 @@ class SyncResultBuilder(object): __slots__ = ( "sync_config", "full_state", "batch_token", "since_token", "pagination_state", "now_token", "presence", "account_data", "joined", "invited", "archived", + "pagination_info", ) def __init__(self, sync_config, full_state, batch_token, now_token): @@ -1280,6 +1296,8 @@ def __init__(self, sync_config, full_state, batch_token, now_token): self.invited = [] self.archived = [] + self.pagination_info = {} + class RoomSyncResultBuilder(object): """Stores information needed to create either a `JoinedSyncResult` or diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index da94220f1e61..24587cbc6140 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -286,6 +286,9 @@ def _handle_sync(self, requester, sync_config, batch_token, set_presence, "next_batch": sync_result.next_batch.to_string(), } + if sync_result.pagination_info: + response_content["pagination_info"] = sync_result.pagination_info + defer.returnValue((200, response_content)) def encode_presence(self, events, time_now): From 96d6fff44787a53863b1321ed28f1636aa5dc78a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Jun 2016 11:33:53 +0100 Subject: [PATCH 13/37] Fix 'A next_batch token can be used in the v1 messages API' --- synapse/streams/config.py | 14 +++++++++----- synapse/types.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 4f089bfb9419..49be3c222aa5 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.api.errors import SynapseError -from synapse.types import StreamToken +from synapse.types import StreamToken, SyncNextBatchToken import logging @@ -72,14 +72,18 @@ def get_param(name, default=None): if direction not in ['f', 'b']: raise SynapseError(400, "'dir' parameter is invalid.") - from_tok = get_param("from") + raw_from_tok = get_param("from") to_tok = get_param("to") try: - if from_tok == "END": + from_tok = None + if raw_from_tok == "END": from_tok = None # For backwards compat. - elif from_tok: - from_tok = StreamToken.from_string(from_tok) + elif raw_from_tok: + try: + from_tok = SyncNextBatchToken.from_string(raw_from_tok).stream_token + except: + from_tok = StreamToken.from_string(raw_from_tok) except: raise SynapseError(400, "'from' paramater is invalid") diff --git a/synapse/types.py b/synapse/types.py index bde9e8e2c5ed..13cdc737fb79 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -174,7 +174,7 @@ def replace(self, **kwargs): class StreamToken( - namedtuple("Token", ( + namedtuple("StreamToken", ( "room_key", "presence_key", "typing_key", From 6992fb9bc174578486d96f13d55acec26c8ae6e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 21 Jun 2016 10:26:11 +0100 Subject: [PATCH 14/37] Implement error responses --- synapse/api/errors.py | 1 + synapse/handlers/sync.py | 25 ++++++++++++++++++++++++- synapse/rest/client/v2_alpha/sync.py | 12 ++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index b106fbed6dd6..dd57fe3b8d40 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -44,6 +44,7 @@ class Codes(object): THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED" THREEPID_IN_USE = "THREEPID_IN_USE" INVALID_USERNAME = "M_INVALID_USERNAME" + CANNOT_PEEK = "M_CANNOT_PEEK" class CodeMessageException(RuntimeError): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 451182cfecab..71c3f678dee1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -21,6 +21,7 @@ from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client from synapse.types import SyncNextBatchToken, SyncPaginationState +from synapse.api.errors import Codes from twisted.internet import defer @@ -123,6 +124,18 @@ def __nonzero__(self): return True +class ErrorSyncResult(collections.namedtuple("ErrorSyncResult", [ + "room_id", # str + "errcode", # str + "error", # str +])): + __slots__ = [] + + def __nonzero__(self): + """Errors should always be reported to the client""" + return True + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. @@ -130,6 +143,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. + "errors", # ErrorSyncResult "pagination_info", ])): __slots__ = [] @@ -546,6 +560,7 @@ def generate_sync_result(self, sync_config, batch_token=None, full_state=False, joined=sync_result_builder.joined, invited=sync_result_builder.invited, archived=sync_result_builder.archived, + errors=sync_result_builder.errors, next_batch=SyncNextBatchToken( stream_token=sync_result_builder.now_token, pagination_state=sync_result_builder.pagination_state, @@ -742,6 +757,13 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro r.since_token = None r.upto_token = now_token + for room_id in set(include_map.keys()) - {r.room_id for r in room_entries}: + sync_result_builder.errors.append(ErrorSyncResult( + room_id=room_id, + errcode=Codes.CANNOT_PEEK, + error="Cannot peek into requested room", + )) + if pagination_config: room_ids = [r.room_id for r in room_entries] pagination_limit = pagination_config.limit @@ -1274,7 +1296,7 @@ class SyncResultBuilder(object): __slots__ = ( "sync_config", "full_state", "batch_token", "since_token", "pagination_state", "now_token", "presence", "account_data", "joined", "invited", "archived", - "pagination_info", + "pagination_info", "errors", ) def __init__(self, sync_config, full_state, batch_token, now_token): @@ -1297,6 +1319,7 @@ def __init__(self, sync_config, full_state, batch_token, now_token): self.joined = [] self.invited = [] self.archived = [] + self.errors = [] self.pagination_info = {} diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 24587cbc6140..8492ef66f6f8 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -286,6 +286,9 @@ def _handle_sync(self, requester, sync_config, batch_token, set_presence, "next_batch": sync_result.next_batch.to_string(), } + if sync_result.errors: + response_content["rooms"]["errors"] = self.encode_errors(sync_result.errors) + if sync_result.pagination_info: response_content["pagination_info"] = sync_result.pagination_info @@ -299,6 +302,15 @@ def encode_presence(self, events, time_now): formatted.append(event) return {"events": formatted} + def encode_errors(self, errors): + return { + e.room_id: { + "errcode": e.errcode, + "error": e.error + } + for e in errors + } + def encode_joined(self, rooms, time_now, token_id): """ Encode the joined rooms in a sync result From 3b6027dbc1302a918df272f9fae7f4cac1ff2685 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 21 Jun 2016 11:18:09 +0100 Subject: [PATCH 15/37] Always include tags --- synapse/handlers/sync.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 71c3f678dee1..19ba5fdba44e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -747,15 +747,27 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro pagination_state=sync_result_builder.pagination_state, ) + all_tags = yield self.store.get_tags_for_user(user_id) + if missing_state: for r in room_entries: if r.room_id in missing_state: + if r.room_id in all_tags: + r.always_include = True + continue r.full_state = True if r.room_id in include_map: r.always_include = True r.events = None r.since_token = None r.upto_token = now_token + elif pagination_config: + all_tags = yield self.store.get_tags_for_user(user_id) + + logger.info("all_tags: %r", all_tags) + for r in room_entries: + if r.room_id in all_tags: + r.always_include = True for room_id in set(include_map.keys()) - {r.room_id for r in room_entries}: sync_result_builder.errors.append(ErrorSyncResult( From cdd379b6df4f9eb3cbfaae2b576538d3d23d38d6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 21 Jun 2016 11:36:28 +0100 Subject: [PATCH 16/37] Use msgpack for shorter tokens --- synapse/python_dependencies.py | 1 + synapse/types.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index e0a7a1977763..ca49645e90e0 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -36,6 +36,7 @@ "blist": ["blist"], "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], + "msgpack": ["msgpack"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/types.py b/synapse/types.py index 13cdc737fb79..e4eddbacef9e 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -19,7 +19,7 @@ from unpaddedbase64 import encode_base64, decode_base64 import ujson as json - +import msgpack Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) @@ -127,7 +127,7 @@ class SyncNextBatchToken( @classmethod def from_string(cls, string): try: - d = json.loads(decode_base64(string)) + d = msgpack.loads(decode_base64(string)) pa = d.get("pa", None) if pa: pa = SyncPaginationState.from_dict(pa) @@ -139,7 +139,7 @@ def from_string(cls, string): raise SynapseError(400, "Invalid Token") def to_string(self): - return encode_base64(json.dumps({ + return encode_base64(msgpack.dumps({ "t": self.stream_token.to_arr(), "pa": self.pagination_state.to_dict() if self.pagination_state else None, })) From 6a101e512fed86e26ce77d16457daf2292cc6aa3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Jun 2016 10:59:24 +0100 Subject: [PATCH 17/37] Add tag handling --- synapse/handlers/sync.py | 59 ++++++++++++++++++++++++---- synapse/python_dependencies.py | 2 +- synapse/rest/client/v2_alpha/sync.py | 5 ++- synapse/types.py | 6 ++- 4 files changed, 60 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 19ba5fdba44e..04bb1616c050 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -21,7 +21,7 @@ from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client from synapse.types import SyncNextBatchToken, SyncPaginationState -from synapse.api.errors import Codes +from synapse.api.errors import Codes, SynapseError from twisted.internet import defer @@ -41,10 +41,30 @@ ]) -SyncPaginationConfig = collections.namedtuple("SyncPaginationConfig", [ +class SyncPaginationConfig(collections.namedtuple("SyncPaginationConfig", [ "order", "limit", -]) + "tags", +])): + def __init__(self, order, limit, tags): + if order not in SYNC_PAGINATION_VALID_ORDERS: + raise SynapseError(400, "Invalid 'order'") + if tags not in SYNC_PAGINATION_VALID_TAGS_OPTIONS: + raise SynapseError(400, "Invalid 'tags'") + + try: + limit = int(limit) + except: + raise SynapseError(400, "Invalid 'limit'") + + super(SyncPaginationConfig, self).__init__(order, limit, tags) + + +SYNC_PAGINATION_TAGS_INCLUDE_ALL = "include_all" +SYNC_PAGINATION_TAGS_IGNORE = "ignore" +SYNC_PAGINATION_VALID_TAGS_OPTIONS = ( + SYNC_PAGINATION_TAGS_INCLUDE_ALL, SYNC_PAGINATION_TAGS_IGNORE, +) SYNC_PAGINATION_ORDER_TS = "o" SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,) @@ -727,15 +747,19 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro if sync_config.pagination_config: pagination_config = sync_config.pagination_config old_pagination_value = 0 + include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL elif sync_result_builder.pagination_state: pagination_config = SyncPaginationConfig( order=sync_result_builder.pagination_state.order, limit=sync_result_builder.pagination_state.limit, + tags=sync_result_builder.pagination_state.tags, ) old_pagination_value = sync_result_builder.pagination_state.value + include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL else: pagination_config = None old_pagination_value = 0 + include_all_tags = False include_map = extras.get("peek", {}) if extras else {} @@ -752,19 +776,19 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro if missing_state: for r in room_entries: if r.room_id in missing_state: - if r.room_id in all_tags: + if include_all_tags and r.room_id in all_tags: r.always_include = True continue r.full_state = True + r.would_require_resync = True if r.room_id in include_map: r.always_include = True r.events = None r.since_token = None r.upto_token = now_token - elif pagination_config: + elif pagination_config and include_all_tags: all_tags = yield self.store.get_tags_for_user(user_id) - logger.info("all_tags: %r", all_tags) for r in room_entries: if r.room_id in all_tags: r.always_include = True @@ -817,8 +841,15 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro sync_result_builder.pagination_state = SyncPaginationState( order=pagination_config.order, value=value, limit=pagination_limit + extra_limit, + tags=pagination_config.tags, ) + to_sync_map = { + key: value for key, value in cutoff_list + } + else: + to_sync_map = {} + sync_result_builder.pagination_info["limited"] = limited if len(room_map) == len(room_entries): @@ -826,7 +857,7 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro room_entries = [ r for r in room_entries - if r.room_id in room_map or r.always_include + if r.room_id in to_sync_map or r.always_include ] sync_result_builder.full_state |= sync_result_builder.since_token is None @@ -1094,6 +1125,7 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, room_builder.full_state or newly_joined or sync_result_builder.full_state + or room_builder.would_require_resync ) events = room_builder.events @@ -1139,6 +1171,16 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, if not (always_include or batch or account_data or ephemeral): return + if room_builder.would_require_resync: + since_token = None + batch = yield self._load_filtered_recents( + room_id, sync_config, + now_token=upto_token, + since_token=since_token, + recents=None, + newly_joined_room=newly_joined, + ) + state = yield self.compute_state_delta( room_id, batch, sync_config, since_token, now_token, full_state=full_state @@ -1343,7 +1385,7 @@ class RoomSyncResultBuilder(object): __slots__ = ( "room_id", "rtype", "events", "newly_joined", "full_state", "since_token", - "upto_token", "always_include", + "upto_token", "always_include", "would_require_resync", ) def __init__(self, room_id, rtype, events, newly_joined, full_state, @@ -1367,3 +1409,4 @@ def __init__(self, room_id, rtype, events, newly_joined, full_state, self.since_token = since_token self.upto_token = upto_token self.always_include = False + self.would_require_resync = False diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index ca49645e90e0..e7232b5f2b85 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -36,7 +36,7 @@ "blist": ["blist"], "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], - "msgpack": ["msgpack"], + "msgpack-python": ["msgpack"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 8492ef66f6f8..e690e74a22c8 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -19,7 +19,9 @@ RestServlet, parse_string, parse_integer, parse_boolean, parse_json_object_from_request, ) -from synapse.handlers.sync import SyncConfig, SyncPaginationConfig +from synapse.handlers.sync import ( + SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_INCLUDE_ALL, +) from synapse.types import SyncNextBatchToken from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_room_id, @@ -149,6 +151,7 @@ def on_POST(self, request): pagination_config=SyncPaginationConfig( order=pagination_config["order"], limit=pagination_config["limit"], + tags=pagination_config.get("tags", SYNC_PAGINATION_TAGS_INCLUDE_ALL), ) if pagination_config else None, ) diff --git a/synapse/types.py b/synapse/types.py index e4eddbacef9e..f0ad527bc049 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,9 +18,9 @@ from collections import namedtuple from unpaddedbase64 import encode_base64, decode_base64 -import ujson as json import msgpack + Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) @@ -153,12 +153,13 @@ class SyncPaginationState( "order", "value", "limit", + "tags", )) ): @classmethod def from_dict(cls, d): try: - return cls(d["o"], d["v"], d["l"]) + return cls(d["o"], d["v"], d["l"], d["t"]) except: raise SynapseError(400, "Invalid Token") @@ -167,6 +168,7 @@ def to_dict(self): "o": self.order, "v": self.value, "l": self.limit, + "t": self.tags, } def replace(self, **kwargs): From 839088e2e773ce7835034b6a73566bc31a068e84 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Jun 2016 11:02:27 +0100 Subject: [PATCH 18/37] Support streaming peek --- synapse/handlers/sync.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 04bb1616c050..625f2f25833c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -779,13 +779,19 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro if include_all_tags and r.room_id in all_tags: r.always_include = True continue - r.full_state = True - r.would_require_resync = True if r.room_id in include_map: - r.always_include = True + since = include_map[r.room_id].get("since", None) + r.since_token = since + if not since: + r.always_include = True + r.full_state = True + r.would_require_resync = True r.events = None - r.since_token = None r.upto_token = now_token + else: + r.full_state = True + r.would_require_resync = True + elif pagination_config and include_all_tags: all_tags = yield self.store.get_tags_for_user(user_id) From baab93b0dd686966b62f4fb3d27aaae1a49ba7e5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Jun 2016 11:40:06 +0100 Subject: [PATCH 19/37] Implement 'synced' flag --- synapse/handlers/sync.py | 16 +++++++++++----- synapse/rest/client/v2_alpha/sync.py | 1 + 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 625f2f25833c..2a8e6e7efd6f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -97,6 +97,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "ephemeral", "account_data", "unread_notifications", + "synced", # bool ])): __slots__ = [] @@ -781,13 +782,16 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro continue if r.room_id in include_map: since = include_map[r.room_id].get("since", None) - r.since_token = since - if not since: + if since: + tok = SyncNextBatchToken.from_string(since) + r.since_token = tok.stream_token + else: + r.since_token = None r.always_include = True r.full_state = True r.would_require_resync = True - r.events = None - r.upto_token = now_token + r.events = None + r.synced = False else: r.full_state = True r.would_require_resync = True @@ -1201,6 +1205,7 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, ephemeral=ephemeral, account_data=account_data_events, unread_notifications=unread_notifications, + synced=room_builder.synced, ) if room_sync or always_include: @@ -1391,7 +1396,7 @@ class RoomSyncResultBuilder(object): __slots__ = ( "room_id", "rtype", "events", "newly_joined", "full_state", "since_token", - "upto_token", "always_include", "would_require_resync", + "upto_token", "always_include", "would_require_resync", "synced", ) def __init__(self, room_id, rtype, events, newly_joined, full_state, @@ -1416,3 +1421,4 @@ def __init__(self, room_id, rtype, events, newly_joined, full_state, self.upto_token = upto_token self.always_include = False self.would_require_resync = False + self.synced = True diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index e690e74a22c8..abf22e4dc114 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -335,6 +335,7 @@ def encode_joined(self, rooms, time_now, token_id): joined[room.room_id] = self.encode_room( room, time_now, token_id ) + joined[room.room_id]["synced"] = room.synced return joined From a90140358b7436b317ba600a8820107dbde30d73 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 23 Jun 2016 10:40:43 +0100 Subject: [PATCH 20/37] Change default tag handling --- synapse/rest/client/v2_alpha/sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index abf22e4dc114..683bba4cf640 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -20,7 +20,7 @@ parse_json_object_from_request, ) from synapse.handlers.sync import ( - SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_INCLUDE_ALL, + SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_IGNORE, ) from synapse.types import SyncNextBatchToken from synapse.events.utils import ( @@ -151,7 +151,7 @@ def on_POST(self, request): pagination_config=SyncPaginationConfig( order=pagination_config["order"], limit=pagination_config["limit"], - tags=pagination_config.get("tags", SYNC_PAGINATION_TAGS_INCLUDE_ALL), + tags=pagination_config.get("tags", SYNC_PAGINATION_TAGS_IGNORE), ) if pagination_config else None, ) From 8c3fca8b282a45cb98123a34a46d80b0f55f2493 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 23 Jun 2016 13:43:25 +0100 Subject: [PATCH 21/37] Correctly handle tags changing in paginated sync --- synapse/handlers/sync.py | 28 ++++++- .../replication/slave/storage/account_data.py | 3 + synapse/storage/prepare_database.py | 2 +- .../storage/schema/delta/33/tag_changes.sql | 24 ++++++ synapse/storage/tags.py | 74 +++++++++++++++++++ 5 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/delta/33/tag_changes.sql diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2a8e6e7efd6f..16c6b8dedfed 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -22,6 +22,7 @@ from synapse.visibility import filter_events_for_client from synapse.types import SyncNextBatchToken, SyncPaginationState from synapse.api.errors import Codes, SynapseError +from synapse.storage.tags import (TAG_CHANGE_NEWLY_TAGGED, TAG_CHANGE_ALL_REMOVED) from twisted.internet import defer @@ -774,12 +775,33 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro all_tags = yield self.store.get_tags_for_user(user_id) + if sync_result_builder.since_token: + stream_id = sync_result_builder.since_token.account_data_key + tag_changes = yield self.store.get_room_tags_changed(user_id, stream_id) + else: + tag_changes = {} + if missing_state: for r in room_entries: if r.room_id in missing_state: - if include_all_tags and r.room_id in all_tags: - r.always_include = True - continue + if include_all_tags: + change = tag_changes.get(r.room_id) + if change == TAG_CHANGE_NEWLY_TAGGED: + r.since_token = None + r.always_include = True + r.full_state = True + r.would_require_resync = True + r.events = None + r.synced = True + continue + elif change == TAG_CHANGE_ALL_REMOVED: + r.always_include = True + r.synced = False + continue + elif r.room_id in all_tags: + r.always_include = True + continue + if r.room_id in include_map: since = include_map[r.room_id].get("since", None) if since: diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index 735c03c7eb07..351894510c85 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -51,6 +51,9 @@ def __init__(self, db_conn, hs): get_updated_account_data_for_user = ( DataStore.get_updated_account_data_for_user.__func__ ) + get_room_tags_changed = ( + DataStore.get_room_tags_changed.__func__ + ) def get_max_account_data_stream_id(self): return self._account_data_id_gen.get_current_token() diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c8487c8838c9..8801669a6b3c 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 32 +SCHEMA_VERSION = 33 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/33/tag_changes.sql b/synapse/storage/schema/delta/33/tag_changes.sql new file mode 100644 index 000000000000..6d858000d92c --- /dev/null +++ b/synapse/storage/schema/delta/33/tag_changes.sql @@ -0,0 +1,24 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE room_tags_change_revisions( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + stream_id BIGINT NOT NULL, + change TEXT NOT NULL +); + +CREATE INDEX room_tags_change_revisions_rm_idx ON room_tags_change_revisions(user_id, room_id, stream_id); +CREATE INDEX room_tags_change_revisions_idx ON room_tags_change_revisions(user_id, stream_id); diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 5a2c1aa59b6a..682576fb8de2 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -17,12 +17,18 @@ from synapse.util.caches.descriptors import cached from twisted.internet import defer +from collections import Counter + import ujson as json import logging logger = logging.getLogger(__name__) +TAG_CHANGE_NEWLY_TAGGED = "newly_tagged" +TAG_CHANGE_ALL_REMOVED = "all_removed" + + class TagsStore(SQLBaseStore): def get_max_account_data_stream_id(self): """Get the current max stream id for the private user data stream @@ -170,6 +176,39 @@ def get_tags_for_room(self, user_id, room_id): row["tag"]: json.loads(row["content"]) for row in rows }) + def get_room_tags_changed(self, user_id, stream_id): + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(stream_id) + ) + + if not changed: + return {} + + def _get_room_tags_changed(txn): + txn.execute( + "SELECT room_id, change FROM room_tags_change_revisions" + " WHERE user_id = ? AND stream_id > ?", + (user_id, stream_id) + ) + + results = Counter() + + for room_id, change in txn.fetchall(): + if change == TAG_CHANGE_NEWLY_TAGGED: + results[room_id] += 1 + elif change == TAG_CHANGE_ALL_REMOVED: + results[room_id] -= 1 + else: + logger.warn("Unexpected tag change: %r", change) + + return { + room_id: TAG_CHANGE_NEWLY_TAGGED if count > 0 else TAG_CHANGE_ALL_REMOVED + for room_id, count in results.items() + if count + } + + return self.runInteraction("get_room_tags_changed", _get_room_tags_changed) + @defer.inlineCallbacks def add_tag_to_room(self, user_id, room_id, tag, content): """Add a tag to a room for a user. @@ -184,6 +223,12 @@ def add_tag_to_room(self, user_id, room_id, tag, content): content_json = json.dumps(content) def add_tag_txn(txn, next_id): + txn.execute( + "SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?", + (user_id, room_id), + ) + existing_tags, = txn.fetchone() + self._simple_upsert_txn( txn, table="room_tags", @@ -197,6 +242,17 @@ def add_tag_txn(txn, next_id): } ) self._update_revision_txn(txn, user_id, room_id, next_id) + if not existing_tags: + self._simple_insert_txn( + txn, + table="room_tags_change_revisions", + values={ + "user_id": user_id, + "room_id": room_id, + "stream_id": next_id, + "change": TAG_CHANGE_NEWLY_TAGGED, + } + ) with self._account_data_id_gen.get_next() as next_id: yield self.runInteraction("add_tag", add_tag_txn, next_id) @@ -218,6 +274,24 @@ def remove_tag_txn(txn, next_id): " WHERE user_id = ? AND room_id = ? AND tag = ?" ) txn.execute(sql, (user_id, room_id, tag)) + if txn.rowcount > 0: + txn.execute( + "SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?", + (user_id, room_id), + ) + existing_tags, = txn.fetchone() + if not existing_tags: + self._simple_insert_txn( + txn, + table="room_tags_change_revisions", + values={ + "user_id": user_id, + "room_id": room_id, + "stream_id": next_id, + "change": TAG_CHANGE_ALL_REMOVED, + } + ) + self._update_revision_txn(txn, user_id, room_id, next_id) with self._account_data_id_gen.get_next() as next_id: From 7b3324e2524e2464b5a7431f60d132ae96239711 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 23 Jun 2016 15:48:33 +0100 Subject: [PATCH 22/37] Get rid of per room full_state flag --- synapse/handlers/sync.py | 30 ++++++------------------------ 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 16c6b8dedfed..10f9e1c5d8f3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -787,11 +787,8 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro if include_all_tags: change = tag_changes.get(r.room_id) if change == TAG_CHANGE_NEWLY_TAGGED: - r.since_token = None r.always_include = True - r.full_state = True r.would_require_resync = True - r.events = None r.synced = True continue elif change == TAG_CHANGE_ALL_REMOVED: @@ -808,14 +805,10 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro tok = SyncNextBatchToken.from_string(since) r.since_token = tok.stream_token else: - r.since_token = None r.always_include = True - r.full_state = True r.would_require_resync = True - r.events = None r.synced = False else: - r.full_state = True r.would_require_resync = True elif pagination_config and include_all_tags: @@ -856,11 +849,8 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro new_room_ids = set(r[0] for r in cutoff_list[pagination_limit:]) for r in room_entries: if r.room_id in new_room_ids: - r.full_state = True r.always_include = True - r.since_token = None - r.upto_token = now_token - r.events = None + r.would_require_resync = True _, bottom_ts = cutoff_list[-1] value = bottom_ts @@ -1012,7 +1002,6 @@ def _get_rooms_changed(self, sync_result_builder, ignored_users): rtype="archived", events=None, newly_joined=room_id in newly_joined_rooms, - full_state=False, since_token=since_token, upto_token=leave_token, )) @@ -1042,7 +1031,6 @@ def _get_rooms_changed(self, sync_result_builder, ignored_users): rtype="joined", events=events, newly_joined=room_id in newly_joined_rooms, - full_state=False, since_token=None if room_id in newly_joined_rooms else since_token, upto_token=prev_batch_token, )) @@ -1052,7 +1040,6 @@ def _get_rooms_changed(self, sync_result_builder, ignored_users): rtype="joined", events=[], newly_joined=room_id in newly_joined_rooms, - full_state=False, since_token=since_token, upto_token=since_token, )) @@ -1096,7 +1083,6 @@ def _get_all_rooms(self, sync_result_builder, ignored_users): rtype="joined", events=None, newly_joined=False, - full_state=True, since_token=since_token, upto_token=now_token, )) @@ -1123,7 +1109,6 @@ def _get_all_rooms(self, sync_result_builder, ignored_users): rtype="archived", events=None, newly_joined=False, - full_state=True, since_token=since_token, upto_token=leave_token, )) @@ -1154,8 +1139,7 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, or room_builder.always_include ) full_state = ( - room_builder.full_state - or newly_joined + newly_joined or sync_result_builder.full_state or room_builder.would_require_resync ) @@ -1204,11 +1188,10 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, return if room_builder.would_require_resync: - since_token = None batch = yield self._load_filtered_recents( room_id, sync_config, now_token=upto_token, - since_token=since_token, + since_token=None, recents=None, newly_joined_room=newly_joined, ) @@ -1417,11 +1400,11 @@ class RoomSyncResultBuilder(object): """ __slots__ = ( - "room_id", "rtype", "events", "newly_joined", "full_state", "since_token", + "room_id", "rtype", "events", "newly_joined", "since_token", "upto_token", "always_include", "would_require_resync", "synced", ) - def __init__(self, room_id, rtype, events, newly_joined, full_state, + def __init__(self, room_id, rtype, events, newly_joined, since_token, upto_token): """ Args: @@ -1430,7 +1413,6 @@ def __init__(self, room_id, rtype, events, newly_joined, full_state, events(list): List of events to include in the room, (more events may be added when generating result). newly_joined(bool): If the user has newly joined the room - full_state(bool): Whether the full state should be sent in result since_token(StreamToken): Earliest point to return events from, or None upto_token(StreamToken): Latest point to return events from. """ @@ -1438,9 +1420,9 @@ def __init__(self, room_id, rtype, events, newly_joined, full_state, self.rtype = rtype self.events = events self.newly_joined = newly_joined - self.full_state = full_state self.since_token = since_token self.upto_token = upto_token + self.always_include = False self.would_require_resync = False self.synced = True From 6c8c061c2f71bfd7e785493354c57427cad0e7ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 23 Jun 2016 15:55:53 +0100 Subject: [PATCH 23/37] Move stuff into separate function --- synapse/handlers/sync.py | 83 ++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 32 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 10f9e1c5d8f3..00a009fdf5db 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -714,7 +714,6 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro `(newly_joined_rooms, newly_joined_users)` """ user_id = sync_result_builder.sync_config.user.to_string() - sync_config = sync_result_builder.sync_config now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_result_builder.sync_config, @@ -746,6 +745,56 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro tags_by_room = yield self.store.get_tags_for_user(user_id) + yield self._update_room_entries_for_paginated_sync( + sync_result_builder, room_entries, extras + ) + + sync_result_builder.full_state |= sync_result_builder.since_token is None + + def handle_room_entries(room_entry): + return self._generate_room_entry( + sync_result_builder, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + ) + + yield concurrently_execute(handle_room_entries, room_entries, 10) + + sync_result_builder.invited.extend(invited) + + # Now we want to get any newly joined users + newly_joined_users = set() + if sync_result_builder.since_token: + for joined_sync in sync_result_builder.joined: + it = itertools.chain( + joined_sync.timeline.events, joined_sync.state.values() + ) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) + + defer.returnValue((newly_joined_rooms, newly_joined_users)) + + @defer.inlineCallbacks + def _update_room_entries_for_paginated_sync(self, sync_result_builder, + room_entries, extras): + """Works out which room_entries should be synced to the client, which + would need to be resynced if they were sent down, etc. + + Mutates room_entries. + + Args: + sync_result_builder (SyncResultBuilder) + room_entries (list(RoomSyncResultBuilder)) + extras (dict) + """ + user_id = sync_result_builder.sync_config.user.to_string() + sync_config = sync_result_builder.sync_config + if sync_config.pagination_config: pagination_config = sync_config.pagination_config old_pagination_value = 0 @@ -877,41 +926,11 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro if len(room_map) == len(room_entries): sync_result_builder.pagination_state = None - room_entries = [ + room_entries[:] = [ r for r in room_entries if r.room_id in to_sync_map or r.always_include ] - sync_result_builder.full_state |= sync_result_builder.since_token is None - - def handle_room_entries(room_entry): - return self._generate_room_entry( - sync_result_builder, - ignored_users, - room_entry, - ephemeral=ephemeral_by_room.get(room_entry.room_id, []), - tags=tags_by_room.get(room_entry.room_id), - account_data=account_data_by_room.get(room_entry.room_id, {}), - ) - - yield concurrently_execute(handle_room_entries, room_entries, 10) - - sync_result_builder.invited.extend(invited) - - # Now we want to get any newly joined users - newly_joined_users = set() - if sync_result_builder.since_token: - for joined_sync in sync_result_builder.joined: - it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.values() - ) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - newly_joined_users.add(event.state_key) - - defer.returnValue((newly_joined_rooms, newly_joined_users)) - @defer.inlineCallbacks def _get_rooms_changed(self, sync_result_builder, ignored_users): """Gets the the changes that have happened since the last sync. From a7e6ad9f3ecde82894e214a7530fd65a32ad3ecf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 23 Jun 2016 17:26:27 +0100 Subject: [PATCH 24/37] Use SyncExtras --- synapse/handlers/sync.py | 25 +++++++++++++------------ synapse/rest/client/v2_alpha/sync.py | 11 ++++++++--- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 00a009fdf5db..06ffb9040c77 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -72,10 +72,12 @@ def __init__(self, order, limit, tags): SyncExtras = collections.namedtuple("SyncExtras", [ - "paginate", - "rooms", + "paginate", # dict with "limit" key + "peek", # dict of room_id -> dict ]) +DEFAULT_SYNC_EXTRAS = SyncExtras(paginate={}, peek={}) + class TimelineBatch(collections.namedtuple("TimelineBatch", [ "prev_batch", @@ -195,7 +197,7 @@ def __init__(self, hs): self.response_cache = ResponseCache() def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, - full_state=False, extras=None): + full_state=False, extras=DEFAULT_SYNC_EXTRAS): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. @@ -214,7 +216,7 @@ def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, - full_state, extras=None): + full_state, extras=DEFAULT_SYNC_EXTRAS): context = LoggingContext.current_context() if context: if batch_token is None: @@ -539,13 +541,14 @@ def unread_notifs_for_room_id(self, room_id, sync_config): @defer.inlineCallbacks def generate_sync_result(self, sync_config, batch_token=None, full_state=False, - extras=None): + extras=DEFAULT_SYNC_EXTRAS): """Generates a sync result. Args: sync_config (SyncConfig) since_token (StreamToken) full_state (bool) + extras (SyncExtras) Returns: Deferred(SyncResult) @@ -790,7 +793,7 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, Args: sync_result_builder (SyncResultBuilder) room_entries (list(RoomSyncResultBuilder)) - extras (dict) + extras (SyncExtras) """ user_id = sync_result_builder.sync_config.user.to_string() sync_config = sync_result_builder.sync_config @@ -812,8 +815,6 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, old_pagination_value = 0 include_all_tags = False - include_map = extras.get("peek", {}) if extras else {} - if sync_result_builder.pagination_state: missing_state = yield self._get_rooms_that_need_full_state( room_ids=[r.room_id for r in room_entries], @@ -848,8 +849,8 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, r.always_include = True continue - if r.room_id in include_map: - since = include_map[r.room_id].get("since", None) + if r.room_id in extras.peek: + since = extras.peek[r.room_id].get("since", None) if since: tok = SyncNextBatchToken.from_string(since) r.since_token = tok.stream_token @@ -867,7 +868,7 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, if r.room_id in all_tags: r.always_include = True - for room_id in set(include_map.keys()) - {r.room_id for r in room_entries}: + for room_id in set(extras.peek.keys()) - {r.room_id for r in room_entries}: sync_result_builder.errors.append(ErrorSyncResult( room_id=room_id, errcode=Codes.CANNOT_PEEK, @@ -878,7 +879,7 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, room_ids = [r.room_id for r in room_entries] pagination_limit = pagination_config.limit - extra_limit = extras.get("paginate", {}).get("limit", 0) if extras else 0 + extra_limit = extras.paginate.get("limit", 0) room_map = yield self._get_room_timestamps_at_token( room_ids, sync_result_builder.now_token, sync_config, diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 683bba4cf640..61d013eb572d 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -20,7 +20,8 @@ parse_json_object_from_request, ) from synapse.handlers.sync import ( - SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_IGNORE, + SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_IGNORE, SyncExtras, + DEFAULT_SYNC_EXTRAS, ) from synapse.types import SyncNextBatchToken from synapse.events.utils import ( @@ -100,7 +101,11 @@ def on_POST(self, request): since = body.get("since", None) - extras = body.get("extras", None) + extras = body.get("extras", {}) + extras = SyncExtras( + paginate=extras.get("paginate", {}), + peek=extras.get("peek", {}), + ) if "from" in body: # /events used to use 'from', but /sync uses 'since'. @@ -245,7 +250,7 @@ def on_GET(self, request): @defer.inlineCallbacks def _handle_sync(self, requester, sync_config, batch_token, set_presence, - full_state, timeout, extras=None): + full_state, timeout, extras=DEFAULT_SYNC_EXTRAS): affect_presence = set_presence != PresenceState.OFFLINE user = sync_config.user From 9df5f8168705cdb855829a6e2c1a459560f1e85c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 23 Jun 2016 17:50:30 +0100 Subject: [PATCH 25/37] Make get_room_tags_changed take a now position. Comments --- synapse/handlers/sync.py | 15 ++++++++++++++- synapse/storage/tags.py | 12 +++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 06ffb9040c77..f068814e5de0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -827,7 +827,10 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, if sync_result_builder.since_token: stream_id = sync_result_builder.since_token.account_data_key - tag_changes = yield self.store.get_room_tags_changed(user_id, stream_id) + now_stream_id = sync_result_builder.now_token.account_data_key + tag_changes = yield self.store.get_room_tags_changed( + user_id, stream_id, now_stream_id + ) else: tag_changes = {} @@ -1207,6 +1210,8 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, if not (always_include or batch or account_data or ephemeral): return + # At this point we're guarenteed (?) to send down the room, so if we + # need to resync the entire room do so now. if room_builder.would_require_resync: batch = yield self._load_filtered_recents( room_id, sync_config, @@ -1257,6 +1262,11 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, @defer.inlineCallbacks def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit): + """For each room, get the last origin_server_ts timestamp the client + would see (after filtering) at a particular token. + + Only attempts finds the latest `limit` room timestamps. + """ room_to_entries = {} @defer.inlineCallbacks @@ -1317,6 +1327,9 @@ def _get_last_ts(room_id): @defer.inlineCallbacks def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token, pagination_state): + """Work out which rooms we haven't sent to the client yet, so would + require us to send down the full state + """ start_ts = yield self._get_room_timestamps_at_token( room_ids, since_token, sync_config=sync_config, diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 682576fb8de2..8dcbe9bc9022 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -176,7 +176,13 @@ def get_tags_for_room(self, user_id, room_id): row["tag"]: json.loads(row["content"]) for row in rows }) - def get_room_tags_changed(self, user_id, stream_id): + def get_room_tags_changed(self, user_id, stream_id, now_id): + """Returns the rooms that have been newly tagged or had all their tags + removed since `stream_id`. + + Collapses multiple changes into one. For example, if a room has gone + from untagged to tagged back to untagged, the room_id won't be returned. + """ changed = self._account_data_stream_cache.has_entity_changed( user_id, int(stream_id) ) @@ -187,8 +193,8 @@ def get_room_tags_changed(self, user_id, stream_id): def _get_room_tags_changed(txn): txn.execute( "SELECT room_id, change FROM room_tags_change_revisions" - " WHERE user_id = ? AND stream_id > ?", - (user_id, stream_id) + " WHERE user_id = ? AND stream_id > ? AND stream_id <= ?", + (user_id, stream_id, now_id) ) results = Counter() From bf0edf7a16deaf52f9eaf1d416297f30377b0ba5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Jun 2016 11:05:09 +0100 Subject: [PATCH 26/37] Make jenkins-unittests.sh install deps --- jenkins-unittests.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/jenkins-unittests.sh b/jenkins-unittests.sh index 104d51199474..7b56a714a15b 100755 --- a/jenkins-unittests.sh +++ b/jenkins-unittests.sh @@ -20,6 +20,10 @@ export DUMP_COVERAGE_COMMAND="coverage help" # UNSTABLE or FAILURE this build. export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" +TOX_BIN=$WORKSPACE/.tox/py27/bin +python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install +$TOX_BIN/pip install lxml + rm .coverage* || echo "No coverage files to remove" tox -e py27 From 62050d2dfbdc029cebb1b2b8451ad128e3a6b022 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Jun 2016 11:11:53 +0100 Subject: [PATCH 27/37] Comments --- synapse/handlers/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f068814e5de0..37a2c4ba8b40 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1456,6 +1456,9 @@ def __init__(self, room_id, rtype, events, newly_joined, self.since_token = since_token self.upto_token = upto_token + # Should this room always be included in the sync? self.always_include = False + # If we send down this room, should we send down the full state? self.would_require_resync = False + # Should the client consider this room "synced"? self.synced = True From a72919b748d1c9231e3f7e8c16662c203b3f8f57 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Jun 2016 13:54:06 +0100 Subject: [PATCH 28/37] Add get_last_event_id_ts_for_room to slave DataStore --- synapse/replication/slave/storage/events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 877c68508ceb..03231550b751 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -144,6 +144,8 @@ def __init__(self, db_conn, hs): _get_events_around_txn = DataStore._get_events_around_txn.__func__ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ + get_last_event_id_ts_for_room = DataStore.get_last_event_id_ts_for_room.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() From 3ace9bdff9dc3f3810b539820d63367728f93964 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Jun 2016 16:34:37 +0100 Subject: [PATCH 29/37] Empty commit From c0b2f33dc23d005eed006a60853ebcca8d3c117d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2016 10:34:52 +0100 Subject: [PATCH 30/37] Logging --- synapse/handlers/sync.py | 11 ++++++++--- synapse/rest/client/v2_alpha/sync.py | 14 +++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 37a2c4ba8b40..097456be4fa5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -906,15 +906,20 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, r.would_require_resync = True _, bottom_ts = cutoff_list[-1] - value = bottom_ts + new_pagination_value = bottom_ts + logger.info("old pagination value: %r", old_pagination_value) + logger.info("New pagination value: %r", new_pagination_value) + + # Are there any rooms that fall into the range between the + # old and new value? limited = any( - old_pagination_value < r[1] < value + old_pagination_value < r[1] < new_pagination_value for r in sorted_list[pagination_limit + extra_limit:] ) sync_result_builder.pagination_state = SyncPaginationState( - order=pagination_config.order, value=value, + order=pagination_config.order, value=new_pagination_value, limit=pagination_limit + extra_limit, tags=pagination_config.tags, ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 61d013eb572d..0f8411f87fec 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -200,13 +200,6 @@ def on_GET(self, request): filter_id = parse_string(request, "filter", default=None) full_state = parse_boolean(request, "full_state", default=False) - logger.info( - "/sync: user=%r, timeout=%r, since=%r," - " set_presence=%r, filter_id=%r" % ( - user, timeout, since, set_presence, filter_id - ) - ) - request_key = (user, timeout, since, filter_id, full_state) if filter_id: @@ -255,6 +248,13 @@ def _handle_sync(self, requester, sync_config, batch_token, set_presence, user = sync_config.user + logger.info( + "/sync: user=%r, timeout=%r, since=%r," + " set_presence=%r" % ( + user, timeout, batch_token, set_presence + ) + ) + if affect_presence: yield self.presence_handler.set_state(user, {"presence": set_presence}) From 3263e12d73df503aa8bc95d679a898d422ec171a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2016 11:24:58 +0100 Subject: [PATCH 31/37] Try serializing as json rather than msgpack --- synapse/handlers/sync.py | 4 +--- synapse/types.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 097456be4fa5..3c6f2fe9897d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -924,9 +924,7 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, tags=pagination_config.tags, ) - to_sync_map = { - key: value for key, value in cutoff_list - } + to_sync_map = {key: value for key, value in cutoff_list} else: to_sync_map = {} diff --git a/synapse/types.py b/synapse/types.py index f0ad527bc049..cfd3e7328be9 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,7 @@ from collections import namedtuple from unpaddedbase64 import encode_base64, decode_base64 -import msgpack +import ujson as msgpack Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) From 92c58932d1e99c68623b638e11636649d15ede16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2016 11:49:45 +0100 Subject: [PATCH 32/37] More logging --- synapse/rest/client/v2_alpha/sync.py | 4 ++++ synapse/types.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 0f8411f87fec..e20a9ef31790 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -101,6 +101,8 @@ def on_POST(self, request): since = body.get("since", None) + logger.info("Since: %r", since) + extras = body.get("extras", {}) extras = SyncExtras( paginate=extras.get("paginate", {}), @@ -175,6 +177,8 @@ def on_POST(self, request): extras=extras, ) + logger.info("next_batch: %r", sync_result[1]["next_batch"]) + defer.returnValue(sync_result) @defer.inlineCallbacks diff --git a/synapse/types.py b/synapse/types.py index cfd3e7328be9..f0ad527bc049 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,7 @@ from collections import namedtuple from unpaddedbase64 import encode_base64, decode_base64 -import ujson as msgpack +import msgpack Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) From 4c67e06dfb0dfb69f173b81c0d7be86a44616fe3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2016 13:18:04 +0100 Subject: [PATCH 33/37] Use JSON instead of msgpack --- synapse/types.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/types.py b/synapse/types.py index f0ad527bc049..0f787d3a4304 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,7 @@ from collections import namedtuple from unpaddedbase64 import encode_base64, decode_base64 -import msgpack +import ujson as serializer Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) @@ -127,7 +127,7 @@ class SyncNextBatchToken( @classmethod def from_string(cls, string): try: - d = msgpack.loads(decode_base64(string)) + d = serializer.loads(decode_base64(string)) pa = d.get("pa", None) if pa: pa = SyncPaginationState.from_dict(pa) @@ -139,7 +139,7 @@ def from_string(cls, string): raise SynapseError(400, "Invalid Token") def to_string(self): - return encode_base64(msgpack.dumps({ + return encode_base64(serializer.dumps({ "t": self.stream_token.to_arr(), "pa": self.pagination_state.to_dict() if self.pagination_state else None, })) From f07f99387e16ec604d60f00633cd13359e393c17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2016 13:25:44 +0100 Subject: [PATCH 34/37] Use cbor --- synapse/python_dependencies.py | 2 +- synapse/types.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 281322261967..5b98940292b7 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -36,7 +36,7 @@ "blist": ["blist"], "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], - "msgpack-python": ["msgpack"], + "cbor2": ["cbor2"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/types.py b/synapse/types.py index 0f787d3a4304..2bbc7ae3b7ec 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,7 @@ from collections import namedtuple from unpaddedbase64 import encode_base64, decode_base64 -import ujson as serializer +import cbor2 as serializer Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) From 4b7abedfd984b099f1a00452246aa7816dede7dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2016 14:52:08 +0100 Subject: [PATCH 35/37] Comments --- synapse/handlers/sync.py | 15 ++++++++------- synapse/rest/client/v2_alpha/sync.py | 5 ----- synapse/storage/stream.py | 10 ++++++++++ 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3c6f2fe9897d..0c88a10ba67e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -47,6 +47,7 @@ class SyncPaginationConfig(collections.namedtuple("SyncPaginationConfig", [ "limit", "tags", ])): + "Initial pagination configuration from initial sync." def __init__(self, order, limit, tags): if order not in SYNC_PAGINATION_VALID_ORDERS: raise SynapseError(400, "Invalid 'order'") @@ -838,6 +839,8 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, for r in room_entries: if r.room_id in missing_state: if include_all_tags: + # If we're always including tagged rooms, then only + # resync rooms which are newly tagged. change = tag_changes.get(r.room_id) if change == TAG_CHANGE_NEWLY_TAGGED: r.always_include = True @@ -908,13 +911,11 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, _, bottom_ts = cutoff_list[-1] new_pagination_value = bottom_ts - logger.info("old pagination value: %r", old_pagination_value) - logger.info("New pagination value: %r", new_pagination_value) - - # Are there any rooms that fall into the range between the - # old and new value? + # We're limited if there are any rooms that are after cutoff + # in the list, but still have an origin server ts from after + # the pagination value from the since token. limited = any( - old_pagination_value < r[1] < new_pagination_value + old_pagination_value < r[1] for r in sorted_list[pagination_limit + extra_limit:] ) @@ -924,7 +925,7 @@ def _update_room_entries_for_paginated_sync(self, sync_result_builder, tags=pagination_config.tags, ) - to_sync_map = {key: value for key, value in cutoff_list} + to_sync_map = dict(cutoff_list) else: to_sync_map = {} diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index e20a9ef31790..693b1bad07ba 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -98,11 +98,8 @@ def on_POST(self, request): body = parse_json_object_from_request(request) timeout = body.get("timeout", 0) - since = body.get("since", None) - logger.info("Since: %r", since) - extras = body.get("extras", {}) extras = SyncExtras( paginate=extras.get("paginate", {}), @@ -177,8 +174,6 @@ def on_POST(self, request): extras=extras, ) - logger.info("next_batch: %r", sync_result[1]["next_batch"]) - defer.returnValue(sync_result) @defer.inlineCallbacks diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index ada20706dca3..434adf9a1779 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -526,6 +526,16 @@ def _set_before_and_after(events, rows, topo_order=True): ) def get_last_event_id_ts_for_room(self, room_id, token): + """Get the latest event_id and origin_server_ts for a room_id before a + given token. + + Args: + room_id (str) + token (str) + + Returns: + Dictionary with ``event_id`` and ``origin_server_ts`` keys. + """ stream_ordering = RoomStreamToken.parse_stream_token(token).stream sql = ( From 6c137b321decdca9a7fb3f34616c8520f208c31c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2016 15:21:12 +0100 Subject: [PATCH 36/37] Encode batch tokens better --- synapse/handlers/sync.py | 6 +++--- synapse/types.py | 12 +++++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0c88a10ba67e..de1571de8879 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -62,13 +62,13 @@ def __init__(self, order, limit, tags): super(SyncPaginationConfig, self).__init__(order, limit, tags) -SYNC_PAGINATION_TAGS_INCLUDE_ALL = "include_all" -SYNC_PAGINATION_TAGS_IGNORE = "ignore" +SYNC_PAGINATION_TAGS_INCLUDE_ALL = "m.include_all" +SYNC_PAGINATION_TAGS_IGNORE = "m.ignore" SYNC_PAGINATION_VALID_TAGS_OPTIONS = ( SYNC_PAGINATION_TAGS_INCLUDE_ALL, SYNC_PAGINATION_TAGS_IGNORE, ) -SYNC_PAGINATION_ORDER_TS = "o" +SYNC_PAGINATION_ORDER_TS = "m.origin_server_ts" SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,) diff --git a/synapse/types.py b/synapse/types.py index 2bbc7ae3b7ec..6c7bdf0cf9c7 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -148,6 +148,12 @@ def replace(self, **kwargs): return self._replace(**kwargs) +_ORDER_ENCODE = {"m.origin_server_ts": "o"} +_ORDER_DECODE = {v: k for k, v in _ORDER_ENCODE.items()} +_TAG_ENCODE = {"m.include_all": "i", "m.ignore": "x"} +_TAG_DECODE = {v: k for k, v in _TAG_ENCODE.items()} + + class SyncPaginationState( namedtuple("SyncPaginationState", ( "order", @@ -159,16 +165,16 @@ class SyncPaginationState( @classmethod def from_dict(cls, d): try: - return cls(d["o"], d["v"], d["l"], d["t"]) + return cls(_ORDER_DECODE[d["o"]], d["v"], d["l"], _TAG_DECODE[d["t"]]) except: raise SynapseError(400, "Invalid Token") def to_dict(self): return { - "o": self.order, + "o": _ORDER_ENCODE[self.order], "v": self.value, "l": self.limit, - "t": self.tags, + "t": _TAG_ENCODE[self.tags], } def replace(self, **kwargs): From d2b59f2482df7e1bbc7eec1a07d7359cb218de75 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2016 10:55:54 +0100 Subject: [PATCH 37/37] Implement top-level unread_notifications --- synapse/handlers/sync.py | 50 ++++++++++++++++++++++++++-- synapse/rest/client/v2_alpha/sync.py | 1 + 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index de1571de8879..e1c940af4b4b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -170,6 +170,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "archived", # ArchivedSyncResult for each archived room. "errors", # ErrorSyncResult "pagination_info", + "unread_notifications", ])): __slots__ = [] @@ -561,10 +562,16 @@ def generate_sync_result(self, sync_config, batch_token=None, full_state=False, # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() + all_joined_rooms = yield self.store.get_rooms_for_user( + sync_config.user.to_string() + ) + all_joined_rooms = [room.room_id for room in all_joined_rooms] + sync_result_builder = SyncResultBuilder( sync_config, full_state, batch_token=batch_token, now_token=now_token, + all_joined_rooms=all_joined_rooms, ) account_data_by_room = yield self._generate_sync_entry_for_account_data( @@ -580,6 +587,8 @@ def generate_sync_result(self, sync_config, batch_token=None, full_state=False, sync_result_builder, newly_joined_rooms, newly_joined_users ) + yield self._generate_notification_counts(sync_result_builder) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -592,8 +601,41 @@ def generate_sync_result(self, sync_config, batch_token=None, full_state=False, pagination_state=sync_result_builder.pagination_state, ), pagination_info=sync_result_builder.pagination_info, + unread_notifications=sync_result_builder.unread_notifications, )) + @defer.inlineCallbacks + def _generate_notification_counts(self, sync_result_builder): + rooms = sync_result_builder.all_joined_rooms + + total_notif_count = [0] + rooms_with_notifs = set() + total_highlight_count = [0] + rooms_with_highlights = set() + + @defer.inlineCallbacks + def notif_for_room(room_id): + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_result_builder.sync_config + ) + if notifs is not None: + total_notif_count[0] += notifs["notify_count"] + total_highlight_count[0] += notifs["highlight_count"] + + if notifs["notify_count"]: + rooms_with_notifs.add(room_id) + if notifs["highlight_count"]: + rooms_with_highlights.add(room_id) + + yield concurrently_execute(notif_for_room, rooms, 10) + + sync_result_builder.unread_notifications = { + "total_notification_count": total_notif_count[0], + "rooms_notification_count": len(rooms_with_notifs), + "total_highlight_count": total_highlight_count[0], + "rooms_highlight_count": len(rooms_with_highlights), + } + @defer.inlineCallbacks def _generate_sync_entry_for_account_data(self, sync_result_builder): """Generates the account data portion of the sync response. Populates @@ -1403,16 +1445,18 @@ class SyncResultBuilder(object): __slots__ = ( "sync_config", "full_state", "batch_token", "since_token", "pagination_state", "now_token", "presence", "account_data", "joined", "invited", "archived", - "pagination_info", "errors", + "pagination_info", "errors", "all_joined_rooms", "unread_notifications", ) - def __init__(self, sync_config, full_state, batch_token, now_token): + def __init__(self, sync_config, full_state, batch_token, now_token, + all_joined_rooms): """ Args: sync_config(SyncConfig) full_state(bool): The full_state flag as specified by user batch_token(SyncNextBatchToken): The token supplied by user, or None. now_token(StreamToken): The token to sync up to. + all_joined_rooms(list(str)): List of all joined room ids. """ self.sync_config = sync_config self.full_state = full_state @@ -1420,6 +1464,7 @@ def __init__(self, sync_config, full_state, batch_token, now_token): self.since_token = batch_token.stream_token if batch_token else None self.pagination_state = batch_token.pagination_state if batch_token else None self.now_token = now_token + self.all_joined_rooms = all_joined_rooms self.presence = [] self.account_data = [] @@ -1429,6 +1474,7 @@ def __init__(self, sync_config, full_state, batch_token, now_token): self.errors = [] self.pagination_info = {} + self.unread_notifications = {} class RoomSyncResultBuilder(object): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 693b1bad07ba..cfec804b4414 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -291,6 +291,7 @@ def _handle_sync(self, requester, sync_config, batch_token, set_presence, "leave": archived, }, "next_batch": sync_result.next_batch.to_string(), + "unread_notifications": sync_result.unread_notifications, } if sync_result.errors: