Skip to content

Commit

Permalink
feat (client): expose time elapsed since last sync (#1625)
Browse files Browse the repository at this point in the history
This PR fixes #1468 as it
exposes a `lastSynced` getter on the `ShapeStream` and `Shape` classes
which is the number of milliseconds since last sync with the server.

---------

Co-authored-by: Kyle Mathews <[email protected]>
  • Loading branch information
kevin-dp and KyleAMathews authored Sep 6, 2024
1 parent 661cb64 commit fe251c8
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 72 deletions.
5 changes: 5 additions & 0 deletions .changeset/brown-turtles-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": minor
---

Expose a `lastSyncedAt` field on the `ShapeStream` and `Shape` classes which is the time elapsed since the last sync with Electric (in milliseconds). Remove the `isUpToDate` field on the `Shape` class.
5 changes: 5 additions & 0 deletions .changeset/funny-houses-remember.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Expose an `isConnected` method on `ShapeStream` and `Shape` classes.
5 changes: 0 additions & 5 deletions packages/react-hooks/src/react-hooks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ export interface UseShapeResult<T extends Row = Row> {
shape: Shape<T>
error: Shape<T>[`error`]
isError: boolean
/**
* Has the ShapeStream caught up with the replication log from Postgres.
*/
isUpToDate: boolean
}

function shapeSubscribe<T extends Row>(shape: Shape<T>, callback: () => void) {
Expand All @@ -86,7 +82,6 @@ function shapeSubscribe<T extends Row>(shape: Shape<T>, callback: () => void) {
function parseShapeData<T extends Row>(shape: Shape<T>): UseShapeResult<T> {
return {
data: [...shape.valueSync.values()],
isUpToDate: shape.isUpToDate,
isError: shape.error !== false,
shape,
error: shape.error,
Expand Down
1 change: 0 additions & 1 deletion packages/react-hooks/test/react-hooks.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ describe(`useShape`, () => {
})
)

await waitFor(() => expect(result.current.isUpToDate).toEqual(true))
await waitFor(() => expect(result.current.error).toBe(false))
await waitFor(() => expect(result.current.isError).toEqual(false))
await waitFor(() => expect(result.current.data).toEqual([]))
Expand Down
163 changes: 98 additions & 65 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ export class ShapeStream<T extends Row = Row> {

private lastOffset: Offset
private messageParser: MessageParser<T>
private lastSyncedAt?: number // unix time
public isUpToDate: boolean = false
private connected: boolean = false

shapeId?: string

Expand All @@ -210,80 +212,91 @@ export class ShapeStream<T extends Row = Row> {

const { url, where, signal } = this.options

while ((!signal?.aborted && !this.isUpToDate) || this.options.subscribe) {
const fetchUrl = new URL(url)
if (where) fetchUrl.searchParams.set(`where`, where)
fetchUrl.searchParams.set(`offset`, this.lastOffset)
try {
while ((!signal?.aborted && !this.isUpToDate) || this.options.subscribe) {
const fetchUrl = new URL(url)
if (where) fetchUrl.searchParams.set(`where`, where)
fetchUrl.searchParams.set(`offset`, this.lastOffset)

if (this.isUpToDate) {
fetchUrl.searchParams.set(`live`, `true`)
}
if (this.isUpToDate) {
fetchUrl.searchParams.set(`live`, `true`)
}

if (this.shapeId) {
// This should probably be a header for better cache breaking?
fetchUrl.searchParams.set(`shape_id`, this.shapeId!)
}
if (this.shapeId) {
// This should probably be a header for better cache breaking?
fetchUrl.searchParams.set(`shape_id`, this.shapeId!)
}

let response!: Response
let response!: Response

try {
const maybeResponse = await this.fetchWithBackoff(fetchUrl)
if (maybeResponse) response = maybeResponse
else break
} catch (e) {
if (!(e instanceof FetchError)) throw e // should never happen
if (e.status == 409) {
// Upon receiving a 409, we should start from scratch
// with the newly provided shape ID
const newShapeId = e.headers[`x-electric-shape-id`]
this.reset(newShapeId)
this.publish(e.json as Message<T>[])
continue
} else if (e.status >= 400 && e.status < 500) {
// Notify subscribers
this.sendErrorToUpToDateSubscribers(e)
this.sendErrorToSubscribers(e)

// 400 errors are not actionable without additional user input, so we're throwing them.
throw e
}
}

try {
const maybeResponse = await this.fetchWithBackoff(fetchUrl)
if (maybeResponse) response = maybeResponse
else break
} catch (e) {
if (!(e instanceof FetchError)) throw e // should never happen
if (e.status == 409) {
// Upon receiving a 409, we should start from scratch
// with the newly provided shape ID
const newShapeId = e.headers[`x-electric-shape-id`]
this.reset(newShapeId)
this.publish(e.json as Message<T>[])
continue
} else if (e.status >= 400 && e.status < 500) {
// Notify subscribers
this.sendErrorToUpToDateSubscribers(e)
this.sendErrorToSubscribers(e)

// 400 errors are not actionable without additional user input, so we're throwing them.
throw e
const { headers, status } = response
const shapeId = headers.get(`X-Electric-Shape-Id`)
if (shapeId) {
this.shapeId = shapeId
}
}

const { headers, status } = response
const shapeId = headers.get(`X-Electric-Shape-Id`)
if (shapeId) {
this.shapeId = shapeId
}
const lastOffset = headers.get(`X-Electric-Chunk-Last-Offset`)
if (lastOffset) {
this.lastOffset = lastOffset as Offset
}

const lastOffset = headers.get(`X-Electric-Chunk-Last-Offset`)
if (lastOffset) {
this.lastOffset = lastOffset as Offset
}
const getSchema = (): Schema => {
const schemaHeader = headers.get(`X-Electric-Schema`)
return schemaHeader ? JSON.parse(schemaHeader) : {}
}
this.schema = this.schema ?? getSchema()

const getSchema = (): Schema => {
const schemaHeader = headers.get(`X-Electric-Schema`)
return schemaHeader ? JSON.parse(schemaHeader) : {}
}
this.schema = this.schema ?? getSchema()
const messages = status === 204 ? `[]` : await response.text()

const messages = status === 204 ? `[]` : await response.text()
if (status === 204) {
// There's no content so we are live and up to date
this.lastSyncedAt = Date.now()
}

const batch = this.messageParser.parse(messages, this.schema)
const batch = this.messageParser.parse(messages, this.schema)

// Update isUpToDate
if (batch.length > 0) {
const lastMessage = batch[batch.length - 1]
if (
isControlMessage(lastMessage) &&
lastMessage.headers.control === `up-to-date`
) {
this.lastSyncedAt = Date.now()
if (!this.isUpToDate) {
this.isUpToDate = true
this.notifyUpToDateSubscribers()
}
}

// Update isUpToDate
if (batch.length > 0) {
const lastMessage = batch[batch.length - 1]
if (
isControlMessage(lastMessage) &&
lastMessage.headers.control === `up-to-date` &&
!this.isUpToDate
) {
this.isUpToDate = true
this.notifyUpToDateSubscribers()
this.publish(batch)
}

this.publish(batch)
}
} finally {
this.connected = false
}
}

Expand Down Expand Up @@ -334,6 +347,16 @@ export class ShapeStream<T extends Row = Row> {
this.upToDateSubscribers.clear()
}

/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */
lastSynced(): number {
if (this.lastSyncedAt === undefined) return Infinity
return Date.now() - this.lastSyncedAt
}

isConnected(): boolean {
return this.connected
}

private notifyUpToDateSubscribers() {
this.upToDateSubscribers.forEach(([callback]) => {
callback()
Expand All @@ -355,6 +378,7 @@ export class ShapeStream<T extends Row = Row> {
this.lastOffset = `-1`
this.shapeId = shapeId
this.isUpToDate = false
this.connected = false
this.schema = undefined
}

Expand Down Expand Up @@ -390,9 +414,14 @@ export class ShapeStream<T extends Row = Row> {
while (true) {
try {
const result = await this.fetchClient(url.toString(), { signal })
if (result.ok) return result
else throw await FetchError.fromResponse(result, url.toString())
if (result.ok) {
if (this.options.subscribe) {
this.connected = true
}
return result
} else throw await FetchError.fromResponse(result, url.toString())
} catch (e) {
this.connected = false
if (signal?.aborted) {
return undefined
} else if (
Expand Down Expand Up @@ -471,8 +500,12 @@ export class Shape<T extends Row = Row> {
)
}

get isUpToDate(): boolean {
return this.stream.isUpToDate
lastSynced(): number {
return this.stream.lastSynced()
}

isConnected(): boolean {
return this.stream.isConnected()
}

get value(): Promise<ShapeData<T>> {
Expand Down
Loading

0 comments on commit fe251c8

Please sign in to comment.