From ab79ac973ad18213f4155066871830048f144e85 Mon Sep 17 00:00:00 2001
From: Varjitt Jeeva
Date: Thu, 22 Feb 2024 11:21:23 -0500
Subject: [PATCH 1/5] test: fix local-dev due to poetry venv location

---
 Dockerfile | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/Dockerfile b/Dockerfile
index 2bf521e..02cdd52 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,6 @@
 FROM python:3.11-slim
+ENV VIRTUAL_ENV=/opt/venv
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"

 COPY ./ /opt/pgbelt
 WORKDIR /opt/pgbelt
@@ -8,6 +10,7 @@ RUN set -e \
     && apt-get -y install gcc

 RUN set -e \
+    && python -m venv $VIRTUAL_ENV \
     && python -m pip install --upgrade pip \
     && pip install poetry poetry-dynamic-versioning \
     && poetry install

From 8a2b152bcbe7567030ba3e775cd79344b3906ba5 Mon Sep 17 00:00:00 2001
From: Varjitt Jeeva
Date: Thu, 22 Feb 2024 16:14:18 -0500
Subject: [PATCH 2/5] feat: support for sizes in status to target config dataset instead of whole DB

---
 .gitignore                                    |  1 +
 local_dev_scripts/README.md                   | 15 ++++
 local_dev_scripts/generate_large_test_data.py | 24 ++++++
 pgbelt/cmd/status.py                          | 29 ++++---
 pgbelt/util/pglogical.py                      |  4 +-
 pgbelt/util/postgres.py                       | 78 +++++++++++++++----
 tests/integration/files/test_schema_data.sql  | 33 ++++++++
 7 files changed, 155 insertions(+), 29 deletions(-)
 create mode 100644 local_dev_scripts/README.md
 create mode 100644 local_dev_scripts/generate_large_test_data.py

diff --git a/.gitignore b/.gitignore
index 8fd25d2..6d8ca84 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@ build/*
 dist/*
 logs/*
 configs/integrationtest-datacenter/*
+schemas/*
 .python-version
 .mypy_cache
 __pycache__/
diff --git a/local_dev_scripts/README.md b/local_dev_scripts/README.md
new file mode 100644
index 0000000..b44536d
--- /dev/null
+++ b/local_dev_scripts/README.md
@@ -0,0 +1,15 @@
+# Local Development Scripts
+
+This section of the repository contains scripts that aid development in `pgbelt`.
+
+## generate_large_test_data.py
+
+This script generates chunked INSERT statements for the integration test `users` table and prints them as one large SQL string.
+
+For easy use, redirect the output to a file, then load it into your database via `psql`.
+
+```
+python3 generate_large_test_data.py > extra_data.sql
+```
+
+NOTE: The existing parameters in the script generate a 5GB SQL file and 10000MB of on-disk data. This could overwhelm your laptop's Docker engine (you might need to increase your Docker engine's allocated memory).
diff --git a/local_dev_scripts/generate_large_test_data.py b/local_dev_scripts/generate_large_test_data.py
new file mode 100644
index 0000000..eb1bb79
--- /dev/null
+++ b/local_dev_scripts/generate_large_test_data.py
@@ -0,0 +1,24 @@
+# Script to echo Postgres lines for garbage test data
+# Useful for local development where you want to test with a large dataset
+# Need to chunk inserts, otherwise the query becomes too large for the docker container to handle.
+
+set_size = 100000
+num_sets = 1000
+set_num = 0
+while set_num < num_sets:
+    num = 0
+    print(
+        """
+INSERT INTO public.users (id, hash_firstname, hash_lastname, gender)
+    VALUES
+    """
+    )
+    while num < set_size - 1:
+        print(
+            f"    ({set_num * set_size + num}, 'dsdssdgarbagefirst', 'dgsaggggdjj', 'male'),"
+        )
+        num = num + 1
+    print(
+        f"    ({set_num * set_size + num}, 'dsdssdgarbagefirst', 'dgsaggggdjj', 'male');"
+    )
+    set_num = set_num + 1
diff --git a/pgbelt/cmd/status.py b/pgbelt/cmd/status.py
index dba7ddf..5743710 100644
--- a/pgbelt/cmd/status.py
+++ b/pgbelt/cmd/status.py
@@ -7,7 +7,8 @@
 from pgbelt.util import get_logger
 from pgbelt.util.pglogical import dst_status
 from pgbelt.util.pglogical import src_status
-from pgbelt.util.postgres import initialization_status
+from pgbelt.util.postgres import initialization_progress
+from pgbelt.util.postgres import analyze_table_pkeys
 from tabulate import tabulate
 from typer import echo
 from typer import style
@@ -23,8 +24,8 @@ async def _print_status_table(results: list[dict[str, str]]) -> list[list[str]]:
            style("flush_lag", "yellow"),
            style("write_lag", "yellow"),
            style("replay_lag", "yellow"),
-           style("src_db_size", "yellow"),
-           style("dst_db_size", "yellow"),
+           style("src_dataset_size", "yellow"),
+           style("dst_dataset_size", "yellow"),
            style("progress", "yellow"),
        ]
    ]
@@ -45,8 +46,8 @@ async def _print_status_table(results: list[dict[str, str]]) -> list[list[str]]:
            style(r["flush_lag"], "green" if r["flush_lag"] == "0" else "red"),
            style(r["write_lag"], "green" if r["write_lag"] == "0" else "red"),
            style(r["replay_lag"], "green" if r["replay_lag"] == "0" else "red"),
-           style(r["src_db_size"], "green"),
-           style(r["dst_db_size"], "green"),
+           style(r["src_dataset_size"], "green"),
+           style(r["dst_dataset_size"], "green"),
            style(r["progress"], "green"),
        ]
    )
@@ -77,7 +78,6 @@ async def status(conf_future: Awaitable[DbupgradeConfig]) -> dict[str, str]:
    conf = await conf_future
    src_logger = get_logger(conf.db, conf.dc, "status.src")
    dst_logger = get_logger(conf.db, conf.dc, "status.dst")
-    initialization_logger = get_logger(conf.db, conf.dc, "status.initialization")

    pools = await gather(
        create_pool(dsn=conf.src.root_uri, min_size=1),
@@ -85,20 +85,27 @@ async def status(conf_future: Awaitable[DbupgradeConfig]) -> dict[str, str]:
    )
    src_pool, dst_pool = pools
+    # Get the list of targeted tables by first getting all tables, then filtering by whatever is in the config.
+    pkey_tables, non_pkey_tables, _ = await analyze_table_pkeys(src_pool, src_logger)
+    all_tables = pkey_tables + non_pkey_tables
+    target_tables = all_tables
+    if conf.tables:
+        target_tables = [t for t in all_tables if t in conf.tables]
+
    try:
        result = await gather(
            src_status(src_pool, src_logger),
            dst_status(dst_pool, dst_logger),
-            initialization_status(
-                conf.src.db, conf.dst.db, src_pool, dst_pool, initialization_logger
+            initialization_progress(
+                target_tables, src_pool, dst_pool, src_logger, dst_logger
            ),
        )

        result[0].update(result[1])
        result[0]["db"] = conf.db
-        if result[0]["pg1_pg2"] != "initializing":
-            result[2]["src_db_size"] = "n/a"
-            result[2]["dst_db_size"] = "n/a"
+        if result[0]["pg1_pg2"] == "replicating":
+            result[2]["src_dataset_size"] = "n/a"
+            result[2]["dst_dataset_size"] = "n/a"
            result[2]["progress"] = "n/a"

        result[0].update(result[2])
        return result[0]
diff --git a/pgbelt/util/pglogical.py b/pgbelt/util/pglogical.py
index 04650fa..ba5bdf4 100644
--- a/pgbelt/util/pglogical.py
+++ b/pgbelt/util/pglogical.py
@@ -202,7 +202,7 @@ async def teardown_replication_set(pool: Pool, logger: Logger) -> None:
    """
    Tear down the replication_set
    """
-    logger.info("Dropping replication set 'default'...")
+    logger.info("Dropping replication set 'pgbelt'...")
    async with pool.acquire() as conn:
        async with conn.transaction():
            try:
@@ -213,7 +213,7 @@ async def teardown_replication_set(pool: Pool, logger: Logger) -> None:
                UndefinedFunctionError,
                InternalServerError,
            ):
-                logger.debug("Replication set 'default' does not exist")
+                logger.debug("Replication set 'pgbelt' does not exist")


async def revoke_pgl(pool: Pool, tables: list[str], logger: Logger) -> None:
diff --git a/pgbelt/util/postgres.py b/pgbelt/util/postgres.py
index b23e115..8166c1c 100644
--- a/pgbelt/util/postgres.py
+++ b/pgbelt/util/postgres.py
@@ -4,6 +4,8 @@
 from asyncpg import Record
 from asyncpg.exceptions import UndefinedObjectError

+from typer import echo
+

 async def dump_sequences(
     pool: Pool, targeted_sequences: list[str], logger: Logger
@@ -378,33 +380,77 @@ async def precheck_info(
    return result


-async def get_db_size(db: str, pool: Pool, logger: Logger) -> str:
+# TODO: Need to add schema here when working on non-public schema support.
+async def get_dataset_size(
+    tables: list[str], pool: Pool, logger: Logger, schema: str = "public"
+) -> str:
+    """
+    Get the total disk size of a dataset (via list of tables)
+    """
+    logger.info(f"Getting the targeted dataset size...")
+
+    # Tables string must be of form "'table1', 'table2', ..."
+    tables_string = ", ".join([f"'{t}'" for t in tables])
+
+    query = f"""
+    SELECT
+        sum(pg_total_relation_size(schemaname || '.' || tablename)) AS total_relation_size
+    FROM
+        pg_tables
+    WHERE
+        schemaname = '{schema}'
+        AND tablename IN ({tables_string});
    """
-    Get the DB size
+    print(query)
+
+    # Yes it's a duplicate, but it's a pretty one. Rather let Postgres do this than Python.
+    pretty_query = f"""
+    SELECT
+        pg_size_pretty(sum(pg_total_relation_size(schemaname || '.' || tablename))) AS total_relation_size
+    FROM
+        pg_tables
+    WHERE
+        schemaname = '{schema}'
+        AND tablename IN ({tables_string});
    """
-    logger.info("Getting the DB size...")
+
    result = {
-        "db_size": await pool.fetchval(f"SELECT pg_database_size('{db}');"),
-        "db_size_pretty": await pool.fetchval(
-            f"SELECT pg_size_pretty( pg_database_size('{db}') );"
-        ),
+        "db_size": await pool.fetchval(query),
+        "db_size_pretty": await pool.fetchval(pretty_query),
    }
    return result


-async def initialization_status(
-    db_src: str, db_dst: str, src_pool: Pool, dst_pool: Pool, logger: Logger
+async def initialization_progress(
+    tables: list[str],
+    src_pool: Pool,
+    dst_pool: Pool,
+    src_logger: Logger,
+    dst_logger: Logger,
 ) -> dict[str, str]:
    """
-    Get the status of the initialization stage
+    Get the size progress of the initialization stage
    """
-    logger.info("checking status of the initialization stage...")
-    src_db_size = await get_db_size(db_src, src_pool, logger)
-    dst_db_size = await get_db_size(db_dst, dst_pool, logger)
+
+    src_dataset_size = await get_dataset_size(tables, src_pool, src_logger)
+    dst_dataset_size = await get_dataset_size(tables, dst_pool, dst_logger)
+
+    # Eliminate None values
+    if src_dataset_size["db_size"] == None:
+        src_dataset_size["db_size"] = 0
+    if dst_dataset_size["db_size"] == None:
+        dst_dataset_size["db_size"] = 0
+
+    # Eliminate division by zero
+    if src_dataset_size["db_size"] == 0 and dst_dataset_size["db_size"] == 0:
+        progress = "0 %"
+    else:
+        progress = f"{str(round(int(dst_dataset_size['db_size'])/int(src_dataset_size['db_size'])*100 ,1))} %"
+
    status = {
-        "src_db_size": src_db_size["db_size_pretty"],
-        "dst_db_size": dst_db_size["db_size_pretty"],
-        "progress": f"{str(round(int(dst_db_size['db_size'])/int(src_db_size['db_size'])*100 ,1))} %",
+        "src_dataset_size": src_dataset_size["db_size_pretty"] or "0 bytes",
+        "dst_dataset_size": dst_dataset_size["db_size_pretty"] or "0 bytes",
+        "progress": progress,
    }
    return status

diff --git a/tests/integration/files/test_schema_data.sql b/tests/integration/files/test_schema_data.sql
index d41cd76..90a8fa9 100644
--- a/tests/integration/files/test_schema_data.sql
+++ b/tests/integration/files/test_schema_data.sql
@@ -82,3 +82,36 @@ SELECT pg_catalog.setval('public.users_id_seq', 1, false);

 ALTER TABLE ONLY public.Users
     ADD CONSTRAINT users_pkey PRIMARY KEY (id);
+
+CREATE TABLE public.Users2 (
+    id bigint NOT NULL,
+    hash_firstname text NOT NULL,
+    hash_lastname text NOT NULL,
+    gender character varying(6) NOT NULL,
+    CONSTRAINT users_gender_check CHECK (((gender)::text = ANY (ARRAY[('male'::character varying)::text, ('female'::character varying)::text])))
+);
+
+
+ALTER TABLE public.Users2 OWNER TO owner;
+
+CREATE SEQUENCE public.users2_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE public.users2_id_seq OWNER TO owner;
+
+SELECT pg_catalog.setval('public.users2_id_seq', 1, false);
+
+ALTER TABLE ONLY public.Users2
+    ADD CONSTRAINT users2_pkey PRIMARY KEY (id);
+
+INSERT INTO public.users2 (id, hash_firstname, hash_lastname, gender)
+    VALUES (1, 'garbagefirst', 'garbagelast', 'male'),
+    (2, 'garbagefirst1', 'garbagelast1', 'female'),
+    (3, 'sdgarbagefirst', 'dgsadsrbagelast', 'male'),
+    (4, 'dsdssdgarbagefirst', 'dgsaggggdjjjsrbagelast', 'female'),
+    (5, 'dsdssdgarbagefirt', 'dgsagggdjjjsrbagelast', 'female');

From d5becc2a4f972f3a47a66fa3f14049ec3adb9bb7 Mon Sep 17 00:00:00 2001
From: Varjitt Jeeva
Date: Thu, 22 Feb 2024 16:17:29 -0500
Subject: [PATCH 3/5] fix: remove test print lines and add comments

---
 local_dev_scripts/generate_large_test_data.py | 3 +++
 pgbelt/util/postgres.py                       | 3 ---
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/local_dev_scripts/generate_large_test_data.py b/local_dev_scripts/generate_large_test_data.py
index eb1bb79..34ed943 100644
--- a/local_dev_scripts/generate_large_test_data.py
+++ b/local_dev_scripts/generate_large_test_data.py
@@ -2,6 +2,9 @@
 # Useful for local development where you want to test with a large dataset
 # Need to chunk inserts, otherwise the query becomes too large for the docker container to handle.

+# NOTE: The existing parameters in the script generate a 5GB SQL file and 10000MB of on-disk data.
+# This could overwhelm your laptop's Docker engine (you might need to increase your Docker engine's allocated memory).
+
 set_size = 100000
 num_sets = 1000
 set_num = 0
diff --git a/pgbelt/util/postgres.py b/pgbelt/util/postgres.py
index 8166c1c..f139cd5 100644
--- a/pgbelt/util/postgres.py
+++ b/pgbelt/util/postgres.py
@@ -4,8 +4,6 @@
 from asyncpg import Record
 from asyncpg.exceptions import UndefinedObjectError

-from typer import echo
-

 async def dump_sequences(
     pool: Pool, targeted_sequences: list[str], logger: Logger
@@ -401,7 +399,6 @@ async def get_dataset_size(
        schemaname = '{schema}'
        AND tablename IN ({tables_string});
    """
-    print(query)

    # Yes it's a duplicate, but it's a pretty one. Rather let Postgres do this than Python.
    pretty_query = f"""

From 8de2d78f6990f436a219da541f8b77cf8a2794d0 Mon Sep 17 00:00:00 2001
From: Varjitt Jeeva
Date: Thu, 22 Feb 2024 16:19:07 -0500
Subject: [PATCH 4/5] fix: add comments

---
 tests/integration/files/test_schema_data.sql | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/integration/files/test_schema_data.sql b/tests/integration/files/test_schema_data.sql
index 90a8fa9..5f24521 100644
--- a/tests/integration/files/test_schema_data.sql
+++ b/tests/integration/files/test_schema_data.sql
@@ -83,6 +83,7 @@ SELECT pg_catalog.setval('public.users_id_seq', 1, false);
 ALTER TABLE ONLY public.Users
     ADD CONSTRAINT users_pkey PRIMARY KEY (id);

+-- Extra table with PK for testing exodus migrations
 CREATE TABLE public.Users2 (
     id bigint NOT NULL,
     hash_firstname text NOT NULL,

From 92e7965d46062b263cfd4216578609e07c11d77d Mon Sep 17 00:00:00 2001
From: Varjitt Jeeva
Date: Thu, 22 Feb 2024 16:26:13 -0500
Subject: [PATCH 5/5] fix: address flake8

---
 pgbelt/util/postgres.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pgbelt/util/postgres.py b/pgbelt/util/postgres.py
index f139cd5..4abf0fa 100644
--- a/pgbelt/util/postgres.py
+++ b/pgbelt/util/postgres.py
@@ -385,7 +385,7 @@ async def get_dataset_size(
    """
    Get the total disk size of a dataset (via list of tables)
    """
-    logger.info(f"Getting the targeted dataset size...")
+    logger.info("Getting the targeted dataset size...")

    # Tables string must be of form "'table1', 'table2', ..."
    tables_string = ", ".join([f"'{t}'" for t in tables])
@@ -434,9 +434,9 @@ async def initialization_progress(
    dst_dataset_size = await get_dataset_size(tables, dst_pool, dst_logger)

    # Eliminate None values
-    if src_dataset_size["db_size"] == None:
+    if src_dataset_size["db_size"] is None:
        src_dataset_size["db_size"] = 0
-    if dst_dataset_size["db_size"] == None:
+    if dst_dataset_size["db_size"] is None:
        dst_dataset_size["db_size"] = 0

    # Eliminate division by zero