From 538d99faf588140d515f2b938b092190c0a3428e Mon Sep 17 00:00:00 2001 From: Kevin Date: Mon, 2 Sep 2024 15:37:28 +0200 Subject: [PATCH] chore: remove list_active_shapes (#1602) This PR removes the `list_active_shapes` function and replaces its usages by `list_shapes`. Also renames the `meta_table` field in `ShapeStatus` to `shape_meta_table` in order to be consistent with the naming throughout the rest of the codebase. --- .changeset/perfect-moons-end.md | 5 ++ .../sync-service/lib/electric/shape_cache.ex | 13 ++-- .../lib/electric/shape_cache/shape_status.ex | 63 ++++++++----------- .../sync-service/lib/electric/shapes/shape.ex | 2 +- .../replication/shape_log_collector_test.exs | 21 +++---- .../shape_cache/shape_status_test.exs | 6 +- .../test/electric/shape_cache_test.exs | 26 +++++--- .../test/support/component_setup.ex | 2 +- 8 files changed, 67 insertions(+), 71 deletions(-) create mode 100644 .changeset/perfect-moons-end.md diff --git a/.changeset/perfect-moons-end.md b/.changeset/perfect-moons-end.md new file mode 100644 index 0000000000..f4f1fb0b9c --- /dev/null +++ b/.changeset/perfect-moons-end.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Remove list_active_shapes and replace it by list_shapes. diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index c26808bad3..9640176a73 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -16,8 +16,8 @@ defmodule Electric.ShapeCacheBehaviour do @callback get_or_create_shape_id(shape_def(), opts :: keyword()) :: {shape_id(), current_snapshot_offset :: LogOffset.t()} - @callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}] @callback get_relation(Messages.relation_id(), opts :: keyword()) :: Changes.Relation.t() | nil + @callback list_shapes(Electric.ShapeCache.ShapeStatus.t()) :: [{shape_id(), Shape.t()}] @callback await_snapshot_start(shape_id(), opts :: keyword()) :: :started | {:error, term()} @callback handle_truncate(shape_id(), keyword()) :: :ok @callback clean_shape(shape_id(), keyword()) :: :ok @@ -111,11 +111,10 @@ defmodule Electric.ShapeCache do end @impl Electric.ShapeCacheBehaviour - def list_active_shapes(opts \\ []) do - table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + @spec list_shapes(Electric.ShapeCache.ShapeStatus.t()) :: [{shape_id(), Shape.t()}] + def list_shapes(opts) do shape_status = Access.get(opts, :shape_status, ShapeStatus) - - shape_status.list_active_shapes(table) + shape_status.list_shapes(opts) end @impl Electric.ShapeCacheBehaviour @@ -172,7 +171,7 @@ defmodule Electric.ShapeCache do {:ok, persistent_state} = opts.shape_status.initialise( persistent_kv: opts.persistent_kv, - meta_table: opts.shape_meta_table + shape_meta_table: opts.shape_meta_table ) state = %{ @@ -234,7 +233,7 @@ defmodule Electric.ShapeCache do # Fetch all shapes that are affected by the relation change and clean them up persistent_state - |> shape_status.list_active_shapes() + |> shape_status.list_shapes() |> Enum.filter(&Shape.is_affected_by_relation_change?(&1, change)) |> Enum.map(&elem(&1, 0)) |> Enum.each(fn shape_id -> clean_up_shape(state, shape_id) end) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status.ex b/packages/sync-service/lib/electric/shape_cache/shape_status.ex index 53dd4e7482..abd44ed73c 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status.ex @@ -9,9 +9,6 @@ defmodule Electric.ShapeCache.ShapeStatusBehaviour do @callback initialise(ShapeStatus.options()) :: {:ok, ShapeStatus.t()} | {:error, term()} @callback list_shapes(ShapeStatus.t()) :: [{ShapeStatus.shape_id(), Shape.t()}] - @callback list_active_shapes(opts :: keyword()) :: [ - {ShapeStatus.shape_id(), ShapeStatus.shape_def(), ShapeStatus.xmin()} - ] @callback get_relation(ShapeStatus.t(), Messages.relation_id()) :: Relation.t() | nil @callback store_relation(ShapeStatus.t(), Relation.t()) :: :ok @callback remove_shape(ShapeStatus.t(), ShapeStatus.shape_id()) :: @@ -42,18 +39,18 @@ defmodule Electric.ShapeCache.ShapeStatus do @schema NimbleOptions.new!( persistent_kv: [type: :any, required: true], - meta_table: [type: {:or, [:atom, :reference]}, required: true], + shape_meta_table: [type: {:or, [:atom, :reference]}, required: true], root: [type: :string, default: "./shape_cache"] ) - defstruct [:persistent_kv, :root, :meta_table] + defstruct [:persistent_kv, :root, :shape_meta_table] @type shape_id() :: Electric.ShapeCache.shape_id() @type table() :: atom() | reference() @type t() :: %__MODULE__{ persistent_kv: PersistentKV.t(), root: String.t(), - meta_table: table() + shape_meta_table: table() } @type option() :: unquote(NimbleOptions.option_typespec(@schema)) @type options() :: [option()] @@ -69,7 +66,7 @@ defmodule Electric.ShapeCache.ShapeStatus do def initialise(opts) do with {:ok, config} <- NimbleOptions.validate(opts, @schema), {:ok, kv_backend} <- Access.fetch(config, :persistent_kv), - {:ok, table_name} = Access.fetch(config, :meta_table) do + {:ok, table_name} = Access.fetch(config, :shape_meta_table) do persistent_kv = PersistentKV.Serialized.new!( backend: kv_backend, @@ -81,7 +78,7 @@ defmodule Electric.ShapeCache.ShapeStatus do state = struct( __MODULE__, - Keyword.merge(config, persistent_kv: persistent_kv, meta_table: meta_table) + Keyword.merge(config, persistent_kv: persistent_kv, shape_meta_table: meta_table) ) load(state) @@ -97,7 +94,7 @@ defmodule Electric.ShapeCache.ShapeStatus do true = :ets.insert_new( - state.meta_table, + state.shape_meta_table, [ {{@shape_hash_lookup, hash}, shape_id}, {{@shape_meta_data, shape_id}, shape, nil, offset} @@ -111,7 +108,7 @@ defmodule Electric.ShapeCache.ShapeStatus do @spec list_shapes(t()) :: [{shape_id(), Shape.t()}] def list_shapes(state) do - :ets.select(state.meta_table, [ + :ets.select(state.shape_meta_table, [ { {{@shape_meta_data, :"$1"}, :"$2", :_, :_}, [true], @@ -120,28 +117,18 @@ defmodule Electric.ShapeCache.ShapeStatus do ]) end - def list_active_shapes(%__MODULE__{meta_table: table}) do - list_active_shapes(table) - end - - def list_active_shapes(table) when is_atom(table) or is_reference(table) do - :ets.select(table, [ - { - {{@shape_meta_data, :"$1"}, :"$2", :"$3", :_}, - [{:"=/=", :"$3", nil}], - [{{:"$1", :"$2", :"$3"}}] - } - ]) - end - @spec remove_shape(t(), shape_id()) :: {:ok, t()} | {:error, term()} def remove_shape(state, shape_id) do try do shape = - :ets.lookup_element(state.meta_table, {@shape_meta_data, shape_id}, @shape_meta_shape_pos) + :ets.lookup_element( + state.shape_meta_table, + {@shape_meta_data, shape_id}, + @shape_meta_shape_pos + ) :ets.select_delete( - state.meta_table, + state.shape_meta_table, [ {{{@shape_meta_data, shape_id}, :_, :_, :_}, [], [true]}, {{{@shape_hash_lookup, :_}, shape_id}, [], [true]} @@ -162,7 +149,7 @@ defmodule Electric.ShapeCache.ShapeStatus do end @spec existing_shape(t(), shape_id() | Shape.t()) :: nil | {shape_id(), LogOffset.t()} - def existing_shape(%__MODULE__{meta_table: table}, shape_or_id) do + def existing_shape(%__MODULE__{shape_meta_table: table}, shape_or_id) do existing_shape(table, shape_or_id) end @@ -188,7 +175,7 @@ defmodule Electric.ShapeCache.ShapeStatus do end def initialise_shape(state, shape_id, snapshot_xmin, latest_offset) do - :ets.update_element(state.meta_table, {@shape_meta_data, shape_id}, [ + :ets.update_element(state.shape_meta_table, {@shape_meta_data, shape_id}, [ {@shape_meta_xmin_pos, snapshot_xmin}, {@shape_meta_latest_offset_pos, latest_offset} ]) @@ -197,12 +184,12 @@ defmodule Electric.ShapeCache.ShapeStatus do end def set_snapshot_xmin(state, shape_id, snapshot_xmin) do - :ets.update_element(state.meta_table, {@shape_meta_data, shape_id}, [ + :ets.update_element(state.shape_meta_table, {@shape_meta_data, shape_id}, [ {@shape_meta_xmin_pos, snapshot_xmin} ]) end - def set_latest_offset(%__MODULE__{meta_table: table} = _state, shape_id, latest_offset) do + def set_latest_offset(%__MODULE__{shape_meta_table: table} = _state, shape_id, latest_offset) do set_latest_offset(table, shape_id, latest_offset) end @@ -212,7 +199,7 @@ defmodule Electric.ShapeCache.ShapeStatus do ]) end - def latest_offset!(%__MODULE__{meta_table: table} = _state, shape_id) do + def latest_offset!(%__MODULE__{shape_meta_table: table} = _state, shape_id) do latest_offset(table, shape_id) end @@ -224,7 +211,7 @@ defmodule Electric.ShapeCache.ShapeStatus do ) end - def latest_offset(%__MODULE__{meta_table: table} = _state, shape_id) do + def latest_offset(%__MODULE__{shape_meta_table: table} = _state, shape_id) do latest_offset(table, shape_id) end @@ -238,7 +225,7 @@ defmodule Electric.ShapeCache.ShapeStatus do end) end - def snapshot_xmin(%__MODULE__{meta_table: table} = _state, shape_id) do + def snapshot_xmin(%__MODULE__{shape_meta_table: table} = _state, shape_id) do snapshot_xmin(table, shape_id) end @@ -252,7 +239,7 @@ defmodule Electric.ShapeCache.ShapeStatus do end) end - def snapshot_xmin?(%__MODULE__{meta_table: table} = _state, shape_id) do + def snapshot_xmin?(%__MODULE__{shape_meta_table: table} = _state, shape_id) do snapshot_xmin?(table, shape_id) end @@ -263,7 +250,7 @@ defmodule Electric.ShapeCache.ShapeStatus do end end - def get_relation(%__MODULE__{meta_table: table} = _state, relation_id) do + def get_relation(%__MODULE__{shape_meta_table: table} = _state, relation_id) do get_relation(table, relation_id) end @@ -274,7 +261,7 @@ defmodule Electric.ShapeCache.ShapeStatus do end end - def store_relation(%__MODULE__{meta_table: meta_table} = state, %Relation{} = relation) do + def store_relation(%__MODULE__{shape_meta_table: meta_table} = state, %Relation{} = relation) do with :ok <- store_relation(meta_table, relation) do save(state) end @@ -332,7 +319,7 @@ defmodule Electric.ShapeCache.ShapeStatus do defp load(state) do with {:ok, %{shapes: shapes, relations: relations}} <- load_shapes(state) do :ets.insert( - state.meta_table, + state.shape_meta_table, Enum.concat([ Enum.flat_map(shapes, fn {shape_id, shape} -> hash = Shape.hash(shape) @@ -367,7 +354,7 @@ defmodule Electric.ShapeCache.ShapeStatus do end end - defp list_relations(%__MODULE__{meta_table: meta_table}) do + defp list_relations(%__MODULE__{shape_meta_table: meta_table}) do :ets.select(meta_table, [ { {{@relation_data, :"$1"}, :"$2"}, diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index 840509e1e3..e3ada4f28f 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ b/packages/sync-service/lib/electric/shapes/shape.ex @@ -183,7 +183,7 @@ defmodule Electric.Shapes.Shape do shape_matches?(shape, schema, table) end - defp shape_matches?({_, %__MODULE__{root_table: {schema, table}}, _}, schema, table), do: true + defp shape_matches?({_, %__MODULE__{root_table: {schema, table}}}, schema, table), do: true defp shape_matches?(_, _, _), do: false @spec from_json_safe!(t()) :: json_safe() diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 25d33cf523..9c187b1f2a 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -38,7 +38,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do ) describe "store_transaction/2" do - setup do + setup ctx do # Start a test Registry registry_name = Module.concat(__MODULE__, Registry) start_link_supervised!({Registry, keys: :duplicate, name: registry_name}) @@ -61,18 +61,21 @@ defmodule Electric.Replication.ShapeLogCollectorTest do end) MockShapeStatus - |> expect(:initialise, 1, fn opts -> {:ok, opts} end) + |> expect(:initialise, 1, fn opts -> Electric.ShapeCache.ShapeStatus.initialise(opts) end) |> expect(:list_shapes, 1, fn _ -> [] end) # allow the ShapeCache to call this mock |> allow(self(), fn -> GenServer.whereis(Electric.ShapeCache) end) # We need a ShapeCache process because it is a GenStage consumer # that handles the Relation events produced by ShapeLogCollector + shape_meta_table = :"shape_meta_#{Support.ComponentSetup.full_test_name(ctx)}" + shape_cache_opts = [ storage: {MockStorage, []}, inspector: {MockInspector, []}, shape_status: MockShapeStatus, + shape_meta_table: shape_meta_table, persistent_kv: Electric.PersistentKV.Memory.new!(), prepare_tables_fn: fn _, _ -> {:ok, [:ok]} end, log_producer: __MODULE__.ShapeLogCollector, @@ -197,9 +200,6 @@ defmodule Electric.Replication.ShapeLogCollectorTest do shape_id3 = "shape3" shape3 = @other_shape - # doesn't matter, isn't used for this test - xmin = 100 - old_rel = %Relation{ id: relation_id, schema: "public", @@ -219,8 +219,8 @@ defmodule Electric.Replication.ShapeLogCollectorTest do MockShapeStatus |> expect(:get_relation, 1, fn _, ^relation_id -> old_rel end) |> expect(:store_relation, 1, fn _, ^new_rel -> :ok end) - |> expect(:list_active_shapes, 1, fn _ -> - [{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}] + |> expect(:list_shapes, 1, fn _ -> + [{shape_id1, shape1}, {shape_id2, shape2}, {shape_id3, shape3}] end) |> expect(:remove_shape, 1, fn state, ^shape_id1 -> {:ok, state} end) |> expect(:remove_shape, 1, fn state, ^shape_id2 -> {:ok, state} end) @@ -257,9 +257,6 @@ defmodule Electric.Replication.ShapeLogCollectorTest do shape_id3 = "shape3" shape3 = @other_shape - # doesn't matter, isn't used for this test - xmin = 100 - old_rel = %Relation{ id: relation_id, schema: "public", @@ -279,8 +276,8 @@ defmodule Electric.Replication.ShapeLogCollectorTest do MockShapeStatus |> expect(:get_relation, 1, fn _, ^relation_id -> old_rel end) |> expect(:store_relation, 1, fn _, ^new_rel -> :ok end) - |> expect(:list_active_shapes, fn _ -> - [{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}] + |> expect(:list_shapes, 1, fn _ -> + [{shape_id1, shape1}, {shape_id2, shape2}, {shape_id3, shape3}] end) |> expect(:remove_shape, 1, fn state, ^shape_id1 -> {:ok, state} end) |> expect(:remove_shape, 1, fn state, ^shape_id2 -> {:ok, state} end) diff --git a/packages/sync-service/test/electric/shape_cache/shape_status_test.exs b/packages/sync-service/test/electric/shape_cache/shape_status_test.exs index 95c078e47f..fb701251bb 100644 --- a/packages/sync-service/test/electric/shape_cache/shape_status_test.exs +++ b/packages/sync-service/test/electric/shape_cache/shape_status_test.exs @@ -30,7 +30,7 @@ defmodule Electric.ShapeCache.ShapeStatusTest do defp new_state(ctx, opts \\ []) do table = Keyword.get(opts, :table, table_name()) - {:ok, state} = ShapeStatus.initialise(persistent_kv: ctx.kv, meta_table: table) + {:ok, state} = ShapeStatus.initialise(persistent_kv: ctx.kv, shape_meta_table: table) shapes = Keyword.get(opts, :shapes, []) @@ -182,11 +182,11 @@ defmodule Electric.ShapeCache.ShapeStatusTest do refute ShapeStatus.set_snapshot_xmin(state, "sdfsodf", 1234) refute ShapeStatus.snapshot_xmin?(state, "sdfsodf") - refute ShapeStatus.snapshot_xmin?(state.meta_table, "sdfsodf") + refute ShapeStatus.snapshot_xmin?(state.shape_meta_table, "sdfsodf") refute ShapeStatus.snapshot_xmin?(state, shape_id) assert ShapeStatus.set_snapshot_xmin(state, shape_id, 1234) assert ShapeStatus.snapshot_xmin?(state, shape_id) - assert ShapeStatus.snapshot_xmin?(state.meta_table, shape_id) + assert ShapeStatus.snapshot_xmin?(state.shape_meta_table, shape_id) end test "relation data", ctx do diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index d765c8414a..1be90782bb 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -11,7 +11,7 @@ defmodule Electric.ShapeCacheTest do alias Electric.Replication.Changes.{Relation, Column} alias Electric.Replication.LogOffset alias Electric.ShapeCache - alias Electric.ShapeCache.Storage + alias Electric.ShapeCache.{Storage, ShapeStatus} alias Electric.Shapes alias Electric.Shapes.Shape @@ -366,7 +366,7 @@ defmodule Electric.ShapeCacheTest do end end - describe "list_active_shapes/1" do + describe "list_shapes/1" do setup [ :with_in_memory_storage, :with_persistent_kv, @@ -380,7 +380,9 @@ defmodule Electric.ShapeCacheTest do prepare_tables_fn: @prepare_tables_noop ) - assert ShapeCache.list_active_shapes(opts) == [] + meta_table = Keyword.fetch!(opts, :shape_meta_table) + + assert ShapeCache.list_shapes(%{shape_meta_table: meta_table}) == [] end test "lists the shape as active once there is a snapshot", ctx do @@ -396,10 +398,12 @@ defmodule Electric.ShapeCacheTest do {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) assert :started = ShapeCache.await_snapshot_start(shape_id, opts) - assert [{^shape_id, @shape, 10}] = ShapeCache.list_active_shapes(opts) + meta_table = Keyword.fetch!(opts, :shape_meta_table) + assert [{^shape_id, @shape}] = ShapeCache.list_shapes(%{shape_meta_table: meta_table}) + assert {:ok, 10} = ShapeStatus.snapshot_xmin(meta_table, shape_id) end - test "doesn't list the shape as active until we know xmin", ctx do + test "lists the shape even if we don't know xmin", ctx do test_pid = self() %{shape_cache_opts: opts} = @@ -420,12 +424,13 @@ defmodule Electric.ShapeCacheTest do # Wait until we get to the waiting point in the snapshot assert_receive {:waiting_point, ref, pid} - assert ShapeCache.list_active_shapes(opts) == [] + meta_table = Keyword.fetch!(opts, :shape_meta_table) + assert [{^shape_id, @shape}] = ShapeCache.list_shapes(%{shape_meta_table: meta_table}) send(pid, {:continue, ref}) assert :started = ShapeCache.await_snapshot_start(shape_id, opts) - assert [{^shape_id, @shape, 10}] = ShapeCache.list_active_shapes(opts) + assert [{^shape_id, @shape}] = ShapeCache.list_shapes(%{shape_meta_table: meta_table}) end end @@ -784,12 +789,15 @@ defmodule Electric.ShapeCacheTest do test "restores snapshot xmins", %{shape_cache_opts: opts} = context do {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) :started = ShapeCache.await_snapshot_start(shape_id, opts) - [{^shape_id, @shape, @snapshot_xmin}] = ShapeCache.list_active_shapes(opts) + meta_table = Keyword.fetch!(opts, :shape_meta_table) + [{^shape_id, @shape}] = ShapeCache.list_shapes(%{shape_meta_table: meta_table}) + {:ok, @snapshot_xmin} = ShapeStatus.snapshot_xmin(meta_table, shape_id) restart_shape_cache(context) :started = ShapeCache.await_snapshot_start(shape_id, opts) - assert [{^shape_id, @shape, @snapshot_xmin}] = ShapeCache.list_active_shapes(opts) + assert [{^shape_id, @shape}] = ShapeCache.list_shapes(%{shape_meta_table: meta_table}) + {:ok, @snapshot_xmin} = ShapeStatus.snapshot_xmin(meta_table, shape_id) end test "restores latest offset", %{shape_cache_opts: opts} = context do diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index bd2f66dc52..19d2221397 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -152,7 +152,7 @@ defmodule Support.ComponentSetup do |> Keyword.merge(overrides) end - defp full_test_name(ctx) do + def full_test_name(ctx) do "#{ctx.module} #{ctx.test}" end end