diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 8755e22de0d00f..93b676ff02892b 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -20,6 +20,8 @@ const kLastPromise = Symbol('lastPromise'); const kHandlePromise = Symbol('handlePromise'); const kStream = Symbol('stream'); +let Readable; + function createIterResult(value, done) { return { value, done }; } @@ -145,6 +147,22 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ }, AsyncIteratorPrototype); const createReadableStreamAsyncIterator = (stream) => { + if (typeof stream.read !== 'function') { + // v1 stream + + if (!Readable) { + Readable = require('_stream_readable'); + } + + const src = stream; + stream = new Readable({ objectMode: true }).wrap(src); + finished(stream, (err) => { + if (typeof src.destroy === 'function') { + src.destroy(err); + } + }); + } + const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { [kStream]: { value: stream, writable: true }, [kLastResolve]: { value: null, writable: true }, diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index ded77e01da3d9b..26482fa717ef74 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -1,7 +1,13 @@ 'use strict'; const common = require('../common'); -const { Readable, Transform, PassThrough, pipeline } = require('stream'); +const { + Stream, + Readable, + Transform, + PassThrough, + pipeline +} = require('stream'); const assert = require('assert'); async function tests() { @@ -14,6 +20,42 @@ async function tests() { AsyncIteratorPrototype); } + { + // v1 stream + + const stream = new Stream(); + stream.destroy = common.mustCall(); + process.nextTick(() => { + stream.emit('data', 'hello'); + stream.emit('data', 'world'); + stream.emit('end'); + }); + + let res = ''; + stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator]; + for await (const d of stream) { + res += d; + } + assert.strictEqual(res, 'helloworld'); + } + + { + // v1 stream error + + const stream = new Stream(); + stream.close = common.mustCall(); + process.nextTick(() => { + stream.emit('data', 0); + stream.emit('data', 1); + stream.emit('error', new Error('asd')); + }); + + const iter = Readable.prototype[Symbol.asyncIterator].call(stream); + iter.next().catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + } + { const readable = new Readable({ objectMode: true, read() {} }); readable.push(0);