forked from graphql/graphql-js
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
replace mapAsyncIterable with repeater-based implementation
- Loading branch information
Showing
3 changed files
with
85 additions
and
55 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -739,9 +739,8 @@ describe('Subscription Publish Phase', () => { | |
}, | ||
}); | ||
|
||
const returned = await subscription.return(); | ||
payload = subscription.next(); | ||
await subscription.return(); | ||
|
||
// A new email arrives! | ||
expect( | ||
pubsub.emit({ | ||
|
@@ -752,6 +751,10 @@ describe('Subscription Publish Phase', () => { | |
}), | ||
).to.equal(false); | ||
|
||
expect(returned).to.deep.equal({ | ||
done: true, | ||
value: undefined, | ||
}); | ||
expect(await payload).to.deep.equal({ | ||
done: true, | ||
value: undefined, | ||
|
@@ -793,17 +796,21 @@ describe('Subscription Publish Phase', () => { | |
}, | ||
}); | ||
|
||
const error = new Error('should not trigger when subscription is thrown'); | ||
const caughtError = subscription.throw(error).catch((e) => e); | ||
payload = subscription.next(); | ||
|
||
// Throw error | ||
let caughtError; | ||
try { | ||
/* c8 ignore next 2 */ | ||
await subscription.throw('ouch'); | ||
} catch (e) { | ||
caughtError = e; | ||
} | ||
expect(caughtError).to.equal('ouch'); | ||
// A new email arrives! | ||
expect( | ||
pubsub.emit({ | ||
from: '[email protected]', | ||
subject: 'Alright 2', | ||
message: 'Tests are good 2', | ||
unread: true, | ||
}), | ||
).to.equal(true); | ||
|
||
expect(await caughtError).to.equal(error); | ||
|
||
expect(await payload).to.deep.equal({ | ||
done: true, | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,57 +1,80 @@ | ||
/* eslint-disable no-await-in-loop */ | ||
|
||
import type { PromiseOrValue } from '../jsutils/PromiseOrValue'; | ||
import { Repeater, RepeaterClosureSignal } from '../jsutils/Repeater'; | ||
|
||
/** | ||
* Given an AsyncIterable and a callback function, return an AsyncIterator | ||
* Given an AsyncIterable and a callback function, return an AsyncGenerator | ||
* which produces values mapped via calling the callback function. | ||
*/ | ||
export function mapAsyncIterable<T, U, R = undefined>( | ||
iterable: AsyncGenerator<T, R, void> | AsyncIterable<T>, | ||
callback: (value: T) => PromiseOrValue<U>, | ||
fn: (value: T) => PromiseOrValue<U>, | ||
): AsyncGenerator<U, R, void> { | ||
const iterator = iterable[Symbol.asyncIterator](); | ||
return new Repeater<U, R, void>(async ({ push, stop }) => { | ||
const iterator: AsyncIterator<T, R, void> = | ||
iterable[Symbol.asyncIterator](); | ||
|
||
async function mapResult( | ||
result: IteratorResult<T, R>, | ||
): Promise<IteratorResult<U, R>> { | ||
if (result.done) { | ||
return result; | ||
} | ||
let next = iterator.next(); | ||
// eslint-disable-next-line no-constant-condition | ||
while (true) { | ||
let iteration: IteratorResult<T, R>; | ||
try { | ||
iteration = await next; | ||
} catch (err) { | ||
await abruptClose(iterator); | ||
throw err; | ||
} | ||
|
||
try { | ||
return { value: await callback(result.value), done: false }; | ||
} catch (error) { | ||
/* c8 ignore start */ | ||
// FIXME: add test case | ||
if (typeof iterator.return === 'function') { | ||
try { | ||
await iterator.return(); | ||
} catch (_e) { | ||
/* ignore error */ | ||
const { done, value } = iteration; | ||
|
||
if (done) { | ||
stop(); | ||
return value; | ||
} | ||
|
||
let mapped: U; | ||
try { | ||
mapped = await fn(value); | ||
} catch (err) { | ||
await abruptClose(iterator); | ||
throw err; | ||
} | ||
|
||
try { | ||
await push(mapped); | ||
} catch (err) { | ||
if (err instanceof RepeaterClosureSignal) { | ||
if (typeof iterator.return !== 'function') { | ||
stop(); | ||
return undefined as unknown as R; // void :( | ||
} | ||
|
||
next = iterator.return(err.returnValue); | ||
continue; | ||
} | ||
|
||
if (typeof iterator.throw !== 'function') { | ||
await abruptClose(iterator); | ||
throw err; | ||
} | ||
|
||
next = iterator.throw(err); | ||
continue; | ||
} | ||
throw error; | ||
/* c8 ignore stop */ | ||
|
||
next = iterator.next(); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
return { | ||
async next() { | ||
return mapResult(await iterator.next()); | ||
}, | ||
async return(): Promise<IteratorResult<U, R>> { | ||
// If iterator.return() does not exist, then type R must be undefined. | ||
return typeof iterator.return === 'function' | ||
? mapResult(await iterator.return()) | ||
: { value: undefined as any, done: true }; | ||
}, | ||
async throw(error?: unknown) { | ||
if (typeof iterator.throw === 'function') { | ||
return mapResult(await iterator.throw(error)); | ||
} | ||
throw error; | ||
}, | ||
[Symbol.asyncIterator]() { | ||
return this; | ||
}, | ||
}; | ||
async function abruptClose(iterator: AsyncIterator<unknown>): Promise<void> { | ||
if (typeof iterator.return === 'function') { | ||
try { | ||
await iterator.return(); /* c8 ignore start */ | ||
} catch (_err) { | ||
// FIXME: add test case | ||
/* ignore error */ | ||
} /* c8 ignore stop */ | ||
} | ||
} |