diff --git a/src/observe.ts b/src/observe.ts index 91ffb48..1bc19c5 100644 --- a/src/observe.ts +++ b/src/observe.ts @@ -1,4 +1,4 @@ -import { Listener, SourceLike, isSourceLike, Observable, ExprFn, SKIP, STOP, ExprResultSync } from './types' +import { Listener, SourceLike, isSourceLike, Observable, ExprFn, SKIP, STOP, ExprResultSync, isPure } from './types' import { Source } from './source' @@ -39,6 +39,7 @@ function normalize(fn: Observable): SourceLike { * ``` */ export class Observation extends Source { + ctrl: AbortController | undefined /** * A mapping of all tracked sources. For receiving the values of tracked sources, * a handler is registered with them. This handler is stored in this map for cleanup. @@ -111,6 +112,7 @@ export class Observation extends Source { if (this.syncToken > 0) { // check if there is an unfinished run that needs to be aborted if (this.lastSyncToken !== this.syncToken) { + this.ctrl?.abort() this.abort && this.abort() } // if this is a higher-order observation, the last emitted source @@ -130,7 +132,8 @@ export class Observation extends Source { this.cleanCandidate = src const syncToken = this.nextToken() - const _res = this.fn(obs => obs ? this.track(normalize(obs), syncToken) : undefined) + // const _res = this.fn(obs => obs ? this.track(normalize(obs), syncToken) : undefined) + const _res = this.execute(syncToken) if (_res instanceof Promise) { _res.then(res => { @@ -145,6 +148,23 @@ export class Observation extends Source { } } + /** + * Executes the expression with given sync token. If the expression is abortable, will + * create a new AbortController and pass its signal to the expression. + * @param syncToken the token to check if the execution should be aborted + * @returns the result of the expression (sync or async) + */ + protected execute(syncToken: number) { + if (isPure(this.fn)) { + return this.fn(obs => obs ? this.track(normalize(obs), syncToken) : undefined) + } else { + this.ctrl = new AbortController() + const signal = this.ctrl.signal + + return this.fn(obs => obs ? this.track(normalize(obs), syncToken) : undefined, signal) + } + } + /** * Emits the result of the expression function if the observation is clean. The observation * is "dirty" if the last initiated run was initiated by a source that is no longer tracked. This happens diff --git a/src/types.ts b/src/types.ts index 1fa840b..b638b4c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -27,5 +27,12 @@ export const STOP = Symbol() export type ExprResultSync = T | typeof SKIP | typeof STOP export type ExprResult = ExprResultSync | Promise> export type Track = (obs: Observable) => T | undefined -export type ExprFn = (track: Track) => ExprResult +export type PureExprFn = (track: Track) => ExprResult +export type AbortableExprFn = (track: Track, signal: AbortSignal) => ExprResult +export type ExprFn = PureExprFn | AbortableExprFn + +export function isPure(fn: ExprFn): fn is PureExprFn { + return fn.length === 1 +} + export type Observable = SourceLike | ExprFn