
Convert storage layer to async/await. #7963

Merged
merged 13 commits into from Jul 28, 2020
1 change: 1 addition & 0 deletions changelog.d/7963.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
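The conversion this changelog entry describes is mechanical: a method decorated with @defer.inlineCallbacks that yields Deferreds becomes an async def method that awaits them, and Deferred[X] in the docstring's Returns section becomes a plain X return annotation. A minimal before/after sketch of the pattern, assuming a hypothetical store whose db.fetch_thing returns a Twisted Deferred (none of these names come from the diff below):

from twisted.internet import defer


class ThingStore:
    """A hypothetical store, used only to illustrate the conversion."""

    def __init__(self, db):
        self.db = db

    @defer.inlineCallbacks
    def get_thing_old(self, thing_id):
        # Old style: generator-based coroutine; each yield waits on a Deferred.
        row = yield self.db.fetch_thing(thing_id)
        return row

    async def get_thing_new(self, thing_id: str) -> dict:
        # New style: native coroutine. Twisted Deferreds are awaitable, so the
        # same call can simply be awaited, and the return type can be annotated.
        row = await self.db.fetch_thing(thing_id)
        return row

Callers change in the same way: result = yield store.get_thing_old(x) inside another inlineCallbacks generator becomes result = await store.get_thing_new(x) inside a coroutine.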
40 changes: 18 additions & 22 deletions synapse/storage/persist_events.py
@@ -25,7 +25,7 @@
 from twisted.internet import defer

 from synapse.api.constants import EventTypes, Membership
-from synapse.events import FrozenEvent
+from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -192,12 +192,11 @@ def __init__(self, hs, stores: DataStores):
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()

-    @defer.inlineCallbacks
-    def persist_events(
+    async def persist_events(
         self,
-        events_and_contexts: List[Tuple[FrozenEvent, EventContext]],
+        events_and_contexts: List[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
-    ):
+    ) -> int:
         """
         Write events to the database
         Args:
@@ -207,7 +206,7 @@ def persist_events(
                 which might update the current state etc.

         Returns:
-            Deferred[int]: the stream ordering of the latest persisted event
+            the stream ordering of the latest persisted event
         """
         partitioned = {}
         for event, ctx in events_and_contexts:
@@ -223,32 +222,29 @@
         for room_id in partitioned:
             self._maybe_start_persisting(room_id)

-        yield make_deferred_yieldable(
+        await make_deferred_yieldable(
             defer.gatherResults(deferreds, consumeErrors=True)
         )

-        max_persisted_id = yield self.main_store.get_current_events_token()
-
-        return max_persisted_id
+        return self.main_store.get_current_events_token()

-    @defer.inlineCallbacks
-    def persist_event(
-        self, event: FrozenEvent, context: EventContext, backfilled: bool = False
-    ):
+    async def persist_event(
+        self, event: EventBase, context: EventContext, backfilled: bool = False
+    ) -> Tuple[int, int]:
         """
         Returns:
-            Deferred[Tuple[int, int]]: the stream ordering of ``event``,
-            and the stream ordering of the latest persisted event
+            The stream ordering of `event`, and the stream ordering of the
+            latest persisted event
         """
         deferred = self._event_persist_queue.add_to_queue(
             event.room_id, [(event, context)], backfilled=backfilled
         )

         self._maybe_start_persisting(event.room_id)

-        yield make_deferred_yieldable(deferred)
+        await make_deferred_yieldable(deferred)

-        max_persisted_id = yield self.main_store.get_current_events_token()
+        max_persisted_id = self.main_store.get_current_events_token()
         return (event.internal_metadata.stream_ordering, max_persisted_id)

     def _maybe_start_persisting(self, room_id: str):
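With the converted signature, call sites drive persist_event as a native coroutine and get a plain tuple back instead of a Deferred. A hedged sketch of a caller (the handler function and its arguments are illustrative, not taken from this PR):

async def handle_new_event(persistence, event, context):
    # persist_event now returns its result directly when awaited: the
    # stream ordering of `event`, then the latest persisted ordering.
    event_stream_id, max_stream_id = await persistence.persist_event(
        event, context, backfilled=False
    )
    return event_stream_id, max_stream_id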
@@ -262,7 +258,7 @@ async def persisting_queue(item):

     async def _persist_events(
         self,
-        events_and_contexts: List[Tuple[FrozenEvent, EventContext]],
+        events_and_contexts: List[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
     ):
         """Calculates the change to current state and forward extremities, and
@@ -439,7 +435,7 @@ async def _persist_events(
     async def _calculate_new_extremities(
         self,
         room_id: str,
-        event_contexts: List[Tuple[FrozenEvent, EventContext]],
+        event_contexts: List[Tuple[EventBase, EventContext]],
         latest_event_ids: List[str],
     ):
         """Calculates the new forward extremities for a room given events to
@@ -497,7 +493,7 @@ async def _calculate_new_extremities(
     async def _get_new_state_after_events(
         self,
         room_id: str,
-        events_context: List[Tuple[FrozenEvent, EventContext]],
+        events_context: List[Tuple[EventBase, EventContext]],
         old_latest_event_ids: Iterable[str],
         new_latest_event_ids: Iterable[str],
     ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]]]:
@@ -683,7 +679,7 @@ async def _calculate_state_delta(
     async def _is_server_still_joined(
         self,
         room_id: str,
-        ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
+        ev_ctx_rm: List[Tuple[EventBase, EventContext]],
         delta: DeltaState,
         current_state: Optional[StateMap[str]],
         potentially_left_users: Set[str],
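A detail worth noting in persist_events above: the new code awaits make_deferred_yieldable(defer.gatherResults(...)), which waits on all of the queued per-room persists at once while observing Synapse's logcontext rules. A standalone sketch of that idiom (the wait_for_all helper is illustrative; make_deferred_yieldable is the real import shown in the diff):

from twisted.internet import defer

from synapse.logging.context import make_deferred_yieldable


async def wait_for_all(deferreds):
    # defer.gatherResults fires once every Deferred in the list has fired;
    # consumeErrors=True stops a failure in one of them from also being
    # logged as an unhandled error. make_deferred_yieldable restores the
    # caller's logcontext when the gathered Deferred resolves, which is
    # what makes it safe to await from Synapse code.
    return await make_deferred_yieldable(
        defer.gatherResults(deferreds, consumeErrors=True)
    )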
38 changes: 18 additions & 20 deletions synapse/storage/purge_events.py
@@ -15,8 +15,7 @@

 import itertools
 import logging
-
-from twisted.internet import defer
+from typing import Set

 logger = logging.getLogger(__name__)

@@ -28,49 +27,48 @@ class PurgeEventsStorage(object):
     def __init__(self, hs, stores):
         self.stores = stores

-    @defer.inlineCallbacks
-    def purge_room(self, room_id: str):
+    async def purge_room(self, room_id: str):
         """Deletes all record of a room
         """

-        state_groups_to_delete = yield self.stores.main.purge_room(room_id)
-        yield self.stores.state.purge_room_state(room_id, state_groups_to_delete)
+        state_groups_to_delete = await self.stores.main.purge_room(room_id)
+        await self.stores.state.purge_room_state(room_id, state_groups_to_delete)

-    @defer.inlineCallbacks
-    def purge_history(self, room_id, token, delete_local_events):
+    async def purge_history(
+        self, room_id: str, token: str, delete_local_events: bool
+    ) -> None:
         """Deletes room history before a certain point

         Args:
-            room_id (str):
+            room_id: The room ID

-            token (str): A topological token to delete events before
+            token: A topological token to delete events before

-            delete_local_events (bool):
+            delete_local_events:
                 if True, we will delete local events as well as remote ones
                 (instead of just marking them as outliers and deleting their
                 state groups).
         """
-        state_groups = yield self.stores.main.purge_history(
+        state_groups = await self.stores.main.purge_history(
             room_id, token, delete_local_events
         )

         logger.info("[purge] finding state groups that can be deleted")

-        sg_to_delete = yield self._find_unreferenced_groups(state_groups)
+        sg_to_delete = await self._find_unreferenced_groups(state_groups)

-        yield self.stores.state.purge_unreferenced_state_groups(room_id, sg_to_delete)
+        await self.stores.state.purge_unreferenced_state_groups(room_id, sg_to_delete)

-    @defer.inlineCallbacks
-    def _find_unreferenced_groups(self, state_groups):
+    async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
         """Used when purging history to figure out which state groups can be
         deleted.

         Args:
-            state_groups (set[int]): Set of state groups referenced by events
+            state_groups: Set of state groups referenced by events
                 that are going to be deleted.

         Returns:
-            Deferred[set[int]] The set of state groups that can be deleted.
+            The set of state groups that can be deleted.
         """
         # Graph of state group -> previous group
         graph = {}
@@ -93,7 +91,7 @@ def _find_unreferenced_groups(self, state_groups):
                 current_search = set(itertools.islice(next_to_search, 100))
                 next_to_search -= current_search

-            referenced = yield self.stores.main.get_referenced_state_groups(
+            referenced = await self.stores.main.get_referenced_state_groups(
                 current_search
             )
             referenced_groups |= referenced
@@ -102,7 +100,7 @@
             # groups that are referenced.
             current_search -= referenced

-            edges = yield self.stores.state.get_previous_state_groups(current_search)
+            edges = await self.stores.state.get_previous_state_groups(current_search)

             prevs = set(edges.values())
             # We don't bother re-handling groups we've already seen
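The loop in _find_unreferenced_groups walks the state-group graph backwards in batches of 100: groups still referenced by undeleted events are kept, the walk continues through prev-group edges for the rest, and whatever was visited but never referenced can be deleted. A simplified, self-contained sketch of that traversal, with an in-memory set and dict standing in for the two async store lookups used in the diff (get_referenced_state_groups and get_previous_state_groups):

import itertools
from typing import Dict, Set


def find_unreferenced_groups(
    state_groups: Set[int],
    referenced_by_events: Set[int],
    previous_group: Dict[int, int],
) -> Set[int]:
    # state_groups: groups referenced by the events being purged.
    # referenced_by_events: stands in for get_referenced_state_groups.
    # previous_group: stands in for the get_previous_state_groups edges.
    next_to_search = set(state_groups)
    seen = set(state_groups)
    referenced_groups: Set[int] = set()

    while next_to_search:
        # Bound the batch size, as the real store queries do.
        current = set(itertools.islice(next_to_search, 100))
        next_to_search -= current

        referenced = current & referenced_by_events
        referenced_groups |= referenced
        # Don't walk past groups that are still referenced.
        current -= referenced

        prevs = {previous_group[sg] for sg in current if sg in previous_group}
        prevs -= seen  # don't re-handle groups we've already seen
        next_to_search |= prevs
        seen |= prevs

    return seen - referenced_groups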