diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 39ad2793d98d..c6690ee041f1 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -37,7 +37,7 @@ from typing_extensions import Literal from unpaddedbase64 import encode_base64 -from synapse.api.constants import RelationTypes +from synapse.api.constants import Membership, RelationTypes from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.types import JsonDict, RoomStreamToken from synapse.util.caches import intern_dict @@ -339,12 +339,19 @@ def event_id(self) -> str: raise NotImplementedError() @property - def membership(self) -> str: - return self.content["membership"] + def membership(self) -> Optional[str]: + return self.content.get("membership") def is_state(self) -> bool: return self.get_state_key() is not None + @property + def is_notifiable(self) -> bool: + return ( + self.membership == Membership.INVITE + or not self.internal_metadata.is_outlier() + ) + def get_state_key(self) -> Optional[str]: """Get the state key of this event, or None if it's not a state event""" return self._dict.get("state_key") diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index dd4b9f66d10e..1cf09356a013 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -946,6 +946,12 @@ async def on_invite_request( ) context = EventContext.for_outlier(self._storage_controllers) + + if event.is_notifiable: + await self._federation_event_handler.bulk_push_rule_evaluator.action_for_event_by_user( + event, context + ) + await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index ace7adcffb61..db04caea0664 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -145,7 +145,7 @@ def __init__(self, hs: "HomeServer"): self._event_creation_handler = hs.get_event_creation_handler() self._event_auth_handler = hs.get_event_auth_handler() self._message_handler = hs.get_message_handler() - self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() + self.bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self._state_resolution_handler = hs.get_state_resolution_handler() # avoid a circular dependency by deferring execution here self._get_room_member_handler = hs.get_room_member_handler @@ -2110,7 +2110,7 @@ async def _run_push_actions_and_persist_event( min_depth, ) else: - await self._bulk_push_rule_evaluator.action_for_event_by_user( + await self.bulk_push_rule_evaluator.action_for_event_by_user( event, context ) @@ -2153,6 +2153,7 @@ async def persist_events_and_notify( if instance != self._instance_name: # Limit the number of events sent over replication. We choose 200 # here as that is what we default to in `max_request_body_size(..)` + result = {} try: for batch in batch_iter(event_and_contexts, 200): result = await self._send_events( @@ -2173,14 +2174,14 @@ async def persist_events_and_notify( # Note that this returns the events that were persisted, which may not be # the same as were passed in if some were deduplicated due to transaction IDs. ( - events, + output_events, max_stream_token, ) = await self._storage_controllers.persistence.persist_events( event_and_contexts, backfilled=backfilled ) if self._ephemeral_messages_enabled: - for event in events: + for event in output_events: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) @@ -2188,13 +2189,13 @@ async def persist_events_and_notify( with start_active_span("notify_persisted_events"): set_tag( SynapseTags.RESULT_PREFIX + "event_ids", - str([ev.event_id for ev in events]), + str([ev.event_id for ev in output_events]), ) set_tag( SynapseTags.RESULT_PREFIX + "event_ids.length", - str(len(events)), + str(len(output_events)), ) - for event in events: + for event in output_events: await self._notify_persisted_event(event, max_stream_token) return max_stream_token.stream diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index ccd512be54b9..4e66a79e0d91 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -173,7 +173,11 @@ async def _get_rules_for_event( async def _get_power_levels_and_sender_level( self, event: EventBase, context: EventContext - ) -> Tuple[dict, int]: + ) -> Tuple[dict, Optional[int]]: + # There are no power levels and sender levels possible to get from outlier + if event.internal_metadata.is_outlier(): + return {}, None + event_types = auth_types_for_event(event.room_version, event) prev_state_ids = await context.get_prev_state_ids( StateFilter.from_types(event_types) @@ -258,8 +262,8 @@ async def action_for_event_by_user( should increment the unread count, and insert the results into the event_push_actions_staging table. """ - if event.internal_metadata.is_outlier(): - # This can happen due to out of band memberships + if not event.is_notifiable: + # Push rules for events that aren't notifiable can't be processed by this return count_as_unread = _should_count_as_unread(event, context) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 3c5632cd9153..f8176c5a4253 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -42,18 +42,18 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") -def _room_member_count( - ev: EventBase, condition: Mapping[str, Any], room_member_count: int -) -> bool: +def _room_member_count(condition: Mapping[str, Any], room_member_count: int) -> bool: return _test_ineq_condition(condition, room_member_count) def _sender_notification_permission( - ev: EventBase, condition: Mapping[str, Any], - sender_power_level: int, + sender_power_level: Optional[int], power_levels: Dict[str, Union[int, Dict[str, int]]], ) -> bool: + if sender_power_level is None: + return False + notif_level_key = condition.get("key") if notif_level_key is None: return False @@ -129,7 +129,7 @@ def __init__( self, event: EventBase, room_member_count: int, - sender_power_level: int, + sender_power_level: Optional[int], power_levels: Dict[str, Union[int, Dict[str, int]]], relations: Dict[str, Set[Tuple[str, str]]], relations_match_enabled: bool, @@ -198,10 +198,10 @@ def matches( elif condition["kind"] == "contains_display_name": return self._contains_display_name(display_name) elif condition["kind"] == "room_member_count": - return _room_member_count(self._event, condition, self._room_member_count) + return _room_member_count(condition, self._room_member_count) elif condition["kind"] == "sender_notification_permission": return _sender_notification_permission( - self._event, condition, self._sender_power_level, self._power_levels + condition, self._sender_power_level, self._power_levels ) elif ( condition["kind"] == "org.matrix.msc3772.relation_match" diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index dad3731b9b50..cf7816f15ae7 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -423,16 +423,18 @@ async def enqueue( for d in ret_vals: replaced_events.update(d) - events = [] + persisted_events = [] for event, _ in events_and_contexts: existing_event_id = replaced_events.get(event.event_id) if existing_event_id: - events.append(await self.main_store.get_event(existing_event_id)) + persisted_events.append( + await self.main_store.get_event(existing_event_id) + ) else: - events.append(event) + persisted_events.append(event) return ( - events, + persisted_events, self.main_store.get_room_max_token(), ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a4010ee28dca..2a9fa3d2d6b1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2180,13 +2180,11 @@ def _set_push_actions_for_event_and_users_txn( appear in events_and_context. """ - # Only non outlier events will have push actions associated with them, + # Only notifiable events will have push actions associated with them, # so let's filter them out. (This makes joining large rooms faster, as # these queries took seconds to process all the state events). - non_outlier_events = [ - event - for event, _ in events_and_contexts - if not event.internal_metadata.is_outlier() + notifiable_events = [ + event for event, _ in events_and_contexts if event.is_notifiable ] sql = """ @@ -2199,7 +2197,7 @@ def _set_push_actions_for_event_and_users_txn( WHERE event_id = ? """ - if non_outlier_events: + if notifiable_events: txn.execute_batch( sql, ( @@ -2209,12 +2207,12 @@ def _set_push_actions_for_event_and_users_txn( event.depth, event.event_id, ) - for event in non_outlier_events + for event in notifiable_events ), ) room_to_event_ids: Dict[str, List[str]] = {} - for e in non_outlier_events: + for e in notifiable_events: room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) for room_id, event_ids in room_to_event_ids.items():