From 3ec49e5a9c34474a0bc81bf43243e33b0cb5fada Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Mon, 2 Dec 2019 15:35:35 -0500 Subject: [PATCH] fix(pool): flush workItems after next tick to avoid dupe selection In the event that a timeout occurs and a server instance is drained, the work items were cleared before the server could signal it needed to be cycled. By waiting until the next tick, we can ensure the event makes it to the `Topology`, and the server is removed from consideration for server selection. NODE-2350 --- lib/core/connection/connection.js | 14 ++++++++++++++ lib/core/connection/pool.js | 18 +++++++++++------- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/lib/core/connection/connection.js b/lib/core/connection/connection.js index 0bdc7c5a60..c1c5a3b399 100644 --- a/lib/core/connection/connection.js +++ b/lib/core/connection/connection.js @@ -194,6 +194,20 @@ class Connection extends EventEmitter { this.socket.unref(); } + /** + * Flush all work Items on this connection + * + * @param {*} err The error to propagate to the flushed work items + */ + flush(err) { + while (this.workItems.length > 0) { + const workItem = this.workItems.shift(); + if (workItem.cb) { + workItem.cb(err); + } + } + } + /** * Destroy connection * @method diff --git a/lib/core/connection/pool.js b/lib/core/connection/pool.js index 98a9aad947..fb067f5cba 100644 --- a/lib/core/connection/pool.js +++ b/lib/core/connection/pool.js @@ -251,20 +251,24 @@ function connectionFailureHandler(pool, event, err, conn) { // Remove the connection removeConnection(pool, conn); - // Flush all work Items on this connection - while (conn.workItems.length > 0) { - const workItem = conn.workItems.shift(); - if (workItem.cb) workItem.cb(err); - } - - if (pool.state !== DRAINING && pool.options.legacyCompatMode === false) { + if ( + pool.state !== DRAINING && + pool.state !== DESTROYED && + pool.options.legacyCompatMode === false + ) { // since an error/close/timeout means pool invalidation in a // pre-CMAP world, we will issue a custom `drain` event here to // signal that the server should be recycled stateTransition(pool, DRAINING); pool.emit('drain', err); + + // wait to flush work items so this server isn't selected again immediately + process.nextTick(() => conn.flush(err)); return; } + + // flush remaining work items + conn.flush(err); } // Did we catch a timeout, increment the numberOfConsecutiveTimeouts