fix: various small fixes found from new integration testing (#409)
vjeeva authored Feb 29, 2024
1 parent 6928255 commit 0309c56
Showing 10 changed files with 593 additions and 435 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -14,6 +14,7 @@ dist/*
logs/*
configs/testdc/*
schemas/*
tables/*
.python-version
.mypy_cache
__pycache__/
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -72,7 +72,7 @@ This feature is very useful when you are making code changes to `pgbelt` and wan

To do this, this local development feature uses `docker` and `docker-compose` to spin up the following:

1. Two Postgres Containers with networking configured between each other
1. Four pairs of Postgres containers with networking configured between them. These pairs cover the four integration test cases: public schema & whole DB, public schema & exodus-style migration (only a subset of tables is moved), non-public schema & whole DB, and non-public schema & exodus-style migration (a condensed sketch of one pair follows this list).
2. One container loaded with your local copy of `pgbelt`, built and installed for CLI usage.
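
For orientation, here is a condensed sketch of how one such source/destination pair is declared in this commit's `docker-compose.yml` (the full diff appears further down). Only one of the four pairs is shown and settings not needed to illustrate the pairing are omitted; the first service carries a YAML anchor (`&db`) whose settings the other database services reuse via the merge key (`<<: *db`):

```yaml
# Condensed sketch, not the full file: one src/dst pair out of four.
services:
  db-src-set-public-schema-full: &db   # anchor: shared Postgres settings
    image: autodesk/postgres-pglogical-docker:13
    environment:
      POSTGRES_PASSWORD: postgres
    networks:
      datacenter:
        ipv4_address: 10.5.0.5

  db-dst-set-public-schema-full:
    <<: *db                            # reuse the anchored settings
    networks:                          # override only the static IP
      datacenter:
        ipv4_address: 10.5.0.6
```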

Simply run the following to spin the above up and drop yourself into your container with your local `pgbelt`:
3 changes: 3 additions & 0 deletions Makefile
@@ -29,3 +29,6 @@ clean-docker: ## Stop and remove all docker containers and images made from loca
# Finally, we OR true because the pre-commit errors when finding stuff to fix, but that's exactly what we want it to do.
generate-usage-docs: ## Generate usage docs
pip3 install typer-cli && typer pgbelt/main.py utils docs --name belt > docs/usage.md && pre-commit run --files docs/usage.md || true

clean-local-files: # Clean out local files generated by the local-dev command or local runs of pytest
rm -rf schemas/ configs/testdc/ logs/ tables/
123 changes: 83 additions & 40 deletions docker-compose.yml
@@ -1,7 +1,7 @@
version: "3"

services:
db-src:
db-src-set-public-schema-full: &db
image: autodesk/postgres-pglogical-docker:13
environment:
POSTGRES_PASSWORD: postgres
@@ -16,20 +16,47 @@ services:
timeout: 5s
retries: 5

db-dst:
image: autodesk/postgres-pglogical-docker:13
environment:
POSTGRES_PASSWORD: postgres
PGDATA: /tmp/pgdata
restart: on-failure
db-dst-set-public-schema-full:
<<: *db
networks:
datacenter:
ipv4_address: 10.5.0.6
healthcheck:
test: ["CMD-SHELL", "pg_isready"]
interval: 10s
timeout: 5s
retries: 5

db-src-set-non-public-schema-full:
<<: *db
networks:
datacenter:
ipv4_address: 10.5.0.7

db-dst-set-non-public-schema-full:
<<: *db
networks:
datacenter:
ipv4_address: 10.5.0.8

db-src-set-public-schema-exodus:
<<: *db
networks:
datacenter:
ipv4_address: 10.5.0.9

db-dst-set-public-schema-exodus:
<<: *db
networks:
datacenter:
ipv4_address: 10.5.0.10

db-src-set-non-public-schema-exodus:
<<: *db
networks:
datacenter:
ipv4_address: 10.5.0.11

db-dst-set-non-public-schema-exodus:
<<: *db
networks:
datacenter:
ipv4_address: 10.5.0.12

flake8:
image: autodesk/pgbelt:latest
@@ -46,47 +73,63 @@ services:
tests:
image: autodesk/pgbelt:latest
environment:
TEST_PG_SRC_HOST: localhost
TEST_PG_SRC_IP: 10.5.0.5
TEST_PG_SRC_DB: src
TEST_PG_SRC_PORT: 5432
TEST_PG_SRC_ROOT_USERNAME: postgres
TEST_PG_SRC_ROOT_PASSWORD: postgres
TEST_PG_DST_HOST: localhost
TEST_PG_DST_IP: 10.5.0.6
TEST_PG_DST_DB: dst
TEST_PG_DST_PORT: 5432
TEST_PG_DST_ROOT_USERNAME: postgres
TEST_PG_DST_ROOT_PASSWORD: postgres
PUBLIC_FULL_SRC_IP: 10.5.0.5
PUBLIC_FULL_DST_IP: 10.5.0.6
NONPUBLIC_FULL_SRC_IP: 10.5.0.7
NONPUBLIC_FULL_DST_IP: 10.5.0.8
PUBLIC_EXODUS_SRC_IP: 10.5.0.9
PUBLIC_EXODUS_DST_IP: 10.5.0.10
NONPUBLIC_EXODUS_SRC_IP: 10.5.0.11
NONPUBLIC_EXODUS_DST_IP: 10.5.0.12
command: poetry run pytest --cov=pgbelt tests/
depends_on:
db-src:
db-src-set-public-schema-full:
condition: service_healthy
db-dst-set-public-schema-full:
condition: service_healthy
db-src-set-non-public-schema-full:
condition: service_healthy
db-dst:
db-dst-set-non-public-schema-full:
condition: service_healthy
db-src-set-public-schema-exodus:
condition: service_healthy
db-dst-set-public-schema-exodus:
condition: service_healthy
db-src-set-non-public-schema-exodus:
condition: service_healthy
db-dst-set-non-public-schema-exodus:
condition: service_healthy
networks:
- datacenter

localtest:
image: autodesk/pgbelt:latest
environment:
TEST_PG_SRC_HOST: localhost
TEST_PG_SRC_IP: 10.5.0.5
TEST_PG_SRC_DB: src
TEST_PG_SRC_PORT: 5432
TEST_PG_SRC_ROOT_USERNAME: postgres
TEST_PG_SRC_ROOT_PASSWORD: postgres
TEST_PG_DST_HOST: localhost
TEST_PG_DST_IP: 10.5.0.6
TEST_PG_DST_DB: dst
TEST_PG_DST_PORT: 5432
TEST_PG_DST_ROOT_USERNAME: postgres
TEST_PG_DST_ROOT_PASSWORD: postgres
PUBLIC_FULL_SRC_IP: 10.5.0.5
PUBLIC_FULL_DST_IP: 10.5.0.6
NONPUBLIC_FULL_SRC_IP: 10.5.0.7
NONPUBLIC_FULL_DST_IP: 10.5.0.8
PUBLIC_EXODUS_SRC_IP: 10.5.0.9
PUBLIC_EXODUS_DST_IP: 10.5.0.10
NONPUBLIC_EXODUS_SRC_IP: 10.5.0.11
NONPUBLIC_EXODUS_DST_IP: 10.5.0.12
command: bash -c "cd /pgbelt-volume/ && poetry run python3 tests/integration/conftest.py --non-public-schema && pip3 install -e . && bash"
depends_on:
db-src:
db-src-set-public-schema-full:
condition: service_healthy
db-dst-set-public-schema-full:
condition: service_healthy
db-src-set-non-public-schema-full:
condition: service_healthy
db-dst-set-non-public-schema-full:
condition: service_healthy
db-src-set-public-schema-exodus:
condition: service_healthy
db-dst-set-public-schema-exodus:
condition: service_healthy
db-src-set-non-public-schema-exodus:
condition: service_healthy
db-dst:
db-dst-set-non-public-schema-exodus:
condition: service_healthy
networks:
- datacenter
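
The single TEST_PG_SRC_*/TEST_PG_DST_* variables used previously are replaced with one SRC/DST IP pair per test case. This part of the diff does not show how the test fixtures consume them, so the helper below is a hypothetical sketch; only the environment variable names and addresses are taken from the compose file above, and the exact code in `tests/integration/conftest.py` may differ.

```python
import os

# Hypothetical helper (not from this commit): resolve the source/destination
# database IPs for one integration test case from the environment variables
# defined in docker-compose.yml.
def db_ips(case: str) -> tuple[str, str]:
    prefix = case.upper()  # e.g. "PUBLIC_FULL" or "NONPUBLIC_EXODUS"
    return os.environ[f"{prefix}_SRC_IP"], os.environ[f"{prefix}_DST_IP"]

src_ip, dst_ip = db_ips("public_full")
# -> ("10.5.0.5", "10.5.0.6") when run inside the compose network above
```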
14 changes: 13 additions & 1 deletion pgbelt/cmd/sync.py
@@ -28,7 +28,19 @@ async def _sync_sequences(
src_logger: Logger,
dst_logger: Logger,
) -> None:
seq_vals = await dump_sequences(src_pool, targeted_sequences, schema, src_logger)

# Note: When in an exodus migration with a non-public schema, the sequence names must be prefixed with the schema name.
# This may not be done by the user, so we must do it here.
proper_sequence_names = None
if targeted_sequences is not None:
proper_sequence_names = []
for seq in targeted_sequences:
if f"{schema}." not in seq:
proper_sequence_names.append(f"{schema}.{seq}")
else:
proper_sequence_names.append(seq)

seq_vals = await dump_sequences(src_pool, proper_sequence_names, schema, src_logger)
await load_sequences(dst_pool, seq_vals, dst_logger)


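
As a standalone illustration of the normalization added above (the sequence names below are made up): bare names gain the schema prefix, while names that already carry it are left untouched.

```python
# Mirrors the loop added to _sync_sequences; sequence names are hypothetical.
def normalize_sequence_names(targeted_sequences: list[str] | None, schema: str) -> list[str] | None:
    if targeted_sequences is None:
        return None
    proper_sequence_names = []
    for seq in targeted_sequences:
        if f"{schema}." not in seq:
            proper_sequence_names.append(f"{schema}.{seq}")  # bare name: add schema prefix
        else:
            proper_sequence_names.append(seq)  # already qualified: keep as-is
    return proper_sequence_names

print(normalize_sequence_names(["widgets_id_seq", "inventory.gadgets_id_seq"], "inventory"))
# ['inventory.widgets_id_seq', 'inventory.gadgets_id_seq']
```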
3 changes: 2 additions & 1 deletion pgbelt/util/pglogical.py
@@ -78,8 +78,9 @@ async def grant_pgl(pool: Pool, tables: list[str], schema: str, logger: Logger)
async with pool.acquire() as conn:
async with conn.transaction():
if tables:
tables_with_schema = [f"{schema}.{table}" for table in tables]
await conn.execute(
f"GRANT ALL ON TABLE {','.join(tables)} TO pglogical;"
f"GRANT ALL ON TABLE {','.join(tables_with_schema)} TO pglogical;"
)
else:
await conn.execute(
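
With bare table names the previous GRANT relied on the search path, which breaks for non-public schemas. A quick illustration of the statement the patched `grant_pgl` builds (schema and table names here are hypothetical):

```python
# Builds the same statement as the patched grant_pgl; names are hypothetical.
schema = "inventory"
tables = ["widgets", "gadgets"]

tables_with_schema = [f"{schema}.{table}" for table in tables]
statement = f"GRANT ALL ON TABLE {','.join(tables_with_schema)} TO pglogical;"

print(statement)
# GRANT ALL ON TABLE inventory.widgets,inventory.gadgets TO pglogical;
```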
28 changes: 17 additions & 11 deletions pgbelt/util/postgres.py
@@ -12,20 +12,26 @@ async def dump_sequences(
return a dictionary of sequence names mapped to their last values
"""
logger.info("Dumping sequence values...")
seqs = await pool.fetch("SELECT sequence_name FROM information_schema.sequences;")
# Get all sequences in the schema
seqs = await pool.fetch(
f"""
SELECT '{schema}' || '.' || sequence_name
FROM information_schema.sequences
WHERE sequence_schema = '{schema}';
"""
)

seq_vals = {}
final_seqs = []
# If we get a list of targeted sequences, we only want to dump whichever of those are found in the database and schema.
if targeted_sequences:
for seq in [r[0] for r in seqs if r[0] in targeted_sequences]:
seq_vals[seq.strip()] = await pool.fetchval(
f"SELECT last_value FROM {schema}.{seq};"
)
else:
for seq in [r[0] for r in seqs]:
seq_stripped = seq.strip()
seq_vals[f"{schema}.{seq_stripped}"] = await pool.fetchval(
f"SELECT last_value FROM {schema}.{seq};"
)
final_seqs = [r[0] for r in seqs if r[0] in targeted_sequences]
else: # Otherwise, we want to dump all sequences found in the schema.
final_seqs = [r[0] for r in seqs]

for seq in final_seqs:
res = await pool.fetchval(f"SELECT last_value FROM {seq};")
seq_vals[seq.strip()] = res

logger.debug(f"Dumped sequences: {seq_vals}")
return seq_vals
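
To make the new flow concrete, here is a stub-data sketch of the selection logic (the sequence names are made up); the real function fetches the schema-qualified names from `information_schema.sequences` and then reads `last_value` per sequence.

```python
# Stub-data sketch of the new filtering in dump_sequences; names are made up.
schema = "inventory"
# What the rewritten query returns: names already qualified as '<schema>.<sequence>'.
rows = ["inventory.widgets_id_seq", "inventory.gadgets_id_seq"]
# Targeted names arrive schema-qualified thanks to the _sync_sequences change above.
targeted_sequences = ["inventory.widgets_id_seq"]

if targeted_sequences:
    final_seqs = [s for s in rows if s in targeted_sequences]
else:
    final_seqs = list(rows)

print(final_seqs)  # ['inventory.widgets_id_seq'] -- last_value is then read per entry
```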