diff --git a/.changeset/soft-gifts-watch.md b/.changeset/soft-gifts-watch.md new file mode 100644 index 0000000000..835454c4e7 --- /dev/null +++ b/.changeset/soft-gifts-watch.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Clean cached column info on relation changes. diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 7591cf1518..efa32bad54 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -26,18 +26,19 @@ defmodule Electric.Application do prepare_tables_fn = {Electric.Postgres.Configuration, :configure_tables_for_replication!, [publication_name]} + inspector = + {Electric.Postgres.Inspector.EtsInspector, + server: Electric.Postgres.Inspector.EtsInspector} + shape_cache = {Electric.ShapeCache, storage: storage, + inspector: inspector, prepare_tables_fn: prepare_tables_fn, log_producer: Electric.Replication.ShapeLogCollector, persistent_kv: persistent_kv, registry: Registry.ShapeChanges} - inspector = - {Electric.Postgres.Inspector.EtsInspector, - server: Electric.Postgres.Inspector.EtsInspector} - core_processes = [ {Registry, name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()}, diff --git a/packages/sync-service/lib/electric/persistent_kv/mock.ex b/packages/sync-service/lib/electric/persistent_kv/mock.ex new file mode 100644 index 0000000000..c957b099f4 --- /dev/null +++ b/packages/sync-service/lib/electric/persistent_kv/mock.ex @@ -0,0 +1,20 @@ +defmodule Electric.PersistentKV.Mock do + defstruct [] + + @type t() :: %__MODULE__{} + + @spec new() :: t() + def new() do + %__MODULE__{} + end + + defimpl Electric.PersistentKV do + def set(_memory, _key, _value) do + :ok + end + + def get(_memory, _key) do + {:ok, 42} + end + end +end diff --git a/packages/sync-service/lib/electric/postgres/inspector.ex b/packages/sync-service/lib/electric/postgres/inspector.ex index 5657de478d..4978cdb834 100644 --- a/packages/sync-service/lib/electric/postgres/inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector.ex @@ -17,6 +17,8 @@ defmodule Electric.Postgres.Inspector do @callback load_column_info(relation(), opts :: term()) :: {:ok, [column_info()]} | :table_not_found + @callback clean_column_info(relation(), opts :: term()) :: true + @type inspector :: {module(), opts :: term()} @doc """ @@ -25,6 +27,12 @@ defmodule Electric.Postgres.Inspector do @spec load_column_info(relation(), inspector()) :: {:ok, [column_info()]} | :table_not_found def load_column_info(relation, {module, opts}), do: module.load_column_info(relation, opts) + @doc """ + Clean up column information about a given table using a provided inspector. + """ + @spec clean_column_info(relation(), inspector()) :: true + def clean_column_info(relation, {module, opts}), do: module.clean_column_info(relation, opts) + @doc """ Get columns that should be considered a PK for table. If the table has no PK, then we're considering all columns as identifying. diff --git a/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex index 69b8c9e646..7c861a5a4c 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex @@ -36,4 +36,6 @@ defmodule Electric.Postgres.Inspector.DirectInspector do {:ok, rows} end end + + def clean_column_info(_, _), do: true end diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index c56bc7ca01..bf223f3079 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -29,6 +29,13 @@ defmodule Electric.Postgres.Inspector.EtsInspector do end end + @impl Electric.Postgres.Inspector + def clean_column_info(table, opts_or_state) do + ets_table = Access.get(opts_or_state, :pg_info_table, @default_pg_info_table) + + :ets.delete(ets_table, {table, :columns}) + end + ## Internal API @impl GenServer diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index ec9698bf98..c26808bad3 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -17,6 +17,7 @@ defmodule Electric.ShapeCacheBehaviour do {shape_id(), current_snapshot_offset :: LogOffset.t()} @callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}] + @callback get_relation(Messages.relation_id(), opts :: keyword()) :: Changes.Relation.t() | nil @callback await_snapshot_start(shape_id(), opts :: keyword()) :: :started | {:error, term()} @callback handle_truncate(shape_id(), keyword()) :: :ok @callback clean_shape(shape_id(), keyword()) :: :ok @@ -55,6 +56,8 @@ defmodule Electric.ShapeCache do default: Electric.Replication.ShapeLogCollector ], storage: [type: :mod_arg, required: true], + inspector: [type: :mod_arg, required: true], + shape_status: [type: :atom, default: Electric.ShapeCache.ShapeStatus], registry: [type: {:or, [:atom, :pid]}, required: true], # NimbleOptions has no "implementation of protocol" type persistent_kv: [type: :any, required: true], @@ -81,9 +84,10 @@ defmodule Electric.ShapeCache do @impl Electric.ShapeCacheBehaviour def get_or_create_shape_id(shape, opts \\ []) do table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) # Get or create the shape ID and fire a snapshot if necessary - if shape_state = ShapeStatus.existing_shape(table, shape) do + if shape_state = shape_status.existing_shape(table, shape) do shape_state else server = Access.get(opts, :server, __MODULE__) @@ -96,8 +100,9 @@ defmodule Electric.ShapeCache do :ok | {:error, term()} def update_shape_latest_offset(shape_id, latest_offset, opts) do meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) - if ShapeStatus.set_latest_offset(meta_table, shape_id, latest_offset) do + if shape_status.set_latest_offset(meta_table, shape_id, latest_offset) do :ok else Logger.warning("Tried to update latest offset for shape #{shape_id} which doesn't exist") @@ -108,14 +113,17 @@ defmodule Electric.ShapeCache do @impl Electric.ShapeCacheBehaviour def list_active_shapes(opts \\ []) do table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) - ShapeStatus.list_active_shapes(table) + shape_status.list_active_shapes(table) end + @impl Electric.ShapeCacheBehaviour @spec get_relation(Messages.relation_id(), opts :: keyword()) :: Changes.Relation.t() | nil def get_relation(relation_id, opts) do meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) - ShapeStatus.get_relation(meta_table, relation_id) + shape_status = Access.get(opts, :shape_status, ShapeStatus) + shape_status.get_relation(meta_table, relation_id) end @impl Electric.ShapeCacheBehaviour @@ -136,8 +144,9 @@ defmodule Electric.ShapeCache do @spec await_snapshot_start(shape_id(), keyword()) :: :started | {:error, term()} def await_snapshot_start(shape_id, opts \\ []) when is_binary(shape_id) do table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) - if ShapeStatus.snapshot_xmin?(table, shape_id) do + if shape_status.snapshot_xmin?(table, shape_id) do :started else server = Access.get(opts, :server, __MODULE__) @@ -148,8 +157,9 @@ defmodule Electric.ShapeCache do @impl Electric.ShapeCacheBehaviour def has_shape?(shape_id, opts \\ []) do table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + shape_status = Access.get(opts, :shape_status, ShapeStatus) - if ShapeStatus.existing_shape(table, shape_id) do + if shape_status.existing_shape(table, shape_id) do true else server = Access.get(opts, :server, __MODULE__) @@ -160,7 +170,7 @@ defmodule Electric.ShapeCache do @impl GenStage def init(opts) do {:ok, persistent_state} = - ShapeStatus.initialise( + opts.shape_status.initialise( persistent_kv: opts.persistent_kv, meta_table: opts.shape_meta_table ) @@ -168,7 +178,9 @@ defmodule Electric.ShapeCache do state = %{ name: opts.name, storage: opts.storage, + inspector: opts.inspector, shape_meta_table: opts.shape_meta_table, + shape_status: opts.shape_status, awaiting_snapshot_start: %{}, db_pool: opts.db_pool, persistent_state: persistent_state, @@ -186,7 +198,7 @@ defmodule Electric.ShapeCache do @impl GenStage def handle_events(relations, _from, state) do - %{persistent_state: persistent_state} = state + %{persistent_state: persistent_state, shape_status: shape_status} = state # NOTE: [@magnetised] this manages cleaning up shapes after a relation # change. it's not doing it in a consistent way, as the shape consumers # could still be receiving txns after a relation message that requires them @@ -209,10 +221,10 @@ defmodule Electric.ShapeCache do # replication stream, it knows that it can just continue. Enum.each(relations, fn relation -> - old_rel = ShapeStatus.get_relation(persistent_state, relation.id) + old_rel = shape_status.get_relation(persistent_state, relation.id) if is_nil(old_rel) || old_rel != relation do - :ok = ShapeStatus.store_relation(persistent_state, relation) + :ok = shape_status.store_relation(persistent_state, relation) end if !is_nil(old_rel) && old_rel != relation do @@ -222,23 +234,35 @@ defmodule Electric.ShapeCache do # Fetch all shapes that are affected by the relation change and clean them up persistent_state - |> ShapeStatus.list_active_shapes() + |> shape_status.list_active_shapes() |> Enum.filter(&Shape.is_affected_by_relation_change?(&1, change)) |> Enum.map(&elem(&1, 0)) |> Enum.each(fn shape_id -> clean_up_shape(state, shape_id) end) end + + if old_rel != relation do + # Clean column information from ETS + # also when old_rel is nil because column info + # may already be loaded into ETS by the initial shape request + {inspector, inspector_opts} = state.inspector + # if the old relation exists we use that one + # because the table name may have changed in the new relation + # if there is no old relation, we use the new one + rel = old_rel || relation + inspector.clean_column_info({rel.schema, rel.table}, inspector_opts) + end end) {:noreply, [], state} end @impl GenStage - def handle_call({:create_or_wait_shape_id, shape}, _from, state) do + def handle_call({:create_or_wait_shape_id, shape}, _from, %{shape_status: shape_status} = state) do {{shape_id, latest_offset}, state} = - if shape_state = ShapeStatus.existing_shape(state.persistent_state, shape) do + if shape_state = shape_status.existing_shape(state.persistent_state, shape) do {shape_state, state} else - {:ok, shape_id} = ShapeStatus.add_shape(state.persistent_state, shape) + {:ok, shape_id} = shape_status.add_shape(state.persistent_state, shape) {:ok, _snapshot_xmin, latest_offset} = start_shape(shape_id, shape, state) {{shape_id, latest_offset}, state} @@ -249,12 +273,12 @@ defmodule Electric.ShapeCache do {:reply, {shape_id, latest_offset}, [], state} end - def handle_call({:await_snapshot_start, shape_id}, from, state) do + def handle_call({:await_snapshot_start, shape_id}, from, %{shape_status: shape_status} = state) do cond do not is_known_shape_id?(state, shape_id) -> {:reply, {:error, :unknown}, [], state} - ShapeStatus.snapshot_xmin?(state.persistent_state, shape_id) -> + shape_status.snapshot_xmin?(state.persistent_state, shape_id) -> {:reply, :started, [], state} true -> @@ -264,8 +288,8 @@ defmodule Electric.ShapeCache do end end - def handle_call({:wait_shape_id, shape_id}, _from, state) do - {:reply, !is_nil(ShapeStatus.existing_shape(state.persistent_state, shape_id)), [], state} + def handle_call({:wait_shape_id, shape_id}, _from, %{shape_status: shape_status} = state) do + {:reply, !is_nil(shape_status.existing_shape(state.persistent_state, shape_id)), [], state} end def handle_call({:truncate, shape_id}, _from, state) do @@ -288,8 +312,8 @@ defmodule Electric.ShapeCache do end @impl GenStage - def handle_cast({:snapshot_xmin_known, shape_id, xmin}, state) do - unless ShapeStatus.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do + def handle_cast({:snapshot_xmin_known, shape_id, xmin}, %{shape_status: shape_status} = state) do + unless shape_status.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do Logger.warning( "Got snapshot information for a #{shape_id}, that shape id is no longer valid. Ignoring." ) @@ -324,11 +348,11 @@ defmodule Electric.ShapeCache do defp clean_up_shape(state, shape_id) do Electric.ShapeCache.ShapeSupervisor.stop_shape_consumer(shape_id) - ShapeStatus.remove_shape(state.persistent_state, shape_id) + state.shape_status.remove_shape(state.persistent_state, shape_id) end defp is_known_shape_id?(state, shape_id) do - if ShapeStatus.existing_shape(state.persistent_state, shape_id) do + if state.shape_status.existing_shape(state.persistent_state, shape_id) do true else false @@ -343,7 +367,7 @@ defmodule Electric.ShapeCache do defp recover_shapes(state) do state.persistent_state - |> ShapeStatus.list_shapes() + |> state.shape_status.list_shapes() |> Enum.each(fn {shape_id, shape} -> {:ok, _snapshot_xmin, _latest_offset} = start_shape(shape_id, shape, state) end) @@ -368,7 +392,7 @@ defmodule Electric.ShapeCache do {:ok, snapshot_xmin, latest_offset} = Shapes.Consumer.initial_state(consumer) :ok = - ShapeStatus.initialise_shape( + state.shape_status.initialise_shape( state.persistent_state, shape_id, snapshot_xmin, diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status.ex b/packages/sync-service/lib/electric/shape_cache/shape_status.ex index d16bbd414d..53dd4e7482 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status.ex @@ -1,9 +1,29 @@ +defmodule Electric.ShapeCache.ShapeStatusBehaviour do + @moduledoc """ + Behaviour defining the ShapeStatus functions to be used in mocks + """ + alias Electric.Shapes.Shape + alias Electric.ShapeCache.ShapeStatus + alias Electric.Postgres.LogicalReplication.Messages + alias Electric.Replication.Changes.Relation + + @callback initialise(ShapeStatus.options()) :: {:ok, ShapeStatus.t()} | {:error, term()} + @callback list_shapes(ShapeStatus.t()) :: [{ShapeStatus.shape_id(), Shape.t()}] + @callback list_active_shapes(opts :: keyword()) :: [ + {ShapeStatus.shape_id(), ShapeStatus.shape_def(), ShapeStatus.xmin()} + ] + @callback get_relation(ShapeStatus.t(), Messages.relation_id()) :: Relation.t() | nil + @callback store_relation(ShapeStatus.t(), Relation.t()) :: :ok + @callback remove_shape(ShapeStatus.t(), ShapeStatus.shape_id()) :: + {:ok, ShapeStatus.t()} | {:error, term()} +end + defmodule Electric.ShapeCache.ShapeStatus do @moduledoc """ Keeps track of shape state. Serializes just enough to some persistent storage to bootstrap the - ShapeCache by writing the mapping of `shape_id => %Shape{}` to + ShapeCache by writing the mapping of `shape_id => %Shape{}` to storage. The shape cache then loads this and starts processes (storage and consumer) diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 513d794027..25d33cf523 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -1,20 +1,42 @@ defmodule Electric.Replication.ShapeLogCollectorTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false + import ExUnit.CaptureLog import Mox alias Electric.Postgres.Lsn + alias Electric.Shapes.Shape alias Electric.Replication.ShapeLogCollector - alias Electric.Replication.Changes.Transaction + alias Electric.Replication.Changes.{Transaction, Relation} alias Electric.Replication.Changes alias Electric.Replication.LogOffset alias Support.Mock + alias Support.StubInspector @moduletag :capture_log + # Define mocks + Mox.defmock(MockShapeStatus, for: Electric.ShapeCache.ShapeStatusBehaviour) + Mox.defmock(MockShapeCache, for: Electric.ShapeCacheBehaviour) + Mox.defmock(MockInspector, for: Electric.Postgres.Inspector) + Mox.defmock(MockStorage, for: Electric.ShapeCache.Storage) + setup :verify_on_exit! + @shape Shape.new!("public.test_table", + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) + ) + + @similar_shape Shape.new!("public.test_table", + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]), + where: "id > 5" + ) + + @other_shape Shape.new!("public.other_table", + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) + ) + describe "store_transaction/2" do setup do # Start a test Registry @@ -38,7 +60,28 @@ defmodule Electric.Replication.ShapeLogCollectorTest do {id, consumer} end) - %{server: pid, registry: registry_name, consumers: consumers} + MockShapeStatus + |> expect(:initialise, 1, fn opts -> {:ok, opts} end) + |> expect(:list_shapes, 1, fn _ -> [] end) + # allow the ShapeCache to call this mock + |> allow(self(), fn -> GenServer.whereis(Electric.ShapeCache) end) + + # We need a ShapeCache process because it is a GenStage consumer + # that handles the Relation events produced by ShapeLogCollector + shape_cache_opts = + [ + storage: {MockStorage, []}, + inspector: {MockInspector, []}, + shape_status: MockShapeStatus, + persistent_kv: Electric.PersistentKV.Memory.new!(), + prepare_tables_fn: fn _, _ -> {:ok, [:ok]} end, + log_producer: __MODULE__.ShapeLogCollector, + registry: registry_name + ] + + {:ok, shape_cache_pid} = Electric.ShapeCache.start_link(shape_cache_opts) + + %{server: pid, registry: registry_name, consumers: consumers, shape_cache: shape_cache_pid} end test "broadcasts keyed changes to consumers", ctx do @@ -80,5 +123,177 @@ defmodule Electric.Replication.ShapeLogCollectorTest do assert xids == [xid] end + + test "stores relation if it is not known", %{server: server, shape_cache: shape_cache} do + relation_id = "rel1" + + rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + pid = self() + + MockShapeStatus + |> expect(:get_relation, 1, fn _, ^relation_id -> nil end) + |> expect(:store_relation, 1, fn _, ^rel -> :ok end) + |> allow(self(), server) + + MockInspector + |> expect(:clean_column_info, 1, fn {"public", "test_table"}, _ -> + send(pid, :cleaned) + true + end) + |> allow(self(), shape_cache) + + assert :ok = ShapeLogCollector.handle_relation_msg(rel, server) + assert_receive :cleaned + end + + test "does not clean shapes if relation didn't change", %{ + server: server, + shape_cache: shape_cache + } do + relation_id = "rel1" + + rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + pid = self() + + MockShapeStatus + |> expect(:get_relation, 1, fn _, ^relation_id -> rel end) + |> expect(:remove_shape, 0, fn _, _ -> :ok end) + + MockInspector + |> expect(:clean_column_info, 0, fn _, _ -> + send(pid, :cleaned) + true + end) + |> allow(self(), shape_cache) + + assert :ok = ShapeLogCollector.handle_relation_msg(rel, server) + refute_receive :cleaned + end + + test "cleans shapes affected by table renaming and logs a warning", %{ + server: server, + shape_cache: shape_cache + } do + relation_id = "rel1" + + shape_id1 = "shape1" + shape1 = @shape + + shape_id2 = "shape2" + shape2 = @similar_shape + + shape_id3 = "shape3" + shape3 = @other_shape + + # doesn't matter, isn't used for this test + xmin = 100 + + old_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + new_rel = %Relation{ + id: relation_id, + schema: "public", + table: "renamed_test_table", + columns: [] + } + + pid = self() + + MockShapeStatus + |> expect(:get_relation, 1, fn _, ^relation_id -> old_rel end) + |> expect(:store_relation, 1, fn _, ^new_rel -> :ok end) + |> expect(:list_active_shapes, 1, fn _ -> + [{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}] + end) + |> expect(:remove_shape, 1, fn state, ^shape_id1 -> {:ok, state} end) + |> expect(:remove_shape, 1, fn state, ^shape_id2 -> {:ok, state} end) + + MockInspector + |> expect(:clean_column_info, 1, fn {"public", "test_table"}, _ -> + send(pid, :cleaned) + true + end) + |> allow(self(), shape_cache) + + log = + capture_log(fn -> + ShapeLogCollector.handle_relation_msg(new_rel, server) + # assert here such that we capture the log until we receive this message + assert_receive :cleaned + end) + + assert log =~ "Schema for the table public.test_table changed" + end + + test "cleans shapes affected by a relation change", %{ + server: server, + shape_cache: shape_cache + } do + relation_id = "rel1" + + shape_id1 = "shape1" + shape1 = @shape + + shape_id2 = "shape2" + shape2 = @similar_shape + + shape_id3 = "shape3" + shape3 = @other_shape + + # doesn't matter, isn't used for this test + xmin = 100 + + old_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [{"id", "float4"}] + } + + new_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [{"id", "int8"}] + } + + pid = self() + + MockShapeStatus + |> expect(:get_relation, 1, fn _, ^relation_id -> old_rel end) + |> expect(:store_relation, 1, fn _, ^new_rel -> :ok end) + |> expect(:list_active_shapes, fn _ -> + [{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}] + end) + |> expect(:remove_shape, 1, fn state, ^shape_id1 -> {:ok, state} end) + |> expect(:remove_shape, 1, fn state, ^shape_id2 -> {:ok, state} end) + + MockInspector + |> expect(:clean_column_info, 1, fn {"public", "test_table"}, _ -> + send(pid, :cleaned) + true + end) + |> allow(self(), shape_cache) + + assert :ok = ShapeLogCollector.handle_relation_msg(new_rel, server) + assert_receive :cleaned + end end end diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 3184c841ca..d765c8414a 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -46,6 +46,11 @@ defmodule Electric.ShapeCacheTest do @prepare_tables_noop {__MODULE__, :prepare_tables_noop, []} + @stub_inspector StubInspector.new([ + %{name: "id", type: "int8", pk_position: 0}, + %{name: "value", type: "text"} + ]) + describe "get_or_create_shape_id/2" do setup [ :with_in_memory_storage, @@ -56,7 +61,8 @@ defmodule Electric.ShapeCacheTest do ] setup ctx do - with_shape_cache(ctx, + with_shape_cache( + Map.put(ctx, :inspector, @stub_inspector), create_snapshot_fn: fn _, _, _, _, _ -> nil end, prepare_tables_fn: @prepare_tables_noop ) @@ -84,7 +90,7 @@ defmodule Electric.ShapeCacheTest do test "creates initial snapshot if one doesn't exist", %{storage: storage} = ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -104,7 +110,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: fn nil, [{"public", "items"}] -> send(test_pid, {:called, :prepare_tables_fn}) end, @@ -132,7 +138,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> send(test_pid, {:called, :create_snapshot_fn}) @@ -179,7 +185,9 @@ defmodule Electric.ShapeCacheTest do shape_id = "foo" %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), prepare_tables_fn: @prepare_tables_noop) + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), + prepare_tables_fn: @prepare_tables_noop + ) shape_meta_table = Access.get(opts, :shape_meta_table) @@ -368,14 +376,16 @@ defmodule Electric.ShapeCacheTest do test "returns empty list initially", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), prepare_tables_fn: @prepare_tables_noop) + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), + prepare_tables_fn: @prepare_tables_noop + ) assert ShapeCache.list_active_shapes(opts) == [] end test "lists the shape as active once there is a snapshot", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -393,7 +403,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> ref = make_ref() @@ -429,7 +439,7 @@ defmodule Electric.ShapeCacheTest do test "returns true for known shape id", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 100}) @@ -444,7 +454,7 @@ defmodule Electric.ShapeCacheTest do test "works with slow snapshot generation", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _, _, _ -> Process.sleep(100) @@ -468,7 +478,7 @@ defmodule Electric.ShapeCacheTest do test "returns :started for snapshots that have started", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 100}) @@ -487,7 +497,7 @@ defmodule Electric.ShapeCacheTest do storage = Storage.for_shape(shape_id, ctx.storage) %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -505,7 +515,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> ref = make_ref() @@ -553,7 +563,7 @@ defmodule Electric.ShapeCacheTest do end) %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -584,7 +594,7 @@ defmodule Electric.ShapeCacheTest do test_pid = self() %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, _storage -> ref = make_ref() @@ -625,7 +635,7 @@ defmodule Electric.ShapeCacheTest do test "cleans up shape data and rotates the shape id", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -677,7 +687,7 @@ defmodule Electric.ShapeCacheTest do test "cleans up shape data and rotates the shape id", ctx do %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -723,7 +733,7 @@ defmodule Electric.ShapeCacheTest do shape_id = "foo" %{shape_cache_opts: opts} = - with_shape_cache(Map.put(ctx, :pool, nil), + with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) @@ -753,7 +763,7 @@ defmodule Electric.ShapeCacheTest do setup(ctx, do: - with_shape_cache(ctx, + with_shape_cache(Map.put(ctx, :inspector, @stub_inspector), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, @snapshot_xmin}) @@ -841,7 +851,7 @@ defmodule Electric.ShapeCacheTest do Process.sleep(1) with_cub_db_storage(context) - with_shape_cache(context, + with_shape_cache(Map.put(context, :inspector, @stub_inspector), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, @snapshot_xmin}) @@ -890,7 +900,7 @@ defmodule Electric.ShapeCacheTest do ] setup(ctx) do - with_shape_cache(ctx, + with_shape_cache(Map.put(ctx, :inspector, @stub_inspector), prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, @snapshot_xmin}) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 5dd87eb810..6bf4bbe5c6 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -370,7 +370,10 @@ defmodule Electric.Shapes.ConsumerTest do %{shape_cache_opts: shape_cache_opts} = Support.ComponentSetup.with_shape_cache( - Map.merge(ctx, %{pool: nil}), + Map.merge(ctx, %{ + pool: nil, + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) + }), log_producer: producer, prepare_tables_fn: fn _, _ -> :ok end, create_snapshot_fn: fn parent, shape_id, _shape, _, storage -> diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 8fe497a362..bd2f66dc52 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -51,6 +51,7 @@ defmodule Support.ComponentSetup do [ name: server, shape_meta_table: shape_meta_table, + inspector: ctx.inspector, storage: ctx.storage, db_pool: ctx.pool, persistent_kv: ctx.persistent_kv, diff --git a/packages/sync-service/test/support/stub_inspector.ex b/packages/sync-service/test/support/stub_inspector.ex index bbf9e0e3a2..545d25d266 100644 --- a/packages/sync-service/test/support/stub_inspector.ex +++ b/packages/sync-service/test/support/stub_inspector.ex @@ -19,4 +19,7 @@ defmodule Support.StubInspector do |> Map.fetch!(relation) |> then(&load_column_info(relation, &1)) end + + @impl true + def clean_column_info(_, _), do: true end