diff --git a/.changeset/late-monkeys-vanish.md b/.changeset/late-monkeys-vanish.md new file mode 100644 index 0000000000..212d23ba60 --- /dev/null +++ b/.changeset/late-monkeys-vanish.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Add shape filters to Postgres publication to reduce processing load on Electric. diff --git a/.github/workflows/elixir_tests.yml b/.github/workflows/elixir_tests.yml index 8dd4d62729..d3280d344c 100644 --- a/.github/workflows/elixir_tests.yml +++ b/.github/workflows/elixir_tests.yml @@ -10,7 +10,7 @@ permissions: jobs: build: - name: Build and test + name: Build and test with PostgreSQL 14 runs-on: ubuntu-latest defaults: run: @@ -65,6 +65,62 @@ jobs: run: mix compile --force --all-warnings --warnings-as-errors - name: Run tests run: mix test + test_pg_15: + name: Build and test with PostgreSQL 15 + runs-on: ubuntu-latest + defaults: + run: + working-directory: packages/sync-service + env: + MIX_ENV: test + services: + postgres: + image: postgres:15-alpine + env: + POSTGRES_PASSWORD: password + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 54321:5432 + steps: + - uses: actions/checkout@v4 + - name: 'Set PG settings' + run: | + docker exec ${{ job.services.postgres.id }} sh -c 'echo "wal_level=logical" >> /var/lib/postgresql/data/postgresql.conf' + docker restart ${{ job.services.postgres.id }} + - uses: erlef/setup-beam@v1 + with: + version-type: strict + version-file: '.tool-versions' + - name: Restore dependencies cache + uses: actions/cache@v3 + with: + path: packages/sync-service/deps + key: ${{ runner.os }}-mix-${{ hashFiles('packages/sync-service/mix.lock') }} + restore-keys: ${{ runner.os }}-mix- + - name: Restore compiled code + uses: actions/cache/restore@v4 + with: + path: | + packages/sync-service/_build/*/lib + !packages/sync-service/_build/*/lib/electric + key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }} + - name: Install dependencies + run: mix deps.get && mix deps.compile + - name: Save compiled code + uses: actions/cache/save@v4 + with: + path: | + packages/sync-service/_build/*/lib + !packages/sync-service/_build/*/lib/electric + key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }} + - name: Compiles without warnings + run: mix compile --force --all-warnings --warnings-as-errors + - name: Run tests + run: mix test test/electric/postgres/configuration_test.exs formatting: name: Check formatting runs-on: ubuntu-latest diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 24413b65a4..41ce1a83bd 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -28,8 +28,13 @@ defmodule Electric.Application do with {:ok, storage_opts} <- storage_module.shared_opts(storage_opts) do storage = {storage_module, storage_opts} + get_pg_version = fn -> + Electric.ConnectionManager.get_pg_version(Electric.ConnectionManager) + end + prepare_tables_fn = - {Electric.Postgres.Configuration, :configure_tables_for_replication!, [publication_name]} + {Electric.Postgres.Configuration, :configure_tables_for_replication!, + [get_pg_version, publication_name]} inspector = {Electric.Postgres.Inspector.EtsInspector, diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index 1877839704..a8354c95e3 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -47,6 +47,8 @@ defmodule Electric.ConnectionManager do :pool_pid, # Backoff term used for reconnection with exponential back-off. :backoff, + # PostgreSQL server version + :pg_version, :electric_instance_id ] end @@ -67,6 +69,14 @@ defmodule Electric.ConnectionManager do @name __MODULE__ + @doc """ + Returns the version of the PostgreSQL server. + """ + @spec get_pg_version(GenServer.server()) :: float() + def get_pg_version(server) do + GenServer.call(server, :get_pg_version) + end + @spec start_link(options) :: GenServer.on_start() def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: @name) @@ -112,6 +122,11 @@ defmodule Electric.ConnectionManager do {:ok, state, {:continue, :start_replication_client}} end + @impl true + def handle_call(:get_pg_version, _from, %{pg_version: pg_version} = state) do + {:reply, pg_version, state} + end + @impl true def handle_continue(:start_replication_client, state) do case start_replication_client(state) do @@ -144,11 +159,13 @@ defmodule Electric.ConnectionManager do {:ok, pid} -> Electric.Timeline.check(get_pg_timeline(pid), state.timeline_opts) + pg_version = query_pg_major_version(pid) + # Now we have everything ready to start accepting and processing logical messages from # Postgres. Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid) - state = %{state | pool_pid: pid} + state = %{state | pool_pid: pid, pg_version: pg_version} {:noreply, state} {:error, reason} -> @@ -379,4 +396,16 @@ defmodule Electric.ConnectionManager do {:error, _reason} -> nil end end + + def query_pg_major_version(conn) do + [[setting]] = + Postgrex.query!( + conn, + "SELECT floor(setting::numeric)::integer FROM pg_settings WHERE name = 'server_version'", + [] + ) + |> Map.fetch!(:rows) + + setting + end end diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index 3273fcad59..09af47db14 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -4,6 +4,11 @@ defmodule Electric.Postgres.Configuration do a provided connection. """ alias Electric.Utils + alias Electric.Shapes.Shape + + @type filter() :: String.t() | nil + @type maybe_filter() :: filter() | :relation_not_found + @type filters() :: %{Electric.relation() => filter()} @doc """ Ensure that all tables are configured for replication. @@ -16,17 +21,31 @@ defmodule Electric.Postgres.Configuration do Raises if it fails to configure all the tables in the expected way. """ - @spec configure_tables_for_replication!(Postgrex.conn(), [Electric.relation()], String.t()) :: + @spec configure_tables_for_replication!( + Postgrex.conn(), + [Shape.table_with_where_clause()], + (-> String.t()), + float() + ) :: {:ok, [:ok]} - def configure_tables_for_replication!(pool, relations, publication_name) do + def configure_tables_for_replication!(pool, relations, get_pg_version, publication_name) do + configure_tables_for_replication_internal!( + pool, + relations, + get_pg_version.(), + publication_name + ) + end + + defp configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name) + when pg_version <= 14 do Postgrex.transaction(pool, fn conn -> - for relation <- relations, - table = Utils.relation_to_sql(relation), - do: Postgrex.query!(conn, "ALTER TABLE #{table} REPLICA IDENTITY FULL", []) + set_replica_identity!(conn, relations) - for relation <- relations, table = Utils.relation_to_sql(relation) do + for {relation, _} <- relations, table = Utils.relation_to_sql(relation) do Postgrex.query!(conn, "SAVEPOINT before_publication", []) + # PG 14 and below do not support filters on tables of publications case Postgrex.query( conn, "ALTER PUBLICATION #{publication_name} ADD TABLE #{table}", @@ -48,4 +67,93 @@ defmodule Electric.Postgres.Configuration do end end) end + + defp configure_tables_for_replication_internal!(pool, relations, _pg_version, publication_name) do + Postgrex.transaction(pool, fn conn -> + set_replica_identity!(conn, relations) + + for {relation, rel_where_clause} <- relations do + Postgrex.query!(conn, "SAVEPOINT before_publication", []) + + filters = get_publication_filters(conn, publication_name) + + # Get the existing filter for the table + # and extend it with the where clause for the table + # and update the table in the map with the new filter + filter = Map.get(filters, relation, :relation_not_found) + rel_filter = extend_where_clause(filter, rel_where_clause) + filters = Map.put(filters, relation, rel_filter) + + alter_publication_sql = + make_alter_publication_query(publication_name, filters) + + case Postgrex.query(conn, alter_publication_sql, []) do + {:ok, _} -> + Postgrex.query!(conn, "RELEASE SAVEPOINT before_publication", []) + :ok + + {:error, reason} -> + raise reason + end + end + end) + end + + defp set_replica_identity!(conn, relations) do + for {relation, _} <- relations, + table = Utils.relation_to_sql(relation) do + Postgrex.query!(conn, "ALTER TABLE #{table} REPLICA IDENTITY FULL", []) + end + end + + # Returns the filters grouped by table for the given publication. + @spec get_publication_filters(Postgrex.conn(), String.t()) :: filters() + defp get_publication_filters(conn, publication) do + Postgrex.query!( + conn, + "SELECT schemaname, tablename, rowfilter FROM pg_publication_tables WHERE pubname = $1", + [publication] + ) + |> Map.fetch!(:rows) + |> Enum.map(&{Enum.take(&1, 2) |> List.to_tuple(), Enum.at(&1, 2)}) + |> Map.new() + end + + # Joins the existing filter for the table with the where clause for the table. + # If one of them is `nil` (i.e. no filter) then the resulting filter is `nil`. + @spec extend_where_clause(maybe_filter(), filter()) :: filter() + defp extend_where_clause(:relation_not_found, where_clause) do + where_clause + end + + defp extend_where_clause(filter, where_clause) when is_nil(filter) or is_nil(where_clause) do + nil + end + + defp extend_where_clause(filter, where_clause) do + "(#{filter} OR #{where_clause})" + end + + # Makes an SQL query that alters the given publication whith the given tables and filters. + @spec make_alter_publication_query(String.t(), filters()) :: String.t() + defp make_alter_publication_query(publication_name, filters) do + base_sql = "ALTER PUBLICATION #{publication_name} SET TABLE " + + tables = + filters + |> Enum.map(&make_table_clause/1) + |> Enum.join(", ") + + base_sql <> tables + end + + @spec make_table_clause(filter()) :: String.t() + defp make_table_clause({{schema, tbl}, nil}) do + Utils.relation_to_sql({schema, tbl}) + end + + defp make_table_clause({{schema, tbl}, where}) do + table = Utils.relation_to_sql({schema, tbl}) + table <> " WHERE " <> where + end end diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index e3ada4f28f..b20c94e9b4 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ b/packages/sync-service/lib/electric/shapes/shape.ex @@ -24,6 +24,8 @@ defmodule Electric.Shapes.Shape do where: Electric.Replication.Eval.Expr.t() | nil } + @type table_with_where_clause() :: {Electric.relation(), String.t() | nil} + @type json_relation() :: [String.t(), ...] @type json_table_info() :: table_info() | json_relation() @type json_table_list() :: [json_table_info(), ...] @@ -117,7 +119,14 @@ defmodule Electric.Shapes.Shape do @doc """ List tables that are a part of this shape. """ - def affected_tables(%__MODULE__{root_table: table}), do: [table] + @spec affected_tables(t()) :: [table_with_where_clause()] + def affected_tables(%__MODULE__{root_table: table, where: nil}), do: [{table, nil}] + + def affected_tables(%__MODULE__{ + root_table: table, + where: %Electric.Replication.Eval.Expr{query: where_clause} + }), + do: [{table, "(" <> where_clause <> ")"}] @doc """ Convert a change to be correctly represented within the shape. diff --git a/packages/sync-service/test/electric/postgres/configuration_test.exs b/packages/sync-service/test/electric/postgres/configuration_test.exs index 8c17ba7ab9..6919d331ad 100644 --- a/packages/sync-service/test/electric/postgres/configuration_test.exs +++ b/packages/sync-service/test/electric/postgres/configuration_test.exs @@ -5,6 +5,7 @@ defmodule Electric.Postgres.ConfigurationTest do setup {Support.DbSetup, :with_unique_db} setup {Support.DbSetup, :with_publication} + setup {Support.DbSetup, :with_pg_version} setup %{db_conn: conn} do Postgrex.query!( @@ -34,60 +35,183 @@ defmodule Electric.Postgres.ConfigurationTest do describe "configure_tables_for_replication!/3" do test "sets REPLICA IDENTITY on the table and adds it to the publication", - %{pool: conn, publication_name: publication} do + %{pool: conn, publication_name: publication, get_pg_version: get_pg_version} do assert get_table_identity(conn, {"public", "items"}) == "d" assert list_tables_in_publication(conn, publication) == [] - Configuration.configure_tables_for_replication!(conn, [{"public", "items"}], publication) + + Configuration.configure_tables_for_replication!( + conn, + [{{"public", "items"}, "(value ILIKE 'yes%')"}], + get_pg_version, + publication + ) + + assert get_table_identity(conn, {"public", "items"}) == "f" + + pg_version = get_pg_version.() + + assert list_tables_in_publication(conn, publication) == + expected_filters( + [ + {"public", "items", "(value ~~* 'yes%'::text)"} + ], + pg_version + ) + end + + test "works with multiple tables", %{ + pool: conn, + publication_name: publication, + get_pg_version: get_pg_version + } do + assert get_table_identity(conn, {"public", "items"}) == "d" + assert get_table_identity(conn, {"public", "other_table"}) == "d" + assert list_tables_in_publication(conn, publication) == [] + + Configuration.configure_tables_for_replication!( + conn, + [ + {{"public", "items"}, "(value ILIKE 'yes%')"}, + {{"public", "other_table"}, "(value ILIKE 'no%')"} + ], + get_pg_version, + publication + ) + assert get_table_identity(conn, {"public", "items"}) == "f" - assert list_tables_in_publication(conn, publication) == [{"public", "items"}] + assert get_table_identity(conn, {"public", "other_table"}) == "f" + + pg_version = get_pg_version.() + + assert list_tables_in_publication(conn, publication) == + expected_filters( + [ + {"public", "items", "(value ~~* 'yes%'::text)"}, + {"public", "other_table", "(value ~~* 'no%'::text)"} + ], + pg_version + ) end - test "works with multiple tables", %{pool: conn, publication_name: publication} do + test "keeps all tables when updating one of them", %{ + pool: conn, + publication_name: publication, + get_pg_version: get_pg_version + } do assert get_table_identity(conn, {"public", "items"}) == "d" assert get_table_identity(conn, {"public", "other_table"}) == "d" assert list_tables_in_publication(conn, publication) == [] Configuration.configure_tables_for_replication!( conn, - [{"public", "items"}, {"public", "other_table"}], + [ + {{"public", "items"}, "(value ILIKE 'yes%')"}, + {{"public", "other_table"}, "(value ILIKE 'no%')"} + ], + get_pg_version, publication ) assert get_table_identity(conn, {"public", "items"}) == "f" assert get_table_identity(conn, {"public", "other_table"}) == "f" - assert list_tables_in_publication(conn, publication) == [ - {"public", "items"}, - {"public", "other_table"} - ] + pg_version = get_pg_version.() + + assert list_tables_in_publication(conn, publication) == + expected_filters( + [ + {"public", "items", "(value ~~* 'yes%'::text)"}, + {"public", "other_table", "(value ~~* 'no%'::text)"} + ], + pg_version + ) + + Configuration.configure_tables_for_replication!( + conn, + [ + {{"public", "other_table"}, "(value ILIKE 'yes%')"} + ], + get_pg_version, + publication + ) + + assert list_tables_in_publication(conn, publication) == + expected_filters( + [ + {"public", "items", "(value ~~* 'yes%'::text)"}, + {"public", "other_table", + "((value ~~* 'no%'::text) OR (value ~~* 'yes%'::text))"} + ], + pg_version + ) end test "doesn't fail when one of the tables is already configured", - %{pool: conn, publication_name: publication} do - Configuration.configure_tables_for_replication!(conn, [{"public", "items"}], publication) + %{pool: conn, publication_name: publication, get_pg_version: get_pg_version} do + Configuration.configure_tables_for_replication!( + conn, + [{{"public", "items"}, "(value ILIKE 'yes%')"}], + get_pg_version, + publication + ) + assert get_table_identity(conn, {"public", "other_table"}) == "d" - assert list_tables_in_publication(conn, publication) == [{"public", "items"}] + pg_version = get_pg_version.() + + assert list_tables_in_publication(conn, publication) == + expected_filters( + [ + {"public", "items", "(value ~~* 'yes%'::text)"} + ], + pg_version + ) + + # Configure `items` table again but with a different where clause Configuration.configure_tables_for_replication!( conn, - [{"public", "items"}, {"public", "other_table"}], + [{{"public", "items"}, "(value ILIKE 'no%')"}, {{"public", "other_table"}, nil}], + get_pg_version, publication ) assert get_table_identity(conn, {"public", "items"}) == "f" assert get_table_identity(conn, {"public", "other_table"}) == "f" - assert list_tables_in_publication(conn, publication) == [ - {"public", "items"}, - {"public", "other_table"} - ] + assert list_tables_in_publication(conn, publication) == + expected_filters( + [ + {"public", "items", "((value ~~* 'yes%'::text) OR (value ~~* 'no%'::text))"}, + {"public", "other_table", nil} + ], + pg_version + ) + + # Now configure it again but for a shape that has no where clause + # the resulting publication should no longer have a filter for that table + Configuration.configure_tables_for_replication!( + conn, + [{{"public", "items"}, nil}, {{"public", "other_table"}, "(value ILIKE 'no%')"}], + get_pg_version, + publication + ) + + assert list_tables_in_publication(conn, publication) == + expected_filters( + [ + {"public", "items", nil}, + {"public", "other_table", nil} + ], + pg_version + ) end - test "fails when a publication doesn't exist", %{pool: conn} do + test "fails when a publication doesn't exist", %{pool: conn, get_pg_version: get_pg_version} do assert_raise Postgrex.Error, ~r/undefined_object/, fn -> Configuration.configure_tables_for_replication!( conn, - [{"public", "items"}], + [{{"public", "items"}, nil}], + get_pg_version, "nonexistent" ) end @@ -111,6 +235,11 @@ defmodule Electric.Postgres.ConfigurationTest do end defp list_tables_in_publication(conn, publication) do + pg_version = Electric.ConnectionManager.query_pg_major_version(conn) + list_tables_in_pub(conn, publication, pg_version) + end + + defp list_tables_in_pub(conn, publication, pg_version) when pg_version <= 14 do Postgrex.query!( conn, "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = $1 ORDER BY tablename", @@ -119,4 +248,20 @@ defmodule Electric.Postgres.ConfigurationTest do |> Map.fetch!(:rows) |> Enum.map(&List.to_tuple/1) end + + defp list_tables_in_pub(conn, publication, _pg_version) do + Postgrex.query!( + conn, + "SELECT schemaname, tablename, rowfilter FROM pg_publication_tables WHERE pubname = $1 ORDER BY tablename", + [publication] + ) + |> Map.fetch!(:rows) + |> Enum.map(&List.to_tuple/1) + end + + defp expected_filters(filters, pg_version) when pg_version <= 14 do + Enum.map(filters, fn {schema, table, _filter} -> {schema, table} end) + end + + defp expected_filters(filters, _pg_version), do: filters end diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 69b93d3b86..da69070786 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -125,7 +125,7 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), - prepare_tables_fn: fn nil, [{"public", "items"}] -> + prepare_tables_fn: fn nil, [{{"public", "items"}, nil}] -> send(test_pid, {:called, :prepare_tables_fn}) end, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 04699f99b8..e66d0cc7db 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -667,6 +667,8 @@ defmodule Electric.Shapes.ConsumerTest do } ] + get_pg_version = fn -> 14 end + shape_cache_config = [ name: shape_cache_name, @@ -683,7 +685,7 @@ defmodule Electric.Shapes.ConsumerTest do prepare_tables_fn: { Electric.Postgres.Configuration, :configure_tables_for_replication!, - [ctx.publication_name] + [get_pg_version, ctx.publication_name] }, create_snapshot_fn: fn parent, shape_id, _shape, _, _storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 0}) diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 916b580922..8307977ef4 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -55,6 +55,7 @@ defmodule Support.ComponentSetup do shape_meta_table = :"shape_meta_#{full_test_name(ctx)}" server = :"shape_cache_#{full_test_name(ctx)}" consumer_supervisor = :"consumer_supervisor_#{full_test_name(ctx)}" + get_pg_version = fn -> 14 end start_opts = [ @@ -72,8 +73,12 @@ defmodule Support.ComponentSetup do ] |> Keyword.merge(additional_opts) |> Keyword.put_new_lazy(:prepare_tables_fn, fn -> - {Electric.Postgres.Configuration, :configure_tables_for_replication!, - [ctx.publication_name]} + { + Electric.Postgres.Configuration, + :configure_tables_for_replication!, + # TODO: can pass PG version here, then in Application also pass it there, or pass a function that returns the version + [get_pg_version, ctx.publication_name] + } end) {:ok, _pid} = diff --git a/packages/sync-service/test/support/db_setup.ex b/packages/sync-service/test/support/db_setup.ex index c68de82503..f2c171d538 100644 --- a/packages/sync-service/test/support/db_setup.ex +++ b/packages/sync-service/test/support/db_setup.ex @@ -48,6 +48,11 @@ defmodule Support.DbSetup do {:ok, %{publication_name: "electric_test_publication"}} end + def with_pg_version(ctx) do + pg_version = Electric.ConnectionManager.query_pg_major_version(ctx.db_conn) + {:ok, %{get_pg_version: fn -> pg_version end}} + end + def with_shared_db(_ctx) do config = Application.fetch_env!(:electric, :connection_opts) {:ok, pool} = Postgrex.start_link(config ++ @postgrex_start_opts)