Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Run _upgrade_existing_database on workers if at current schema_version #11346

Merged
merged 5 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11346.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in v1.47.0rc1 which caused worker processes to not halt startup in the presence of outstanding database migrations.
40 changes: 22 additions & 18 deletions synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,16 @@ def prepare_database(
"config==None in prepare_database, but database is not empty"
)

# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config.worker.worker_app is None:
_upgrade_existing_database(
cur,
version_info,
database_engine,
config,
databases=databases,
)
elif version_info.current_version < SCHEMA_VERSION:
# If the DB is on an older version than we expect then we refuse
# to start the worker (as the main process needs to run first to
# update the schema).
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR
% (SCHEMA_VERSION, version_info.current_version)
)
# This should be run on all processes, master or worker. The master will
# apply the deltas, while workers will check if any outstanding deltas
# exist and raise an PrepareDatabaseException if they do.
_upgrade_existing_database(
cur,
version_info,
database_engine,
config,
databases=databases,
)

else:
logger.info("%r: Initialising new database", databases)
Expand Down Expand Up @@ -358,6 +350,18 @@ def _upgrade_existing_database(

is_worker = config and config.worker.worker_app is not None

# If the schema version needs to be updated, and we are on a worker, we immediately
# know to bail out as workers cannot update the database schema. Only one process
# must update the database at the time, therefore we delegate this task to the master.
if is_worker and current_schema_state.current_version < SCHEMA_VERSION:
# If the DB is on an older version than we expect then we refuse
# to start the worker (as the main process needs to run first to
# update the schema).
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR
% (SCHEMA_VERSION, current_schema_state.current_version)
)

if (
current_schema_state.compat_version is not None
and current_schema_state.compat_version > SCHEMA_VERSION
Expand Down
52 changes: 51 additions & 1 deletion tests/storage/test_rollback_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from unittest import mock

from synapse.app.generic_worker import GenericWorkerServer
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
Expand All @@ -19,6 +22,22 @@
from tests.unittest import HomeserverTestCase


def fake_listdir(filepath: str) -> List[str]:
"""
A fake implementation of os.listdir which we can use to mock out the filesystem.

Args:
filepath: The directory to list files for.

Returns:
A list of files and folders in the directory.
"""
if filepath.endswith("full_schemas"):
return [SCHEMA_VERSION]

return ["99_add_unicorn_to_database.sql"]


class WorkerSchemaTests(HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(
Expand Down Expand Up @@ -51,7 +70,7 @@ def test_rolling_back(self):

prepare_database(db_conn, db_pool.engine, self.hs.config)

def test_not_upgraded(self):
def test_not_upgraded_old_schema_version(self):
"""Test that workers don't start if the DB has an older schema version"""
db_pool = self.hs.get_datastore().db_pool
db_conn = LoggingDatabaseConnection(
Expand All @@ -67,3 +86,34 @@ def test_not_upgraded(self):

with self.assertRaises(PrepareDatabaseException):
prepare_database(db_conn, db_pool.engine, self.hs.config)

def test_not_upgraded_current_schema_version_with_outstanding_deltas(self):
"""
Test that workers don't start if the DB is on the current schema version,
but there are still outstanding delta migrations to run.
"""
db_pool = self.hs.get_datastore().db_pool
db_conn = LoggingDatabaseConnection(
db_pool._db_pool.connect(),
db_pool.engine,
"tests",
)

# Set the schema version of the database to the current version
cur = db_conn.cursor()
cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION,))

db_conn.commit()

# Path `os.listdir` here to make synapse think that there is a migration
# file ready to be run.
# Note that we can't patch this function for the whole method, else Synapse
# will try to find the file when building the database initially.
with mock.patch("os.listdir", mock.Mock(side_effect=fake_listdir)):
with self.assertRaises(PrepareDatabaseException):
# Synapse should think that there is an outstanding migration file due to
# patching 'os.listdir' in the function decorator.
#
# We expect Synapse to raise an exception to indicate the master process
# needs to apply this migration file.
prepare_database(db_conn, db_pool.engine, self.hs.config)