Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implement Read Marker API #2120

Merged
merged 15 commits into from
Apr 13, 2017
68 changes: 68 additions & 0 deletions synapse/handlers/read_marker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ._base import BaseHandler

from twisted.internet import defer

from synapse.util.async import Linearizer

import logging
logger = logging.getLogger(__name__)


class ReadMarkerHandler(BaseHandler):
def __init__(self, hs):
super(ReadMarkerHandler, self).__init__(hs)
self.server_name = hs.config.server_name
self.store = hs.get_datastore()
self.read_marker_linearizer = Linearizer(name="read_marker")
self.notifier = hs.get_notifier()

@defer.inlineCallbacks
def received_client_read_marker(self, room_id, user_id, event_id):
"""Updates the read marker for a given user in a given room if the event ID given
is ahead in the stream relative to the current read marker.

This uses a notifier to indicate that account data should be sent down /sync if
the read marker has changed.
"""

# Get ordering for existing read marker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outdated comment?

with (yield self.read_marker_linearizer.queue((room_id, user_id))):
account_data = yield self.store.get_account_data_for_room(user_id, room_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should be using a storage function that pulls out based on type too. I think there already is one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's one for global_account_data but not for_room


existing_read_marker = None
if "m.read_marker" in account_data:
existing_read_marker = account_data["m.read_marker"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be written as:

existing_read_marker = account_data.get("m.read_marker", None)


should_update = True

if existing_read_marker:
should_update = yield self.store.is_event_after(
existing_read_marker['marker'],
event_id
)

if should_update:
content = {
"marker": event_id
}
max_id = yield self.store.add_account_data_to_room(
user_id, room_id, "m.read_marker", content
)
self.notifier.on_new_event(
"account_data_key", max_id, users=[user_id], rooms=[room_id]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to notify the entire room about this change, just the user.

)
2 changes: 2 additions & 0 deletions synapse/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
register,
auth,
receipts,
read_marker,
keys,
tokenrefresh,
tags,
Expand Down Expand Up @@ -88,6 +89,7 @@ def register_servlets(client_resource, hs):
register.register_servlets(hs, client_resource)
auth.register_servlets(hs, client_resource)
receipts.register_servlets(hs, client_resource)
read_marker.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
tokenrefresh.register_servlets(hs, client_resource)
tags.register_servlets(hs, client_resource)
Expand Down
9 changes: 8 additions & 1 deletion synapse/rest/client/v2_alpha/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ._base import client_v2_patterns

from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.api.errors import AuthError
from synapse.api.errors import AuthError, SynapseError

from twisted.internet import defer

Expand Down Expand Up @@ -82,6 +82,13 @@ def on_PUT(self, request, user_id, room_id, account_data_type):

body = parse_json_object_from_request(request)

if account_data_type == "m.read_marker":
raise SynapseError(
405,
"Cannot set m.read_marker through this API. "
"Use /rooms/!roomId:server.name/read_marker"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future reference we tend to put spaces at the start:

"Cannot set m.read_marker through this API."
" Use /rooms/!roomId:server.name/read_marker"

This makes it more obvious that there is at least one space, which is quite important for e.g. SQL

)

max_id = yield self.store.add_account_data_to_room(
user_id, room_id, account_data_type, body
)
Expand Down
67 changes: 67 additions & 0 deletions synapse/rest/client/v2_alpha/read_marker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

from synapse.http.servlet import RestServlet, parse_json_object_from_request
from ._base import client_v2_patterns

import logging


logger = logging.getLogger(__name__)


class ReadMarkerRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/rooms/(?P<room_id>[^/]*)/read_marker$")

def __init__(self, hs):
super(ReadMarkerRestServlet, self).__init__()
self.hs = hs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to avoid taking a reference to hs if possible, and instead just pull out everything you need. This is for cleanliness and makes it clearer what the dependencies of this class is.

self.auth = hs.get_auth()
self.receipts_handler = hs.get_receipts_handler()
self.read_marker_handler = hs.get_read_marker_handler()
self.presence_handler = hs.get_presence_handler()

@defer.inlineCallbacks
def on_POST(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)

yield self.presence_handler.bump_presence_active_time(requester.user)

body = parse_json_object_from_request(request)

if "m.read" in body:
read_event_id = body["m.read"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually do:

read_event_id = body.get("m.read", None)
if read_event_id:
    ...

but its a matter of taste

yield self.receipts_handler.received_client_receipt(
room_id,
"m.read",
user_id=requester.user.to_string(),
event_id=read_event_id
)

if "m.read_marker" in body:
read_marker_event_id = body["m.read_marker"]
yield self.read_marker_handler.received_client_read_marker(
room_id,
user_id=requester.user.to_string(),
event_id=read_marker_event_id
)

defer.returnValue((200, {}))


def register_servlets(hs, http_server):
ReadMarkerRestServlet(hs).register(http_server)
5 changes: 5 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.notifier import Notifier
Expand Down Expand Up @@ -133,6 +134,7 @@ def build_DEPENDENCY(self)
'receipts_handler',
'macaroon_generator',
'tcp_replication',
'read_marker_handler',
]

def __init__(self, hostname, **kwargs):
Expand Down Expand Up @@ -291,6 +293,9 @@ def build_federation_sender(self):
def build_receipts_handler(self):
return ReceiptsHandler(self)

def build_read_marker_handler(self):
return ReadMarkerHandler(self)

def build_tcp_replication(self):
raise NotImplementedError()

Expand Down
29 changes: 29 additions & 0 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2159,6 +2159,35 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
]
)

@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
is_after = True

to_1, so_1 = yield self._get_event_ordering(event_id1)
to_2, so_2 = yield self._get_event_ordering(event_id2)

# Prevent updating if the existing marker is ahead in the stream
if to_1 > to_2:
is_after = False
elif to_1 == to_2 and so_1 >= so_2:
is_after = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can actually write this as:

is_after = (to_1, so_1) > (to_2, so_2)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly,

>>> ( 1, 2 ) < ( 1, 2 )
False
>>> ( 1, 2 ) < ( 1, 3 )
True

So we'll have to go for the form with two comparisons. is_event_after is now also an accurate name with this change...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's exactly what you want though, isn't it?


defer.returnValue(is_after)

@defer.inlineCallbacks
def _get_event_ordering(self, event_id):
res = yield self._simple_select_one(
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
allow_none=True
)

if not res:
raise SynapseError(404, "Could not find event %s" % (event_id,))

defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"])))


AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events",
Expand Down