Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add iterator helper some every #41573

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 99 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1918,7 +1918,7 @@ import { Resolver } from 'dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an aray using toArray
// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
Expand All @@ -1929,6 +1929,104 @@ const dnsResults = await Readable.from([
}, { concurrency: 2 }).toArray();
```

### `readable.some(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
value for at least one of the chunks.

This method is similar to `Array.prototype.some` and calls `fn` on each chunk
in the stream until the awaited return value is `true` (or any truthy value).
Once an `fn` call on a chunk awaited return value is truthy, the stream is
destroyed and the promise is fulfilled with `true`. If none of the `fn`
calls on the chunks return a truthy value, the promise is fulfilled with
`false`.

```mjs
import { Readable } from 'stream';
import { stat } from 'fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false

// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stat.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
console.log('done'); // Stream has finished
```

### `readable.every(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
value for all of the chunks.

This method is similar to `Array.prototype.every` and calls `fn` on each chunk
in the stream to check if all awaited return values are truthy value for `fn`.
Once an `fn` call on a chunk awaited return value is falsy, the stream is
destroyed and the promise is fulfilled with `false`. If all of the `fn` calls
on the chunks return a truthy value, the promise is fulfilled with `true`.

```mjs
import { Readable } from 'stream';
import { stat } from 'fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true

// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stat.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
41 changes: 41 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');

const {
ArrayPrototypePush,
Expand Down Expand Up @@ -47,6 +48,10 @@ async function * map(fn, options) {
const signalOpt = { signal };

const abort = () => ac.abort();
if (options?.signal?.aborted) {
abort();
}

options?.signal?.addEventListener('abort', abort);

let next;
Expand Down Expand Up @@ -150,6 +155,40 @@ async function * map(fn, options) {
}
}

async function some(fn, options) {
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
const ac = new AbortController();
if (options?.signal) {
if (options.signal.aborted) {
ac.abort();
}
options.signal.addEventListener('abort', () => ac.abort(), {
[kWeakHandler]: this,
once: true,
});
}
const mapped = this.map(fn, { ...options, signal: ac.signal });
for await (const result of mapped) {
if (result) {
ac.abort();
return true;
}
}
return false;
}
mcollina marked this conversation as resolved.
Show resolved Hide resolved

async function every(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (x) => {
return !(await fn(x));
}, options));
}

async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -196,6 +235,8 @@ module.exports.streamReturningOperators = {
};

module.exports.promiseReturningOperators = {
every,
forEach,
toArray,
some,
};
95 changes: 95 additions & 0 deletions test/parallel/test-stream-some-every.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

function oneTo5() {
return Readable.from([1, 2, 3, 4, 5]);
}

function oneTo5Async() {
return oneTo5().map(async (x) => {
await Promise.resolve();
return x;
});
}
{
// Some and every work with a synchronous stream and predicate
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
(async () => {
assert.strictEqual(await oneTo5().some((x) => x > 3), true);
assert.strictEqual(await oneTo5().every((x) => x > 3), false);
assert.strictEqual(await oneTo5().some((x) => x > 6), false);
assert.strictEqual(await oneTo5().every((x) => x < 6), true);
assert.strictEqual(await Readable.from([]).some((x) => true), false);
assert.strictEqual(await Readable.from([]).every((x) => true), true);
})().then(common.mustCall());
}

{
// Some and every work with an asynchronous stream and synchronous predicate
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
(async () => {
assert.strictEqual(await oneTo5Async().some((x) => x > 3), true);
assert.strictEqual(await oneTo5Async().every((x) => x > 3), false);
assert.strictEqual(await oneTo5Async().some((x) => x > 6), false);
assert.strictEqual(await oneTo5Async().every((x) => x < 6), true);
})().then(common.mustCall());
}

{
// Some and every work on asynchronous streams with an asynchronous predicate
(async () => {
assert.strictEqual(await oneTo5().some(async (x) => x > 3), true);
assert.strictEqual(await oneTo5().every(async (x) => x > 3), false);
assert.strictEqual(await oneTo5().some(async (x) => x > 6), false);
assert.strictEqual(await oneTo5().every(async (x) => x < 6), true);
})().then(common.mustCall());
}

{
// Some and every short circuit
(async () => {
await oneTo5().some(common.mustCall((x) => x > 2, 3));
await oneTo5().every(common.mustCall((x) => x < 3, 3));
// When short circuit isn't possible the whole stream is iterated
await oneTo5().some(common.mustCall((x) => x > 6, 5));
// The stream is destroyed afterwards
const stream = oneTo5();
await stream.some(common.mustCall((x) => x > 2, 3));
assert.strictEqual(stream.destroyed, true);
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
assert.rejects(Readable.from([1, 2, 3]).some(
() => new Promise(() => {}),
{ signal: ac.signal }
), {
name: 'AbortError',
}).then(common.mustCall());
ac.abort();
}
{
// Support for pre-aborted AbortSignal
assert.rejects(Readable.from([1, 2, 3]).some(
() => new Promise(() => {}),
{ signal: AbortSignal.abort() }
), {
name: 'AbortError',
}).then(common.mustCall());
}
{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).every(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).every((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
}