Skip to content

Commit

Permalink
feat: Message type guard helpers (#1572)
Browse files Browse the repository at this point in the history
Addresses #1453

Adds the type guards to check whether a message is a `ControlMessage` or
a `ChangeMessage`, documented, tested, and exported.

I've also honed in on the message type definitions a bit more as I had
experienced an uncaught bug due to the generality of the `Headers` type.
  • Loading branch information
msfstef authored Aug 27, 2024
1 parent 914e768 commit a6c7bed
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 43 deletions.
5 changes: 5 additions & 0 deletions .changeset/pretty-donuts-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Add `Message` type guard helpers `isChangeMessage` and `isControlMessage`.
8 changes: 6 additions & 2 deletions examples/nextjs-example/app/match-stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ShapeStream, ChangeMessage } from "@electric-sql/client"
import {
ShapeStream,
ChangeMessage,
isChangeMessage,
} from "@electric-sql/client"

export async function matchStream<T>({
stream,
Expand All @@ -21,7 +25,7 @@ export async function matchStream<T>({
const unsubscribe = stream.subscribe((messages) => {
for (const message of messages) {
if (
`key` in message &&
isChangeMessage(message) &&
operations.includes(message.headers.operation)
) {
if (
Expand Down
4 changes: 2 additions & 2 deletions examples/redis-client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createClient } from 'redis'
import { ShapeStream, Message } from '@electric-sql/client'
import { ShapeStream, Message, isChangeMessage } from '@electric-sql/client'

// Create a Redis client
const REDIS_HOST = `localhost`
Expand All @@ -21,7 +21,7 @@ client.connect().then(() => {

// Loop through each message and make writes to the Redis hash for action messages
messages.forEach((message) => {
if (!(`key` in message)) return
if (!isChangeMessage(message)) return
// Upsert/delete
switch (message.headers.action) {
case `delete`:
Expand Down
8 changes: 6 additions & 2 deletions examples/remix-basic/app/match-stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ShapeStream, ChangeMessage } from "@electric-sql/client"
import {
ShapeStream,
ChangeMessage,
isChangeMessage,
} from "@electric-sql/client"

export async function matchStream({
stream,
Expand All @@ -21,7 +25,7 @@ export async function matchStream({
const unsubscribe = stream.subscribe((messages) => {
for (const message of messages) {
if (
`key` in message &&
isChangeMessage(message) &&
operations.includes(message.headers.operation)
) {
if (matchFn({ operationType: message.headers.operation, message })) {
Expand Down
34 changes: 19 additions & 15 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ArgumentsType } from 'vitest'
import { Message, Value, Offset, Schema } from './types'
import { MessageParser, Parser } from './parser'
import { isChangeMessage, isControlMessage } from './helpers'

export type ShapeData = Map<string, { [key: string]: Value }>
export type ShapeChangedCallback = (value: ShapeData) => void
Expand Down Expand Up @@ -196,7 +196,7 @@ export class ShapeStream {
this.backoffOptions = options.backoffOptions ?? BackoffDefaults
this.fetchClient =
options.fetchClient ??
((...args: ArgumentsType<typeof fetch>) => fetch(...args))
((...args: Parameters<typeof fetch>) => fetch(...args))

this.start()
}
Expand Down Expand Up @@ -270,7 +270,8 @@ export class ShapeStream {
if (batch.length > 0) {
const lastMessage = batch[batch.length - 1]
if (
lastMessage.headers?.[`control`] === `up-to-date` &&
isControlMessage(lastMessage) &&
lastMessage.headers.control === `up-to-date` &&
!this.isUpToDate
) {
this.isUpToDate = true
Expand Down Expand Up @@ -514,7 +515,7 @@ export class Shape {
let newlyUpToDate = false

messages.forEach((message) => {
if (`key` in message) {
if (isChangeMessage(message)) {
dataMayHaveChanged = [`insert`, `update`, `delete`].includes(
message.headers.operation
)
Expand All @@ -535,19 +536,22 @@ export class Shape {
}
}

if (message.headers?.[`control`] === `up-to-date`) {
isUpToDate = true
if (!this.hasNotifiedSubscribersUpToDate) {
newlyUpToDate = true
if (isControlMessage(message)) {
switch (message.headers.control) {
case `up-to-date`:
isUpToDate = true
if (!this.hasNotifiedSubscribersUpToDate) {
newlyUpToDate = true
}
break
case `must-refetch`:
this.data.clear()
this.error = false
isUpToDate = false
newlyUpToDate = false
break
}
}

if (message.headers?.[`control`] === `must-refetch`) {
this.data.clear()
this.error = false
isUpToDate = false
newlyUpToDate = false
}
})

// Always notify subscribers when the Shape first is up to date.
Expand Down
47 changes: 47 additions & 0 deletions packages/typescript-client/src/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { ChangeMessage, ControlMessage, Message, Value } from './types'

/**
* Type guard for checking {@link Message} is {@link ChangeMessage}.
*
* See [TS docs](https://www.typescriptlang.org/docs/handbook/advanced-types.html#user-defined-type-guards)
* for information on how to use type guards.
*
* @param message - the message to check
* @returns true if the message is a {@link ChangeMessage}
*
* @example
* ```ts
* if (isChangeMessage(message)) {
* const msgChng: ChangeMessage = message // Ok
* const msgCtrl: ControlMessage = message // Err, type mismatch
* }
* ```
*/
export function isChangeMessage<T extends Value = { [key: string]: Value }>(
message: Message<T>
): message is ChangeMessage<T> {
return `key` in message
}

/**
* Type guard for checking {@link Message} is {@link ControlMessage}.
*
* See [TS docs](https://www.typescriptlang.org/docs/handbook/advanced-types.html#user-defined-type-guards)
* for information on how to use type guards.
*
* @param message - the message to check
* @returns true if the message is a {@link ControlMessage}
*
* * @example
* ```ts
* if (isControlMessage(message)) {
* const msgChng: ChangeMessage = message // Err, type mismatch
* const msgCtrl: ControlMessage = message // Ok
* }
* ```
*/
export function isControlMessage<T extends Value = { [key: string]: Value }>(
message: Message<T>
): message is ControlMessage {
return !isChangeMessage(message)
}
1 change: 1 addition & 0 deletions packages/typescript-client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './client'
export * from './types'
export * from './helpers'
4 changes: 2 additions & 2 deletions packages/typescript-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ export type Value =
export type Offset = `-1` | `${number}_${number}`

interface Header {
[key: string]: Value
[key: Exclude<string, `operation` | `control`>]: Value
}

export type ControlMessage = {
headers: Header
headers: Header & { control: `up-to-date` | `must-refetch` }
}

export type ChangeMessage<T> = {
Expand Down
4 changes: 2 additions & 2 deletions packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ArgumentsType, describe, expect, inject, vi } from 'vitest'
import { describe, expect, inject, vi } from 'vitest'
import { v4 as uuidv4 } from 'uuid'
import { setTimeout as sleep } from 'node:timers/promises'
import { testWithIssuesTable as it } from './support/test-context'
Expand Down Expand Up @@ -119,7 +119,7 @@ describe(`Shape`, () => {
})

let requestsMade = 0
const fetchWrapper = async (...args: ArgumentsType<typeof fetch>) => {
const fetchWrapper = async (...args: Parameters<typeof fetch>) => {
// clear the shape and modify the data after the initial request
if (requestsMade === 1) {
await clearIssuesShape()
Expand Down
45 changes: 45 additions & 0 deletions packages/typescript-client/test/helpers.test-d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { describe, expectTypeOf, it } from 'vitest'
import {
ChangeMessage,
ControlMessage,
isChangeMessage,
isControlMessage,
Message,
} from '../src'

describe(`helpers`, () => {
it(`should respect ChangeMessages type guard`, () => {
const message = {
headers: {
operation: `insert`,
},
offset: `-1`,
key: `foo`,
value: { foo: `bar` },
} as Message<{ foo: string }>

if (isChangeMessage(message)) {
const msgChng: ChangeMessage<{ foo: string }> = message
expectTypeOf(msgChng).toEqualTypeOf<ChangeMessage<{ foo: string }>>()

// @ts-expect-error - should have type mismatch
message as ControlMessage
}
})

it(`should respect ControlMessages type guard`, () => {
const message = {
headers: {
control: `up-to-date`,
},
} as Message<{ [key: string]: string }>

if (isControlMessage(message)) {
const msgCtrl: ControlMessage = message
expectTypeOf(msgCtrl).toEqualTypeOf<ControlMessage>()

// @ts-expect-error - should have type mismatch
message as ChangeMessage<{ foo: string }>
}
})
})
28 changes: 28 additions & 0 deletions packages/typescript-client/test/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { describe, expect, it } from 'vitest'
import { isChangeMessage, isControlMessage, Message } from '../src'

describe(`helpers`, () => {
it(`should correctly detect ChangeMessages`, () => {
const message = {
headers: {
operation: `insert`,
},
offset: `-1`,
key: `key`,
value: { key: `value` },
} as Message

expect(isChangeMessage(message)).toBe(true)
expect(isControlMessage(message)).toBe(false)
})

it(`should correctly detect ControlMessages`, () => {
const message = {
headers: {
control: `up-to-date`,
},
} as Message
expect(isControlMessage(message)).toBe(true)
expect(isChangeMessage(message)).toBe(false)
})
})
Loading

0 comments on commit a6c7bed

Please sign in to comment.