Skip to content

Commit

Permalink
Clean shapes on PITR
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-dp committed Aug 19, 2024
1 parent e8b9b6b commit e1ff564
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 1 deletion.
6 changes: 5 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule Electric.Application do
shape_cache,
{Electric.Replication.ShapeLogCollector,
registry: Registry.ShapeChanges, shape_cache: shape_cache, inspector: inspector},
{Electric.TimelineCache, []},
{Electric.ConnectionManager,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
replication_opts: [
Expand All @@ -45,7 +46,10 @@ defmodule Electric.Application do
name: Electric.DbPool,
pool_size: Application.fetch_env!(:electric, :db_pool_size),
types: PgInterop.Postgrex.Types
]},
],
shape_cache: {Electric.ShapeCache, []},
timeline_cache: Electric.TimelineCache
},
{Electric.Postgres.Inspector.EtsInspector, pool: Electric.DbPool},
{Bandit,
plug:
Expand Down
12 changes: 12 additions & 0 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ defmodule Electric.ConnectionManager do
def handle_continue(:start_connection_pool, state) do
case start_connection_pool(state.connection_opts, state.pool_opts) do
{:ok, pid} ->
Electric.Timeline.check(get_pg_timeline(pid), state)

# Now we have everything ready to start accepting and processing logical messages from
# Postgres.
Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid)
Expand Down Expand Up @@ -308,4 +310,14 @@ defmodule Electric.ConnectionManager do

Keyword.put(connection_opts, :socket_options, tcp_opts)
end

# Asks Postgres (through `conn`) for the ID of the timeline it is currently on.
#
# Returns the timeline ID, or `nil` when the query fails.
#
# The result is handed directly to `Electric.Timeline.check/2`, which expects a
# bare `integer() | nil`. Returning `{:ok, id}` / `{:error, reason}` tuples here
# would bypass its "unknown timeline" handling and would raise when the value is
# interpolated into a log message, since tuples do not implement `String.Chars`.
defp get_pg_timeline(conn) do
  case Postgrex.query(conn, "SELECT timeline_id FROM pg_control_checkpoint()", []) do
    {:ok, %Postgrex.Result{rows: [[timeline_id]]}} ->
      timeline_id

    {:error, _reason} ->
      # An unknown timeline is represented as `nil`.
      nil
  end
end
end
21 changes: 21 additions & 0 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Electric.ShapeCacheBehaviour do
@callback await_snapshot_start(GenServer.name(), shape_id()) :: :started | {:error, term()}
@callback handle_truncate(GenServer.name(), shape_id()) :: :ok
@callback clean_shape(GenServer.name(), shape_id()) :: :ok
@callback clean_all_shapes(GenServer.name()) :: :ok
end

defmodule Electric.ShapeCache do
Expand Down Expand Up @@ -131,6 +132,11 @@ defmodule Electric.ShapeCache do
GenServer.call(server, {:clean, shape_id})
end

@doc """
Synchronously asks the shape cache process `server` to clean all shapes.

`server` defaults to this module's name.
"""
@spec clean_all_shapes(GenServer.name()) :: :ok
def clean_all_shapes(server \\ __MODULE__), do: GenServer.call(server, {:clean_all})

@spec handle_truncate(GenServer.name(), String.t()) :: :ok
def handle_truncate(server \\ __MODULE__, shape_id) do
GenServer.call(server, {:truncate, shape_id})
Expand Down Expand Up @@ -219,6 +225,12 @@ defmodule Electric.ShapeCache do
{:reply, :ok, state}
end

# Runs the cleanup for every active shape and replies `:ok` once it has finished.
# The server state itself is returned unchanged.
def handle_call({:clean_all}, _from, state) do
  # Log before starting the work so that, should the cleanup crash part-way,
  # the log still shows what this process was doing.
  Logger.info("Cleaning up all shapes")
  clean_up_all_shapes(state)
  {:reply, :ok, state}
end

def handle_cast({:snapshot_xmin_known, shape_id, xmin}, state) do
if :ets.update_element(
state.shape_meta_table,
Expand Down Expand Up @@ -281,6 +293,15 @@ defmodule Electric.ShapeCache do
shape
end

# Cleans up every currently active shape, one at a time.
#
# The shape ID is taken from the first element of each entry returned by
# `list_active_shapes/1`.
#
# FIXME: is there a better way to do this?
# Ideally we'd use an `:ets.select_delete` whose pattern matches any `shape_id`
# contained in the list of active shape IDs.
defp clean_up_all_shapes(state) do
  active_ids =
    for entry <- list_active_shapes(state), do: elem(entry, 0)

  Enum.map(active_ids, fn id -> clean_up_shape(state, id) end)
end

defp maybe_start_snapshot(%{awaiting_snapshot_start: map} = state, shape_id, _)
when is_map_key(map, shape_id),
do: state
Expand Down
11 changes: 11 additions & 0 deletions packages/sync-service/lib/electric/shapes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,15 @@ defmodule Electric.Shapes do
shape_cache.clean_shape(server, shape_id)
:ok
end

@doc """
Cleans up all data (metadata, shape log and snapshot) associated with all shapes.

The shape cache is taken from the `:shape_cache` option, a `{module, opts}` tuple
that defaults to `{ShapeCache, []}`. The call is sent to the `:server` entry of
those inner opts, falling back to the cache module itself.
"""
@spec clean_all_shapes(keyword()) :: :ok
def clean_all_shapes(opts \\ []) do
  {cache_module, cache_opts} = Access.get(opts, :shape_cache, {ShapeCache, []})
  target = Access.get(cache_opts, :server, cache_module)

  _ = cache_module.clean_all_shapes(target)
  :ok
end
end
57 changes: 57 additions & 0 deletions packages/sync-service/lib/electric/timeline.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
defmodule Electric.Timeline do
  @moduledoc """
  Helper functions for reconciling the Postgres timeline with the timeline
  Electric has stored.
  """
  require Logger
  alias Electric.Shapes
  alias Electric.TimelineCache

  @type timeline :: integer() | nil

  @doc """
  Compares `pg_timeline` with the timeline held by the cache given under the
  `:timeline_cache` key of `opts`.

    * Both timelines are equal: nothing is done.
    * No timeline has been stored yet: `pg_timeline` is stored.
    * The timelines differ: this indicates that a Point In Time Recovery (PITR)
      has occurred, so all shapes are cleaned and `pg_timeline` is stored.
    * `pg_timeline` is `nil` (it could not be fetched): all shapes are cleaned
      for safety, as we can't be sure that Postgres and Electric are on the same
      timeline, and `nil` is stored.

  Raises `KeyError` if `opts` has no `:timeline_cache` entry.
  """
  @spec check(timeline(), keyword()) :: :ok
  def check(pg_timeline, opts) do
    known_timeline =
      opts
      |> Keyword.fetch!(:timeline_cache)
      |> TimelineCache.get_timeline()

    case {pg_timeline, known_timeline} do
      {nil, _} ->
        Logger.warning("Unknown Postgres timeline; rotating shapes.")
        Shapes.clean_all_shapes(opts)
        remember(opts, nil)

      {current, previous} when current == previous ->
        Logger.info("Connected to Postgres timeline #{current}")
        :ok

      {current, nil} ->
        Logger.info("No previous timeline detected.")
        Logger.info("Connected to Postgres timeline #{current}")
        remember(opts, current)

      {current, _previous} ->
        Logger.info("Detected PITR to timeline #{current}; rotating shapes.")
        Shapes.clean_all_shapes(opts)
        # The new timeline is stored only after all shapes have been cleaned.
        remember(opts, current)
    end
  end

  # Stores `timeline_id` in the cache found under `:timeline_cache` in `opts`.
  @spec remember(keyword(), timeline()) :: :ok
  defp remember(opts, timeline_id) do
    opts
    |> Keyword.fetch!(:timeline_cache)
    |> TimelineCache.store_timeline(timeline_id)
  end
end
41 changes: 41 additions & 0 deletions packages/sync-service/lib/electric/timeline_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Electric.TimelineCache do
  @moduledoc """
  In-memory cache for storing the Postgres timeline on which Electric is running.
  """
  use GenServer

  @type timeline_id :: integer() | nil

  @doc """
  Starts the cache.

  Two call shapes are supported:

    * `start_link(timeline_id)` with an integer or `nil` (the default) starts an
      unnamed process seeded with that timeline. The returned pid must be passed
      to `get_timeline/1` and `store_timeline/2`.
    * `start_link(opts)` with a keyword list starts a process registered under
      `opts[:name]` (default: `#{inspect(__MODULE__)}`) and seeded with
      `opts[:timeline_id]` (default: `nil`). This is the form used when the
      module is started from a supervisor as `{#{inspect(__MODULE__)}, []}`, and
      it is what makes the default `server` argument of the client functions work.

  Raises `ArgumentError` if `opts[:timeline_id]` is neither an integer nor `nil`.
  """
  @spec start_link(timeline_id() | keyword()) :: GenServer.on_start()
  def start_link(arg \\ nil)

  def start_link(timeline_id) when is_nil(timeline_id) or is_integer(timeline_id) do
    GenServer.start_link(__MODULE__, timeline_id)
  end

  def start_link(opts) when is_list(opts) do
    name = Keyword.get(opts, :name, __MODULE__)

    case Keyword.get(opts, :timeline_id) do
      timeline_id when is_nil(timeline_id) or is_integer(timeline_id) ->
        GenServer.start_link(__MODULE__, timeline_id, name: name)

      other ->
        raise ArgumentError,
              "expected :timeline_id to be an integer or nil, got: #{inspect(other)}"
    end
  end

  @doc """
  Store the timeline ID on which Electric is running.

  Passing `nil` clears the stored timeline.
  """
  @spec store_timeline(GenServer.server(), timeline_id()) :: :ok
  def store_timeline(server \\ __MODULE__, timeline_id) do
    GenServer.call(server, {:store, timeline_id})
  end

  @doc """
  Get the timeline ID on which Electric is running.
  Returns nil if the timeline ID is not set.
  """
  @spec get_timeline(GenServer.server()) :: timeline_id()
  def get_timeline(server \\ __MODULE__) do
    GenServer.call(server, :get)
  end

  @impl true
  def init(timeline_id) do
    {:ok, %{id: timeline_id}}
  end

  @impl true
  def handle_call({:store, timeline_id}, _from, state) do
    {:reply, :ok, %{state | id: timeline_id}}
  end

  def handle_call(:get, _from, state) do
    # The `:id` key is always present: `init/1` creates it.
    {:reply, state.id, state}
  end
end
20 changes: 20 additions & 0 deletions packages/sync-service/test/electric/timeline_cache_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Electric.TimelineCacheTest do
  use ExUnit.Case, async: false
  alias Electric.TimelineCache

  describe "get_timeline/1" do
    test "returns the timeline ID" do
      # The cache must report the timeline it was started with.
      initial_id = 5
      assert {:ok, cache} = TimelineCache.start_link(initial_id)
      assert ^initial_id = TimelineCache.get_timeline(cache)
    end
  end

  describe "store_timeline/2" do
    test "stores the timeline ID" do
      # Storing a new timeline replaces the one given at start-up.
      assert {:ok, cache} = TimelineCache.start_link(3)
      assert :ok = TimelineCache.store_timeline(cache, 4)
      assert 4 = TimelineCache.get_timeline(cache)
    end
  end
end
65 changes: 65 additions & 0 deletions packages/sync-service/test/electric/timeline_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule Electric.TimelineTest do
  use ExUnit.Case, async: true
  alias Electric.Timeline
  alias Electric.TimelineCache
  alias Electric.ShapeCacheMock

  import Mox

  # `expect/3` alone only defines what the mock does when it is called; it does
  # not fail the test if the call never happens. Verifying on exit makes the
  # "cleans all shapes" tests fail when `clean_all_shapes` is not invoked.
  setup :verify_on_exit!

  describe "check/2" do
    # Starts a timeline cache seeded with the `:electric_timeline` tag (possibly
    # `nil`) and builds the opts that `Timeline.check/2` needs.
    setup context do
      timeline = context[:electric_timeline]

      # `start_link/1` accepts `nil` as well as an integer, so no branching on
      # the tag value is needed.
      {:ok, pid} = TimelineCache.start_link(timeline)

      opts = [timeline_cache: pid, shape_cache: {ShapeCacheMock, []}]
      {:ok, [timeline: timeline, opts: opts]}
    end

    @tag electric_timeline: nil
    test "stores the Postgres timeline if Electric has no timeline yet", %{opts: opts} do
      timeline = 5
      assert :ok = Timeline.check(timeline, opts)
      assert ^timeline = TimelineCache.get_timeline(opts[:timeline_cache])
    end

    @tag electric_timeline: 3
    test "proceeds without changes if Postgres' timeline matches Electric's timeline", %{
      timeline: timeline,
      opts: opts
    } do
      assert :ok = Timeline.check(timeline, opts)
      assert ^timeline = TimelineCache.get_timeline(opts[:timeline_cache])
    end

    @tag electric_timeline: 3
    test "cleans all shapes if Postgres' timeline does not match Electric's timeline", %{
      opts: opts
    } do
      ShapeCacheMock
      |> expect(:clean_all_shapes, fn _ -> :ok end)

      pg_timeline = 4
      assert :ok = Timeline.check(pg_timeline, opts)
      assert ^pg_timeline = TimelineCache.get_timeline(opts[:timeline_cache])
    end

    @tag electric_timeline: 3
    test "cleans all shapes if Postgres' timeline is unknown", %{opts: opts} do
      ShapeCacheMock
      |> expect(:clean_all_shapes, fn _ -> :ok end)

      assert :ok = Timeline.check(nil, opts)
      assert TimelineCache.get_timeline(opts[:timeline_cache]) == nil
    end
  end
end

0 comments on commit e1ff564

Please sign in to comment.