From 6c36399aa15e11b3f1f02b6bf797ecdb2c4b92c8 Mon Sep 17 00:00:00 2001 From: Zach Bjornson Date: Thu, 7 Mar 2019 12:17:54 -0800 Subject: [PATCH] support res.removeListener('drain'), res.once('drain') Fixes #152 Fixes #135 --- .travis.yml | 1 + index.js | 27 +++++++ test/compression.js | 179 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 207 insertions(+) diff --git a/.travis.yml b/.travis.yml index df524d09..c330a8bb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ node_js: - "7.10" - "8.11" - "9.11" + - "10" sudo: false cache: directories: diff --git a/index.js b/index.js index f190c689..e0f4cd97 100644 --- a/index.js +++ b/index.js @@ -64,6 +64,7 @@ function compression (options) { var _end = res.end var _on = res.on + var _removeListener = res.removeListener var _write = res.write // flush @@ -131,6 +132,32 @@ function compression (options) { return this } + res.addListener = res.on + + res.removeListener = function removeListener (type, listener) { + if (!listeners || type !== 'drain') { + return _removeListener.call(this, type, listener) + } + + if (stream) { + return stream.removeListener(type, listener) + } + + // remove buffered listener + for (var i = listeners.length - 1; i >= 0; i--) { + if (listeners[i][0] === type && listeners[i][1] === listener) { + listeners.splice(i, 1) + } + } + + return this + } + + if (res.off) { + // emitter.off was added in Node.js v10+; don't add it to earlier versions + res.off = res.removeListener + } + function nocompress (msg) { debug('no compression: %s', msg) addListeners(res, _on, listeners) diff --git a/test/compression.js b/test/compression.js index 013744cb..41db19cf 100644 --- a/test/compression.js +++ b/test/compression.js @@ -301,6 +301,185 @@ describe('compression()', function () { .expect(200, done) }) + it('should support removeListener("drain") after on("drain"); stream present', function (done) { + // compression doesn't proxy listenerCount() to the compression stream, so + // instead watch for a MaxListenersExceededWarning + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain") after addListener("drain")', function (done) { + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.addListener('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support off("drain") after addListener("drain")', function (done) { + if (!require('events').prototype.off) { // off was added in Node.js v10 + this.skip() + } + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + var len = bytes('40kb') + var buf = Buffer.alloc(len, '.') + res.write(buf) + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.addListener('drain', listener) + res.off('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain"); buffered', function (done) { + // Variant of above tests for scenario when the listener is buffered (stream + // is not yet present). + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) { + // Variant of above test for scenario when the listener is buffered (stream + // is not yet present) and the same listener is added two or more times. + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + process.on('warning', onWarning) + var server = createServer({threshold: 0}, function (req, res) { + res.setHeader('Content-Type', 'text/plain') + for (var times = 0; times < res.getMaxListeners() + 1; times++) { + var listener = function () {} + res.on('drain', listener) + res.on('drain', listener) + res.removeListener('drain', listener) + } + res.end() + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + + it('should not leak event listeners when res.unpipe() is used (#135)', function (done) { + var hasWarned = false + var onWarning = function () { + hasWarned = true + } + var server = createServer({threshold: 0}, function (req, res) { + var times = 0 + var int = setInterval(function () { + var rs = require('fs').createReadStream('does not exist') + rs.on('error', function (e) { + rs.unpipe(res) + }) + rs.pipe(res) + if (times++ > res.getMaxListeners()) { + clearInterval(int) + res.end('hello, world') + } + }) + }) + + request(server) + .get('/') + .set('Accept-Encoding', 'gzip') + .expect(function () { + process.removeListener('warning', onWarning) + assert.ok(!hasWarned) + }) + .expect(200, done) + }) + describe('threshold', function () { it('should not compress responses below the threshold size', function (done) { var server = createServer({ threshold: '1kb' }, function (req, res) {