diff --git a/lib/internal/wrap_js_stream.js b/lib/internal/wrap_js_stream.js index 25a77b8c4a9e8c..9e0961654df22e 100644 --- a/lib/internal/wrap_js_stream.js +++ b/lib/internal/wrap_js_stream.js @@ -2,226 +2,222 @@ const assert = require('assert'); const util = require('util'); -const Socket = require('net').Socket; -const JSStream = process.binding('js_stream').JSStream; +const { Socket } = require('net'); +const { JSStream } = process.binding('js_stream'); // TODO(bmeurer): Change this back to const once hole checks are // properly optimized away early in Ignition+TurboFan. var Buffer = require('buffer').Buffer; const uv = process.binding('uv'); const debug = util.debuglog('stream_wrap'); -function StreamWrap(stream) { - const handle = new JSStream(); - - this.stream = stream; - - this._list = null; - - const self = this; - handle.close = function(cb) { - debug('close'); - self.doClose(cb); - }; - handle.isAlive = function() { - return self.isAlive(); - }; - handle.isClosing = function() { - return self.isClosing(); - }; - handle.onreadstart = function() { - return self.readStart(); - }; - handle.onreadstop = function() { - return self.readStop(); - }; - handle.onshutdown = function(req) { - return self.doShutdown(req); - }; - handle.onwrite = function(req, bufs) { - return self.doWrite(req, bufs); - }; - - this.stream.pause(); - this.stream.on('error', function onerror(err) { - self.emit('error', err); - }); - this.stream.on('data', function ondata(chunk) { - if (!(chunk instanceof Buffer)) { - // Make sure that no further `data` events will happen - this.pause(); - this.removeListener('data', ondata); - - self.emit('error', new Error('Stream has StringDecoder')); - return; - } - - debug('data', chunk.length); - if (self._handle) - self._handle.readBuffer(chunk); - }); - this.stream.once('end', function onend() { - debug('end'); - if (self._handle) - self._handle.emitEOF(); - }); - - Socket.call(this, { - handle: handle - }); -} -util.inherits(StreamWrap, Socket); -module.exports = StreamWrap; - -// require('_stream_wrap').StreamWrap -StreamWrap.StreamWrap = StreamWrap; - -StreamWrap.prototype.isAlive = function isAlive() { - return true; -}; - -StreamWrap.prototype.isClosing = function isClosing() { - return !this.readable || !this.writable; -}; - -StreamWrap.prototype.readStart = function readStart() { - this.stream.resume(); - return 0; -}; - -StreamWrap.prototype.readStop = function readStop() { - this.stream.pause(); - return 0; -}; - -StreamWrap.prototype.doShutdown = function doShutdown(req) { - const self = this; - const handle = this._handle; - const item = this._enqueue('shutdown', req); - - this.stream.end(function() { - // Ensure that write was dispatched - setImmediate(function() { - if (!self._dequeue(item)) +/* This class serves as a wrapper for when the C++ side of Node wants access + * to a standard JS stream. For example, TLS or HTTP do not operate on network + * resources conceptually, although that is the common case and what we are + * optimizing for; in theory, they are completely composable and can work with + * any stream resource they see. + * + * For the common case, i.e. a TLS socket wrapping around a net.Socket, we + * can skip going through the JS layer and let TLS access the raw C++ handle + * of a net.Socket. The flipside of this is that, to maintain composability, + * we need a way to create "fake" net.Socket instances that call back into a + * "real" JavaScript stream. JSStreamWrap is exactly this. + */ +class JSStreamWrap extends Socket { + constructor(stream) { + const handle = new JSStream(); + handle.close = (cb) => { + debug('close'); + this.doClose(cb); + }; + handle.isAlive = () => this.isAlive(); + handle.isClosing = () => this.isClosing(); + handle.onreadstart = () => this.readStart(); + handle.onreadstop = () => this.readStop(); + handle.onshutdown = (req) => this.doShutdown(req); + handle.onwrite = (req, bufs) => this.doWrite(req, bufs); + + stream.pause(); + stream.on('error', (err) => this.emit('error', err)); + const ondata = (chunk) => { + if (!(chunk instanceof Buffer)) { + // Make sure that no further `data` events will happen. + stream.pause(); + stream.removeListener('data', ondata); + + this.emit('error', new Error('Stream has StringDecoder')); return; + } - handle.finishShutdown(req, 0); + debug('data', chunk.length); + if (this._handle) + this._handle.readBuffer(chunk); + }; + stream.on('data', ondata); + stream.once('end', () => { + debug('end'); + if (this._handle) + this._handle.emitEOF(); }); - }); - return 0; -}; -StreamWrap.prototype.doWrite = function doWrite(req, bufs) { - const self = this; - const handle = self._handle; + super({ handle, manualStart: true }); + this.stream = stream; + this._list = null; + this.read(0); + } - var pending = bufs.length; + // Legacy + static get StreamWrap() { + return JSStreamWrap; + } - // Queue the request to be able to cancel it - const item = self._enqueue('write', req); + isAlive() { + return true; + } - self.stream.cork(); - for (var n = 0; n < bufs.length; n++) - self.stream.write(bufs[n], done); - self.stream.uncork(); + isClosing() { + return !this.readable || !this.writable; + } - function done(err) { - if (!err && --pending !== 0) - return; + readStart() { + this.stream.resume(); + return 0; + } - // Ensure that this is called once in case of error - pending = 0; + readStop() { + this.stream.pause(); + return 0; + } - // Ensure that write was dispatched - setImmediate(function() { - // Do not invoke callback twice - if (!self._dequeue(item)) - return; + doShutdown(req) { + const handle = this._handle; + const item = this._enqueue('shutdown', req); - var errCode = 0; - if (err) { - if (err.code && uv['UV_' + err.code]) - errCode = uv['UV_' + err.code]; - else - errCode = uv.UV_EPIPE; - } + this.stream.end(() => { + // Ensure that write was dispatched + setImmediate(() => { + if (!this._dequeue(item)) + return; - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); + handle.finishShutdown(req, 0); + }); }); + return 0; } - return 0; -}; + doWrite(req, bufs) { + const self = this; + const handle = this._handle; -function QueueItem(type, req) { - this.type = type; - this.req = req; - this.prev = this; - this.next = this; -} + var pending = bufs.length; -StreamWrap.prototype._enqueue = function _enqueue(type, req) { - const item = new QueueItem(type, req); - if (this._list === null) { - this._list = item; - return item; - } + // Queue the request to be able to cancel it + const item = this._enqueue('write', req); - item.next = this._list.next; - item.prev = this._list; - item.next.prev = item; - item.prev.next = item; + this.stream.cork(); + for (var n = 0; n < bufs.length; n++) + this.stream.write(bufs[n], done); + this.stream.uncork(); - return item; -}; + function done(err) { + if (!err && --pending !== 0) + return; -StreamWrap.prototype._dequeue = function _dequeue(item) { - assert(item instanceof QueueItem); + // Ensure that this is called once in case of error + pending = 0; - var next = item.next; - var prev = item.prev; + // Ensure that write was dispatched + setImmediate(function() { + // Do not invoke callback twice + if (!self._dequeue(item)) + return; - if (next === null && prev === null) - return false; + var errCode = 0; + if (err) { + if (err.code && uv['UV_' + err.code]) + errCode = uv['UV_' + err.code]; + else + errCode = uv.UV_EPIPE; + } - item.next = null; - item.prev = null; + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + }); + } - if (next === item) { - prev = null; - next = null; - } else { - prev.next = next; - next.prev = prev; + return 0; } - if (this._list === item) - this._list = next; + _enqueue(type, req) { + const item = new QueueItem(type, req); + if (this._list === null) { + this._list = item; + return item; + } - return true; -}; + item.next = this._list.next; + item.prev = this._list; + item.next.prev = item; + item.prev.next = item; -StreamWrap.prototype.doClose = function doClose(cb) { - const self = this; - const handle = self._handle; + return item; + } - setImmediate(function() { - while (self._list !== null) { - const item = self._list; - const req = item.req; - self._dequeue(item); + _dequeue(item) { + assert(item instanceof QueueItem); - const errCode = uv.UV_ECANCELED; - if (item.type === 'write') { - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); - } else if (item.type === 'shutdown') { - handle.finishShutdown(req, errCode); - } + var next = item.next; + var prev = item.prev; + + if (next === null && prev === null) + return false; + + item.next = null; + item.prev = null; + + if (next === item) { + prev = null; + next = null; + } else { + prev.next = next; + next.prev = prev; } - // Should be already set by net.js - assert(self._handle === null); - cb(); - }); -}; + if (this._list === item) + this._list = next; + + return true; + } + + doClose(cb) { + const handle = this._handle; + + setImmediate(() => { + while (this._list !== null) { + const item = this._list; + const req = item.req; + this._dequeue(item); + + const errCode = uv.UV_ECANCELED; + if (item.type === 'write') { + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + } else if (item.type === 'shutdown') { + handle.finishShutdown(req, errCode); + } + } + + // Should be already set by net.js + assert.strictEqual(this._handle, null); + cb(); + }); + } +} + +function QueueItem(type, req) { + this.type = type; + this.req = req; + this.prev = this; + this.next = this; +} + +module.exports = JSStreamWrap; diff --git a/lib/net.js b/lib/net.js index 70d4841d5e7b05..8e4377cb41ae53 100644 --- a/lib/net.js +++ b/lib/net.js @@ -241,7 +241,7 @@ function Socket(options) { this._handle.reading = false; this._handle.readStop(); this._readableState.flowing = false; - } else { + } else if (!options.manualStart) { this.read(0); } } diff --git a/test/parallel/test-tls-wrap-event-emmiter.js b/test/parallel/test-tls-wrap-event-emmiter.js index 82953f1333e5da..b6ae9e2d5a7e99 100644 --- a/test/parallel/test-tls-wrap-event-emmiter.js +++ b/test/parallel/test-tls-wrap-event-emmiter.js @@ -15,5 +15,5 @@ const TlsSocket = require('tls').TLSSocket; const EventEmitter = require('events').EventEmitter; assert.throws( () => { new TlsSocket(new EventEmitter()); }, - /^TypeError: this\.stream\.pause is not a function/ + /^TypeError: (.+) is not a function$/ );