From 41845cb6bdde7c9db8179eba63c86607030fb0fa Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 10 Oct 2024 09:37:21 -0600 Subject: [PATCH] fix: during live requests, the server returns a cursor for the client to use for cache-busting (#1826) This PR is a fix for inconsistencies in caching in http proxying while clients are long-polling. It also adds `public` to our `cache-control` header as that's required by some http proxies in order to cache. HTTP Proxies don't treat the max-age in cache-control exactly the same way. Some start counting the age of the cache from the *beginning* of the request while others count from the *end* of the request. This inconsistency makes it difficult to reliably control caching and request collapsing behavior for long-polling requests. My previous PR in this area https://github.com/electric-sql/electric/pull/1656 made request collapsing work nicely with proxies with the first behavior as they'd collapse all requests within the time from the start of a long-poll and the end of the max-age. And when the client went to request again after the long-poll had ended, the previous request cache had expired already so a new request would get sent to the origin. However, this approach caused issues with proxies with the second behavior as request collapsing would work but when the client re-polled, the cache hadn't yet expired so the client would go into an infinite loop requesting the same cached response over and over. So this PR adds a `cursor` generated by the server that clients use as part of `live` requests. This skips by any caches from the previous live request (which on proxies with the first behavior, would have expired already). The cursor is generated by finding the next alignment boundary. I.e. if the timeout is 20 seconds (which it is now but this could change) then we calculate the alignment boundary by taking the current unix timestamp and subtracting the Electric Epoch of October 9th, 2024 then dividing by 20 and rounding up and the multiplying by 20 again. In practice this partitions caches for live requests for a given offset into 20 second windows. --------- Co-authored-by: Stefanos Mousafeiris --- .changeset/khaki-dolphins-promise.md | 8 ++++++ .../lib/electric/plug/serve_shape_plug.ex | 25 ++++++++++++++++--- .../electric/plug/serve_shape_plug_test.exs | 6 ++--- packages/typescript-client/src/client.ts | 14 +++++++++++ packages/typescript-client/src/constants.ts | 2 ++ .../test/integration.test.ts | 7 +++++- website/docs/quickstart.md | 2 +- website/electric-api.yaml | 15 ++++++++--- 8 files changed, 66 insertions(+), 13 deletions(-) create mode 100644 .changeset/khaki-dolphins-promise.md diff --git a/.changeset/khaki-dolphins-promise.md b/.changeset/khaki-dolphins-promise.md new file mode 100644 index 0000000000..644476a491 --- /dev/null +++ b/.changeset/khaki-dolphins-promise.md @@ -0,0 +1,8 @@ +--- +"@electric-sql/client": patch +"@core/sync-service": patch +--- + +Fix inconsistencies in http proxies for caching live long-polling requests. + +The server now returns a cursor for the client to use in requests to cache-bust any stale caches. diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index 453eb1e84a..1a4e5df27f 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -23,6 +23,19 @@ defmodule Electric.Plug.ServeShapePlug do @up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})] @must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}]) + defmodule TimeUtils do + @oct9th2024 DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC") + def seconds_since_oct9th_2024_next_interval(conn) do + long_poll_timeout = conn.assigns.config[:long_poll_timeout] + now = DateTime.utc_now() + + diff_in_seconds = DateTime.diff(now, @oct9th2024, :second) + next_interval = ceil(diff_in_seconds / long_poll_timeout) * long_poll_timeout + + next_interval + end + end + defmodule Params do use Ecto.Schema import Ecto.Changeset @@ -331,16 +344,20 @@ defmodule Electric.Plug.ServeShapePlug do defp put_resp_cache_headers(%Conn{assigns: %{config: config, live: live}} = conn, _) do if live do - put_resp_header( - conn, + conn + |> put_resp_header( "cache-control", - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" + ) + |> put_resp_header( + "electric-next-cursor", + TimeUtils.seconds_since_oct9th_2024_next_interval(conn) |> Integer.to_string() ) else put_resp_header( conn, "cache-control", - "max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}" + "public, max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}" ) end end diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index cb6f172c7f..13403918a2 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -203,7 +203,7 @@ defmodule Electric.Plug.ServeShapePlugTest do assert conn.status == 200 assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=#{max_age}, stale-while-revalidate=#{stale_age}" + "public, max-age=#{max_age}, stale-while-revalidate=#{stale_age}" ] end @@ -381,7 +381,7 @@ defmodule Electric.Plug.ServeShapePlugTest do ] assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" ] assert Plug.Conn.get_resp_header(conn, "electric-chunk-last-offset") == [next_offset_str] @@ -467,7 +467,7 @@ defmodule Electric.Plug.ServeShapePlugTest do assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "up-to-date"}}] assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" ] assert Plug.Conn.get_resp_header(conn, "electric-chunk-up-to-date") == [""] diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 32a62daa06..7a1a4e50da 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -17,6 +17,8 @@ import { } from './fetch' import { CHUNK_LAST_OFFSET_HEADER, + LIVE_CACHE_BUSTER_HEADER, + LIVE_CACHE_BUSTER_QUERY_PARAM, LIVE_QUERY_PARAM, OFFSET_QUERY_PARAM, SHAPE_ID_HEADER, @@ -143,6 +145,7 @@ export class ShapeStream = Row> >() #lastOffset: Offset + #liveCacheBuster: string // Seconds since our Electric Epoch 😎 #lastSyncedAt?: number // unix time #isUpToDate: boolean = false #connected: boolean = false @@ -153,6 +156,7 @@ export class ShapeStream = Row> validateOptions(options) this.options = { subscribe: true, ...options } this.#lastOffset = this.options.offset ?? `-1` + this.#liveCacheBuster = `` this.#shapeId = this.options.shapeId this.#messageParser = new MessageParser(options.parser) @@ -197,6 +201,10 @@ export class ShapeStream = Row> if (this.#isUpToDate) { fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) + fetchUrl.searchParams.set( + LIVE_CACHE_BUSTER_QUERY_PARAM, + this.#liveCacheBuster + ) } if (this.#shapeId) { @@ -248,6 +256,11 @@ export class ShapeStream = Row> this.#lastOffset = lastOffset as Offset } + const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER) + if (liveCacheBuster) { + this.#liveCacheBuster = liveCacheBuster + } + const getSchema = (): Schema => { const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) return schemaHeader ? JSON.parse(schemaHeader) : {} @@ -376,6 +389,7 @@ export class ShapeStream = Row> */ #reset(shapeId?: string) { this.#lastOffset = `-1` + this.#liveCacheBuster = `` this.#shapeId = shapeId this.#isUpToDate = false this.#connected = false diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts index d5b1a53121..bfa2676ee0 100644 --- a/packages/typescript-client/src/constants.ts +++ b/packages/typescript-client/src/constants.ts @@ -1,4 +1,6 @@ export const SHAPE_ID_HEADER = `electric-shape-id` +export const LIVE_CACHE_BUSTER_HEADER = `electric-next-cursor` +export const LIVE_CACHE_BUSTER_QUERY_PARAM = `cursor` export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset` export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date` export const SHAPE_SCHEMA_HEADER = `electric-schema` diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index 288ebcf531..c61e242d11 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -98,6 +98,7 @@ describe(`HTTP Sync`, () => { expect(urlsRequested[0].searchParams.has(`live`)).false expect(urlsRequested[1].searchParams.get(`offset`)).not.toBe(`-1`) expect(urlsRequested[1].searchParams.has(`live`)).true + expect(urlsRequested[1].searchParams.has(`cursor`)).true // first request comes back immediately and is up to date, second one // should hang while waiting for updates @@ -539,7 +540,11 @@ describe(`HTTP Sync`, () => { const cacheHeaders = res.headers.get(`cache-control`) assert(cacheHeaders !== null, `Response should have cache-control header`) const directives = parse(cacheHeaders) - expect(directives).toEqual({ 'max-age': 1, 'stale-while-revalidate': 3 }) + expect(directives).toEqual({ + public: true, + 'max-age': 1, + 'stale-while-revalidate': 3, + }) const etagHeader = res.headers.get(`etag`) assert(etagHeader !== null, `Response should have etag header`) diff --git a/website/docs/quickstart.md b/website/docs/quickstart.md index c43eb0629a..a4b713a1dc 100644 --- a/website/docs/quickstart.md +++ b/website/docs/quickstart.md @@ -109,7 +109,7 @@ HTTP/1.1 200 OK date: Thu, 18 Jul 2024 10:49:12 GMT content-length: 643 vary: accept-encoding -cache-control: max-age=60, stale-while-revalidate=300 +cache-control: public, max-age=60, stale-while-revalidate=300 x-request-id: F-NJAXyulHAQP2MAAABN access-control-allow-origin: * access-control-expose-headers: * diff --git a/website/electric-api.yaml b/website/electric-api.yaml index 3ecaaa0150..35a8a585b1 100644 --- a/website/electric-api.yaml +++ b/website/electric-api.yaml @@ -88,6 +88,13 @@ paths: This allows you to implement a long-polling strategy to consume real-time updates. + - name: cursor + in: query + schema: + type: string + description: |- + This is a cursor generated by the server during live requests. It helps bust caches for + responses from previous long-polls. - name: shape_id in: query schema: @@ -127,7 +134,7 @@ paths: cache-control: schema: type: string - example: "max-age=60, stale-while-revalidate=300" + example: "public, max-age=60, stale-while-revalidate=300" description: |- Cache control header as a string of comma separated directives. @@ -140,7 +147,7 @@ paths: Etag header specifying the shape ID and offset for efficient caching. In the format `{shape_id}:{start_offset}:{end_offset}`. - x-electric-chunk-last-offset: + electric-chunk-last-offset: schema: type: string example: "26800584_4" @@ -151,7 +158,7 @@ paths: you have provided. This header simplifies client development by avoiding the need to parse the last offset out of the stream of log entries. - x-electric-shape-id: + electric-shape-id: schema: type: string example: "3833821-1721812114261" @@ -160,7 +167,7 @@ paths: Must be provided as the `shape_id` parameter when making subsequent requests where `offset` is not `-1`. - x-electric-schema: + electric-schema: schema: type: string example: '{"id":{"type":"int4","dimensions":0},"title":{"type":"text","dimensions":0},"status":{"type":"text","dimensions":0,"max_length":8}}'