diff --git a/.eslintrc b/.eslintrc index 2440ddd908..283970bdeb 100644 --- a/.eslintrc +++ b/.eslintrc @@ -7,7 +7,8 @@ "mocha": true }, "globals": { - "Promise": true + "Promise": true, + "Set": true }, "parserOptions": { "ecmaVersion": 2017 diff --git a/lib/mongo_client.js b/lib/mongo_client.js index 80f20412f1..52bc0f9ded 100644 --- a/lib/mongo_client.js +++ b/lib/mongo_client.js @@ -327,14 +327,6 @@ MongoClient.prototype.close = function(force, callback) { // Remove listeners after emit self.removeAllListeners('close'); - // If we have sessions, we want to send a single `endSessions` command for them, - // and then individually clean them up. They will be removed from the internal state - // when they emit their `ended` events. - if (this.s.sessions.length) { - this.topology.endSessions(this.s.sessions); - this.s.sessions.forEach(session => session.endSession({ skipCommand: true })); - } - // Callback after next event loop tick if (typeof callback === 'function') return process.nextTick(function() { @@ -508,13 +500,7 @@ MongoClient.prototype.startSession = function(options) { throw new MongoError('Current topology does not support sessions'); } - const session = this.topology.startSession(options); - session.once('ended', () => { - this.s.sessions = this.s.sessions.filter(s => s.equals(session)); - }); - - this.s.sessions.push(session); - return session; + return this.topology.startSession(options); }; var mergeOptions = function(target, source, flatten) { diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js index ad46c17df7..9506057916 100644 --- a/lib/topologies/replset.js +++ b/lib/topologies/replset.js @@ -371,22 +371,9 @@ class ReplSet extends TopologyBase { } close(forceClosed) { - var self = this; - // Call destroy on the topology - this.s.coreTopology.destroy({ - force: typeof forceClosed === 'boolean' ? forceClosed : false - }); - - // We need to wash out all stored processes - if (forceClosed === true) { - this.s.storeOptions.force = forceClosed; - this.s.store.flush(); - } + super.close(forceClosed); - var events = ['timeout', 'error', 'close', 'joined', 'left']; - events.forEach(function(e) { - self.removeAllListeners(e); - }); + ['timeout', 'error', 'close', 'joined', 'left'].forEach(e => this.removeAllListeners(e)); } } diff --git a/lib/topologies/topology_base.js b/lib/topologies/topology_base.js index a5999b8e87..2ed931e2fb 100644 --- a/lib/topologies/topology_base.js +++ b/lib/topologies/topology_base.js @@ -4,9 +4,7 @@ const EventEmitter = require('events'), MongoError = require('mongodb-core').MongoError, f = require('util').format, os = require('os'), - translateReadPreference = require('../utils').translateReadPreference, - ClientSession = require('mongodb-core').Sessions.ClientSession; - + translateReadPreference = require('../utils').translateReadPreference; // The store of ops var Store = function(topology, storeOptions) { var self = this; @@ -290,7 +288,12 @@ class TopologyBase extends EventEmitter { } startSession(options) { - return new ClientSession(this, this.s.sessionPool, options); + if (this.s.sessionPool) { + return this.s.sessionPool.startSession(options); + } + + // TODO: Should we have a better error message here? + throw new Error('Sessions are not supported'); } endSessions(sessions, callback) { @@ -388,6 +391,10 @@ class TopologyBase extends EventEmitter { } close(forceClosed) { + if (this.s.sessionPool) { + this.s.sessionPool.endAllSessions(); + } + this.s.coreTopology.destroy({ force: typeof forceClosed === 'boolean' ? forceClosed : false }); diff --git a/package.json b/package.json index 246856de5f..3a56b8c09b 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "mongodb-test-runner": "^1.1.18", "prettier": "^1.5.3", "semver": "5.4.1", + "sinon": "^4.3.0", "worker-farm": "^1.5.0" }, "author": "Christian Kvalheim", diff --git a/test/functional/apm_tests.js b/test/functional/apm_tests.js index 41438dfc8e..f53a908d7e 100644 --- a/test/functional/apm_tests.js +++ b/test/functional/apm_tests.js @@ -1036,6 +1036,10 @@ describe('APM', function() { // Get the result result = results.successes.shift(); + if (result.commandName === 'endSessions') { + result = results.successes.shift(); + } + // Validate the test expect(commandName).to.equal(result.commandName); // Do we have a getMore command @@ -1054,6 +1058,10 @@ describe('APM', function() { results.failures = filterSessionsCommands(results.failures); result = results.failures.shift(); + if (result.commandName === 'endSessions') { + result = results.failures.shift(); + } + // Validate the test expect(commandName).to.equal(result.commandName); } diff --git a/test/functional/crud_api_tests.js b/test/functional/crud_api_tests.js index de6d0246c4..de791c83d4 100644 --- a/test/functional/crud_api_tests.js +++ b/test/functional/crud_api_tests.js @@ -830,7 +830,7 @@ describe('CRUD API', function() { test.equal(null, err); // Delete all items with no selector - db.collection('t6_1').deleteMany(function(err) { + db.collection('t6_1').deleteMany({}, function(err) { test.equal(null, err); client.close(); diff --git a/test/functional/index_tests.js b/test/functional/index_tests.js index 6ad31e13ce..bf4f0bbea2 100644 --- a/test/functional/index_tests.js +++ b/test/functional/index_tests.js @@ -271,6 +271,8 @@ describe('Indexes', function() { }); /** + * TODO: This test is non-deterministic, and I don't think it is doing + * what it thinks it is doing. The try-catch statement does nothing. * @ignore */ it('shouldThrowErrorOnAttemptingSafeCreateIndexWithNoCallback', { @@ -302,6 +304,8 @@ describe('Indexes', function() { }); /** + * TODO: This test is non-deterministic, and I don't think it is doing + * what it thinks it is doing. The try-catch statement does nothing. * @ignore */ it('shouldThrowErrorOnAttemptingSafeEnsureIndexWithNoCallback', { diff --git a/test/functional/operation_generators_example_tests.js b/test/functional/operation_generators_example_tests.js index ddd2d5ea67..c432c9d555 100644 --- a/test/functional/operation_generators_example_tests.js +++ b/test/functional/operation_generators_example_tests.js @@ -1844,17 +1844,15 @@ describe('Operation (Generators)', function() { // BEGIN var collection = db.collection('simple_document_insert_collection_no_safe_with_generators'); // Insert a single document - collection.insertOne({ hello: 'world_no_safe' }); + yield collection.insertOne({ hello: 'world_no_safe' }); // Wait for a second before finishing up, to ensure we have written the item to disk - setTimeout(function() { - return co(function*() { - // Fetch the document - var item = yield collection.findOne({ hello: 'world_no_safe' }); - test.equal('world_no_safe', item.hello); - client.close(); - }); - }, 100); + return co(function*() { + // Fetch the document + var item = yield collection.findOne({ hello: 'world_no_safe' }); + test.equal('world_no_safe', item.hello); + client.close(); + }); }); // END } @@ -2477,17 +2475,15 @@ describe('Operation (Generators)', function() { // Fetch the collection var collection = db.collection('save_a_simple_document_with_generators'); // Save a document with no safe option - collection.save({ hello: 'world' }); + yield collection.save({ hello: 'world' }); // Wait for a second - setTimeout(function() { - return co(function*() { - // Find the saved document - var item = yield collection.findOne({ hello: 'world' }); - test.equal('world', item.hello); - client.close(); - }); - }, 2000); + return co(function*() { + // Find the saved document + var item = yield collection.findOne({ hello: 'world' }); + test.equal('world', item && item.hello); + client.close(); + }); }); // END } diff --git a/test/functional/session_leak_test.js b/test/functional/session_leak_test.js new file mode 100644 index 0000000000..3224152f71 --- /dev/null +++ b/test/functional/session_leak_test.js @@ -0,0 +1,51 @@ +'use strict'; + +const expect = require('chai').expect; +const sinon = require('sinon'); +const core = require('mongodb-core'); +const Server = core.Server; +const ReplSet = core.ReplSet; +const Mongos = core.Mongos; +const ServerSessionPool = core.Sessions.ServerSessionPool; + +(() => { + const sandbox = sinon.createSandbox(); + + beforeEach('Session Leak Before Each - setup session tracking', function() { + sandbox.spy(Server.prototype, 'endSessions'); + sandbox.spy(ReplSet.prototype, 'endSessions'); + sandbox.spy(Mongos.prototype, 'endSessions'); + sandbox.spy(ServerSessionPool.prototype, 'acquire'); + }); + + afterEach('Session Leak After Each - ensure no leaks', function() { + const poolCalls = ServerSessionPool.prototype.acquire.getCalls(); + const endCalls = Server.prototype.endSessions + .getCalls() + .concat(ReplSet.prototype.endSessions.getCalls()) + .concat(Mongos.prototype.endSessions.getCalls()); + + const sessions = new Set(); + poolCalls.forEach(call => sessions.add(call.returnValue.id)); + // const totalSessionCount = set.size; + + endCalls.forEach(call => { + const arg = call.args[0]; + const ids = Array.isArray(arg) ? arg : [arg]; + + ids.forEach(id => sessions.delete(id)); + }); + + const leakedSessionCount = sessions.size; + try { + expect( + leakedSessionCount, + `test is leaking ${leakedSessionCount} sessions, when it should be leaking 0` + ).to.equal(0); + } catch (e) { + this.test.error(e); + } + }); + + afterEach('Session Leak After Each - restore sandbox', () => sandbox.restore()); +})();