Skip to content

Commit

Permalink
feat (client): extensible value type for use with custom parsers (#1791)
Browse files Browse the repository at this point in the history
This PR modifies the types of our client to enable users to extend the
`Value` type. This is needed when using custom parsers.

For example, the parser below used to throw a type error because `Date`
is not a `Value` (where `Value` is the type of base SQL values):

```ts
const parser: Parser = {
  timestampz: (date: string) => {
    return new Date(date)
  },
}
```

With this PR, we can now write this parser as:
```ts
const parser: Parser<Date> = {
  timestampz: (date: string) => {
    return new Date(date)
  },
}
```

And an example usage of this parser with the `ShapeStream`:
```ts
type CustomRow = {
  foo: number
  bar: boolean
  baz: string
  ts: Date
}

const shapeStream = new ShapeStream<CustomRow>({
  url: `...`,
  parser: {
    timestampz: (date: string) => {
      return new Date(date)
    },
  },
})
```

Note that this would throw an error as `Date` is not a type that occurs
in the row:
```ts
type CustomRow = {
  foo: number
  bar: boolean
  baz: string
}

const shapeStream = new ShapeStream<CustomRow>({
  url: `...`,
  parser: {
    // type error here
    timestampz: (date: string) => {
      return new Date(date)
    },
  },
})
```
  • Loading branch information
kevin-dp authored Oct 7, 2024
1 parent 5e671a5 commit c980a76
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 55 deletions.
5 changes: 5 additions & 0 deletions .changeset/pink-rivers-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Make parser generic such that it can be parameterized with additional types supported by custom parsers.
2 changes: 1 addition & 1 deletion examples/tanstack-example/src/Example.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export const Example = () => {
const { data: items } = useShape<Item>(itemShape())
const submissions: Item[] = useMutationState({
filters: { status: `pending` },
select: (mutation) => mutation.state.context as Item | undefined,
select: (mutation) => mutation.state.context as Item,
}).filter((item) => item !== undefined)

const { mutateAsync: addItemMut } = useMutation({
Expand Down
2 changes: 1 addition & 1 deletion examples/tanstack-example/src/match-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
isChangeMessage,
} from "@electric-sql/client"

export async function matchStream<T extends Row>({
export async function matchStream<T extends Row<unknown>>({
stream,
operations,
matchFn,
Expand Down
43 changes: 28 additions & 15 deletions packages/react-hooks/src/react-hooks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,32 @@ import {
ShapeStream,
ShapeStreamOptions,
Row,
GetExtensions,
} from '@electric-sql/client'
import React from 'react'
import { useSyncExternalStoreWithSelector } from 'use-sync-external-store/with-selector.js'

const streamCache = new Map<string, ShapeStream>()
const shapeCache = new Map<ShapeStream, Shape>()
type UnknownShape = Shape<Row<unknown>>
type UnknownShapeStream = ShapeStream<Row<unknown>>

export async function preloadShape<T extends Row = Row>(
options: ShapeStreamOptions
const streamCache = new Map<string, UnknownShapeStream>()
const shapeCache = new Map<UnknownShapeStream, UnknownShape>()

export async function preloadShape<T extends Row<unknown> = Row>(
options: ShapeStreamOptions<GetExtensions<T>>
): Promise<Shape<T>> {
const shapeStream = getShapeStream<T>(options)
const shape = getShape<T>(shapeStream)
await shape.value
return shape
}

export function sortedOptionsHash(options: ShapeStreamOptions): string {
export function sortedOptionsHash<T>(options: ShapeStreamOptions<T>): string {
return JSON.stringify(options, Object.keys(options).sort())
}

export function getShapeStream<T extends Row = Row>(
options: ShapeStreamOptions
export function getShapeStream<T extends Row<unknown>>(
options: ShapeStreamOptions<GetExtensions<T>>
): ShapeStream<T> {
const shapeHash = sortedOptionsHash(options)

Expand All @@ -42,7 +46,9 @@ export function getShapeStream<T extends Row = Row>(
}
}

export function getShape<T extends Row>(shapeStream: ShapeStream<T>): Shape<T> {
export function getShape<T extends Row<unknown>>(
shapeStream: ShapeStream<T>
): Shape<T> {
// If the stream is already cached, return
if (shapeCache.has(shapeStream)) {
// Return the ShapeStream
Expand All @@ -57,7 +63,7 @@ export function getShape<T extends Row>(shapeStream: ShapeStream<T>): Shape<T> {
}
}

export interface UseShapeResult<T extends Row = Row> {
export interface UseShapeResult<T extends Row<unknown> = Row> {
/**
* The array of rows that make up the Shape.
* @type {T[]}
Expand All @@ -76,14 +82,19 @@ export interface UseShapeResult<T extends Row = Row> {
isError: boolean
}

function shapeSubscribe<T extends Row>(shape: Shape<T>, callback: () => void) {
function shapeSubscribe<T extends Row<unknown>>(
shape: Shape<T>,
callback: () => void
) {
const unsubscribe = shape.subscribe(callback)
return () => {
unsubscribe()
}
}

function parseShapeData<T extends Row>(shape: Shape<T>): UseShapeResult<T> {
function parseShapeData<T extends Row<unknown>>(
shape: Shape<T>
): UseShapeResult<T> {
return {
data: [...shape.valueSync.values()],
isLoading: shape.isLoading(),
Expand All @@ -98,19 +109,21 @@ function identity<T>(arg: T): T {
return arg
}

interface UseShapeOptions<SourceData extends Row, Selection>
extends ShapeStreamOptions {
interface UseShapeOptions<SourceData extends Row<unknown>, Selection>
extends ShapeStreamOptions<GetExtensions<SourceData>> {
selector?: (value: UseShapeResult<SourceData>) => Selection
}

export function useShape<
SourceData extends Row = Row,
SourceData extends Row<unknown> = Row,
Selection = UseShapeResult<SourceData>,
>({
selector = identity as (arg: UseShapeResult<SourceData>) => Selection,
...options
}: UseShapeOptions<SourceData, Selection>): Selection {
const shapeStream = getShapeStream<SourceData>(options as ShapeStreamOptions)
const shapeStream = getShapeStream<SourceData>(
options as ShapeStreamOptions<GetExtensions<SourceData>>
)
const shape = getShape<SourceData>(shapeStream)

const useShapeData = React.useMemo(() => {
Expand Down
3 changes: 3 additions & 0 deletions packages/react-hooks/test/react-hooks.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ describe(`useShape`, () => {
foo: number
bar: boolean
baz: string
ts: Date
}

it(`should infer correct return type when a selector is provided`, () => {
Expand All @@ -25,6 +26,7 @@ describe(`useShape`, () => {
foo: 5,
bar: true,
baz: `str`,
ts: new Date(),
}
},
})
Expand All @@ -41,6 +43,7 @@ describe(`useShape`, () => {
foo: 5,
bar: true,
baz: `str`,
ts: new Date(),
}
},
})
Expand Down
23 changes: 15 additions & 8 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { Message, Offset, Schema, Row, MaybePromise } from './types'
import {
Message,
Offset,
Schema,
Row,
MaybePromise,
GetExtensions,
} from './types'
import { MessageParser, Parser } from './parser'
import { isUpToDateMessage } from './helpers'
import { FetchError, FetchBackoffAbortError } from './error'
Expand All @@ -21,7 +28,7 @@ import {
/**
* Options for constructing a ShapeStream.
*/
export interface ShapeStreamOptions {
export interface ShapeStreamOptions<T = never> {
/**
* The full URL to where the Shape is hosted. This can either be the Electric server
* directly or a proxy. E.g. for a local Electric instance, you might set `http://localhost:3000/v1/shape/foo`
Expand Down Expand Up @@ -53,10 +60,10 @@ export interface ShapeStreamOptions {
subscribe?: boolean
signal?: AbortSignal
fetchClient?: typeof fetch
parser?: Parser
parser?: Parser<T>
}

export interface ShapeStreamInterface<T extends Row = Row> {
export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
subscribe(
callback: (messages: Message<T>[]) => MaybePromise<void>,
onError?: (error: FetchError | Error) => void
Expand Down Expand Up @@ -108,10 +115,10 @@ export interface ShapeStreamInterface<T extends Row = Row> {
* ```
*/

export class ShapeStream<T extends Row = Row>
export class ShapeStream<T extends Row<unknown> = Row>
implements ShapeStreamInterface<T>
{
readonly options: ShapeStreamOptions
readonly options: ShapeStreamOptions<GetExtensions<T>>

readonly #fetchClient: typeof fetch
readonly #messageParser: MessageParser<T>
Expand All @@ -135,7 +142,7 @@ export class ShapeStream<T extends Row = Row>
#shapeId?: string
#schema?: Schema

constructor(options: ShapeStreamOptions) {
constructor(options: ShapeStreamOptions<GetExtensions<T>>) {
validateOptions(options)
this.options = { subscribe: true, ...options }
this.#lastOffset = this.options.offset ?? `-1`
Expand Down Expand Up @@ -366,7 +373,7 @@ export class ShapeStream<T extends Row = Row>
}
}

function validateOptions(options: Partial<ShapeStreamOptions>): void {
function validateOptions<T>(options: Partial<ShapeStreamOptions<T>>): void {
if (!options.url) {
throw new Error(`Invalid shape option. It must provide the url`)
}
Expand Down
6 changes: 3 additions & 3 deletions packages/typescript-client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { ChangeMessage, ControlMessage, Message, Row } from './types'
* }
* ```
*/
export function isChangeMessage<T extends Row = Row>(
export function isChangeMessage<T extends Row<unknown> = Row>(
message: Message<T>
): message is ChangeMessage<T> {
return `key` in message
Expand All @@ -40,13 +40,13 @@ export function isChangeMessage<T extends Row = Row>(
* }
* ```
*/
export function isControlMessage<T extends Row = Row>(
export function isControlMessage<T extends Row<unknown> = Row>(
message: Message<T>
): message is ControlMessage {
return !isChangeMessage(message)
}

export function isUpToDateMessage<T extends Row = Row>(
export function isUpToDateMessage<T extends Row<unknown> = Row>(
message: Message<T>
): message is ControlMessage & { up_to_date: true } {
return isControlMessage(message) && message.headers.control === `up-to-date`
Expand Down
45 changes: 29 additions & 16 deletions packages/typescript-client/src/parser.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import { ColumnInfo, Message, Row, Schema, Value } from './types'
import { ColumnInfo, GetExtensions, Message, Row, Schema, Value } from './types'

type NullToken = null | `NULL`
type Token = Exclude<string, NullToken>
type NullableToken = Token | NullToken
export type ParseFunction = (
export type ParseFunction<Extensions = never> = (
value: Token,
additionalInfo?: Omit<ColumnInfo, `type` | `dims`>
) => Value
type NullableParseFunction = (
) => Value<Extensions>
type NullableParseFunction<Extensions = never> = (
value: NullableToken,
additionalInfo?: Omit<ColumnInfo, `type` | `dims`>
) => Value
export type Parser = { [key: string]: ParseFunction }
) => Value<Extensions>
/**
* @typeParam Extensions - Additional types that can be parsed by this parser beyond the standard SQL types.
* Defaults to no additional types.
*/
export type Parser<Extensions = never> = {
[key: string]: ParseFunction<Extensions>
}

const parseNumber = (value: string) => Number(value)
const parseBool = (value: string) => value === `true` || value === `t`
Expand All @@ -31,15 +37,18 @@ export const defaultParser: Parser = {
}

// Taken from: https://github.com/electric-sql/pglite/blob/main/packages/pglite/src/types.ts#L233-L279
export function pgArrayParser(value: Token, parser?: ParseFunction): Value {
export function pgArrayParser<Extensions>(
value: Token,
parser?: ParseFunction<Extensions>
): Value<Extensions> {
let i = 0
let char = null
let str = ``
let quoted = false
let last = 0
let p: string | undefined = undefined

function loop(x: string): Value[] {
function loop(x: string): Array<Value<Extensions>> {
const xs = []
for (; i < x.length; i++) {
char = x[i]
Expand Down Expand Up @@ -79,9 +88,9 @@ export function pgArrayParser(value: Token, parser?: ParseFunction): Value {
return loop(value)[0]
}

export class MessageParser<T extends Row> {
private parser: Parser
constructor(parser?: Parser) {
export class MessageParser<T extends Row<unknown>> {
private parser: Parser<GetExtensions<T>>
constructor(parser?: Parser<GetExtensions<T>>) {
// Merge the provided parser with the default parser
// to use the provided parser whenever defined
// and otherwise fall back to the default parser
Expand All @@ -96,7 +105,7 @@ export class MessageParser<T extends Row> {
// But `typeof null === 'object'` so we need to make an explicit check.
if (key === `value` && typeof value === `object` && value !== null) {
// Parse the row values
const row = value as Record<string, Value>
const row = value as Record<string, Value<GetExtensions<T>>>
Object.keys(row).forEach((key) => {
row[key] = this.parseRow(key, row[key] as NullableToken, schema)
})
Expand All @@ -106,7 +115,11 @@ export class MessageParser<T extends Row> {
}

// Parses the message values using the provided parser based on the schema information
private parseRow(key: string, value: NullableToken, schema: Schema): Value {
private parseRow(
key: string,
value: NullableToken,
schema: Schema
): Value<GetExtensions<T>> {
const columnInfo = schema[key]
if (!columnInfo) {
// We don't have information about the value
Expand Down Expand Up @@ -137,11 +150,11 @@ export class MessageParser<T extends Row> {
}
}

function makeNullableParser(
parser: ParseFunction,
function makeNullableParser<Extensions>(
parser: ParseFunction<Extensions>,
columnInfo: ColumnInfo,
columnName?: string
): NullableParseFunction {
): NullableParseFunction<Extensions> {
const isNullable = !(columnInfo.not_null ?? false)
// The sync service contains `null` value for a column whose value is NULL
// but if the column value is an array that contains a NULL value
Expand Down
6 changes: 3 additions & 3 deletions packages/typescript-client/src/shape.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { isChangeMessage, isControlMessage } from './helpers'
import { FetchError } from './error'
import { ShapeStreamInterface } from './client'

export type ShapeData<T extends Row = Row> = Map<string, T>
export type ShapeChangedCallback<T extends Row = Row> = (
export type ShapeData<T extends Row<unknown> = Row> = Map<string, T>
export type ShapeChangedCallback<T extends Row<unknown> = Row> = (
value: ShapeData<T>
) => void

Expand Down Expand Up @@ -39,7 +39,7 @@ export type ShapeChangedCallback<T extends Row = Row> = (
* console.log(shapeData)
* })
*/
export class Shape<T extends Row = Row> {
export class Shape<T extends Row<unknown> = Row> {
readonly #stream: ShapeStreamInterface<T>

readonly #data: ShapeData<T> = new Map()
Expand Down
Loading

0 comments on commit c980a76

Please sign in to comment.