Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Experimental support for MSC3970: per-device transaction IDs (#15318)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose authored Apr 25, 2023
1 parent ea5c3ed commit 8b3a502
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 48 deletions.
1 change: 1 addition & 0 deletions changelog.d/15318.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Experimental support for MSC3970: Scope transaction IDs to devices.
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:

# MSC2659: Application service ping endpoint
self.msc2659_enabled = experimental.get("msc2659_enabled", False)

# MSC3970: Scope transaction IDs to devices
self.msc3970_enabled = experimental.get("msc3970_enabled", False)
9 changes: 8 additions & 1 deletion synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,16 @@ def __init__(self, internal_metadata_dict: JsonDict):
soft_failed: DictProperty[bool] = DictProperty("soft_failed")
proactively_send: DictProperty[bool] = DictProperty("proactively_send")
redacted: DictProperty[bool] = DictProperty("redacted")
historical: DictProperty[bool] = DictProperty("historical")

txn_id: DictProperty[str] = DictProperty("txn_id")
"""The transaction ID, if it was set when the event was created."""

token_id: DictProperty[int] = DictProperty("token_id")
historical: DictProperty[bool] = DictProperty("historical")
"""The access token ID of the user who sent this event, if any."""

device_id: DictProperty[str] = DictProperty("device_id")
"""The device ID of the user who sent this event, if any."""

# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
Expand Down
58 changes: 42 additions & 16 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,16 @@ def serialize_event(
time_now_ms: int,
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
msc3970_enabled: bool = False,
) -> JsonDict:
"""Serialize event for clients
Args:
e
time_now_ms
config: Event serialization config
msc3970_enabled: Whether MSC3970 is enabled. It changes whether we should
include the `transaction_id` in the event's `unsigned` section.
Returns:
The serialized event dictionary.
Expand All @@ -368,27 +371,43 @@ def serialize_event(

if "redacted_because" in e.unsigned:
d["unsigned"]["redacted_because"] = serialize_event(
e.unsigned["redacted_because"], time_now_ms, config=config
e.unsigned["redacted_because"],
time_now_ms,
config=config,
msc3970_enabled=msc3970_enabled,
)

# If we have a txn_id saved in the internal_metadata, we should include it in the
# unsigned section of the event if it was sent by the same session as the one
# requesting the event.
# There is a special case for guests, because they only have one access token
# without associated access_token_id, so we always include the txn_id for events
# they sent.
txn_id = getattr(e.internal_metadata, "txn_id", None)
txn_id: Optional[str] = getattr(e.internal_metadata, "txn_id", None)
if txn_id is not None and config.requester is not None:
event_token_id = getattr(e.internal_metadata, "token_id", None)
if config.requester.user.to_string() == e.sender and (
(
event_token_id is not None
and config.requester.access_token_id is not None
and event_token_id == config.requester.access_token_id
# For the MSC3970 rules to be applied, we *need* to have the device ID in the
# event internal metadata. Since we were not recording them before, if it hasn't
# been recorded, we fallback to the old behaviour.
event_device_id: Optional[str] = getattr(e.internal_metadata, "device_id", None)
if msc3970_enabled and event_device_id is not None:
if event_device_id == config.requester.device_id:
d["unsigned"]["transaction_id"] = txn_id

else:
# The pre-MSC3970 behaviour is to only include the transaction ID if the
# event was sent from the same access token. For regular users, we can use
# the access token ID to determine this. For guests, we can't, but since
# each guest only has one access token, we can just check that the event was
# sent by the same user as the one requesting the event.
event_token_id: Optional[int] = getattr(
e.internal_metadata, "token_id", None
)
or config.requester.is_guest
):
d["unsigned"]["transaction_id"] = txn_id
if config.requester.user.to_string() == e.sender and (
(
event_token_id is not None
and config.requester.access_token_id is not None
and event_token_id == config.requester.access_token_id
)
or config.requester.is_guest
):
d["unsigned"]["transaction_id"] = txn_id

# invite_room_state and knock_room_state are a list of stripped room state events
# that are meant to provide metadata about a room to an invitee/knocker. They are
Expand Down Expand Up @@ -419,6 +438,9 @@ class EventClientSerializer:
clients.
"""

def __init__(self, *, msc3970_enabled: bool = False):
self._msc3970_enabled = msc3970_enabled

def serialize_event(
self,
event: Union[JsonDict, EventBase],
Expand All @@ -443,7 +465,9 @@ def serialize_event(
if not isinstance(event, EventBase):
return event

serialized_event = serialize_event(event, time_now, config=config)
serialized_event = serialize_event(
event, time_now, config=config, msc3970_enabled=self._msc3970_enabled
)

# Check if there are any bundled aggregations to include with the event.
if bundle_aggregations:
Expand Down Expand Up @@ -501,7 +525,9 @@ def _inject_bundled_aggregations(
# `sender` of the edit; however MSC3925 proposes extending it to the whole
# of the edit, which is what we do here.
serialized_aggregations[RelationTypes.REPLACE] = self.serialize_event(
event_aggregations.replace, time_now, config=config
event_aggregations.replace,
time_now,
config=config,
)

# Include any threaded replies to this event.
Expand Down
38 changes: 33 additions & 5 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,8 @@ def __init__(self, hs: "HomeServer"):
expiry_ms=30 * 60 * 1000,
)

self._msc3970_enabled = hs.config.experimental.msc3970_enabled

async def create_event(
self,
requester: Requester,
Expand Down Expand Up @@ -701,9 +703,16 @@ async def create_event(
if require_consent and not is_exempt:
await self.assert_accepted_privacy_policy(requester)

# Save the access token ID, the device ID and the transaction ID in the event
# internal metadata. This is useful to determine if we should echo the
# transaction_id in events.
# See `synapse.events.utils.EventClientSerializer.serialize_event`
if requester.access_token_id is not None:
builder.internal_metadata.token_id = requester.access_token_id

if requester.device_id is not None:
builder.internal_metadata.device_id = requester.device_id

if txn_id is not None:
builder.internal_metadata.txn_id = txn_id

Expand Down Expand Up @@ -897,12 +906,31 @@ async def get_event_from_transaction(
Returns:
An event if one could be found, None otherwise.
"""

if self._msc3970_enabled and requester.device_id:
# When MSC3970 is enabled, we lookup for events sent by the same device first,
# and fallback to the old behaviour if none were found.
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_device_id(
room_id,
requester.user.to_string(),
requester.device_id,
txn_id,
)
)
if existing_event_id:
return await self.store.get_event(existing_event_id)

# Pre-MSC3970, we looked up for events that were sent by the same session by
# using the access token ID.
if requester.access_token_id:
existing_event_id = await self.store.get_event_id_from_transaction_id(
room_id,
requester.user.to_string(),
requester.access_token_id,
txn_id,
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_token_id(
room_id,
requester.user.to_string(),
requester.access_token_id,
txn_id,
)
)
if existing_event_id:
return await self.store.get_event(existing_event_id)
Expand Down
33 changes: 26 additions & 7 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def __init__(self, hs: "HomeServer"):
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)

self._msc3970_enabled = hs.config.experimental.msc3970_enabled

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
Expand Down Expand Up @@ -399,13 +401,30 @@ async def _local_membership_update(
# Check if we already have an event with a matching transaction ID. (We
# do this check just before we persist an event as well, but may as well
# do it up front for efficiency.)
if txn_id and requester.access_token_id:
existing_event_id = await self.store.get_event_id_from_transaction_id(
room_id,
requester.user.to_string(),
requester.access_token_id,
txn_id,
)
if txn_id:
existing_event_id = None
if self._msc3970_enabled and requester.device_id:
# When MSC3970 is enabled, we lookup for events sent by the same device
# first, and fallback to the old behaviour if none were found.
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_device_id(
room_id,
requester.user.to_string(),
requester.device_id,
txn_id,
)
)

if requester.access_token_id and not existing_event_id:
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_token_id(
room_id,
requester.user.to_string(),
requester.access_token_id,
txn_id,
)
)

if existing_event_id:
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream
Expand Down
13 changes: 13 additions & 0 deletions synapse/rest/client/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def __init__(self, hs: "HomeServer"):
# for at *LEAST* 30 mins, and at *MOST* 60 mins.
self.cleaner = self.clock.looping_call(self._cleanup, CLEANUP_PERIOD_MS)

self._msc3970_enabled = hs.config.experimental.msc3970_enabled

def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hashable:
"""A helper function which returns a transaction key that can be used
with TransactionCache for idempotent requests.
Expand All @@ -58,6 +60,7 @@ def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hasha
requests to the same endpoint. The key is formed from the HTTP request
path and attributes from the requester: the access_token_id for regular users,
the user ID for guest users, and the appservice ID for appservice users.
With MSC3970, for regular users, the key is based on the user ID and device ID.
Args:
request: The incoming request.
Expand All @@ -67,11 +70,21 @@ def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hasha
"""
assert request.path is not None
path: str = request.path.decode("utf8")

if requester.is_guest:
assert requester.user is not None, "Guest requester must have a user ID set"
return (path, "guest", requester.user)

elif requester.app_service is not None:
return (path, "appservice", requester.app_service.id)

# With MSC3970, we use the user ID and device ID as the transaction key
elif self._msc3970_enabled:
assert requester.user, "Requester must have a user"
assert requester.device_id, "Requester must have a device_id"
return (path, "user", requester.user, requester.device_id)

# Otherwise, the pre-MSC3970 behaviour is to use the access token ID
else:
assert (
requester.access_token_id is not None
Expand Down
4 changes: 3 additions & 1 deletion synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,9 @@ def get_oidc_handler(self) -> "OidcHandler":

@cache_in_self
def get_event_client_serializer(self) -> EventClientSerializer:
return EventClientSerializer()
return EventClientSerializer(
msc3970_enabled=self.config.experimental.msc3970_enabled
)

@cache_in_self
def get_password_policy_handler(self) -> PasswordPolicyHandler:
Expand Down
68 changes: 54 additions & 14 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ def __init__(
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen

self._msc3970_enabled = hs.config.experimental.msc3970_enabled

@trace
async def _persist_events_and_state_updates(
self,
Expand Down Expand Up @@ -977,23 +979,43 @@ def _persist_transaction_ids_txn(
) -> None:
"""Persist the mapping from transaction IDs to event IDs (if defined)."""

to_insert = []
inserted_ts = self._clock.time_msec()
to_insert_token_id: List[Tuple[str, str, str, int, str, int]] = []
to_insert_device_id: List[Tuple[str, str, str, str, str, int]] = []
for event, _ in events_and_contexts:
token_id = getattr(event.internal_metadata, "token_id", None)
txn_id = getattr(event.internal_metadata, "txn_id", None)
if token_id and txn_id:
to_insert.append(
(
event.event_id,
event.room_id,
event.sender,
token_id,
txn_id,
self._clock.time_msec(),
token_id = getattr(event.internal_metadata, "token_id", None)
device_id = getattr(event.internal_metadata, "device_id", None)

if txn_id is not None:
if token_id is not None:
to_insert_token_id.append(
(
event.event_id,
event.room_id,
event.sender,
token_id,
txn_id,
inserted_ts,
)
)
)

if to_insert:
if device_id is not None:
to_insert_device_id.append(
(
event.event_id,
event.room_id,
event.sender,
device_id,
txn_id,
inserted_ts,
)
)

# Pre-MSC3970, we rely on the access_token_id to scope the txn_id for events.
# Since this is an experimental flag, we still store the mapping even if the
# flag is disabled.
if to_insert_token_id:
self.db_pool.simple_insert_many_txn(
txn,
table="event_txn_id",
Expand All @@ -1005,7 +1027,25 @@ def _persist_transaction_ids_txn(
"txn_id",
"inserted_ts",
),
values=to_insert,
values=to_insert_token_id,
)

# With MSC3970, we rely on the device_id instead to scope the txn_id for events.
# We're only inserting if MSC3970 is *enabled*, because else the pre-MSC3970
# behaviour would allow for a UNIQUE constraint violation on this table
if to_insert_device_id and self._msc3970_enabled:
self.db_pool.simple_insert_many_txn(
txn,
table="event_txn_id_device_id",
keys=(
"event_id",
"room_id",
"user_id",
"device_id",
"txn_id",
"inserted_ts",
),
values=to_insert_device_id,
)

async def update_current_state(
Expand Down
Loading

0 comments on commit 8b3a502

Please sign in to comment.