Skip to content

Commit

Permalink
chore: remove list_active_shapes (#1602)
Browse files Browse the repository at this point in the history
This PR removes the `list_active_shapes` function and replaces its
usages by `list_shapes`.
Also renames the `meta_table` field in `ShapeStatus` to
`shape_meta_table` in order to be consistent with the naming throughout
the rest of the codebase.
  • Loading branch information
kevin-dp authored Sep 2, 2024
1 parent 1d00501 commit 538d99f
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 71 deletions.
5 changes: 5 additions & 0 deletions .changeset/perfect-moons-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Remove list_active_shapes and replace it by list_shapes.
13 changes: 6 additions & 7 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ defmodule Electric.ShapeCacheBehaviour do
@callback get_or_create_shape_id(shape_def(), opts :: keyword()) ::
{shape_id(), current_snapshot_offset :: LogOffset.t()}

@callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}]
@callback get_relation(Messages.relation_id(), opts :: keyword()) :: Changes.Relation.t() | nil
@callback list_shapes(Electric.ShapeCache.ShapeStatus.t()) :: [{shape_id(), Shape.t()}]
@callback await_snapshot_start(shape_id(), opts :: keyword()) :: :started | {:error, term()}
@callback handle_truncate(shape_id(), keyword()) :: :ok
@callback clean_shape(shape_id(), keyword()) :: :ok
Expand Down Expand Up @@ -111,11 +111,10 @@ defmodule Electric.ShapeCache do
end

@impl Electric.ShapeCacheBehaviour
def list_active_shapes(opts \\ []) do
table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
@spec list_shapes(Electric.ShapeCache.ShapeStatus.t()) :: [{shape_id(), Shape.t()}]
def list_shapes(opts) do
shape_status = Access.get(opts, :shape_status, ShapeStatus)

shape_status.list_active_shapes(table)
shape_status.list_shapes(opts)
end

@impl Electric.ShapeCacheBehaviour
Expand Down Expand Up @@ -172,7 +171,7 @@ defmodule Electric.ShapeCache do
{:ok, persistent_state} =
opts.shape_status.initialise(
persistent_kv: opts.persistent_kv,
meta_table: opts.shape_meta_table
shape_meta_table: opts.shape_meta_table
)

state = %{
Expand Down Expand Up @@ -234,7 +233,7 @@ defmodule Electric.ShapeCache do

# Fetch all shapes that are affected by the relation change and clean them up
persistent_state
|> shape_status.list_active_shapes()
|> shape_status.list_shapes()
|> Enum.filter(&Shape.is_affected_by_relation_change?(&1, change))
|> Enum.map(&elem(&1, 0))
|> Enum.each(fn shape_id -> clean_up_shape(state, shape_id) end)
Expand Down
63 changes: 25 additions & 38 deletions packages/sync-service/lib/electric/shape_cache/shape_status.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ defmodule Electric.ShapeCache.ShapeStatusBehaviour do

@callback initialise(ShapeStatus.options()) :: {:ok, ShapeStatus.t()} | {:error, term()}
@callback list_shapes(ShapeStatus.t()) :: [{ShapeStatus.shape_id(), Shape.t()}]
@callback list_active_shapes(opts :: keyword()) :: [
{ShapeStatus.shape_id(), ShapeStatus.shape_def(), ShapeStatus.xmin()}
]
@callback get_relation(ShapeStatus.t(), Messages.relation_id()) :: Relation.t() | nil
@callback store_relation(ShapeStatus.t(), Relation.t()) :: :ok
@callback remove_shape(ShapeStatus.t(), ShapeStatus.shape_id()) ::
Expand Down Expand Up @@ -42,18 +39,18 @@ defmodule Electric.ShapeCache.ShapeStatus do

@schema NimbleOptions.new!(
persistent_kv: [type: :any, required: true],
meta_table: [type: {:or, [:atom, :reference]}, required: true],
shape_meta_table: [type: {:or, [:atom, :reference]}, required: true],
root: [type: :string, default: "./shape_cache"]
)

defstruct [:persistent_kv, :root, :meta_table]
defstruct [:persistent_kv, :root, :shape_meta_table]

@type shape_id() :: Electric.ShapeCache.shape_id()
@type table() :: atom() | reference()
@type t() :: %__MODULE__{
persistent_kv: PersistentKV.t(),
root: String.t(),
meta_table: table()
shape_meta_table: table()
}
@type option() :: unquote(NimbleOptions.option_typespec(@schema))
@type options() :: [option()]
Expand All @@ -69,7 +66,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
def initialise(opts) do
with {:ok, config} <- NimbleOptions.validate(opts, @schema),
{:ok, kv_backend} <- Access.fetch(config, :persistent_kv),
{:ok, table_name} = Access.fetch(config, :meta_table) do
{:ok, table_name} = Access.fetch(config, :shape_meta_table) do
persistent_kv =
PersistentKV.Serialized.new!(
backend: kv_backend,
Expand All @@ -81,7 +78,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
state =
struct(
__MODULE__,
Keyword.merge(config, persistent_kv: persistent_kv, meta_table: meta_table)
Keyword.merge(config, persistent_kv: persistent_kv, shape_meta_table: meta_table)
)

load(state)
Expand All @@ -97,7 +94,7 @@ defmodule Electric.ShapeCache.ShapeStatus do

true =
:ets.insert_new(
state.meta_table,
state.shape_meta_table,
[
{{@shape_hash_lookup, hash}, shape_id},
{{@shape_meta_data, shape_id}, shape, nil, offset}
Expand All @@ -111,7 +108,7 @@ defmodule Electric.ShapeCache.ShapeStatus do

@spec list_shapes(t()) :: [{shape_id(), Shape.t()}]
def list_shapes(state) do
:ets.select(state.meta_table, [
:ets.select(state.shape_meta_table, [
{
{{@shape_meta_data, :"$1"}, :"$2", :_, :_},
[true],
Expand All @@ -120,28 +117,18 @@ defmodule Electric.ShapeCache.ShapeStatus do
])
end

def list_active_shapes(%__MODULE__{meta_table: table}) do
list_active_shapes(table)
end

def list_active_shapes(table) when is_atom(table) or is_reference(table) do
:ets.select(table, [
{
{{@shape_meta_data, :"$1"}, :"$2", :"$3", :_},
[{:"=/=", :"$3", nil}],
[{{:"$1", :"$2", :"$3"}}]
}
])
end

@spec remove_shape(t(), shape_id()) :: {:ok, t()} | {:error, term()}
def remove_shape(state, shape_id) do
try do
shape =
:ets.lookup_element(state.meta_table, {@shape_meta_data, shape_id}, @shape_meta_shape_pos)
:ets.lookup_element(
state.shape_meta_table,
{@shape_meta_data, shape_id},
@shape_meta_shape_pos
)

:ets.select_delete(
state.meta_table,
state.shape_meta_table,
[
{{{@shape_meta_data, shape_id}, :_, :_, :_}, [], [true]},
{{{@shape_hash_lookup, :_}, shape_id}, [], [true]}
Expand All @@ -162,7 +149,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
end

@spec existing_shape(t(), shape_id() | Shape.t()) :: nil | {shape_id(), LogOffset.t()}
def existing_shape(%__MODULE__{meta_table: table}, shape_or_id) do
def existing_shape(%__MODULE__{shape_meta_table: table}, shape_or_id) do
existing_shape(table, shape_or_id)
end

Expand All @@ -188,7 +175,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
end

def initialise_shape(state, shape_id, snapshot_xmin, latest_offset) do
:ets.update_element(state.meta_table, {@shape_meta_data, shape_id}, [
:ets.update_element(state.shape_meta_table, {@shape_meta_data, shape_id}, [
{@shape_meta_xmin_pos, snapshot_xmin},
{@shape_meta_latest_offset_pos, latest_offset}
])
Expand All @@ -197,12 +184,12 @@ defmodule Electric.ShapeCache.ShapeStatus do
end

def set_snapshot_xmin(state, shape_id, snapshot_xmin) do
:ets.update_element(state.meta_table, {@shape_meta_data, shape_id}, [
:ets.update_element(state.shape_meta_table, {@shape_meta_data, shape_id}, [
{@shape_meta_xmin_pos, snapshot_xmin}
])
end

def set_latest_offset(%__MODULE__{meta_table: table} = _state, shape_id, latest_offset) do
def set_latest_offset(%__MODULE__{shape_meta_table: table} = _state, shape_id, latest_offset) do
set_latest_offset(table, shape_id, latest_offset)
end

Expand All @@ -212,7 +199,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
])
end

def latest_offset!(%__MODULE__{meta_table: table} = _state, shape_id) do
def latest_offset!(%__MODULE__{shape_meta_table: table} = _state, shape_id) do
latest_offset(table, shape_id)
end

Expand All @@ -224,7 +211,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
)
end

def latest_offset(%__MODULE__{meta_table: table} = _state, shape_id) do
def latest_offset(%__MODULE__{shape_meta_table: table} = _state, shape_id) do
latest_offset(table, shape_id)
end

Expand All @@ -238,7 +225,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
end)
end

def snapshot_xmin(%__MODULE__{meta_table: table} = _state, shape_id) do
def snapshot_xmin(%__MODULE__{shape_meta_table: table} = _state, shape_id) do
snapshot_xmin(table, shape_id)
end

Expand All @@ -252,7 +239,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
end)
end

def snapshot_xmin?(%__MODULE__{meta_table: table} = _state, shape_id) do
def snapshot_xmin?(%__MODULE__{shape_meta_table: table} = _state, shape_id) do
snapshot_xmin?(table, shape_id)
end

Expand All @@ -263,7 +250,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
end
end

def get_relation(%__MODULE__{meta_table: table} = _state, relation_id) do
def get_relation(%__MODULE__{shape_meta_table: table} = _state, relation_id) do
get_relation(table, relation_id)
end

Expand All @@ -274,7 +261,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
end
end

def store_relation(%__MODULE__{meta_table: meta_table} = state, %Relation{} = relation) do
def store_relation(%__MODULE__{shape_meta_table: meta_table} = state, %Relation{} = relation) do
with :ok <- store_relation(meta_table, relation) do
save(state)
end
Expand Down Expand Up @@ -332,7 +319,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
defp load(state) do
with {:ok, %{shapes: shapes, relations: relations}} <- load_shapes(state) do
:ets.insert(
state.meta_table,
state.shape_meta_table,
Enum.concat([
Enum.flat_map(shapes, fn {shape_id, shape} ->
hash = Shape.hash(shape)
Expand Down Expand Up @@ -367,7 +354,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
end
end

defp list_relations(%__MODULE__{meta_table: meta_table}) do
defp list_relations(%__MODULE__{shape_meta_table: meta_table}) do
:ets.select(meta_table, [
{
{{@relation_data, :"$1"}, :"$2"},
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/shapes/shape.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ defmodule Electric.Shapes.Shape do
shape_matches?(shape, schema, table)
end

defp shape_matches?({_, %__MODULE__{root_table: {schema, table}}, _}, schema, table), do: true
defp shape_matches?({_, %__MODULE__{root_table: {schema, table}}}, schema, table), do: true
defp shape_matches?(_, _, _), do: false

@spec from_json_safe!(t()) :: json_safe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
)

describe "store_transaction/2" do
setup do
setup ctx do
# Start a test Registry
registry_name = Module.concat(__MODULE__, Registry)
start_link_supervised!({Registry, keys: :duplicate, name: registry_name})
Expand All @@ -61,18 +61,21 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
end)

MockShapeStatus
|> expect(:initialise, 1, fn opts -> {:ok, opts} end)
|> expect(:initialise, 1, fn opts -> Electric.ShapeCache.ShapeStatus.initialise(opts) end)
|> expect(:list_shapes, 1, fn _ -> [] end)
# allow the ShapeCache to call this mock
|> allow(self(), fn -> GenServer.whereis(Electric.ShapeCache) end)

# We need a ShapeCache process because it is a GenStage consumer
# that handles the Relation events produced by ShapeLogCollector
shape_meta_table = :"shape_meta_#{Support.ComponentSetup.full_test_name(ctx)}"

shape_cache_opts =
[
storage: {MockStorage, []},
inspector: {MockInspector, []},
shape_status: MockShapeStatus,
shape_meta_table: shape_meta_table,
persistent_kv: Electric.PersistentKV.Memory.new!(),
prepare_tables_fn: fn _, _ -> {:ok, [:ok]} end,
log_producer: __MODULE__.ShapeLogCollector,
Expand Down Expand Up @@ -197,9 +200,6 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
shape_id3 = "shape3"
shape3 = @other_shape

# doesn't matter, isn't used for this test
xmin = 100

old_rel = %Relation{
id: relation_id,
schema: "public",
Expand All @@ -219,8 +219,8 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
MockShapeStatus
|> expect(:get_relation, 1, fn _, ^relation_id -> old_rel end)
|> expect(:store_relation, 1, fn _, ^new_rel -> :ok end)
|> expect(:list_active_shapes, 1, fn _ ->
[{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}]
|> expect(:list_shapes, 1, fn _ ->
[{shape_id1, shape1}, {shape_id2, shape2}, {shape_id3, shape3}]
end)
|> expect(:remove_shape, 1, fn state, ^shape_id1 -> {:ok, state} end)
|> expect(:remove_shape, 1, fn state, ^shape_id2 -> {:ok, state} end)
Expand Down Expand Up @@ -257,9 +257,6 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
shape_id3 = "shape3"
shape3 = @other_shape

# doesn't matter, isn't used for this test
xmin = 100

old_rel = %Relation{
id: relation_id,
schema: "public",
Expand All @@ -279,8 +276,8 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
MockShapeStatus
|> expect(:get_relation, 1, fn _, ^relation_id -> old_rel end)
|> expect(:store_relation, 1, fn _, ^new_rel -> :ok end)
|> expect(:list_active_shapes, fn _ ->
[{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}]
|> expect(:list_shapes, 1, fn _ ->
[{shape_id1, shape1}, {shape_id2, shape2}, {shape_id3, shape3}]
end)
|> expect(:remove_shape, 1, fn state, ^shape_id1 -> {:ok, state} end)
|> expect(:remove_shape, 1, fn state, ^shape_id2 -> {:ok, state} end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Electric.ShapeCache.ShapeStatusTest do
defp new_state(ctx, opts \\ []) do
table = Keyword.get(opts, :table, table_name())

{:ok, state} = ShapeStatus.initialise(persistent_kv: ctx.kv, meta_table: table)
{:ok, state} = ShapeStatus.initialise(persistent_kv: ctx.kv, shape_meta_table: table)

shapes = Keyword.get(opts, :shapes, [])

Expand Down Expand Up @@ -182,11 +182,11 @@ defmodule Electric.ShapeCache.ShapeStatusTest do
refute ShapeStatus.set_snapshot_xmin(state, "sdfsodf", 1234)

refute ShapeStatus.snapshot_xmin?(state, "sdfsodf")
refute ShapeStatus.snapshot_xmin?(state.meta_table, "sdfsodf")
refute ShapeStatus.snapshot_xmin?(state.shape_meta_table, "sdfsodf")
refute ShapeStatus.snapshot_xmin?(state, shape_id)
assert ShapeStatus.set_snapshot_xmin(state, shape_id, 1234)
assert ShapeStatus.snapshot_xmin?(state, shape_id)
assert ShapeStatus.snapshot_xmin?(state.meta_table, shape_id)
assert ShapeStatus.snapshot_xmin?(state.shape_meta_table, shape_id)
end

test "relation data", ctx do
Expand Down
Loading

0 comments on commit 538d99f

Please sign in to comment.