From a6c7bedd9e778a977ca5fca5386d8ffc68754214 Mon Sep 17 00:00:00 2001 From: Stefanos Mousafeiris Date: Tue, 27 Aug 2024 17:58:37 +0300 Subject: [PATCH] feat: `Message` type guard helpers (#1572) Addresses https://github.com/electric-sql/electric/issues/1453 Adds the type guards to check whether a message is a `ControlMessage` or a `ChangeMessage`, documented, tested, and exported. I've also honed in on the message type definitions a bit more as I had experienced an uncaught bug due to the generality of the `Headers` type. --- .changeset/pretty-donuts-rule.md | 5 ++ examples/nextjs-example/app/match-stream.ts | 8 +++- examples/redis-client/src/index.ts | 4 +- examples/remix-basic/app/match-stream.ts | 8 +++- packages/typescript-client/src/client.ts | 34 ++++++++------ packages/typescript-client/src/helpers.ts | 47 +++++++++++++++++++ packages/typescript-client/src/index.ts | 1 + packages/typescript-client/src/types.ts | 4 +- .../typescript-client/test/client.test.ts | 4 +- .../typescript-client/test/helpers.test-d.ts | 45 ++++++++++++++++++ .../typescript-client/test/helpers.test.ts | 28 +++++++++++ .../test/integration.test.ts | 40 +++++++++------- packages/typescript-client/vitest.config.ts | 1 + 13 files changed, 186 insertions(+), 43 deletions(-) create mode 100644 .changeset/pretty-donuts-rule.md create mode 100644 packages/typescript-client/src/helpers.ts create mode 100644 packages/typescript-client/test/helpers.test-d.ts create mode 100644 packages/typescript-client/test/helpers.test.ts diff --git a/.changeset/pretty-donuts-rule.md b/.changeset/pretty-donuts-rule.md new file mode 100644 index 0000000000..9ea6f84bdf --- /dev/null +++ b/.changeset/pretty-donuts-rule.md @@ -0,0 +1,5 @@ +--- +"@electric-sql/client": patch +--- + +Add `Message` type guard helpers `isChangeMessage` and `isControlMessage`. diff --git a/examples/nextjs-example/app/match-stream.ts b/examples/nextjs-example/app/match-stream.ts index 3c60edc9f3..25e8961962 100644 --- a/examples/nextjs-example/app/match-stream.ts +++ b/examples/nextjs-example/app/match-stream.ts @@ -1,4 +1,8 @@ -import { ShapeStream, ChangeMessage } from "@electric-sql/client" +import { + ShapeStream, + ChangeMessage, + isChangeMessage, +} from "@electric-sql/client" export async function matchStream({ stream, @@ -21,7 +25,7 @@ export async function matchStream({ const unsubscribe = stream.subscribe((messages) => { for (const message of messages) { if ( - `key` in message && + isChangeMessage(message) && operations.includes(message.headers.operation) ) { if ( diff --git a/examples/redis-client/src/index.ts b/examples/redis-client/src/index.ts index c81558bee4..fd3f5af1f0 100644 --- a/examples/redis-client/src/index.ts +++ b/examples/redis-client/src/index.ts @@ -1,5 +1,5 @@ import { createClient } from 'redis' -import { ShapeStream, Message } from '@electric-sql/client' +import { ShapeStream, Message, isChangeMessage } from '@electric-sql/client' // Create a Redis client const REDIS_HOST = `localhost` @@ -21,7 +21,7 @@ client.connect().then(() => { // Loop through each message and make writes to the Redis hash for action messages messages.forEach((message) => { - if (!(`key` in message)) return + if (!isChangeMessage(message)) return // Upsert/delete switch (message.headers.action) { case `delete`: diff --git a/examples/remix-basic/app/match-stream.ts b/examples/remix-basic/app/match-stream.ts index c670cf9481..e39df2f80a 100644 --- a/examples/remix-basic/app/match-stream.ts +++ b/examples/remix-basic/app/match-stream.ts @@ -1,4 +1,8 @@ -import { ShapeStream, ChangeMessage } from "@electric-sql/client" +import { + ShapeStream, + ChangeMessage, + isChangeMessage, +} from "@electric-sql/client" export async function matchStream({ stream, @@ -21,7 +25,7 @@ export async function matchStream({ const unsubscribe = stream.subscribe((messages) => { for (const message of messages) { if ( - `key` in message && + isChangeMessage(message) && operations.includes(message.headers.operation) ) { if (matchFn({ operationType: message.headers.operation, message })) { diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 80ff771c8a..d7b90e349b 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -1,6 +1,6 @@ -import { ArgumentsType } from 'vitest' import { Message, Value, Offset, Schema } from './types' import { MessageParser, Parser } from './parser' +import { isChangeMessage, isControlMessage } from './helpers' export type ShapeData = Map export type ShapeChangedCallback = (value: ShapeData) => void @@ -196,7 +196,7 @@ export class ShapeStream { this.backoffOptions = options.backoffOptions ?? BackoffDefaults this.fetchClient = options.fetchClient ?? - ((...args: ArgumentsType) => fetch(...args)) + ((...args: Parameters) => fetch(...args)) this.start() } @@ -270,7 +270,8 @@ export class ShapeStream { if (batch.length > 0) { const lastMessage = batch[batch.length - 1] if ( - lastMessage.headers?.[`control`] === `up-to-date` && + isControlMessage(lastMessage) && + lastMessage.headers.control === `up-to-date` && !this.isUpToDate ) { this.isUpToDate = true @@ -514,7 +515,7 @@ export class Shape { let newlyUpToDate = false messages.forEach((message) => { - if (`key` in message) { + if (isChangeMessage(message)) { dataMayHaveChanged = [`insert`, `update`, `delete`].includes( message.headers.operation ) @@ -535,19 +536,22 @@ export class Shape { } } - if (message.headers?.[`control`] === `up-to-date`) { - isUpToDate = true - if (!this.hasNotifiedSubscribersUpToDate) { - newlyUpToDate = true + if (isControlMessage(message)) { + switch (message.headers.control) { + case `up-to-date`: + isUpToDate = true + if (!this.hasNotifiedSubscribersUpToDate) { + newlyUpToDate = true + } + break + case `must-refetch`: + this.data.clear() + this.error = false + isUpToDate = false + newlyUpToDate = false + break } } - - if (message.headers?.[`control`] === `must-refetch`) { - this.data.clear() - this.error = false - isUpToDate = false - newlyUpToDate = false - } }) // Always notify subscribers when the Shape first is up to date. diff --git a/packages/typescript-client/src/helpers.ts b/packages/typescript-client/src/helpers.ts new file mode 100644 index 0000000000..d2218cc1c3 --- /dev/null +++ b/packages/typescript-client/src/helpers.ts @@ -0,0 +1,47 @@ +import { ChangeMessage, ControlMessage, Message, Value } from './types' + +/** + * Type guard for checking {@link Message} is {@link ChangeMessage}. + * + * See [TS docs](https://www.typescriptlang.org/docs/handbook/advanced-types.html#user-defined-type-guards) + * for information on how to use type guards. + * + * @param message - the message to check + * @returns true if the message is a {@link ChangeMessage} + * + * @example + * ```ts + * if (isChangeMessage(message)) { + * const msgChng: ChangeMessage = message // Ok + * const msgCtrl: ControlMessage = message // Err, type mismatch + * } + * ``` + */ +export function isChangeMessage( + message: Message +): message is ChangeMessage { + return `key` in message +} + +/** + * Type guard for checking {@link Message} is {@link ControlMessage}. + * + * See [TS docs](https://www.typescriptlang.org/docs/handbook/advanced-types.html#user-defined-type-guards) + * for information on how to use type guards. + * + * @param message - the message to check + * @returns true if the message is a {@link ControlMessage} + * + * * @example + * ```ts + * if (isControlMessage(message)) { + * const msgChng: ChangeMessage = message // Err, type mismatch + * const msgCtrl: ControlMessage = message // Ok + * } + * ``` + */ +export function isControlMessage( + message: Message +): message is ControlMessage { + return !isChangeMessage(message) +} diff --git a/packages/typescript-client/src/index.ts b/packages/typescript-client/src/index.ts index edfed4a6d6..c4226cbccb 100644 --- a/packages/typescript-client/src/index.ts +++ b/packages/typescript-client/src/index.ts @@ -1,2 +1,3 @@ export * from './client' export * from './types' +export * from './helpers' diff --git a/packages/typescript-client/src/types.ts b/packages/typescript-client/src/types.ts index fc190ef21b..54541814c2 100644 --- a/packages/typescript-client/src/types.ts +++ b/packages/typescript-client/src/types.ts @@ -10,11 +10,11 @@ export type Value = export type Offset = `-1` | `${number}_${number}` interface Header { - [key: string]: Value + [key: Exclude]: Value } export type ControlMessage = { - headers: Header + headers: Header & { control: `up-to-date` | `must-refetch` } } export type ChangeMessage = { diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index c5c2893a4f..bbda062487 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -1,4 +1,4 @@ -import { ArgumentsType, describe, expect, inject, vi } from 'vitest' +import { describe, expect, inject, vi } from 'vitest' import { v4 as uuidv4 } from 'uuid' import { setTimeout as sleep } from 'node:timers/promises' import { testWithIssuesTable as it } from './support/test-context' @@ -119,7 +119,7 @@ describe(`Shape`, () => { }) let requestsMade = 0 - const fetchWrapper = async (...args: ArgumentsType) => { + const fetchWrapper = async (...args: Parameters) => { // clear the shape and modify the data after the initial request if (requestsMade === 1) { await clearIssuesShape() diff --git a/packages/typescript-client/test/helpers.test-d.ts b/packages/typescript-client/test/helpers.test-d.ts new file mode 100644 index 0000000000..9e439e209e --- /dev/null +++ b/packages/typescript-client/test/helpers.test-d.ts @@ -0,0 +1,45 @@ +import { describe, expectTypeOf, it } from 'vitest' +import { + ChangeMessage, + ControlMessage, + isChangeMessage, + isControlMessage, + Message, +} from '../src' + +describe(`helpers`, () => { + it(`should respect ChangeMessages type guard`, () => { + const message = { + headers: { + operation: `insert`, + }, + offset: `-1`, + key: `foo`, + value: { foo: `bar` }, + } as Message<{ foo: string }> + + if (isChangeMessage(message)) { + const msgChng: ChangeMessage<{ foo: string }> = message + expectTypeOf(msgChng).toEqualTypeOf>() + + // @ts-expect-error - should have type mismatch + message as ControlMessage + } + }) + + it(`should respect ControlMessages type guard`, () => { + const message = { + headers: { + control: `up-to-date`, + }, + } as Message<{ [key: string]: string }> + + if (isControlMessage(message)) { + const msgCtrl: ControlMessage = message + expectTypeOf(msgCtrl).toEqualTypeOf() + + // @ts-expect-error - should have type mismatch + message as ChangeMessage<{ foo: string }> + } + }) +}) diff --git a/packages/typescript-client/test/helpers.test.ts b/packages/typescript-client/test/helpers.test.ts new file mode 100644 index 0000000000..d52cf67915 --- /dev/null +++ b/packages/typescript-client/test/helpers.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from 'vitest' +import { isChangeMessage, isControlMessage, Message } from '../src' + +describe(`helpers`, () => { + it(`should correctly detect ChangeMessages`, () => { + const message = { + headers: { + operation: `insert`, + }, + offset: `-1`, + key: `key`, + value: { key: `value` }, + } as Message + + expect(isChangeMessage(message)).toBe(true) + expect(isControlMessage(message)).toBe(false) + }) + + it(`should correctly detect ControlMessages`, () => { + const message = { + headers: { + control: `up-to-date`, + }, + } as Message + expect(isControlMessage(message)).toBe(true) + expect(isChangeMessage(message)).toBe(false) + }) +}) diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index 3ef327df64..5c48b12cf9 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -1,9 +1,10 @@ import { parse } from 'cache-control-parser' import { setTimeout as sleep } from 'node:timers/promises' import { v4 as uuidv4 } from 'uuid' -import { ArgumentsType, assert, describe, expect, inject, vi } from 'vitest' +import { assert, describe, expect, inject, vi } from 'vitest' import { Shape, ShapeStream } from '../src/client' -import { Message, Offset } from '../src/types' +import { Message, Offset, Value } from '../src/types' +import { isChangeMessage, isControlMessage } from '../src' import { IssueRow, testWithIssuesTable as it, @@ -13,6 +14,9 @@ import * as h from './support/test-helpers' const BASE_URL = inject(`baseUrl`) +const isUpToDateMessage = (msg: Message) => + isControlMessage(msg) && msg.headers.control === `up-to-date` + it(`sanity check`, async ({ dbClient, issuesTableSql }) => { const result = await dbClient.query(`SELECT * FROM ${issuesTableSql}`) @@ -35,10 +39,10 @@ describe(`HTTP Sync`, () => { await new Promise((resolve, reject) => { issueStream.subscribe((messages) => { messages.forEach((message) => { - if (`key` in message) { + if (isChangeMessage(message)) { shapeData.set(message.key, message.value) } - if (message.headers?.[`control`] === `up-to-date`) { + if (isUpToDateMessage(message)) { aborter.abort() return resolve() } @@ -55,7 +59,7 @@ describe(`HTTP Sync`, () => { aborter, }) => { const urlsRequested: URL[] = [] - const fetchWrapper = (...args: ArgumentsType) => { + const fetchWrapper = (...args: Parameters) => { const url = new URL(args[0]) urlsRequested.push(url) return fetch(...args) @@ -74,10 +78,10 @@ describe(`HTTP Sync`, () => { await new Promise((resolve, reject) => { issueStream.subscribe((messages) => { messages.forEach((message) => { - if (`key` in message) { + if (isChangeMessage(message)) { shapeData.set(message.key, message.value) } - if (message.headers?.[`control`] === `up-to-date`) { + if (isUpToDateMessage(message)) { upToDateMessageCount += 1 } }) @@ -148,10 +152,10 @@ describe(`HTTP Sync`, () => { await new Promise((resolve) => { issueStream.subscribe((messages) => { messages.forEach((message) => { - if (`key` in message) { + if (isChangeMessage(message)) { shapeData.set(message.key, message.value) } - if (message.headers?.[`control`] === `up-to-date`) { + if (isUpToDateMessage(message)) { aborter.abort() return resolve() } @@ -344,7 +348,7 @@ describe(`HTTP Sync`, () => { }) let secondRowId = `` await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { - if (!(`key` in msg)) return + if (!isChangeMessage(msg)) return shapeData.set(msg.key, msg.value) if (nth === 0) { @@ -396,7 +400,7 @@ describe(`HTTP Sync`, () => { }) const p1 = h.forEachMessage(issueStream1, aborter1, (res, msg, nth) => { - if (!(`key` in msg)) return + if (!isChangeMessage(msg)) return shapeData1.set(msg.key, msg.value) if (nth === 1) { @@ -407,7 +411,7 @@ describe(`HTTP Sync`, () => { }) const p2 = h.forEachMessage(issueStream2, aborter2, (res, msg, nth) => { - if (!(`key` in msg)) return + if (!isChangeMessage(msg)) return shapeData2.set(msg.key, msg.value) if (nth === 2) { @@ -439,7 +443,7 @@ describe(`HTTP Sync`, () => { if (`offset` in msg) { expect(msg.offset).to.not.eq(`0_`) lastOffset = msg.offset - } else if (msg.headers?.[`control`] === `up-to-date`) { + } else if (isUpToDateMessage(msg)) { res() } }) @@ -467,7 +471,7 @@ describe(`HTTP Sync`, () => { shapeId: issueStream.shapeId, }) await h.forEachMessage(newIssueStream, aborter, (res, msg, nth) => { - if (msg.headers?.[`control`] === `up-to-date`) { + if (isUpToDateMessage(msg)) { res() } else { catchupOpsCount = nth + 1 @@ -588,7 +592,7 @@ describe(`HTTP Sync`, () => { }) await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { - if (!(`key` in msg)) return + if (!isChangeMessage(msg)) return shapeData.set(msg.key, msg.value) if (nth === 0) { @@ -620,7 +624,7 @@ describe(`HTTP Sync`, () => { const statusCodesReceived: number[] = [] - const fetchWrapper = async (...args: ArgumentsType) => { + const fetchWrapper = async (...args: Parameters) => { // before any subsequent requests after the initial one, ensure // that the existing shape is deleted and some more data is inserted if (statusCodesReceived.length === 1 && statusCodesReceived[0] === 200) { @@ -649,7 +653,7 @@ describe(`HTTP Sync`, () => { aborter, async (res, msg, nth) => { // shapeData.set(msg.key, msg.value) - if (msg.headers?.[`control`] === `up-to-date`) { + if (isUpToDateMessage(msg)) { upToDateReachedCount++ if (upToDateReachedCount === 1) { // upon reaching up to date initially, we have one @@ -668,7 +672,7 @@ describe(`HTTP Sync`, () => { return } - if (!(`key` in msg)) return + if (!isChangeMessage(msg)) return switch (nth) { case 0: diff --git a/packages/typescript-client/vitest.config.ts b/packages/typescript-client/vitest.config.ts index 13fc1976a9..6f1bf248c0 100644 --- a/packages/typescript-client/vitest.config.ts +++ b/packages/typescript-client/vitest.config.ts @@ -3,5 +3,6 @@ import { defineConfig } from 'vitest/config' export default defineConfig({ test: { globalSetup: `test/support/global-setup.ts`, + typecheck: { enabled: true }, }, })