Skip to content

Commit

Permalink
stream: add more forEach tests
Browse files Browse the repository at this point in the history
PR-URL: nodejs#41937
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Tobias Nießen <[email protected]>
Reviewed-By: Mestery <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Antoine du Hamel <[email protected]>
  • Loading branch information
benjamingr authored and bengl committed Feb 21, 2022
1 parent 8949daa commit 79322f0
Showing 1 changed file with 47 additions and 2 deletions.
49 changes: 47 additions & 2 deletions test/parallel/test-stream-forEach.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
const { once } = require('events');

{
// forEach works on synchronous streams with a synchronous predicate
Expand Down Expand Up @@ -43,14 +43,59 @@ const { setTimeout } = require('timers/promises');
})().then(common.mustCall());
}

{
// forEach works on an infinite stream
const ac = new AbortController();
const { signal } = ac;
const stream = Readable.from(async function* () {
while (true) yield 1;
}(), { signal });
let i = 0;
assert.rejects(stream.forEach(common.mustCall((x) => {
i++;
if (i === 10) ac.abort();
assert.strictEqual(x, 1);
}, 10)), { name: 'AbortError' }).then(common.mustCall());
}

{
// Emitting an error during `forEach`
const stream = Readable.from([1, 2, 3, 4, 5]);
assert.rejects(stream.forEach(async (x) => {
if (x === 3) {
stream.emit('error', new Error('boom'));
}
}), /boom/).then(common.mustCall());
}

{
// Throwing an error during `forEach` (sync)
const stream = Readable.from([1, 2, 3, 4, 5]);
assert.rejects(stream.forEach((x) => {
if (x === 3) {
throw new Error('boom');
}
}), /boom/).then(common.mustCall());
}

{
// Throwing an error during `forEach` (async)
const stream = Readable.from([1, 2, 3, 4, 5]);
assert.rejects(stream.forEach(async (x) => {
if (x === 3) {
return Promise.reject(new Error('boom'));
}
}), /boom/).then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const forEachPromise =
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
await once(signal, 'abort');
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
Expand Down

0 comments on commit 79322f0

Please sign in to comment.