From 57f158fe27c38807e623ff42c4a5ded83c42fd6e Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Sun, 19 Jan 2020 10:41:50 -0500 Subject: [PATCH] fix(topology): always emit SDAM unrecoverable errors The SDAM changes introduced for the "Connections survive primary stepdown" project were erroneously implemented in this driver as guarding whether a server should be transitioned to an `Unknown` state, where the real intent is around whether a connection pool is cleared or not. NODE-2408 --- lib/cmap/connection.js | 2 +- lib/core/error.js | 11 ++--- lib/core/sdam/server.js | 10 ++--- lib/core/sdam/topology.js | 11 +++-- test/unit/sdam/topology.test.js | 80 +++++++++++++++++++++++++++++++++ 5 files changed, 94 insertions(+), 20 deletions(-) diff --git a/lib/cmap/connection.js b/lib/cmap/connection.js index fbb4a7a5be..a93d848903 100644 --- a/lib/cmap/connection.js +++ b/lib/cmap/connection.js @@ -248,7 +248,7 @@ function messageHandler(conn) { return; } - if (document.ok === 0 || document.$err || document.errmsg) { + if (document.ok === 0 || document.$err || document.errmsg || document.code) { callback(new MongoError(document)); return; } diff --git a/lib/core/error.js b/lib/core/error.js index fd1eaf7b4c..1854932017 100644 --- a/lib/core/error.js +++ b/lib/core/error.js @@ -1,7 +1,6 @@ 'use strict'; const mongoErrorContextSymbol = Symbol('mongoErrorContextSymbol'); -const maxWireVersion = require('./utils').maxWireVersion; /** * Creates a new MongoError @@ -237,9 +236,8 @@ function isNodeShuttingDownError(err) { * @ignore * @see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-master-and-node-is-recovering * @param {MongoError|Error} error - * @param {Server} server */ -function isSDAMUnrecoverableError(error, server) { +function isSDAMUnrecoverableError(error) { // NOTE: null check is here for a strictly pre-CMAP world, a timeout or // close event are considered unrecoverable if (error instanceof MongoParseError || error == null) { @@ -247,10 +245,6 @@ function isSDAMUnrecoverableError(error, server) { } if (isRecoveringError(error) || isNotMasterError(error)) { - if (maxWireVersion(server) >= 8 && !isNodeShuttingDownError(error)) { - return false; - } - return true; } @@ -266,5 +260,6 @@ module.exports = { MongoWriteConcernError, mongoErrorContextSymbol, isRetryableError, - isSDAMUnrecoverableError + isSDAMUnrecoverableError, + isNodeShuttingDownError }; diff --git a/lib/core/sdam/server.js b/lib/core/sdam/server.js index 0da1031b46..9e6903361e 100644 --- a/lib/core/sdam/server.js +++ b/lib/core/sdam/server.js @@ -286,7 +286,7 @@ class Server extends EventEmitter { options.session.serverSession.isDirty = true; } - if (isSDAMUnrecoverableError(err, this)) { + if (isSDAMUnrecoverableError(err)) { this.emit('error', err); } } @@ -319,7 +319,7 @@ class Server extends EventEmitter { options.session.serverSession.isDirty = true; } - if (isSDAMUnrecoverableError(err, this)) { + if (isSDAMUnrecoverableError(err)) { this.emit('error', err); } } @@ -352,7 +352,7 @@ class Server extends EventEmitter { options.session.serverSession.isDirty = true; } - if (isSDAMUnrecoverableError(err, this)) { + if (isSDAMUnrecoverableError(err)) { this.emit('error', err); } } @@ -382,7 +382,7 @@ class Server extends EventEmitter { if (err) return cb(err); conn.killCursors(ns, cursorState, (err, result) => { - if (err && isSDAMUnrecoverableError(err, this)) { + if (err && isSDAMUnrecoverableError(err)) { this.emit('error', err); } @@ -489,7 +489,7 @@ function executeWriteOperation(args, options, callback) { options.session.serverSession.isDirty = true; } - if (isSDAMUnrecoverableError(err, server)) { + if (isSDAMUnrecoverableError(err)) { server.emit('error', err); } } diff --git a/lib/core/sdam/topology.js b/lib/core/sdam/topology.js index f15e2b6352..ed5343c73c 100644 --- a/lib/core/sdam/topology.js +++ b/lib/core/sdam/topology.js @@ -14,7 +14,8 @@ const deprecate = require('util').deprecate; const BSON = require('../connection/utils').retrieveBSON(); const createCompressionInfo = require('../topologies/shared').createCompressionInfo; const isRetryableError = require('../error').isRetryableError; -const isSDAMUnrecoverableError = require('../error').isSDAMUnrecoverableError; +const isNodeShuttingDownError = require('../error').isNodeShuttingDownError; +const maxWireVersion = require('../utils').maxWireVersion; const ClientSession = require('../sessions').ClientSession; const MongoError = require('../error').MongoError; const resolveClusterTime = require('../topologies/shared').resolveClusterTime; @@ -881,14 +882,12 @@ function serverErrorEventHandler(server, topology) { return; } - if (isSDAMUnrecoverableError(err, server)) { - // NOTE: this must be commented out until we switch to the new CMAP pool because - // we presently _always_ clear the pool on error. - resetServerState(server, err, { clearPool: true }); + if (maxWireVersion(server) >= 8 && !isNodeShuttingDownError(err)) { + resetServerState(server, err); return; } - resetServerState(server, err); + resetServerState(server, err, { clearPool: true }); }; } diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index e975551162..4a11edd53c 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -113,4 +113,84 @@ describe('Topology (unit)', function() { }); }); }); + + describe('error handling', function() { + let mockServer; + beforeEach(() => mock.createServer().then(server => (mockServer = server))); + afterEach(() => mock.cleanup()); + + it('should set server to unknown and reset pool on `node is recovering` error', function(done) { + mockServer.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(Object.assign({}, mock.DEFAULT_ISMASTER, { maxWireVersion: 9 })); + } else if (doc.insert) { + request.reply({ ok: 0, message: 'node is recovering', code: 11600 }); + } else { + request.reply({ ok: 1 }); + } + }); + + const topology = new Topology(mockServer.uri()); + topology.connect(err => { + expect(err).to.not.exist; + + topology.selectServer('primary', (err, server) => { + expect(err).to.not.exist; + this.defer(() => topology.close()); + + let serverError; + server.on('error', err => (serverError = err)); + + let poolCleared = false; + topology.on('connectionPoolCleared', () => (poolCleared = true)); + + server.command('test.test', { insert: { a: 42 } }, (err, result) => { + expect(result).to.not.exist; + expect(err).to.exist; + expect(err).to.eql(serverError); + expect(poolCleared).to.be.true; + done(); + }); + }); + }); + }); + + it('should set server to unknown and NOT reset pool on stepdown errors', function(done) { + mockServer.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(Object.assign({}, mock.DEFAULT_ISMASTER, { maxWireVersion: 9 })); + } else if (doc.insert) { + request.reply({ ok: 0, message: 'not master' }); + } else { + request.reply({ ok: 1 }); + } + }); + + const topology = new Topology(mockServer.uri()); + topology.connect(err => { + expect(err).to.not.exist; + + topology.selectServer('primary', (err, server) => { + expect(err).to.not.exist; + this.defer(() => topology.close()); + + let serverError; + server.on('error', err => (serverError = err)); + + let poolCleared = false; + topology.on('connectionPoolCleared', () => (poolCleared = true)); + + server.command('test.test', { insert: { a: 42 } }, (err, result) => { + expect(result).to.not.exist; + expect(err).to.exist; + expect(err).to.eql(serverError); + expect(poolCleared).to.be.false; + done(); + }); + }); + }); + }); + }); });