feat: clean shapes affected by relation changes (#1544)
This PR addresses #1535 by detecting relation changes and cleaning up the shapes affected by them.
Note that Postgres' logical replication stream propagates relation changes lazily, so we won't detect
the migration until somebody actually inserts, updates, or deletes a row in the migrated table.
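
To make the lazy propagation concrete, here is a hypothetical illustration of when the relation message actually reaches the replication stream. The connection parameters and the `items` table below are placeholders, not part of this PR:

```elixir
# Hypothetical illustration of the lazy propagation described above.
# Connection parameters and table/column names are placeholders.
{:ok, conn} = Postgrex.start_link(hostname: "localhost", database: "electric", username: "postgres")

Postgrex.query!(conn, "ALTER TABLE items ADD COLUMN note text", [])
# Nothing arrives on the replication stream yet: the migration alone emits no Relation message.

Postgrex.query!(conn, "INSERT INTO items (id, note) VALUES (1, 'hello')", [])
# Only now does the decoder see a Relation message for `items` (including the new column),
# immediately followed by the Insert, which is what triggers the shape cleanup in this PR.
```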

This PR is affected by a bug that causes schema information not to be deleted when a shape is
cleaned. Because of that, when testing this PR you will notice that after adding a column the shape
is cleaned, but if we then fetch the shape again the new column is not present in the resulting
rows. This should go away once the schema-information bug is fixed.
kevin-dp authored Aug 22, 2024
1 parent 03e34fb commit fa88719
Showing 17 changed files with 462 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/clever-parents-fail.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

clean shapes affected by migrations
4 changes: 3 additions & 1 deletion packages/sync-service/lib/electric/application.ex
@@ -39,7 +39,9 @@ defmodule Electric.Application do
try_creating_publication?: true,
slot_name: slot_name,
transaction_received:
{Electric.Replication.ShapeLogCollector, :store_transaction, []}
{Electric.Replication.ShapeLogCollector, :store_transaction, []},
relation_received:
{Electric.Replication.ShapeLogCollector, :handle_relation_msg, []}
],
pool_opts: [
name: Electric.DbPool,
packages/sync-service/lib/electric/postgres/replication_client.ex
@@ -7,6 +7,7 @@ defmodule Electric.Postgres.ReplicationClient do
alias Electric.Postgres.LogicalReplication.Decoder
alias Electric.Postgres.ReplicationClient.Collector
alias Electric.Postgres.ReplicationClient.ConnectionSetup
alias Electric.Replication.Changes.Relation

require Logger

@@ -20,9 +21,10 @@ defmodule Electric.Postgres.ReplicationClient do
| :streaming

defmodule State do
@enforce_keys [:transaction_received, :publication_name]
@enforce_keys [:transaction_received, :relation_received, :publication_name]
defstruct [
:transaction_received,
:relation_received,
:publication_name,
:try_creating_publication?,
:start_streaming?,
@@ -44,6 +46,7 @@ defmodule Electric.Postgres.ReplicationClient do

@type t() :: %__MODULE__{
transaction_received: {module(), atom(), [term()]},
relation_received: {module(), atom(), [term()]},
publication_name: String.t(),
try_creating_publication?: boolean(),
start_streaming?: boolean(),
@@ -58,6 +61,7 @@ defmodule Electric.Postgres.ReplicationClient do

@opts_schema NimbleOptions.new!(
transaction_received: [required: true, type: :mfa],
relation_received: [required: true, type: :mfa],
publication_name: [required: true, type: :string],
try_creating_publication?: [required: true, type: :boolean],
start_streaming?: [type: :boolean, default: true],
@@ -159,6 +163,11 @@ defmodule Electric.Postgres.ReplicationClient do
%Collector{} = txn_collector ->
{:noreply, %{state | txn_collector: txn_collector}}

{%Relation{} = rel, %Collector{} = txn_collector} ->
{m, f, args} = state.relation_received
apply(m, f, [rel | args])
{:noreply, %{state | txn_collector: txn_collector}}

{txn, %Collector{} = txn_collector} ->
state = %{state | txn_collector: txn_collector}

packages/sync-service/lib/electric/postgres/replication_client/collector.ex
@@ -14,7 +14,9 @@ defmodule Electric.Postgres.ReplicationClient.Collector do
NewRecord,
UpdatedRecord,
DeletedRecord,
TruncatedRelation
TruncatedRelation,
Relation,
Column
}

defstruct transaction: nil, tx_op_index: nil, relations: %{}
@@ -50,12 +52,21 @@ defmodule Electric.Postgres.ReplicationClient.Collector do
def handle_message(%LR.Origin{} = _msg, state), do: state
def handle_message(%LR.Type{}, state), do: state

def handle_message(%LR.Relation{id: id} = rel, %__MODULE__{} = state) do
if Map.get(state.relations, id, rel) != rel do
Logger.warning("Schema for the table #{rel.namespace}.#{rel.name} had changed")
end

Map.update!(state, :relations, &Map.put(&1, rel.id, rel))
def handle_message(
%LR.Relation{id: id, namespace: ns, name: name, columns: cols} = rel,
%__MODULE__{} = state
) do
new_state = Map.update!(state, :relations, &Map.put(&1, rel.id, rel))

{
%Relation{
id: id,
schema: ns,
table: name,
columns: Enum.map(cols, fn col -> %Column{name: col.name, type_oid: col.type_oid} end)
},
new_state
}
end

def handle_message(%LR.Insert{} = msg, %__MODULE__{} = state) do
43 changes: 36 additions & 7 deletions packages/sync-service/lib/electric/replication/changes.ex
@@ -17,7 +17,7 @@ defmodule Electric.Replication.Changes do

@type db_identifier() :: String.t()
@type xid() :: non_neg_integer()
@type relation() :: {schema :: db_identifier(), table :: db_identifier()}
@type relation_name() :: {schema :: db_identifier(), table :: db_identifier()}
@type record() :: %{(column_name :: db_identifier()) => column_data :: binary()}
@type relation_id() :: non_neg_integer

@@ -33,15 +33,15 @@
| Changes.UpdatedRecord.t()
| Changes.DeletedRecord.t()

@type change() :: data_change() | Changes.TruncatedRelation.t()
@type change() :: data_change() | Changes.TruncatedRelation.t() | Changes.RelationChange.t()

defmodule Transaction do
alias Electric.Replication.Changes

@type t() :: %__MODULE__{
xid: Changes.xid() | nil,
changes: [Changes.change()],
affected_relations: MapSet.t(Changes.relation()),
affected_relations: MapSet.t(Changes.relation_name()),
commit_timestamp: DateTime.t(),
lsn: Electric.Postgres.Lsn.t(),
last_log_offset: LogOffset.t()
@@ -79,7 +79,7 @@
defstruct [:relation, :record, :log_offset, :key]

@type t() :: %__MODULE__{
relation: Changes.relation(),
relation: Changes.relation_name(),
record: Changes.record(),
log_offset: LogOffset.t(),
key: String.t()
@@ -99,7 +99,7 @@
]

@type t() :: %__MODULE__{
relation: Changes.relation(),
relation: Changes.relation_name(),
old_record: Changes.record() | nil,
record: Changes.record(),
log_offset: LogOffset.t(),
@@ -145,7 +145,7 @@
defstruct [:relation, :old_record, :log_offset, :key, tags: []]

@type t() :: %__MODULE__{
relation: Changes.relation(),
relation: Changes.relation_name(),
old_record: Changes.record(),
log_offset: LogOffset.t(),
key: String.t(),
@@ -157,11 +157,40 @@
defstruct [:relation, :log_offset]

@type t() :: %__MODULE__{
relation: Changes.relation(),
relation: Changes.relation_name(),
log_offset: LogOffset.t()
}
end

defmodule Column do
defstruct [:name, :type_oid]

@type t() :: %__MODULE__{
name: Changes.db_identifier(),
type_oid: pos_integer()
}
end

defmodule Relation do
defstruct [:id, :schema, :table, :columns]

@type t() :: %__MODULE__{
id: Changes.relation_id(),
schema: Changes.db_identifier(),
table: Changes.db_identifier(),
columns: [Column.t()]
}
end

defmodule RelationChange do
defstruct [:old_relation, :new_relation]

@type t() :: %__MODULE__{
old_relation: Relation.t(),
new_relation: Relation.t()
}
end

@doc """
Build a unique key for a given record based on its relation and PK.
packages/sync-service/lib/electric/replication/shape_log_collector.ex
@@ -7,7 +7,7 @@ defmodule Electric.Replication.ShapeLogCollector do
alias Electric.Postgres.Inspector
alias Electric.Shapes.Shape
alias Electric.Replication.Changes
alias Electric.Replication.Changes.Transaction
alias Electric.Replication.Changes.{Transaction, Relation, RelationChange}
use GenServer
require Logger

@@ -32,10 +32,39 @@
GenServer.call(server, {:new_txn, txn})
end

def handle_relation_msg(%Relation{} = rel, server \\ __MODULE__) do
GenServer.call(server, {:relation_msg, rel})
end

def init(opts) do
{:ok, opts}
end

def handle_call(
{:relation_msg, rel},
_from,
state
) do
{shape_cache, opts} = state.shape_cache
old_rel = shape_cache.get_relation(rel.id, opts)

if is_nil(old_rel) || old_rel != rel do
shape_cache.store_relation(rel, opts)
end

if !is_nil(old_rel) && old_rel != rel do
Logger.info("Schema for the table #{old_rel.schema}.#{old_rel.table} changed")
change = %RelationChange{old_relation: old_rel, new_relation: rel}
# Fetch all shapes that are affected by the relation change and clean them up
shape_cache.list_active_shapes(opts)
|> Enum.filter(&is_affected_by_relation_change?(&1, change))
|> Enum.map(&elem(&1, 0))
|> Electric.Shapes.clean_shapes(state)
end

{:reply, :ok, state}
end

def handle_call(
{:new_txn,
%Transaction{xid: xid, changes: changes, lsn: lsn, last_log_offset: last_log_offset} =
@@ -101,4 +130,32 @@
do: send(pid, {ref, :new_changes, latest_log_offset})
end)
end

defp is_affected_by_relation_change?(
shape,
%RelationChange{
old_relation: %Relation{schema: old_schema, table: old_table},
new_relation: %Relation{schema: new_schema, table: new_table}
}
)
when old_schema != new_schema or old_table != new_table do
# The table's qualified name changed
# so shapes that match the old schema or table name are affected
shape_matches?(shape, old_schema, old_table)
end

defp is_affected_by_relation_change?(shape, %RelationChange{
new_relation: %Relation{schema: schema, table: table}
}) do
shape_matches?(shape, schema, table)
end

# TODO: test this machinery of cleaning shapes on any migration
# once that works, then we can optimize it to only clean on relevant migrations

defp shape_matches?({_, %Shape{root_table: {ns, tbl}}, _}, schema, table)
when ns == schema and tbl == table,
do: true

defp shape_matches?(_, _, _), do: false
end
44 changes: 44 additions & 0 deletions packages/sync-service/lib/electric/shape_cache.ex
@@ -6,6 +6,7 @@ defmodule Electric.ShapeCacheBehaviour do
alias Electric.ShapeCache.Storage
alias Electric.Shapes.Shape
alias Electric.Replication.LogOffset
alias Electric.Postgres.LogicalReplication.Messages

@type shape_id :: String.t()
@type shape_def :: Shape.t()
@@ -22,6 +23,8 @@
{shape_id(), current_snapshot_offset :: LogOffset.t()}

@callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}]
@callback store_relation(Relation.t(), opts :: keyword()) :: :ok
@callback get_relation(Messages.relation_id(), opts :: keyword()) :: Relation.t() | nil
@callback await_snapshot_start(GenServer.name(), shape_id()) :: :started | {:error, term()}
@callback handle_truncate(GenServer.name(), shape_id()) :: :ok
@callback clean_shape(GenServer.name(), shape_id()) :: :ok
@@ -35,6 +38,7 @@
alias Electric.Shapes.Querying
alias Electric.Shapes.Shape
alias Electric.Replication.LogOffset
alias Electric.Replication.Changes.{Relation, Column}
alias Electric.Telemetry.OpenTelemetry

use GenServer
@@ -49,6 +53,8 @@
@shape_meta_xmin_pos 3
@shape_meta_latest_offset_pos 4

@relation_data :relation_data

@genserver_name_schema {:or, [:atom, {:tuple, [:atom, :atom, :any]}]}
@schema NimbleOptions.new!(
name: [
@@ -126,6 +132,37 @@
])
end

@spec store_relation(Relation.t(), keyword()) :: :ok
def store_relation(%Relation{} = rel, opts) do
store_relation_ets(rel, opts)
Storage.store_relation(rel, opts[:storage])
end

defp store_relation_ets(%Relation{id: id, schema: schema, table: table, columns: columns}, opts) do
meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
cols = Enum.map(columns, fn col -> {col.name, col.type_oid} end)
:ets.insert(meta_table, {{@relation_data, id}, schema, table, cols})
end

@spec get_relation(Messages.relation_id(), opts :: keyword()) :: Relation.t() | nil
def get_relation(relation_id, opts) do
meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)

case :ets.lookup(meta_table, {@relation_data, relation_id}) do
[] ->
nil

[{_, schema, table, cols}] ->
%Relation{
id: relation_id,
schema: schema,
table: table,
columns:
Enum.map(cols, fn {name, type_oid} -> %Column{name: name, type_oid: type_oid} end)
}
end
end

@spec clean_shape(GenServer.name(), String.t()) :: :ok
def clean_shape(server \\ __MODULE__, shape_id) do
GenServer.call(server, {:clean, shape_id})
@@ -155,6 +192,7 @@
}

recover_shapes(state)
recover_relations(state)

{:ok, state}
end
@@ -382,4 +420,10 @@
)
end)
end

defp recover_relations(state) do
state.storage
|> Storage.get_relations()
|> Enum.each(&store_relation_ets(&1, state))
end
end
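
For reviewers, a minimal sketch of how the relation bookkeeping added above round-trips through the shape meta table. The ETS table, relation OID, and column type OIDs are made up, and `storage` is assumed to be whatever `Electric.ShapeCache.Storage` configuration the application already passes in:

```elixir
alias Electric.Replication.Changes.{Relation, Column}
alias Electric.ShapeCache

# Throwaway meta table standing in for the cache's own; `storage` is assumed to be configured elsewhere.
meta_table = :ets.new(:demo_shape_meta_table, [:set, :public])
opts = [shape_meta_table: meta_table, storage: storage]

rel = %Relation{
  # Made-up relation OID; type OIDs 23 and 25 are int4 and text.
  id: 16_389,
  schema: "public",
  table: "items",
  columns: [%Column{name: "id", type_oid: 23}, %Column{name: "note", type_oid: 25}]
}

ShapeCache.store_relation(rel, opts)

# get_relation/2 rebuilds the struct from the {name, type_oid} pairs kept in ETS,
# so the round-trip yields an equal %Relation{}.
^rel = ShapeCache.get_relation(rel.id, opts)
```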