Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Instrument state and state_group storage related things (tracing) (
Browse files Browse the repository at this point in the history
…#15610)

Instrument `state` and `state_group` storage related things (tracing) so it's a little more clear where these database transactions are coming from as there is a lot of wires crossing in these functions.

Part of `/messages` performance investigation: #13356
  • Loading branch information
MadLittleMods authored May 19, 2023
1 parent ca3c07e commit 703a8f9
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/15610.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Instrument `state` and `state_group` storage-related operations to better picture what's happening when tracing.
5 changes: 5 additions & 0 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from synapse.appservice import ApplicationService
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.types import JsonDict, StateMap

if TYPE_CHECKING:
Expand Down Expand Up @@ -242,6 +243,8 @@ def state_group(self) -> Optional[int]:

return self._state_group

@trace
@tag_args
async def get_current_state_ids(
self, state_filter: Optional["StateFilter"] = None
) -> Optional[StateMap[str]]:
Expand Down Expand Up @@ -275,6 +278,8 @@ async def get_current_state_ids(

return prev_state_ids

@trace
@tag_args
async def get_prev_state_ids(
self, state_filter: Optional["StateFilter"] = None
) -> StateMap[str]:
Expand Down
4 changes: 4 additions & 0 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
UnpersistedEventContextBase,
)
from synapse.logging.context import ContextResourceUsage
from synapse.logging.opentracing import tag_args, trace
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
Expand Down Expand Up @@ -270,6 +271,8 @@ async def get_hosts_in_room_at_events(
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
return await self.store.get_joined_hosts(room_id, state, entry)

@trace
@tag_args
async def calculate_context_info(
self,
event: EventBase,
Expand Down Expand Up @@ -465,6 +468,7 @@ async def compute_event_context(

return await unpersisted_context.persist(event)

@trace
@measure_func()
async def resolve_state_groups_for_events(
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
Expand Down
33 changes: 33 additions & 0 deletions synapse/storage/controllers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def notify_room_un_partial_stated(self, room_id: str) -> None:
"""
self._partial_state_room_tracker.notify_un_partial_stated(room_id)

@trace
@tag_args
async def get_state_group_delta(
self, state_group: int
) -> Tuple[Optional[int], Optional[StateMap[str]]]:
Expand All @@ -84,6 +86,8 @@ async def get_state_group_delta(
state_group_delta = await self.stores.state.get_state_group_delta(state_group)
return state_group_delta.prev_group, state_group_delta.delta_ids

@trace
@tag_args
async def get_state_groups_ids(
self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
) -> Dict[int, MutableStateMap[str]]:
Expand Down Expand Up @@ -114,6 +118,8 @@ async def get_state_groups_ids(

return group_to_state

@trace
@tag_args
async def get_state_ids_for_group(
self, state_group: int, state_filter: Optional[StateFilter] = None
) -> StateMap[str]:
Expand All @@ -130,6 +136,8 @@ async def get_state_ids_for_group(

return group_to_state[state_group]

@trace
@tag_args
async def get_state_groups(
self, room_id: str, event_ids: Collection[str]
) -> Dict[int, List[EventBase]]:
Expand Down Expand Up @@ -165,6 +173,8 @@ async def get_state_groups(
for group, event_id_map in group_to_ids.items()
}

@trace
@tag_args
def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
) -> Awaitable[Dict[int, StateMap[str]]]:
Expand All @@ -183,6 +193,7 @@ def _get_state_groups_from_groups(
return self.stores.state._get_state_groups_from_groups(groups, state_filter)

@trace
@tag_args
async def get_state_for_events(
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
) -> Dict[str, StateMap[EventBase]]:
Expand Down Expand Up @@ -280,6 +291,8 @@ async def get_state_ids_for_events(

return {event: event_to_state[event] for event in event_ids}

@trace
@tag_args
async def get_state_for_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[EventBase]:
Expand All @@ -303,6 +316,7 @@ async def get_state_for_event(
return state_map[event_id]

@trace
@tag_args
async def get_state_ids_for_event(
self,
event_id: str,
Expand Down Expand Up @@ -333,6 +347,8 @@ async def get_state_ids_for_event(
)
return state_map[event_id]

@trace
@tag_args
def get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Awaitable[Dict[int, MutableStateMap[str]]]:
Expand Down Expand Up @@ -402,6 +418,8 @@ async def store_state_group(
event_id, room_id, prev_group, delta_ids, current_state_ids
)

@trace
@tag_args
@cancellable
async def get_current_state_ids(
self,
Expand Down Expand Up @@ -442,6 +460,8 @@ async def get_current_state_ids(
room_id, on_invalidate=on_invalidate
)

@trace
@tag_args
async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]:
"""Get canonical alias for room, if any
Expand All @@ -466,6 +486,8 @@ async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]:

return event.content.get("canonical_alias")

@trace
@tag_args
async def get_current_state_deltas(
self, prev_stream_id: int, max_stream_id: int
) -> Tuple[int, List[Dict[str, Any]]]:
Expand Down Expand Up @@ -500,6 +522,7 @@ async def get_current_state_deltas(
)

@trace
@tag_args
async def get_current_state(
self, room_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[EventBase]:
Expand All @@ -516,6 +539,8 @@ async def get_current_state(

return state_map

@trace
@tag_args
async def get_current_state_event(
self, room_id: str, event_type: str, state_key: str
) -> Optional[EventBase]:
Expand All @@ -527,6 +552,8 @@ async def get_current_state_event(
)
return state_map.get(key)

@trace
@tag_args
async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]:
"""Get current hosts in room based on current state.
Expand All @@ -538,6 +565,8 @@ async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]:

return await self.stores.main.get_current_hosts_in_room(room_id)

@trace
@tag_args
async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]:
"""Get current hosts in room based on current state.
Expand All @@ -553,6 +582,8 @@ async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]:

return await self.stores.main.get_current_hosts_in_room_ordered(room_id)

@trace
@tag_args
async def get_current_hosts_in_room_or_partial_state_approximation(
self, room_id: str
) -> Collection[str]:
Expand Down Expand Up @@ -582,6 +613,8 @@ async def get_current_hosts_in_room_or_partial_state_approximation(

return hosts

@trace
@tag_args
async def get_users_in_room_with_profiles(
self, room_id: str
) -> Mapping[str, ProfileInfo]:
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/databases/state/bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union

from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
Expand All @@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
updates.
"""

@trace
@tag_args
def _count_state_group_hops_txn(
self, txn: LoggingTransaction, state_group: int
) -> int:
Expand Down Expand Up @@ -83,6 +86,8 @@ def _count_state_group_hops_txn(

return count

@trace
@tag_args
def _get_state_groups_from_groups_txn(
self,
txn: LoggingTransaction,
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/databases/state/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase
from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
Expand Down Expand Up @@ -159,6 +160,8 @@ def _get_state_group_delta_txn(txn: LoggingTransaction) -> _GetStateGroupDelta:
"get_state_group_delta", _get_state_group_delta_txn
)

@trace
@tag_args
@cancellable
async def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
Expand Down Expand Up @@ -187,6 +190,8 @@ async def _get_state_groups_from_groups(

return results

@trace
@tag_args
def _get_state_for_group_using_cache(
self,
cache: DictionaryCache[int, StateKey, str],
Expand Down Expand Up @@ -239,6 +244,8 @@ def _get_state_for_group_using_cache(

return state_filter.filter_state(state_dict_ids), not missing_types

@trace
@tag_args
@cancellable
async def _get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
Expand Down Expand Up @@ -305,6 +312,8 @@ async def _get_state_for_groups(

return state

@trace
@tag_args
def _get_state_for_groups_using_cache(
self,
groups: Iterable[int],
Expand Down Expand Up @@ -403,6 +412,8 @@ def _insert_into_cache(
fetched_keys=non_member_types,
)

@trace
@tag_args
async def store_state_deltas_for_batched(
self,
events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]],
Expand Down Expand Up @@ -520,6 +531,8 @@ def insert_deltas_group_txn(
prev_group,
)

@trace
@tag_args
async def store_state_group(
self,
event_id: str,
Expand Down Expand Up @@ -772,6 +785,8 @@ def _purge_unreferenced_state_groups(
((sg,) for sg in state_groups_to_delete),
)

@trace
@tag_args
async def get_previous_state_groups(
self, state_groups: Iterable[int]
) -> Dict[int, int]:
Expand Down

0 comments on commit 703a8f9

Please sign in to comment.