Skip to content

Commit

Permalink
fix(sessions): transfer control of session management to session pool
Browse files Browse the repository at this point in the history
Part of HELP-5834
  • Loading branch information
daprahamian committed Feb 20, 2018
1 parent 46e14d1 commit 6dde985
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 54 deletions.
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"mocha": true
},
"globals": {
"Promise": true
"Promise": true,
"Set": true
},
"parserOptions": {
"ecmaVersion": 2017
Expand Down
16 changes: 1 addition & 15 deletions lib/mongo_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,6 @@ MongoClient.prototype.close = function(force, callback) {
// Remove listeners after emit
self.removeAllListeners('close');

// If we have sessions, we want to send a single `endSessions` command for them,
// and then individually clean them up. They will be removed from the internal state
// when they emit their `ended` events.
if (this.s.sessions.length) {
this.topology.endSessions(this.s.sessions);
this.s.sessions.forEach(session => session.endSession({ skipCommand: true }));
}

// Callback after next event loop tick
if (typeof callback === 'function')
return process.nextTick(function() {
Expand Down Expand Up @@ -508,13 +500,7 @@ MongoClient.prototype.startSession = function(options) {
throw new MongoError('Current topology does not support sessions');
}

const session = this.topology.startSession(options);
session.once('ended', () => {
this.s.sessions = this.s.sessions.filter(s => s.equals(session));
});

this.s.sessions.push(session);
return session;
return this.topology.startSession(options);
};

var mergeOptions = function(target, source, flatten) {
Expand Down
17 changes: 2 additions & 15 deletions lib/topologies/replset.js
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,9 @@ class ReplSet extends TopologyBase {
}

close(forceClosed) {
var self = this;
// Call destroy on the topology
this.s.coreTopology.destroy({
force: typeof forceClosed === 'boolean' ? forceClosed : false
});

// We need to wash out all stored processes
if (forceClosed === true) {
this.s.storeOptions.force = forceClosed;
this.s.store.flush();
}
super.close(forceClosed);

var events = ['timeout', 'error', 'close', 'joined', 'left'];
events.forEach(function(e) {
self.removeAllListeners(e);
});
['timeout', 'error', 'close', 'joined', 'left'].forEach(e => this.removeAllListeners(e));
}
}

Expand Down
15 changes: 11 additions & 4 deletions lib/topologies/topology_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ const EventEmitter = require('events'),
MongoError = require('mongodb-core').MongoError,
f = require('util').format,
os = require('os'),
translateReadPreference = require('../utils').translateReadPreference,
ClientSession = require('mongodb-core').Sessions.ClientSession;

translateReadPreference = require('../utils').translateReadPreference;
// The store of ops
var Store = function(topology, storeOptions) {
var self = this;
Expand Down Expand Up @@ -290,7 +288,12 @@ class TopologyBase extends EventEmitter {
}

startSession(options) {
return new ClientSession(this, this.s.sessionPool, options);
if (this.s.sessionPool) {
return this.s.sessionPool.startSession(options);
}

// TODO: Should we have a better error message here?
throw new Error('Sessions are not supported');
}

endSessions(sessions, callback) {
Expand Down Expand Up @@ -388,6 +391,10 @@ class TopologyBase extends EventEmitter {
}

close(forceClosed) {
if (this.s.sessionPool) {
this.s.sessionPool.endAllSessions();
}

this.s.coreTopology.destroy({
force: typeof forceClosed === 'boolean' ? forceClosed : false
});
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"mongodb-test-runner": "^1.1.18",
"prettier": "^1.5.3",
"semver": "5.4.1",
"sinon": "^4.3.0",
"worker-farm": "^1.5.0"
},
"author": "Christian Kvalheim",
Expand Down
8 changes: 8 additions & 0 deletions test/functional/apm_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,10 @@ describe('APM', function() {
// Get the result
result = results.successes.shift();

if (result.commandName === 'endSessions') {
result = results.successes.shift();
}

// Validate the test
expect(commandName).to.equal(result.commandName);
// Do we have a getMore command
Expand All @@ -1054,6 +1058,10 @@ describe('APM', function() {
results.failures = filterSessionsCommands(results.failures);
result = results.failures.shift();

if (result.commandName === 'endSessions') {
result = results.failures.shift();
}

// Validate the test
expect(commandName).to.equal(result.commandName);
}
Expand Down
2 changes: 1 addition & 1 deletion test/functional/crud_api_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ describe('CRUD API', function() {
test.equal(null, err);

// Delete all items with no selector
db.collection('t6_1').deleteMany(function(err) {
db.collection('t6_1').deleteMany({}, function(err) {
test.equal(null, err);

client.close();
Expand Down
4 changes: 4 additions & 0 deletions test/functional/index_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ describe('Indexes', function() {
});

/**
* TODO: This test is non-deterministic, and I don't think it is doing
* what it thinks it is doing. The try-catch statement does nothing.
* @ignore
*/
it('shouldThrowErrorOnAttemptingSafeCreateIndexWithNoCallback', {
Expand Down Expand Up @@ -302,6 +304,8 @@ describe('Indexes', function() {
});

/**
* TODO: This test is non-deterministic, and I don't think it is doing
* what it thinks it is doing. The try-catch statement does nothing.
* @ignore
*/
it('shouldThrowErrorOnAttemptingSafeEnsureIndexWithNoCallback', {
Expand Down
32 changes: 14 additions & 18 deletions test/functional/operation_generators_example_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1844,17 +1844,15 @@ describe('Operation (Generators)', function() {
// BEGIN
var collection = db.collection('simple_document_insert_collection_no_safe_with_generators');
// Insert a single document
collection.insertOne({ hello: 'world_no_safe' });
yield collection.insertOne({ hello: 'world_no_safe' });

// Wait for a second before finishing up, to ensure we have written the item to disk
setTimeout(function() {
return co(function*() {
// Fetch the document
var item = yield collection.findOne({ hello: 'world_no_safe' });
test.equal('world_no_safe', item.hello);
client.close();
});
}, 100);
return co(function*() {
// Fetch the document
var item = yield collection.findOne({ hello: 'world_no_safe' });
test.equal('world_no_safe', item.hello);
client.close();
});
});
// END
}
Expand Down Expand Up @@ -2477,17 +2475,15 @@ describe('Operation (Generators)', function() {
// Fetch the collection
var collection = db.collection('save_a_simple_document_with_generators');
// Save a document with no safe option
collection.save({ hello: 'world' });
yield collection.save({ hello: 'world' });

// Wait for a second
setTimeout(function() {
return co(function*() {
// Find the saved document
var item = yield collection.findOne({ hello: 'world' });
test.equal('world', item.hello);
client.close();
});
}, 2000);
return co(function*() {
// Find the saved document
var item = yield collection.findOne({ hello: 'world' });
test.equal('world', item && item.hello);
client.close();
});
});
// END
}
Expand Down
51 changes: 51 additions & 0 deletions test/functional/session_leak_test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
'use strict';

const expect = require('chai').expect;
const sinon = require('sinon');
const core = require('mongodb-core');
const Server = core.Server;
const ReplSet = core.ReplSet;
const Mongos = core.Mongos;
const ServerSessionPool = core.Sessions.ServerSessionPool;

(() => {
const sandbox = sinon.createSandbox();

beforeEach('Session Leak Before Each - setup session tracking', function() {
sandbox.spy(Server.prototype, 'endSessions');
sandbox.spy(ReplSet.prototype, 'endSessions');
sandbox.spy(Mongos.prototype, 'endSessions');
sandbox.spy(ServerSessionPool.prototype, 'acquire');
});

afterEach('Session Leak After Each - ensure no leaks', function() {
const poolCalls = ServerSessionPool.prototype.acquire.getCalls();
const endCalls = Server.prototype.endSessions
.getCalls()
.concat(ReplSet.prototype.endSessions.getCalls())
.concat(Mongos.prototype.endSessions.getCalls());

const sessions = new Set();
poolCalls.forEach(call => sessions.add(call.returnValue.id));
// const totalSessionCount = set.size;

endCalls.forEach(call => {
const arg = call.args[0];
const ids = Array.isArray(arg) ? arg : [arg];

ids.forEach(id => sessions.delete(id));
});

const leakedSessionCount = sessions.size;
try {
expect(
leakedSessionCount,
`test is leaking ${leakedSessionCount} sessions, when it should be leaking 0`
).to.equal(0);
} catch (e) {
this.test.error(e);
}
});

afterEach('Session Leak After Each - restore sandbox', () => sandbox.restore());
})();

0 comments on commit 6dde985

Please sign in to comment.