diff --git a/src/computed.ts b/src/computed.ts index 4190232..1a7aca1 100644 --- a/src/computed.ts +++ b/src/computed.ts @@ -1,6 +1,5 @@ import { Source } from './source' import { Signal, SignalOptions } from './signal' -import { deferred, Deferred } from './util/deferred' import { isStoppable } from './util/stoppable' import { AbortableOptions, checkAbort } from './util/abortable' @@ -8,10 +7,7 @@ import { AbortableOptions, checkAbort } from './util/abortable' export const SKIP = Symbol() export const STOP = Symbol() -export interface ExprOptions extends AbortableOptions { - initial?: boolean -} - +export type ExprOptions = AbortableOptions export type ExprResultSync = undefined | T | typeof SKIP | typeof STOP export type ExprResult = ExprResultSync | Promise> export type ExprFn = (track: Track, options: ExprOptions) => ExprResult @@ -33,15 +29,13 @@ export function wrap(src: Trackable): Source | Computed { export class Computed extends Signal { private _changedDependencies?: Source[] - private _initialRun?: Deferred - private _initialError?: unknown - private _activeRuns = 0 + private _ranOnce = false + private _lastRunError?: any + private _lastRun?: Promise private _activeDependencies = new WeakMap, boolean>() constructor(private expr: ExprFn, options?: ComputedOptions) { super({ initial: undefined, ...options }) - - this.run({ initial: true }) } private track(src: Source, tracked: Source[], signal?: AbortSignal): R { @@ -60,22 +54,36 @@ export class Computed extends Signal { } override async get(options?: AbortableOptions) { - if (this._changedDependencies) { - const promises: Promise[] = [] - for (const dep of this._changedDependencies) { - const promise = dep.get() - !dep.valid && promises.push(promise) - } + if (!this.valid) { + if ( + this._lastRun && + (!this._changedDependencies || this._changedDependencies.length === 0) + ) { + await this._lastRun + } else { + const promises: Promise[] = [] + const deps = (this._changedDependencies ?? []).slice() + this._changedDependencies = undefined - this._changedDependencies = undefined - promises.length > 0 && await Promise.all(promises) + for (const dep of deps) { + const promise = dep.get() + !dep.valid && promises.push(promise) + } - const p = this.run(options) - p instanceof Promise && await p - } else if (!this.valid) { - await (this._initialRun ??= deferred()).promise - } else if (this._initialError) { - throw this._initialError + checkAbort(options?.signal) + promises.length > 0 && await Promise.all(promises) + + try { + const p = this.run(options) + this._lastRunError = undefined + p instanceof Promise ? + await this.awaitRun(p) : + this._lastRun = undefined + } catch (error) { + this._lastRunError = error + throw error + } + } } return this._last @@ -83,56 +91,44 @@ export class Computed extends Signal { public async reevaluate(options?: AbortableOptions) { const p = this.run(options) - p instanceof Promise && await p + p instanceof Promise ? + await this.awaitRun(p) + : this._lastRun = undefined return this._last } override get valid() { - return this._activeRuns === 0 && ( + return this._ranOnce && !this._lastRun && !this._lastRunError && ( !this._changedDependencies || this._changedDependencies.length === 0 ) } - protected override invalidate() { this._activeRuns++ } - protected override validate() { this._activeRuns-- } + protected async awaitRun(promise: Promise) { + this._lastRun = promise + await promise + this._lastRun === promise && (this._lastRun = undefined) + } protected run(options?: ExprOptions): void | Promise { + checkAbort(options?.signal) + const tracked: Source[] = [] - try { - checkAbort(options?.signal) - const result = this.expr( - ((t: Trackable) => this.track(wrap(t), tracked, options?.signal)) as Track, - options ?? {} - ) - tracked.length === 0 && this.stop() - - if (result instanceof Promise) { - this.invalidate() - - return result.finally(() => { - this.validate() - }).then((value) => { - checkAbort(options?.signal) - this.emit(value) - this._initialRun?.resolve() - }).catch(err => { - if (options?.initial) { - this._initialRun?.reject(err) - } else { - throw err - } - }) - } else { + const result = this.expr( + ((t: Trackable) => this.track(wrap(t), tracked, options?.signal)) as Track, + options ?? {} + ) + this._ranOnce = true + tracked.length === 0 && this.stop() + + if (result instanceof Promise) { + return result.then((value) => { checkAbort(options?.signal) - this.emit(result) - } - } catch (err) { - if (options?.initial) { - this._initialError = err - } else { - throw err - } + this.emit(value) + }) + } else { + checkAbort(options?.signal) + this.emit(result) } } diff --git a/src/observe.ts b/src/observe.ts index 63712f1..f3db737 100644 --- a/src/observe.ts +++ b/src/observe.ts @@ -15,45 +15,17 @@ export interface ObservationOptions { export class Observation extends Cleanable implements Source { private abort?: AbortController + private schedule: Scheduler private subscriptions: Disposable private _last?: Deferred constructor(readonly src: Source, options?: ObservationOptions) { super() - const schedule = options?.schedule ?? asap() - this.subscriptions = src.listen( - () => schedule( - () => { - if (!this._last?.closed) { - this._last = deferred() - } + this.schedule = options?.schedule ?? asap() + this.subscriptions = src.listen(() => this.fetch()) - this.abort?.abort() - const ctrl = new AbortController() - this.abort = ctrl - - const promise = src.get({ signal: ctrl.signal }) - - if (!src.valid) { - promise.then(val => { - if (!ctrl.signal.aborted) { - this.abort === ctrl && (this.abort = undefined) - this._last!.resolve(val) - } - }).catch(reason => { - if (!ctrl.signal.aborted) { - this.abort === ctrl && (this.abort = undefined) - this._last!.reject(reason) - } - }) - } else { - this.abort === ctrl && (this.abort = undefined) - this._last.resolve(src.last) - } - } - ) - ) + !src.valid && this.fetch() } get(options?: AbortableOptions) { @@ -81,6 +53,38 @@ export class Observation extends Cleanable implements Source { get last() { return this.src.last } + + protected fetch() { + this.schedule( + () => { + if (!this._last || this._last.closed) { + this._last = deferred() + } + + this.abort?.abort() + const ctrl = new AbortController() + this.abort = ctrl + + const promise = this.src.get({ signal: ctrl.signal }).catch(reason => { + if (!ctrl.signal.aborted) { + this._last!.reject(reason) + } + }) + + if (!this.src.valid) { + promise.then(val => { + if (!ctrl.signal.aborted) { + this._last!.resolve(val!) + } + }) + } else { + if (!ctrl.signal.aborted) { + this._last.resolve(this.src.last) + } + } + } + ) + } } diff --git a/src/test/computed.test.ts b/src/test/computed.test.ts index 249e086..103dd1a 100644 --- a/src/test/computed.test.ts +++ b/src/test/computed.test.ts @@ -24,6 +24,7 @@ describe(computed, () => { test('computes initial value.', () => { const s = state(0) const c = computed($ => $(s) + 1) + c.get() expect(c.last).toBe(1) }) @@ -32,6 +33,8 @@ describe(computed, () => { const s = state(0) const c = computed($ => $(s) + 1) + c.get() + const cb = jest.fn() c.listen(async () => cb(await c.get())) @@ -59,6 +62,8 @@ describe(computed, () => { const cb = jest.fn(x => x + 1) const c = computed($ => ($(s), cb($(s)))) + c.get() + c.listen(async () => await c.get()) expect(cb).toBeCalledTimes(1) @@ -75,7 +80,9 @@ describe(computed, () => { const a = state(0) const b = state(0) const cb = jest.fn() - computed($ => cb($(a) + $(b))).listen(async (c) => await c.get()) + const c = computed($ => cb($(a) + $(b))) + c.get() + c.listen(async () => await c.get()) a.set(1) b.set(2) @@ -89,6 +96,7 @@ describe(computed, () => { const a = state(0) const b = state(0) const c = computed($ => 10 * $(a) + $(b)) + c.get() expect(c.last).toBe(0) @@ -127,14 +135,6 @@ describe(computed, () => { a.set(1) await expect(c.get()).rejects.toThrow('Test error') - expect(c.valid).toBe(true) - }) - - test('it passes down errors in sync initial expression.', async () => { - const c = computed(() => { throw new Error('Test error') }) - - expect(c.valid).toBe(true) - await expect(c.get()).rejects.toThrow('Test error') }) test('it passes down errors in async expressions.', async () => { @@ -153,30 +153,25 @@ describe(computed, () => { a.set(1) await expect(c.get()).rejects.toThrow('Test error') - expect(c.valid).toBe(true) - }) - - test('it passes down errors in async initial expression.', async () => { - const c = computed(async () => { - await sleep(1) - throw new Error('Test error') - }) - - await expect(c.get()).rejects.toThrow('Test error') - expect(c.valid).toBe(true) }) test('is stopped for static expressions.', () => { - expect(computed(() => 1).stopped).toBe(true) + const c1 = computed(() => 1) + c1.get() + expect(c1.stopped).toBe(true) + const s = state(1) s.stop() - expect(computed($ => $(s)).stopped).toBe(true) + const c2 = computed($ => $(s)) + c2.get() + expect(c2.stopped).toBe(true) }) test('is stopped when sources stop after reevaluation.', async () => { const a = state(0) const b = state(0) const c = computed($ => $(a) + $(b)) + c.get() expect(c.stopped).toBe(false) @@ -197,11 +192,13 @@ describe(computed, () => { const cb = jest.fn() const a = state(0) - computed(async $ => { + const c = computed(async $ => { const val = $(a) await sleep(5) cb(val) - }).listen(async c => await c.get()) + }) + c.get() + c.listen(async () => await c.get()) a.set(1) a.set(2) @@ -218,6 +215,7 @@ describe(computed, () => { const a = state(0) const b = state(0) const c = computed($ => $(a) + $(b)) + c.get() expect(c.valid).toBe(true) @@ -264,4 +262,39 @@ describe(computed, () => { expect(cb).toHaveBeenCalledTimes(3) }) + + test('it is invalid after an error in a sync expression', () => { + const a = state(0) + const c = computed($ => { + if ($(a) === 1) { + throw new Error('test') + } + + return $(a) + }) + + c.get() + expect(c.valid).toBe(true) + + a.set(1) + expect(c.valid).toBe(false) + }) + + test('it is invalid after an error in an async expression', async () => { + const a = state(0) + const c = computed(async $ => { + if ($(a) === 1) { + throw new Error('test') + } + + return $(a) + }) + + await c.get() + expect(c.valid).toBe(true) + + a.set(1) + await expect(c.get()).rejects.toThrow('test') + expect(c.valid).toBe(false) + }) }) diff --git a/src/test/observe.test.ts b/src/test/observe.test.ts index dc0cd08..b065e60 100644 --- a/src/test/observe.test.ts +++ b/src/test/observe.test.ts @@ -28,17 +28,16 @@ describe(observe, () => { }) test('debounces.', async () => { - const cbs = jest.fn() + const cbd = jest.fn() const signal = new class extends Signal { constructor() { super({ initial: 0 }) } tick() { this.emit(this.last + 1) } override async get(options?: AbortableOptions) { - options?.signal?.addEventListener('abort', cbs, { once: true }) - this.invalidate() await sleep(100) checkAbort(options?.signal) + cbd() this.validate() return this.last @@ -61,11 +60,12 @@ describe(observe, () => { await sleep(200) - expect(cb).toHaveBeenCalledTimes(2) - expect(cb).toHaveBeenNthCalledWith(1, 2) - expect(cb).toHaveBeenNthCalledWith(2, 4) + expect(cb).not.toHaveBeenCalledWith(1) + expect(cb).toHaveBeenCalledWith(2) + expect(cb).not.toHaveBeenCalledWith(3) + expect(cb).toHaveBeenCalledWith(4) - expect(cbs).toHaveBeenCalledTimes(2) + expect(cbd).toHaveBeenCalledTimes(2) }) test('eagerly evaluates on notif by default.', async () => { @@ -166,11 +166,6 @@ describe(observe, () => { return cb1(val) }) - // FIXME: this observation DOES NOT control the initial - // run of the expression. as a result, it is effectively never cancelled, - // just ignored by this observation. - // this behaviour should somehow be fixed: for example, - // by moving "initial run" logic completely to the observation. observe($ => cb2($(o))) a.set(1) @@ -188,11 +183,7 @@ describe(observe, () => { expect(cb1).toHaveBeenCalledWith(3) expect(cb1).toHaveBeenCalledWith(4) - // FIXME: we have an extra call here, with the last - // value being repeated. since this is actively - // an unnecessary execution of the expression, - // it should be optimized out. - // expect(cb2).toHaveBeenCalledTimes(3) + expect(cb2).toHaveBeenCalledTimes(3) expect(cb2).not.toHaveBeenCalledWith(0) expect(cb2).not.toHaveBeenCalledWith(1) expect(cb2).not.toHaveBeenCalledWith(2) @@ -202,4 +193,52 @@ describe(observe, () => { expect(ab).toHaveBeenCalledWith(1) expect(ab).toHaveBeenCalledWith(2) }) + + test('handles expression abort errors properly.', async () => { + const cb1 = jest.fn() + const cb2 = jest.fn() + + const a = state(0) + const o = observe(async ($, { signal }) => { + cb1() + const val = $(a) + await sleep(5) + + !signal?.aborted && cb2() + + return val + }) + + observe($ => $(o)) + + a.set(1) + await sleep(10) + a.set(2) + await sleep(10) + + expect(cb1).toHaveBeenCalledTimes(3) + expect(cb2).toHaveBeenCalledTimes(2) + }) + + test('passes down expression errors.', async () => { + const o1 = observe(async () => { throw new Error('test') }) + const o2 = observe(() => { throw new Error('test 2') }) + + await expect(o1.get()).rejects.toThrow('test') + await expect(o2.get()).rejects.toThrow('test 2') + + const a = state(0) + const o3 = observe($ => { + if ($(a) > 0) { + throw new Error('test 3') + } + + return $(a) + }) + + await expect(o3.get()).resolves.toBe(0) + a.set(1) + await expect(o3.get()).rejects.toThrow('test 3') + await sleep(10) + }) }) diff --git a/src/util/deferred.ts b/src/util/deferred.ts index 40df8b2..c22bb9f 100644 --- a/src/util/deferred.ts +++ b/src/util/deferred.ts @@ -10,28 +10,29 @@ export type Deferred = Disposable & { export function deferred(): Deferred { let resolve: (value: T) => void let reject: (reason: any) => void - let closed = false + const pack: any = { + closed: false, + ...disposable(() => reject!(new Error('Deferred was disposed'))) + } const promise = new Promise((res, rej) => { resolve = (value: T) => { - if (!closed) { - closed = true + if (!pack.closed) { + pack.closed = true res(value) } } reject = (reason: any) => { - if (!closed) { - closed = true + if (!pack.closed) { + pack.closed = true rej(reason) } } }) - return { - closed, - promise, - resolve: resolve!, - reject: reject!, - ...disposable(() => reject!(new Error('Deferred was disposed'))) - } + pack.resolve = resolve! + pack.reject = reject! + pack.promise = promise + + return pack } diff --git a/src/util/schedulable.ts b/src/util/schedulable.ts index 68a4c81..8c8a3e1 100644 --- a/src/util/schedulable.ts +++ b/src/util/schedulable.ts @@ -1,18 +1,19 @@ import { AbortableOptions } from './abortable' +import { deferred } from './deferred' export type ScheduleOptions = AbortableOptions export type Schedulable = () => void -export type Scheduler = (fn: Schedulable, options?: ScheduleOptions) => void +export type Scheduler = (fn: Schedulable, options?: ScheduleOptions) => void | Promise export function asap(): Scheduler { - return (fn, options?) => !options?.signal?.aborted && fn() + return (fn, options?) => { !options?.signal?.aborted && fn() } } export function async(): Scheduler { - return (fn, options?) => Promise.resolve().then(() => !options?.signal?.aborted && fn()) + return (fn, options?) => Promise.resolve().then(() => { !options?.signal?.aborted && fn() }) } @@ -24,7 +25,7 @@ export function debounce(schedule: Scheduler): Scheduler { ctrl = new AbortController() options?.signal?.addEventListener('abort', () => ctrl.abort(), { once: true }) - schedule(() => { + return schedule(() => { if (!ctrl.signal.aborted && !options?.signal?.aborted) { fn() } @@ -43,7 +44,7 @@ export function throttle(schedule: Scheduler): Scheduler { running = true - schedule(() => { + return schedule(() => { running = false if (!options?.signal?.aborted) { @@ -56,15 +57,35 @@ export function throttle(schedule: Scheduler): Scheduler { export function delay(ms: number): Scheduler { return (fn, options?) => { - const timeout = setTimeout(() => !options?.signal?.aborted && fn(), ms) + const def = deferred() + const timeout = setTimeout(() => { + try { + !options?.signal?.aborted && fn() + def.resolve() + } catch (err) { + def.reject(err) + } + }, ms) options?.signal?.addEventListener('abort', () => clearTimeout(timeout), { once: true }) + + return def.promise } } export function nextFrame(): Scheduler { return (fn, options?) => { - const frame = requestAnimationFrame(() => !options?.signal?.aborted && fn()) + const def = deferred() + const frame = requestAnimationFrame(() => { + try { + !options?.signal?.aborted && fn() + def.resolve() + } catch (err) { + def.reject(err) + } + }) options?.signal?.addEventListener('abort', () => cancelAnimationFrame(frame), { once: true }) + + return def.promise } } diff --git a/src/util/stoppable.ts b/src/util/stoppable.ts index 06a398a..df7c872 100644 --- a/src/util/stoppable.ts +++ b/src/util/stoppable.ts @@ -24,7 +24,7 @@ export abstract class Cleanable implements Stoppable { stop() { this.clean() this._stopped = true - this._stops && this._stops.resolve() + this._stops?.resolve() } get stops() { diff --git a/src/util/test/deferred.test.ts b/src/util/test/deferred.test.ts new file mode 100644 index 0000000..c721da0 --- /dev/null +++ b/src/util/test/deferred.test.ts @@ -0,0 +1,10 @@ +import { deferred } from '../deferred' + + +describe(deferred, () => { + test('rejects', async () => { + const d = deferred() + d.reject(new Error('test')) + await expect(d.promise).rejects.toThrow('test') + }) +}) diff --git a/src/util/test/schedulable.test.ts b/src/util/test/schedulable.test.ts index ad2d8d6..023fa2b 100644 --- a/src/util/test/schedulable.test.ts +++ b/src/util/test/schedulable.test.ts @@ -21,6 +21,12 @@ describe(asap, () => { expect(a).toHaveBeenCalledTimes(1) expect(b).toHaveBeenCalledTimes(1) }) + + test('catches errors.', async () => { + const f = () => { throw new Error('test') } + const schedule = async() + await expect(async () => await schedule(f)).rejects.toThrow('test') + }) }) @@ -41,6 +47,12 @@ describe(async, () => { expect(a).toHaveBeenCalledTimes(1) expect(b).toHaveBeenCalledTimes(1) }) + + test('catches errors.', async () => { + const f = () => { throw new Error('test') } + const schedule = async() + await expect(async () => await schedule(f)).rejects.toThrow('test') + }) }) @@ -65,6 +77,12 @@ describe(debounce, () => { expect(b).not.toHaveBeenCalled() expect(c).toHaveBeenCalledTimes(1) }) + + test('catches errors.', async () => { + const f = () => { throw new Error('test') } + const schedule = debounce(async()) + await expect(async () => await schedule(f)).rejects.toThrow('test') + }) }) @@ -89,6 +107,12 @@ describe(throttle, () => { expect(b).not.toHaveBeenCalled() expect(c).not.toHaveBeenCalled() }) + + test('catches errors.', async () => { + const f = () => { throw new Error('test') } + const schedule = throttle(async()) + await expect(async () => await schedule(f)).rejects.toThrow('test') + }) }) @@ -109,6 +133,12 @@ describe(delay, () => { expect(a).toHaveBeenCalledTimes(1) expect(b).toHaveBeenCalledTimes(1) }) + + test('catches errors.', async () => { + const f = () => { throw new Error('test') } + const schedule = delay(10) + await expect(async () => await schedule(f)).rejects.toThrow('test') + }) }) @@ -129,4 +159,10 @@ describe(nextFrame, () => { expect(a).toHaveBeenCalledTimes(1) expect(b).toHaveBeenCalledTimes(1) }) + + test('catches errors.', async () => { + const f = () => { throw new Error('test') } + const schedule = nextFrame() + await expect(async () => await schedule(f)).rejects.toThrow('test') + }) })