Skip to content
This repository has been archived by the owner on Sep 14, 2023. It is now read-only.

fix: flatten cleanup #1100

Merged
merged 2 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions fluent/ConnectionRune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Connection, ConnectionError, RpcSubscriptionMessage, ServerError } from
import { is, MetaRune, Run, Rune, RunicArgs, Runner, RunStream } from "../rune/mod.ts"

class RunConnection extends Run<Connection, never> {
controller = new AbortController()
constructor(
ctx: Runner,
readonly initConnection: (signal: AbortSignal) => Connection | Promise<Connection>,
Expand All @@ -12,7 +13,11 @@ class RunConnection extends Run<Connection, never> {

connection?: Connection
async _evaluate(): Promise<Connection> {
return this.connection ??= await this.initConnection(this.signal)
return this.connection ??= await this.initConnection(this.controller.signal)
}

override cleanup(): void {
this.controller.abort()
}
}

Expand Down Expand Up @@ -61,6 +66,7 @@ export class ConnectionRune<U> extends Rune<Connection, U> {
}

class RunRpcSubscription extends RunStream<RpcSubscriptionMessage> {
controller = new AbortController()
constructor(
ctx: Runner,
connection: Connection,
Expand All @@ -74,7 +80,11 @@ class RunRpcSubscription extends RunStream<RpcSubscriptionMessage> {
unsubscribeMethod,
params,
(value) => this.push(value),
this.signal,
this.controller.signal,
)
}

override cleanup(): void {
this.controller.abort()
}
}
41 changes: 27 additions & 14 deletions rune/MetaRune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ export class OrthoRune<T, U1, U2> extends Rune<Run<T, U1>, U2> {
class RunFlat<T, U1, U2> extends Run<T, U1 | U2> {
child
constructor(
readonly runner: Runner,
runner: Runner,
child: Rune<Rune<T, U1>, U2>,
) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

lastChildReceipt = new Receipt()
Expand All @@ -62,10 +62,12 @@ class RunFlat<T, U1, U2> extends Run<T, U1 | U2> {
const rune = await this.child.evaluate(time, receipt)
if (!receipt.ready) return null!
if (receipt.novel) {
// TODO: prime before aborting?
this.innerController.abort()
this.innerController = new AbortController()
this.currentInner = this.runner.prime(rune, this.innerController.signal)
// TODO: prime before dereferencing?
if (this.currentInner) {
this.currentInner.dereference()
}
this.currentInner = this.runner.prime(rune)
this.currentInner.reference()
}
const _receipt = new Receipt()
try {
Expand All @@ -84,7 +86,9 @@ class RunFlat<T, U1, U2> extends Run<T, U1 | U2> {
}

override cleanup(): void {
this.innerController.abort()
if (this.currentInner) {
this.dependencies.push(this.currentInner)
}
super.cleanup()
}
}
Expand Down Expand Up @@ -112,13 +116,20 @@ class OrthoRunner extends Runner {
this.parent.memo.set(rune._prime, run)
return run
}

override onCleanup(run: Run<unknown, unknown>): void {
super.onCleanup(run)
if (run._sources.length) {
this.parent.onCleanup(run)
}
}
}

class RunFlatSingular<T, U1, U2> extends Run<T, U1 | U2> {
child
constructor(readonly runner: Runner, child: Rune<Run<T, U1>, U2>) {
constructor(runner: Runner, child: Rune<Run<T, U1>, U2>) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

async _evaluate(time: number, receipt: Receipt) {
Expand All @@ -129,22 +140,24 @@ class RunFlatSingular<T, U1, U2> extends Run<T, U1 | U2> {

class RunAsOrtho<T, U1, U2> extends Run<Run<T, U1>, U2> {
child
constructor(readonly runner: Runner, child: Rune<Rune<T, U1>, U2>) {
constructor(runner: Runner, child: Rune<Rune<T, U1>, U2>) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

async _evaluate(time: number, receipt: Receipt) {
const orthoRunner = new OrthoRunner(this.runner, time)
const rune = await this.child.evaluate(time, receipt)
return orthoRunner.prime(rune, undefined)
return orthoRunner.prime(rune)
}
}

class RunWrapOrtho<T, U> extends Run<T, U> {
constructor(readonly runner: OrthoRunner, readonly child: Run<T, U>) {
declare runner: OrthoRunner

constructor(runner: OrthoRunner, readonly child: Run<T, U>) {
super(runner)
child.reference(this.signal)
this.useRun(child)
}

_evaluate(): Promise<T> {
Expand Down
89 changes: 58 additions & 31 deletions rune/Rune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@ export abstract class Runner {
protected abstract _prime<T, U>(rune: Rune<T, U>): Run<T, U>

memo = new Map<(runner: Runner) => Run<any, any>, Run<any, any>>()
prime<T, U>(rune: _.Rune<T, U>, signal: AbortSignal | undefined): Run<T, U> {
prime<T, U>(rune: Rune<T, U>): Run<T, U> {
const run = getOrInit(this.memo, rune._prime, () => {
const old = this._currentTrace
this._currentTrace = rune._trace
const run = this._prime(rune)
this._currentTrace = old
run.signal.addEventListener("abort", () => {
this.memo.delete(rune._prime)
})
run._sources.push(rune._prime)
return run
})
if (signal) run.reference(signal)
return run
}

getPrimed<T, U>(rune: _.Rune<T, U>): Run<T, U> | undefined {
getPrimed<T, U>(rune: Rune<T, U>): Run<T, U> | undefined {
return this.memo.get(rune._prime)
}

onCleanup(run: Run<unknown, unknown>) {
for (const source of run._sources) {
this.memo.delete(source)
}
}
}

class RootRunner extends Runner {
Expand Down Expand Up @@ -84,8 +87,8 @@ export class Rune<out T, out U = never> {
}

async *iter(runner: Runner = globalRunner) {
const abortController = new AbortController()
const primed = runner.prime(this, abortController.signal)
const primed = runner.prime(this)
primed.reference()
let time = runner.timeline.current
try {
while (time !== Infinity) {
Expand All @@ -98,7 +101,7 @@ export class Rune<out T, out U = never> {
time = receipt.nextTime
}
} finally {
abortController.abort()
primed.dereference()
}
}

Expand Down Expand Up @@ -172,7 +175,7 @@ export class Rune<out T, out U = never> {
)
}

static asyncIter<T>(fn: (signal: AbortSignal) => AsyncIterable<T>): Rune.ValueRune<T, never> {
static asyncIter<T>(fn: () => AsyncIterable<T>): Rune.ValueRune<T, never> {
return Rune.ValueRune.new(RunAsyncIter, fn)
}

Expand Down Expand Up @@ -203,8 +206,8 @@ export class Rune<out T, out U = never> {

static pin<T, U>(rune: Rune<T, U>, pinned: Rune<unknown, unknown>): Rune<T, U> {
return new Rune((runner) => {
const run = runner.prime(rune, undefined)
runner.prime(pinned, run.signal)
const run = runner.prime(rune)
run.use(pinned)
return run
})
}
Expand All @@ -216,27 +219,56 @@ export abstract class Run<T, U> {
order: number
timeline

abortController = new AbortController()
signal = this.abortController.signal
constructor(runner: Runner) {
this.signal.addEventListener("abort", () => this.cleanup())
constructor(readonly runner: Runner) {
this.trace = runner._currentTrace
?? new Trace(`execution of the ${new.target.name} instantiated`)
this.order = runner.order
this.timeline = runner.timeline
}

dependencies: Run<unknown, unknown>[] = []

use<T, U>(rune: Rune<T, U>): Run<T, U> {
const run = this.runner.prime(rune)
this.useRun(run)
return run
}

useRun(run: Run<unknown, unknown>) {
run.reference()
this.dependencies.push(run)
}

referenceCount = 0
reference(signal: AbortSignal) {
alive = true

reference() {
if (!this.alive) throw new Error("cannot reference a dead rune")
this.referenceCount++
signal.addEventListener("abort", () => {
if (!--this.referenceCount) {
this.abortController.abort()
}

_sources: Array<(runner: Runner) => Run<any, any>> = []
dereference(cleanupBatches?: Run<unknown, unknown>[][]) {
if (!--this.referenceCount) {
this.alive = false
this.cleanup()
this.runner.onCleanup(this)
if (cleanupBatches) {
cleanupBatches.push(this.dependencies)
} else {
const cleanupBatches = [this.dependencies]
while (cleanupBatches.length) {
const batch = cleanupBatches.pop()!
for (const run of batch) {
run.dereference(cleanupBatches)
}
}
}
})
}
}

cleanup() {}

_currentTime = -1
_currentPromise: Promise<T> = null!
_currentReceipt = new Receipt()
Expand All @@ -258,11 +290,6 @@ export abstract class Run<T, U> {
}
}
abstract _evaluate(time: number, receipt: Receipt): Promise<T>

alive = true
cleanup() {
this.alive = false
}
}

class RunConstant<T> extends Run<T, never> {
Expand All @@ -279,7 +306,7 @@ class RunLs<T, U> extends Run<T[], U> {
children
constructor(runner: Runner, children: Rune<T, U>[]) {
super(runner)
this.children = children.map((child) => runner.prime(child, this.signal))
this.children = children.map((child) => this.use(child))
}

_evaluate(time: number, receipt: Receipt) {
Expand Down Expand Up @@ -335,10 +362,10 @@ export abstract class RunStream<T> extends Run<T, never> {
}

class RunAsyncIter<T> extends RunStream<T> {
constructor(runner: Runner, fn: (signal: AbortSignal) => AsyncIterable<T>) {
constructor(runner: Runner, fn: () => AsyncIterable<T>) {
super(runner)
;(async () => {
for await (const value of fn(this.signal)) {
for await (const value of fn()) {
this.push(value)
}
this.finish()
Expand All @@ -356,7 +383,7 @@ class RunBubbleUnhandled<T, U> extends Run<T, never> {
child
constructor(runner: Runner, child: Rune<T, U>, readonly symbol: symbol) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

async _evaluate(time: number, receipt: Receipt) {
Expand All @@ -375,7 +402,7 @@ class RunCaptureUnhandled<T, U1, U2> extends Run<T, U1 | U2> {
child
constructor(runner: Runner, child: Rune<T, U1>, readonly symbol: symbol) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

async _evaluate(time: number, receipt: Receipt) {
Expand Down
Loading