This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Minor typing fixes for synapse/storage/persist_events.py #12069

Merged (3 commits, Feb 25, 2022)
1 change: 1 addition & 0 deletions changelog.d/12069.misc
@@ -0,0 +1 @@
+Minor typing fixes.
21 changes: 12 additions & 9 deletions synapse/storage/databases/main/events.py
@@ -130,7 +130,7 @@ async def _persist_events_and_state_updates(
*,
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
-new_forward_extremeties: Dict[str, List[str]],
+new_forward_extremities: Dict[str, Set[str]],
Member:
Could these be Collection instead of Set? They don't seem to actually need to be sets? (Same for _persist_events_txn.)

Mostly... is there any bug here or was it just some incorrect type hints?

Contributor Author:
Good question! I'd prefer to keep these as Sets, since new_forward_extremities eventually makes it down to _update_forward_extremities_txn, which relies upon there being no duplicates.

The List was just an incorrect type hint which got flagged once we typed the return of _calculate_new_extremities.

Member:
Makes sense, just wasn't sure if we were overly strictly typing. 👍

use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
) -> None:
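A note on the Set-vs-Collection discussion above: `_update_forward_extremities_txn` deletes a room's existing extremity rows and inserts one row per event ID, so the mapping handed to it needs to be duplicate-free, which `Set[str]` guarantees by construction. The sketch below is a self-contained illustration of that delete-then-insert shape, not the actual Synapse implementation; the table definition and its uniqueness constraint are assumptions made for the example.

```python
# Simplified illustration (not the Synapse schema) of why the extremities
# must be duplicate-free by the time they reach the transaction.
import sqlite3
from typing import Set

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_forward_extremities ("
    "    event_id TEXT NOT NULL,"
    "    room_id TEXT NOT NULL,"
    "    UNIQUE (event_id, room_id)"
    ")"
)


def update_forward_extremities(room_id: str, new_extremities: Set[str]) -> None:
    """Replace a room's forward extremities with a delete-then-insert."""
    with conn:
        conn.execute(
            "DELETE FROM event_forward_extremities WHERE room_id = ?", (room_id,)
        )
        conn.executemany(
            "INSERT INTO event_forward_extremities (event_id, room_id) VALUES (?, ?)",
            [(event_id, room_id) for event_id in new_extremities],
        )


# A Set[str] cannot hold duplicates, so this insert is safe.
update_forward_extremities("!room:example.org", {"$event_a", "$event_b"})

# A plain list could carry a duplicated event ID, which would either violate
# the UNIQUE constraint or (without one) leave duplicate extremity rows behind.
```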
@@ -143,7 +143,7 @@ async def _persist_events_and_state_updates(
the room based on forward extremities
state_delta_for_room: Map from room_id to the delta to apply to
room state
-new_forward_extremities: Map from room_id to list of event IDs
+new_forward_extremities: Map from room_id to set of event IDs
that are the new forward extremities of the room.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
@@ -193,7 +193,7 @@ async def _persist_events_and_state_updates(
events_and_contexts=events_and_contexts,
inhibit_local_membership_updates=inhibit_local_membership_updates,
state_delta_for_room=state_delta_for_room,
-new_forward_extremeties=new_forward_extremeties,
+new_forward_extremities=new_forward_extremities,
)
persist_event_counter.inc(len(events_and_contexts))

@@ -220,7 +220,7 @@ async def _persist_events_and_state_updates(
for room_id, new_state in current_state_for_room.items():
self.store.get_current_state_ids.prefill((room_id,), new_state)

-for room_id, latest_event_ids in new_forward_extremeties.items():
+for room_id, latest_event_ids in new_forward_extremities.items():
self.store.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids)
)
@@ -334,7 +334,7 @@ def _persist_events_txn(
events_and_contexts: List[Tuple[EventBase, EventContext]],
inhibit_local_membership_updates: bool = False,
state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
-new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
+new_forward_extremities: Optional[Dict[str, Set[str]]] = None,
):
"""Insert some number of room events into the necessary database tables.

@@ -353,13 +353,13 @@ def _persist_events_txn(
from the database. This is useful when retrying due to
IntegrityError.
state_delta_for_room: The current-state delta for each room.
-new_forward_extremetie: The new forward extremities for each room.
+new_forward_extremities: The new forward extremities for each room.
For each room, a list of the event ids which are the forward
extremities.

"""
state_delta_for_room = state_delta_for_room or {}
-new_forward_extremeties = new_forward_extremeties or {}
+new_forward_extremities = new_forward_extremities or {}

all_events_and_contexts = events_and_contexts
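One small aside on the `new_forward_extremities = new_forward_extremities or {}` pattern kept by this change: the parameter defaults to `None` and a fresh dict is substituted inside the function, because a mutable `= {}` default would be created once and shared between calls. A minimal illustration (not Synapse code):

```python
from typing import Dict, Optional, Set


def bad(acc: Dict[str, Set[str]] = {}) -> Dict[str, Set[str]]:
    acc.setdefault("!room", set()).add("$event")
    return acc


def good(acc: Optional[Dict[str, Set[str]]] = None) -> Dict[str, Set[str]]:
    acc = acc or {}  # fresh dict on every call where the caller passed nothing
    acc.setdefault("!room", set()).add("$event")
    return acc


assert bad() is bad()        # the shared default dict is returned both times
assert good() is not good()  # each call builds its own dict
```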

@@ -372,7 +372,7 @@ def _persist_events_txn(

self._update_forward_extremities_txn(
txn,
-new_forward_extremities=new_forward_extremeties,
+new_forward_extremities=new_forward_extremities,
max_stream_order=max_stream_order,
)

@@ -1158,7 +1158,10 @@ def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
)

def _update_forward_extremities_txn(
-self, txn, new_forward_extremities, max_stream_order
+self,
+txn: LoggingTransaction,
+new_forward_extremities: Dict[str, Set[str]],
+max_stream_order: int,
):
for room_id in new_forward_extremities.keys():
self.db_pool.simple_delete_txn(
25 changes: 12 additions & 13 deletions synapse/storage/persist_events.py
@@ -427,21 +427,21 @@ async def _persist_event_batch(
# NB: Assumes that we are only persisting events for one room
# at a time.

-# map room_id->list[event_ids] giving the new forward
+# map room_id->set[event_ids] giving the new forward
# extremities in each room
-new_forward_extremeties = {}
+new_forward_extremities: Dict[str, Set[str]] = {}

# map room_id->(type,state_key)->event_id tracking the full
# state in each room after adding these events.
# This is simply used to prefill the get_current_state_ids
# cache
-current_state_for_room = {}
+current_state_for_room: Dict[str, StateMap[str]] = {}

# map room_id->(to_delete, to_insert) where to_delete is a list
# of type/state keys to remove from current state, and to_insert
# is a map (type,key)->event_id giving the state delta in each
# room
-state_delta_for_room = {}
+state_delta_for_room: Dict[str, DeltaState] = {}

# Set of remote users which were in rooms the server has left. We
# should check if we still share any rooms and if not we mark their
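For readers following the new annotations, the shapes of these maps (as described by the comments in the hunk above) look roughly like the literals below; `StateMap[str]` is stood in for by a plain dict keyed by `(event type, state_key)` tuples, the IDs are made up, and `DeltaState` construction is left out since its signature is not shown in this diff.

```python
from typing import Dict, Set, Tuple

# Dict[str, Set[str]]: room_id -> the room's new forward extremity event IDs.
new_forward_extremities: Dict[str, Set[str]] = {
    "!room:example.org": {"$event_a", "$event_b"},
}

# Dict[str, StateMap[str]]: room_id -> {(event type, state_key): event_id},
# used only to prefill the get_current_state_ids cache.
current_state_for_room: Dict[str, Dict[Tuple[str, str], str]] = {
    "!room:example.org": {
        ("m.room.member", "@alice:example.org"): "$member_event",
        ("m.room.topic", ""): "$topic_event",
    },
}
```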
@@ -460,14 +460,13 @@ async def _persist_event_batch(
)

for room_id, ev_ctx_rm in events_by_room.items():
-latest_event_ids = (
+latest_event_ids = set(
await self.main_store.get_latest_event_ids_in_room(room_id)
)
new_latest_event_ids = await self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)

-latest_event_ids = set(latest_event_ids)
if new_latest_event_ids == latest_event_ids:
# No change in extremities, so no change in state
continue
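Building `latest_event_ids` as a set up front also keeps the `new_latest_event_ids == latest_event_ids` comparison order- and duplicate-insensitive, which list equality would not be:

```python
assert ["$a", "$b"] != ["$b", "$a"]  # list equality is order-sensitive
assert {"$a", "$b"} == {"$b", "$a"}  # set equality is not
```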
@@ -478,7 +477,7 @@ async def _persist_event_batch(
# extremities, so we'll `continue` above and skip this bit.)
assert new_latest_event_ids, "No forward extremities left!"

-new_forward_extremeties[room_id] = new_latest_event_ids
+new_forward_extremities[room_id] = new_latest_event_ids

len_1 = (
len(latest_event_ids) == 1
@@ -533,7 +532,7 @@ async def _persist_event_batch(
# extremities, so we'll `continue` above and skip this bit.)
assert new_latest_event_ids, "No forward extremities left!"

-new_forward_extremeties[room_id] = new_latest_event_ids
+new_forward_extremities[room_id] = new_latest_event_ids

# If either are not None then there has been a change,
# and we need to work out the delta (or use that
@@ -567,7 +566,7 @@ async def _persist_event_batch(
)
if not is_still_joined:
logger.info("Server no longer in room %s", room_id)
-latest_event_ids = []
+latest_event_ids = set()
current_state = {}
delta.no_longer_in_room = True

@@ -582,7 +581,7 @@ async def _persist_event_batch(
chunk,
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
-new_forward_extremeties=new_forward_extremeties,
+new_forward_extremities=new_forward_extremities,
use_negative_stream_ordering=backfilled,
inhibit_local_membership_updates=backfilled,
)
@@ -596,7 +595,7 @@ async def _calculate_new_extremities(
room_id: str,
event_contexts: List[Tuple[EventBase, EventContext]],
latest_event_ids: Collection[str],
-):
+) -> Set[str]:
"""Calculates the new forward extremities for a room given events to
persist.
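As noted in the review discussion, annotating this return type is what surfaced the stale `List[str]` hints downstream. A minimal stand-in for the kind of mismatch mypy then reports (names and signatures simplified, not the real call chain):

```python
from typing import Dict, List, Set


def calculate_new_extremities() -> Set[str]:
    return {"$event_a", "$event_b"}


def persist(new_forward_extremities: Dict[str, List[str]]) -> None:
    ...


extremities = {"!room:example.org": calculate_new_extremities()}
persist(extremities)  # mypy: incompatible type Dict[str, Set[str]]; expected Dict[str, List[str]]
```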

@@ -906,9 +905,9 @@ async def _prune_extremities(
# Ideally we'd figure out a way of still being able to drop old
# dummy events that reference local events, but this is good enough
# as a first cut.
-events_to_check = [event]
+events_to_check: Collection[EventBase] = [event]
while events_to_check:
-new_events = set()
+new_events: Set[str] = set()
for event_to_check in events_to_check:
if self.is_mine_id(event_to_check.sender):
if event_to_check.type != EventTypes.Dummy:
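A guess at why `events_to_check` is widened to `Collection[EventBase]` rather than `List[EventBase]`: the loop rebinds the variable to whatever iterable the next batch of events comes back as, and the declared type has to accommodate every assignment mypy sees. A generic illustration, not the real code:

```python
from typing import Collection, Dict, List

events_by_id: Dict[str, str] = {"$a": "event a", "$b": "event b"}

ok: Collection[str] = ["seed"]
ok = events_by_id.values()   # accepted: a dict values view is a Collection

bad: List[str] = ["seed"]
bad = events_by_id.values()  # rejected by mypy: a values view is not a List
```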