Skip to content

Commit

Permalink
feat (sync-service): add filters to PG publication (#1660)
Browse files Browse the repository at this point in the history
This PR modifies the sync-service such that it adds the necessary
filters to the PG publication based on the shape's where clause.
Basically, for every table, we add a filter which is the disjunction of
the where clauses of all the shapes that are rooted in this table.

For example, if we have a shape on `foo` with where clause `a = 5` and
another shape for `foo` with where clause `a = 9` then the publication
will have the filter `((a = 5) OR (a = 9))`.

As soon as there is a shape that does not have a where clause, the
filter for that table is removed from the publication.

**Note: this requires at least Postgres 15.**
  • Loading branch information
kevin-dp authored Sep 11, 2024
1 parent b193cd7 commit 5c684bd
Show file tree
Hide file tree
Showing 11 changed files with 402 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/late-monkeys-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Add shape filters to Postgres publication to reduce processing load on Electric.
58 changes: 57 additions & 1 deletion .github/workflows/elixir_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ permissions:

jobs:
build:
name: Build and test
name: Build and test with PostgreSQL 14
runs-on: ubuntu-latest
defaults:
run:
Expand Down Expand Up @@ -65,6 +65,62 @@ jobs:
run: mix compile --force --all-warnings --warnings-as-errors
- name: Run tests
run: mix test
test_pg_15:
  # Runs the publication configuration tests against PostgreSQL 15.
  name: Build and test with PostgreSQL 15
  runs-on: ubuntu-latest
  defaults:
    run:
      working-directory: packages/sync-service
  env:
    MIX_ENV: test
  services:
    postgres:
      image: postgres:15-alpine
      env:
        POSTGRES_PASSWORD: password
      options: >-
        --health-cmd pg_isready
        --health-interval 10s
        --health-timeout 5s
        --health-retries 5
      ports:
        - 54321:5432
  steps:
    - uses: actions/checkout@v4
    # Logical replication must be enabled; the container is restarted so the setting takes effect.
    - name: 'Set PG settings'
      run: |
        docker exec ${{ job.services.postgres.id }} sh -c 'echo "wal_level=logical" >> /var/lib/postgresql/data/postgresql.conf'
        docker restart ${{ job.services.postgres.id }}
    - uses: erlef/setup-beam@v1
      with:
        version-type: strict
        version-file: '.tool-versions'
    # Uses the same major version (v4) as the cache/restore and cache/save steps below.
    - name: Restore dependencies cache
      uses: actions/cache@v4
      with:
        path: packages/sync-service/deps
        key: ${{ runner.os }}-mix-${{ hashFiles('packages/sync-service/mix.lock') }}
        restore-keys: ${{ runner.os }}-mix-
    # Compiled dependencies are cached; the project's own build output (lib/electric) is excluded.
    - name: Restore compiled code
      uses: actions/cache/restore@v4
      with:
        path: |
          packages/sync-service/_build/*/lib
          !packages/sync-service/_build/*/lib/electric
        key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }}
    - name: Install dependencies
      run: mix deps.get && mix deps.compile
    - name: Save compiled code
      uses: actions/cache/save@v4
      with:
        path: |
          packages/sync-service/_build/*/lib
          !packages/sync-service/_build/*/lib/electric
        key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }}
    - name: Compiles without warnings
      run: mix compile --force --all-warnings --warnings-as-errors
    # Only the Postgres configuration tests are run in this job.
    - name: Run tests
      run: mix test test/electric/postgres/configuration_test.exs
formatting:
name: Check formatting
runs-on: ubuntu-latest
Expand Down
7 changes: 6 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ defmodule Electric.Application do
with {:ok, storage_opts} <- storage_module.shared_opts(storage_opts) do
storage = {storage_module, storage_opts}

get_pg_version = fn ->
Electric.ConnectionManager.get_pg_version(Electric.ConnectionManager)
end

prepare_tables_fn =
{Electric.Postgres.Configuration, :configure_tables_for_replication!, [publication_name]}
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version, publication_name]}

inspector =
{Electric.Postgres.Inspector.EtsInspector,
Expand Down
31 changes: 30 additions & 1 deletion packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ defmodule Electric.ConnectionManager do
:pool_pid,
# Backoff term used for reconnection with exponential back-off.
:backoff,
# PostgreSQL server version
:pg_version,
:electric_instance_id
]
end
Expand All @@ -67,6 +69,14 @@ defmodule Electric.ConnectionManager do

@name __MODULE__

@doc """
Returns the major version of the PostgreSQL server (for example `15`).

The value is read from the connection manager's state. That field is only
filled in once the connection pool has connected, so a call made before that
point returns the field's initial value (presumably `nil`; confirm against the
struct definition).
"""
@spec get_pg_version(GenServer.server()) :: integer() | nil
def get_pg_version(server) do
  GenServer.call(server, :get_pg_version)
end

@spec start_link(options) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: @name)
Expand Down Expand Up @@ -112,6 +122,11 @@ defmodule Electric.ConnectionManager do
{:ok, state, {:continue, :start_replication_client}}
end

@impl true
# Replies with the PostgreSQL version held in the state; the state itself is left untouched.
def handle_call(:get_pg_version, _from, %{pg_version: version} = state),
  do: {:reply, version, state}

@impl true
def handle_continue(:start_replication_client, state) do
case start_replication_client(state) do
Expand Down Expand Up @@ -144,11 +159,13 @@ defmodule Electric.ConnectionManager do
{:ok, pid} ->
Electric.Timeline.check(get_pg_timeline(pid), state.timeline_opts)

pg_version = query_pg_major_version(pid)

# Now we have everything ready to start accepting and processing logical messages from
# Postgres.
Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid)

state = %{state | pool_pid: pid}
state = %{state | pool_pid: pid, pg_version: pg_version}
{:noreply, state}

{:error, reason} ->
Expand Down Expand Up @@ -379,4 +396,16 @@ defmodule Electric.ConnectionManager do
{:error, _reason} -> nil
end
end

@doc """
Queries the server behind `conn` for its major version number.

Returns an integer such as `14` or `15`. Raises if the query fails.
"""
@spec query_pg_major_version(Postgrex.conn()) :: non_neg_integer()
def query_pg_major_version(conn) do
  # `server_version` is free-form text: packaged builds report values such as
  # "15.4 (Debian 15.4-1.pgdg120+1)", which cannot be cast to numeric.
  # `server_version_num` is always a plain integer (150004 for 15.4), so an
  # integer division by 10000 yields the major version.
  %{rows: [[major_version]]} =
    Postgrex.query!(
      conn,
      "SELECT current_setting('server_version_num')::integer / 10000",
      []
    )

  major_version
end
end
120 changes: 114 additions & 6 deletions packages/sync-service/lib/electric/postgres/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ defmodule Electric.Postgres.Configuration do
a provided connection.
"""
alias Electric.Utils
alias Electric.Shapes.Shape

@type filter() :: String.t() | nil
@type maybe_filter() :: filter() | :relation_not_found
@type filters() :: %{Electric.relation() => filter()}

@doc """
Ensure that all tables are configured for replication.
Expand All @@ -16,17 +21,31 @@ defmodule Electric.Postgres.Configuration do
Raises if it fails to configure all the tables in the expected way.
"""
@spec configure_tables_for_replication!(Postgrex.conn(), [Electric.relation()], String.t()) ::
# Argument order matches the function head: connection pool, relations paired with their
# where clause, a zero-arity function returning the PostgreSQL major version, and the
# publication name.
@spec configure_tables_for_replication!(
        Postgrex.conn(),
        [Shape.table_with_where_clause()],
        (-> integer()),
        String.t()
      ) ::
        {:ok, [:ok]}
def configure_tables_for_replication!(pool, relations, publication_name) do
def configure_tables_for_replication!(pool, relations, get_pg_version, publication_name) do
  # The version is resolved here, at call time, by invoking the supplied function.
  pg_version = get_pg_version.()

  configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name)
end

defp configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name)
when pg_version <= 14 do
Postgrex.transaction(pool, fn conn ->
for relation <- relations,
table = Utils.relation_to_sql(relation),
do: Postgrex.query!(conn, "ALTER TABLE #{table} REPLICA IDENTITY FULL", [])
set_replica_identity!(conn, relations)

for relation <- relations, table = Utils.relation_to_sql(relation) do
for {relation, _} <- relations, table = Utils.relation_to_sql(relation) do
Postgrex.query!(conn, "SAVEPOINT before_publication", [])

# PG 14 and below do not support filters on tables of publications
case Postgrex.query(
conn,
"ALTER PUBLICATION #{publication_name} ADD TABLE #{table}",
Expand All @@ -48,4 +67,93 @@ defmodule Electric.Postgres.Configuration do
end
end)
end

# Fallback clause: sets the replica identity of every relation and then, one relation at a
# time, rewrites the publication's table list so that the relation's row filter also covers
# the relation's where clause.
defp configure_tables_for_replication_internal!(pool, relations, _pg_version, publication_name) do
  Postgrex.transaction(pool, fn conn ->
    set_replica_identity!(conn, relations)

    for {relation, rel_where_clause} <- relations do
      Postgrex.query!(conn, "SAVEPOINT before_publication", [])

      # Read the filters again on every pass: the previous pass may have altered the publication.
      current_filters = get_publication_filters(conn, publication_name)

      # Combine the relation's existing filter (if it is in the publication at all)
      # with the where clause that is being added.
      merged_filter =
        current_filters
        |> Map.get(relation, :relation_not_found)
        |> extend_where_clause(rel_where_clause)

      alter_publication_sql =
        make_alter_publication_query(
          publication_name,
          Map.put(current_filters, relation, merged_filter)
        )

      case Postgrex.query(conn, alter_publication_sql, []) do
        {:error, reason} ->
          raise reason

        {:ok, _} ->
          Postgrex.query!(conn, "RELEASE SAVEPOINT before_publication", [])
          :ok
      end
    end
  end)
end

# Issues `ALTER TABLE ... REPLICA IDENTITY FULL` for each relation in `relations`
# (a list of `{relation, where_clause}` pairs; the where clause is ignored here).
defp set_replica_identity!(conn, relations) do
  for {relation, _where_clause} <- relations do
    table_sql = Utils.relation_to_sql(relation)
    Postgrex.query!(conn, "ALTER TABLE #{table_sql} REPLICA IDENTITY FULL", [])
  end
end

# Looks up the tables of the given publication and returns a map from
# `{schema, table}` to that table's row filter.
@spec get_publication_filters(Postgrex.conn(), String.t()) :: filters()
defp get_publication_filters(conn, publication) do
  %{rows: rows} =
    Postgrex.query!(
      conn,
      "SELECT schemaname, tablename, rowfilter FROM pg_publication_tables WHERE pubname = $1",
      [publication]
    )

  # Each row has exactly the three selected columns.
  Map.new(rows, fn [schema, table, row_filter] -> {{schema, table}, row_filter} end)
end

# Combines a table's current publication filter with a new where clause.
#   * table not in the publication yet -> the where clause becomes the filter as-is
#   * either side is `nil` (no filter) -> the result is `nil`
#   * otherwise                        -> the two are OR-ed together
@spec extend_where_clause(maybe_filter(), filter()) :: filter()
defp extend_where_clause(:relation_not_found, where_clause), do: where_clause
defp extend_where_clause(nil, _where_clause), do: nil
defp extend_where_clause(_filter, nil), do: nil
defp extend_where_clause(filter, where_clause), do: "(#{filter} OR #{where_clause})"

# Builds the SQL statement that sets the publication's table list to the tables in
# `filters`, each rendered together with its row filter (if any).
@spec make_alter_publication_query(String.t(), filters()) :: String.t()
defp make_alter_publication_query(publication_name, filters) do
  # Single pass over the filters instead of a separate map followed by a join.
  table_clauses = Enum.map_join(filters, ", ", &make_table_clause/1)

  "ALTER PUBLICATION #{publication_name} SET TABLE " <> table_clauses
end

# Renders one `{relation, filter}` entry as a table clause for `ALTER PUBLICATION`:
# just the table name when there is no filter, otherwise the table name followed by
# a parenthesised `WHERE` row filter.
@spec make_table_clause({Electric.relation(), filter()}) :: String.t()
defp make_table_clause({{schema, tbl}, nil}) do
  Utils.relation_to_sql({schema, tbl})
end

defp make_table_clause({{schema, tbl}, where}) do
  table = Utils.relation_to_sql({schema, tbl})

  # The row filter must be enclosed in parentheses. Filters read back from the database
  # are not guaranteed to carry outer parentheses (e.g. a bare boolean column or a
  # function call), so always add them; redundant parentheses are harmless.
  table <> " WHERE (" <> where <> ")"
end
end
11 changes: 10 additions & 1 deletion packages/sync-service/lib/electric/shapes/shape.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ defmodule Electric.Shapes.Shape do
where: Electric.Replication.Eval.Expr.t() | nil
}

@type table_with_where_clause() :: {Electric.relation(), String.t() | nil}

@type json_relation() :: [String.t(), ...]
@type json_table_info() :: table_info() | json_relation()
@type json_table_list() :: [json_table_info(), ...]
Expand Down Expand Up @@ -117,7 +119,14 @@ defmodule Electric.Shapes.Shape do
@doc """
List tables that are a part of this shape.
"""
def affected_tables(%__MODULE__{root_table: table}), do: [table]
@spec affected_tables(t()) :: [table_with_where_clause()]
def affected_tables(%__MODULE__{where: nil} = shape), do: [{shape.root_table, nil}]

def affected_tables(%__MODULE__{where: %Electric.Replication.Eval.Expr{} = expr} = shape) do
  # The where clause's query text is wrapped in parentheses before being paired
  # with the shape's root table.
  [{shape.root_table, "(" <> expr.query <> ")"}]
end

@doc """
Convert a change to be correctly represented within the shape.
Expand Down
Loading

0 comments on commit 5c684bd

Please sign in to comment.