Skip to content

Commit

Permalink
fix: during live requests, the server returns a cursor for the client…
Browse files Browse the repository at this point in the history
… to use for cache-busting (#1826)

This PR is a fix for inconsistencies in caching in http proxying while
clients are long-polling. It also adds `public` to our `cache-control`
header as that's required by some http proxies in order to cache.

HTTP Proxies don't treat the max-age in cache-control exactly the same
way. Some start counting the age of the cache from the *beginning* of
the request while others count from the *end* of the request.

This inconsistency makes it difficult to reliably control caching and
request collapsing behavior for long-polling requests.

My previous PR in this area
#1656 made request
collapsing work nicely with proxies with the first behavior as they'd
collapse all requests within the time from the start of a long-poll and
the end of the max-age. And when the client went to request again after
the long-poll had ended, the previous request cache had expired already
so a new request would get sent to the origin.

However, this approach caused issues with proxies with the second
behavior as request collapsing would work but when the client re-polled,
the cache hadn't yet expired so the client would go into an infinite
loop requesting the same cached response over and over.

So this PR adds a `cursor` generated by the server that clients use as
part of `live` requests. This skips by any caches from the previous live
request (which on proxies with the first behavior, would have expired
already).

The cursor is generated by finding the next alignment boundary. I.e. if
the timeout is 20 seconds (which it is now but this could change) then
we calculate the alignment boundary by taking the current unix timestamp
and subtracting the Electric Epoch of October 9th, 2024 then dividing by
20 and rounding up and the multiplying by 20 again.

In practice this partitions caches for live requests for a given offset
into 20 second windows.

---------

Co-authored-by: Stefanos Mousafeiris <[email protected]>
  • Loading branch information
KyleAMathews and msfstef authored Oct 10, 2024
1 parent c0c9af6 commit 41845cb
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 13 deletions.
8 changes: 8 additions & 0 deletions .changeset/khaki-dolphins-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@electric-sql/client": patch
"@core/sync-service": patch
---

Fix inconsistencies in http proxies for caching live long-polling requests.

The server now returns a cursor for the client to use in requests to cache-bust any stale caches.
25 changes: 21 additions & 4 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ defmodule Electric.Plug.ServeShapePlug do
@up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})]
@must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}])

defmodule TimeUtils do
@oct9th2024 DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC")
def seconds_since_oct9th_2024_next_interval(conn) do
long_poll_timeout = conn.assigns.config[:long_poll_timeout]
now = DateTime.utc_now()

diff_in_seconds = DateTime.diff(now, @oct9th2024, :second)
next_interval = ceil(diff_in_seconds / long_poll_timeout) * long_poll_timeout

next_interval
end
end

defmodule Params do
use Ecto.Schema
import Ecto.Changeset
Expand Down Expand Up @@ -331,16 +344,20 @@ defmodule Electric.Plug.ServeShapePlug do

defp put_resp_cache_headers(%Conn{assigns: %{config: config, live: live}} = conn, _) do
if live do
put_resp_header(
conn,
conn
|> put_resp_header(
"cache-control",
"max-age=5, stale-while-revalidate=5"
"public, max-age=5, stale-while-revalidate=5"
)
|> put_resp_header(
"electric-next-cursor",
TimeUtils.seconds_since_oct9th_2024_next_interval(conn) |> Integer.to_string()
)
else
put_resp_header(
conn,
"cache-control",
"max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}"
"public, max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}"
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert conn.status == 200

assert Plug.Conn.get_resp_header(conn, "cache-control") == [
"max-age=#{max_age}, stale-while-revalidate=#{stale_age}"
"public, max-age=#{max_age}, stale-while-revalidate=#{stale_age}"
]
end

Expand Down Expand Up @@ -381,7 +381,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
]

assert Plug.Conn.get_resp_header(conn, "cache-control") == [
"max-age=5, stale-while-revalidate=5"
"public, max-age=5, stale-while-revalidate=5"
]

assert Plug.Conn.get_resp_header(conn, "electric-chunk-last-offset") == [next_offset_str]
Expand Down Expand Up @@ -467,7 +467,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "up-to-date"}}]

assert Plug.Conn.get_resp_header(conn, "cache-control") == [
"max-age=5, stale-while-revalidate=5"
"public, max-age=5, stale-while-revalidate=5"
]

assert Plug.Conn.get_resp_header(conn, "electric-chunk-up-to-date") == [""]
Expand Down
14 changes: 14 additions & 0 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
} from './fetch'
import {
CHUNK_LAST_OFFSET_HEADER,
LIVE_CACHE_BUSTER_HEADER,
LIVE_CACHE_BUSTER_QUERY_PARAM,
LIVE_QUERY_PARAM,
OFFSET_QUERY_PARAM,
SHAPE_ID_HEADER,
Expand Down Expand Up @@ -143,6 +145,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
>()

#lastOffset: Offset
#liveCacheBuster: string // Seconds since our Electric Epoch 😎
#lastSyncedAt?: number // unix time
#isUpToDate: boolean = false
#connected: boolean = false
Expand All @@ -153,6 +156,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
validateOptions(options)
this.options = { subscribe: true, ...options }
this.#lastOffset = this.options.offset ?? `-1`
this.#liveCacheBuster = ``
this.#shapeId = this.options.shapeId
this.#messageParser = new MessageParser<T>(options.parser)

Expand Down Expand Up @@ -197,6 +201,10 @@ export class ShapeStream<T extends Row<unknown> = Row>

if (this.#isUpToDate) {
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`)
fetchUrl.searchParams.set(
LIVE_CACHE_BUSTER_QUERY_PARAM,
this.#liveCacheBuster
)
}

if (this.#shapeId) {
Expand Down Expand Up @@ -248,6 +256,11 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#lastOffset = lastOffset as Offset
}

const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER)
if (liveCacheBuster) {
this.#liveCacheBuster = liveCacheBuster
}

const getSchema = (): Schema => {
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER)
return schemaHeader ? JSON.parse(schemaHeader) : {}
Expand Down Expand Up @@ -376,6 +389,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
*/
#reset(shapeId?: string) {
this.#lastOffset = `-1`
this.#liveCacheBuster = ``
this.#shapeId = shapeId
this.#isUpToDate = false
this.#connected = false
Expand Down
2 changes: 2 additions & 0 deletions packages/typescript-client/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
export const SHAPE_ID_HEADER = `electric-shape-id`
export const LIVE_CACHE_BUSTER_HEADER = `electric-next-cursor`
export const LIVE_CACHE_BUSTER_QUERY_PARAM = `cursor`
export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset`
export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date`
export const SHAPE_SCHEMA_HEADER = `electric-schema`
Expand Down
7 changes: 6 additions & 1 deletion packages/typescript-client/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ describe(`HTTP Sync`, () => {
expect(urlsRequested[0].searchParams.has(`live`)).false
expect(urlsRequested[1].searchParams.get(`offset`)).not.toBe(`-1`)
expect(urlsRequested[1].searchParams.has(`live`)).true
expect(urlsRequested[1].searchParams.has(`cursor`)).true

// first request comes back immediately and is up to date, second one
// should hang while waiting for updates
Expand Down Expand Up @@ -539,7 +540,11 @@ describe(`HTTP Sync`, () => {
const cacheHeaders = res.headers.get(`cache-control`)
assert(cacheHeaders !== null, `Response should have cache-control header`)
const directives = parse(cacheHeaders)
expect(directives).toEqual({ 'max-age': 1, 'stale-while-revalidate': 3 })
expect(directives).toEqual({
public: true,
'max-age': 1,
'stale-while-revalidate': 3,
})
const etagHeader = res.headers.get(`etag`)
assert(etagHeader !== null, `Response should have etag header`)

Expand Down
2 changes: 1 addition & 1 deletion website/docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ HTTP/1.1 200 OK
date: Thu, 18 Jul 2024 10:49:12 GMT
content-length: 643
vary: accept-encoding
cache-control: max-age=60, stale-while-revalidate=300
cache-control: public, max-age=60, stale-while-revalidate=300
x-request-id: F-NJAXyulHAQP2MAAABN
access-control-allow-origin: *
access-control-expose-headers: *
Expand Down
15 changes: 11 additions & 4 deletions website/electric-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ paths:
This allows you to implement a long-polling strategy to consume
real-time updates.
- name: cursor
in: query
schema:
type: string
description: |-
This is a cursor generated by the server during live requests. It helps bust caches for
responses from previous long-polls.
- name: shape_id
in: query
schema:
Expand Down Expand Up @@ -127,7 +134,7 @@ paths:
cache-control:
schema:
type: string
example: "max-age=60, stale-while-revalidate=300"
example: "public, max-age=60, stale-while-revalidate=300"
description: |-
Cache control header as a string of comma separated directives.
Expand All @@ -140,7 +147,7 @@ paths:
Etag header specifying the shape ID and offset for efficient caching.
In the format `{shape_id}:{start_offset}:{end_offset}`.
x-electric-chunk-last-offset:
electric-chunk-last-offset:
schema:
type: string
example: "26800584_4"
Expand All @@ -151,7 +158,7 @@ paths:
you have provided. This header simplifies client development by
avoiding the need to parse the last offset out of the stream of
log entries.
x-electric-shape-id:
electric-shape-id:
schema:
type: string
example: "3833821-1721812114261"
Expand All @@ -160,7 +167,7 @@ paths:
Must be provided as the `shape_id` parameter when making
subsequent requests where `offset` is not `-1`.
x-electric-schema:
electric-schema:
schema:
type: string
example: '{"id":{"type":"int4","dimensions":0},"title":{"type":"text","dimensions":0},"status":{"type":"text","dimensions":0,"max_length":8}}'
Expand Down

0 comments on commit 41845cb

Please sign in to comment.