Skip to content

Commit

Permalink
stream: make Readable.from performance better
Browse files Browse the repository at this point in the history
PR-URL: #37609
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
wwwzbwcom authored and ruyadorno committed Mar 24, 2021
1 parent f07428a commit 43c3b43
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 19 deletions.
26 changes: 26 additions & 0 deletions benchmark/streams/readable-from.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';

const common = require('../common');
const Readable = require('stream').Readable;

const bench = common.createBenchmark(main, {
n: [1e7],
});

async function main({ n }) {
const arr = [];
for (let i = 0; i < n; i++) {
arr.push(`${i}`);
}

const s = new Readable.from(arr);

bench.start();
s.on('data', (data) => {
// eslint-disable-next-line no-unused-expressions
data;
});
s.on('close', () => {
bench.end(n);
});
}
51 changes: 32 additions & 19 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const {
PromisePrototypeThen,
SymbolAsyncIterator,
SymbolIterator
SymbolIterator,
} = primordials;
const { Buffer } = require('buffer');

Expand All @@ -25,18 +25,22 @@ function from(Readable, iterable, opts) {
});
}

if (iterable && iterable[SymbolAsyncIterator])
let isAsync = false;
if (iterable && iterable[SymbolAsyncIterator]) {
isAsync = true;
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
} else if (iterable && iterable[SymbolIterator]) {
isAsync = false;
iterator = iterable[SymbolIterator]();
else
} else {
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
}

const readable = new Readable({
objectMode: true,
highWaterMark: 1,
// TODO(ronag): What options should be allowed?
...opts
...opts,
});

// Flag to protect against _read
Expand Down Expand Up @@ -75,23 +79,32 @@ function from(Readable, iterable, opts) {
}

async function next() {
try {
const { value, done } = await iterator.next();
if (done) {
readable.push(null);
} else {
const res = await value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
next();
for (;;) {
try {
const { value, done } = isAsync ?
await iterator.next() :
iterator.next();

if (done) {
readable.push(null);
} else {
reading = false;
const res = (value &&
typeof value.then === 'function') ?
await value :
value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
continue;
} else {
reading = false;
}
}
} catch (err) {
readable.destroy(err);
}
} catch (err) {
readable.destroy(err);
break;
}
}
return readable;
Expand Down

0 comments on commit 43c3b43

Please sign in to comment.