Skip to content

Commit

Permalink
fix(topology): always emit SDAM unrecoverable errors
Browse files Browse the repository at this point in the history
The SDAM changes introduced for the "Connections survive primary
stepdown" project were erroneously implemented in this driver as
guarding whether a server should be transitioned to an `Unknown`
state, where the real intent is around whether a connection pool
is cleared or not.

NODE-2408
  • Loading branch information
mbroadst committed Jan 19, 2020
1 parent d0bfd8a commit 57f158f
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 20 deletions.
2 changes: 1 addition & 1 deletion lib/cmap/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ function messageHandler(conn) {
return;
}

if (document.ok === 0 || document.$err || document.errmsg) {
if (document.ok === 0 || document.$err || document.errmsg || document.code) {
callback(new MongoError(document));
return;
}
Expand Down
11 changes: 3 additions & 8 deletions lib/core/error.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict';

const mongoErrorContextSymbol = Symbol('mongoErrorContextSymbol');
const maxWireVersion = require('./utils').maxWireVersion;

/**
* Creates a new MongoError
Expand Down Expand Up @@ -237,20 +236,15 @@ function isNodeShuttingDownError(err) {
* @ignore
* @see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-master-and-node-is-recovering
* @param {MongoError|Error} error
* @param {Server} server
*/
function isSDAMUnrecoverableError(error, server) {
function isSDAMUnrecoverableError(error) {
// NOTE: null check is here for a strictly pre-CMAP world, a timeout or
// close event are considered unrecoverable
if (error instanceof MongoParseError || error == null) {
return true;
}

if (isRecoveringError(error) || isNotMasterError(error)) {
if (maxWireVersion(server) >= 8 && !isNodeShuttingDownError(error)) {
return false;
}

return true;
}

Expand All @@ -266,5 +260,6 @@ module.exports = {
MongoWriteConcernError,
mongoErrorContextSymbol,
isRetryableError,
isSDAMUnrecoverableError
isSDAMUnrecoverableError,
isNodeShuttingDownError
};
10 changes: 5 additions & 5 deletions lib/core/sdam/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class Server extends EventEmitter {
options.session.serverSession.isDirty = true;
}

if (isSDAMUnrecoverableError(err, this)) {
if (isSDAMUnrecoverableError(err)) {
this.emit('error', err);
}
}
Expand Down Expand Up @@ -319,7 +319,7 @@ class Server extends EventEmitter {
options.session.serverSession.isDirty = true;
}

if (isSDAMUnrecoverableError(err, this)) {
if (isSDAMUnrecoverableError(err)) {
this.emit('error', err);
}
}
Expand Down Expand Up @@ -352,7 +352,7 @@ class Server extends EventEmitter {
options.session.serverSession.isDirty = true;
}

if (isSDAMUnrecoverableError(err, this)) {
if (isSDAMUnrecoverableError(err)) {
this.emit('error', err);
}
}
Expand Down Expand Up @@ -382,7 +382,7 @@ class Server extends EventEmitter {
if (err) return cb(err);

conn.killCursors(ns, cursorState, (err, result) => {
if (err && isSDAMUnrecoverableError(err, this)) {
if (err && isSDAMUnrecoverableError(err)) {
this.emit('error', err);
}

Expand Down Expand Up @@ -489,7 +489,7 @@ function executeWriteOperation(args, options, callback) {
options.session.serverSession.isDirty = true;
}

if (isSDAMUnrecoverableError(err, server)) {
if (isSDAMUnrecoverableError(err)) {
server.emit('error', err);
}
}
Expand Down
11 changes: 5 additions & 6 deletions lib/core/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ const deprecate = require('util').deprecate;
const BSON = require('../connection/utils').retrieveBSON();
const createCompressionInfo = require('../topologies/shared').createCompressionInfo;
const isRetryableError = require('../error').isRetryableError;
const isSDAMUnrecoverableError = require('../error').isSDAMUnrecoverableError;
const isNodeShuttingDownError = require('../error').isNodeShuttingDownError;
const maxWireVersion = require('../utils').maxWireVersion;
const ClientSession = require('../sessions').ClientSession;
const MongoError = require('../error').MongoError;
const resolveClusterTime = require('../topologies/shared').resolveClusterTime;
Expand Down Expand Up @@ -881,14 +882,12 @@ function serverErrorEventHandler(server, topology) {
return;
}

if (isSDAMUnrecoverableError(err, server)) {
// NOTE: this must be commented out until we switch to the new CMAP pool because
// we presently _always_ clear the pool on error.
resetServerState(server, err, { clearPool: true });
if (maxWireVersion(server) >= 8 && !isNodeShuttingDownError(err)) {
resetServerState(server, err);
return;
}

resetServerState(server, err);
resetServerState(server, err, { clearPool: true });
};
}

Expand Down
80 changes: 80 additions & 0 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,84 @@ describe('Topology (unit)', function() {
});
});
});

describe('error handling', function() {
let mockServer;
beforeEach(() => mock.createServer().then(server => (mockServer = server)));
afterEach(() => mock.cleanup());

it('should set server to unknown and reset pool on `node is recovering` error', function(done) {
mockServer.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(Object.assign({}, mock.DEFAULT_ISMASTER, { maxWireVersion: 9 }));
} else if (doc.insert) {
request.reply({ ok: 0, message: 'node is recovering', code: 11600 });
} else {
request.reply({ ok: 1 });
}
});

const topology = new Topology(mockServer.uri());
topology.connect(err => {
expect(err).to.not.exist;

topology.selectServer('primary', (err, server) => {
expect(err).to.not.exist;
this.defer(() => topology.close());

let serverError;
server.on('error', err => (serverError = err));

let poolCleared = false;
topology.on('connectionPoolCleared', () => (poolCleared = true));

server.command('test.test', { insert: { a: 42 } }, (err, result) => {
expect(result).to.not.exist;
expect(err).to.exist;
expect(err).to.eql(serverError);
expect(poolCleared).to.be.true;
done();
});
});
});
});

it('should set server to unknown and NOT reset pool on stepdown errors', function(done) {
mockServer.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(Object.assign({}, mock.DEFAULT_ISMASTER, { maxWireVersion: 9 }));
} else if (doc.insert) {
request.reply({ ok: 0, message: 'not master' });
} else {
request.reply({ ok: 1 });
}
});

const topology = new Topology(mockServer.uri());
topology.connect(err => {
expect(err).to.not.exist;

topology.selectServer('primary', (err, server) => {
expect(err).to.not.exist;
this.defer(() => topology.close());

let serverError;
server.on('error', err => (serverError = err));

let poolCleared = false;
topology.on('connectionPoolCleared', () => (poolCleared = true));

server.command('test.test', { insert: { a: 42 } }, (err, result) => {
expect(result).to.not.exist;
expect(err).to.exist;
expect(err).to.eql(serverError);
expect(poolCleared).to.be.false;
done();
});
});
});
});
});
});

0 comments on commit 57f158f

Please sign in to comment.