diff --git a/lib/cursor/AggregationCursor.js b/lib/cursor/AggregationCursor.js index 4a84429a031..cec302a9f11 100644 --- a/lib/cursor/AggregationCursor.js +++ b/lib/cursor/AggregationCursor.js @@ -10,6 +10,7 @@ const promiseOrCallback = require('../helpers/promiseOrCallback'); const eachAsync = require('../helpers/cursor/eachAsync'); const immediate = require('../helpers/immediate'); const util = require('util'); +const utils = require('../../lib/utils'); /** * An AggregationCursor is a concurrency primitive for processing aggregation @@ -36,7 +37,14 @@ const util = require('util'); */ function AggregationCursor(agg) { - Readable.call(this, { objectMode: true }); + const streamOpts = { objectMode: true }; + // for node < 12 we will emit 'close' event after 'end' + if (utils.nodeMajorVersion >= 12) { + // set autoDestroy=true because on node 12 it's by default false + // gh-10902 need autoDestroy to destroy correctly and emit 'close' event for node >= 12 + streamOpts.autoDestroy = true; + } + Readable.call(this, streamOpts); this.cursor = null; this.agg = agg; @@ -86,20 +94,10 @@ AggregationCursor.prototype._read = function() { if (error) { return _this.emit('error', error); } - setTimeout(function() { - // on node >= 14 streams close automatically (gh-8834) - const isNotClosedAutomatically = !_this.destroyed; - if (isNotClosedAutomatically) { - // call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876) - // @see https://nodejs.org/api/stream.html#stream_readable_destroy_error - // the 'close' is emited on destroy started with version 10 - if (_this.destroy && parseInt(process.versions.node.split('.')[0]) > 9) { - _this.destroy(); - } else { - _this.emit('close'); - } - } - }, 0); + // for node >= 12 the autoDestroy will emit the 'close' event + if (utils.nodeMajorVersion < 12) { + _this.on('end', () => _this.emit('close')); + } }); return; } diff --git a/lib/cursor/QueryCursor.js b/lib/cursor/QueryCursor.js index e5b7f032400..9eec7b074b0 100644 --- a/lib/cursor/QueryCursor.js +++ b/lib/cursor/QueryCursor.js @@ -10,6 +10,7 @@ const eachAsync = require('../helpers/cursor/eachAsync'); const helpers = require('../queryhelpers'); const immediate = require('../helpers/immediate'); const util = require('util'); +const utils = require('../../lib/utils'); /** * A QueryCursor is a concurrency primitive for processing query results @@ -34,7 +35,14 @@ const util = require('util'); */ function QueryCursor(query, options) { - Readable.call(this, { objectMode: true }); + const streamOpts = { objectMode: true }; + // for node < 12 we will emit 'close' event after 'end' + if (utils.nodeMajorVersion >= 12) { + // set autoDestroy=true because on node 12 it's by default false + // gh-10902 need autoDestroy to destroy correctly and emit 'close' event for node >= 12 + streamOpts.autoDestroy = true; + } + Readable.call(this, streamOpts); this.cursor = null; this.query = query; @@ -95,20 +103,10 @@ QueryCursor.prototype._read = function() { if (error) { return _this.emit('error', error); } - setTimeout(function() { - // on node >= 14 streams close automatically (gh-8834) - const isNotClosedAutomatically = !_this.destroyed; - if (isNotClosedAutomatically) { - // call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876) - // @see https://nodejs.org/api/stream.html#stream_readable_destroy_error - // the close is emited on destroy started with version 10 - if (_this.destroy && parseInt(process.versions.node.split('.')[0]) > 9) { - _this.destroy(); - } else { - _this.emit('close'); - } - } - }, 0); + // for node >= 12 the autoDestroy will emit the 'close' event + if (utils.nodeMajorVersion < 12) { + _this.on('end', () => _this.emit('close')); + } }); return; } diff --git a/lib/utils.js b/lib/utils.js index d6e2dc4e355..1c531f4efd0 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -934,4 +934,6 @@ exports.errorToPOJO = function errorToPOJO(error) { ret[properyName] = error[properyName]; } return ret; -}; \ No newline at end of file +}; + +exports.nodeMajorVersion = parseInt(process.versions.node.split('.')[0], 10); diff --git a/test/query.cursor.test.js b/test/query.cursor.test.js index debacbbb1b1..9ca94eda435 100644 --- a/test/query.cursor.test.js +++ b/test/query.cursor.test.js @@ -686,6 +686,88 @@ describe('QueryCursor', function() { }); }); + it('query cursor emit end event (gh-10902)', function(done) { + const User = db.model('User', new Schema({ name: String })); + + User.create({ name: 'First' }, { name: 'Second' }) + .then(() => { + const cursor = User.find({}).cursor(); + cursor.on('data', () => { + cursor.pause(); + setTimeout(() => cursor.resume(), 50); + }); + + let endEventTriggeredCount = 0; + cursor.on('end', () => endEventTriggeredCount++); + + setTimeout(() => { + assert.equal(endEventTriggeredCount, 1); + done(); + }, 200); + }); + }); + + it('aggregate cursor emit end event (gh-10902)', function(done) { + const User = db.model('User', new Schema({ name: String })); + + User.create({ name: 'First' }, { name: 'Second' }) + .then(() => { + const cursor = User.aggregate([{ $match: {} }]).cursor().exec(); + cursor.on('data', () => { + cursor.pause(); + setTimeout(() => cursor.resume(), 50); + }); + + let endEventTriggeredCount = 0; + cursor.on('end', () => endEventTriggeredCount++); + + setTimeout(() => { + assert.equal(endEventTriggeredCount, 1); + done(); + }, 200); + }); + }); + + it('query cursor emit end event before close event (gh-10902)', function(done) { + const User = db.model('User', new Schema({ name: String })); + + User.create({ name: 'First' }, { name: 'Second' }) + .then(() => { + const cursor = User.find({}).cursor(); + cursor.on('data', () => { + cursor.pause(); + setTimeout(() => cursor.resume(), 50); + }); + + let endEventTriggeredCount = 0; + cursor.on('end', () => endEventTriggeredCount++); + cursor.on('close', () => { + assert.equal(endEventTriggeredCount, 1); + done(); + }); + }); + }); + + it('aggregate cursor emit end event before close event (gh-10902)', function(done) { + const User = db.model('User', new Schema({ name: String })); + + User.create({ name: 'First' }, { name: 'Second' }) + .then(() => { + const cursor = User.aggregate([{ $match: {} }]).cursor().exec(); + cursor.on('data', () => { + cursor.pause(); + setTimeout(() => cursor.resume(), 50); + }); + + let endEventTriggeredCount = 0; + cursor.on('end', () => endEventTriggeredCount++); + cursor.on('close', () => { + assert.equal(endEventTriggeredCount, 1); + done(); + }); + }); + }); + it('passes document index as the second argument for query cursor (gh-8972)', function() { return co(function *() { const User = db.model('User', Schema({ order: Number }));