Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Migrate stream_ordering to a bigint
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh committed Jun 28, 2021
1 parent 9cfb3dd commit aed30b7
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 1 deletion.
103 changes: 103 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,23 @@
logger = logging.getLogger(__name__)


# SQL statements which complete the stream_ordering migration by swapping the
# BIGINT `stream_ordering2` column into the place of `stream_ordering`. They are
# executed one after another, in the order listed here, by
# `_background_replace_stream_ordering_column`.
_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
    # there should be no leftover rows without a stream_ordering2, but just in case...
    "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
    # finally, we can drop the rule (which fills in stream_ordering2 for newly
    # inserted rows) and switch the columns
    "DROP RULE populate_stream_ordering2 ON events",
    "ALTER TABLE events DROP COLUMN stream_ordering",
    "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
)


class _BackgroundUpdates:
    """Constants holding the names of background updates used by this module."""

    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
    # The three stages of replacing events.stream_ordering with a BIGINT column:
    # copy the existing values, index the new column, then swap the columns over.
    POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
    INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
    REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"


@attr.s(slots=True, frozen=True)
Expand Down Expand Up @@ -142,6 +155,24 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._purged_chain_cover_index,
)

# bg updates for replacing stream_ordering with a BIGINT
# (these only run on postgres.)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
self._background_populate_stream_ordering2,
)
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2,
index_name="events_stream_ordering",
table="events",
columns=["stream_ordering2"],
unique=True,
)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
self._background_replace_stream_ordering_column,
)

async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
Expand Down Expand Up @@ -1012,3 +1043,75 @@ def purged_chain_cover_txn(txn) -> int:
await self.db_pool.updates._end_background_update("purged_chain_cover")

return result

async def _background_populate_stream_ordering2(
    self, progress: JsonDict, batch_size: int
) -> int:
    """Populate events.stream_ordering2 from stream_ordering, a batch at a time.

    This is to deal with the fact that stream_ordering was initially created as a
    32-bit integer field.

    Each call copies the next `batch_size` rows (in stream_ordering order) and
    stores the highest stream_ordering handled so far in the progress dict as
    "last_stream". The batch is bounded by stream_ordering values that actually
    exist rather than by a fixed numeric window, so a gap in the sequence wider
    than `batch_size` cannot produce an empty batch and end the update while
    later rows are still unpopulated.

    Args:
        progress: progress dict for this update. "last_stream" is the highest
            stream_ordering already copied; it is absent on the first pass.
        batch_size: number of rows to handle in this call (clamped to >= 1).

    Returns:
        The number of rows updated. 0 means there was nothing left to do, in
        which case the background update is marked as finished.
    """
    batch_size = max(batch_size, 1)

    def process(txn: Cursor) -> int:
        # if this is the first pass, find the minimum stream ordering
        last_stream = progress.get("last_stream")
        if last_stream is None:
            txn.execute(
                """
                SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
                """
            )
            rows = txn.fetchall()
            if not rows:
                return 0
            last_stream = rows[0][0] - 1

        # Find the stream_ordering of the last row in the next batch. MAX() over
        # an empty subquery yields a single NULL row, which tells us that no
        # rows remain above last_stream.
        txn.execute(
            """
            SELECT MAX(stream_ordering) FROM (
                SELECT stream_ordering FROM events
                WHERE stream_ordering > ?
                ORDER BY stream_ordering LIMIT ?
            ) AS next_batch
            """,
            (last_stream, batch_size),
        )
        rows = txn.fetchall()
        upper_bound = rows[0][0] if rows else None
        if upper_bound is None:
            return 0

        txn.execute(
            """
            UPDATE events SET stream_ordering2=stream_ordering
            WHERE stream_ordering > ? AND stream_ordering <= ?
            """,
            (last_stream, upper_bound),
        )
        row_count = txn.rowcount

        # Resume from the last row actually processed, not from an assumed
        # contiguous position.
        self.db_pool.updates._background_update_progress_txn(
            txn,
            _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
            {"last_stream": upper_bound},
        )
        return row_count

    result = await self.db_pool.runInteraction(
        "_background_populate_stream_ordering2", process
    )

    if result != 0:
        return result

    # nothing left to copy: this stage of the migration is complete
    await self.db_pool.updates._end_background_update(
        _BackgroundUpdates.POPULATE_STREAM_ORDERING2
    )
    return 0

async def _background_replace_stream_ordering_column(
    self, progress: JsonDict, batch_size: int
) -> int:
    """Swap `stream_ordering2` into the place of the old `stream_ordering` column.

    Runs every statement in `_REPLACE_STREAM_ORDRING_SQL_COMMANDS` inside a single
    database interaction, then marks this background update as finished.
    `progress` and `batch_size` are not used.

    Returns:
        Always 0.
    """

    def run_switchover(txn: Cursor) -> None:
        # log each statement before running it, in the order they are listed
        for statement in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
            logger.info("completing stream_ordering migration: %s", statement)
            txn.execute(statement)

    await self.db_pool.runInteraction(
        "_background_replace_stream_ordering_column", run_switchover
    )

    update_name = _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN
    await self.db_pool.updates._end_background_update(update_name)

    return 0
2 changes: 1 addition & 1 deletion synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

SCHEMA_VERSION = 59
SCHEMA_VERSION = 60
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- This migration handles the process of changing the type of `stream_ordering` to
-- a BIGINT.
--
-- Note that this is only a problem on postgres as sqlite only has one "integer" type
-- which can cope with values up to 2^63.

-- First add a new (nullable) column to contain the bigger stream_ordering
ALTER TABLE events ADD COLUMN stream_ordering2 BIGINT;

-- Create a rule which will populate it for new rows: after each INSERT into
-- events, copy the new row's stream_ordering into its stream_ordering2.
CREATE OR REPLACE RULE "populate_stream_ordering2" AS
ON INSERT TO events
DO UPDATE events SET stream_ordering2=NEW.stream_ordering WHERE stream_ordering=NEW.stream_ordering;

-- Start a bg process to populate it for old events
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6001, 'populate_stream_ordering2', '{}');

-- ... and another to build an index on it (depends on the populate step above)
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');

-- ... and another to do the switcheroo: drop the old column and rename the new
-- one into its place (depends on the index step above)
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');

0 comments on commit aed30b7

Please sign in to comment.