Skip to content

Commit

Permalink
fix: clean cached column information on delete (#1562)
Browse files Browse the repository at this point in the history
Fixes #1550.
This PR extends the shape log collector to also clean the cached column
information from ETS when a relation changes. This ensures that when a
table is migrated, we don't re-use the old table information from ETS
but instead load it from Postgres and re-populate the cache with the new
column info.

**There's one corner case that this PR does not address:**
- Create table, insert data
- Sync a shape containing that table
- Drop the table
- Delete the shape
- Recreate the table but with a different schema
- Insert some data into the new table
- Sync a shape containing the newly recreated table

In the above scenario, when syncing the newly recreated table, we get
the new data but in the old schema (so we only get the columns that also
existed in the old schema). This is because Postgres logical replication
stream does not inform us when a table is dropped.

As a result, we can't detect that a table was dropped and thus don't
know that we need to clean the cached column information. It's only when
we get a Relation message that we know this. But Postgres only sends a
Relation message the first time the data in the table changes and
**we're subscribed to that table in the replication stream** (and that's
only after syncing the shape).

So, the data that was inserted before we synced the table in the last
step, does not lead to a Relation message. Only if we insert data into
the table after that sync step, will Postgres send a Relation message
that will make us clean the cached column information. But note that at
that point the row that was inserted in the previous step is already
stored in storage in the format of the old schema.
  • Loading branch information
kevin-dp authored Sep 2, 2024
1 parent 97a9f5e commit 1d00501
Show file tree
Hide file tree
Showing 13 changed files with 373 additions and 54 deletions.
5 changes: 5 additions & 0 deletions .changeset/soft-gifts-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Clean cached column info on relation changes.
9 changes: 5 additions & 4 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ defmodule Electric.Application do
prepare_tables_fn =
{Electric.Postgres.Configuration, :configure_tables_for_replication!, [publication_name]}

inspector =
{Electric.Postgres.Inspector.EtsInspector,
server: Electric.Postgres.Inspector.EtsInspector}

shape_cache =
{Electric.ShapeCache,
storage: storage,
inspector: inspector,
prepare_tables_fn: prepare_tables_fn,
log_producer: Electric.Replication.ShapeLogCollector,
persistent_kv: persistent_kv,
registry: Registry.ShapeChanges}

inspector =
{Electric.Postgres.Inspector.EtsInspector,
server: Electric.Postgres.Inspector.EtsInspector}

core_processes = [
{Registry,
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
Expand Down
20 changes: 20 additions & 0 deletions packages/sync-service/lib/electric/persistent_kv/mock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Electric.PersistentKV.Mock do
defstruct []

@type t() :: %__MODULE__{}

@spec new() :: t()
def new() do
%__MODULE__{}
end

defimpl Electric.PersistentKV do
def set(_memory, _key, _value) do
:ok
end

def get(_memory, _key) do
{:ok, 42}
end
end
end
8 changes: 8 additions & 0 deletions packages/sync-service/lib/electric/postgres/inspector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ defmodule Electric.Postgres.Inspector do
@callback load_column_info(relation(), opts :: term()) ::
{:ok, [column_info()]} | :table_not_found

@callback clean_column_info(relation(), opts :: term()) :: true

@type inspector :: {module(), opts :: term()}

@doc """
Expand All @@ -25,6 +27,12 @@ defmodule Electric.Postgres.Inspector do
@spec load_column_info(relation(), inspector()) :: {:ok, [column_info()]} | :table_not_found
def load_column_info(relation, {module, opts}), do: module.load_column_info(relation, opts)

@doc """
Clean up column information about a given table using a provided inspector.
"""
@spec clean_column_info(relation(), inspector()) :: true
def clean_column_info(relation, {module, opts}), do: module.clean_column_info(relation, opts)

@doc """
Get columns that should be considered a PK for table. If the table
has no PK, then we're considering all columns as identifying.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
{:ok, rows}
end
end

def clean_column_info(_, _), do: true
end
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
end
end

@impl Electric.Postgres.Inspector
def clean_column_info(table, opts_or_state) do
ets_table = Access.get(opts_or_state, :pg_info_table, @default_pg_info_table)

:ets.delete(ets_table, {table, :columns})
end

## Internal API

@impl GenServer
Expand Down
72 changes: 48 additions & 24 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Electric.ShapeCacheBehaviour do
{shape_id(), current_snapshot_offset :: LogOffset.t()}

@callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}]
@callback get_relation(Messages.relation_id(), opts :: keyword()) :: Changes.Relation.t() | nil
@callback await_snapshot_start(shape_id(), opts :: keyword()) :: :started | {:error, term()}
@callback handle_truncate(shape_id(), keyword()) :: :ok
@callback clean_shape(shape_id(), keyword()) :: :ok
Expand Down Expand Up @@ -55,6 +56,8 @@ defmodule Electric.ShapeCache do
default: Electric.Replication.ShapeLogCollector
],
storage: [type: :mod_arg, required: true],
inspector: [type: :mod_arg, required: true],
shape_status: [type: :atom, default: Electric.ShapeCache.ShapeStatus],
registry: [type: {:or, [:atom, :pid]}, required: true],
# NimbleOptions has no "implementation of protocol" type
persistent_kv: [type: :any, required: true],
Expand All @@ -81,9 +84,10 @@ defmodule Electric.ShapeCache do
@impl Electric.ShapeCacheBehaviour
def get_or_create_shape_id(shape, opts \\ []) do
table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
shape_status = Access.get(opts, :shape_status, ShapeStatus)

# Get or create the shape ID and fire a snapshot if necessary
if shape_state = ShapeStatus.existing_shape(table, shape) do
if shape_state = shape_status.existing_shape(table, shape) do
shape_state
else
server = Access.get(opts, :server, __MODULE__)
Expand All @@ -96,8 +100,9 @@ defmodule Electric.ShapeCache do
:ok | {:error, term()}
def update_shape_latest_offset(shape_id, latest_offset, opts) do
meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
shape_status = Access.get(opts, :shape_status, ShapeStatus)

if ShapeStatus.set_latest_offset(meta_table, shape_id, latest_offset) do
if shape_status.set_latest_offset(meta_table, shape_id, latest_offset) do
:ok
else
Logger.warning("Tried to update latest offset for shape #{shape_id} which doesn't exist")
Expand All @@ -108,14 +113,17 @@ defmodule Electric.ShapeCache do
@impl Electric.ShapeCacheBehaviour
def list_active_shapes(opts \\ []) do
table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
shape_status = Access.get(opts, :shape_status, ShapeStatus)

ShapeStatus.list_active_shapes(table)
shape_status.list_active_shapes(table)
end

@impl Electric.ShapeCacheBehaviour
@spec get_relation(Messages.relation_id(), opts :: keyword()) :: Changes.Relation.t() | nil
def get_relation(relation_id, opts) do
meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
ShapeStatus.get_relation(meta_table, relation_id)
shape_status = Access.get(opts, :shape_status, ShapeStatus)
shape_status.get_relation(meta_table, relation_id)
end

@impl Electric.ShapeCacheBehaviour
Expand All @@ -136,8 +144,9 @@ defmodule Electric.ShapeCache do
@spec await_snapshot_start(shape_id(), keyword()) :: :started | {:error, term()}
def await_snapshot_start(shape_id, opts \\ []) when is_binary(shape_id) do
table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
shape_status = Access.get(opts, :shape_status, ShapeStatus)

if ShapeStatus.snapshot_xmin?(table, shape_id) do
if shape_status.snapshot_xmin?(table, shape_id) do
:started
else
server = Access.get(opts, :server, __MODULE__)
Expand All @@ -148,8 +157,9 @@ defmodule Electric.ShapeCache do
@impl Electric.ShapeCacheBehaviour
def has_shape?(shape_id, opts \\ []) do
table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
shape_status = Access.get(opts, :shape_status, ShapeStatus)

if ShapeStatus.existing_shape(table, shape_id) do
if shape_status.existing_shape(table, shape_id) do
true
else
server = Access.get(opts, :server, __MODULE__)
Expand All @@ -160,15 +170,17 @@ defmodule Electric.ShapeCache do
@impl GenStage
def init(opts) do
{:ok, persistent_state} =
ShapeStatus.initialise(
opts.shape_status.initialise(
persistent_kv: opts.persistent_kv,
meta_table: opts.shape_meta_table
)

state = %{
name: opts.name,
storage: opts.storage,
inspector: opts.inspector,
shape_meta_table: opts.shape_meta_table,
shape_status: opts.shape_status,
awaiting_snapshot_start: %{},
db_pool: opts.db_pool,
persistent_state: persistent_state,
Expand All @@ -186,7 +198,7 @@ defmodule Electric.ShapeCache do

@impl GenStage
def handle_events(relations, _from, state) do
%{persistent_state: persistent_state} = state
%{persistent_state: persistent_state, shape_status: shape_status} = state
# NOTE: [@magnetised] this manages cleaning up shapes after a relation
# change. it's not doing it in a consistent way, as the shape consumers
# could still be receiving txns after a relation message that requires them
Expand All @@ -209,10 +221,10 @@ defmodule Electric.ShapeCache do
# replication stream, it knows that it can just continue.

Enum.each(relations, fn relation ->
old_rel = ShapeStatus.get_relation(persistent_state, relation.id)
old_rel = shape_status.get_relation(persistent_state, relation.id)

if is_nil(old_rel) || old_rel != relation do
:ok = ShapeStatus.store_relation(persistent_state, relation)
:ok = shape_status.store_relation(persistent_state, relation)
end

if !is_nil(old_rel) && old_rel != relation do
Expand All @@ -222,23 +234,35 @@ defmodule Electric.ShapeCache do

# Fetch all shapes that are affected by the relation change and clean them up
persistent_state
|> ShapeStatus.list_active_shapes()
|> shape_status.list_active_shapes()
|> Enum.filter(&Shape.is_affected_by_relation_change?(&1, change))
|> Enum.map(&elem(&1, 0))
|> Enum.each(fn shape_id -> clean_up_shape(state, shape_id) end)
end

if old_rel != relation do
# Clean column information from ETS
# also when old_rel is nil because column info
# may already be loaded into ETS by the initial shape request
{inspector, inspector_opts} = state.inspector
# if the old relation exists we use that one
# because the table name may have changed in the new relation
# if there is no old relation, we use the new one
rel = old_rel || relation
inspector.clean_column_info({rel.schema, rel.table}, inspector_opts)
end
end)

{:noreply, [], state}
end

@impl GenStage
def handle_call({:create_or_wait_shape_id, shape}, _from, state) do
def handle_call({:create_or_wait_shape_id, shape}, _from, %{shape_status: shape_status} = state) do
{{shape_id, latest_offset}, state} =
if shape_state = ShapeStatus.existing_shape(state.persistent_state, shape) do
if shape_state = shape_status.existing_shape(state.persistent_state, shape) do
{shape_state, state}
else
{:ok, shape_id} = ShapeStatus.add_shape(state.persistent_state, shape)
{:ok, shape_id} = shape_status.add_shape(state.persistent_state, shape)

{:ok, _snapshot_xmin, latest_offset} = start_shape(shape_id, shape, state)
{{shape_id, latest_offset}, state}
Expand All @@ -249,12 +273,12 @@ defmodule Electric.ShapeCache do
{:reply, {shape_id, latest_offset}, [], state}
end

def handle_call({:await_snapshot_start, shape_id}, from, state) do
def handle_call({:await_snapshot_start, shape_id}, from, %{shape_status: shape_status} = state) do
cond do
not is_known_shape_id?(state, shape_id) ->
{:reply, {:error, :unknown}, [], state}

ShapeStatus.snapshot_xmin?(state.persistent_state, shape_id) ->
shape_status.snapshot_xmin?(state.persistent_state, shape_id) ->
{:reply, :started, [], state}

true ->
Expand All @@ -264,8 +288,8 @@ defmodule Electric.ShapeCache do
end
end

def handle_call({:wait_shape_id, shape_id}, _from, state) do
{:reply, !is_nil(ShapeStatus.existing_shape(state.persistent_state, shape_id)), [], state}
def handle_call({:wait_shape_id, shape_id}, _from, %{shape_status: shape_status} = state) do
{:reply, !is_nil(shape_status.existing_shape(state.persistent_state, shape_id)), [], state}
end

def handle_call({:truncate, shape_id}, _from, state) do
Expand All @@ -288,8 +312,8 @@ defmodule Electric.ShapeCache do
end

@impl GenStage
def handle_cast({:snapshot_xmin_known, shape_id, xmin}, state) do
unless ShapeStatus.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do
def handle_cast({:snapshot_xmin_known, shape_id, xmin}, %{shape_status: shape_status} = state) do
unless shape_status.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do
Logger.warning(
"Got snapshot information for a #{shape_id}, that shape id is no longer valid. Ignoring."
)
Expand Down Expand Up @@ -324,11 +348,11 @@ defmodule Electric.ShapeCache do
defp clean_up_shape(state, shape_id) do
Electric.ShapeCache.ShapeSupervisor.stop_shape_consumer(shape_id)

ShapeStatus.remove_shape(state.persistent_state, shape_id)
state.shape_status.remove_shape(state.persistent_state, shape_id)
end

defp is_known_shape_id?(state, shape_id) do
if ShapeStatus.existing_shape(state.persistent_state, shape_id) do
if state.shape_status.existing_shape(state.persistent_state, shape_id) do
true
else
false
Expand All @@ -343,7 +367,7 @@ defmodule Electric.ShapeCache do

defp recover_shapes(state) do
state.persistent_state
|> ShapeStatus.list_shapes()
|> state.shape_status.list_shapes()
|> Enum.each(fn {shape_id, shape} ->
{:ok, _snapshot_xmin, _latest_offset} = start_shape(shape_id, shape, state)
end)
Expand All @@ -368,7 +392,7 @@ defmodule Electric.ShapeCache do
{:ok, snapshot_xmin, latest_offset} = Shapes.Consumer.initial_state(consumer)

:ok =
ShapeStatus.initialise_shape(
state.shape_status.initialise_shape(
state.persistent_state,
shape_id,
snapshot_xmin,
Expand Down
22 changes: 21 additions & 1 deletion packages/sync-service/lib/electric/shape_cache/shape_status.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
defmodule Electric.ShapeCache.ShapeStatusBehaviour do
@moduledoc """
Behaviour defining the ShapeStatus functions to be used in mocks
"""
alias Electric.Shapes.Shape
alias Electric.ShapeCache.ShapeStatus
alias Electric.Postgres.LogicalReplication.Messages
alias Electric.Replication.Changes.Relation

@callback initialise(ShapeStatus.options()) :: {:ok, ShapeStatus.t()} | {:error, term()}
@callback list_shapes(ShapeStatus.t()) :: [{ShapeStatus.shape_id(), Shape.t()}]
@callback list_active_shapes(opts :: keyword()) :: [
{ShapeStatus.shape_id(), ShapeStatus.shape_def(), ShapeStatus.xmin()}
]
@callback get_relation(ShapeStatus.t(), Messages.relation_id()) :: Relation.t() | nil
@callback store_relation(ShapeStatus.t(), Relation.t()) :: :ok
@callback remove_shape(ShapeStatus.t(), ShapeStatus.shape_id()) ::
{:ok, ShapeStatus.t()} | {:error, term()}
end

defmodule Electric.ShapeCache.ShapeStatus do
@moduledoc """
Keeps track of shape state.
Serializes just enough to some persistent storage to bootstrap the
ShapeCache by writing the mapping of `shape_id => %Shape{}` to
ShapeCache by writing the mapping of `shape_id => %Shape{}` to
storage.
The shape cache then loads this and starts processes (storage and consumer)
Expand Down
Loading

0 comments on commit 1d00501

Please sign in to comment.