Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Move _find_unreferenced_groups
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Oct 29, 2018
1 parent 664b192 commit ad88460
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 81 deletions.
85 changes: 83 additions & 2 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2052,8 +2052,10 @@ def _purge_history_txn(

logger.info("[purge] finding state groups that can be deleted")

state_groups_to_delete, remaining_state_groups = self._find_unreferenced_groups(
txn, referenced_state_groups,
state_groups_to_delete, remaining_state_groups = (
self._find_unreferenced_groups_during_purge(
txn, referenced_state_groups,
)
)

logger.info(
Expand Down Expand Up @@ -2209,6 +2211,85 @@ def _purge_history_txn(

logger.info("[purge] done")

def _find_unreferenced_groups_during_purge(self, txn, state_groups):
"""Used when purging history to figure out which state groups can be
deleted and which need to be de-delta'ed (due to one of its prev groups
being scheduled for deletion).
Args:
txn
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.
Returns:
tuple[set[int], set[int]]: The set of state groups that can be
deleted and the set of state groups that need to be de-delta'ed
"""
# Graph of state group -> previous group
graph = {}

# Set of events that we have found to be referenced by events
referenced_groups = set()

# Set of state groups we've already seen
state_groups_seen = set(state_groups)

# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search

# Check if state groups are referenced
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE state_group IN (%s) AND ep.event_id IS NULL
""" % (",".join("?" for _ in current_search),)
txn.execute(sql, list(current_search))

referenced = set(sg for sg, in txn)
referenced_groups |= referenced

# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced

rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=current_search,
keyvalues={},
retcols=("prev_state_group", "state_group",),
)

prevs = set(row["state_group"] for row in rows)
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs

for row in rows:
# Note: Each state group can have at most one prev group
graph[row["state_group"]] = row["prev_state_group"]

to_delete = state_groups_seen - referenced_groups

to_dedelta = set()
for sg in referenced_groups:
prev_sg = graph.get(sg)
if prev_sg and prev_sg in to_delete:
to_dedelta.add(sg)

return to_delete, to_dedelta

@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
Expand Down
79 changes: 0 additions & 79 deletions synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1234,85 +1234,6 @@ def _count_state_group_hops_txn(self, txn, state_group):

return count

def _find_unreferenced_groups(self, txn, state_groups):
"""Used when purging history to figure out which state groups can be
deleted and which need to be de-delta'ed (due to one of its prev groups
being scheduled for deletion).
Args:
txn
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.
Returns:
tuple[set[int], set[int]]: The set of state groups that can be
deleted and the set of state groups that need to be de-delta'ed
"""
# Graph of state group -> previous group
graph = {}

# Set of events that we have found to be referenced by events
referenced_groups = set()

# Set of state groups we've already seen
state_groups_seen = set(state_groups)

# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(islice(next_to_search, 100))
next_to_search -= current_search

# Check if state groups are referenced
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE state_group IN (%s) AND ep.event_id IS NULL
""" % (",".join("?" for _ in current_search),)
txn.execute(sql, list(current_search))

referenced = set(sg for sg, in txn)
referenced_groups |= referenced

# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced

rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=current_search,
keyvalues={},
retcols=("prev_state_group", "state_group",),
)

prevs = set(row["state_group"] for row in rows)
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs

for row in rows:
# Note: Each state group can have at most one prev group
graph[row["state_group"]] = row["prev_state_group"]

to_delete = state_groups_seen - referenced_groups

to_dedelta = set()
for sg in referenced_groups:
prev_sg = graph.get(sg)
if prev_sg and prev_sg in to_delete:
to_dedelta.add(sg)

return to_delete, to_dedelta


class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
""" Keeps track of the state at a given event.
Expand Down

0 comments on commit ad88460

Please sign in to comment.