Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Recursively fetch the thread for receipts & notifications. #13824

Merged
merged 49 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
9cb167c
Update filtering to include the thread notifications flag.
clokep Sep 8, 2022
8ac2f32
Ensure that the thread_id column is non-null and then require it to b…
clokep Sep 9, 2022
111fe57
Add infrastructure to pass notifications per thread.
clokep Sep 8, 2022
62aa85b
Calculate thread specific notification counts.
clokep Sep 8, 2022
cb679e2
Clarify comment.
clokep Sep 16, 2022
ba00c5f
Simplify handling of summaries with neither notifications or unread c…
clokep Sep 16, 2022
eb56567
Delete old push summaries.
clokep Sep 16, 2022
e6f97ec
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 16, 2022
4f4711a
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 19, 2022
8b63c5b
Fix postgres compatibility.
clokep Sep 19, 2022
c3783df
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 20, 2022
c4f2d50
Create a constant for "main".
clokep Sep 20, 2022
6927e59
Reduce duplicated code.
clokep Sep 20, 2022
1d05975
Lint
clokep Sep 20, 2022
28b5a1f
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 22, 2022
55d15a3
Threads must already be summarized between the stream orderings that …
clokep Sep 22, 2022
56c21e4
Don't delete empty push summaries.
clokep Sep 22, 2022
241b40c
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 23, 2022
a04258f
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 26, 2022
8353e7e
Recursively fetch the thread ID when calculating notifications.
clokep Sep 15, 2022
cb71bc5
Additional validation for /receipts.
clokep Sep 15, 2022
c4e18b3
Review comments.
clokep Sep 27, 2022
ddbb644
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 27, 2022
f20620f
Update constraints and indexes now that thread ID is used.
clokep Sep 12, 2022
52b0a3d
Mark threads as read separately.
clokep Sep 22, 2022
a03560b
Merge remote-tracking branch 'origin/clokep/threads-notif-2' into clo…
clokep Sep 27, 2022
fb50244
Use MAIN_TIMELINE constant in more places.
clokep Sep 28, 2022
79452e9
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 28, 2022
b0d9008
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 28, 2022
d6d7788
Combine logic for processing receipts.
clokep Sep 28, 2022
162bd8d
Expand comment and rename variables for clarity.
clokep Sep 28, 2022
e7b5421
Clarify comment.
clokep Sep 28, 2022
5f5e9ad
Improve docstrings.
clokep Sep 28, 2022
16a60b9
Rename function.
clokep Sep 28, 2022
f6a99c8
Lint
clokep Sep 28, 2022
f279a15
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 29, 2022
ef37de2
Merge remote-tracking branch 'origin/clokep/threads-notif-2' into clo…
clokep Sep 29, 2022
6b2384d
Fix typo.
clokep Sep 29, 2022
041fe7f
Only attempt to find threaded receipts newer than the latest unthread…
clokep Sep 29, 2022
f416be9
Merge branch 'develop' into clokep/threads-notif-2
clokep Oct 4, 2022
0b1b432
Update for changes in develop.
clokep Oct 4, 2022
2322f8a
Merge remote-tracking branch 'origin/clokep/threads-notif-2' into clo…
clokep Oct 4, 2022
20ace0f
Update delta numbers.
clokep Oct 4, 2022
bd6c80c
Update background index numbers.
clokep Oct 4, 2022
09f36e9
Merge remote-tracking branch 'origin/clokep/threads-notif-2' into clo…
clokep Oct 4, 2022
49a2aaa
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Oct 4, 2022
377acc3
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Oct 4, 2022
c3c51f9
Merge remote-tracking branch 'origin/clokep/threads-notif-3b' into cl…
clokep Oct 4, 2022
d3d0ab7
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13824.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).
5 changes: 5 additions & 0 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,13 @@ async def action_for_event_by_user(
relation.parent_id,
itertools.chain(*(r.rules() for r in rules_by_user.values())),
)
# Recursively attempt to find the thread this event relates to.
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id
else:
# Since the event has not yet been persisted we check whether
# the parent is part of a thread.
thread_id = await self.store.get_thread_id(relation.parent_id) or "main"

evaluator = PushRuleEvaluator(
_flatten_dict(event),
Expand Down
22 changes: 20 additions & 2 deletions synapse/rest/client/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import TYPE_CHECKING, Tuple

from synapse.api.constants import ReceiptTypes
from synapse.api.errors import SynapseError
from synapse.api.errors import Codes, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
Expand All @@ -43,6 +43,7 @@ def __init__(self, hs: "HomeServer"):
self.receipts_handler = hs.get_receipts_handler()
self.read_marker_handler = hs.get_read_marker_handler()
self.presence_handler = hs.get_presence_handler()
self._main_store = hs.get_datastores().main

self._known_receipt_types = {
ReceiptTypes.READ,
Expand Down Expand Up @@ -71,7 +72,24 @@ async def on_POST(
thread_id = body.get("thread_id")
if not thread_id or not isinstance(thread_id, str):
raise SynapseError(
400, "thread_id field must be a non-empty string"
400,
"thread_id field must be a non-empty string",
Codes.INVALID_PARAM,
)

if receipt_type == ReceiptTypes.FULLY_READ:
raise SynapseError(
400,
f"thread_id is not compatible with {ReceiptTypes.FULLY_READ} receipts.",
Codes.INVALID_PARAM,
)

# Ensure the event ID roughly correlates to the thread ID.
if thread_id != await self._main_store.get_thread_id(event_id):
raise SynapseError(
400,
f"event_id {event_id} is not related to thread {thread_id}",
Codes.INVALID_PARAM,
)

await self.presence_handler.bump_presence_active_time(requester.user)
Expand Down
36 changes: 36 additions & 0 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,42 @@ def _get_event_relations(
"get_event_relations", _get_event_relations
)

@cached()
async def get_thread_id(self, event_id: str) -> Optional[str]:
"""
Get the thread ID for an event. This considers multi-level relations,
e.g. an annotation to an event which is part of a thread.

Args:
event_id: The event ID to fetch the thread ID for.

Returns:
The event ID of the root event in the thread, if this event is part
of a thread. None, otherwise.
"""
# Since event relations form a tree, we should only ever find 0 or 1
# results from the below query.
sql = """
WITH RECURSIVE related_events AS (
SELECT event_id, relates_to_id, relation_type
FROM event_relations
WHERE event_id = ?
UNION SELECT e.event_id, e.relates_to_id, e.relation_type
FROM event_relations e
INNER JOIN related_events r ON r.relates_to_id = e.event_id
) SELECT relates_to_id FROM related_events WHERE relation_type = 'm.thread';
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""

def _get_thread_id(txn: LoggingTransaction) -> Optional[str]:
txn.execute(sql, (event_id,))
# TODO Should we ensure there's only a single result here?
row = txn.fetchone()
if row:
return row[0]
return None

return await self.db_pool.runInteraction("get_thread_id", _get_thread_id)


class RelationsStore(RelationsWorkerStore):
pass
100 changes: 100 additions & 0 deletions tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,106 @@ def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None:
_rotate()
_assert_counts(0, 0, 0, 0)

def test_recursive_thread(self) -> None:
"""
Events related to events in a thread should still be considered part of
that thread.
"""

# Create a user to receive notifications and send receipts.
user_id = self.register_user("user1235", "pass")
token = self.login("user1235", "pass")

# And another users to send events.
other_id = self.register_user("other", "pass")
other_token = self.login("other", "pass")

# Create a room and put both users in it.
room_id = self.helper.create_room_as(user_id, tok=token)
self.helper.join(room_id, other_id, tok=other_token)

# Update the user's push rules to care about reaction events.
self.get_success(
self.store.add_push_rule(
user_id,
"related_events",
priority_class=5,
conditions=[
{"kind": "event_match", "key": "type", "pattern": "m.reaction"}
],
actions=["notify"],
)
)

def _create_event(type: str, content: JsonDict) -> str:
result = self.helper.send_event(
room_id, type=type, content=content, tok=other_token
)
return result["event_id"]

def _assert_counts(noitf_count: int, thread_notif_count: int) -> None:
counts = self.get_success(
self.store.db_pool.runInteraction(
"get-unread-counts",
self.store._get_unread_counts_by_receipt_txn,
room_id,
user_id,
)
)
self.assertEqual(
counts.main_timeline,
NotifCounts(
notify_count=noitf_count, unread_count=0, highlight_count=0
),
)
if thread_notif_count:
self.assertEqual(
counts.threads,
{
thread_id: NotifCounts(
notify_count=thread_notif_count,
unread_count=0,
highlight_count=0,
),
},
)
else:
self.assertEqual(counts.threads, {})

# Create a root event.
thread_id = _create_event(
"m.room.message", {"msgtype": "m.text", "body": "msg"}
)
_assert_counts(1, 0)

# Reply, creating a thread.
reply_id = _create_event(
"m.room.message",
{
"msgtype": "m.text",
"body": "msg",
"m.relates_to": {
"rel_type": "m.thread",
"event_id": thread_id,
},
},
)
_assert_counts(1, 1)

# Create an event related to a thread event, this should still appear in
# the thread.
_create_event(
type="m.reaction",
content={
"m.relates_to": {
"rel_type": "m.annotation",
"event_id": reply_id,
"key": "A",
}
},
)
_assert_counts(1, 2)

def test_find_first_stream_ordering_after_ts(self) -> None:
def add_event(so: int, ts: int) -> None:
self.get_success(
Expand Down