diff --git a/.changeset/clever-parents-fail.md b/.changeset/clever-parents-fail.md new file mode 100644 index 0000000000..38d6e8180f --- /dev/null +++ b/.changeset/clever-parents-fail.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +clean shapes affected by migrations diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index e14847a69c..6c95a434c2 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -39,7 +39,9 @@ defmodule Electric.Application do try_creating_publication?: true, slot_name: slot_name, transaction_received: - {Electric.Replication.ShapeLogCollector, :store_transaction, []} + {Electric.Replication.ShapeLogCollector, :store_transaction, []}, + relation_received: + {Electric.Replication.ShapeLogCollector, :handle_relation_msg, []} ], pool_opts: [ name: Electric.DbPool, diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index c94738a602..3b0b33a14d 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -7,6 +7,7 @@ defmodule Electric.Postgres.ReplicationClient do alias Electric.Postgres.LogicalReplication.Decoder alias Electric.Postgres.ReplicationClient.Collector alias Electric.Postgres.ReplicationClient.ConnectionSetup + alias Electric.Replication.Changes.Relation require Logger @@ -20,9 +21,10 @@ defmodule Electric.Postgres.ReplicationClient do | :streaming defmodule State do - @enforce_keys [:transaction_received, :publication_name] + @enforce_keys [:transaction_received, :relation_received, :publication_name] defstruct [ :transaction_received, + :relation_received, :publication_name, :try_creating_publication?, :start_streaming?, @@ -44,6 +46,7 @@ defmodule Electric.Postgres.ReplicationClient do @type t() :: %__MODULE__{ transaction_received: {module(), atom(), [term()]}, + relation_received: {module(), atom(), [term()]}, publication_name: String.t(), try_creating_publication?: boolean(), start_streaming?: boolean(), @@ -58,6 +61,7 @@ defmodule Electric.Postgres.ReplicationClient do @opts_schema NimbleOptions.new!( transaction_received: [required: true, type: :mfa], + relation_received: [required: true, type: :mfa], publication_name: [required: true, type: :string], try_creating_publication?: [required: true, type: :boolean], start_streaming?: [type: :boolean, default: true], @@ -159,6 +163,11 @@ defmodule Electric.Postgres.ReplicationClient do %Collector{} = txn_collector -> {:noreply, %{state | txn_collector: txn_collector}} + {%Relation{} = rel, %Collector{} = txn_collector} -> + {m, f, args} = state.relation_received + apply(m, f, [rel | args]) + {:noreply, %{state | txn_collector: txn_collector}} + {txn, %Collector{} = txn_collector} -> state = %{state | txn_collector: txn_collector} diff --git a/packages/sync-service/lib/electric/postgres/replication_client/collector.ex b/packages/sync-service/lib/electric/postgres/replication_client/collector.ex index 1f72e5762a..2573e0035e 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/collector.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/collector.ex @@ -14,7 +14,9 @@ defmodule Electric.Postgres.ReplicationClient.Collector do NewRecord, UpdatedRecord, DeletedRecord, - TruncatedRelation + TruncatedRelation, + Relation, + Column } defstruct transaction: nil, tx_op_index: nil, relations: %{} @@ -50,12 +52,21 @@ defmodule Electric.Postgres.ReplicationClient.Collector do def handle_message(%LR.Origin{} = _msg, state), do: state def handle_message(%LR.Type{}, state), do: state - def handle_message(%LR.Relation{id: id} = rel, %__MODULE__{} = state) do - if Map.get(state.relations, id, rel) != rel do - Logger.warning("Schema for the table #{rel.namespace}.#{rel.name} had changed") - end - - Map.update!(state, :relations, &Map.put(&1, rel.id, rel)) + def handle_message( + %LR.Relation{id: id, namespace: ns, name: name, columns: cols} = rel, + %__MODULE__{} = state + ) do + new_state = Map.update!(state, :relations, &Map.put(&1, rel.id, rel)) + + { + %Relation{ + id: id, + schema: ns, + table: name, + columns: Enum.map(cols, fn col -> %Column{name: col.name, type_oid: col.type_oid} end) + }, + new_state + } end def handle_message(%LR.Insert{} = msg, %__MODULE__{} = state) do diff --git a/packages/sync-service/lib/electric/replication/changes.ex b/packages/sync-service/lib/electric/replication/changes.ex index e59585dd01..61d7cebc04 100644 --- a/packages/sync-service/lib/electric/replication/changes.ex +++ b/packages/sync-service/lib/electric/replication/changes.ex @@ -17,7 +17,7 @@ defmodule Electric.Replication.Changes do @type db_identifier() :: String.t() @type xid() :: non_neg_integer() - @type relation() :: {schema :: db_identifier(), table :: db_identifier()} + @type relation_name() :: {schema :: db_identifier(), table :: db_identifier()} @type record() :: %{(column_name :: db_identifier()) => column_data :: binary()} @type relation_id() :: non_neg_integer @@ -33,7 +33,7 @@ defmodule Electric.Replication.Changes do | Changes.UpdatedRecord.t() | Changes.DeletedRecord.t() - @type change() :: data_change() | Changes.TruncatedRelation.t() + @type change() :: data_change() | Changes.TruncatedRelation.t() | Changes.RelationChange.t() defmodule Transaction do alias Electric.Replication.Changes @@ -41,7 +41,7 @@ defmodule Electric.Replication.Changes do @type t() :: %__MODULE__{ xid: Changes.xid() | nil, changes: [Changes.change()], - affected_relations: MapSet.t(Changes.relation()), + affected_relations: MapSet.t(Changes.relation_name()), commit_timestamp: DateTime.t(), lsn: Electric.Postgres.Lsn.t(), last_log_offset: LogOffset.t() @@ -79,7 +79,7 @@ defmodule Electric.Replication.Changes do defstruct [:relation, :record, :log_offset, :key] @type t() :: %__MODULE__{ - relation: Changes.relation(), + relation: Changes.relation_name(), record: Changes.record(), log_offset: LogOffset.t(), key: String.t() @@ -99,7 +99,7 @@ defmodule Electric.Replication.Changes do ] @type t() :: %__MODULE__{ - relation: Changes.relation(), + relation: Changes.relation_name(), old_record: Changes.record() | nil, record: Changes.record(), log_offset: LogOffset.t(), @@ -145,7 +145,7 @@ defmodule Electric.Replication.Changes do defstruct [:relation, :old_record, :log_offset, :key, tags: []] @type t() :: %__MODULE__{ - relation: Changes.relation(), + relation: Changes.relation_name(), old_record: Changes.record(), log_offset: LogOffset.t(), key: String.t(), @@ -157,11 +157,40 @@ defmodule Electric.Replication.Changes do defstruct [:relation, :log_offset] @type t() :: %__MODULE__{ - relation: Changes.relation(), + relation: Changes.relation_name(), log_offset: LogOffset.t() } end + defmodule Column do + defstruct [:name, :type_oid] + + @type t() :: %__MODULE__{ + name: Changes.db_identifier(), + type_oid: pos_integer() + } + end + + defmodule Relation do + defstruct [:id, :schema, :table, :columns] + + @type t() :: %__MODULE__{ + id: Changes.relation_id(), + schema: Changes.db_identifier(), + table: Changes.db_identifier(), + columns: [Column.t()] + } + end + + defmodule RelationChange do + defstruct [:old_relation, :new_relation] + + @type t() :: %__MODULE__{ + old_relation: Relation.t(), + new_relation: Relation.t() + } + end + @doc """ Build a unique key for a given record based on it's relation and PK. diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector.ex b/packages/sync-service/lib/electric/replication/shape_log_collector.ex index 7a6a68604c..06dd03b3c5 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector.ex @@ -7,7 +7,7 @@ defmodule Electric.Replication.ShapeLogCollector do alias Electric.Postgres.Inspector alias Electric.Shapes.Shape alias Electric.Replication.Changes - alias Electric.Replication.Changes.Transaction + alias Electric.Replication.Changes.{Transaction, Relation, RelationChange} use GenServer require Logger @@ -32,10 +32,39 @@ defmodule Electric.Replication.ShapeLogCollector do GenServer.call(server, {:new_txn, txn}) end + def handle_relation_msg(%Relation{} = rel, server \\ __MODULE__) do + GenServer.call(server, {:relation_msg, rel}) + end + def init(opts) do {:ok, opts} end + def handle_call( + {:relation_msg, rel}, + _from, + state + ) do + {shape_cache, opts} = state.shape_cache + old_rel = shape_cache.get_relation(rel.id, opts) + + if is_nil(old_rel) || old_rel != rel do + shape_cache.store_relation(rel, opts) + end + + if !is_nil(old_rel) && old_rel != rel do + Logger.info("Schema for the table #{old_rel.schema}.#{old_rel.table} changed") + change = %RelationChange{old_relation: old_rel, new_relation: rel} + # Fetch all shapes that are affected by the relation change and clean them up + shape_cache.list_active_shapes(opts) + |> Enum.filter(&is_affected_by_relation_change?(&1, change)) + |> Enum.map(&elem(&1, 0)) + |> Electric.Shapes.clean_shapes(state) + end + + {:reply, :ok, state} + end + def handle_call( {:new_txn, %Transaction{xid: xid, changes: changes, lsn: lsn, last_log_offset: last_log_offset} = @@ -101,4 +130,32 @@ defmodule Electric.Replication.ShapeLogCollector do do: send(pid, {ref, :new_changes, latest_log_offset}) end) end + + defp is_affected_by_relation_change?( + shape, + %RelationChange{ + old_relation: %Relation{schema: old_schema, table: old_table}, + new_relation: %Relation{schema: new_schema, table: new_table} + } + ) + when old_schema != new_schema or old_table != new_table do + # The table's qualified name changed + # so shapes that match the old schema or table name are affected + shape_matches?(shape, old_schema, old_table) + end + + defp is_affected_by_relation_change?(shape, %RelationChange{ + new_relation: %Relation{schema: schema, table: table} + }) do + shape_matches?(shape, schema, table) + end + + # TODO: test this machinery of cleaning shapes on any migration + # once that works, then we can optimize it to only clean on relevant migrations + + defp shape_matches?({_, %Shape{root_table: {ns, tbl}}, _}, schema, table) + when ns == schema and tbl == table, + do: true + + defp shape_matches?(_, _, _), do: false end diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index b676dd74e3..c336db4d76 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -6,6 +6,7 @@ defmodule Electric.ShapeCacheBehaviour do alias Electric.ShapeCache.Storage alias Electric.Shapes.Shape alias Electric.Replication.LogOffset + alias Electric.Postgres.LogicalReplication.Messages @type shape_id :: String.t() @type shape_def :: Shape.t() @@ -22,6 +23,8 @@ defmodule Electric.ShapeCacheBehaviour do {shape_id(), current_snapshot_offset :: LogOffset.t()} @callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}] + @callback store_relation(Relation.t(), opts :: keyword()) :: :ok + @callback get_relation(Messages.relation_id(), opts :: keyword()) :: Relation.t() | nil @callback await_snapshot_start(GenServer.name(), shape_id()) :: :started | {:error, term()} @callback handle_truncate(GenServer.name(), shape_id()) :: :ok @callback clean_shape(GenServer.name(), shape_id()) :: :ok @@ -35,6 +38,7 @@ defmodule Electric.ShapeCache do alias Electric.Shapes.Querying alias Electric.Shapes.Shape alias Electric.Replication.LogOffset + alias Electric.Replication.Changes.{Relation, Column} alias Electric.Telemetry.OpenTelemetry use GenServer @@ -49,6 +53,8 @@ defmodule Electric.ShapeCache do @shape_meta_xmin_pos 3 @shape_meta_latest_offset_pos 4 + @relation_data :relation_data + @genserver_name_schema {:or, [:atom, {:tuple, [:atom, :atom, :any]}]} @schema NimbleOptions.new!( name: [ @@ -126,6 +132,37 @@ defmodule Electric.ShapeCache do ]) end + @spec store_relation(Relation.t(), keyword()) :: :ok + def store_relation(%Relation{} = rel, opts) do + store_relation_ets(rel, opts) + Storage.store_relation(rel, opts[:storage]) + end + + defp store_relation_ets(%Relation{id: id, schema: schema, table: table, columns: columns}, opts) do + meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + cols = Enum.map(columns, fn col -> {col.name, col.type_oid} end) + :ets.insert(meta_table, {{@relation_data, id}, schema, table, cols}) + end + + @spec get_relation(Messages.relation_id(), opts :: keyword()) :: Relation.t() | nil + def get_relation(relation_id, opts) do + meta_table = Access.get(opts, :shape_meta_table, @default_shape_meta_table) + + case :ets.lookup(meta_table, {@relation_data, relation_id}) do + [] -> + nil + + [{_, schema, table, cols}] -> + %Relation{ + id: relation_id, + schema: schema, + table: table, + columns: + Enum.map(cols, fn {name, type_oid} -> %Column{name: name, type_oid: type_oid} end) + } + end + end + @spec clean_shape(GenServer.name(), String.t()) :: :ok def clean_shape(server \\ __MODULE__, shape_id) do GenServer.call(server, {:clean, shape_id}) @@ -155,6 +192,7 @@ defmodule Electric.ShapeCache do } recover_shapes(state) + recover_relations(state) {:ok, state} end @@ -382,4 +420,10 @@ defmodule Electric.ShapeCache do ) end) end + + defp recover_relations(state) do + state.storage + |> Storage.get_relations() + |> Enum.each(&store_relation_ets(&1, state)) + end end diff --git a/packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex b/packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex index 656e08b696..a0624902d2 100644 --- a/packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex @@ -2,6 +2,7 @@ defmodule Electric.ShapeCache.CubDbStorage do alias Electric.ConcurrentStream alias Electric.LogItems alias Electric.Replication.LogOffset + alias Electric.Replication.Changes.Relation alias Electric.Telemetry.OpenTelemetry @behaviour Electric.ShapeCache.Storage @@ -165,6 +166,15 @@ defmodule Electric.ShapeCache.CubDbStorage do :ok end + def store_relation(%Relation{id: id} = rel, opts) do + CubDB.put(opts.db, relation_key(id), rel) + end + + def get_relations(opts) do + CubDB.select(opts.db, min_key: relations_start(), max_key: relations_end()) + |> Stream.map(fn {_key, value} -> value end) + end + def cleanup!(shape_id, opts) do [ shape_key(shape_id), @@ -192,12 +202,24 @@ defmodule Electric.ShapeCache.CubDbStorage do {:shapes, shape_id} end + defp relation_key(relation_id) do + {:relations, relation_id} + end + + defp relations_start, do: relation_key(0) + # Atoms are always bigger than numbers + # Thus this key is bigger than any possible relation key + defp relations_end, do: relation_key(:max) + def xmin_key(shape_id) do {:snapshot_xmin, shape_id} end - defp shapes_start, do: shape_key(0) - defp shapes_end, do: shape_key("zzz-end") + defp shapes_start, do: shape_key("") + # Since strings in Elixir are encoded using UTF-8, + # it is impossible for any valid string to contain byte value 255. + # Thus any key will be smaller than this one. + defp shapes_end, do: shape_key(<<255>>) # FIXME: this is naive while we don't have snapshot metadata to get real offsets defp offset({_shape_id, @snapshot_key_type, _index}), do: @snapshot_offset diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index 5ac3d7fbbb..2b4b6770b8 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -2,6 +2,7 @@ defmodule Electric.ShapeCache.InMemoryStorage do alias Electric.ConcurrentStream alias Electric.LogItems alias Electric.Replication.LogOffset + alias Electric.Replication.Changes.Relation alias Electric.Telemetry.OpenTelemetry use Agent @@ -141,6 +142,15 @@ defmodule Electric.ShapeCache.InMemoryStorage do :ok end + def store_relation(%Relation{}, _opts) do + # Relations are already stored in memory by the shape cache + # so no need to do anything here + end + + def get_relations(_opts) do + [] + end + def cleanup!(shape_id, opts) do :ets.match_delete(opts.snapshot_ets_table, {snapshot_key(shape_id, :_), :_}) :ets.match_delete(opts.log_ets_table, {{shape_id, :_}, :_}) diff --git a/packages/sync-service/lib/electric/shape_cache/storage.ex b/packages/sync-service/lib/electric/shape_cache/storage.ex index ef89a70f8b..20c90813c2 100644 --- a/packages/sync-service/lib/electric/shape_cache/storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/storage.ex @@ -69,6 +69,10 @@ defmodule Electric.ShapeCache.Storage do Enumerable.t() @doc "Check if log entry for given shape ID and offset exists" @callback has_log_entry?(shape_id(), LogOffset.t(), compiled_opts()) :: boolean() + @doc "Store a relation containing information about the schema of a table" + @callback store_relation(Relation.t(), compiled_opts()) :: :ok + @doc "Get all stored relations" + @callback get_relations(compiled_opts()) :: Enumerable.t(Relation.t()) @doc "Clean up snapshots/logs for a shape id" @callback cleanup!(shape_id(), compiled_opts()) :: :ok @@ -139,6 +143,15 @@ defmodule Electric.ShapeCache.Storage do def has_log_entry?(shape_id, offset, {mod, opts}), do: mod.has_log_entry?(shape_id, offset, opts) + @doc "Store a relation containing information about the schema of a table" + @spec store_relation(Relation.t(), storage()) :: :ok + def store_relation(relation, {mod, opts}), + do: mod.store_relation(relation, opts) + + @doc "Get all stored relations" + @spec get_relations(storage()) :: Enumerable.t(Relation.t()) + def get_relations({mod, opts}), do: mod.get_relations(opts) + @doc "Clean up snapshots/logs for a shape id" @spec cleanup!(shape_id(), storage()) :: :ok def cleanup!(shape_id, {mod, opts}), do: mod.cleanup!(shape_id, opts) diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index b3ca58ee00..2c0b496944 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -60,4 +60,16 @@ defmodule Electric.Shapes do shape_cache.clean_shape(server, shape_id) :ok end + + @spec clean_shapes([Storage.shape_id()], keyword()) :: :ok + def clean_shapes(shape_ids, opts \\ []) do + {shape_cache, opts} = Access.get(opts, :shape_cache, {ShapeCache, []}) + server = Access.get(opts, :server, shape_cache) + + for shape_id <- shape_ids do + shape_cache.clean_shape(server, shape_id) + end + + :ok + end end diff --git a/packages/sync-service/test/electric/postgres/replication_client/collector_test.exs b/packages/sync-service/test/electric/postgres/replication_client/collector_test.exs index 7672a5d82c..127a53f0b0 100644 --- a/packages/sync-service/test/electric/postgres/replication_client/collector_test.exs +++ b/packages/sync-service/test/electric/postgres/replication_client/collector_test.exs @@ -31,7 +31,7 @@ defmodule Electric.Postgres.ReplicationClient.CollectorTest do setup do collector = %Collector{} - collector = Collector.handle_message(@relation, collector) + {_relation, collector} = Collector.handle_message(@relation, collector) {:ok, collector: collector} end @@ -58,22 +58,11 @@ defmodule Electric.Postgres.ReplicationClient.CollectorTest do columns: [%LR.Relation.Column{name: "id", flags: [:key], type_oid: 23, type_modifier: -1}] } - updated_collector = Collector.handle_message(new_relation, collector) + {_rel, updated_collector} = Collector.handle_message(new_relation, collector) assert %Collector{relations: %{1 => @relation, 2 => ^new_relation}} = updated_collector end - test "collector logs a warning when receiving a new relation message that doesn't match the previous one", - %{collector: collector} do - new_relation = %{ - @relation - | columns: [%LR.Relation.Column{name: "id", flags: [:key], type_oid: 20, type_modifier: -1}] - } - - log = capture_log(fn -> Collector.handle_message(new_relation, collector) end) - assert log =~ "Schema for the table public.users had changed" - end - test "collector logs information when receiving a generic message", %{collector: collector} do message = %LR.Message{prefix: "test", content: "hello world"} diff --git a/packages/sync-service/test/electric/postgres/replication_client_test.exs b/packages/sync-service/test/electric/postgres/replication_client_test.exs index fc2e4180af..736128fc7b 100644 --- a/packages/sync-service/test/electric/postgres/replication_client_test.exs +++ b/packages/sync-service/test/electric/postgres/replication_client_test.exs @@ -29,7 +29,8 @@ defmodule Electric.Postgres.ReplicationClientTest do publication_name: @publication_name, try_creating_publication?: true, slot_name: @slot_name, - transaction_received: nil + transaction_received: nil, + relation_received: nil ] assert {:ok, _} = ReplicationClient.start_link(config, replication_opts) @@ -278,6 +279,7 @@ defmodule Electric.Postgres.ReplicationClientTest do state = ReplicationClient.State.new( transaction_received: nil, + relation_received: nil, publication_name: "", try_creating_publication?: false, slot_name: "" @@ -312,7 +314,8 @@ defmodule Electric.Postgres.ReplicationClientTest do publication_name: @publication_name, try_creating_publication?: false, slot_name: @slot_name, - transaction_received: {__MODULE__, :test_transaction_received, [self()]} + transaction_received: {__MODULE__, :test_transaction_received, [self()]}, + relation_received: {__MODULE__, :test_relation_received, [self()]} ] } end @@ -348,6 +351,10 @@ defmodule Electric.Postgres.ReplicationClientTest do :ok end + def test_relation_received(_change, _test_pid) do + :ok + end + defp gen_random_string(length) do Stream.repeatedly(fn -> :rand.uniform(125 - 32) + 32 end) |> Enum.take(length) diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 68c20bec38..9a040eed20 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -1,5 +1,6 @@ defmodule Electric.Replication.ShapeLogCollectorTest do use ExUnit.Case, async: true + import ExUnit.CaptureLog import Mox alias Electric.ShapeCache @@ -9,7 +10,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do alias Electric.Shapes.Shape alias Electric.Postgres.Lsn alias Electric.Replication.ShapeLogCollector - alias Electric.Replication.Changes.Transaction + alias Electric.Replication.Changes.{Relation, Transaction} alias Electric.Replication.Changes alias Electric.Replication.LogOffset @@ -26,6 +27,11 @@ defmodule Electric.Replication.ShapeLogCollectorTest do inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) ) + @similar_shape Shape.new!("public.test_table", + inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]), + where: "id > 5" + ) + @other_shape Shape.new!("public.other_table", inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) ) @@ -291,6 +297,146 @@ defmodule Electric.Replication.ShapeLogCollectorTest do end end + describe "handle_relation_msg/2" do + setup do + # Start a test Registry + registry_name = Module.concat(__MODULE__, Registry) + start_link_supervised!({Registry, keys: :duplicate, name: registry_name}) + + # Start the ShapeLogCollector process + opts = [ + name: :test_shape_log_storage, + registry: registry_name, + shape_cache: {MockShapeCache, []}, + inspector: {MockInspector, []} + ] + + {:ok, pid} = start_supervised({ShapeLogCollector, opts}) + %{server: pid, registry: registry_name} + end + + test "stores relation if it is not known", %{server: server} do + relation_id = "rel1" + + rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + MockShapeCache + |> expect(:get_relation, 1, fn ^relation_id, _ -> nil end) + |> expect(:store_relation, 1, fn ^rel, _ -> :ok end) + |> allow(self(), server) + + assert :ok = ShapeLogCollector.handle_relation_msg(rel, server) + end + + test "does not clean shapes if relation didn't change", %{server: server} do + relation_id = "rel1" + + rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + MockShapeCache + |> expect(:get_relation, 1, fn ^relation_id, _ -> rel end) + |> expect(:clean_shape, 0, fn _, _ -> :ok end) + |> allow(self(), server) + + assert :ok = ShapeLogCollector.handle_relation_msg(rel, server) + end + + test "cleans shapes affected by table renaming and logs a warning", %{server: server} do + relation_id = "rel1" + + shape_id1 = "shape1" + shape1 = @shape + + shape_id2 = "shape2" + shape2 = @similar_shape + + shape_id3 = "shape3" + shape3 = @other_shape + + # doesn't matter, isn't used for this test + xmin = 100 + + old_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [] + } + + new_rel = %Relation{ + id: relation_id, + schema: "public", + table: "renamed_test_table", + columns: [] + } + + MockShapeCache + |> expect(:get_relation, 1, fn ^relation_id, _ -> old_rel end) + |> expect(:store_relation, 1, fn ^new_rel, _ -> :ok end) + |> expect(:list_active_shapes, 1, fn _ -> + [{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}] + end) + |> expect(:clean_shape, 1, fn _, ^shape_id1 -> :ok end) + |> expect(:clean_shape, 1, fn _, ^shape_id2 -> :ok end) + |> allow(self(), server) + + log = capture_log(fn -> ShapeLogCollector.handle_relation_msg(new_rel, server) end) + assert log =~ "Schema for the table public.test_table changed" + end + + test "cleans shapes affected by a relation change", %{server: server} do + relation_id = "rel1" + + shape_id1 = "shape1" + shape1 = @shape + + shape_id2 = "shape2" + shape2 = @similar_shape + + shape_id3 = "shape3" + shape3 = @other_shape + + # doesn't matter, isn't used for this test + xmin = 100 + + old_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [{"id", "float4"}] + } + + new_rel = %Relation{ + id: relation_id, + schema: "public", + table: "test_table", + columns: [{"id", "int8"}] + } + + MockShapeCache + |> expect(:get_relation, 1, fn ^relation_id, _ -> old_rel end) + |> expect(:store_relation, 1, fn ^new_rel, _ -> :ok end) + |> expect(:list_active_shapes, fn _ -> + [{shape_id1, shape1, xmin}, {shape_id2, shape2, xmin}, {shape_id3, shape3, xmin}] + end) + |> expect(:clean_shape, 1, fn _, ^shape_id1 -> :ok end) + |> expect(:clean_shape, 1, fn _, ^shape_id2 -> :ok end) + |> allow(self(), server) + + assert :ok = ShapeLogCollector.handle_relation_msg(new_rel, server) + end + end + @basic_query_meta %Postgrex.Query{columns: ["id"], result_types: [:text], name: "key_prefix"} describe "store_transaction/2 with real storage" do diff --git a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs index b60b88afa2..a52ccc645c 100644 --- a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs +++ b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs @@ -6,6 +6,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do alias Electric.Postgres.Lsn alias Electric.Replication.LogOffset alias Electric.Replication.Changes + alias Electric.Replication.Changes.{Relation, Column} alias Electric.ShapeCache.CubDbStorage alias Electric.ShapeCache.InMemoryStorage alias Electric.Shapes.Shape @@ -524,7 +525,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do end end - # Tests for storage implimentations that are recoverable + # Tests for storage implementations that are recoverable for module <- [CubDbStorage] do module_name = module |> Module.split() |> List.last() @@ -681,6 +682,53 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do assert storage.snapshot_started?("shape-3", opts) == false end end + + describe "#{module_name}.get_relations/1" do + setup do + {:ok, %{module: unquote(module)}} + end + + setup :start_storage + + test "returns all stored relations", %{module: storage, opts: opts} do + rel1 = %Relation{ + id: 1, + schema: "public", + table: "table1", + columns: [ + %Column{name: "a", type_oid: 1}, + %Column{name: "b", type_oid: 2} + ] + } + + rel1_updated = %Relation{ + id: 1, + schema: "public", + table: "table1", + columns: [ + %Column{name: "a", type_oid: 1}, + %Column{name: "b", type_oid: 2}, + %Column{name: "c", type_oid: 3} + ] + } + + rel2 = %Relation{ + id: 2, + schema: "public", + table: "table1", + columns: [ + %Column{name: "a", type_oid: 1}, + %Column{name: "b", type_oid: 2} + ] + } + + storage.store_relation(rel1, opts) + storage.store_relation(rel1_updated, opts) + storage.store_relation(rel2, opts) + + assert [^rel1_updated, ^rel2] = storage.get_relations(opts) |> Enum.to_list() + end + end end defp start_storage(%{module: module} = context) do diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index a1ebfc3a11..68de078d68 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -8,6 +8,7 @@ defmodule Electric.ShapeCacheTest do import Support.TestUtils alias Electric.Replication.Changes + alias Electric.Replication.Changes.{Relation, Column} alias Electric.Replication.LogOffset alias Electric.ShapeCache alias Electric.ShapeCache.Storage @@ -714,6 +715,24 @@ defmodule Electric.ShapeCacheTest do assert {^shape_id, ^offset} = ShapeCache.get_or_create_shape_id(@shape, opts) end + test "restores relations", %{shape_cache_opts: opts} = context do + rel = %Relation{ + id: 42, + schema: "public", + table: "items", + columns: [ + %Column{name: "id", type_oid: 9}, + %Column{name: "value", type_oid: 2} + ] + } + + :ok = ShapeCache.store_relation(rel, opts) + assert ^rel = ShapeCache.get_relation(rel.id, opts) + + restart_shape_cache(context) + assert ^rel = ShapeCache.get_relation(rel.id, opts) + end + defp restart_shape_cache(context) do stop_shape_cache(context) # Wait 1 millisecond to ensure shape IDs are not generated the same diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index d4f9bf2e9f..160edc79a4 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -93,7 +93,9 @@ defmodule Support.ComponentSetup do try_creating_publication?: true, slot_name: ctx.slot_name, transaction_received: - {Electric.Replication.ShapeLogCollector, :store_transaction, [ctx.shape_log_collector]} + {Electric.Replication.ShapeLogCollector, :store_transaction, [ctx.shape_log_collector]}, + relation_received: + {Electric.Replication.ShapeLogCollector, :handle_relation_msg, [ctx.shape_log_collector]} ] {:ok, pid} = ReplicationClient.start_link(ctx.db_config, replication_opts)