Skip to content

Commit

Permalink
feat (sync-service): validate shape ID matches shape definition from …
Browse files Browse the repository at this point in the history
…URL (#1678)

This PR fixes #1663.

The implemented semantics are the following.
- When shape id exists:
  - and is the currently active shape for this shape definition:
    - serve it normally
  - and is not the currently active shape for that shape definition:
- return 400 (because there must be a mismatch between shape ID and
shape def, otherwise this would be the active shape for that shape
definition)
- When shape id does not (or no longer) exist:
  - if there is an active shape for this shape definition:
    - return 409 that redirects to the active shape
  - if there is no active shape for this shape definition:
    - create the shape
    - return a 409 with a redirect to the newly created shape
  • Loading branch information
kevin-dp authored Sep 13, 2024
1 parent 7a07cde commit e3a07b7
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 43 deletions.
6 changes: 6 additions & 0 deletions .changeset/flat-ears-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@electric-sql/client": patch
"@core/sync-service": patch
---

Return 400 if shape ID does not match shape definition. Also handle 400 status codes on the client.
107 changes: 74 additions & 33 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ defmodule Electric.Plug.ServeShapePlug do

# We're starting listening as soon as possible to not miss stuff that was added since we've asked for last offset
plug :listen_for_new_changes
plug :validate_shape_offset
plug :determine_log_chunk_offset
plug :generate_etag
plug :validate_and_put_etag
Expand All @@ -146,61 +145,103 @@ defmodule Electric.Plug.ServeShapePlug do
end

defp load_shape_info(%Conn{} = conn, _) do
shape = conn.assigns.shape_definition

{shape_id, last_offset} =
Shapes.get_or_create_shape_id(conn.assigns.config, shape)

conn
|> assign(:active_shape_id, shape_id)
|> assign(:last_offset, last_offset)
|> put_resp_header("x-electric-shape-id", shape_id)
shape_info = get_or_create_shape_id(conn.assigns)
handle_shape_info(conn, shape_info)
end

defp schema(shape) do
shape.table_info
|> Map.fetch!(shape.root_table)
|> Map.fetch!(:columns)
|> Schema.from_column_info()
|> Jason.encode!()
# No shape_id is provided so we can get the existing one for this shape
# or create a new shape if it does not yet exist
defp get_or_create_shape_id(%{shape_definition: shape, config: config, shape_id: nil}) do
Shapes.get_or_create_shape_id(config, shape)
end

# Only adds schema header when not in live mode
defp put_schema_header(conn, _) when not conn.assigns.live do
shape = conn.assigns.shape_definition
put_resp_header(conn, "x-electric-schema", schema(shape))
# A shape ID is provided so we need to return the shape that matches the shape ID and the shape definition
defp get_or_create_shape_id(%{shape_definition: shape, config: config}) do
Shapes.get_shape(config, shape)
end

defp put_schema_header(conn, _), do: conn
defp handle_shape_info(
%Conn{assigns: %{shape_definition: shape, config: config, shape_id: shape_id}} = conn,
nil
) do
# There is no shape that matches the shape definition (because shape info is `nil`)
if shape_id != nil && Shapes.has_shape?(config, shape_id) do
# but there is a shape that matches the shape ID
# thus the shape ID does not match the shape definition
# and we return a 400 bad request status code
conn
|> send_resp(400, @must_refetch)
|> halt()
else
# The shape ID does not exist or no longer exists
# e.g. it may have been deleted.
# Hence, create a new shape for this shape definition
# and return a 409 with a redirect to the newly created shape.
# (will be done by the recursive `handle_shape_info` call)
shape_info = Shapes.get_or_create_shape_id(config, shape)
handle_shape_info(conn, shape_info)
end
end

# If the offset requested is -1, noop as we can always serve it
def validate_shape_offset(%Conn{assigns: %{offset: @before_all_offset}} = conn, _) do
# noop
defp handle_shape_info(
%Conn{assigns: %{shape_id: shape_id}} = conn,
{active_shape_id, last_offset}
)
when is_nil(shape_id) or shape_id == active_shape_id do
# We found a shape that matches the shape definition
# and the shape has the same ID as the shape ID provided by the user
conn
|> assign(:active_shape_id, active_shape_id)
|> assign(:last_offset, last_offset)
|> put_resp_header("x-electric-shape-id", active_shape_id)
end

# If the requested shape_id is not found, returns 409 along with a location redirect for clients to
# re-request the shape from scratch with the new shape id which acts as a consistent cache buster
# e.g. GET /v1/shape/{root_table}?shape_id={new_shape_id}&offset=-1
def validate_shape_offset(%Conn{} = conn, _) do
shape_id = conn.assigns.shape_id
active_shape_id = conn.assigns.active_shape_id
defp handle_shape_info(
%Conn{assigns: %{shape_id: shape_id, config: config}} = conn,
{active_shape_id, _}
) do
if Shapes.has_shape?(config, shape_id) do
# The shape with the provided ID exists but does not match the shape definition
# otherwise we would have found it and it would have matched the previous function clause
IO.puts("400 - SHAPE ID NOT FOUND")

conn
|> send_resp(400, @must_refetch)
|> halt()
else
# The requested shape_id is not found, returns 409 along with a location redirect for clients to
# re-request the shape from scratch with the new shape id which acts as a consistent cache buster
# e.g. GET /v1/shape/{root_table}?shape_id={new_shape_id}&offset=-1

if !Shapes.has_shape?(conn.assigns.config, shape_id) do
# TODO: discuss returning a 307 redirect rather than a 409, the client
# will have to detect this and throw out old data
conn
|> put_resp_header("x-electric-shape-id", active_shape_id)
|> put_resp_header(
"location",
"#{conn.request_path}?shape_id=#{active_shape_id}&offset=-1"
)
|> send_resp(409, @must_refetch)
|> halt()
else
conn
end
end

defp schema(shape) do
shape.table_info
|> Map.fetch!(shape.root_table)
|> Map.fetch!(:columns)
|> Schema.from_column_info()
|> Jason.encode!()
end

# Only adds schema header when not in live mode
defp put_schema_header(conn, _) when not conn.assigns.live do
shape = conn.assigns.shape_definition
put_resp_header(conn, "x-electric-schema", schema(shape))
end

defp put_schema_header(conn, _), do: conn

# If chunk offsets are available, use those instead of the latest available offset
# to optimize for cache hits and response sizes
defp determine_log_chunk_offset(%Conn{assigns: assigns} = conn, _) do
Expand Down
10 changes: 8 additions & 2 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ defmodule Electric.ShapeCacheBehaviour do
@doc "Update a shape's status with a new log offset"
@callback update_shape_latest_offset(shape_id(), LogOffset.t(), keyword()) :: :ok

@callback get_shape(shape_def(), opts :: keyword()) ::
{shape_id(), current_snapshot_offset :: LogOffset.t()}
@callback get_or_create_shape_id(shape_def(), opts :: keyword()) ::
{shape_id(), current_snapshot_offset :: LogOffset.t()}

Expand Down Expand Up @@ -86,12 +88,16 @@ defmodule Electric.ShapeCache do
end

@impl Electric.ShapeCacheBehaviour
def get_or_create_shape_id(shape, opts \\ []) do
def get_shape(shape, opts \\ []) do
table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
shape_status = Access.get(opts, :shape_status, ShapeStatus)
shape_status.get_existing_shape(table, shape)
end

@impl Electric.ShapeCacheBehaviour
def get_or_create_shape_id(shape, opts \\ []) do
# Get or create the shape ID and fire a snapshot if necessary
if shape_state = shape_status.get_existing_shape(table, shape) do
if shape_state = get_shape(shape, opts) do
shape_state
else
server = Access.get(opts, :server, __MODULE__)
Expand Down
10 changes: 10 additions & 0 deletions packages/sync-service/lib/electric/shapes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ defmodule Electric.Shapes do
end
end

@doc """
Get the shape that corresponds to this shape definition and return it along with the latest offset of the shape
"""
@spec get_shape(keyword(), Shape.t()) :: {shape_id(), LogOffset.t()}
def get_shape(config, shape_def) do
{shape_cache, opts} = Access.get(config, :shape_cache, {ShapeCache, []})

shape_cache.get_shape(shape_def, opts)
end

@doc """
Get or create a shape ID and return it along with the latest offset of the shape
"""
Expand Down
69 changes: 69 additions & 0 deletions packages/sync-service/test/electric/plug/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,75 @@ defmodule Electric.Plug.RouterTest do
] = Jason.decode!(conn.resp_body)
end

test "GET receives 400 when shape ID does not match shape definition", %{
opts: opts
} do
where = "value ILIKE 'yes%'"

# Initial shape request
# forces the shape to be created
conn =
conn("GET", "/v1/shape/items", %{offset: "-1", where: where})
|> Router.call(opts)

assert %{status: 200} = conn
assert conn.resp_body != ""

shape_id = get_resp_shape_id(conn)
[next_offset] = Plug.Conn.get_resp_header(conn, "x-electric-chunk-last-offset")

# Make the next request but forget to include the where clause
conn =
conn("GET", "/v1/shape/items", %{offset: next_offset, shape_id: shape_id})
|> Router.call(opts)

assert %{status: 400} = conn
assert conn.resp_body == Jason.encode!([%{headers: %{control: "must-refetch"}}])
end

test "GET receives 409 to a newly created shape when shape ID is not found and no shape matches the shape definition",
%{
opts: opts
} do
# Make the next request but forget to include the where clause
conn =
conn("GET", "/v1/shape/items", %{offset: "0_0", shape_id: "nonexistent"})
|> Router.call(opts)

assert %{status: 409} = conn
assert conn.resp_body == Jason.encode!([%{headers: %{control: "must-refetch"}}])
new_shape_id = get_resp_header(conn, "x-electric-shape-id")

assert get_resp_header(conn, "location") ==
"/v1/shape/items?shape_id=#{new_shape_id}&offset=-1"
end

test "GET receives 409 when shape ID is not found but there is another shape matching the definition",
%{
opts: opts
} do
where = "value ILIKE 'yes%'"

# Initial shape request
# forces the shape to be created
conn =
conn("GET", "/v1/shape/items", %{offset: "-1", where: where})
|> Router.call(opts)

assert %{status: 200} = conn
assert conn.resp_body != ""

shape_id = get_resp_shape_id(conn)

# Request the same shape definition but with invalid shape_id
conn =
conn("GET", "/v1/shape/items", %{offset: "0_0", shape_id: "nonexistent", where: where})
|> Router.call(opts)

assert %{status: 409} = conn
[^shape_id] = Plug.Conn.get_resp_header(conn, "x-electric-shape-id")
end

@tag with_sql: [
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')"
]
Expand Down
62 changes: 55 additions & 7 deletions packages/sync-service/test/electric/plug/serve_shape_plug_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ defmodule Electric.Plug.ServeShapePlugTest do

test "returns log when offset is >= 0" do
Mock.ShapeCache
|> expect(:get_or_create_shape_id, fn @test_shape, _opts ->
|> expect(:get_shape, fn @test_shape, _opts ->
{@test_shape_id, @test_offset}
end)
|> stub(:has_shape?, fn @test_shape_id, _opts -> true end)
Expand Down Expand Up @@ -273,7 +273,7 @@ defmodule Electric.Plug.ServeShapePlugTest do

test "returns 304 Not Modified when If-None-Match matches ETag" do
Mock.ShapeCache
|> expect(:get_or_create_shape_id, fn @test_shape, _opts ->
|> expect(:get_shape, fn @test_shape, _opts ->
{@test_shape_id, @test_offset}
end)
|> stub(:has_shape?, fn @test_shape_id, _opts -> true end)
Expand Down Expand Up @@ -302,7 +302,7 @@ defmodule Electric.Plug.ServeShapePlugTest do

test "handles live updates" do
Mock.ShapeCache
|> expect(:get_or_create_shape_id, fn @test_shape, _opts ->
|> expect(:get_shape, fn @test_shape, _opts ->
{@test_shape_id, @test_offset}
end)
|> stub(:has_shape?, fn @test_shape_id, _opts -> true end)
Expand Down Expand Up @@ -363,7 +363,7 @@ defmodule Electric.Plug.ServeShapePlugTest do

test "handles shape rotation" do
Mock.ShapeCache
|> expect(:get_or_create_shape_id, fn @test_shape, _opts ->
|> expect(:get_shape, fn @test_shape, _opts ->
{@test_shape_id, @test_offset}
end)
|> stub(:has_shape?, fn @test_shape_id, _opts -> true end)
Expand Down Expand Up @@ -410,7 +410,7 @@ defmodule Electric.Plug.ServeShapePlugTest do

test "sends an up-to-date response after a timeout if no changes are observed" do
Mock.ShapeCache
|> expect(:get_or_create_shape_id, fn @test_shape, _opts ->
|> expect(:get_shape, fn @test_shape, _opts ->
{@test_shape_id, @test_offset}
end)
|> stub(:has_shape?, fn @test_shape_id, _opts -> true end)
Expand Down Expand Up @@ -442,9 +442,9 @@ defmodule Electric.Plug.ServeShapePlugTest do
]
end

test "send 409 when shape ID requested does not exist" do
test "sends 409 with a redirect to existing shape when requested shape ID does not exist" do
Mock.ShapeCache
|> expect(:get_or_create_shape_id, fn @test_shape, _opts ->
|> expect(:get_shape, fn @test_shape, _opts ->
{@test_shape_id, @test_offset}
end)
|> stub(:has_shape?, fn "foo", _opts -> false end)
Expand All @@ -466,6 +466,54 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert get_resp_header(conn, "x-electric-shape-id") == [@test_shape_id]
assert get_resp_header(conn, "location") == ["/?shape_id=#{@test_shape_id}&offset=-1"]
end

test "creates a new shape when shape ID does not exist and sends a 409 redirecting to the newly created shape" do
new_shape_id = "new-shape-id"

Mock.ShapeCache
|> expect(:get_shape, fn @test_shape, _opts -> nil end)
|> stub(:has_shape?, fn @test_shape_id, _opts -> false end)
|> expect(:get_or_create_shape_id, fn @test_shape, _opts ->
{new_shape_id, @test_offset}
end)

Mock.Storage
|> stub(:for_shape, fn new_shape_id, opts -> {new_shape_id, opts} end)

conn =
conn(
:get,
%{"root_table" => "public.users"},
"?offset=#{"50_12"}&shape_id=#{@test_shape_id}"
)
|> ServeShapePlug.call([])

assert conn.status == 409

assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "must-refetch"}}]
assert get_resp_header(conn, "x-electric-shape-id") == [new_shape_id]
assert get_resp_header(conn, "location") == ["/?shape_id=#{new_shape_id}&offset=-1"]
end

test "sends 400 when shape ID does not match shape definition" do
Mock.ShapeCache
|> expect(:get_shape, fn @test_shape, _opts -> nil end)
|> stub(:has_shape?, fn @test_shape_id, _opts -> true end)

Mock.Storage
|> stub(:for_shape, fn @test_shape_id, opts -> {@test_shape_id, opts} end)

conn =
conn(
:get,
%{"root_table" => "public.users"},
"?offset=#{"50_12"}&shape_id=#{@test_shape_id}"
)
|> ServeShapePlug.call([])

assert conn.status == 400
assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "must-refetch"}}]
end
end

defp put_in_config(%Plug.Conn{assigns: assigns} = conn, key, value),
Expand Down
8 changes: 7 additions & 1 deletion packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,13 @@ export class ShapeStream<T extends Row = Row> {
else break
} catch (e) {
if (!(e instanceof FetchError)) throw e // should never happen
if (e.status == 409) {
if (e.status == 400) {
// The request is invalid, most likely because the shape has been deleted.
// We should start from scratch, this will force the shape to be recreated.
this.reset()
this.publish(e.json as Message<T>[])
continue
} else if (e.status == 409) {
// Upon receiving a 409, we should start from scratch
// with the newly provided shape ID
const newShapeId = e.headers[`x-electric-shape-id`]
Expand Down

0 comments on commit e3a07b7

Please sign in to comment.