From 1d005011311f47e14d46c084f1ca7e61dfefb284 Mon Sep 17 00:00:00 2001 From: Kevin Date: Mon, 2 Sep 2024 14:59:17 +0200 Subject: [PATCH] fix: clean cached column information on delete (#1562) Fixes https://github.com/electric-sql/electric/issues/1550. This PR extends the shape cache, which consumes the Relation events emitted by the shape log collector, to also clean the cached column information from ETS when a relation changes. This ensures that when a table is migrated, we don't re-use the old table information from ETS but instead load it from Postgres and re-populate the cache with the new column info. **There's one corner case that this PR does not address:** - Create table, insert data - Sync a shape containing that table - Drop the table - Delete the shape - Recreate the table but with a different schema - Insert some data into the new table - Sync a shape containing the newly recreated table In the above scenario, when syncing the newly recreated table, we get the new data but in the old schema (so we only get the columns that also existed in the old schema). This is because the Postgres logical replication stream does not inform us when a table is dropped. As a result, we can't detect that a table was dropped and thus don't know that we need to clean the cached column information. It's only when we get a Relation message that we know this. But Postgres only sends a Relation message the first time the data in the table changes while **we're subscribed to that table in the replication stream** (and that's only after syncing the shape). So the data that was inserted before we synced the table in the last step does not lead to a Relation message. Only if we insert data into the table after that sync step will Postgres send a Relation message that will make us clean the cached column information. But note that at that point the row that was inserted in the previous step is already stored in storage in the format of the old schema. 
--- .changeset/soft-gifts-watch.md | 5 + .../sync-service/lib/electric/application.ex | 9 +- .../lib/electric/persistent_kv/mock.ex | 20 ++ .../lib/electric/postgres/inspector.ex | 8 + .../postgres/inspector/direct_inspector.ex | 2 + .../postgres/inspector/ets_inspector.ex | 7 + .../sync-service/lib/electric/shape_cache.ex | 72 ++++-- .../lib/electric/shape_cache/shape_status.ex | 22 +- .../replication/shape_log_collector_test.exs | 221 +++++++++++++++++- .../test/electric/shape_cache_test.exs | 52 +++-- .../test/electric/shapes/consumer_test.exs | 5 +- .../test/support/component_setup.ex | 1 + .../test/support/stub_inspector.ex | 3 + 13 files changed, 373 insertions(+), 54 deletions(-) create mode 100644 .changeset/soft-gifts-watch.md create mode 100644 packages/sync-service/lib/electric/persistent_kv/mock.ex diff --git a/.changeset/soft-gifts-watch.md b/.changeset/soft-gifts-watch.md new file mode 100644 index 0000000000..835454c4e7 --- /dev/null +++ b/.changeset/soft-gifts-watch.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Clean cached column info on relation changes. 
diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 7591cf1518..efa32bad54 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -26,18 +26,19 @@ defmodule Electric.Application do prepare_tables_fn = {Electric.Postgres.Configuration, :configure_tables_for_replication!, [publication_name]} + inspector = + {Electric.Postgres.Inspector.EtsInspector, + server: Electric.Postgres.Inspector.EtsInspector} + shape_cache = {Electric.ShapeCache, storage: storage, + inspector: inspector, prepare_tables_fn: prepare_tables_fn, log_producer: Electric.Replication.ShapeLogCollector, persistent_kv: persistent_kv, registry: Registry.ShapeChanges} - inspector = - {Electric.Postgres.Inspector.EtsInspector, - server: Electric.Postgres.Inspector.EtsInspector} - core_processes = [ {Registry, name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()}, diff --git a/packages/sync-service/lib/electric/persistent_kv/mock.ex b/packages/sync-service/lib/electric/persistent_kv/mock.ex new file mode 100644 index 0000000000..c957b099f4 --- /dev/null +++ b/packages/sync-service/lib/electric/persistent_kv/mock.ex @@ -0,0 +1,20 @@ +defmodule Electric.PersistentKV.Mock do + defstruct [] + + @type t() :: %__MODULE__{} + + @spec new() :: t() + def new() do + %__MODULE__{} + end + + defimpl Electric.PersistentKV do + def set(_memory, _key, _value) do + :ok + end + + def get(_memory, _key) do + {:ok, 42} + end + end +end diff --git a/packages/sync-service/lib/electric/postgres/inspector.ex b/packages/sync-service/lib/electric/postgres/inspector.ex index 5657de478d..4978cdb834 100644 --- a/packages/sync-service/lib/electric/postgres/inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector.ex @@ -17,6 +17,8 @@ defmodule Electric.Postgres.Inspector do @callback load_column_info(relation(), opts :: term()) :: {:ok, [column_info()]} | 
:table_not_found + @callback clean_column_info(relation(), opts :: term()) :: true + @type inspector :: {module(), opts :: term()} @doc """ @@ -25,6 +27,12 @@ defmodule Electric.Postgres.Inspector do @spec load_column_info(relation(), inspector()) :: {:ok, [column_info()]} | :table_not_found def load_column_info(relation, {module, opts}), do: module.load_column_info(relation, opts) + @doc """ + Clean up column information about a given table using a provided inspector. + """ + @spec clean_column_info(relation(), inspector()) :: true + def clean_column_info(relation, {module, opts}), do: module.clean_column_info(relation, opts) + @doc """ Get columns that should be considered a PK for table. If the table has no PK, then we're considering all columns as identifying. diff --git a/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex index 69b8c9e646..7c861a5a4c 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex @@ -36,4 +36,6 @@ defmodule Electric.Postgres.Inspector.DirectInspector do {:ok, rows} end end + + def clean_column_info(_, _), do: true end diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index c56bc7ca01..bf223f3079 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -29,6 +29,13 @@ defmodule Electric.Postgres.Inspector.EtsInspector do end end + @impl Electric.Postgres.Inspector + def clean_column_info(table, opts_or_state) do + ets_table = Access.get(opts_or_state, :pg_info_table, @default_pg_info_table) + + :ets.delete(ets_table, {table, :columns}) + end + ## Internal API @impl GenServer diff --git 
a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index ec9698bf98..c26808bad3 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -17,6 +17,7 @@ defmodule Electric.ShapeCacheBehaviour do {shape_id(), current_snapshot_offset :: LogOffset.t()} @callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}] + @callback get_relation(Messages.relation_id(), opts :: keyword()) :: Changes.Relation.t() | nil @callback await_snapshot_start(shape_id(), opts :: keyword()) :: :started | {:error, term()} @callback handle_truncate(shape_id(), keyword()) :: :ok @callback clean_shape(shape_id(), keyword()) :: :ok @@ -55,6 +56,8 @@ defmodule Electric.ShapeCache do default: Electric.Replication.ShapeLogCollector ], storage: [type: :mod_arg, required: true], + inspector: [type: :mod_arg, required: true], + shape_status: [type: :atom, default: Electric.ShapeCache.ShapeStatus], registry: [type: {:or, [:atom, :pid]}, required: true], # NimbleOptions has no "implementation of protocol" type persistent_kv: [type: :any, required: true], @@ -81,9 +84,10 @@ defmodule Electric.ShapeCache do @impl Electric.ShapeCacheBehaviour def get_or_create_shape_id(shape, opts \\ []) do table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) # Get or create the shape ID and fire a snapshot if necessary - if shape_state = ShapeStatus.existing_shape(table, shape) do + if shape_state = shape_status.existing_shape(table, shape) do shape_state else server = Access.get(opts, :server, __MODULE__) @@ -96,8 +100,9 @@ defmodule Electric.ShapeCache do :ok | {:error, term()} def update_shape_latest_offset(shape_id, latest_offset, opts) do meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) - if 
ShapeStatus.set_latest_offset(meta_table, shape_id, latest_offset) do + if shape_status.set_latest_offset(meta_table, shape_id, latest_offset) do :ok else Logger.warning("Tried to update latest offset for shape #{shape_id} which doesn't exist") @@ -108,14 +113,17 @@ defmodule Electric.ShapeCache do @impl Electric.ShapeCacheBehaviour def list_active_shapes(opts \\ []) do table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) - ShapeStatus.list_active_shapes(table) + shape_status.list_active_shapes(table) end + @impl Electric.ShapeCacheBehaviour @spec get_relation(Messages.relation_id(), opts :: keyword()) :: Changes.Relation.t() | nil def get_relation(relation_id, opts) do meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) - ShapeStatus.get_relation(meta_table, relation_id) + shape_status = Access.get(opts, :shape_status, ShapeStatus) + shape_status.get_relation(meta_table, relation_id) end @impl Electric.ShapeCacheBehaviour @@ -136,8 +144,9 @@ defmodule Electric.ShapeCache do @spec await_snapshot_start(shape_id(), keyword()) :: :started | {:error, term()} def await_snapshot_start(shape_id, opts \\ []) when is_binary(shape_id) do table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) - if ShapeStatus.snapshot_xmin?(table, shape_id) do + if shape_status.snapshot_xmin?(table, shape_id) do :started else server = Access.get(opts, :server, __MODULE__) @@ -148,8 +157,9 @@ defmodule Electric.ShapeCache do @impl Electric.ShapeCacheBehaviour def has_shape?(shape_id, opts \\ []) do table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) - if ShapeStatus.existing_shape(table, shape_id) do + if shape_status.existing_shape(table, shape_id) do true else server = Access.get(opts, :server, __MODULE__) @@ -160,7 +170,7 @@ 
defmodule Electric.ShapeCache do @impl GenStage def init(opts) do {:ok, persistent_state} = - ShapeStatus.initialise( + opts.shape_status.initialise( persistent_kv: opts.persistent_kv, meta_table: opts.shape_meta_table ) @@ -168,7 +178,9 @@ defmodule Electric.ShapeCache do state = %{ name: opts.name, storage: opts.storage, + inspector: opts.inspector, shape_meta_table: opts.shape_meta_table, + shape_status: opts.shape_status, awaiting_snapshot_start: %{}, db_pool: opts.db_pool, persistent_state: persistent_state, @@ -186,7 +198,7 @@ defmodule Electric.ShapeCache do @impl GenStage def handle_events(relations, _from, state) do - %{persistent_state: persistent_state} = state + %{persistent_state: persistent_state, shape_status: shape_status} = state # NOTE: [@magnetised] this manages cleaning up shapes after a relation # change. it's not doing it in a consistent way, as the shape consumers # could still be receiving txns after a relation message that requires them @@ -209,10 +221,10 @@ defmodule Electric.ShapeCache do # replication stream, it knows that it can just continue. 
Enum.each(relations, fn relation -> - old_rel = ShapeStatus.get_relation(persistent_state, relation.id) + old_rel = shape_status.get_relation(persistent_state, relation.id) if is_nil(old_rel) || old_rel != relation do - :ok = ShapeStatus.store_relation(persistent_state, relation) + :ok = shape_status.store_relation(persistent_state, relation) end if !is_nil(old_rel) && old_rel != relation do @@ -222,23 +234,35 @@ defmodule Electric.ShapeCache do # Fetch all shapes that are affected by the relation change and clean them up persistent_state - |> ShapeStatus.list_active_shapes() + |> shape_status.list_active_shapes() |> Enum.filter(&Shape.is_affected_by_relation_change?(&1, change)) |> Enum.map(&elem(&1, 0)) |> Enum.each(fn shape_id -> clean_up_shape(state, shape_id) end) end + + if old_rel != relation do + # Clean column information from ETS + # also when old_rel is nil because column info + # may already be loaded into ETS by the initial shape request + {inspector, inspector_opts} = state.inspector + # if the old relation exists we use that one + # because the table name may have changed in the new relation + # if there is no old relation, we use the new one + rel = old_rel || relation + inspector.clean_column_info({rel.schema, rel.table}, inspector_opts) + end end) {:noreply, [], state} end @impl GenStage - def handle_call({:create_or_wait_shape_id, shape}, _from, state) do + def handle_call({:create_or_wait_shape_id, shape}, _from, %{shape_status: shape_status} = state) do {{shape_id, latest_offset}, state} = - if shape_state = ShapeStatus.existing_shape(state.persistent_state, shape) do + if shape_state = shape_status.existing_shape(state.persistent_state, shape) do {shape_state, state} else - {:ok, shape_id} = ShapeStatus.add_shape(state.persistent_state, shape) + {:ok, shape_id} = shape_status.add_shape(state.persistent_state, shape) {:ok, _snapshot_xmin, latest_offset} = start_shape(shape_id, shape, state) {{shape_id, latest_offset}, state} @@ -249,12 +273,12 
@@ defmodule Electric.ShapeCache do {:reply, {shape_id, latest_offset}, [], state} end - def handle_call({:await_snapshot_start, shape_id}, from, state) do + def handle_call({:await_snapshot_start, shape_id}, from, %{shape_status: shape_status} = state) do cond do not is_known_shape_id?(state, shape_id) -> {:reply, {:error, :unknown}, [], state} - ShapeStatus.snapshot_xmin?(state.persistent_state, shape_id) -> + shape_status.snapshot_xmin?(state.persistent_state, shape_id) -> {:reply, :started, [], state} true -> @@ -264,8 +288,8 @@ defmodule Electric.ShapeCache do end end - def handle_call({:wait_shape_id, shape_id}, _from, state) do - {:reply, !is_nil(ShapeStatus.existing_shape(state.persistent_state, shape_id)), [], state} + def handle_call({:wait_shape_id, shape_id}, _from, %{shape_status: shape_status} = state) do + {:reply, !is_nil(shape_status.existing_shape(state.persistent_state, shape_id)), [], state} end def handle_call({:truncate, shape_id}, _from, state) do @@ -288,8 +312,8 @@ defmodule Electric.ShapeCache do end @impl GenStage - def handle_cast({:snapshot_xmin_known, shape_id, xmin}, state) do - unless ShapeStatus.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do + def handle_cast({:snapshot_xmin_known, shape_id, xmin}, %{shape_status: shape_status} = state) do + unless shape_status.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do Logger.warning( "Got snapshot information for a #{shape_id}, that shape id is no longer valid. Ignoring." 
) @@ -324,11 +348,11 @@ defmodule Electric.ShapeCache do defp clean_up_shape(state, shape_id) do Electric.ShapeCache.ShapeSupervisor.stop_shape_consumer(shape_id) - ShapeStatus.remove_shape(state.persistent_state, shape_id) + state.shape_status.remove_shape(state.persistent_state, shape_id) end defp is_known_shape_id?(state, shape_id) do - if ShapeStatus.existing_shape(state.persistent_state, shape_id) do + if state.shape_status.existing_shape(state.persistent_state, shape_id) do true else false @@ -343,7 +367,7 @@ defmodule Electric.ShapeCache do defp recover_shapes(state) do state.persistent_state - |> ShapeStatus.list_shapes() + |> state.shape_status.list_shapes() |> Enum.each(fn {shape_id, shape} -> {:ok, _snapshot_xmin, _latest_offset} = start_shape(shape_id, shape, state) end) @@ -368,7 +392,7 @@ defmodule Electric.ShapeCache do {:ok, snapshot_xmin, latest_offset} = Shapes.Consumer.initial_state(consumer) :ok = - ShapeStatus.initialise_shape( + state.shape_status.initialise_shape( state.persistent_state, shape_id, snapshot_xmin, diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status.ex b/packages/sync-service/lib/electric/shape_cache/shape_status.ex index d16bbd414d..53dd4e7482 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status.ex @@ -1,9 +1,29 @@ +defmodule Electric.ShapeCache.ShapeStatusBehaviour do + @moduledoc """ + Behaviour defining the ShapeStatus functions to be used in mocks + """ + alias Electric.Shapes.Shape + alias Electric.ShapeCache.ShapeStatus + alias Electric.Postgres.LogicalReplication.Messages + alias Electric.Replication.Changes.Relation + + @callback initialise(ShapeStatus.options()) :: {:ok, ShapeStatus.t()} | {:error, term()} + @callback list_shapes(ShapeStatus.t()) :: [{ShapeStatus.shape_id(), Shape.t()}] + @callback list_active_shapes(opts :: keyword()) :: [ + {ShapeStatus.shape_id(), ShapeStatus.shape_def(), ShapeStatus.xmin()} 
+ ] + @callback get_relation(ShapeStatus.t(), Messages.relation_id()) :: Relation.t() | nil + @callback store_relation(ShapeStatus.t(), Relation.t()) :: :ok + @callback remove_shape(ShapeStatus.t(), ShapeStatus.shape_id()) :: + {:ok, ShapeStatus.t()} | {:error, term()} +end + defmodule Electric.ShapeCache.ShapeStatus do @moduledoc """ Keeps track of shape state. Serializes just enough to some persistent storage to bootstrap the - ShapeCache by writing the mapping of `shape_id => %Shape{}` to + ShapeCache by writing the mapping of `shape_id => %Shape{}` to storage. The shape cache then loads this and starts processes (storage and consumer) diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 513d794027..25d33cf523 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -1,20 +1,42 @@ defmodule Electric.Replication.ShapeLogCollectorTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false + import ExUnit.CaptureLog import Mox alias Electric.Postgres.Lsn + alias Electric.Shapes.Shape alias Electric.Replication.ShapeLogCollector - alias Electric.Replication.Changes.Transaction + alias Electric.Replication.Changes.{Transaction, Relation} alias Electric.Replication.Changes alias Electric.Replication.LogOffset alias Support.Mock + alias Support.StubInspector @moduletag :capture_log + # Define mocks + Mox.defmock(MockShapeStatus, for: Electric.ShapeCache.ShapeStatusBehaviour) + Mox.defmock(MockShapeCache, for: Electric.ShapeCacheBehaviour) + Mox.defmock(MockInspector, for: Electric.Postgres.Inspector) + Mox.defmock(MockStorage, for: Electric.ShapeCache.Storage) + setup :verify_on_exit! 
+ @shape Shape.new!("public.test_table", + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) + ) + + @similar_shape Shape.new!("public.test_table", + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]), + where: "id > 5" + ) + + @other_shape Shape.new!("public.other_table", + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) + ) + describe "store_transaction/2" do setup do # Start a test Registry @@ -38,7 +60,28 @@ defmodule Electric.Replication.ShapeLogCollectorTest do {id, consumer} end) - %{server: pid, registry: registry_name, consumers: consumers} + MockShapeStatus + |> expect(:initialise, 1, fn opts -> {:ok, opts} end) + |> expect(:list_shapes, 1, fn _ -> [] end) + # allow the ShapeCache to call this mock + |> allow(self(), fn -> GenServer.whereis(Electric.ShapeCache) end) + + # We need a ShapeCache process because it is a GenStage consumer + # that handles the Relation events produced by ShapeLogCollector + shape_cache_opts = + [ + storage: {MockStorage, []}, + inspector: {MockInspector, []}, + shape_status: MockShapeStatus, + persistent_kv: Electric.PersistentKV.Memory.new!(), + prepare_tables_fn: fn _, _ -> {:ok, [:ok]} end, + log_producer: __MODULE__.ShapeLogCollector, + registry: registry_name + ] + + {:ok, shape_cache_pid} = Electric.ShapeCache.start_link(shape_cache_opts) + + %{server: pid, registry: registry_name, consumers: consumers, shape_cache: shape_cache_pid} end test "broadcasts keyed changes to consumers", ctx do @@ -80,5 +123,177 @@ defmodule Electric.Replication.ShapeLogCollectorTest do assert xids == [xid] end + + test "stores relation if it is not known", %{server: server, shape_cache: shape_cache} do + relation_id = "rel1" + + rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + pid = self() + + MockShapeStatus + |> expect(:get_relation, 1, fn _, ^relation_id -> nil end) + |> expect(:store_relation, 1, fn _, ^rel -> 
:ok end) + |> allow(self(), server) + + MockInspector + |> expect(:clean_column_info, 1, fn {"public", "test_table"}, _ -> + send(pid, :cleaned) + true + end) + |> allow(self(), shape_cache) + + assert :ok = ShapeLogCollector.handle_relation_msg(rel, server) + assert_receive :cleaned + end + + test "does not clean shapes if relation didn't change", %{ + server: server, + shape_cache: shape_cache + } do + relation_id = "rel1" + + rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + pid = self() + + MockShapeStatus + |> expect(:get_relation, 1, fn _, ^relation_id -> rel end) + |> expect(:remove_shape, 0, fn _, _ -> :ok end) + + MockInspector + |> expect(:clean_column_info, 0, fn _, _ -> + send(pid, :cleaned) + true + end) + |> allow(self(), shape_cache) + + assert :ok = ShapeLogCollector.handle_relation_msg(rel, server) + refute_receive :cleaned + end + + test "cleans shapes affected by table renaming and logs a warning", %{ + server: server, + shape_cache: shape_cache + } do + relation_id = "rel1" + + shape_id1 = "shape1" + shape1 = @shape + + shape_id2 = "shape2" + shape2 = @similar_shape + + shape_id3 = "shape3" + shape3 = @other_shape + + # doesn't matter, isn't used for this test + xmin = 100 + + old_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + new_rel = %Relation{ + id: relation_id, + schema: "public", + table: "renamed_test_table", + columns: [] + } + + pid = self() + + MockShapeStatus + |> expect(:get_relation, 1, fn _, ^relation_id -> old_rel end) + |> expect(:store_relation, 1, fn _, ^new_rel -> :ok end) + |> expect(:list_active_shapes, 1, fn _ -> + [{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}] + end) + |> expect(:remove_shape, 1, fn state, ^shape_id1 -> {:ok, state} end) + |> expect(:remove_shape, 1, fn state, ^shape_id2 -> {:ok, state} end) + + MockInspector + |> expect(:clean_column_info, 1, fn {"public", 
"test_table"}, _ -> + send(pid, :cleaned) + true + end) + |> allow(self(), shape_cache) + + log = + capture_log(fn -> + ShapeLogCollector.handle_relation_msg(new_rel, server) + # assert here such that we capture the log until we receive this message + assert_receive :cleaned + end) + + assert log =~ "Schema for the table public.test_table changed" + end + + test "cleans shapes affected by a relation change", %{ + server: server, + shape_cache: shape_cache + } do + relation_id = "rel1" + + shape_id1 = "shape1" + shape1 = @shape + + shape_id2 = "shape2" + shape2 = @similar_shape + + shape_id3 = "shape3" + shape3 = @other_shape + + # doesn't matter, isn't used for this test + xmin = 100 + + old_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [{"id", "float4"}] + } + + new_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [{"id", "int8"}] + } + + pid = self() + + MockShapeStatus + |> expect(:get_relation, 1, fn _, ^relation_id -> old_rel end) + |> expect(:store_relation, 1, fn _, ^new_rel -> :ok end) + |> expect(:list_active_shapes, fn _ -> + [{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}] + end) + |> expect(:remove_shape, 1, fn state, ^shape_id1 -> {:ok, state} end) + |> expect(:remove_shape, 1, fn state, ^shape_id2 -> {:ok, state} end) + + MockInspector + |> expect(:clean_column_info, 1, fn {"public", "test_table"}, _ -> + send(pid, :cleaned) + true + end) + |> allow(self(), shape_cache) + + assert :ok = ShapeLogCollector.handle_relation_msg(new_rel, server) + assert_receive :cleaned + end end end diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 3184c841ca..d765c8414a 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -46,6 +46,11 @@ defmodule Electric.ShapeCacheTest do 
@prepare_tables_noop {__MODULE__, :prepare_tables_noop, []} + @stub_inspector StubInspector.new([ + %{name: "id", type: "int8", pk_position: 0}, + %{name: "value", type: "text"} + ]) + describe "get_or_create_shape_id/2" do setup [ :with_in_memory_storage, @@ -56,7 +61,8 @@ defmodule Electric.ShapeCacheTest do ] setup ctx do - with_shape_cache(ctx, + with_shape_cache( + Map.put(ctx, :inspector, @stub_inspector), create_snapshot_fn: fn _, _, _, _, _ -> nil end, prepare_tables_fn: @prepare_tables_noop ) @@ -84,7 +90,7 @@ defmodule Electric.ShapeCacheTest do test "creates initial snapshot if one doesn't exist", %{storage: storage} = ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -104,7 +110,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: fn nil, [{"public", "items"}] -> send(test_pid, {:called, :prepare_tables_fn}) end, @@ -132,7 +138,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> send(test_pid, {:called, :create_snapshot_fn}) @@ -179,7 +185,9 @@ defmodule Electric.ShapeCacheTest do shape_id = "foo" %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), prepare_tables_fn: @prepare_tables_noop) + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), + prepare_tables_fn: @prepare_tables_noop + ) shape_meta_table = 
Access.get(opts, :shape_meta_table) @@ -368,14 +376,16 @@ defmodule Electric.ShapeCacheTest do test "returns empty list initially", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), prepare_tables_fn: @prepare_tables_noop) + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), + prepare_tables_fn: @prepare_tables_noop + ) assert ShapeCache.list_active_shapes(opts) == [] end test "lists the shape as active once there is a snapshot", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -393,7 +403,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> ref = make_ref() @@ -429,7 +439,7 @@ defmodule Electric.ShapeCacheTest do test "returns true for known shape id", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 100}) @@ -444,7 +454,7 @@ defmodule Electric.ShapeCacheTest do test "works with slow snapshot generation", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _, _, _ -> Process.sleep(100) @@ -468,7 +478,7 @@ defmodule Electric.ShapeCacheTest do test 
"returns :started for snapshots that have started", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 100}) @@ -487,7 +497,7 @@ defmodule Electric.ShapeCacheTest do storage = Storage.for_shape(shape_id, ctx.storage) %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -505,7 +515,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> ref = make_ref() @@ -553,7 +563,7 @@ defmodule Electric.ShapeCacheTest do end) %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -584,7 +594,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, _storage -> ref = make_ref() @@ -625,7 +635,7 @@ defmodule Electric.ShapeCacheTest do test "cleans up shape data and rotates the shape id", ctx do %{shape_cache_opts: 
opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -677,7 +687,7 @@ defmodule Electric.ShapeCacheTest do test "cleans up shape data and rotates the shape id", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -723,7 +733,7 @@ defmodule Electric.ShapeCacheTest do shape_id = "foo" %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -753,7 +763,7 @@ defmodule Electric.ShapeCacheTest do setup(ctx, do: - with_shape_cache(ctx, + with_shape_cache(Map.put(ctx, :inspector, @stub_inspector), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, @snapshot_xmin}) @@ -841,7 +851,7 @@ defmodule Electric.ShapeCacheTest do Process.sleep(1) with_cub_db_storage(context) - with_shape_cache(context, + with_shape_cache(Map.put(context, :inspector, @stub_inspector), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, @snapshot_xmin}) @@ -890,7 +900,7 @@ defmodule Electric.ShapeCacheTest do ] setup(ctx) do - with_shape_cache(ctx, + with_shape_cache(Map.put(ctx, :inspector, @stub_inspector), 
prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, @snapshot_xmin}) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 5dd87eb810..6bf4bbe5c6 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -370,7 +370,10 @@ defmodule Electric.Shapes.ConsumerTest do %{shape_cache_opts: shape_cache_opts} = Support.ComponentSetup.with_shape_cache( - Map.merge(ctx, %{pool: nil}), + Map.merge(ctx, %{ + pool: nil, + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) + }), log_producer: producer, prepare_tables_fn: fn _, _ -> :ok end, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 8fe497a362..bd2f66dc52 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -51,6 +51,7 @@ defmodule Support.ComponentSetup do [ name: server, shape_meta_table: shape_meta_table, + inspector: ctx.inspector, storage: ctx.storage, db_pool: ctx.pool, persistent_kv: ctx.persistent_kv, diff --git a/packages/sync-service/test/support/stub_inspector.ex b/packages/sync-service/test/support/stub_inspector.ex index bbf9e0e3a2..545d25d266 100644 --- a/packages/sync-service/test/support/stub_inspector.ex +++ b/packages/sync-service/test/support/stub_inspector.ex @@ -19,4 +19,7 @@ defmodule Support.StubInspector do |> Map.fetch!(relation) |> then(&load_column_info(relation, &1)) end + + @impl true + def clean_column_info(_, _), do: true end