
fix: sizes in status should target config dataset instead of whole DB #395

Merged: 5 commits, Feb 22, 2024
Changes from all commits
1 change: 1 addition & 0 deletions .gitignore
@@ -13,6 +13,7 @@ build/*
dist/*
logs/*
configs/integrationtest-datacenter/*
schemas/*
.python-version
.mypy_cache
__pycache__/
3 changes: 3 additions & 0 deletions Dockerfile
@@ -1,4 +1,6 @@
FROM python:3.11-slim
ENV VIRTUAL_ENV=/opt/venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
COPY ./ /opt/pgbelt
WORKDIR /opt/pgbelt

@@ -8,6 +10,7 @@ RUN set -e \
&& apt-get -y install gcc

RUN set -e \
&& python -m venv $VIRTUAL_ENV \
&& python -m pip install --upgrade pip \
&& pip install poetry poetry-dynamic-versioning \
&& poetry install
15 changes: 15 additions & 0 deletions local_dev_scripts/README.md
@@ -0,0 +1,15 @@
# Local Development Scripts

This section of the repository contains scripts to aid with development in `pgbelt`.

## generate_large_test_data.py

This script generates chunks of INSERT statements for the integration test `users` table and prints them to stdout as one large string.

For easy use, redirect the output to a file, then load it into your database with `psql`, as sketched below.

```
python3 generate_large_test_data.py > extra_data.sql
```
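
Then load the generated file with `psql`. A minimal sketch, where the connection string is only a placeholder for your local integration test database:

```
psql "postgresql://postgres:postgres@localhost:5432/testdb" -f extra_data.sql
```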

NOTE: With the parameters currently in the script, this generates a roughly 5GB SQL file and about 10000MB of on-disk data. This could overwhelm your laptop's Docker engine (you may need to increase the memory allocated to it).
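
If that volume is too much for your machine, you can shrink the dataset by lowering the loop parameters at the top of the script (`set_size` and `num_sets` are the script's own variables; the value below is only a suggestion):

```
num_sets = 100  # roughly one tenth of the default data volume
```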
27 changes: 27 additions & 0 deletions local_dev_scripts/generate_large_test_data.py
@@ -0,0 +1,27 @@
# Script to print Postgres INSERT statements full of garbage test data
# Useful for local development where you want to test with a large dataset
# Inserts must be chunked; otherwise a single query becomes too large for the Docker container to handle.

# NOTE: The existing parameters in the script generate a 5GB SQL file and 10000MB of on-disk data to use.
# This could overwhelm your laptop's Docker engine (you might need to bump your Docker engine allocated memory).

set_size = 100000
num_sets = 1000
set_num = 0
while set_num < num_sets:
    num = 0
    print(
        """
        INSERT INTO public.users (id, hash_firstname, hash_lastname, gender)
        VALUES
        """
    )
    while num < set_size - 1:
        print(
            f"    ({set_num * set_size + num}, 'dsdssdgarbagefirst', 'dgsaggggdjj', 'male'),"
        )
        num = num + 1
    print(
        f"    ({set_num * set_size + num}, 'dsdssdgarbagefirst', 'dgsaggggdjj', 'male');"
    )
    set_num = set_num + 1
29 changes: 18 additions & 11 deletions pgbelt/cmd/status.py
@@ -7,7 +7,8 @@
from pgbelt.util import get_logger
from pgbelt.util.pglogical import dst_status
from pgbelt.util.pglogical import src_status
from pgbelt.util.postgres import initialization_status
from pgbelt.util.postgres import initialization_progress
from pgbelt.util.postgres import analyze_table_pkeys
from tabulate import tabulate
from typer import echo
from typer import style
@@ -23,8 +24,8 @@ async def _print_status_table(results: list[dict[str, str]]) -> list[list[str]]:
style("flush_lag", "yellow"),
style("write_lag", "yellow"),
style("replay_lag", "yellow"),
style("src_db_size", "yellow"),
style("dst_db_size", "yellow"),
style("src_dataset_size", "yellow"),
style("dst_dataset_size", "yellow"),
style("progress", "yellow"),
]
]
@@ -45,8 +46,8 @@ async def _print_status_table(results: list[dict[str, str]]) -> list[list[str]]:
style(r["flush_lag"], "green" if r["flush_lag"] == "0" else "red"),
style(r["write_lag"], "green" if r["write_lag"] == "0" else "red"),
style(r["replay_lag"], "green" if r["replay_lag"] == "0" else "red"),
style(r["src_db_size"], "green"),
style(r["dst_db_size"], "green"),
style(r["src_dataset_size"], "green"),
style(r["dst_dataset_size"], "green"),
style(r["progress"], "green"),
]
)
@@ -77,28 +78,34 @@ async def status(conf_future: Awaitable[DbupgradeConfig]) -> dict[str, str]:
    conf = await conf_future
    src_logger = get_logger(conf.db, conf.dc, "status.src")
    dst_logger = get_logger(conf.db, conf.dc, "status.dst")
    initialization_logger = get_logger(conf.db, conf.dc, "status.initialization")

    pools = await gather(
        create_pool(dsn=conf.src.root_uri, min_size=1),
        create_pool(dsn=conf.dst.root_uri, min_size=1),
    )
    src_pool, dst_pool = pools

    # Get the list of targeted tables by first getting all tables, then filtering whatever is in the config.
    pkey_tables, non_pkey_tables, _ = await analyze_table_pkeys(src_pool, src_logger)
    all_tables = pkey_tables + non_pkey_tables
    target_tables = all_tables
    if conf.tables:
        target_tables = [t for t in all_tables if t in conf.tables]

    try:
        result = await gather(
            src_status(src_pool, src_logger),
            dst_status(dst_pool, dst_logger),
            initialization_status(
                conf.src.db, conf.dst.db, src_pool, dst_pool, initialization_logger
            initialization_progress(
                target_tables, src_pool, dst_pool, src_logger, dst_logger
            ),
        )

        result[0].update(result[1])
        result[0]["db"] = conf.db
        if result[0]["pg1_pg2"] != "initializing":
            result[2]["src_db_size"] = "n/a"
            result[2]["dst_db_size"] = "n/a"
        if result[0]["pg1_pg2"] == "replicating":
result[2]["src_dataset_size"] = "n/a"
result[2]["dst_dataset_size"] = "n/a"
result[2]["progress"] = "n/a"
result[0].update(result[2])
return result[0]
4 changes: 2 additions & 2 deletions pgbelt/util/pglogical.py
@@ -202,7 +202,7 @@ async def teardown_replication_set(pool: Pool, logger: Logger) -> None:
"""
Tear down the replication_set
"""
logger.info("Dropping replication set 'default'...")
logger.info("Dropping replication set 'pgbelt'...")
async with pool.acquire() as conn:
async with conn.transaction():
try:
@@ -213,7 +213,7 @@ async def teardown_replication_set(pool: Pool, logger: Logger) -> None:
                UndefinedFunctionError,
                InternalServerError,
            ):
                logger.debug("Replication set 'default' does not exist")
                logger.debug("Replication set 'pgbelt' does not exist")


async def revoke_pgl(pool: Pool, tables: list[str], logger: Logger) -> None:
75 changes: 59 additions & 16 deletions pgbelt/util/postgres.py
@@ -378,33 +378,76 @@
    return result


async def get_db_size(db: str, pool: Pool, logger: Logger) -> str:
# TODO: Need to add schema here when working on non-public schema support.
async def get_dataset_size(
    tables: list[str], pool: Pool, logger: Logger, schema: str = "public"
) -> str:
    """
    Get the DB size
    Get the total disk size of a dataset (via list of tables)
    """
    logger.info("Getting the DB size...")
    logger.info("Getting the targeted dataset size...")

    # Tables string must be of form "'table1', 'table2', ..."
    tables_string = ", ".join([f"'{t}'" for t in tables])

    query = f"""
    SELECT
        sum(pg_total_relation_size(schemaname || '.' || tablename)) AS total_relation_size
    FROM
        pg_tables
    WHERE
        schemaname = '{schema}'
        AND tablename IN ({tables_string});
    """

Check warning on line 401 in pgbelt/util/postgres.py (Autodesk Chorus / security/bandit) B608: hardcoded_sql_expressions. Possible SQL injection vector through string-based query construction. secure coding id: PYTH-INJC-20.
    # Yes it's a duplicate, but it's a pretty one. Rather let Postgres do this than Python.
    pretty_query = f"""
    SELECT
        pg_size_pretty(sum(pg_total_relation_size(schemaname || '.' || tablename))) AS total_relation_size
    FROM
        pg_tables
    WHERE
        schemaname = '{schema}'
        AND tablename IN ({tables_string});
    """

Check warning on line 412 in pgbelt/util/postgres.py (Autodesk Chorus / security/bandit) B608: hardcoded_sql_expressions. Possible SQL injection vector through string-based query construction. secure coding id: PYTH-INJC-20.

    result = {
        "db_size": await pool.fetchval(f"SELECT pg_database_size('{db}');"),
        "db_size_pretty": await pool.fetchval(
            f"SELECT pg_size_pretty( pg_database_size('{db}') );"
        ),
        "db_size": await pool.fetchval(query),
        "db_size_pretty": await pool.fetchval(pretty_query),
    }

    return result


async def initialization_status(
    db_src: str, db_dst: str, src_pool: Pool, dst_pool: Pool, logger: Logger
async def initialization_progress(
    tables: list[str],
    src_pool: Pool,
    dst_pool: Pool,
    src_logger: Logger,
    dst_logger: Logger,
) -> dict[str, str]:
    """
    Get the status of the initialization stage
    Get the size progress of the initialization stage
    """
    logger.info("checking status of the initialization stage...")
    src_db_size = await get_db_size(db_src, src_pool, logger)
    dst_db_size = await get_db_size(db_dst, dst_pool, logger)

    src_dataset_size = await get_dataset_size(tables, src_pool, src_logger)
    dst_dataset_size = await get_dataset_size(tables, dst_pool, dst_logger)

    # Eliminate None values
    if src_dataset_size["db_size"] is None:
        src_dataset_size["db_size"] = 0
    if dst_dataset_size["db_size"] is None:
        dst_dataset_size["db_size"] = 0

    # Eliminate division by zero
    if src_dataset_size["db_size"] == 0 and dst_dataset_size["db_size"] == 0:
        progress = "0 %"
    else:
        progress = f"{str(round(int(dst_dataset_size['db_size'])/int(src_dataset_size['db_size'])*100 ,1))} %"

    status = {
        "src_db_size": src_db_size["db_size_pretty"],
        "dst_db_size": dst_db_size["db_size_pretty"],
        "progress": f"{str(round(int(dst_db_size['db_size'])/int(src_db_size['db_size'])*100 ,1))} %",
        "src_dataset_size": src_dataset_size["db_size_pretty"] or "0 bytes",
        "dst_dataset_size": dst_dataset_size["db_size_pretty"] or "0 bytes",
        "progress": progress,
    }
    return status
34 changes: 34 additions & 0 deletions tests/integration/files/test_schema_data.sql
@@ -82,3 +82,37 @@ SELECT pg_catalog.setval('public.users_id_seq', 1, false);

ALTER TABLE ONLY public.Users
    ADD CONSTRAINT users_pkey PRIMARY KEY (id);

-- Extra table with PK for testing exodus migrations
CREATE TABLE public.Users2 (
    id bigint NOT NULL,
    hash_firstname text NOT NULL,
    hash_lastname text NOT NULL,
    gender character varying(6) NOT NULL,
    CONSTRAINT users_gender_check CHECK (((gender)::text = ANY (ARRAY[('male'::character varying)::text, ('female'::character varying)::text])))
);


ALTER TABLE public.Users2 OWNER TO owner;

CREATE SEQUENCE public.users2_id_seq
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;


ALTER TABLE public.users2_id_seq OWNER TO owner;

SELECT pg_catalog.setval('public.users2_id_seq', 1, false);

ALTER TABLE ONLY public.Users2
    ADD CONSTRAINT users2_pkey PRIMARY KEY (id);

INSERT INTO public.users2 (id, hash_firstname, hash_lastname, gender)
VALUES (1, 'garbagefirst', 'garbagelast', 'male'),
(2, 'garbagefirst1', 'garbagelast1', 'female'),
(3, 'sdgarbagefirst', 'dgsadsrbagelast', 'male'),
(4, 'dsdssdgarbagefirst', 'dgsaggggdjjjsrbagelast', 'female'),
(5, 'dsdssdgarbagefirt', 'dgsagggdjjjsrbagelast', 'female');