diff --git a/.changeset/dull-hornets-talk.md b/.changeset/dull-hornets-talk.md new file mode 100644 index 0000000000..26752b4104 --- /dev/null +++ b/.changeset/dull-hornets-talk.md @@ -0,0 +1,5 @@ +--- +"@electric-sql/client": patch +--- + +Client refactor and fix `Shape` state synchronization with `ShapeStream`. diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index ea8e692159..a4576cb733 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -1,23 +1,22 @@ -import { Message, Offset, Schema, Row } from './types' +import { Message, Offset, Schema, Row, MaybePromise } from './types' import { MessageParser, Parser } from './parser' -import { isChangeMessage, isControlMessage } from './helpers' - -export type ShapeData = Map -export type ShapeChangedCallback = ( - value: ShapeData -) => void - -export interface BackoffOptions { - initialDelay: number - maxDelay: number - multiplier: number -} - -export const BackoffDefaults = { - initialDelay: 100, - maxDelay: 10_000, - multiplier: 1.3, -} +import { isUpToDateMessage } from './helpers' +import { MessageProcessor, MessageProcessorInterface } from './queue' +import { FetchError, FetchBackoffAbortError } from './error' +import { + BackoffDefaults, + BackoffOptions, + createFetchWithBackoff, +} from './fetch' +import { + CHUNK_LAST_OFFSET_HEADER, + LIVE_QUERY_PARAM, + OFFSET_QUERY_PARAM, + SHAPE_ID_HEADER, + SHAPE_ID_QUERY_PARAM, + SHAPE_SCHEMA_HEADER, + WHERE_QUERY_PARAM, +} from './constants' /** * Options for constructing a ShapeStream. @@ -57,86 +56,24 @@ export interface ShapeStreamOptions { parser?: Parser } -/** - * Receives batches of `messages`, puts them on a queue and processes - * them asynchronously by passing to a registered callback function. - * - * @constructor - * @param {(messages: Message[]) => void} callback function - */ -class MessageProcessor { - private messageQueue: Message[][] = [] - private isProcessing = false - private callback: (messages: Message[]) => void | Promise - - constructor(callback: (messages: Message[]) => void | Promise) { - this.callback = callback - } - - process(messages: Message[]) { - this.messageQueue.push(messages) - - if (!this.isProcessing) { - this.processQueue() - } - } - - private async processQueue() { - this.isProcessing = true - - while (this.messageQueue.length > 0) { - const messages = this.messageQueue.shift()! - - await this.callback(messages) - } - - this.isProcessing = false - } -} - -export class FetchError extends Error { - status: number - text?: string - json?: object - headers: Record - - constructor( - status: number, - text: string | undefined, - json: object | undefined, - headers: Record, - public url: string, - message?: string - ) { - super( - message || - `HTTP Error ${status} at ${url}: ${text ?? JSON.stringify(json)}` - ) - this.name = `FetchError` - this.status = status - this.text = text - this.json = json - this.headers = headers - } +export interface ShapeStreamInterface { + subscribe( + callback: (messages: Message[]) => MaybePromise, + onError?: (error: FetchError | Error) => void + ): void + unsubscribeAllUpToDateSubscribers(): void + unsubscribeAll(): void + subscribeOnceToUpToDate( + callback: () => MaybePromise, + error: (err: FetchError | Error) => void + ): () => void - static async fromResponse( - response: Response, - url: string - ): Promise { - const status = response.status - const headers = Object.fromEntries([...response.headers.entries()]) - let text: string | undefined = undefined - let json: object | undefined = undefined - - const contentType = response.headers.get(`content-type`) - if (contentType && contentType.includes(`application/json`)) { - json = (await response.json()) as object - } else { - text = await response.text() - } + isLoading(): boolean + lastSynced(): number + isConnected(): boolean - return new FetchError(status, text, json, headers, url) - } + isUpToDate: boolean + shapeId?: string } /** @@ -169,89 +106,111 @@ export class FetchError extends Error { * aborter.abort() * ``` */ -export class ShapeStream { - private options: ShapeStreamOptions - private backoffOptions: BackoffOptions - private fetchClient: typeof fetch - private schema?: Schema - private subscribers = new Map< +export class ShapeStream + implements ShapeStreamInterface +{ + readonly options: ShapeStreamOptions + + readonly #fetchClient: typeof fetch + readonly #messageParser: MessageParser + + readonly #subscribers = new Map< number, - [MessageProcessor, ((error: Error) => void) | undefined] + [ + MessageProcessorInterface[]>, + ((error: Error) => void) | undefined, + ] >() - private upToDateSubscribers = new Map< + readonly #upToDateSubscribers = new Map< number, [() => void, (error: FetchError | Error) => void] >() - private lastOffset: Offset - private messageParser: MessageParser - private lastSyncedAt?: number // unix time - public isUpToDate: boolean = false - private connected: boolean = false - - shapeId?: string + #lastOffset: Offset + #lastSyncedAt?: number // unix time + #isUpToDate: boolean = false + #connected: boolean = false + #shapeId?: string + #schema?: Schema constructor(options: ShapeStreamOptions) { - this.validateOptions(options) + validateOptions(options) this.options = { subscribe: true, ...options } - this.lastOffset = this.options.offset ?? `-1` - this.shapeId = this.options.shapeId - this.messageParser = new MessageParser(options.parser) + this.#lastOffset = this.options.offset ?? `-1` + this.#shapeId = this.options.shapeId + this.#messageParser = new MessageParser(options.parser) - this.backoffOptions = options.backoffOptions ?? BackoffDefaults - this.fetchClient = + this.#fetchClient = createFetchWithBackoff( options.fetchClient ?? - ((...args: Parameters) => fetch(...args)) + ((...args: Parameters) => fetch(...args)), + { + ...(options.backoffOptions ?? BackoffDefaults), + onFailedAttempt: () => { + this.#connected = false + options.backoffOptions?.onFailedAttempt?.() + }, + } + ) this.start() } + get shapeId() { + return this.#shapeId + } + + get isUpToDate() { + return this.#isUpToDate + } + async start() { - this.isUpToDate = false + this.#isUpToDate = false const { url, where, signal } = this.options try { - while ((!signal?.aborted && !this.isUpToDate) || this.options.subscribe) { + while ( + (!signal?.aborted && !this.#isUpToDate) || + this.options.subscribe + ) { const fetchUrl = new URL(url) - if (where) fetchUrl.searchParams.set(`where`, where) - fetchUrl.searchParams.set(`offset`, this.lastOffset) + if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where) + fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset) - if (this.isUpToDate) { - fetchUrl.searchParams.set(`live`, `true`) + if (this.#isUpToDate) { + fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) } - if (this.shapeId) { + if (this.#shapeId) { // This should probably be a header for better cache breaking? - fetchUrl.searchParams.set(`shape_id`, this.shapeId!) + fetchUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, this.#shapeId!) } let response!: Response - try { - const maybeResponse = await this.fetchWithBackoff(fetchUrl) - if (maybeResponse) response = maybeResponse - else break + response = await this.#fetchClient(fetchUrl.toString(), { signal }) + this.#connected = true } catch (e) { + if (e instanceof FetchBackoffAbortError) break // interrupted if (!(e instanceof FetchError)) throw e // should never happen if (e.status == 400) { // The request is invalid, most likely because the shape has been deleted. // We should start from scratch, this will force the shape to be recreated. - this.reset() - this.publish(e.json as Message[]) + this.#reset() + this.#publish(e.json as Message[]) continue } else if (e.status == 409) { // Upon receiving a 409, we should start from scratch // with the newly provided shape ID - const newShapeId = e.headers[`x-electric-shape-id`] - this.reset(newShapeId) - this.publish(e.json as Message[]) + const newShapeId = e.headers[SHAPE_ID_HEADER] + this.#reset(newShapeId) + this.#publish(e.json as Message[]) continue } else if (e.status >= 400 && e.status < 500) { // Notify subscribers - this.sendErrorToUpToDateSubscribers(e) - this.sendErrorToSubscribers(e) + this.#sendErrorToUpToDateSubscribers(e) + this.#sendErrorToSubscribers(e) // 400 errors are not actionable without additional user input, so we're throwing them. throw e @@ -259,108 +218,93 @@ export class ShapeStream { } const { headers, status } = response - const shapeId = headers.get(`X-Electric-Shape-Id`) + const shapeId = headers.get(SHAPE_ID_HEADER) if (shapeId) { - this.shapeId = shapeId + this.#shapeId = shapeId } - const lastOffset = headers.get(`X-Electric-Chunk-Last-Offset`) + const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER) if (lastOffset) { - this.lastOffset = lastOffset as Offset + this.#lastOffset = lastOffset as Offset } const getSchema = (): Schema => { - const schemaHeader = headers.get(`X-Electric-Schema`) + const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) return schemaHeader ? JSON.parse(schemaHeader) : {} } - this.schema = this.schema ?? getSchema() + this.#schema = this.#schema ?? getSchema() const messages = status === 204 ? `[]` : await response.text() if (status === 204) { // There's no content so we are live and up to date - this.lastSyncedAt = Date.now() + this.#lastSyncedAt = Date.now() } - const batch = this.messageParser.parse(messages, this.schema) + const batch = this.#messageParser.parse(messages, this.#schema) // Update isUpToDate if (batch.length > 0) { const lastMessage = batch[batch.length - 1] - if ( - isControlMessage(lastMessage) && - lastMessage.headers.control === `up-to-date` - ) { - this.lastSyncedAt = Date.now() - if (!this.isUpToDate) { - this.isUpToDate = true - this.notifyUpToDateSubscribers() + if (isUpToDateMessage(lastMessage)) { + this.#lastSyncedAt = Date.now() + if (!this.#isUpToDate) { + this.#isUpToDate = true + this.#notifyUpToDateSubscribers() } } - this.publish(batch) + this.#publish(batch) } } } finally { - this.connected = false + this.#connected = false } } subscribe( - callback: (messages: Message[]) => void | Promise, + callback: (messages: Message[]) => MaybePromise, onError?: (error: FetchError | Error) => void ) { const subscriptionId = Math.random() const subscriber = new MessageProcessor(callback) - this.subscribers.set(subscriptionId, [subscriber, onError]) + this.#subscribers.set(subscriptionId, [subscriber, onError]) return () => { - this.subscribers.delete(subscriptionId) + this.#subscribers.delete(subscriptionId) } } unsubscribeAll(): void { - this.subscribers.clear() - } - - private publish(messages: Message[]) { - this.subscribers.forEach(([subscriber, _]) => { - subscriber.process(messages) - }) - } - - private sendErrorToSubscribers(error: Error) { - this.subscribers.forEach(([_, errorFn]) => { - errorFn?.(error) - }) + this.#subscribers.clear() } subscribeOnceToUpToDate( - callback: () => void | Promise, + callback: () => MaybePromise, error: (err: FetchError | Error) => void ) { const subscriptionId = Math.random() - this.upToDateSubscribers.set(subscriptionId, [callback, error]) + this.#upToDateSubscribers.set(subscriptionId, [callback, error]) return () => { - this.upToDateSubscribers.delete(subscriptionId) + this.#upToDateSubscribers.delete(subscriptionId) } } unsubscribeAllUpToDateSubscribers(): void { - this.upToDateSubscribers.clear() + this.#upToDateSubscribers.clear() } /** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */ lastSynced(): number { - if (this.lastSyncedAt === undefined) return Infinity - return Date.now() - this.lastSyncedAt + if (this.#lastSyncedAt === undefined) return Infinity + return Date.now() - this.#lastSyncedAt } isConnected(): boolean { - return this.connected + return this.#connected } /** True during initial fetch. False afterwise. */ @@ -368,15 +312,26 @@ export class ShapeStream { return !this.isUpToDate } - private notifyUpToDateSubscribers() { - this.upToDateSubscribers.forEach(([callback]) => { + #publish(messages: Message[]) { + this.#subscribers.forEach(([subscriber, _]) => { + subscriber.process(messages) + }) + } + + #sendErrorToSubscribers(error: Error) { + this.#subscribers.forEach(([_, errorFn]) => { + errorFn?.(error) + }) + } + + #notifyUpToDateSubscribers() { + this.#upToDateSubscribers.forEach(([callback]) => { callback() }) } - private sendErrorToUpToDateSubscribers(error: FetchError | Error) { - // eslint-disable-next-line - this.upToDateSubscribers.forEach(([_, errorCallback]) => + #sendErrorToUpToDateSubscribers(error: FetchError | Error) { + this.#upToDateSubscribers.forEach(([_, errorCallback]) => errorCallback(error) ) } @@ -385,248 +340,33 @@ export class ShapeStream { * Resets the state of the stream, optionally with a provided * shape ID */ - private reset(shapeId?: string) { - this.lastOffset = `-1` - this.shapeId = shapeId - this.isUpToDate = false - this.connected = false - this.schema = undefined - } - - private validateOptions(options: ShapeStreamOptions): void { - if (!options.url) { - throw new Error(`Invalid shape option. It must provide the url`) - } - if (options.signal && !(options.signal instanceof AbortSignal)) { - throw new Error( - `Invalid signal option. It must be an instance of AbortSignal.` - ) - } - - if ( - options.offset !== undefined && - options.offset !== `-1` && - !options.shapeId - ) { - throw new Error( - `shapeId is required if this isn't an initial fetch (i.e. offset > -1)` - ) - } - } - - private async fetchWithBackoff(url: URL) { - const { initialDelay, maxDelay, multiplier } = this.backoffOptions - const signal = this.options.signal - - let delay = initialDelay - let attempt = 0 - - // eslint-disable-next-line no-constant-condition -- we're retrying with a lag until we get a non-500 response or the abort signal is triggered - while (true) { - try { - const result = await this.fetchClient(url.toString(), { signal }) - if (result.ok) { - if (this.options.subscribe) { - this.connected = true - } - return result - } else throw await FetchError.fromResponse(result, url.toString()) - } catch (e) { - this.connected = false - if (signal?.aborted) { - return undefined - } else if ( - e instanceof FetchError && - e.status >= 400 && - e.status < 500 - ) { - // Any client errors cannot be backed off on, leave it to the caller to handle. - throw e - } else { - // Exponentially backoff on errors. - // Wait for the current delay duration - await new Promise((resolve) => setTimeout(resolve, delay)) - - // Increase the delay for the next attempt - delay = Math.min(delay * multiplier, maxDelay) - - attempt++ - console.log(`Retry attempt #${attempt} after ${delay}ms`) - } - } - } + #reset(shapeId?: string) { + this.#lastOffset = `-1` + this.#shapeId = shapeId + this.#isUpToDate = false + this.#connected = false + this.#schema = undefined } } -/** - * A Shape is an object that subscribes to a shape log, - * keeps a materialised shape `.value` in memory and - * notifies subscribers when the value has changed. - * - * It can be used without a framework and as a primitive - * to simplify developing framework hooks. - * - * @constructor - * @param {ShapeStream} - the underlying shape stream - * @example - * ``` - * const shapeStream = new ShapeStream<{ foo: number }>(url: 'http://localhost:3000/v1/shape/foo'}) - * const shape = new Shape(shapeStream) - * ``` - * - * `value` returns a promise that resolves the Shape data once the Shape has been - * fully loaded (and when resuming from being offline): - * - * const value = await shape.value - * - * `valueSync` returns the current data synchronously: - * - * const value = shape.valueSync - * - * Subscribe to updates. Called whenever the shape updates in Postgres. - * - * shape.subscribe(shapeData => { - * console.log(shapeData) - * }) - */ -export class Shape { - private stream: ShapeStream - - private data: ShapeData = new Map() - private subscribers = new Map>() - public error: FetchError | false = false - private hasNotifiedSubscribersUpToDate: boolean = false - - constructor(stream: ShapeStream) { - this.stream = stream - this.stream.subscribe(this.process.bind(this), this.handleError.bind(this)) - const unsubscribe = this.stream.subscribeOnceToUpToDate( - () => { - unsubscribe() - }, - (e) => { - this.handleError(e) - throw e - } - ) - } - - lastSynced(): number { - return this.stream.lastSynced() - } - - isConnected(): boolean { - return this.stream.isConnected() - } - - /** True during initial fetch. False afterwise. */ - isLoading(): boolean { - return this.stream.isLoading() - } - - get value(): Promise> { - return new Promise((resolve) => { - if (this.stream.isUpToDate) { - resolve(this.valueSync) - } else { - const unsubscribe = this.stream.subscribeOnceToUpToDate( - () => { - unsubscribe() - resolve(this.valueSync) - }, - (e) => { - throw e - } - ) - } - }) - } - - get valueSync() { - return this.data - } - - subscribe(callback: ShapeChangedCallback): () => void { - const subscriptionId = Math.random() - - this.subscribers.set(subscriptionId, callback) - - return () => { - this.subscribers.delete(subscriptionId) - } - } - - unsubscribeAll(): void { - this.subscribers.clear() - } - - get numSubscribers() { - return this.subscribers.size +function validateOptions(options: Partial): void { + if (!options.url) { + throw new Error(`Invalid shape option. It must provide the url`) } - - private process(messages: Message[]): void { - let dataMayHaveChanged = false - let isUpToDate = false - let newlyUpToDate = false - - messages.forEach((message) => { - if (isChangeMessage(message)) { - dataMayHaveChanged = [`insert`, `update`, `delete`].includes( - message.headers.operation - ) - - switch (message.headers.operation) { - case `insert`: - this.data.set(message.key, message.value) - break - case `update`: - this.data.set(message.key, { - ...this.data.get(message.key)!, - ...message.value, - }) - break - case `delete`: - this.data.delete(message.key) - break - } - } - - if (isControlMessage(message)) { - switch (message.headers.control) { - case `up-to-date`: - isUpToDate = true - if (!this.hasNotifiedSubscribersUpToDate) { - newlyUpToDate = true - } - break - case `must-refetch`: - this.data.clear() - this.error = false - isUpToDate = false - newlyUpToDate = false - break - } - } - }) - - // Always notify subscribers when the Shape first is up to date. - // FIXME this would be cleaner with a simple state machine. - if (newlyUpToDate || (isUpToDate && dataMayHaveChanged)) { - this.hasNotifiedSubscribersUpToDate = true - this.notify() - } - } - - private handleError(e: Error): void { - if (e instanceof FetchError) { - this.error = e - this.notify() - } + if (options.signal && !(options.signal instanceof AbortSignal)) { + throw new Error( + `Invalid signal option. It must be an instance of AbortSignal.` + ) } - private notify(): void { - this.subscribers.forEach((callback) => { - callback(this.valueSync) - }) + if ( + options.offset !== undefined && + options.offset !== `-1` && + !options.shapeId + ) { + throw new Error( + `shapeId is required if this isn't an initial fetch (i.e. offset > -1)` + ) } + return } diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts new file mode 100644 index 0000000000..bff2c797d5 --- /dev/null +++ b/packages/typescript-client/src/constants.ts @@ -0,0 +1,7 @@ +export const SHAPE_ID_HEADER = `x-electric-shape-id` +export const CHUNK_LAST_OFFSET_HEADER = `x-electric-chunk-last-offset` +export const SHAPE_SCHEMA_HEADER = `x-electric-schema` +export const SHAPE_ID_QUERY_PARAM = `shape_id` +export const OFFSET_QUERY_PARAM = `offset` +export const WHERE_QUERY_PARAM = `where` +export const LIVE_QUERY_PARAM = `live` diff --git a/packages/typescript-client/src/error.ts b/packages/typescript-client/src/error.ts new file mode 100644 index 0000000000..b1cf1d5230 --- /dev/null +++ b/packages/typescript-client/src/error.ts @@ -0,0 +1,50 @@ +export class FetchError extends Error { + status: number + text?: string + json?: object + headers: Record + + constructor( + status: number, + text: string | undefined, + json: object | undefined, + headers: Record, + public url: string, + message?: string + ) { + super( + message || + `HTTP Error ${status} at ${url}: ${text ?? JSON.stringify(json)}` + ) + this.name = `FetchError` + this.status = status + this.text = text + this.json = json + this.headers = headers + } + + static async fromResponse( + response: Response, + url: string + ): Promise { + const status = response.status + const headers = Object.fromEntries([...response.headers.entries()]) + let text: string | undefined = undefined + let json: object | undefined = undefined + + const contentType = response.headers.get(`content-type`) + if (contentType && contentType.includes(`application/json`)) { + json = (await response.json()) as object + } else { + text = await response.text() + } + + return new FetchError(status, text, json, headers, url) + } +} + +export class FetchBackoffAbortError extends Error { + constructor() { + super(`Fetch with backoff aborted`) + } +} diff --git a/packages/typescript-client/src/fetch.ts b/packages/typescript-client/src/fetch.ts new file mode 100644 index 0000000000..85684e9aa9 --- /dev/null +++ b/packages/typescript-client/src/fetch.ts @@ -0,0 +1,79 @@ +import { FetchError, FetchBackoffAbortError } from './error' + +export interface BackoffOptions { + /** + * Initial delay before retrying in milliseconds + */ + initialDelay: number + /** + * Maximum retry delay in milliseconds + */ + maxDelay: number + multiplier: number + onFailedAttempt?: () => void + debug?: boolean +} + +export const BackoffDefaults = { + initialDelay: 100, + maxDelay: 10_000, + multiplier: 1.3, +} + +export function createFetchWithBackoff( + fetchClient: typeof fetch, + backoffOptions: BackoffOptions = BackoffDefaults +): typeof fetch { + const { + initialDelay, + maxDelay, + multiplier, + debug = false, + onFailedAttempt, + } = backoffOptions + return async (...args: Parameters): Promise => { + const url = args[0] + const options = args[1] + + let delay = initialDelay + let attempt = 0 + + /* eslint-disable no-constant-condition -- we re-fetch the shape log + * continuously until we get a non-ok response. For recoverable errors, + * we retry the fetch with exponential backoff. Users can pass in an + * AbortController to abort the fetching an any point. + * */ + while (true) { + /* eslint-enable no-constant-condition */ + try { + const result = await fetchClient(...args) + if (result.ok) return result + else throw await FetchError.fromResponse(result, url.toString()) + } catch (e) { + onFailedAttempt?.() + if (options?.signal?.aborted) { + throw new FetchBackoffAbortError() + } else if ( + e instanceof FetchError && + e.status >= 400 && + e.status < 500 + ) { + // Any client errors cannot be backed off on, leave it to the caller to handle. + throw e + } else { + // Exponentially backoff on errors. + // Wait for the current delay duration + await new Promise((resolve) => setTimeout(resolve, delay)) + + // Increase the delay for the next attempt + delay = Math.min(delay * multiplier, maxDelay) + + if (debug) { + attempt++ + console.log(`Retry attempt #${attempt} after ${delay}ms`) + } + } + } + } + } +} diff --git a/packages/typescript-client/src/helpers.ts b/packages/typescript-client/src/helpers.ts index 1f3a53cd84..c3bb5ab299 100644 --- a/packages/typescript-client/src/helpers.ts +++ b/packages/typescript-client/src/helpers.ts @@ -45,3 +45,9 @@ export function isControlMessage( ): message is ControlMessage { return !isChangeMessage(message) } + +export function isUpToDateMessage( + message: Message +): message is ControlMessage & { up_to_date: true } { + return isControlMessage(message) && message.headers.control === `up-to-date` +} diff --git a/packages/typescript-client/src/index.ts b/packages/typescript-client/src/index.ts index c4226cbccb..8c4270e669 100644 --- a/packages/typescript-client/src/index.ts +++ b/packages/typescript-client/src/index.ts @@ -1,3 +1,6 @@ export * from './client' +export * from './shape' export * from './types' -export * from './helpers' +export { isChangeMessage, isControlMessage } from './helpers' +export { FetchError } from './error' +export { type BackoffOptions, BackoffDefaults } from './fetch' diff --git a/packages/typescript-client/src/queue.ts b/packages/typescript-client/src/queue.ts new file mode 100644 index 0000000000..0c2932c35d --- /dev/null +++ b/packages/typescript-client/src/queue.ts @@ -0,0 +1,62 @@ +import { MaybePromise } from './types' + +function isThenable(value: MaybePromise): value is Promise { + return ( + !!value && + typeof value === `object` && + `then` in value && + typeof value.then === `function` + ) +} + +/** + * Processes messages asynchronously in order. + */ +export class AsyncProcessingQueue { + #processingChain: MaybePromise = undefined + + public process(callback: () => MaybePromise): MaybePromise { + this.#processingChain = isThenable(this.#processingChain) + ? this.#processingChain.then(callback) + : callback() + return this.#processingChain + } + + public async waitForProcessing(): Promise { + let currentChain: MaybePromise + do { + currentChain = this.#processingChain + await currentChain + } while (this.#processingChain !== currentChain) + } +} + +export interface MessageProcessorInterface { + process(messages: T): MaybePromise + waitForProcessing(): Promise +} + +/** + * Receives messages, puts them on a queue and processes + * them synchronously or asynchronously by passing to a + * registered callback function. + * + * @constructor + * @param {(message: T) => MaybePromise} callback function + */ +export class MessageProcessor implements MessageProcessorInterface { + readonly #queue = new AsyncProcessingQueue() + readonly #callback: (messages: T) => MaybePromise + + constructor(callback: (messages: T) => MaybePromise) { + this.#callback = callback + } + + public process(messages: T): void { + this.#queue.process(() => this.#callback(messages)) + } + + public async waitForProcessing(): Promise { + await this.#queue.waitForProcessing() + } +} diff --git a/packages/typescript-client/src/shape.ts b/packages/typescript-client/src/shape.ts new file mode 100644 index 0000000000..fff04d53cd --- /dev/null +++ b/packages/typescript-client/src/shape.ts @@ -0,0 +1,189 @@ +import { Message, Row } from './types' +import { isChangeMessage, isControlMessage } from './helpers' +import { FetchError } from './error' +import { ShapeStreamInterface } from './client' + +export type ShapeData = Map +export type ShapeChangedCallback = ( + value: ShapeData +) => void + +/** + * A Shape is an object that subscribes to a shape log, + * keeps a materialised shape `.value` in memory and + * notifies subscribers when the value has changed. + * + * It can be used without a framework and as a primitive + * to simplify developing framework hooks. + * + * @constructor + * @param {ShapeStream} - the underlying shape stream + * @example + * ``` + * const shapeStream = new ShapeStream<{ foo: number }>(url: 'http://localhost:3000/v1/shape/foo'}) + * const shape = new Shape(shapeStream) + * ``` + * + * `value` returns a promise that resolves the Shape data once the Shape has been + * fully loaded (and when resuming from being offline): + * + * const value = await shape.value + * + * `valueSync` returns the current data synchronously: + * + * const value = shape.valueSync + * + * Subscribe to updates. Called whenever the shape updates in Postgres. + * + * shape.subscribe(shapeData => { + * console.log(shapeData) + * }) + */ +export class Shape { + readonly #stream: ShapeStreamInterface + + readonly #data: ShapeData = new Map() + readonly #subscribers = new Map>() + + #hasNotifiedSubscribersUpToDate: boolean = false + #error: FetchError | false = false + + constructor(stream: ShapeStreamInterface) { + this.#stream = stream + this.#stream.subscribe( + this.#process.bind(this), + this.#handleError.bind(this) + ) + const unsubscribe = this.#stream.subscribeOnceToUpToDate( + () => { + unsubscribe() + }, + (e) => { + this.#handleError(e) + throw e + } + ) + } + + get isUpToDate(): boolean { + return this.#stream.isUpToDate + } + + get value(): Promise> { + return new Promise((resolve, reject) => { + if (this.#stream.isUpToDate) { + resolve(this.valueSync) + } else { + const unsubscribe = this.subscribe((shapeData) => { + unsubscribe() + if (this.#error) reject(this.#error) + resolve(shapeData) + }) + } + }) + } + + get valueSync() { + return this.#data + } + + get error() { + return this.#error + } + + lastSynced() { + return this.#stream.lastSynced() + } + + isLoading() { + return this.#stream.isLoading() + } + + isConnected(): boolean { + return this.#stream.isConnected() + } + + subscribe(callback: ShapeChangedCallback): () => void { + const subscriptionId = Math.random() + + this.#subscribers.set(subscriptionId, callback) + + return () => { + this.#subscribers.delete(subscriptionId) + } + } + + unsubscribeAll(): void { + this.#subscribers.clear() + } + + get numSubscribers() { + return this.#subscribers.size + } + + #process(messages: Message[]): void { + let dataMayHaveChanged = false + let isUpToDate = false + let newlyUpToDate = false + + messages.forEach((message) => { + if (isChangeMessage(message)) { + dataMayHaveChanged = [`insert`, `update`, `delete`].includes( + message.headers.operation + ) + + switch (message.headers.operation) { + case `insert`: + this.#data.set(message.key, message.value) + break + case `update`: + this.#data.set(message.key, { + ...this.#data.get(message.key)!, + ...message.value, + }) + break + case `delete`: + this.#data.delete(message.key) + break + } + } + + if (isControlMessage(message)) { + switch (message.headers.control) { + case `up-to-date`: + isUpToDate = true + if (!this.#hasNotifiedSubscribersUpToDate) { + newlyUpToDate = true + } + break + case `must-refetch`: + this.#data.clear() + this.#error = false + isUpToDate = false + newlyUpToDate = false + break + } + } + }) + + // Always notify subscribers when the Shape first is up to date. + // FIXME this would be cleaner with a simple state machine. + if (newlyUpToDate || (isUpToDate && dataMayHaveChanged)) { + this.#hasNotifiedSubscribersUpToDate = true + this.#notify() + } + } + + #handleError(e: Error): void { + if (e instanceof FetchError) { + this.#error = e + this.#notify() + } + } + + #notify(): void { + this.#subscribers.forEach((callback) => { + callback(this.valueSync) + }) + } +} diff --git a/packages/typescript-client/src/types.ts b/packages/typescript-client/src/types.ts index c74c9e76cc..677b2fce73 100644 --- a/packages/typescript-client/src/types.ts +++ b/packages/typescript-client/src/types.ts @@ -108,3 +108,5 @@ export type TypedMessages = { messages: Array> schema: ColumnInfo } + +export type MaybePromise = T | Promise diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index e188c0e891..1e204589cb 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -2,7 +2,7 @@ import { describe, expect, inject, vi } from 'vitest' import { v4 as uuidv4 } from 'uuid' import { setTimeout as sleep } from 'node:timers/promises' import { testWithIssuesTable as it } from './support/test-context' -import { ShapeStream, Shape, FetchError } from '../src/client' +import { ShapeStream, Shape, FetchError } from '../src' const BASE_URL = inject(`baseUrl`) @@ -135,10 +135,12 @@ describe(`Shape`, () => { const fetchWrapper = async (...args: Parameters) => { // clear the shape and modify the data after the initial request if (requestsMade === 1) { - await clearIssuesShape() // new shape data should have just second issue and not first await deleteIssue({ id: id1, title: `foo1` }) await insertIssues({ id: id2, title: `foo2` }) + await sleep(100) + await clearIssuesShape(shapeStream.shapeId) + rotationTime = Date.now() } diff --git a/packages/typescript-client/test/error.test.ts b/packages/typescript-client/test/error.test.ts new file mode 100644 index 0000000000..9a3bf7f3ec --- /dev/null +++ b/packages/typescript-client/test/error.test.ts @@ -0,0 +1,128 @@ +import { describe, it, expect, vi } from 'vitest' +import { FetchError } from '../src/error' + +describe(`FetchError`, () => { + it(`should create a FetchError with the correct properties`, () => { + const status = 404 + const text = `Not Found` + const json = undefined + const headers = { 'content-type': `text/plain` } + const url = `https://example.com/notfound` + + const error = new FetchError(status, text, json, headers, url) + + expect(error).toBeInstanceOf(Error) + expect(error.name).toBe(`FetchError`) + expect(error.status).toBe(status) + expect(error.text).toBe(text) + expect(error.json).toBe(json) + expect(error.headers).toEqual(headers) + expect(error.url).toBe(url) + expect(error.message).toBe( + `HTTP Error 404 at https://example.com/notfound: Not Found` + ) + }) + + it(`should create a FetchError with a JSON response and use the JSON in the message`, () => { + const status = 500 + const text = undefined + const json = { error: `Internal Server Error` } + const headers = { 'content-type': `application/json` } + const url = `https://example.com/servererror` + + const error = new FetchError(status, text, json, headers, url) + + expect(error.status).toBe(status) + expect(error.text).toBeUndefined() + expect(error.json).toEqual(json) + expect(error.headers).toEqual(headers) + expect(error.message).toBe( + `HTTP Error 500 at https://example.com/servererror: {"error":"Internal Server Error"}` + ) + }) + + it(`should create a FetchError with a custom message if provided`, () => { + const status = 403 + const text = `Forbidden` + const json = undefined + const headers = { 'content-type': `text/plain` } + const url = `https://example.com/forbidden` + const customMessage = `Custom Error Message` + + const error = new FetchError( + status, + text, + json, + headers, + url, + customMessage + ) + + expect(error.message).toBe(customMessage) + }) + + describe(`fromResponse`, () => { + it(`should create a FetchError from a text-based response`, async () => { + const mockResponse = { + status: 404, + headers: new Headers({ 'content-type': `text/plain` }), + text: vi.fn().mockResolvedValue(`Not Found`), + } as unknown as Response + + const url = `https://example.com/notfound` + const error = await FetchError.fromResponse(mockResponse, url) + + expect(mockResponse.text).toHaveBeenCalled() + expect(error).toBeInstanceOf(FetchError) + expect(error.status).toBe(404) + expect(error.text).toBe(`Not Found`) + expect(error.json).toBeUndefined() + expect(error.headers).toEqual({ 'content-type': `text/plain` }) + expect(error.message).toBe( + `HTTP Error 404 at https://example.com/notfound: Not Found` + ) + }) + + it(`should create a FetchError from a JSON-based response`, async () => { + const mockResponse = { + status: 500, + headers: new Headers({ 'content-type': `application/json` }), + json: vi.fn().mockResolvedValue({ error: `Internal Server Error` }), + } as unknown as Response + + const url = `https://example.com/servererror` + const error = await FetchError.fromResponse(mockResponse, url) + + expect(mockResponse.json).toHaveBeenCalled() + expect(error).toBeInstanceOf(FetchError) + expect(error.status).toBe(500) + expect(error.text).toBeUndefined() + expect(error.json).toEqual({ error: `Internal Server Error` }) + expect(error.headers).toEqual({ 'content-type': `application/json` }) + expect(error.message).toBe( + `HTTP Error 500 at https://example.com/servererror: {"error":"Internal Server Error"}` + ) + }) + + it(`should handle content-type not set in response headers`, async () => { + const mockResponse = { + status: 500, + headers: new Headers(), + text: vi.fn().mockResolvedValue(`Server error with no content-type`), + } as unknown as Response + + const url = `https://example.com/no-content-type` + const error = await FetchError.fromResponse(mockResponse, url) + + expect(mockResponse.text).toHaveBeenCalled() + expect(error).toBeInstanceOf(FetchError) + expect(error.status).toBe(500) + expect(error.text).toBe(`Server error with no content-type`) + expect(error.json).toBeUndefined() + expect(error.headers).toEqual({}) + expect(error.message).toBe( + `HTTP Error 500 at https://example.com/no-content-type: Server error with no content-type` + ) + }) + }) +}) diff --git a/packages/typescript-client/test/fetch.test.ts b/packages/typescript-client/test/fetch.test.ts new file mode 100644 index 0000000000..98b6932953 --- /dev/null +++ b/packages/typescript-client/test/fetch.test.ts @@ -0,0 +1,146 @@ +import { describe, beforeEach, it, expect, vi, type Mock } from 'vitest' +import { FetchError, FetchBackoffAbortError } from '../src/error' +import { createFetchWithBackoff, BackoffDefaults } from '../src/fetch' + +describe(`createFetchWithBackoff`, () => { + const initialDelay = 10 + const maxDelay = 100 + let mockFetchClient: Mock + + beforeEach(() => { + mockFetchClient = vi.fn() + }) + + it(`should return a successful response on the first attempt`, async () => { + const mockResponse = new Response(null, { status: 200, statusText: `OK` }) + mockFetchClient.mockResolvedValue(mockResponse) + + const fetchWithBackoff = createFetchWithBackoff(mockFetchClient) + + const result = await fetchWithBackoff(`https://example.com`) + + expect(mockFetchClient).toHaveBeenCalledTimes(1) + expect(result.ok).toBe(true) + expect(result).toEqual(mockResponse) + }) + + it(`should retry the request on a 500 response and succeed after a retry`, async () => { + const mockErrorResponse = new Response(null, { status: 500 }) + const mockSuccessResponse = new Response(null, { + status: 200, + statusText: `OK`, + }) + mockFetchClient + .mockResolvedValueOnce(mockErrorResponse) + .mockResolvedValueOnce(mockSuccessResponse) + + const fetchWithBackoff = createFetchWithBackoff(mockFetchClient, { + ...BackoffDefaults, + initialDelay, + }) + + const result = await fetchWithBackoff(`https://example.com`) + + expect(mockFetchClient).toHaveBeenCalledTimes(2) + expect(result.ok).toBe(true) + }) + + it(`should apply exponential backoff and retry until maxDelay is reached`, async () => { + const mockErrorResponse = new Response(null, { status: 500 }) + const mockSuccessResponse = new Response(null, { + status: 200, + statusText: `OK`, + }) + mockFetchClient + .mockResolvedValueOnce(mockErrorResponse) + .mockResolvedValueOnce(mockErrorResponse) + .mockResolvedValueOnce(mockErrorResponse) + .mockResolvedValueOnce(mockSuccessResponse) + + const multiplier = 2 + + const fetchWithBackoff = createFetchWithBackoff(mockFetchClient, { + initialDelay, + maxDelay, + multiplier, + }) + + const result = await fetchWithBackoff(`https://example.com`) + + expect(mockFetchClient).toHaveBeenCalledTimes(4) + expect(result.ok).toBe(true) + }) + + it(`should stop retrying and throw an error on a 400 response`, async () => { + const mockErrorResponse = new Response(null, { + status: 400, + statusText: `Bad Request`, + }) + mockFetchClient.mockResolvedValue(mockErrorResponse) + + const fetchWithBackoff = createFetchWithBackoff(mockFetchClient) + + await expect(fetchWithBackoff(`https://example.com`)).rejects.toThrow( + FetchError + ) + expect(mockFetchClient).toHaveBeenCalledTimes(1) + }) + + it(`should throw FetchBackoffAborted if the abort signal is triggered`, async () => { + const mockAbortController = new AbortController() + const signal = mockAbortController.signal + const mockErrorResponse = new Response(null, { status: 500 }) + mockFetchClient.mockImplementation( + () => new Promise((res) => setTimeout(() => res(mockErrorResponse), 10)) + ) + + const fetchWithBackoff = createFetchWithBackoff(mockFetchClient, { + ...BackoffDefaults, + initialDelay: 1000, + }) + + setTimeout(() => mockAbortController.abort(), 5) + + await expect( + fetchWithBackoff(`https://example.com`, { signal }) + ).rejects.toThrow(FetchBackoffAbortError) + + expect(mockFetchClient).toHaveBeenCalledTimes(1) + }) + + it(`should not retry when a client error (4xx) occurs`, async () => { + const mockErrorResponse = new Response(null, { + status: 403, + statusText: `Forbidden`, + }) + mockFetchClient.mockResolvedValue(mockErrorResponse) + + const fetchWithBackoff = createFetchWithBackoff( + mockFetchClient, + BackoffDefaults + ) + + await expect(fetchWithBackoff(`https://example.com`)).rejects.toThrow( + FetchError + ) + expect(mockFetchClient).toHaveBeenCalledTimes(1) + }) + + // it(`should retry multiple times and eventually throw if no success`, async () => { + // const mockErrorResponse = new Response(null, { status: 500 }) + // mockFetchClient.mockImplementation( + // () => new Promise((res) => setTimeout(() => res(mockErrorResponse), 10)) + // ) + + // const fetchWithBackoff = createFetchWithBackoff(mockFetchClient, { + // ...BackoffDefaults, + // initialDelay, + // maxDelay, + // }) + + // await expect(fetchWithBackoff(`https://example.com`)).rejects.toThrow( + // FetchError + // ) + // expect(mockFetchClient.mock.calls.length).greaterThan(1) + // }) +}) diff --git a/packages/typescript-client/test/helpers.test.ts b/packages/typescript-client/test/helpers.test.ts index d52cf67915..ccdad10275 100644 --- a/packages/typescript-client/test/helpers.test.ts +++ b/packages/typescript-client/test/helpers.test.ts @@ -1,28 +1,44 @@ import { describe, expect, it } from 'vitest' import { isChangeMessage, isControlMessage, Message } from '../src' +import { isUpToDateMessage } from '../src/helpers' describe(`helpers`, () => { - it(`should correctly detect ChangeMessages`, () => { - const message = { - headers: { - operation: `insert`, - }, - offset: `-1`, - key: `key`, - value: { key: `value` }, - } as Message + const changeMsg = { + headers: { + operation: `insert`, + }, + offset: `-1`, + key: `key`, + value: { key: `value` }, + } as Message + + const upToDateMsg = { + headers: { + control: `up-to-date`, + }, + } as Message + + const mustRefetchMsg = { + headers: { + control: `must-refetch`, + }, + } as Message - expect(isChangeMessage(message)).toBe(true) - expect(isControlMessage(message)).toBe(false) + it(`should correctly detect ChangeMessages`, () => { + expect(isChangeMessage(changeMsg)).toBe(true) + expect(isControlMessage(changeMsg)).toBe(false) }) it(`should correctly detect ControlMessages`, () => { - const message = { - headers: { - control: `up-to-date`, - }, - } as Message - expect(isControlMessage(message)).toBe(true) - expect(isChangeMessage(message)).toBe(false) + expect(isControlMessage(upToDateMsg)).toBe(true) + expect(isControlMessage(mustRefetchMsg)).toBe(true) + expect(isChangeMessage(upToDateMsg)).toBe(false) + expect(isChangeMessage(mustRefetchMsg)).toBe(false) + }) + + it(`should correctly detect up-to-date message`, () => { + expect(isUpToDateMessage(upToDateMsg)).toBe(true) + expect(isUpToDateMessage(mustRefetchMsg)).toBe(false) + expect(isUpToDateMessage(changeMsg)).toBe(false) }) }) diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index 04b94c4aeb..8ea78c1b04 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -2,9 +2,9 @@ import { parse } from 'cache-control-parser' import { setTimeout as sleep } from 'node:timers/promises' import { v4 as uuidv4 } from 'uuid' import { assert, describe, expect, inject, vi } from 'vitest' -import { Shape, ShapeStream } from '../src/client' -import { Message, Offset, Row } from '../src/types' -import { isChangeMessage, isControlMessage } from '../src' +import { Shape, ShapeStream } from '../src' +import { Message, Offset } from '../src/types' +import { isChangeMessage, isUpToDateMessage } from '../src/helpers' import { IssueRow, testWithIssuesTable as it, @@ -14,9 +14,6 @@ import * as h from './support/test-helpers' const BASE_URL = inject(`baseUrl`) -const isUpToDateMessage = (msg: Message) => - isControlMessage(msg) && msg.headers.control === `up-to-date` - it(`sanity check`, async ({ dbClient, issuesTableSql }) => { const result = await dbClient.query(`SELECT * FROM ${issuesTableSql}`) @@ -433,11 +430,12 @@ describe(`HTTP Sync`, () => { issuesTableUrl, insertIssues, }) => { + // initialize storage for the cases where persisted shape streams are tested await insertIssues({ title: `foo1` }, { title: `foo2` }, { title: `foo3` }) await sleep(50) let lastOffset: Offset = `-1` - const issueStream = new ShapeStream({ + const issueStream = new ShapeStream({ url: `${BASE_URL}/v1/shape/${issuesTableUrl}`, signal: aborter.signal, subscribe: false, @@ -474,7 +472,8 @@ describe(`HTTP Sync`, () => { offset: lastOffset, shapeId: issueStream.shapeId, }) - await h.forEachMessage(newIssueStream, aborter, (res, msg, nth) => { + + await h.forEachMessage(newIssueStream, newAborter, (res, msg, nth) => { if (isUpToDateMessage(msg)) { res() } else { diff --git a/packages/typescript-client/test/queue.test.ts b/packages/typescript-client/test/queue.test.ts new file mode 100644 index 0000000000..3f5e884113 --- /dev/null +++ b/packages/typescript-client/test/queue.test.ts @@ -0,0 +1,170 @@ +import { describe, it, expect, vi } from 'vitest' +import { AsyncProcessingQueue, MessageProcessor } from '../src/queue' + +describe(`AsyncProcessingQueue`, () => { + it(`should process synchronous callbacks in order`, () => { + let last = 0 + const cb1 = vi.fn().mockImplementationOnce(() => (last = 1)) + const cb2 = vi.fn().mockImplementationOnce(() => (last = 2)) + const processor = new AsyncProcessingQueue() + + processor.process(cb1) + processor.process(cb2) + + expect(cb1).toHaveBeenCalledOnce() + expect(cb2).toHaveBeenCalledOnce() + expect(last).toBe(2) + }) + + it(`should process asynchronous callbacks in order`, async () => { + let last = 0 + const cb1 = vi + .fn() + .mockImplementationOnce( + () => new Promise((res) => setTimeout(() => res((last = 1)), 10)) + ) + const cb2 = vi + .fn() + .mockImplementationOnce( + () => new Promise((res) => setTimeout(() => res((last = 2)), 5)) + ) + const processor = new AsyncProcessingQueue() + + processor.process(cb1) + processor.process(cb2) + + expect(cb1).toHaveBeenCalledOnce() + expect(cb2).not.toHaveBeenCalled() + + await processor.waitForProcessing() + expect(cb1).toHaveBeenCalledOnce() + expect(cb2).toHaveBeenCalledOnce() + expect(last).toBe(2) + }) + + it(`should process both async and sync callbacks in order`, async () => { + let last = 0 + const cb1 = vi.fn().mockImplementation(() => (last = 1)) + const cb2 = vi + .fn() + .mockImplementationOnce( + () => new Promise((res) => setTimeout(() => res((last = 2)), 10)) + ) + const cb3 = vi.fn().mockImplementation(() => (last = 3)) + const cb4 = vi + .fn() + .mockImplementationOnce( + () => new Promise((res) => setTimeout(() => res((last = 4)), 5)) + ) + + const processor = new AsyncProcessingQueue() + + processor.process(cb1) + processor.process(cb2) + processor.process(cb3) + processor.process(cb4) + + expect(cb1).toHaveBeenCalledOnce() + expect(cb2).toHaveBeenCalledOnce() + expect(cb3).not.toHaveBeenCalled() + expect(cb4).not.toHaveBeenCalled() + expect(last).toBe(1) // only sync has been called so far + + await processor.waitForProcessing() + + expect(cb3).toHaveBeenCalledOnce() + expect(cb4).toHaveBeenCalledOnce() + expect(last).toBe(4) + }) + + it(`should complete processing when waitForProcessing is called multiple times`, async () => { + const cb = vi.fn(() => Promise.resolve()) + const processor = new AsyncProcessingQueue() + + processor.process(cb) + await processor.waitForProcessing() // First call + + processor.process(cb) + await processor.waitForProcessing() // Second call + + expect(cb).toHaveBeenCalledTimes(2) + }) + + it(`should not resolve waitForProcessing if calls are added`, async () => { + const resolvers: Array<() => void> = [] + let finishedProcessing = false + + const cb = vi.fn( + () => new Promise((resolve) => resolvers.push(resolve)) + ) + const processor = new AsyncProcessingQueue() + + processor.process(cb) + processor.waitForProcessing().then(() => { + finishedProcessing = true + }) + + processor.process(cb) + await new Promise((res) => setTimeout(res)) + expect(finishedProcessing).toBe(false) + + resolvers[0]!() + await new Promise((res) => setTimeout(res)) + expect(finishedProcessing).toBe(false) + + resolvers[1]!() + await new Promise((res) => setTimeout(res)) + expect(finishedProcessing).toBe(true) + expect(cb).toHaveBeenCalledTimes(2) + }) +}) + +describe(`MessageProcessor`, () => { + it(`should queue up both async and sync processing in order`, async () => { + const callback = vi + .fn() + .mockImplementationOnce(() => {}) + .mockImplementationOnce(() => Promise.resolve()) + .mockImplementationOnce(() => {}) + .mockImplementationOnce(() => Promise.resolve()) + + const processor = new MessageProcessor(callback) + const messages1 = [`msg1`] + const messages2 = [`msg2`] + const messages3 = [`msg3`] + const messages4 = [`msg4`] + + processor.process(messages1) + processor.process(messages2) + processor.process(messages3) + processor.process(messages4) + + await processor.waitForProcessing() + + expect(callback).toHaveBeenCalledTimes(4) + expect(callback).toHaveBeenNthCalledWith(1, messages1) + expect(callback).toHaveBeenNthCalledWith(2, messages2) + expect(callback).toHaveBeenNthCalledWith(3, messages3) + expect(callback).toHaveBeenNthCalledWith(4, messages4) + }) + + it(`should process messages sequentially with slow asynchronous callbacks`, async () => { + const callback = vi.fn( + () => new Promise((resolve) => setTimeout(() => resolve(), 50)) + ) + const processor = new MessageProcessor(callback) + const messages1 = [`msg1`] + const messages2 = [`msg2`] + const messages3 = [`msg3`] + + processor.process(messages1) + processor.process(messages2) + processor.process(messages3) + + await processor.waitForProcessing() + + expect(callback).toHaveBeenNthCalledWith(1, messages1) + expect(callback).toHaveBeenNthCalledWith(2, messages2) + expect(callback).toHaveBeenNthCalledWith(3, messages3) + }) +}) diff --git a/packages/typescript-client/test/support/global-setup.ts b/packages/typescript-client/test/support/global-setup.ts index e1aecb81c5..e73615e6d5 100644 --- a/packages/typescript-client/test/support/global-setup.ts +++ b/packages/typescript-client/test/support/global-setup.ts @@ -1,5 +1,5 @@ import type { GlobalSetupContext } from 'vitest/node' -import { FetchError } from '../../src/client' +import { FetchError } from '../../src/error' import { makePgClient } from './test-helpers' const url = process.env.ELECTRIC_URL ?? `http://localhost:3000` diff --git a/packages/typescript-client/test/support/test-context.ts b/packages/typescript-client/test/support/test-context.ts index 49b13fd4dd..6a593b0c55 100644 --- a/packages/typescript-client/test/support/test-context.ts +++ b/packages/typescript-client/test/support/test-context.ts @@ -3,7 +3,7 @@ import { v4 as uuidv4 } from 'uuid' import { Client, QueryResult } from 'pg' import { inject, test } from 'vitest' import { makePgClient } from './test-helpers' -import { FetchError } from '../../src/client' +import { FetchError } from '../../src/error' export type IssueRow = { id: string; title: string; priority?: string } export type GeneratedIssueRow = { id?: string; title: string } diff --git a/packages/typescript-client/test/support/test-helpers.ts b/packages/typescript-client/test/support/test-helpers.ts index 20d3c18746..6b6c52fdec 100644 --- a/packages/typescript-client/test/support/test-helpers.ts +++ b/packages/typescript-client/test/support/test-helpers.ts @@ -1,4 +1,4 @@ -import { ShapeStream } from '../../src/client' +import { ShapeStreamInterface } from '../../src/client' import { Client, ClientConfig } from 'pg' import { Message, Row } from '../../src/types' @@ -15,7 +15,7 @@ export function makePgClient(overrides: ClientConfig = {}) { } export function forEachMessage( - stream: ShapeStream, + stream: ShapeStreamInterface, controller: AbortController, handler: ( resolve: () => void, diff --git a/packages/typescript-client/tsup.config.ts b/packages/typescript-client/tsup.config.ts index ce2388c4af..075298b6c3 100644 --- a/packages/typescript-client/tsup.config.ts +++ b/packages/typescript-client/tsup.config.ts @@ -1,14 +1,15 @@ import type { Options } from 'tsup' import { defineConfig } from 'tsup' -export default defineConfig(options => { +export default defineConfig((options) => { + const entry = { + index: 'src/index.ts', + } const commonOptions: Partial = { - entry: { - index: 'src/index.ts' - }, + entry, tsconfig: `./tsconfig.build.json`, sourcemap: true, - ...options + ...options, } return [ @@ -18,7 +19,7 @@ export default defineConfig(options => { format: ['esm'], outExtension: () => ({ js: '.mjs' }), // Add dts: '.d.ts' when egoist/tsup#1053 lands dts: true, - clean: true + clean: true, }, // Support Webpack 4 by pointing `"module"` to a file with a `.js` extension { @@ -27,26 +28,33 @@ export default defineConfig(options => { target: 'es2017', dts: false, outExtension: () => ({ js: '.js' }), - entry: { 'index.legacy-esm': 'src/index.ts' } + entry: Object.fromEntries( + Object.entries(entry).map(([key, value]) => [ + `${key}.legacy-esm`, + value, + ]) + ), }, // Browser-ready ESM, production + minified { ...commonOptions, - entry: { - 'index.browser': 'src/index.ts' - }, + + entry: Object.fromEntries( + Object.entries(entry).map(([key, value]) => [`${key}.browser`, value]) + ), + define: { - 'process.env.NODE_ENV': JSON.stringify('production') + 'process.env.NODE_ENV': JSON.stringify('production'), }, format: ['esm'], outExtension: () => ({ js: '.mjs' }), - minify: true + minify: true, }, { ...commonOptions, format: 'cjs', outDir: './dist/cjs/', - outExtension: () => ({ js: '.cjs' }) - } + outExtension: () => ({ js: '.cjs' }), + }, ] })