From e697ca01879fd7d8644241390078df9d2bdf15a9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 16:38:44 +0000 Subject: [PATCH 01/11] Raise an exception if there are pending background updates So we return with a non-0 code --- scripts/synapse_port_db | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 5e69104b97d6..38c15034f2ea 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -508,12 +508,11 @@ class Porter(object): yield self.sqlite_store.db.updates.has_completed_background_updates() ) if not updates_complete: - sys.stderr.write( + raise Exception( "Pending background updates exist in the SQLite3 database." " Please start Synapse again and wait until every update has finished" " before running this script.\n" ) - defer.returnValue(None) self.postgres_store = self.build_db_store( self.hs_config.get_single_database() From c684227f39d0d6974fd0d71f9eb9fd440eb00518 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 16:41:34 +0000 Subject: [PATCH 02/11] Changelog --- changelog.d/6718.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6718.bugfix diff --git a/changelog.d/6718.bugfix b/changelog.d/6718.bugfix new file mode 100644 index 000000000000..23b23e3ed8c4 --- /dev/null +++ b/changelog.d/6718.bugfix @@ -0,0 +1 @@ +Fix a bug causing the `synapse_port_db` script to return 0 in a specific error case. From 7fd6a4ec96fab626c4261f3558977a1883af9c2e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 17:09:24 +0000 Subject: [PATCH 03/11] Port synapse_port_db to async/await --- scripts/synapse_port_db | 87 +++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 38c15034f2ea..71b92f0d0773 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -17,6 +17,7 @@ # limitations under the License. import argparse +import asyncio import curses import logging import sys @@ -189,11 +190,10 @@ class Porter(object): def __init__(self, **kwargs): self.__dict__.update(kwargs) - @defer.inlineCallbacks - def setup_table(self, table): + async def setup_table(self, table): if table in APPEND_ONLY_TABLES: # It's safe to just carry on inserting. - row = yield self.postgres_store.db.simple_select_one( + row = await self.postgres_store.db.simple_select_one( table="port_from_sqlite3", keyvalues={"table_name": table}, retcols=("forward_rowid", "backward_rowid"), @@ -207,10 +207,10 @@ class Porter(object): forward_chunk, already_ported, total_to_port, - ) = yield self._setup_sent_transactions() + ) = await self._setup_sent_transactions() backward_chunk = 0 else: - yield self.postgres_store.db.simple_insert( + await self.postgres_store.db.simple_insert( table="port_from_sqlite3", values={ "table_name": table, @@ -227,7 +227,7 @@ class Porter(object): backward_chunk = row["backward_rowid"] if total_to_port is None: - already_ported, total_to_port = yield self._get_total_count_to_port( + already_ported, total_to_port = await self._get_total_count_to_port( table, forward_chunk, backward_chunk ) else: @@ -238,9 +238,9 @@ class Porter(object): ) txn.execute("TRUNCATE %s CASCADE" % (table,)) - yield self.postgres_store.execute(delete_all) + await self.postgres_store.execute(delete_all) - yield self.postgres_store.db.simple_insert( + await self.postgres_store.db.simple_insert( table="port_from_sqlite3", values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0}, ) @@ -248,7 +248,7 @@ class Porter(object): forward_chunk = 1 backward_chunk = 0 - already_ported, total_to_port = yield self._get_total_count_to_port( + already_ported, total_to_port = await self._get_total_count_to_port( table, forward_chunk, backward_chunk ) @@ -275,7 +275,7 @@ class Porter(object): self.progress.add_table(table, postgres_size, table_size) if table == "event_search": - yield self.handle_search_table( + await self.handle_search_table( postgres_size, table_size, forward_chunk, backward_chunk ) return @@ -294,7 +294,7 @@ class Porter(object): if table == "user_directory_stream_pos": # We need to make sure there is a single row, `(X, null), as that is # what synapse expects to be there. - yield self.postgres_store.db.simple_insert( + await self.postgres_store.db.simple_insert( table=table, values={"stream_id": None} ) self.progress.update(table, table_size) # Mark table as done @@ -335,7 +335,7 @@ class Porter(object): return headers, forward_rows, backward_rows - headers, frows, brows = yield self.sqlite_store.db.runInteraction( + headers, frows, brows = await self.sqlite_store.db.runInteraction( "select", r ) @@ -361,7 +361,7 @@ class Porter(object): }, ) - yield self.postgres_store.execute(insert) + await self.postgres_store.execute(insert) postgres_size += len(rows) @@ -390,7 +390,7 @@ class Porter(object): return headers, rows - headers, rows = yield self.sqlite_store.db.runInteraction("select", r) + headers, rows = await self.sqlite_store.db.runInteraction("select", r) if rows: forward_chunk = rows[-1][0] + 1 @@ -438,7 +438,7 @@ class Porter(object): }, ) - yield self.postgres_store.execute(insert) + await self.postgres_store.execute(insert) postgres_size += len(rows) @@ -480,7 +480,7 @@ class Porter(object): def run_background_updates_on_postgres(self): # Manually apply all background updates on the PostgreSQL database. postgres_ready = ( - yield self.postgres_store.db.updates.has_completed_background_updates() + await self.postgres_store.db.updates.has_completed_background_updates() ) if not postgres_ready: @@ -489,8 +489,10 @@ class Porter(object): self.progress.set_state("Running background updates on PostgreSQL") while not postgres_ready: - yield self.postgres_store.db.updates.do_next_background_update(100) - postgres_ready = yield ( + await defer.ensureDeferred( + self.postgres_store.db.updates.do_next_background_update(100) + ) + postgres_ready = await ( self.postgres_store.db.updates.has_completed_background_updates() ) @@ -505,7 +507,7 @@ class Porter(object): # Check if all background updates are done, abort if not. updates_complete = ( - yield self.sqlite_store.db.updates.has_completed_background_updates() + await self.sqlite_store.db.updates.has_completed_background_updates() ) if not updates_complete: raise Exception( @@ -518,7 +520,7 @@ class Porter(object): self.hs_config.get_single_database() ) - yield self.run_background_updates_on_postgres() + await self.run_background_updates_on_postgres() self.progress.set_state("Creating port tables") @@ -546,22 +548,22 @@ class Porter(object): ) try: - yield self.postgres_store.db.runInteraction("alter_table", alter_table) + await self.postgres_store.db.runInteraction("alter_table", alter_table) except Exception: # On Error Resume Next pass - yield self.postgres_store.db.runInteraction( + await self.postgres_store.db.runInteraction( "create_port_table", create_port_table ) # Step 2. Get tables. self.progress.set_state("Fetching tables") - sqlite_tables = yield self.sqlite_store.db.simple_select_onecol( + sqlite_tables = await self.sqlite_store.db.simple_select_onecol( table="sqlite_master", keyvalues={"type": "table"}, retcol="name" ) - postgres_tables = yield self.postgres_store.db.simple_select_onecol( + postgres_tables = await self.postgres_store.db.simple_select_onecol( table="information_schema.tables", keyvalues={}, retcol="distinct table_name", @@ -572,24 +574,21 @@ class Porter(object): # Step 3. Figure out what still needs copying self.progress.set_state("Checking on port progress") - setup_res = yield defer.gatherResults( - [ + setup_res = await asyncio.gather( + *[ self.setup_table(table) for table in tables if table not in ["schema_version", "applied_schema_deltas"] and not table.startswith("sqlite_") ], - consumeErrors=True, ) # Step 4. Do the copying. self.progress.set_state("Copying to postgres") - yield defer.gatherResults( - [self.handle_table(*res) for res in setup_res], consumeErrors=True - ) + await asyncio.gather(*[self.handle_table(*res) for res in setup_res]) # Step 5. Do final post-processing - yield self._setup_state_group_id_seq() + await self._setup_state_group_id_seq() self.progress.done() except Exception: @@ -655,7 +654,7 @@ class Porter(object): return headers, [r for r in rows if r[ts_ind] < yesterday] - headers, rows = yield self.sqlite_store.db.runInteraction("select", r) + headers, rows = await self.sqlite_store.db.runInteraction("select", r) rows = self._convert_rows("sent_transactions", headers, rows) @@ -668,7 +667,7 @@ class Porter(object): txn, "sent_transactions", headers[1:], rows ) - yield self.postgres_store.execute(insert) + await self.postgres_store.execute(insert) else: max_inserted_rowid = 0 @@ -685,10 +684,10 @@ class Porter(object): else: return 1 - next_chunk = yield self.sqlite_store.execute(get_start_id) + next_chunk = await self.sqlite_store.execute(get_start_id) next_chunk = max(max_inserted_rowid + 1, next_chunk) - yield self.postgres_store.db.simple_insert( + await self.postgres_store.db.simple_insert( table="port_from_sqlite3", values={ "table_name": "sent_transactions", @@ -704,7 +703,7 @@ class Porter(object): (size,) = txn.fetchone() return int(size) - remaining_count = yield self.sqlite_store.execute(get_sent_table_size) + remaining_count = await self.sqlite_store.execute(get_sent_table_size) total_count = remaining_count + inserted_rows @@ -712,11 +711,11 @@ class Porter(object): @defer.inlineCallbacks def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk): - frows = yield self.sqlite_store.execute_sql( + frows = await self.sqlite_store.execute_sql( "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk ) - brows = yield self.sqlite_store.execute_sql( + brows = await self.sqlite_store.execute_sql( "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk ) @@ -724,7 +723,7 @@ class Porter(object): @defer.inlineCallbacks def _get_already_ported_count(self, table): - rows = yield self.postgres_store.execute_sql( + rows = await self.postgres_store.execute_sql( "SELECT count(*) FROM %s" % (table,) ) @@ -732,12 +731,11 @@ class Porter(object): @defer.inlineCallbacks def _get_total_count_to_port(self, table, forward_chunk, backward_chunk): - remaining, done = yield defer.gatherResults( - [ + remaining, done = await asyncio.gather( + *[ self._get_remaining_count_to_port(table, forward_chunk, backward_chunk), self._get_already_ported_count(table), ], - consumeErrors=True, ) remaining = int(remaining) if remaining else 0 @@ -1009,7 +1007,10 @@ if __name__ == "__main__": hs_config=config, ) - reactor.callWhenRunning(porter.run) + def run(): + return defer.ensureDeferred(porter.run()) + + reactor.callWhenRunning(run) reactor.run() From 07a33a5f6a92ea480d9b32bf9d38ef8b509fcc96 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 17:09:51 +0000 Subject: [PATCH 04/11] Port update_database to async/await --- scripts-dev/update_database | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/scripts-dev/update_database b/scripts-dev/update_database index 1d62f0403afc..b03f6362166e 100755 --- a/scripts-dev/update_database +++ b/scripts-dev/update_database @@ -81,15 +81,17 @@ if __name__ == "__main__": hs.setup() store = hs.get_datastore() - @defer.inlineCallbacks - def run_background_updates(): - yield store.db.updates.run_background_updates(sleep=False) + async def run_background_updates(): + await store.db.updates.run_background_updates(sleep=False) # Stop the reactor to exit the script once every background update is run. reactor.stop() - # Apply all background updates on the database. - reactor.callWhenRunning( - lambda: run_as_background_process("background_updates", run_background_updates) - ) + def run(): + # Apply all background updates on the database. + defer.ensureDeferred( + run_as_background_process("background_updates", run_background_updates) + ) + + reactor.callWhenRunning(run) reactor.run() From 3fa6e5477049e6f91a22a15c6c92331da5303f71 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 17:26:07 +0000 Subject: [PATCH 05/11] Add version string to mocked homeservers --- scripts-dev/update_database | 4 ++++ scripts/synapse_port_db | 3 +++ 2 files changed, 7 insertions(+) diff --git a/scripts-dev/update_database b/scripts-dev/update_database index b03f6362166e..94aa8758b48f 100755 --- a/scripts-dev/update_database +++ b/scripts-dev/update_database @@ -22,10 +22,12 @@ import yaml from twisted.internet import defer, reactor +import synapse from synapse.config.homeserver import HomeServerConfig from synapse.metrics.background_process_metrics import run_as_background_process from synapse.server import HomeServer from synapse.storage import DataStore +from synapse.util.versionstring import get_version_string logger = logging.getLogger("update_database") @@ -38,6 +40,8 @@ class MockHomeserver(HomeServer): config.server_name, reactor=reactor, config=config, **kwargs ) + self.version_string = "Synapse/"+get_version_string(synapse) + if __name__ == "__main__": parser = argparse.ArgumentParser( diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 71b92f0d0773..2a2bedb8cb2d 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -31,6 +31,7 @@ import yaml from twisted.enterprise import adbapi from twisted.internet import defer, reactor +import synapse from synapse.config.database import DatabaseConnectionConfig from synapse.config.homeserver import HomeServerConfig from synapse.logging.context import PreserveLoggingContext @@ -62,6 +63,7 @@ from synapse.storage.database import Database, make_conn from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database from synapse.util import Clock +from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse_port_db") @@ -178,6 +180,7 @@ class MockHomeserver: self.clock = Clock(reactor) self.config = config self.hostname = config.server_name + self.version_string = "Synapse/"+get_version_string(synapse) def get_clock(self): return self.clock From 068d185aacb44848cc909fbc5080db0bd8244848 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 17:26:45 +0000 Subject: [PATCH 06/11] Remove unused imports --- scripts/synapse_port_db | 3 --- 1 file changed, 3 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 2a2bedb8cb2d..bb1763e47178 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -28,14 +28,11 @@ from six import string_types import yaml -from twisted.enterprise import adbapi from twisted.internet import defer, reactor import synapse from synapse.config.database import DatabaseConnectionConfig from synapse.config.homeserver import HomeServerConfig -from synapse.logging.context import PreserveLoggingContext -from synapse.storage._base import LoggingTransaction from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore from synapse.storage.data_stores.main.deviceinbox import ( DeviceInboxBackgroundUpdateStore, From f77d2d59810b868a4b0c9d416be47419d5f0f7cb Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 18:26:17 +0000 Subject: [PATCH 07/11] Convert overseen bits to async/await --- scripts/synapse_port_db | 58 ++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index bb1763e47178..8c0f098806a6 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -17,7 +17,6 @@ # limitations under the License. import argparse -import asyncio import curses import logging import sys @@ -252,12 +251,9 @@ class Porter(object): table, forward_chunk, backward_chunk ) - defer.returnValue( - (table, already_ported, total_to_port, forward_chunk, backward_chunk) - ) + return table, already_ported, total_to_port, forward_chunk, backward_chunk - @defer.inlineCallbacks - def handle_table( + async def handle_table( self, table, postgres_size, table_size, forward_chunk, backward_chunk ): logger.info( @@ -369,8 +365,7 @@ class Porter(object): else: return - @defer.inlineCallbacks - def handle_search_table( + async def handle_search_table( self, postgres_size, table_size, forward_chunk, backward_chunk ): select = ( @@ -476,8 +471,7 @@ class Porter(object): return store - @defer.inlineCallbacks - def run_background_updates_on_postgres(self): + async def run_background_updates_on_postgres(self): # Manually apply all background updates on the PostgreSQL database. postgres_ready = ( await self.postgres_store.db.updates.has_completed_background_updates() @@ -489,15 +483,12 @@ class Porter(object): self.progress.set_state("Running background updates on PostgreSQL") while not postgres_ready: - await defer.ensureDeferred( - self.postgres_store.db.updates.do_next_background_update(100) - ) + await self.postgres_store.db.updates.do_next_background_update(100) postgres_ready = await ( self.postgres_store.db.updates.has_completed_background_updates() ) - @defer.inlineCallbacks - def run(self): + async def run(self): try: # we allow people to port away from outdated versions of sqlite. self.sqlite_store = self.build_db_store( @@ -632,8 +623,7 @@ class Porter(object): return outrows - @defer.inlineCallbacks - def _setup_sent_transactions(self): + async def _setup_sent_transactions(self): # Only save things from the last day yesterday = int(time.time() * 1000) - 86400000 @@ -707,10 +697,9 @@ class Porter(object): total_count = remaining_count + inserted_rows - defer.returnValue((next_chunk, inserted_rows, total_count)) + return next_chunk, inserted_rows, total_count - @defer.inlineCallbacks - def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk): + async def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk): frows = await self.sqlite_store.execute_sql( "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk ) @@ -719,29 +708,34 @@ class Porter(object): "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk ) - defer.returnValue(frows[0][0] + brows[0][0]) + return frows[0][0] + brows[0][0] - @defer.inlineCallbacks - def _get_already_ported_count(self, table): + async def _get_already_ported_count(self, table): rows = await self.postgres_store.execute_sql( "SELECT count(*) FROM %s" % (table,) ) - defer.returnValue(rows[0][0]) + return rows[0][0] - @defer.inlineCallbacks - def _get_total_count_to_port(self, table, forward_chunk, backward_chunk): - remaining, done = await asyncio.gather( - *[ - self._get_remaining_count_to_port(table, forward_chunk, backward_chunk), - self._get_already_ported_count(table), - ], + async def _get_total_count_to_port(self, table, forward_chunk, backward_chunk): + remaining, done = await make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background( + self._get_remaining_count_to_port, + table, + forward_chunk, + backward_chunk, + ), + run_in_background(self._get_already_ported_count, table), + ], + ) ) remaining = int(remaining) if remaining else 0 done = int(done) if done else 0 - defer.returnValue((done, remaining + done)) + return done, remaining + done def _setup_state_group_id_seq(self): def r(txn): From a4d4e35b26a76fbc1f9f37de46968408d023626f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 18:27:08 +0000 Subject: [PATCH 08/11] Fixup logging contexts --- scripts/synapse_port_db | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 8c0f098806a6..669fabe81268 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -60,6 +60,11 @@ from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database from synapse.util import Clock from synapse.util.versionstring import get_version_string +from synapse.logging.context import ( + LoggingContext, + make_deferred_yieldable, + run_in_background, +) logger = logging.getLogger("synapse_port_db") @@ -565,18 +570,26 @@ class Porter(object): # Step 3. Figure out what still needs copying self.progress.set_state("Checking on port progress") - setup_res = await asyncio.gather( - *[ - self.setup_table(table) - for table in tables - if table not in ["schema_version", "applied_schema_deltas"] - and not table.startswith("sqlite_") - ], + setup_res = await make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background(self.setup_table, table) + for table in tables + if table not in ["schema_version", "applied_schema_deltas"] + and not table.startswith("sqlite_") + ], + consumeErrors=True, + ) ) # Step 4. Do the copying. self.progress.set_state("Copying to postgres") - await asyncio.gather(*[self.handle_table(*res) for res in setup_res]) + await make_deferred_yieldable( + defer.gatherResults( + [run_in_background(self.handle_table, *res) for res in setup_res], + consumeErrors=True, + ) + ) # Step 5. Do final post-processing await self._setup_state_group_id_seq() @@ -1001,8 +1014,10 @@ if __name__ == "__main__": hs_config=config, ) + @defer.inlineCallbacks def run(): - return defer.ensureDeferred(porter.run()) + with LoggingContext("synapse_port_db_run"): + yield defer.ensureDeferred(porter.run()) reactor.callWhenRunning(run) From 75d60c2876e4f21f0ff18764d954631f2a18e38d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 18:31:59 +0000 Subject: [PATCH 09/11] Fix imports --- scripts/synapse_port_db | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 669fabe81268..9c2af889c482 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -32,6 +32,11 @@ from twisted.internet import defer, reactor import synapse from synapse.config.database import DatabaseConnectionConfig from synapse.config.homeserver import HomeServerConfig +from synapse.logging.context import ( + LoggingContext, + make_deferred_yieldable, + run_in_background, +) from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore from synapse.storage.data_stores.main.deviceinbox import ( DeviceInboxBackgroundUpdateStore, @@ -60,11 +65,6 @@ from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database from synapse.util import Clock from synapse.util.versionstring import get_version_string -from synapse.logging.context import ( - LoggingContext, - make_deferred_yieldable, - run_in_background, -) logger = logging.getLogger("synapse_port_db") From 8179108378f73f5ef72ce011da43685aefad31fb Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 20 Jan 2020 14:38:32 +0000 Subject: [PATCH 10/11] Add a way to print an error without raising an exception --- scripts/synapse_port_db | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 9c2af889c482..8ab58161afcc 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -129,6 +129,7 @@ APPEND_ONLY_TABLES = [ ] +end_error = None end_error_exec_info = None @@ -494,6 +495,7 @@ class Porter(object): ) async def run(self): + global end_error try: # we allow people to port away from outdated versions of sqlite. self.sqlite_store = self.build_db_store( @@ -506,11 +508,12 @@ class Porter(object): await self.sqlite_store.db.updates.has_completed_background_updates() ) if not updates_complete: - raise Exception( + end_error = ( "Pending background updates exist in the SQLite3 database." " Please start Synapse again and wait until every update has finished" " before running this script.\n" ) + return self.postgres_store = self.build_db_store( self.hs_config.get_single_database() @@ -595,8 +598,9 @@ class Porter(object): await self._setup_state_group_id_seq() self.progress.done() - except Exception: + except Exception as e: global end_error_exec_info + end_error = e end_error_exec_info = sys.exc_info() logger.exception("") finally: @@ -1028,7 +1032,11 @@ if __name__ == "__main__": else: start() - if end_error_exec_info: - exc_type, exc_value, exc_traceback = end_error_exec_info - traceback.print_exception(exc_type, exc_value, exc_traceback) + if end_error: + sys.stderr.write(end_error) + + if end_error_exec_info: + exc_type, exc_value, exc_traceback = end_error_exec_info + traceback.print_exception(exc_type, exc_value, exc_traceback) + sys.exit(5) From 0ff664884f591e9114fd9a668942054434a138aa Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 21 Jan 2020 12:42:12 +0000 Subject: [PATCH 11/11] Incorporate review --- scripts/synapse_port_db | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 8ab58161afcc..e8b698f3ffe5 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -129,7 +129,13 @@ APPEND_ONLY_TABLES = [ ] +# Error returned by the run function. Used at the top-level part of the script to +# handle errors and return codes. end_error = None +# The exec_info for the error, if any. If error is defined but not exec_info the script +# will show only the error message without the stacktrace, if exec_info is defined but +# not the error then the script will show nothing outside of what's printed in the run +# function. If both are defined, the script will print both the error and the stacktrace. end_error_exec_info = None @@ -495,7 +501,14 @@ class Porter(object): ) async def run(self): + """Ports the SQLite database to a PostgreSQL database. + + When a fatal error is met, its message is assigned to the global "end_error" + variable. When this error comes with a stacktrace, its exec_info is assigned to + the global "end_error_exec_info" variable. + """ global end_error + try: # we allow people to port away from outdated versions of sqlite. self.sqlite_store = self.build_db_store( @@ -1033,10 +1046,10 @@ if __name__ == "__main__": start() if end_error: - sys.stderr.write(end_error) - if end_error_exec_info: exc_type, exc_value, exc_traceback = end_error_exec_info traceback.print_exception(exc_type, exc_value, exc_traceback) + sys.stderr.write(end_error) + sys.exit(5)