From 0309c5646c2d58e8d9c41299e4762a02c211f512 Mon Sep 17 00:00:00 2001 From: Varjitt Jeeva Date: Thu, 29 Feb 2024 18:49:36 -0500 Subject: [PATCH] fix: various small fixes found from new integration testing (#409) --- .gitignore | 1 + CONTRIBUTING.md | 2 +- Makefile | 3 + docker-compose.yml | 123 ++++-- pgbelt/cmd/sync.py | 14 +- pgbelt/util/pglogical.py | 3 +- pgbelt/util/postgres.py | 28 +- tests/integration/conftest.py | 361 +++++++---------- tests/integration/files/test_schema_data.sql | 89 ++-- tests/integration/test_integration.py | 404 ++++++++++++------- 10 files changed, 593 insertions(+), 435 deletions(-) diff --git a/.gitignore b/.gitignore index 56d8640..bc7090d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ dist/* logs/* configs/testdc/* schemas/* +tables/* .python-version .mypy_cache __pycache__/ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f278bb8..35687c3 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -72,7 +72,7 @@ This feature is very useful when you are making code changes to `pgbelt` and wan To do this, this local development feature uses `docker` and `docker-compose` to spin up the following: -1. Two Postgres Containers with networking configured between each other +1. 4 Pairs of Postgres Containers with networking configured between each other. These 4 sets are used for the following integration test cases: public schema & whole DB, public schema & exodus-style migration (only a subset is moved), non-public schema & whole DB, non-public schema & exodus-style migration (only a subset is moved) 2. One container loaded with your local copy of `pgbelt`, built and installed for CLI usage. Simply run the following to spin the above up and drop yourself into your container with your local `pgbelt`: diff --git a/Makefile b/Makefile index 9fae612..475f000 100644 --- a/Makefile +++ b/Makefile @@ -29,3 +29,6 @@ clean-docker: ## Stop and remove all docker containers and images made from loca # Finally, we OR true because the pre-commit errors when finding stuff to fix, but that's exactly what we want it to do. 
generate-usage-docs: ## Generate usage docs pip3 install typer-cli && typer pgbelt/main.py utils docs --name belt > docs/usage.md && pre-commit run --files docs/usage.md || true + +clean-local-files: # Clean out local files that were generated by local-dev command or local runs of pytest + rm -rf schemas/ configs/testdc/ logs/ tables/ diff --git a/docker-compose.yml b/docker-compose.yml index d51c4e9..045c85d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: "3" services: - db-src: + db-src-set-public-schema-full: &db image: autodesk/postgres-pglogical-docker:13 environment: POSTGRES_PASSWORD: postgres @@ -16,20 +16,47 @@ services: timeout: 5s retries: 5 - db-dst: - image: autodesk/postgres-pglogical-docker:13 - environment: - POSTGRES_PASSWORD: postgres - PGDATA: /tmp/pgdata - restart: on-failure + db-dst-set-public-schema-full: + <<: *db networks: datacenter: ipv4_address: 10.5.0.6 - healthcheck: - test: ["CMD-SHELL", "pg_isready"] - interval: 10s - timeout: 5s - retries: 5 + + db-src-set-non-public-schema-full: + <<: *db + networks: + datacenter: + ipv4_address: 10.5.0.7 + + db-dst-set-non-public-schema-full: + <<: *db + networks: + datacenter: + ipv4_address: 10.5.0.8 + + db-src-set-public-schema-exodus: + <<: *db + networks: + datacenter: + ipv4_address: 10.5.0.9 + + db-dst-set-public-schema-exodus: + <<: *db + networks: + datacenter: + ipv4_address: 10.5.0.10 + + db-src-set-non-public-schema-exodus: + <<: *db + networks: + datacenter: + ipv4_address: 10.5.0.11 + + db-dst-set-non-public-schema-exodus: + <<: *db + networks: + datacenter: + ipv4_address: 10.5.0.12 flake8: image: autodesk/pgbelt:latest @@ -46,23 +73,31 @@ services: tests: image: autodesk/pgbelt:latest environment: - TEST_PG_SRC_HOST: localhost - TEST_PG_SRC_IP: 10.5.0.5 - TEST_PG_SRC_DB: src - TEST_PG_SRC_PORT: 5432 - TEST_PG_SRC_ROOT_USERNAME: postgres - TEST_PG_SRC_ROOT_PASSWORD: postgres - TEST_PG_DST_HOST: localhost - TEST_PG_DST_IP: 10.5.0.6 - TEST_PG_DST_DB: dst - TEST_PG_DST_PORT: 5432 - TEST_PG_DST_ROOT_USERNAME: postgres - TEST_PG_DST_ROOT_PASSWORD: postgres + PUBLIC_FULL_SRC_IP: 10.5.0.5 + PUBLIC_FULL_DST_IP: 10.5.0.6 + NONPUBLIC_FULL_SRC_IP: 10.5.0.7 + NONPUBLIC_FULL_DST_IP: 10.5.0.8 + PUBLIC_EXODUS_SRC_IP: 10.5.0.9 + PUBLIC_EXODUS_DST_IP: 10.5.0.10 + NONPUBLIC_EXODUS_SRC_IP: 10.5.0.11 + NONPUBLIC_EXODUS_DST_IP: 10.5.0.12 command: poetry run pytest --cov=pgbelt tests/ depends_on: - db-src: + db-src-set-public-schema-full: + condition: service_healthy + db-dst-set-public-schema-full: + condition: service_healthy + db-src-set-non-public-schema-full: condition: service_healthy - db-dst: + db-dst-set-non-public-schema-full: + condition: service_healthy + db-src-set-public-schema-exodus: + condition: service_healthy + db-dst-set-public-schema-exodus: + condition: service_healthy + db-src-set-non-public-schema-exodus: + condition: service_healthy + db-dst-set-non-public-schema-exodus: condition: service_healthy networks: - datacenter @@ -70,23 +105,31 @@ services: localtest: image: autodesk/pgbelt:latest environment: - TEST_PG_SRC_HOST: localhost - TEST_PG_SRC_IP: 10.5.0.5 - TEST_PG_SRC_DB: src - TEST_PG_SRC_PORT: 5432 - TEST_PG_SRC_ROOT_USERNAME: postgres - TEST_PG_SRC_ROOT_PASSWORD: postgres - TEST_PG_DST_HOST: localhost - TEST_PG_DST_IP: 10.5.0.6 - TEST_PG_DST_DB: dst - TEST_PG_DST_PORT: 5432 - TEST_PG_DST_ROOT_USERNAME: postgres - TEST_PG_DST_ROOT_PASSWORD: postgres + PUBLIC_FULL_SRC_IP: 10.5.0.5 + PUBLIC_FULL_DST_IP: 10.5.0.6 + NONPUBLIC_FULL_SRC_IP: 10.5.0.7 + 
NONPUBLIC_FULL_DST_IP: 10.5.0.8 + PUBLIC_EXODUS_SRC_IP: 10.5.0.9 + PUBLIC_EXODUS_DST_IP: 10.5.0.10 + NONPUBLIC_EXODUS_SRC_IP: 10.5.0.11 + NONPUBLIC_EXODUS_DST_IP: 10.5.0.12 command: bash -c "cd /pgbelt-volume/ && poetry run python3 tests/integration/conftest.py --non-public-schema && pip3 install -e . && bash" depends_on: - db-src: + db-src-set-public-schema-full: + condition: service_healthy + db-dst-set-public-schema-full: + condition: service_healthy + db-src-set-non-public-schema-full: + condition: service_healthy + db-dst-set-non-public-schema-full: + condition: service_healthy + db-src-set-public-schema-exodus: + condition: service_healthy + db-dst-set-public-schema-exodus: + condition: service_healthy + db-src-set-non-public-schema-exodus: condition: service_healthy - db-dst: + db-dst-set-non-public-schema-exodus: condition: service_healthy networks: - datacenter diff --git a/pgbelt/cmd/sync.py b/pgbelt/cmd/sync.py index 40b104b..cdffe5f 100644 --- a/pgbelt/cmd/sync.py +++ b/pgbelt/cmd/sync.py @@ -28,7 +28,19 @@ async def _sync_sequences( src_logger: Logger, dst_logger: Logger, ) -> None: - seq_vals = await dump_sequences(src_pool, targeted_sequences, schema, src_logger) + + # Note: When in an exodus migration with a non-public schema, the sequence names must be prefixed with the schema name. + # This may not be done by the user, so we must do it here. + proper_sequence_names = None + if targeted_sequences is not None: + proper_sequence_names = [] + for seq in targeted_sequences: + if f"{schema}." not in seq: + proper_sequence_names.append(f"{schema}.{seq}") + else: + proper_sequence_names.append(seq) + + seq_vals = await dump_sequences(src_pool, proper_sequence_names, schema, src_logger) await load_sequences(dst_pool, seq_vals, dst_logger) diff --git a/pgbelt/util/pglogical.py b/pgbelt/util/pglogical.py index 4605883..12855fd 100644 --- a/pgbelt/util/pglogical.py +++ b/pgbelt/util/pglogical.py @@ -78,8 +78,9 @@ async def grant_pgl(pool: Pool, tables: list[str], schema: str, logger: Logger) async with pool.acquire() as conn: async with conn.transaction(): if tables: + tables_with_schema = [f"{schema}.{table}" for table in tables] await conn.execute( - f"GRANT ALL ON TABLE {','.join(tables)} TO pglogical;" + f"GRANT ALL ON TABLE {','.join(tables_with_schema)} TO pglogical;" ) else: await conn.execute( diff --git a/pgbelt/util/postgres.py b/pgbelt/util/postgres.py index f76cd57..429a5a1 100644 --- a/pgbelt/util/postgres.py +++ b/pgbelt/util/postgres.py @@ -12,20 +12,26 @@ async def dump_sequences( return a dictionary of sequence names mapped to their last values """ logger.info("Dumping sequence values...") - seqs = await pool.fetch("SELECT sequence_name FROM information_schema.sequences;") + # Get all sequences in the schema + seqs = await pool.fetch( + f""" + SELECT '{schema}' || '.' || sequence_name + FROM information_schema.sequences + WHERE sequence_schema = '{schema}'; + """ + ) seq_vals = {} + final_seqs = [] + # If we get a list of targeted sequences, we only want to dump whichever of those are found in the database and schema. 
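+    # Note: targeted_sequences are expected to arrive schema-qualified (see the prefixing added in _sync_sequences in pgbelt/cmd/sync.py), so they can be matched directly against the schema-qualified names selected above.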
if targeted_sequences: - for seq in [r[0] for r in seqs if r[0] in targeted_sequences]: - seq_vals[seq.strip()] = await pool.fetchval( - f"SELECT last_value FROM {schema}.{seq};" - ) - else: - for seq in [r[0] for r in seqs]: - seq_stripped = seq.strip() - seq_vals[f"{schema}.{seq_stripped}"] = await pool.fetchval( - f"SELECT last_value FROM {schema}.{seq};" - ) + final_seqs = [r[0] for r in seqs if r[0] in targeted_sequences] + else: # Otherwise, we want to dump all sequences found in the schema. + final_seqs = [r[0] for r in seqs] + + for seq in final_seqs: + res = await pool.fetchval(f"SELECT last_value FROM {seq};") + seq_vals[seq.strip()] = res logger.debug(f"Dumped sequences: {seq_vals}") return seq_vals diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 4fc316b..c73faf3 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -11,130 +11,153 @@ from pgbelt.config.models import User -async def _create_dbupgradeconfig( - non_public_schema: bool = False, exodus: bool = False -) -> DbupgradeConfig: +async def _create_dbupgradeconfigs() -> dict[str, DbupgradeConfig]: """ - Function for creating a DbupgradeConfig object for testing. + Function for creating DbupgradeConfig objects for testing. We also save it to disk since the pgbelt commands will look for it there. - If you want to test with a non-public schema, set non_public_schema to True. - If you want to test the migration of a subset of tables, set exodus to True. + This will create 4 sets of DBs: public vs non-public schema, and exodus-style vs full migration. """ - config = DbupgradeConfig( - db="testdb", - dc="testdc", - schema_name="non_public_schema" if non_public_schema else "public", - tables=["users2"] if exodus else [], - sequences=["users2_id_seq"] if exodus else [], - ) + # Set the common kwargs at the DBUpgradeConfig level + db_upgrade_config_kwargs = { + "dc": "testdc", + } - config.src = DbConfig( - host=environ["TEST_PG_SRC_HOST"], - ip=environ["TEST_PG_SRC_IP"], - db=environ["TEST_PG_SRC_DB"], - port=environ["TEST_PG_SRC_PORT"], - root_user=User( - name=environ["TEST_PG_SRC_ROOT_USERNAME"], - pw=environ["TEST_PG_SRC_ROOT_PASSWORD"], + # Set the common config kwargs for the individual DBs + # We set many of the args here in the actual DB containers, so we don't need to pull these vars out to docker-compose. + common_db_config_kwargs = { + "host": "localhost", + "port": "5432", + # This is the default credential for the admin user in the Postgres containers used for testing. + "root_user": User( + name="postgres", + pw="postgres", ), - # Owner will not be made in the Postgres containers used for testing, so we will define them - owner_user=User(name="owner", pw="ownerpassword"), - # Pglogical user info is set in the db by pgbelt, so we defined this stuff here - pglogical_user=User(name="pglogical", pw="pglogicalpassword"), - ) + # We will create the owner_user in the DBs via the integration test setup. + "owner_user": User(name="owner", pw="ownerpassword"), + "pglogical_user": User(name="pglogical", pw="pglogicalpassword"), + "db": "testdb", + } + + # We're treating DB pairs as sets here. 
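+    # Each set name is "<schema>-<scope>": "public"/"nonpublic" combined with "full" (whole-DB migration) or "exodus" (subset of tables).
+    # docker-compose exposes one src/dst IP pair per set, e.g. PUBLIC_FULL_SRC_IP / PUBLIC_FULL_DST_IP, which are read from the environment below.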
+ sets = [ + "public-full", + "nonpublic-full", + "public-exodus", + "nonpublic-exodus", + ] + + configs = {} + for s in sets: + db_upgrade_config_kwargs["db"] = f"testdb-{s}" + db_upgrade_config_kwargs["schema_name"] = ( + "non_public_schema" if "nonpublic" in s else "public" + ) + db_upgrade_config_kwargs["tables"] = ["users"] if "exodus" in s else None + db_upgrade_config_kwargs["sequences"] = ( + ["users_id_seq"] if "exodus" in s else None + ) + config = DbupgradeConfig(**db_upgrade_config_kwargs) - config.dst = DbConfig( - host=environ["TEST_PG_DST_HOST"], - ip=environ["TEST_PG_DST_IP"], - db=environ["TEST_PG_DST_DB"], - port=environ["TEST_PG_DST_PORT"], - root_user=User( - name=environ["TEST_PG_DST_ROOT_USERNAME"], - pw=environ["TEST_PG_DST_ROOT_PASSWORD"], - ), - # Owner will not be made in the Postgres containers used for testing, so we will define them - owner_user=User(name="owner", pw="ownerpassword"), - # Pglogical user info is set in the db by pgbelt, so we defined this stuff here - pglogical_user=User(name="pglogical", pw="pglogicalpassword"), - ) + # The IP addresses are set in the docker-compose file, so we can pull them out of the environment. They follow the following pattern: + # (NON)PUBLIC___IP + config.src = DbConfig( + ip=environ[f"{s.split('-')[0].upper()}_{s.split('-')[1].upper()}_SRC_IP"], + **common_db_config_kwargs, + ) + config.dst = DbConfig( + ip=environ[f"{s.split('-')[0].upper()}_{s.split('-')[1].upper()}_DST_IP"], + **common_db_config_kwargs, + ) - # Save to disk - await config.save() + # Save the config to disk + await config.save() + configs[s] = config - return config + return configs -async def _prepare_databases(config: DbupgradeConfig, non_public_schema: bool = False): +async def _prepare_databases(configs: dict[str, DbupgradeConfig]) -> None: """ - Given the root URIs for the source and destination databases: + Given a dict of various configs for database pairs, prepare the following: 1. Create the owner user on both databases 2. Create the Postgres DB on both databases - 3. Load the test data into the source database + 3. If the schema is non-public, create the schema on both databases + 4. 
Load the test data into the source database """ # Load test data and schema SQL with open("tests/integration/files/test_schema_data.sql") as f: - test_schema_data = f.read() - - # Just replace all the `public.` with `non_public_schema.` if non_public_schema is True - if non_public_schema: - test_schema_data = test_schema_data.replace("public.", "non_public_schema.") + base_test_schema_data = f.read() - # Get the root connections to the root DBs - src_root_uri_with_root_db, dst_root_uri_with_root_db = _root_uris(config) - src_root_user_root_db_pool, dst_root_user_root_db_pool = await asyncio.gather( - create_pool(src_root_uri_with_root_db, min_size=1), - create_pool(dst_root_uri_with_root_db, min_size=1), - ) + for config in configs.values(): - # Create the owner user - await asyncio.gather( - src_root_user_root_db_pool.execute( - f"CREATE ROLE {config.src.owner_user.name} LOGIN PASSWORD '{config.src.owner_user.pw}'", - ), - dst_root_user_root_db_pool.execute( - f"CREATE ROLE {config.dst.owner_user.name} LOGIN PASSWORD '{config.dst.owner_user.pw}'", - ), - ) - - # Create the databases - await asyncio.gather( - src_root_user_root_db_pool.execute( - f"CREATE DATABASE src WITH OWNER = {config.src.owner_user.name}" - ), - dst_root_user_root_db_pool.execute( - f"CREATE DATABASE dst WITH OWNER = {config.dst.owner_user.name}" - ), - ) - - src_owner_user_logical_db_pool, dst_owner_user_logical_db_pool = ( - await asyncio.gather( - create_pool(config.src.owner_uri, min_size=1), - create_pool(config.dst.owner_uri, min_size=1), + # Get the root connections to the root DBs + src_root_uri_with_root_db, dst_root_uri_with_root_db = _root_uris(config) + src_root_user_root_db_pool, dst_root_user_root_db_pool = await asyncio.gather( + create_pool(src_root_uri_with_root_db, min_size=1), + create_pool(dst_root_uri_with_root_db, min_size=1), ) - ) - # Create the non_public_schema if non_public_schema is True - if non_public_schema: + # Create the owner user await asyncio.gather( - src_owner_user_logical_db_pool.execute(f"CREATE SCHEMA non_public_schema"), - dst_owner_user_logical_db_pool.execute(f"CREATE SCHEMA non_public_schema"), + src_root_user_root_db_pool.execute( + f"CREATE ROLE {config.src.owner_user.name} LOGIN PASSWORD '{config.src.owner_user.pw}'", + ), + dst_root_user_root_db_pool.execute( + f"CREATE ROLE {config.dst.owner_user.name} LOGIN PASSWORD '{config.dst.owner_user.pw}'", + ), ) + + # Create the databases await asyncio.gather( - src_owner_user_logical_db_pool.execute( - f"GRANT CREATE ON SCHEMA non_public_schema TO {config.src.owner_user.name}" + src_root_user_root_db_pool.execute( + f"CREATE DATABASE {config.src.db} WITH OWNER = {config.src.owner_user.name}" ), - dst_owner_user_logical_db_pool.execute( - f"GRANT CREATE ON SCHEMA non_public_schema TO {config.dst.owner_user.name}" + dst_root_user_root_db_pool.execute( + f"CREATE DATABASE {config.dst.db} WITH OWNER = {config.dst.owner_user.name}" ), ) - # With the db made, load data into src - await asyncio.gather( - src_owner_user_logical_db_pool.execute(test_schema_data), - ) + src_owner_user_logical_db_pool, dst_owner_user_logical_db_pool = ( + await asyncio.gather( + create_pool(config.src.owner_uri, min_size=1), + create_pool(config.dst.owner_uri, min_size=1), + ) + ) + + # Create the non-public schema if the schema_name is not "public" + if config.schema_name != "public": + await asyncio.gather( + src_owner_user_logical_db_pool.execute( + f"CREATE SCHEMA {config.schema_name}" + ), + dst_owner_user_logical_db_pool.execute( + f"CREATE SCHEMA 
{config.schema_name}" + ), + ) + await asyncio.gather( + src_owner_user_logical_db_pool.execute( + f"GRANT CREATE ON SCHEMA {config.schema_name} TO {config.src.owner_user.name}" + ), + dst_owner_user_logical_db_pool.execute( + f"GRANT CREATE ON SCHEMA {config.schema_name} TO {config.dst.owner_user.name}" + ), + ) + + # With the db made, load data into src + test_schema_data = base_test_schema_data + + # If we're testing with a non-public schema, we need to replace the schema name in our schema template. + if config.schema_name != "public": + test_schema_data = test_schema_data.replace( + "public.", f"{config.schema_name}." + ) + + await asyncio.gather( + src_owner_user_logical_db_pool.execute(test_schema_data), + ) def _root_uris(config: DbupgradeConfig) -> tuple[str, str]: @@ -157,130 +180,60 @@ def _root_uris(config: DbupgradeConfig) -> tuple[str, str]: return src_root_uri_with_root_db, dst_root_uri_with_root_db -async def _empty_out_database(config: DbupgradeConfig) -> None: +async def _empty_out_databases(configs: dict[str, DbupgradeConfig]) -> None: """ This code will DROP the databases specified in the config, DROP the owner role specified in the config and any permissions with it. """ - # Get the root URIs - src_root_uri_with_root_db, dst_root_uri_with_root_db = _root_uris(config) - - async with create_pool(src_root_uri_with_root_db, min_size=1) as pool: - async with pool.acquire() as conn: - await conn.execute( - f"DROP DATABASE src WITH (FORCE);", - ) - await conn.execute( - f"DROP OWNED BY {config.src.owner_user.name};", - ) - await conn.execute( - f"DROP ROLE {config.src.owner_user.name};", - ) - - async with create_pool(dst_root_uri_with_root_db, min_size=1) as pool: - async with pool.acquire() as conn: - await conn.execute( - f"DROP DATABASE dst WITH (FORCE);", - ) - await conn.execute( - f"DROP OWNED BY {config.dst.owner_user.name};", - ) - await conn.execute( - f"DROP ROLE {config.dst.owner_user.name};", - ) - - -@pytest.mark.asyncio -@pytest_asyncio.fixture -async def setup_db_upgrade_config_public_schema(): - """ - Fixture for preparing the test databases and creating a DbupgradeConfig object. - This fixture will also clean up after the test (removing local files and tearing down against the DBs). 
- """ - - # Create the config - test_db_upgrade_config = await _create_dbupgradeconfig() - - # Prepare the databases - await _prepare_databases(test_db_upgrade_config) - - yield test_db_upgrade_config - - # Clear out all data and stuff in the database containers :shrug: - await _empty_out_database(test_db_upgrade_config) - - # Delete the config that was saved to disk by the setup - rmtree("configs/testdc") - rmtree("schemas/") + for config in configs.values(): + + # Get the root URIs + src_root_uri_with_root_db, dst_root_uri_with_root_db = _root_uris(config) + + async with create_pool(src_root_uri_with_root_db, min_size=1) as pool: + async with pool.acquire() as conn: + await conn.execute( + f"DROP DATABASE {config.src.db} WITH (FORCE);", + ) + await conn.execute( + f"DROP OWNED BY {config.src.owner_user.name};", + ) + await conn.execute( + f"DROP ROLE {config.src.owner_user.name};", + ) + + async with create_pool(dst_root_uri_with_root_db, min_size=1) as pool: + async with pool.acquire() as conn: + await conn.execute( + f"DROP DATABASE {config.dst.db} WITH (FORCE);", + ) + await conn.execute( + f"DROP OWNED BY {config.dst.owner_user.name};", + ) + await conn.execute( + f"DROP ROLE {config.dst.owner_user.name};", + ) @pytest.mark.asyncio @pytest_asyncio.fixture -async def setup_db_upgrade_config_non_public_schema(): - """ - Same as above, but with a non-public schema. - """ - - # Create the config - test_db_upgrade_config = await _create_dbupgradeconfig(non_public_schema=True) - - # Prepare the databases - await _prepare_databases(test_db_upgrade_config, non_public_schema=True) - - yield test_db_upgrade_config - - # Clear out all data and stuff in the database containers :shrug: - await _empty_out_database(test_db_upgrade_config) - - # Delete the config that was saved to disk by the setup - rmtree("configs/testdc") - rmtree("schemas/") - - -@pytest.mark.asyncio -@pytest_asyncio.fixture -async def setup_db_upgrade_config_public_schema_exodus(): +async def setup_db_upgrade_configs(): """ Fixture for preparing the test databases and creating a DbupgradeConfig object. This fixture will also clean up after the test (removing local files and tearing down against the DBs). """ # Create the config - test_db_upgrade_config = await _create_dbupgradeconfig(exodus=True) - - # Prepare the databases - await _prepare_databases(test_db_upgrade_config) - - yield test_db_upgrade_config - - # Clear out all data and stuff in the database containers :shrug: - await _empty_out_database(test_db_upgrade_config) - - # Delete the config that was saved to disk by the setup - rmtree("configs/testdc") - rmtree("schemas/") - - -@pytest.mark.asyncio -@pytest_asyncio.fixture -async def setup_db_upgrade_config_non_public_schema_exodus(): - """ - Same as the base fixture, but with a non-public schema and exodus-style (moving a subset of data). 
- """ - - # Create the config - test_db_upgrade_config = await _create_dbupgradeconfig( - non_public_schema=True, exodus=True - ) + test_configs = await _create_dbupgradeconfigs() # Prepare the databases - await _prepare_databases(test_db_upgrade_config, non_public_schema=True) + await _prepare_databases(test_configs) - yield test_db_upgrade_config + yield test_configs # Clear out all data and stuff in the database containers :shrug: - await _empty_out_database(test_db_upgrade_config) + await _empty_out_databases(test_configs) # Delete the config that was saved to disk by the setup rmtree("configs/testdc") @@ -294,15 +247,7 @@ async def setup_db_upgrade_config_non_public_schema_exodus(): # This will create the datasets. if __name__ == "__main__": - # Check if any flags were passed using argv - if "--non-public-schema" in argv: - print("Creating non-public schema dataset...") - non_public_schema = True - else: - print("Creating public schema dataset...") - non_public_schema = False - - config = asyncio.run(_create_dbupgradeconfig(non_public_schema)) - asyncio.run(_prepare_databases(config, non_public_schema)) + configs = asyncio.run(_create_dbupgradeconfigs()) + asyncio.run(_prepare_databases(configs)) print("Local databases are ready for local testing!") diff --git a/tests/integration/files/test_schema_data.sql b/tests/integration/files/test_schema_data.sql index 5f24521..0cafee4 100644 --- a/tests/integration/files/test_schema_data.sql +++ b/tests/integration/files/test_schema_data.sql @@ -25,6 +25,21 @@ CREATE TABLE public.Users ( ALTER TABLE public.Users OWNER TO owner; +-- +-- Name: users2; Type: TABLE; Schema: public; Owner: owner +-- + +CREATE TABLE public.Users2 ( + id bigint NOT NULL, + hash_firstname text NOT NULL, + hash_lastname text NOT NULL, + gender character varying(6) NOT NULL, + CONSTRAINT users_gender_check CHECK (((gender)::text = ANY (ARRAY[('male'::character varying)::text, ('female'::character varying)::text]))) +); + + +ALTER TABLE public.Users2 OWNER TO owner; + -- -- Name: users_idx; Type: INDEX; Schema: public; Owner: owner -- @@ -34,6 +49,15 @@ CREATE INDEX users_idx ON public.Users ( hash_lastname ); +-- +-- Name: users2_idx; Type: INDEX; Schema: public; Owner: owner +-- + +CREATE INDEX users2_idx ON public.Users ( + hash_firstname, + hash_lastname +); + -- -- Name: users_id_seq; Type: SEQUENCE; Schema: public; Owner: owner -- @@ -48,6 +72,20 @@ CREATE SEQUENCE public.users_id_seq ALTER TABLE public.users_id_seq OWNER TO owner; +-- +-- Name: users2_id_seq; Type: SEQUENCE; Schema: public; Owner: owner +-- + +CREATE SEQUENCE public.users2_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +ALTER TABLE public.users2_id_seq OWNER TO owner; + -- -- Data for Name: fruits; Type: TABLE DATA; Schema: public; Owner: owner -- @@ -70,49 +108,42 @@ INSERT INTO public.users (id, hash_firstname, hash_lastname, gender) -- --- Name: users_id_seq; Type: SEQUENCE SET; Schema: public; Owner: owner +-- Data for Name: Users2; Type: TABLE DATA; Schema: public; Owner: owner -- -SELECT pg_catalog.setval('public.users_id_seq', 1, false); +INSERT INTO public.users2 (id, hash_firstname, hash_lastname, gender) + VALUES (1, 'garbagefirst', 'garbagelast', 'male'), + (2, 'garbagefirst1', 'garbagelast1', 'female'), + (3, 'sdgarbagefirst', 'dgsadsrbagelast', 'male'), + (4, 'dsdssdgarbagefirst', 'dgsaggggdjjjsrbagelast', 'female'), + (5, 'dsdssdgarbagefirt', 'dgsagggdjjjsrbagelast', 'female'); -- --- Name: Users users_pkey; Type: CONSTRAINT; Schema: public; Owner: 
owner +-- Name: users_id_seq; Type: SEQUENCE SET; Schema: public; Owner: owner -- -ALTER TABLE ONLY public.Users - ADD CONSTRAINT users_pkey PRIMARY KEY (id); +SELECT pg_catalog.setval('public.users_id_seq', 1, false); --- Extra table with PK for testing exodus migrations -CREATE TABLE public.Users2 ( - id bigint NOT NULL, - hash_firstname text NOT NULL, - hash_lastname text NOT NULL, - gender character varying(6) NOT NULL, - CONSTRAINT users_gender_check CHECK (((gender)::text = ANY (ARRAY[('male'::character varying)::text, ('female'::character varying)::text]))) -); +-- +-- Name: users2_id_seq; Type: SEQUENCE SET; Schema: public; Owner: owner +-- -ALTER TABLE public.Users2 OWNER TO owner; +SELECT pg_catalog.setval('public.users2_id_seq', 1, false); -CREATE SEQUENCE public.users2_id_seq - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; +-- +-- Name: Users users_pkey; Type: CONSTRAINT; Schema: public; Owner: owner +-- + +ALTER TABLE ONLY public.Users + ADD CONSTRAINT users_pkey PRIMARY KEY (id); -ALTER TABLE public.users2_id_seq OWNER TO owner; -SELECT pg_catalog.setval('public.users2_id_seq', 1, false); +-- +-- Name: Users users_pkey; Type: CONSTRAINT; Schema: public; Owner: owner +-- ALTER TABLE ONLY public.Users2 ADD CONSTRAINT users2_pkey PRIMARY KEY (id); - -INSERT INTO public.users2 (id, hash_firstname, hash_lastname, gender) - VALUES (1, 'garbagefirst', 'garbagelast', 'male'), - (2, 'garbagefirst1', 'garbagelast1', 'female'), - (3, 'sdgarbagefirst', 'dgsadsrbagelast', 'male'), - (4, 'dsdssdgarbagefirst', 'dgsaggggdjjjsrbagelast', 'female'), - (5, 'dsdssdgarbagefirt', 'dgsagggdjjjsrbagelast', 'female'); diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 915211e..940af1c 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -4,19 +4,30 @@ from unittest.mock import AsyncMock from unittest.mock import Mock from pgbelt.util.dump import _parse_dump_commands +from pgbelt.config.models import DbupgradeConfig + +import asyncio import pgbelt import pytest -async def _check_status(config, src_dst_status, dst_src_status): +async def _check_status( + configs: dict[str, DbupgradeConfig], src_dst_status: str, dst_src_status: str +): + # Check status and make sure all are in the correct state + # ALL sets must match the src_dst_status and dst_src_status + + dc = list(configs.values())[0].dc + num_configs = len(configs.keys()) + # Sleep 1, repeat until target status is seen. pgbelt.cmd.status.echo = Mock() - not_replicating = True + status_reached = False i = 4 - while not_replicating and i > 0: + while not status_reached and i > 0: sleep(1) - await pgbelt.cmd.status.status(db=None, dc=config.dc) + await pgbelt.cmd.status.status(db=None, dc=dc) status_echo_call_arg = pgbelt.cmd.status.echo.call_args[0][0] @@ -25,125 +36,201 @@ async def _check_status(config, src_dst_status, dst_src_status): rf"^\S+\s+\S+{src_dst_status}\S+\s+\S+{dst_src_status}.*", status_echo_call_arg.split("\n")[2], ) - if len(matches) == 1: - not_replicating = False + if len(matches) == num_configs: + status_reached = True elif i > 0: i = i - 1 else: raise AssertionError( - f"Timed out waiting for src->dst: {src_dst_status}, dst->src: {dst_src_status} state. Ended with: {status_echo_call_arg}" + f"Timed out waiting for src->dst: {src_dst_status}, dst->src: {dst_src_status} state across {num_configs} configs. 
Ended with: {status_echo_call_arg}" ) -async def _test_check_connectivity(config): - # Run check_connectivity and make sure all green, no red +async def _test_check_connectivity(configs: dict[str, DbupgradeConfig]): + # Run check_connectivity and make sure all green, no rec pgbelt.cmd.convenience.echo = Mock() - await pgbelt.cmd.convenience.check_connectivity(db=config.db, dc=config.dc) + await pgbelt.cmd.convenience.check_connectivity( + db=None, dc=configs[list(configs.keys())[0]].dc + ) check_connectivity_echo_call_arg = pgbelt.cmd.convenience.echo.call_args[0][0] assert "\x1b[31m" not in check_connectivity_echo_call_arg - await _check_status(config, "unconfigured", "unconfigured") + await _check_status(configs, "unconfigured", "unconfigured") -async def _test_precheck(config): +async def _test_precheck(configs: dict[str, DbupgradeConfig]): # Run precheck and make sure all green, no red pgbelt.cmd.preflight.echo = Mock() - await pgbelt.cmd.preflight.precheck(db=config.db, dc=config.dc) + await pgbelt.cmd.preflight.precheck(db=None, dc=configs[list(configs.keys())[0]].dc) preflight_echo_call_arg = pgbelt.cmd.preflight.echo.call_args[0][0] assert "\x1b[31m" not in preflight_echo_call_arg - await _check_status(config, "unconfigured", "unconfigured") + await _check_status(configs, "unconfigured", "unconfigured") -async def _test_setup(config): - # Run Setup - await pgbelt.cmd.setup.setup(db=config.db, dc=config.dc) +async def _test_setup(configs: dict[str, DbupgradeConfig]): + # Run Setup on the dc of the first config to run against all DBs in that dc + await pgbelt.cmd.setup.setup(db=None, dc=configs[list(configs.keys())[0]].dc) - # Ensure Schema in the destination doesn't have NOT VALID, no Indexes - p = subprocess.Popen( - ["pg_dump", "-s", config.dst.root_dsn], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - out, err = p.communicate() + # Ensure Schema in the destination doesn't have NOT VALID, no Indexes across all DB pairs + dst_dumps = await _get_dumps(configs) - commands_raw = _parse_dump_commands(out.decode("utf-8")) - for c in commands_raw: - assert "NOT VALID" not in c - assert "INDEX" not in c + # Format of dumps: {setname: stdout} + for setname, stdout in dst_dumps.items(): + commands_raw = _parse_dump_commands(stdout.decode("utf-8")) + print( + f"Test Setup: checking {setname} for NOT VALID and INDEXES in destination schema..." 
+ ) + for c in commands_raw: + assert "NOT VALID" not in c + assert "INDEX" not in c - await _check_status(config, "replicating", "unconfigured") + await _check_status(configs, "replicating", "unconfigured") -async def _test_setup_back_replication(config): +async def _test_setup_back_replication(configs: dict[str, DbupgradeConfig]): # Set up back replication - await pgbelt.cmd.setup.setup_back_replication(db=config.db, dc=config.dc) + await pgbelt.cmd.setup.setup_back_replication( + db=None, dc=configs[list(configs.keys())[0]].dc + ) - await _check_status(config, "replicating", "replicating") + await _check_status(configs, "replicating", "replicating") -async def _test_create_indexes(config): +async def _test_create_indexes(configs: dict[str, DbupgradeConfig]): # Load in Indexes - await pgbelt.cmd.schema.create_indexes(db=config.db, dc=config.dc) + await pgbelt.cmd.schema.create_indexes( + db=None, dc=configs[list(configs.keys())[0]].dc + ) # Ensure Schema in the destination has Indexes - p = subprocess.Popen( - ["pg_dump", "-s", config.dst.root_dsn], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + + dst_dumps = await _get_dumps(configs) + + # Format of dumps: {setname: stdout} + for setname, stdout in dst_dumps.items(): + + print( + f"Test Create-Indexes: checking {setname} for INDEXES in destination schema..." + ) + + commands_raw = _parse_dump_commands(stdout.decode("utf-8")) + index_exists = False + for c in commands_raw: + if "INDEX" in c: + index_exists = True + break + assert index_exists + + await _check_status(configs, "replicating", "replicating") + + +async def _test_analyze(configs: dict[str, DbupgradeConfig]): + await pgbelt.cmd.sync.analyze(db=None, dc=configs[list(configs.keys())[0]].dc) + + # TODO: test that ANALYZE was run on the destination + + await _check_status(configs, "replicating", "replicating") + + +async def _test_revoke_logins(configs: dict[str, DbupgradeConfig]): + await pgbelt.cmd.login.revoke_logins( + db=None, dc=configs[list(configs.keys())[0]].dc ) - out, err = p.communicate() - commands_raw = _parse_dump_commands(out.decode("utf-8")) - index_exists = False - for c in commands_raw: - if "INDEX" in c: - index_exists = True - break - assert index_exists + # TODO: test that appropriate login roles were revoked - await _check_status(config, "replicating", "replicating") + await _check_status(configs, "replicating", "replicating") -async def _test_analyze(config): - await pgbelt.cmd.sync.analyze(db=config.db, dc=config.dc) +async def _test_teardown_forward_replication(configs: dict[str, DbupgradeConfig]): + await pgbelt.cmd.teardown.teardown_forward_replication( + db=None, dc=configs[list(configs.keys())[0]].dc + ) - await _check_status(config, "replicating", "replicating") + await _check_status(configs, "unconfigured", "replicating") -async def _test_revoke_logins(config): - await pgbelt.cmd.login.revoke_logins(db=config.db, dc=config.dc) +async def _test_sync(configs: dict[str, DbupgradeConfig]): + await pgbelt.cmd.sync.sync(db=None, dc=configs[list(configs.keys())[0]].dc) - await _check_status(config, "replicating", "replicating") + # TODO: test that the appropriate sync steps were run + await _check_status(configs, "unconfigured", "replicating") -async def _test_teardown_forward_replication(config): - await pgbelt.cmd.teardown.teardown_forward_replication(db=config.db, dc=config.dc) - await _check_status(config, "unconfigured", "replicating") +async def _get_dumps( + configs: dict[str, DbupgradeConfig], src: bool = False +) -> 
dict[str, str]: + """ + Get the full dumps for the source or destination databases using pg_dump. + Default is destination. + """ + std_kwargs = { + "stdin": subprocess.PIPE, + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + } + + # For each set of DBs, run pg_dump -s against the destination + if src: + dump_processes = await asyncio.gather( + *[ + asyncio.create_subprocess_exec( + "pg_dump", + configs[setname].src.root_dsn, + **std_kwargs, + ) + for setname in configs.keys() + ] + ) + else: # Default is destination + dump_processes = await asyncio.gather( + *[ + asyncio.create_subprocess_exec( + "pg_dump", + configs[setname].dst.root_dsn, + **std_kwargs, + ) + for setname in configs.keys() + ] + ) -async def _test_sync(config): - await pgbelt.cmd.sync.sync(db=config.db, dc=config.dc) + await asyncio.gather(*[d.wait() for d in dump_processes]) - await _check_status(config, "unconfigured", "replicating") + # get STDOUT for each dump + # Format of dumps: {setname: stdout} + return { + setname: (await d.communicate())[0] + for setname, d in zip(configs.keys(), dump_processes) + } -async def _ensure_same_data(config): +async def _filter_dump(dump: str, keywords_to_exclude: list[str]): + commands_raw = _parse_dump_commands(dump) + commands = [] + for c in commands_raw: + add_command = True + for k in keywords_to_exclude: + if k in c: + add_command = False + break + if add_command: + commands.append(c) + return "\n".join(commands) + + +async def _ensure_same_data(configs: dict[str, DbupgradeConfig]): # Dump the databases and ensure they're the same # Unfortunately except for the sequence lines because for some reason, the dump in the source is_called is true, yet on the destination is false. # Verified in the code we set it with is_called=True, so not sure what's going on there. 
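    # To keep the comparison stable, both dumps are filtered first: statements containing environment-specific keywords (extensions, schema creation, etc. -- see keywords_to_exclude below) are dropped before the source and destination output is compared.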
# ------------------------------------------------------------------ - p = subprocess.Popen( - ["pg_dump", config.src.root_dsn], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - out, err = p.communicate() + # Get all the SRC and DST Dumps + # Format of dumps: {setname: stdout} + src_dumps = await _get_dumps(configs, src=True) + dst_dumps = await _get_dumps(configs) keywords_to_exclude = [ "EXTENSION ", @@ -156,54 +243,105 @@ async def _ensure_same_data(config): "CREATE SCHEMA", ] - commands_raw = _parse_dump_commands(out.decode("utf-8")) - commands = [] - for c in commands_raw: - add_command = True - for k in keywords_to_exclude: - if k in c: - add_command = False - break - if add_command: - commands.append(c) - source_dump = "\n".join(commands) + # First, asynchronously filter out the keywords from the source dumps - p = subprocess.Popen( - ["pg_dump", config.dst.root_dsn], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + # Run the filter_dump function on each dump asynchronously + src_dumps_filtered = await asyncio.gather( + *[ + _filter_dump(dump.decode("utf-8"), keywords_to_exclude) + for dump in src_dumps.values() + ] ) - out, err = p.communicate() - commands_raw = _parse_dump_commands(out.decode("utf-8")) - commands = [] - for c in commands_raw: - add_command = True - for k in keywords_to_exclude: - if k in c: - add_command = False - break - if add_command: - commands.append(c) - dest_dump = "\n".join(commands) + # Then, asynchronously filter out the keywords from the destination dumps + dst_dumps_filtered = await asyncio.gather( + *[ + _filter_dump(dump.decode("utf-8"), keywords_to_exclude) + for dump in dst_dumps.values() + ] + ) - assert source_dump == dest_dump + # Note: the asyncio gathers will return a list of the filtered dumps in the same order as the input dumps + # So we can safely say that the ith element of each list corresponds to the same set of DBs + + # Ensure the filtered dumps are the same + for i in range(len(src_dumps_filtered)): + setname = list(configs.keys())[i] + + # Only the targeted tables should match in exodus-style migrations + if "exodus" in setname: + + # In a real exodus migration, only the schema related to the targeted tables will probably exist. + # But in our integration testing, we just copy the entire schema yet just copy only the targeted data. + + # Given this, the only thing to really check is that the targeted data is the same. Even the schema and structure is not the responsibility of pgbelt. + + src_dump = src_dumps_filtered[i] + dst_dump = dst_dumps_filtered[i] + + # Only get the COPY lines for the targeted tables in the dumps. + # COPY format: + # COPY non_public_schema.users (id, hash_firstname, hash_lastname, gender) FROM stdin; + + # 1 garbagefirst garbagelast male + # 2 garbagefirst1 garbagelast1 female + # 3 sdgarbagefirst dgsadsrbagelast male + # 4 dsdssdgarbagefirst dgsaggggdjjjsrbagelast female + # 5 dsdssdgarbagefirt dgsagggdjjjsrbagelast female + # \. 
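+            # Only these COPY blocks (one per targeted table) are extracted and compared below; the rest of the dump is ignored for exodus sets, since pgbelt only moves the targeted data.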
+ + src_table_data = {} + for table in configs[setname].tables: + src_table_data[table] = "" + for line in src_dump.split("\n"): + if f"COPY {configs[setname].schema_name}.{table}" in line: + src_table_data[table] = src_table_data[table] + line + "\n" + elif len(src_table_data[table]) > 0: + src_table_data[table] = src_table_data[table] + line + "\n" + if line == "\.": + break + dst_table_data = {} + for table in configs[setname].tables: + dst_table_data[table] = "" + for line in dst_dump.split("\n"): + if f"COPY {configs[setname].schema_name}.{table}" in line: + dst_table_data[table] = dst_table_data[table] + line + "\n" + elif len(dst_table_data[table]) > 0: + dst_table_data[table] = dst_table_data[table] + line + "\n" + if line == "\.": + break + + # Ensure the targeted data is the same + for table in configs[setname].tables: + print( + f"Ensuring {setname} source and destination data for table {table} are the same..." + ) + assert src_table_data[table] == dst_table_data[table] + else: + print(f"Ensuring {setname} source and destination dumps are the same...") + assert src_dumps_filtered[i] == dst_dumps_filtered[i] -async def _test_teardown_not_full(config): - await pgbelt.cmd.teardown.teardown(db=config.db, dc=config.dc) - await _check_status(config, "unconfigured", "unconfigured") +async def _test_teardown_not_full(configs: dict[str, DbupgradeConfig]): + await pgbelt.cmd.teardown.teardown(db=None, dc=configs[list(configs.keys())[0]].dc) + # TODO: test that the appropriate teardown steps were run for a non-full teardown -async def _test_teardown_full(config): - await pgbelt.cmd.teardown.teardown(db=config.db, dc=config.dc, full=True) + await _check_status(configs, "unconfigured", "unconfigured") - await _check_status(config, "unconfigured", "unconfigured") +async def _test_teardown_full(configs: dict[str, DbupgradeConfig]): + await pgbelt.cmd.teardown.teardown( + db=None, dc=configs[list(configs.keys())[0]].dc, full=True + ) -async def _test_main_workflow(config): + # TODO: test that the appropriate teardown steps were run for a full teardown + + await _check_status(configs, "unconfigured", "unconfigured") + + +async def _test_main_workflow(configs: dict[str, DbupgradeConfig]): """ Run the following commands in order: @@ -219,49 +357,27 @@ async def _test_main_workflow(config): belt teardown testdc --full """ - await _test_check_connectivity(config) - await _test_precheck(config) - await _test_setup(config) - await _test_setup_back_replication(config) - await _test_create_indexes(config) - await _test_analyze(config) - await _test_revoke_logins(config) - await _test_teardown_forward_replication(config) - await _test_sync(config) + await _test_check_connectivity(configs) + await _test_precheck(configs) + await _test_setup(configs) + await _test_setup_back_replication(configs) + await _test_create_indexes(configs) + await _test_analyze(configs) + await _test_revoke_logins(configs) + await _test_teardown_forward_replication(configs) + await _test_sync(configs) # Check if the data is the same before testing teardown - await _ensure_same_data(config) - - await _test_teardown_not_full(config) - await _test_teardown_full(config) + await _ensure_same_data(configs) - -# Run the main integration test with objects in the public schema -@pytest.mark.asyncio -async def test_main_workflow_public_schema(setup_db_upgrade_config_public_schema): - - await _test_main_workflow(setup_db_upgrade_config_public_schema) + await _test_teardown_not_full(configs) + await _test_teardown_full(configs) -# Run the 
main integration test with objects in a non-public schema +# Run the main integration test. +# 4 sets of DBs are created: public vs non-public schema, and exodus-style vs full migration. +# Use pgbelt's native async parallelization to run the main workflow on the total set of DBs. @pytest.mark.asyncio -async def test_main_workflow_non_public_schema( - setup_db_upgrade_config_non_public_schema, -): - - await _test_main_workflow(setup_db_upgrade_config_non_public_schema) - - -# TODO: fix up the exodus-style tests, the dump comparision step obviously fails because not all data was moved. -# # Run the main integration test with objects in a non-public schema and exodus-style (moving a subset of data) -# @pytest.mark.asyncio -# async def test_main_workflow_public_schema_exodus(setup_db_upgrade_config_public_schema_exodus): - -# await _test_main_workflow(setup_db_upgrade_config_public_schema_exodus) - -# TODO: fix up the exodus-style tests, the dump comparision step obviously fails because not all data was moved. -# # Run the main integration test with objects in a non-public schema and exodus-style (moving a subset of data) -# @pytest.mark.asyncio -# async def test_main_workflow_non_public_schema_exodus(setup_db_upgrade_config_non_public_schema_exodus): +async def test_main_workflow(setup_db_upgrade_configs): -# await _test_main_workflow(setup_db_upgrade_config_non_public_schema_exodus) + await _test_main_workflow(setup_db_upgrade_configs)
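+
+# Locally, this suite runs inside the docker-compose "tests" service ("poetry run pytest --cov=pgbelt tests/"),
+# which waits for all eight database containers (the four src/dst pairs used above) to report healthy before starting.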