Skip to content

Commit

Permalink
feat: Restoring shapes only through the Storage interface (#1808)
Browse files Browse the repository at this point in the history
More radical alternative to
#1805

Addresses #1802

Moves the shape definition to the `Storage` interface.

I've started with just making `ShapeStatus` use the `Storage` for
recovering shapes - but if we also store OIDs of the relation we can
also do away with storing relations and get rid of any persistence in
`ShapeStatus`

I believe this is mergeable if we want to move at least shape
definitions to the storage and think about relations in a separate PR
  • Loading branch information
msfstef authored Oct 9, 2024
1 parent edb0f72 commit 14681cc
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/weak-chairs-type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Store shape definitions along with shape data and use them to restore shapes instead of relying on persisted cached metadata. This removes the unified serialization and persistence of all shape metadata and allows the speed of shape creation to scale better.
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ defmodule Electric.ShapeCache do
{:ok, persistent_state} =
opts.shape_status.initialise(
persistent_kv: opts.persistent_kv,
shape_meta_table: opts.shape_meta_table
shape_meta_table: opts.shape_meta_table,
storage: opts.storage
)

state = %{
Expand Down
72 changes: 69 additions & 3 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,23 @@ defmodule Electric.ShapeCache.FileStorage do
@version 2
@version_key :version

@shape_definition_file_name "shape_defintion.json"

@xmin_key :snapshot_xmin
@snapshot_meta_key :snapshot_meta
@snapshot_started_key :snapshot_started

@behaviour Electric.ShapeCache.Storage

defstruct [:base_path, :shape_id, :db, :cubdb_dir, :snapshot_dir, version: @version]
defstruct [
:base_path,
:shape_id,
:db,
:cubdb_dir,
:shape_definition_dir,
:snapshot_dir,
version: @version
]

@impl Electric.ShapeCache.Storage
def shared_opts(opts) do
Expand All @@ -37,7 +47,8 @@ defmodule Electric.ShapeCache.FileStorage do
shape_id: shape_id,
db: name(electric_instance_id, shape_id),
cubdb_dir: Path.join([base_path, shape_id, "cubdb"]),
snapshot_dir: Path.join([base_path, shape_id, "snapshots"])
snapshot_dir: Path.join([base_path, shape_id, "snapshots"]),
shape_definition_dir: Path.join([base_path, shape_id])
}
end

Expand All @@ -62,7 +73,8 @@ defmodule Electric.ShapeCache.FileStorage do
end

defp initialise_filesystem(opts) do
with :ok <- File.mkdir_p(opts.cubdb_dir),
with :ok <- File.mkdir_p(opts.shape_definition_dir),
:ok <- File.mkdir_p(opts.cubdb_dir),
:ok <- File.mkdir_p(opts.snapshot_dir) do
:ok
end
Expand All @@ -73,13 +85,61 @@ defmodule Electric.ShapeCache.FileStorage do
stored_version = stored_version(opts)

if stored_version != opts.version || snapshot_xmin(opts) == nil ||
not File.exists?(shape_definition_path(opts)) ||
not CubDB.has_key?(opts.db, @snapshot_meta_key) do
cleanup!(opts)
end

CubDB.put(opts.db, @version_key, @version)
end

@impl Electric.ShapeCache.Storage
# Persists the JSON-encoded shape definition next to the shape's data so the
# shape can be restored later. Returns `:ok` both when the file is written and
# when it already exists; raises on any other write failure (and if the shape
# cannot be JSON-encoded, via `Jason.encode!/1`).
def set_shape_definition(shape, %FS{} = opts) do
  target = shape_definition_path(opts)
  payload = Jason.encode!(shape)

  # `:exclusive` makes the write fail with `:eexist` rather than overwrite an
  # existing definition file.
  case File.write(target, payload, [:exclusive]) do
    {:error, reason} when reason != :eexist ->
      raise "Failed to write shape definition to file: #{reason}"

    {:error, :eexist} ->
      # A definition is already stored for this shape. The shape ID is the
      # hash of the definition, so the contents do not need to be compared.
      :ok

    :ok ->
      :ok
  end
end

@impl Electric.ShapeCache.Storage
# Scans `base_path` and returns `{:ok, %{shape_id => shape}}` for every entry
# whose shape definition file can be read and JSON-decoded. Entries without a
# readable/decodable definition are skipped. A missing `base_path` yields an
# empty map; any other listing error is returned as `{:error, reason}`.
def get_all_stored_shapes(%{base_path: base_path}) do
  case File.ls(base_path) do
    {:error, :enoent} ->
      # The storage directory does not exist, so there are no stored shapes.
      {:ok, %{}}

    {:error, reason} ->
      {:error, reason}

    {:ok, entries} ->
      restored =
        for entry <- entries, reduce: %{} do
          shapes ->
            # Each entry name under `base_path` is used as the shape ID.
            definition_file =
              shape_definition_path(%{shape_definition_dir: Path.join(base_path, entry)})

            with {:ok, encoded} <- File.read(definition_file),
                 {:ok, json} <- Jason.decode(encoded) do
              Map.put(shapes, entry, Electric.Shapes.Shape.from_json_safe!(json))
            else
              # An unreadable or undecodable definition file is ignored.
              {:error, _reason} -> shapes
            end
        end

      {:ok, restored}
  end
end

@impl Electric.ShapeCache.Storage
def get_current_position(%FS{} = opts) do
{:ok, latest_offset(opts), snapshot_xmin(opts)}
Expand Down Expand Up @@ -253,9 +313,15 @@ defmodule Electric.ShapeCache.FileStorage do

{:ok, _} = File.rm_rf(shape_snapshot_path(opts))

{:ok, _} = File.rm_rf(shape_definition_path(opts))

:ok
end

# Builds the path of the shape definition file inside the given
# `shape_definition_dir`.
defp shape_definition_path(%{shape_definition_dir: dir}),
  do: Path.join(dir, @shape_definition_file_name)

defp keys_from_range(min_key, max_key, opts) do
CubDB.select(opts.db, min_key: min_key, max_key: max_key)
|> Stream.map(&elem(&1, 0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ defmodule Electric.ShapeCache.InMemoryStorage do
@impl Electric.ShapeCache.Storage
def initialise(%MS{} = _opts), do: :ok

@impl Electric.ShapeCache.Storage
# Intentionally a no-op: stored definitions are only used to restore shapes
# between sessions, so this implementation keeps nothing and returns `:ok`.
def set_shape_definition(_shape, %MS{}), do: :ok

@impl Electric.ShapeCache.Storage
# Shape definitions are never stored by this implementation, so the result is
# always an empty map.
def get_all_stored_shapes(_opts), do: {:ok, %{}}

@impl Electric.ShapeCache.Storage
def snapshot_started?(%MS{} = opts) do
try do
Expand Down
40 changes: 21 additions & 19 deletions packages/sync-service/lib/electric/shape_cache/shape_status.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,26 @@ defmodule Electric.ShapeCache.ShapeStatus do
"""
alias Electric.PersistentKV
alias Electric.Shapes.Shape
alias Electric.ShapeCache.Storage
alias Electric.Replication.LogOffset
alias Electric.Replication.Changes.{Column, Relation}

@schema NimbleOptions.new!(
persistent_kv: [type: :any, required: true],
shape_meta_table: [type: {:or, [:atom, :reference]}, required: true],
storage: [type: :mod_arg, required: true],
root: [type: :string, default: "./shape_cache"]
)

defstruct [:persistent_kv, :root, :shape_meta_table]
defstruct [:persistent_kv, :root, :shape_meta_table, :storage]

@type shape_id() :: Electric.ShapeCacheBehaviour.shape_id()
@type xmin() :: Electric.ShapeCacheBehaviour.xmin()
@type table() :: atom() | reference()
@type t() :: %__MODULE__{
persistent_kv: PersistentKV.t(),
root: String.t(),
storage: Storage.storage(),
shape_meta_table: table()
}
@type option() :: unquote(NimbleOptions.option_typespec(@schema))
Expand All @@ -82,26 +85,31 @@ defmodule Electric.ShapeCache.ShapeStatus do
def initialise(opts) do
with {:ok, config} <- NimbleOptions.validate(opts, @schema),
{:ok, kv_backend} <- Access.fetch(config, :persistent_kv),
{:ok, table_name} = Access.fetch(config, :shape_meta_table) do
{:ok, table_name} = Access.fetch(config, :shape_meta_table),
{:ok, storage} = Access.fetch(config, :storage) do
persistent_kv =
PersistentKV.Serialized.new!(
backend: kv_backend,
decoder: {__MODULE__, :decode_shapes, []}
decoder: {__MODULE__, :decode_relations, []}
)

meta_table = :ets.new(table_name, [:named_table, :public, :ordered_set])

state =
struct(
__MODULE__,
Keyword.merge(config, persistent_kv: persistent_kv, shape_meta_table: meta_table)
Keyword.merge(config,
persistent_kv: persistent_kv,
shape_meta_table: meta_table,
storage: storage
)
)

load(state)
end
end

@spec add_shape(t(), Shape.t()) :: {:ok, shape_id(), LogOffset.t()} | {:error, term()}
@spec add_shape(t(), Shape.t()) :: {:ok, shape_id()} | {:error, term()}
def add_shape(state, shape) do
{hash, shape_id} = Shape.generate_id(shape)
# fresh snapshots always start with a zero offset - only once they
Expand All @@ -117,9 +125,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
]
)

with :ok <- save(state) do
{:ok, shape_id}
end
{:ok, shape_id}
end

@spec list_shapes(t()) :: [{shape_id(), Shape.t()}]
Expand Down Expand Up @@ -151,9 +157,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
]
)

with :ok <- save(state) do
{:ok, shape}
end
{:ok, shape}
rescue
# Sometimes we're calling cleanup when snapshot creation has failed for
# some reason. In those cases we're not sure about the state of the ETS
Expand Down Expand Up @@ -295,11 +299,10 @@ defmodule Electric.ShapeCache.ShapeStatus do
end

@doc false
def decode_shapes(json) do
with {:ok, %{"shapes" => shapes, "relations" => relations}} <- Jason.decode(json) do
def decode_relations(json) do
with {:ok, %{"relations" => relations}} <- Jason.decode(json) do
{:ok,
%{
shapes: Map.new(shapes, fn {id, shape} -> {id, Shape.from_json_safe!(shape)} end),
relations:
Map.new(relations, fn %{"id" => id} = relation ->
{id, relation_from_json(relation)}
Expand All @@ -325,21 +328,20 @@ defmodule Electric.ShapeCache.ShapeStatus do
end

defp save(state) do
shapes = Map.new(list_shapes(state))
relations = list_relations(state)

PersistentKV.set(
state.persistent_kv,
key(state),
%{
shapes: shapes,
relations: relations
}
)
end

defp load(state) do
with {:ok, %{shapes: shapes, relations: relations}} <- load_shapes(state) do
with {:ok, %{relations: relations}} <- load_relations(state),
{:ok, shapes} <- Storage.get_all_stored_shapes(state.storage) do
:ets.insert(
state.shape_meta_table,
Enum.concat([
Expand All @@ -363,9 +365,9 @@ defmodule Electric.ShapeCache.ShapeStatus do
end
end

defp load_shapes(state) do
defp load_relations(state) do
case PersistentKV.get(state.persistent_kv, key(state)) do
{:ok, %{shapes: _shapes, relations: _relations} = data} ->
{:ok, %{relations: _relations} = data} ->
{:ok, data}

{:error, :not_found} ->
Expand Down
18 changes: 18 additions & 0 deletions packages/sync-service/lib/electric/shape_cache/storage.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Electric.ShapeCache.Storage do
import Electric.Replication.LogOffset, only: [is_log_offset_lt: 2]

alias Electric.Shapes.Shape
alias Electric.Shapes.Querying
alias Electric.Replication.LogOffset

Expand Down Expand Up @@ -32,6 +33,13 @@ defmodule Electric.ShapeCache.Storage do
@doc "Run any initial setup tasks"
@callback initialise(shape_opts()) :: :ok

@doc "Store the shape definition"
@callback set_shape_definition(Shape.t(), shape_opts()) :: :ok

@doc "Retrieve all stored shapes"
@callback get_all_stored_shapes(compiled_opts()) ::
{:ok, %{shape_id() => Shape.t()}} | {:error, term()}

@doc """
Get the current xmin and offset for the shape storage.
Expand Down Expand Up @@ -121,6 +129,16 @@ defmodule Electric.ShapeCache.Storage do
mod.initialise(shape_opts)
end

@impl __MODULE__
# Delegates to the `set_shape_definition/2` function of the module held in the
# first element of the storage tuple, passing the second element as its options.
def set_shape_definition(shape, {storage_mod, storage_opts}),
  do: storage_mod.set_shape_definition(shape, storage_opts)

@impl __MODULE__
# Delegates to the `get_all_stored_shapes/1` function of the module held in the
# first element of the storage tuple, passing the second element as its options.
def get_all_stored_shapes({storage_mod, storage_opts}),
  do: storage_mod.get_all_stored_shapes(storage_opts)

@impl __MODULE__
def get_current_position({mod, shape_opts}) do
mod.get_current_position(shape_opts)
Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ defmodule Electric.Shapes.Consumer do

:ok = ShapeCache.Storage.initialise(storage)

# Store the shape definition to ensure we can restore it
:ok = ShapeCache.Storage.set_shape_definition(config.shape, storage)

{:ok, latest_offset, snapshot_xmin} = ShapeCache.Storage.get_current_position(storage)

:ok =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
alias Electric.Replication.LogOffset

alias Support.Mock
import Support.ComponentSetup, only: [with_electric_instance_id: 1]
import Support.ComponentSetup, only: [with_electric_instance_id: 1, with_in_memory_storage: 1]

import Mox

@moduletag :capture_log

setup :verify_on_exit!
setup :with_electric_instance_id
setup [:with_electric_instance_id, :with_in_memory_storage]

setup(ctx) do
# Start a test Registry
Expand All @@ -32,7 +32,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
{:ok, pid} = start_supervised({ShapeLogCollector, opts})

Mock.ShapeStatus
|> expect(:initialise, 1, fn opts -> Electric.ShapeCache.ShapeStatus.initialise(opts) end)
|> expect(:initialise, 1, fn _opts -> {:ok, %{}} end)
|> expect(:list_shapes, 1, fn _ -> [] end)
# allow the ShapeCache to call this mock
|> allow(self(), fn -> GenServer.whereis(Electric.ShapeCache) end)
Expand Down
Loading

0 comments on commit 14681cc

Please sign in to comment.