Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
replace asyncInterrupt with AbortSignal variant
Browse files Browse the repository at this point in the history
and rename asyncInterruptEither to asyncEither
  • Loading branch information
tim-smart committed Jul 26, 2023
1 parent 5350d4d commit d9f449b
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 62 deletions.
5 changes: 5 additions & 0 deletions .changeset/good-boxes-suffer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/io": minor
---

rename asyncInterruptEither to asyncEither
5 changes: 5 additions & 0 deletions .changeset/large-planets-worry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/io": minor
---

replace Effect.asyncInterrupt with AbortSignal variant
5 changes: 5 additions & 0 deletions .changeset/selfish-bikes-own.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/io": patch
---

add optional interruption return to Effect.async
48 changes: 26 additions & 22 deletions docs/modules/Effect.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ Added in v1.0.0
- [constructors](#constructors)
- [async](#async)
- [asyncEffect](#asynceffect)
- [asyncEither](#asynceither)
- [asyncInterrupt](#asyncinterrupt)
- [asyncInterruptEither](#asyncinterrupteither)
- [asyncOption](#asyncoption)
- [die](#die)
- [dieMessage](#diemessage)
Expand Down Expand Up @@ -1145,20 +1145,20 @@ Added in v1.0.0
## async
Imports an asynchronous side-effect into a pure `Effect` value. See
`asyncMaybe` for the more expressive variant of this function that can
return a value synchronously.
Imports an asynchronous side-effect into a pure `Effect` value.
The callback function `Effect<R, E, A> => void` must be called at most once.
If an Effect is returned by the registration function, it will be executed
if the fiber executing the effect is interrupted.
The `FiberId` of the fiber that may complete the async callback may be
provided to allow for better diagnostics.
**Signature**
```ts
export declare const async: <R, E, A>(
register: (callback: (_: Effect<R, E, A>) => void) => void,
register: (callback: (_: Effect<R, E, A>) => void) => void | Effect<R, never, void>,
blockingOn?: FiberId.FiberId
) => Effect<R, E, A>
```
Expand All @@ -1182,45 +1182,49 @@ export declare const asyncEffect: <R, E, A, R2, E2, X>(
Added in v1.0.0
## asyncInterrupt
## asyncEither
Imports an asynchronous side-effect into an effect. It has the option of
returning the value synchronously, which is useful in cases where it cannot
be determined if the effect is synchronous or asynchronous until the register
is actually executed. It also has the option of returning a canceler,
which will be used by the runtime to cancel the asynchronous effect if the fiber
executing the effect is interrupted.
Imports an asynchronous side-effect into an effect allowing control of interruption.
If the register function returns a value synchronously, then the callback
function `Effect<R, E, A> => void` must not be called. Otherwise the callback
function must be called at most once.
The `FiberId` of the fiber that may complete the async callback may be
provided to allow for better diagnostics.
**Signature**
```ts
export declare const asyncInterrupt: <R, E, A>(
register: (callback: (effect: Effect<R, E, A>) => void) => Effect<R, never, void>,
export declare const asyncEither: <R, E, A>(
register: (callback: (effect: Effect<R, E, A>) => void) => Either.Either<Effect<R, never, void>, Effect<R, E, A>>,
blockingOn?: FiberId.FiberId
) => Effect<R, E, A>
```
Added in v1.0.0
## asyncInterruptEither
## asyncInterrupt
Imports an asynchronous side-effect into an effect. It has the option of
returning the value synchronously, which is useful in cases where it cannot
be determined if the effect is synchronous or asynchronous until the register
is actually executed. It also has the option of returning a canceler,
which will be used by the runtime to cancel the asynchronous effect if the fiber
executing the effect is interrupted.
Imports an asynchronous side-effect into a pure `Effect` value.
The callback function `Effect<R, E, A> => void` must be called at most once.
If the register function returns a value synchronously, then the callback
function `Effect<R, E, A> => void` must not be called. Otherwise the callback
function must be called at most once.
The registration function receives an AbortSignal that can be used to handle
interruption.
The `FiberId` of the fiber that may complete the async callback may be
provided to allow for better diagnostics.
**Signature**
```ts
export declare const asyncInterruptEither: <R, E, A>(
register: (callback: (effect: Effect<R, E, A>) => void) => Either.Either<Effect<R, never, void>, Effect<R, E, A>>,
export declare const asyncInterrupt: <R, E, A>(
register: (callback: (_: Effect<R, E, A>) => void, signal: AbortSignal) => void,
blockingOn?: FiberId.FiberId
) => Effect<R, E, A>
```
Expand Down
46 changes: 25 additions & 21 deletions src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -884,20 +884,20 @@ export const validateFirst: {
// -------------------------------------------------------------------------------------

/**
* Imports an asynchronous side-effect into a pure `Effect` value. See
* `asyncMaybe` for the more expressive variant of this function that can
* return a value synchronously.
*
* Imports an asynchronous side-effect into a pure `Effect` value.
* The callback function `Effect<R, E, A> => void` must be called at most once.
*
* If an Effect is returned by the registration function, it will be executed
* if the fiber executing the effect is interrupted.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
*
* @since 1.0.0
* @category constructors
*/
export const async: <R, E, A>(
register: (callback: (_: Effect<R, E, A>) => void) => void,
register: (callback: (_: Effect<R, E, A>) => void) => void | Effect<R, never, void>,
blockingOn?: FiberId.FiberId
) => Effect<R, E, A> = core.async

Expand All @@ -914,6 +914,24 @@ export const asyncEffect: <R, E, A, R2, E2, X>(
register: (callback: (_: Effect<R, E, A>) => void) => Effect<R2, E2, X>
) => Effect<R | R2, E | E2, A> = _runtime.asyncEffect

/**
* Imports an asynchronous side-effect into a pure `Effect` value.
* The callback function `Effect<R, E, A> => void` must be called at most once.
*
* The registration function receives an AbortSignal that can be used to handle
* interruption.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
*
* @since 1.0.0
* @category constructors
*/
export const asyncInterrupt: <R, E, A>(
register: (callback: (_: Effect<R, E, A>) => void, signal: AbortSignal) => void,
blockingOn?: FiberId.FiberId
) => Effect<R, E, A> = core.asyncInterrupt

/**
* Imports an asynchronous effect into a pure `Effect` value, possibly returning
* the value synchronously.
Expand Down Expand Up @@ -951,24 +969,10 @@ export const asyncOption: <R, E, A>(
* @since 1.0.0
* @category constructors
*/
export const asyncInterruptEither: <R, E, A>(
export const asyncEither: <R, E, A>(
register: (callback: (effect: Effect<R, E, A>) => void) => Either.Either<Effect<R, never, void>, Effect<R, E, A>>,
blockingOn?: FiberId.FiberId
) => Effect<R, E, A> = core.asyncInterruptEither

/**
* Imports an asynchronous side-effect into an effect allowing control of interruption.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
*
* @since 1.0.0
* @category constructors
*/
export const asyncInterrupt: <R, E, A>(
register: (callback: (effect: Effect<R, E, A>) => void) => Effect<R, never, void>,
blockingOn?: FiberId.FiberId
) => Effect<R, E, A> = core.asyncInterrupt
) => Effect<R, E, A> = core.asyncEither

/**
* @since 1.0.0
Expand Down
2 changes: 1 addition & 1 deletion src/internal/clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ClockImpl implements Clock.Clock {
}

sleep(duration: Duration.Duration): Effect.Effect<never, never, void> {
return core.asyncInterruptEither<never, never, void>((cb) => {
return core.asyncEither<never, never, void>((cb) => {
const canceler = globalClockScheduler.unsafeSchedule(() => cb(core.unit), duration)
return Either.left(core.asUnit(core.sync(canceler)))
})
Expand Down
26 changes: 16 additions & 10 deletions src/internal/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,17 +397,16 @@ export const async = <R, E, A>(
): Effect.Effect<R, E, A> =>
suspend(() => {
let cancelerRef: Effect.Effect<R, never, void> | void = undefined

const effect = new EffectPrimitive(OpCodes.OP_ASYNC) as any
effect.i0 = (resume: (_: Effect.Effect<R, E, A>) => void) => {
cancelerRef = register(resume)
}, effect.i1 = blockingOn

return effect.pipe(onInterrupt(() => cancelerRef ? cancelerRef : unit))
}
effect.i1 = blockingOn
return onInterrupt(effect, () => cancelerRef ?? unit)
})

/* @internal */
export const asyncInterruptEither = <R, E, A>(
export const asyncEither = <R, E, A>(
register: (
callback: (effect: Effect.Effect<R, E, A>) => void
) => Either.Either<Effect.Effect<R, never, void>, Effect.Effect<R, E, A>>,
Expand All @@ -422,11 +421,18 @@ export const asyncInterruptEither = <R, E, A>(
}
}, blockingOn)

/* @internal */
/** @internal */
export const asyncInterrupt = <R, E, A>(
register: (callback: (effect: Effect.Effect<R, E, A>) => void) => Effect.Effect<R, never, void>,
register: (callback: (_: Effect.Effect<R, E, A>) => void, signal: AbortSignal) => void,
blockingOn: FiberId.FiberId = FiberId.none
): Effect.Effect<R, E, A> => async(register, blockingOn)
): Effect.Effect<R, E, A> =>
async<R, E, A>((resume) => {
const controller = new AbortController()
register(resume, controller.signal)
return sync(() => {
controller.abort()
})
}, blockingOn)

/* @internal */
export const catchAllCause = dual<
Expand Down Expand Up @@ -1228,7 +1234,7 @@ export const zipWith = dual<
>(3, (self, that, f) => flatMap(self, (a) => map(that, (b) => f(a, b))))

/* @internal */
export const never: Effect.Effect<never, never, never> = asyncInterruptEither<never, never, never>(() => {
export const never: Effect.Effect<never, never, never> = asyncEither<never, never, never>(() => {
const interval = setInterval(() => {
//
}, 2 ** 31 - 1)
Expand Down Expand Up @@ -2439,7 +2445,7 @@ export const deferredMakeAs = <E, A>(fiberId: FiberId.FiberId): Effect.Effect<ne

/* @internal */
export const deferredAwait = <E, A>(self: Deferred.Deferred<E, A>): Effect.Effect<never, E, A> =>
asyncInterruptEither<never, E, A>((k) => {
asyncEither<never, E, A>((k) => {
const state = MutableRef.get(self.state)
switch (state._tag) {
case DeferredOpCodes.OP_STATE_DONE: {
Expand Down
6 changes: 3 additions & 3 deletions src/internal/effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export const asyncOption = <R, E, A>(
register: (callback: (_: Effect.Effect<R, E, A>) => void) => Option.Option<Effect.Effect<R, E, A>>,
blockingOn: FiberId.FiberId = FiberId.none
): Effect.Effect<R, E, A> =>
core.asyncInterruptEither<R, E, A>(
core.asyncEither<R, E, A>(
(cb) => {
const option = register(cb)
switch (option._tag) {
Expand Down Expand Up @@ -1163,7 +1163,7 @@ export const promise = <A>(evaluate: LazyArg<Promise<A>>): Effect.Effect<never,

/* @internal */
export const promiseInterrupt = <A>(evaluate: (signal: AbortSignal) => Promise<A>): Effect.Effect<never, never, A> =>
core.asyncInterruptEither<never, never, A>((resolve) => {
core.asyncEither<never, never, A>((resolve) => {
const controller = new AbortController()
evaluate(controller.signal)
.then((a) => resolve(core.exitSucceed(a)))
Expand Down Expand Up @@ -1621,7 +1621,7 @@ export const tryPromiseInterrupt: {
return core.flatMap(
hasCatch ? try_({ try: evaluate, catch: arg.catch }) : try_(evaluate),
([controller, promise]) =>
core.asyncInterruptEither<never, E, A>((resolve) => {
core.asyncEither<never, E, A>((resolve) => {
promise
.then((a) => resolve(core.exitSucceed(a)))
.catch((e) => resolve(core.exitFail(hasCatch ? arg.catch(e) : e)))
Expand Down
2 changes: 1 addition & 1 deletion src/internal/effect/circular.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Semaphore {
}

readonly take = (n: number): Effect.Effect<never, never, number> =>
core.asyncInterruptEither<never, never, number>((resume) => {
core.asyncEither<never, never, number>((resume) => {
if (this.free < n) {
const observer = () => {
if (this.free >= n) {
Expand Down
4 changes: 2 additions & 2 deletions test/Effect/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ describe.concurrent("Effect", () => {
const unexpectedPlace = yield* $(Ref.make(Chunk.empty<number>()))
const runtime = yield* $(Effect.runtime<never>())
const fiber = yield* $(
Effect.async<never, never, void>((cb) =>
Effect.async<never, never, void>((cb) => {
Runtime.runCallback(runtime)(pipe(
Deferred.await(step),
Effect.zipRight(Effect.sync(() => cb(Ref.update(unexpectedPlace, Chunk.prepend(1)))))
))
),
}),
Effect.ensuring(Effect.async<never, never, void>(() => {
// The callback is never called so this never completes
Runtime.runCallback(runtime)(Deferred.succeed(step, undefined))
Expand Down
17 changes: 15 additions & 2 deletions test/Effect/interruption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,10 @@ describe.concurrent("Effect", () => {
const result = yield* $(Ref.get(ref))
assert.isTrue(result)
}))
it.effect("asyncInterrupt cancelation", () =>
it.effect("async cancelation", () =>
Effect.gen(function*($) {
const ref = MutableRef.make(0)
const effect = Effect.asyncInterruptEither(() => {
const effect = Effect.asyncEither(() => {
pipe(ref, MutableRef.set(MutableRef.get(ref) + 1))
return Either.left(Effect.sync(() => {
pipe(ref, MutableRef.set(MutableRef.get(ref) - 1))
Expand Down Expand Up @@ -588,4 +588,17 @@ describe.concurrent("Effect", () => {
const result = yield* $(Deferred.await(deferred))
assert.strictEqual(result, 42)
}))
it.effect("asyncInterrupt aborts the signal", () =>
Effect.gen(function*($) {
let signal: AbortSignal
const fiber = yield* $(
Effect.asyncInterrupt<never, never, void>((cb, signal_) => {
signal = signal_
}),
Effect.fork
)
yield* $(Effect.yieldNow())
yield* $(Fiber.interrupt(fiber))
assert.strictEqual(signal!.aborted, true)
}))
})

0 comments on commit d9f449b

Please sign in to comment.