Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
loreanvictor committed Dec 1, 2023
1 parent 024a3a0 commit b32ce4d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
24 changes: 22 additions & 2 deletions src/observe.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Listener, SourceLike, isSourceLike, Observable, ExprFn, SKIP, STOP, ExprResultSync } from './types'
import { Listener, SourceLike, isSourceLike, Observable, ExprFn, SKIP, STOP, ExprResultSync, isPure } from './types'
import { Source } from './source'


Expand Down Expand Up @@ -39,6 +39,7 @@ function normalize<T>(fn: Observable<T>): SourceLike<T> {
* ```
*/
export class Observation<T> extends Source<T> {
ctrl: AbortController | undefined
/**
* A mapping of all tracked sources. For receiving the values of tracked sources,
* a handler is registered with them. This handler is stored in this map for cleanup.
Expand Down Expand Up @@ -111,6 +112,7 @@ export class Observation<T> extends Source<T> {
if (this.syncToken > 0) {
// check if there is an unfinished run that needs to be aborted
if (this.lastSyncToken !== this.syncToken) {
this.ctrl?.abort()
this.abort && this.abort()
}
// if this is a higher-order observation, the last emitted source
Expand All @@ -130,7 +132,8 @@ export class Observation<T> extends Source<T> {
this.cleanCandidate = src
const syncToken = this.nextToken()

const _res = this.fn(obs => obs ? this.track(normalize(obs), syncToken) : undefined)
// const _res = this.fn(obs => obs ? this.track(normalize(obs), syncToken) : undefined)
const _res = this.execute(syncToken)

if (_res instanceof Promise) {
_res.then(res => {
Expand All @@ -145,6 +148,23 @@ export class Observation<T> extends Source<T> {
}
}

/**
* Executes the expression with given sync token. If the expression is abortable, will
* create a new AbortController and pass its signal to the expression.
* @param syncToken the token to check if the execution should be aborted
* @returns the result of the expression (sync or async)
*/
protected execute(syncToken: number) {
if (isPure(this.fn)) {
return this.fn(obs => obs ? this.track(normalize(obs), syncToken) : undefined)
} else {
this.ctrl = new AbortController()
const signal = this.ctrl.signal

return this.fn(obs => obs ? this.track(normalize(obs), syncToken) : undefined, signal)
}
}

/**
* Emits the result of the expression function if the observation is clean. The observation
* is "dirty" if the last initiated run was initiated by a source that is no longer tracked. This happens
Expand Down
9 changes: 8 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@ export const STOP = Symbol()
export type ExprResultSync<T> = T | typeof SKIP | typeof STOP
export type ExprResult<T> = ExprResultSync<T> | Promise<ExprResultSync<T>>
export type Track = <T>(obs: Observable<T>) => T | undefined
export type ExprFn<T> = (track: Track) => ExprResult<T>
export type PureExprFn<T> = (track: Track) => ExprResult<T>
export type AbortableExprFn<T> = (track: Track, signal: AbortSignal) => ExprResult<T>
export type ExprFn<T> = PureExprFn<T> | AbortableExprFn<T>

export function isPure<T>(fn: ExprFn<T>): fn is PureExprFn<T> {
return fn.length === 1
}

export type Observable<T> = SourceLike<T> | ExprFn<T>

0 comments on commit b32ce4d

Please sign in to comment.