diff --git a/lib/parallel-merge.ts b/lib/parallel-merge.ts index 7030e444..6fee7ef3 100644 --- a/lib/parallel-merge.ts +++ b/lib/parallel-merge.ts @@ -1,6 +1,6 @@ /// import { getIterator } from './get-iterator' -import { AnyIterable, UnArrayAnyIterable } from './types' +import { AnyIterable, UnArrayAnyIterable, NullOrFunction } from './types' export async function* parallelMerge>>( ...iterables: I @@ -9,6 +9,35 @@ export async function* parallelMerge>>( const concurrentWork = new Set() const values = new Map() + let lastError = null + let errCb: NullOrFunction = null + let valueCb: NullOrFunction = null + + const notifyError = err => { + lastError = err + if (errCb) { + errCb(err) + } + } + + const notifyDone = value => { + if (valueCb) { + valueCb(value) + } + } + + const waitForQueue = () => + new Promise((resolve, reject) => { + if (lastError) { + reject(lastError) + } + if (values.size > 0) { + return resolve() + } + valueCb = resolve + errCb = reject + }) + const queueNext = input => { const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => { if (!done) { @@ -17,6 +46,7 @@ export async function* parallelMerge>>( concurrentWork.delete(nextVal) }) concurrentWork.add(nextVal) + nextVal.then(notifyDone, notifyError) } for (const input of inputs) { @@ -24,10 +54,13 @@ export async function* parallelMerge>>( } while (true) { - if (concurrentWork.size === 0) { + // We technically don't have to check `values.size` as the for loop should have emptied it + // However I haven't yet found specs verifying that behavior, only tests + // the guard in waitForQueue() checking for values is in place for the same reason + if (concurrentWork.size === 0 && values.size === 0) { return } - await Promise.race(concurrentWork) + await waitForQueue() for (const [input, value] of values) { values.delete(input) yield value diff --git a/lib/types.ts b/lib/types.ts index e427dbd6..b804426a 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -3,3 +3,4 @@ export type Iterableish = Iterable | Iterator | AsyncIterable | Asyn export type AnyIterable = Iterable | AsyncIterable export type FlatMapValue = B | AnyIterable | undefined | null | Promise | undefined | null> export type UnArrayAnyIterable>> = A extends Array> ? T : never +export type NullOrFunction = null | ((anything?: any) => void) diff --git a/lib/write-to-stream.ts b/lib/write-to-stream.ts index 4bb859c5..b04d7ba0 100644 --- a/lib/write-to-stream.ts +++ b/lib/write-to-stream.ts @@ -1,5 +1,5 @@ /// -import { AnyIterable } from './types' +import { AnyIterable, NullOrFunction } from './types' interface IWritable { once: any @@ -14,28 +14,52 @@ function once(event: string, stream: IWritable): Promise { } async function _writeToStream(stream: IWritable, iterable: AnyIterable): Promise { - let errorListener - let error - const errorPromise = new Promise((resolve, reject) => { - errorListener = err => { - error = err - reject(err) + let lastError = null + let errCb: NullOrFunction = null + let drainCb: NullOrFunction = null + + const notifyError = err => { + lastError = err + if (errCb) { + errCb(err) } - stream.once('error', errorListener) - }) as Promise + } + + const notifyDrain = () => { + if (drainCb) { + drainCb() + } + } + + const cleanup = () => { + stream.removeListener('error', notifyError) + stream.removeListener('drain', notifyDrain) + } + + stream.once('error', notifyError) + + const waitForDrain = () => + new Promise((resolve, reject) => { + if (lastError) { + return reject(lastError) + } + stream.once('drain', notifyDrain) + drainCb = resolve + errCb = reject + }) for await (const value of iterable) { if (stream.write(value) === false) { - await Promise.race([errorPromise, once('drain', stream)]) + await waitForDrain() } - if (error) { - return errorPromise + if (lastError) { + break } } - stream.removeListener('error', errorListener) - if (error) { - return errorPromise + cleanup() + if (lastError) { + throw lastError } }