From 258203e834afa2ebb42a0c912ef5cda80310889a Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 15 Nov 2021 14:34:07 +0000 Subject: [PATCH 1/5] Run _upgrade_existing_database on workers if at current schema_version Workers need to run this method to figure out if any outstanding delta files exist - even if we're currently on the latest schema version. #11255 made the correct fix in that _upgrade_existing_database should not be run if version_info.schema_version > SCHEMA_VERSION. But we still need to run this method if version_info.schema_version == SCHEMA_VERSION. --- synapse/storage/prepare_database.py | 36 ++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 8b9c6adae2a7..73c0eebe73f5 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -131,17 +131,13 @@ def prepare_database( "config==None in prepare_database, but database is not empty" ) - # if it's a worker app, refuse to upgrade the database, to avoid multiple - # workers doing it at once. - if config.worker.worker_app is None: - _upgrade_existing_database( - cur, - version_info, - database_engine, - config, - databases=databases, - ) - elif version_info.current_version < SCHEMA_VERSION: + # If the schema version needs to be updated, and we are on a worker, we immediately + # know to bail out as workers cannot update the database schema. Only one worker + # must update the database at the time, therefore we delegate this task to the master. + if ( + config.worker.worker_app is not None + and version_info.current_version < SCHEMA_VERSION + ): # If the DB is on an older version than we expect then we refuse # to start the worker (as the main process needs to run first to # update the schema). @@ -150,6 +146,24 @@ def prepare_database( % (SCHEMA_VERSION, version_info.current_version) ) + # Otherwise if we are on the current schema version, we need to check if there + # are any unapplied delta files to process. This should be run on all processes, + # master or worker. The master will apply the deltas, workers will check if any + # outstanding deltas exist and bail out if they do. + if version_info.current_version == SCHEMA_VERSION: + _upgrade_existing_database( + cur, + version_info, + database_engine, + config, + databases=databases, + ) + + # If SCHEMA_VERSION is greater than version_info.current_version (for instance, + # if we're rolling back the database) then don't apply any migrations. + # Things should work as long as one hasn't rolled back past the + # SCHEMA_COMPAT_VERSION. + else: logger.info("%r: Initialising new database", databases) From cc0df3556a3c4bb5b4f15bca03e7090b942682a7 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 15 Nov 2021 14:37:22 +0000 Subject: [PATCH 2/5] Changelog --- changelog.d/11346.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11346.bugfix diff --git a/changelog.d/11346.bugfix b/changelog.d/11346.bugfix new file mode 100644 index 000000000000..1fe8020eab8c --- /dev/null +++ b/changelog.d/11346.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.47.0rc1 which caused worker processes to not halt startup in the presence of outstanding database migrations. \ No newline at end of file From 5aa7589c81d055f3f40f9e4199c31e5eeb629483 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 15 Nov 2021 14:56:35 +0000 Subject: [PATCH 3/5] Move worker checking logic into _upgrade_existing_database --- synapse/storage/prepare_database.py | 54 ++++++++++++----------------- 1 file changed, 22 insertions(+), 32 deletions(-) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 73c0eebe73f5..99cff5d903ec 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -131,38 +131,16 @@ def prepare_database( "config==None in prepare_database, but database is not empty" ) - # If the schema version needs to be updated, and we are on a worker, we immediately - # know to bail out as workers cannot update the database schema. Only one worker - # must update the database at the time, therefore we delegate this task to the master. - if ( - config.worker.worker_app is not None - and version_info.current_version < SCHEMA_VERSION - ): - # If the DB is on an older version than we expect then we refuse - # to start the worker (as the main process needs to run first to - # update the schema). - raise UpgradeDatabaseException( - OUTDATED_SCHEMA_ON_WORKER_ERROR - % (SCHEMA_VERSION, version_info.current_version) - ) - - # Otherwise if we are on the current schema version, we need to check if there - # are any unapplied delta files to process. This should be run on all processes, - # master or worker. The master will apply the deltas, workers will check if any - # outstanding deltas exist and bail out if they do. - if version_info.current_version == SCHEMA_VERSION: - _upgrade_existing_database( - cur, - version_info, - database_engine, - config, - databases=databases, - ) - - # If SCHEMA_VERSION is greater than version_info.current_version (for instance, - # if we're rolling back the database) then don't apply any migrations. - # Things should work as long as one hasn't rolled back past the - # SCHEMA_COMPAT_VERSION. + # This should be run on all processes, master or worker. The master will + # apply the deltas, while workers will check if any outstanding deltas + # exist and raise an PrepareDatabaseException if they do. + _upgrade_existing_database( + cur, + version_info, + database_engine, + config, + databases=databases, + ) else: logger.info("%r: Initialising new database", databases) @@ -372,6 +350,18 @@ def _upgrade_existing_database( is_worker = config and config.worker.worker_app is not None + # If the schema version needs to be updated, and we are on a worker, we immediately + # know to bail out as workers cannot update the database schema. Only one worker + # must update the database at the time, therefore we delegate this task to the master. + if is_worker and current_schema_state.current_version < SCHEMA_VERSION: + # If the DB is on an older version than we expect then we refuse + # to start the worker (as the main process needs to run first to + # update the schema). + raise UpgradeDatabaseException( + OUTDATED_SCHEMA_ON_WORKER_ERROR + % (SCHEMA_VERSION, current_schema_state.current_version) + ) + if ( current_schema_state.compat_version is not None and current_schema_state.compat_version > SCHEMA_VERSION From 0f8a399249bd32c1dba61b190818c19893593cbd Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 15 Nov 2021 15:47:11 +0000 Subject: [PATCH 4/5] Add test for running delta on worker with current schema version --- tests/storage/test_rollback_worker.py | 52 ++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py index a6be9a1bb184..0ce089216523 100644 --- a/tests/storage/test_rollback_worker.py +++ b/tests/storage/test_rollback_worker.py @@ -11,6 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import List +from unittest import mock + from synapse.app.generic_worker import GenericWorkerServer from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database @@ -19,6 +22,22 @@ from tests.unittest import HomeserverTestCase +def fake_listdir(filepath: str) -> List[str]: + """ + A fake implementation of os.listdir which we can use to mock out the filesystem. + + Args: + filepath: The directory to list files for. + + Returns: + A list of files and folders in the directory. + """ + if filepath.endswith("full_schemas"): + return [SCHEMA_VERSION] + + return ["99_add_unicorn_to_database.sql"] + + class WorkerSchemaTests(HomeserverTestCase): def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver( @@ -51,7 +70,7 @@ def test_rolling_back(self): prepare_database(db_conn, db_pool.engine, self.hs.config) - def test_not_upgraded(self): + def test_not_upgraded_old_schema_version(self): """Test that workers don't start if the DB has an older schema version""" db_pool = self.hs.get_datastore().db_pool db_conn = LoggingDatabaseConnection( @@ -67,3 +86,34 @@ def test_not_upgraded(self): with self.assertRaises(PrepareDatabaseException): prepare_database(db_conn, db_pool.engine, self.hs.config) + + def test_not_upgraded_current_schema_version_with_outstanding_deltas(self): + """ + Test that workers don't start if the DB is on the current schema version, + but there are still outstanding delta migrations to run. + """ + db_pool = self.hs.get_datastore().db_pool + db_conn = LoggingDatabaseConnection( + db_pool._db_pool.connect(), + db_pool.engine, + "tests", + ) + + # Set the schema version of the database to the current version + cur = db_conn.cursor() + cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION,)) + + db_conn.commit() + + # Path `os.listdir` here to make synapse think that there is a migration + # file ready to be run. + # Note that we can't patch this function for the whole method, else Synapse + # will try to find the file when building the database initially. + with mock.patch("os.listdir", mock.Mock(side_effect=fake_listdir)): + with self.assertRaises(PrepareDatabaseException): + # Synapse should think that there is an outstanding migration file due to + # patching 'os.listdir' in the function decorator. + # + # We expect Synapse to raise an exception to indicate the master process + # needs to apply this migration file. + prepare_database(db_conn, db_pool.engine, self.hs.config) From 3a9709c17c03fef7c94b8df70b5202fea443d089 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 15 Nov 2021 17:05:23 +0000 Subject: [PATCH 5/5] Update synapse/storage/prepare_database.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/prepare_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 99cff5d903ec..e45adfcb5569 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -351,7 +351,7 @@ def _upgrade_existing_database( is_worker = config and config.worker.worker_app is not None # If the schema version needs to be updated, and we are on a worker, we immediately - # know to bail out as workers cannot update the database schema. Only one worker + # know to bail out as workers cannot update the database schema. Only one process # must update the database at the time, therefore we delegate this task to the master. if is_worker and current_schema_state.current_version < SCHEMA_VERSION: # If the DB is on an older version than we expect then we refuse