feat: move JSON serialization at snapshot creation to PostgreSQL query
icehaunter committed Aug 16, 2024
1 parent 72460dd commit e80fd23
Showing 11 changed files with 295 additions and 174 deletions.
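In short: before this change, the snapshot query returned raw rows that were turned into log-item maps and JSON-encoded in Elixir (in LogItems and the storage backends); after it, PostgreSQL concatenates one complete JSON log entry per row, and storage only chunks and writes strings. A condensed, hand-written comparison of the two flows (illustrative, not literal code from the diff):

    # Before: raw rows -> log-item maps -> Jason.encode! inside each storage backend
    Postgrex.stream(conn, query, [])
    |> Stream.flat_map(& &1.rows)
    |> LogItems.from_snapshot_row_stream(offset, shape, query)
    |> Stream.map(&Jason.encode!/1)

    # After: each row is already a single-column JSON string built by the SELECT itself
    Postgrex.stream(conn, query, params)
    |> Stream.flat_map(& &1.rows)
    |> Stream.map(&hd/1)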
36 changes: 1 addition & 35 deletions packages/sync-service/lib/electric/log_items.ex
@@ -1,13 +1,11 @@
defmodule Electric.LogItems do
alias Electric.Replication.Changes
alias Electric.Replication.LogOffset
alias Electric.Shapes.Shape
alias Electric.Utils

@moduledoc """
Defines the structure and how to create the items in the log that the electric client reads.
The log_item() data structure is a map for ease of consumption in the Elixir code,
however when JSON encoded (not done in this module) it's the format that the electric
client accepts.
"""
@@ -87,36 +85,4 @@ defmodule Electric.LogItems do

defp take_pks_or_all(record, []), do: record
defp take_pks_or_all(record, pks), do: Map.take(record, pks)

@spec from_snapshot_row_stream(
row_stream :: Stream.t(list()),
offset :: LogOffset.t(),
shape :: Shape.t(),
query_info :: %Postgrex.Query{}
) :: log_item()
def from_snapshot_row_stream(row_stream, offset, shape, query_info) do
Stream.map(row_stream, &from_snapshot_row(&1, offset, shape, query_info))
end

defp from_snapshot_row(row, offset, shape, query_info) do
value = value(row, query_info)

key = Changes.build_key(shape.root_table, value, Shape.pk(shape))

%{
key: key,
value: value,
headers: %{operation: :insert},
offset: offset
}
end

defp value(row, %Postgrex.Query{columns: columns, result_types: types}) do
[columns, types, row]
|> Enum.zip_with(fn
[col, Postgrex.Extensions.UUID, val] -> {col, Utils.encode_uuid(val)}
[col, _, val] -> {col, to_string(val)}
end)
|> Map.new()
end
end
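For reference, the removed from_snapshot_row/4 built plain maps of the following shape (values illustrative), which the storage backends then ran through Jason.encode!; that serialization step now happens entirely inside the SQL query:

    %{
      key: ~S|"public"."test_table"/"1"|,
      value: %{"id" => "1", "name" => "John Doe"},
      headers: %{operation: :insert},
      offset: LogOffset.first()
    }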
4 changes: 2 additions & 2 deletions packages/sync-service/lib/electric/shape_cache.ex
@@ -350,12 +350,12 @@ defmodule Electric.ShapeCache do
# formatting between snapshot and live log entries.
Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, []))

{query, stream} = Querying.stream_initial_data(conn, shape)
stream = Querying.stream_initial_data(conn, shape)
GenServer.cast(parent, {:snapshot_started, shape_id})

# could pass the shape and then make_new_snapshot! can pass it to row_to_snapshot_item
# that way it has the relation, but it is still missing the pk_cols
Storage.make_new_snapshot!(shape_id, shape, query, stream, storage)
Storage.make_new_snapshot!(shape_id, stream, storage)
end)
end)
end
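The call site change in context, as a sketch that assumes the snapshot task still runs inside a Postgrex.transaction (which Postgrex.stream requires); the exact wrapper is elided in the diff above:

    Postgrex.transaction(conn, fn conn ->
      # Enforce consistent display settings so snapshot and live log entries format values identically
      Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, []))

      stream = Querying.stream_initial_data(conn, shape)
      GenServer.cast(parent, {:snapshot_started, shape_id})
      Storage.make_new_snapshot!(shape_id, stream, storage)
    end)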
12 changes: 4 additions & 8 deletions packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex
@@ -1,6 +1,5 @@
defmodule Electric.ShapeCache.CubDbStorage do
alias Electric.ConcurrentStream
alias Electric.LogItems
alias Electric.Replication.LogOffset
alias Electric.Telemetry.OpenTelemetry
@behaviour Electric.ShapeCache.Storage
@@ -112,7 +111,6 @@ defmodule Electric.ShapeCache.CubDbStorage do
end
)
|> Stream.flat_map(fn {_, items} -> items end)
|> Stream.map(fn {_, item} -> item end)

# FIXME: this is naive while we don't have snapshot metadata to get real offset
{@snapshot_offset, stream}
@@ -141,16 +139,14 @@ defmodule Electric.ShapeCache.CubDbStorage do
CubDB.put(opts.db, snapshot_start(shape_id), 0)
end

def make_new_snapshot!(shape_id, shape, query_info, data_stream, opts) do
def make_new_snapshot!(shape_id, data_stream, opts) do
OpenTelemetry.with_span("storage.make_new_snapshot", [storage_impl: "cub_db"], fn ->
data_stream
|> LogItems.from_snapshot_row_stream(@snapshot_offset, shape, query_info)
|> Stream.chunk_every(500)
|> Stream.with_index()
|> Stream.map(fn {log_item, index} ->
{snapshot_key(shape_id, index), Jason.encode!(log_item)}
|> Stream.each(fn {chunk, index} ->
CubDB.put(opts.db, snapshot_key(shape_id, index), chunk)
end)
|> Stream.chunk_every(500)
|> Stream.each(fn [{key, _} | _] = chunk -> CubDB.put(opts.db, key, chunk) end)
|> Stream.run()

CubDB.put(opts.db, snapshot_end(shape_id), 0)
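Untangled from the interleaved removed/added lines above, the new CubDB write path is roughly the following (a sketch; chunk size and key layout as shown in the diff):

    def make_new_snapshot!(shape_id, data_stream, opts) do
      OpenTelemetry.with_span("storage.make_new_snapshot", [storage_impl: "cub_db"], fn ->
        data_stream
        # data_stream already yields JSON strings, so chunks can be stored as-is
        |> Stream.chunk_every(500)
        |> Stream.with_index()
        |> Stream.each(fn {chunk, index} ->
          CubDB.put(opts.db, snapshot_key(shape_id, index), chunk)
        end)
        |> Stream.run()

        CubDB.put(opts.db, snapshot_end(shape_id), 0)
      end)
    end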
packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex
@@ -1,6 +1,5 @@
defmodule Electric.ShapeCache.InMemoryStorage do
alias Electric.ConcurrentStream
alias Electric.LogItems
alias Electric.Replication.LogOffset
alias Electric.Telemetry.OpenTelemetry
use Agent
Expand Down Expand Up @@ -62,7 +61,7 @@ defmodule Electric.ShapeCache.InMemoryStorage do
])
end
)
|> Stream.map(fn {_, item} -> item end)
|> Stream.flat_map(fn {_, item} -> item end)

{@snapshot_offset, stream}
end
@@ -100,27 +99,20 @@ defmodule Electric.ShapeCache.InMemoryStorage do

def mark_snapshot_as_started(shape_id, opts) do
:ets.insert(opts.snapshot_ets_table, {snapshot_start(shape_id), 0})
:ok
end

@spec make_new_snapshot!(
String.t(),
Electric.Shapes.Shape.t(),
Postgrex.Query.t(),
Enumerable.t(),
map()
) :: :ok
def make_new_snapshot!(shape_id, shape, query_info, data_stream, opts) do
@spec make_new_snapshot!(String.t(), Enumerable.t(), map()) :: :ok
def make_new_snapshot!(shape_id, data_stream, opts) do
OpenTelemetry.with_span("storage.make_new_snapshot", [storage_impl: "in_memory"], fn ->
ets_table = opts.snapshot_ets_table

data_stream
|> LogItems.from_snapshot_row_stream(@snapshot_offset, shape, query_info)
|> Stream.chunk_every(500)
|> Stream.with_index(1)
|> Stream.map(fn {log_item, index} ->
{snapshot_key(shape_id, index), Jason.encode!(log_item)}
|> Stream.each(fn {chunk, index} ->
:ets.insert(ets_table, {snapshot_key(shape_id, index), chunk})
end)
|> Stream.chunk_every(500)
|> Stream.each(fn chunk -> :ets.insert(ets_table, chunk) end)
|> Stream.run()

:ets.insert(ets_table, {snapshot_end(shape_id), 0})
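The in-memory backend mirrors the CubDB change. A sketch of the new function (trailing lines are elided in the diff above; the final :ok is assumed from the @spec):

    def make_new_snapshot!(shape_id, data_stream, opts) do
      OpenTelemetry.with_span("storage.make_new_snapshot", [storage_impl: "in_memory"], fn ->
        ets_table = opts.snapshot_ets_table

        data_stream
        |> Stream.chunk_every(500)
        |> Stream.with_index(1)
        |> Stream.each(fn {chunk, index} ->
          :ets.insert(ets_table, {snapshot_key(shape_id, index), chunk})
        end)
        |> Stream.run()

        :ets.insert(ets_table, {snapshot_end(shape_id), 0})
      end)

      :ok
    end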
16 changes: 5 additions & 11 deletions packages/sync-service/lib/electric/shape_cache/storage.ex
@@ -45,16 +45,13 @@ defmodule Electric.ShapeCache.Storage do
@doc "Get the full snapshot for a given shape, also returning the offset this snapshot includes"
@callback get_snapshot(shape_id(), compiled_opts()) :: {offset :: LogOffset.t(), log()}
@doc """
Make a new snapshot for a shape ID based on the meta information about the table and a stream of plain string rows
Make a new snapshot for a shape ID based on a stream of log entries preformatted as JSON strings.
Should raise an error if making the snapshot fails for any reason.
"""
@doc unstable: "The meta information about the single table is subject to change"
@callback make_new_snapshot!(
shape_id(),
Electric.Shapes.Shape.t(),
Postgrex.Query.t(),
Enumerable.t(row()),
Enumerable.t(String.t()),
compiled_opts()
) :: :ok
@callback mark_snapshot_as_started(shape_id, compiled_opts()) :: :ok
@@ -102,18 +99,15 @@ defmodule Electric.ShapeCache.Storage do
def get_snapshot(shape_id, {mod, opts}), do: mod.get_snapshot(shape_id, opts)

@doc """
Make a new snapshot for a shape ID based on the meta information about the table and a stream of plain string rows
Make a new snapshot for a shape ID based on a stream of log entries preformatted as JSON strings.
"""
@doc unstable: "The meta information about the single table is subject to change"
@spec make_new_snapshot!(
shape_id(),
Electric.Shapes.Shape.t(),
Postgrex.Query.t(),
Enumerable.t(row()),
storage()
) :: :ok
def make_new_snapshot!(shape_id, shape, meta, stream, {mod, opts}),
do: mod.make_new_snapshot!(shape_id, shape, meta, stream, opts)
def make_new_snapshot!(shape_id, stream, {mod, opts}),
do: mod.make_new_snapshot!(shape_id, stream, opts)

@spec mark_snapshot_as_started(shape_id, compiled_opts()) :: :ok
def mark_snapshot_as_started(shape_id, {mod, opts}),
115 changes: 98 additions & 17 deletions packages/sync-service/lib/electric/shapes/querying.ex
@@ -1,39 +1,120 @@
defmodule Electric.Shapes.Querying do
alias Electric.Replication.LogOffset
alias Electric.Utils
alias Electric.Shapes.Shape
alias Electric.Telemetry.OpenTelemetry

@type row :: [term()]

@spec stream_initial_data(DBConnection.t(), Shape.t()) ::
{Postgrex.Query.t(), Enumerable.t(row())}
@doc """
Streams the initial data for a shape. Query results are returned as a stream of JSON strings, as prepared on PostgreSQL.
"""
@spec stream_initial_data(DBConnection.t(), Shape.t()) :: Enumerable.t(String.t())
def stream_initial_data(conn, %Shape{root_table: root_table, table_info: table_info} = shape) do
OpenTelemetry.with_span("querying.stream_initial_data", [], fn ->
table = Utils.relation_to_sql(root_table)

where =
if not is_nil(shape.where), do: " WHERE " <> shape.where.query, else: ""

query =
Postgrex.prepare!(
conn,
table,
~s|SELECT #{columns(table_info, root_table)} FROM #{table} #{where}|
)
{json_like_select, params} = json_like_select(table_info, root_table, Shape.pk(shape))

stream =
Postgrex.stream(conn, query, [])
|> Stream.flat_map(& &1.rows)
query =
Postgrex.prepare!(conn, table, ~s|SELECT #{json_like_select} FROM #{table} #{where}|)

{query, stream}
Postgrex.stream(conn, query, params)
|> Stream.flat_map(& &1.rows)
|> Stream.map(&hd/1)
end)
end

defp columns(table_info, root_table) do
defp json_like_select(table_info, root_table, pk_cols) do
columns = get_column_names(table_info, root_table)

key_part = build_key_part(root_table, pk_cols)
value_part = build_value_part(columns)
headers_part = build_headers_part(root_table)
offset_part = ~s['"offset":"#{LogOffset.first()}"']

# We're building a JSON string that looks like this:
#
# {
# "key": "\"public\".\"test_table\"/\"1\"",
# "value": {
# "id": "1",
# "name": "John Doe",
# "email": "[email protected]",
# "nullable": null
# },
# "headers": {"operation": "insert", "relation": ["public", "test_table"]},
# "offset": "0_0"
# }
query =
~s['{' || #{key_part} || ',' || #{value_part} || ',' || #{headers_part} || ',' || #{offset_part} || '}']

{query, []}
end

defp get_column_names(table_info, root_table) do
table_info
|> Map.fetch!(root_table)
|> Map.fetch!(:columns)
|> Enum.map(&~s("#{Utils.escape_quotes(&1.name)}"::text))
|> Enum.join(", ")
|> Enum.map(& &1.name)
end

defp build_headers_part(root_table) do
~s['"headers":{"operation":"insert","relation":#{build_relation_header(root_table)}}']
end

defp build_relation_header({schema, table}) do
~s'["#{escape_sql_json_interpolation(schema)}","#{escape_sql_json_interpolation(table)}"]'
end

defp build_key_part(root_table, pk_cols) do
pk_part = join_primary_keys(pk_cols)

# Because relation part of the key is known at query building time, we can use $1 to inject escaped version of the relation
~s['"key":' || ] <> pg_escape_string_for_json(~s['#{escape_relation(root_table)}' || '/"' || #{pk_part} || '"'])
end

defp join_primary_keys(pk_cols) do
pk_cols
|> Enum.map(&pg_cast_column_to_text/1)
|> Enum.map(&~s[replace(#{&1}, '/', '//')])
# NULL values are not allowed in PKs, but they are possible on pk-less tables where we consider all columns to be PKs
|> Enum.map(&~s[coalesce(#{&1}, '')])
|> Enum.join(~s[ || '"/"' || ])
end

defp build_value_part(columns) do
column_parts = Enum.map(columns, &build_column_part/1)
~s['"value":{' || #{Enum.join(column_parts, " || ',' || ")} || '}']
end

defp build_column_part(column) do
escaped_name = escape_sql_json_interpolation(column)
escaped_value = escape_column_value(column)

# Since `||` returns NULL if any of the arguments is NULL, we need to use `coalesce` to handle NULL values
~s['"#{escaped_name}":' || #{pg_coalesce_json_string(escaped_value)}]
end

defp escape_sql_json_interpolation(str) do
str
|> String.replace(~S|"|, ~S|\"|)
|> String.replace(~S|'|, ~S|''|)
end

defp escape_relation(relation) do
relation |> Utils.relation_to_sql() |> String.replace(~S|'|, ~S|''|)
end

defp escape_column_value(column) do
column
|> pg_cast_column_to_text()
|> pg_escape_string_for_json()
|> pg_coalesce_json_string()
end

defp pg_cast_column_to_text(column), do: ~s["#{Utils.escape_quotes(column)}"::text]
defp pg_escape_string_for_json(str), do: ~s[to_json(#{str})::text]
defp pg_coalesce_json_string(str), do: ~s[coalesce(#{str} , 'null')]
end
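Putting the helpers together for the "public"."test_table" example in the comment above, the prepared statement comes out roughly as follows. This is a hand-assembled reconstruction for illustration only: whitespace and exact escaping differ from the real generated string (for instance, the real builder wraps each value in coalesce twice), and only the "id" and "name" columns are shown.

    # Hypothetical reconstruction of the SQL that stream_initial_data/2 prepares
    select_sql = """
    SELECT '{'
      || '"key":' || to_json('"public"."test_table"' || '/"'
           || coalesce(replace("id"::text, '/', '//'), '') || '"')::text
      || ',' || '"value":{'
           || '"id":' || coalesce(to_json("id"::text)::text, 'null') || ','
           || '"name":' || coalesce(to_json("name"::text)::text, 'null') || '}'
      || ',' || '"headers":{"operation":"insert","relation":["public","test_table"]}'
      || ',' || '"offset":"0_0"'
      || '}'
    FROM "public"."test_table"
    """

Each row of the result set is then a single text column holding one complete log entry, which is why stream_initial_data/2 ends with Stream.map(&hd/1).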
packages/sync-service/test/electric/replication/shape_log_collector_test.exs
@@ -291,8 +291,6 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
end
end

@basic_query_meta %Postgrex.Query{columns: ["id"], result_types: [:text], name: "key_prefix"}

describe "store_transaction/2 with real storage" do
setup [
{Support.ComponentSetup, :with_registry},
@@ -303,9 +301,9 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
%{shape_cache: shape_cache, shape_cache_opts: shape_cache_opts} =
Support.ComponentSetup.with_shape_cache(Map.put(ctx, :pool, nil),
prepare_tables_fn: fn _, _ -> :ok end,
create_snapshot_fn: fn parent, shape_id, shape, _, storage ->
create_snapshot_fn: fn parent, shape_id, _, _, storage ->
GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10})
Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage)
Storage.make_new_snapshot!(shape_id, [["test"]], storage)
GenServer.cast(parent, {:snapshot_started, shape_id})
end
)