diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index fedf731132..4597db2c5f 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -428,7 +428,7 @@ function makeConnection(options: MakeConnectionOptions, _callback: Callback { this[kDelayedTimeoutId] = null; } + const socketTimeoutMS = this[kStream].timeout ?? 0; + this[kStream].setTimeout(0); + // always emit the message, in case we are streaming this.emit('message', message); let operationDescription = this[kQueue].get(message.responseTo); @@ -410,8 +413,7 @@ export class Connection extends TypedEventEmitter { // back in the queue with the correct requestId and will resolve not being able // to find the next one via the responseTo of the next streaming hello. this[kQueue].set(message.requestId, operationDescription); - } else if (operationDescription.socketTimeoutOverride) { - this[kStream].setTimeout(this.socketTimeoutMS); + this[kStream].setTimeout(socketTimeoutMS); } try { @@ -707,8 +709,9 @@ function write( } if (typeof options.socketTimeoutMS === 'number') { - operationDescription.socketTimeoutOverride = true; conn[kStream].setTimeout(options.socketTimeoutMS); + } else if (conn.socketTimeoutMS !== 0) { + conn[kStream].setTimeout(conn.socketTimeoutMS); } // if command monitoring is enabled we need to modify the callback here diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts index f0c4ad639d..91f436a9b7 100644 --- a/src/cmap/message_stream.ts +++ b/src/cmap/message_stream.ts @@ -36,7 +36,6 @@ export interface OperationDescription extends BSONSerializeOptions { raw: boolean; requestId: number; session?: ClientSession; - socketTimeoutOverride?: boolean; agreedCompressor?: CompressorName; zlibCompressionLevel?: number; $clusterTime?: Document; diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index 1ab4bd366d..226b00ab41 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -5,7 +5,7 @@ import { connect } from '../../../src/cmap/connect'; import { Connection } from '../../../src/cmap/connection'; import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; import { Topology } from '../../../src/sdam/topology'; -import { HostAddress, ns } from '../../../src/utils'; +import { ns } from '../../../src/utils'; import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration'; import { assert as test, setupDatabase } from '../shared'; @@ -74,28 +74,6 @@ describe('Connection', function () { } }); - it.skip('should support socket timeouts', { - // FIXME: NODE-2941 - metadata: { - requires: { - os: '!win32' // 240.0.0.1 doesnt work for windows - } - }, - test: function (done) { - const connectOptions = { - hostAddress: new HostAddress('240.0.0.1'), - connectionType: Connection, - connectionTimeout: 500 - }; - - connect(connectOptions, err => { - expect(err).to.exist; - expect(err).to.match(/timed out/); - done(); - }); - } - }); - it('should support calling back multiple times on exhaust commands', { metadata: { requires: { apiVersion: false, mongodb: '>=4.2.0', topology: ['single'] } diff --git a/test/unit/cmap/connect.test.js b/test/unit/cmap/connect.test.js deleted file mode 100644 index ce8e0e65b5..0000000000 --- a/test/unit/cmap/connect.test.js +++ /dev/null @@ -1,165 +0,0 @@ -'use strict'; - -const mock = require('../../tools/mongodb-mock/index'); -const { expect } = require('chai'); -const EventEmitter = require('events'); -const { setTimeout } = require('timers'); - -const { - connect, - prepareHandshakeDocument: prepareHandshakeDocumentCb -} = require('../../../src/cmap/connect'); -const { MongoCredentials } = require('../../../src/cmap/auth/mongo_credentials'); -const { genClusterTime } = require('../../tools/common'); -const { MongoNetworkError } = require('../../../src/error'); -const { HostAddress, isHello } = require('../../../src/utils'); -const { LEGACY_HELLO_COMMAND } = require('../../../src/constants'); -const { promisify } = require('util'); - -describe('Connect Tests', function () { - const test = {}; - beforeEach(() => { - return mock.createServer().then(mockServer => { - test.server = mockServer; - test.connectOptions = { - hostAddress: test.server.hostAddress(), - credentials: new MongoCredentials({ - username: 'testUser', - password: 'pencil', - source: 'admin', - mechanism: 'PLAIN' - }) - }; - }); - }); - - afterEach(() => mock.cleanup()); - it('should auth against a non-arbiter', function (done) { - const whatHappened = {}; - - test.server.setMessageHandler(request => { - const doc = request.document; - const $clusterTime = genClusterTime(Date.now()); - - if (isHello(doc)) { - whatHappened[LEGACY_HELLO_COMMAND] = true; - request.reply( - Object.assign({}, mock.HELLO, { - $clusterTime - }) - ); - } else if (doc.saslStart) { - whatHappened.saslStart = true; - request.reply({ ok: 1 }); - } - }); - - connect(test.connectOptions, err => { - try { - expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true); - expect(whatHappened).to.have.property('saslStart', true); - } catch (_err) { - err = _err; - } - - done(err); - }); - }); - - it('should not auth against an arbiter', function (done) { - const whatHappened = {}; - test.server.setMessageHandler(request => { - const doc = request.document; - const $clusterTime = genClusterTime(Date.now()); - if (isHello(doc)) { - whatHappened[LEGACY_HELLO_COMMAND] = true; - request.reply( - Object.assign({}, mock.HELLO, { - $clusterTime, - arbiterOnly: true - }) - ); - } else if (doc.saslStart) { - whatHappened.saslStart = true; - request.reply({ ok: 0 }); - } - }); - - connect(test.connectOptions, err => { - try { - expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true); - expect(whatHappened).to.not.have.property('saslStart'); - } catch (_err) { - err = _err; - } - - done(err); - }); - }); - - it('should emit `MongoNetworkError` for network errors', function (done) { - connect({ hostAddress: new HostAddress('non-existent:27018') }, err => { - expect(err).to.be.instanceOf(MongoNetworkError); - done(); - }); - }); - - it.skip('should allow a cancellaton token', function (done) { - const cancellationToken = new EventEmitter(); - setTimeout(() => cancellationToken.emit('cancel'), 500); - // set no response handler for mock server, effecively blackhole requests - - connect({ hostAddress: new HostAddress('240.0.0.1'), cancellationToken }, (err, conn) => { - expect(err).to.exist; - expect(err).to.match(/connection establishment was cancelled/); - expect(conn).to.not.exist; - done(); - }); - }).skipReason = 'TODO(NODE-2941): stop using 240.0.0.1 in tests'; - - context('prepareHandshakeDocument', () => { - const prepareHandshakeDocument = promisify(prepareHandshakeDocumentCb); - - context('loadBalanced option', () => { - context('when loadBalanced is not set as an option', () => { - it('does not set loadBalanced on the handshake document', async () => { - const options = {}; - const authContext = { - connection: {}, - options - }; - const handshakeDocument = await prepareHandshakeDocument(authContext); - expect(handshakeDocument).not.to.have.property('loadBalanced'); - }); - }); - - context('when loadBalanced is set to false', () => { - it('does not set loadBalanced on the handshake document', async () => { - const options = { - loadBalanced: false - }; - const authContext = { - connection: {}, - options - }; - const handshakeDocument = await prepareHandshakeDocument(authContext); - expect(handshakeDocument).not.to.have.property('loadBalanced'); - }); - }); - - context('when loadBalanced is set to true', () => { - it('does set loadBalanced on the handshake document', async () => { - const options = { - loadBalanced: true - }; - const authContext = { - connection: {}, - options - }; - const handshakeDocument = await prepareHandshakeDocument(authContext); - expect(handshakeDocument).to.have.property('loadBalanced', true); - }); - }); - }); - }); -}); diff --git a/test/unit/cmap/connect.test.ts b/test/unit/cmap/connect.test.ts new file mode 100644 index 0000000000..15a05751ef --- /dev/null +++ b/test/unit/cmap/connect.test.ts @@ -0,0 +1,276 @@ +import { expect } from 'chai'; +import { promisify } from 'util'; + +import { CancellationToken } from '../../../src'; +import { MongoCredentials } from '../../../src/cmap/auth/mongo_credentials'; +import { + connect, + prepareHandshakeDocument as prepareHandshakeDocumentCb +} from '../../../src/cmap/connect'; +import { Connection, ConnectionOptions } from '../../../src/cmap/connection'; +import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; +import { MongoNetworkError } from '../../../src/error'; +import { ClientMetadata, HostAddress, isHello } from '../../../src/utils'; +import { genClusterTime } from '../../tools/common'; +import * as mock from '../../tools/mongodb-mock/index'; + +const CONNECT_DEFAULTS = { + id: 1, + tls: false, + generation: 1, + monitorCommands: false, + metadata: {} as ClientMetadata, + loadBalanced: false +}; + +describe('Connect Tests', function () { + context('when PLAIN auth enabled', () => { + const test: { + server?: any; + connectOptions?: ConnectionOptions; + } = {}; + + beforeEach(async () => { + const mockServer = await mock.createServer(); + test.server = mockServer; + test.connectOptions = { + ...CONNECT_DEFAULTS, + hostAddress: test.server.hostAddress() as HostAddress, + credentials: new MongoCredentials({ + username: 'testUser', + password: 'pencil', + source: 'admin', + mechanism: 'PLAIN', + mechanismProperties: {} + }) + }; + }); + + afterEach(() => mock.cleanup()); + + it('should auth against a non-arbiter', function (done) { + const whatHappened = {}; + + test.server.setMessageHandler(request => { + const doc = request.document; + const $clusterTime = genClusterTime(Date.now()); + + if (isHello(doc)) { + whatHappened[LEGACY_HELLO_COMMAND] = true; + request.reply( + Object.assign({}, mock.HELLO, { + $clusterTime + }) + ); + } else if (doc.saslStart) { + whatHappened.saslStart = true; + request.reply({ ok: 1 }); + } + }); + + connect(test.connectOptions, err => { + try { + expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true); + expect(whatHappened).to.have.property('saslStart', true); + } catch (_err) { + err = _err; + } + + done(err); + }); + }); + + it('should not auth against an arbiter', function (done) { + const whatHappened = {}; + test.server.setMessageHandler(request => { + const doc = request.document; + const $clusterTime = genClusterTime(Date.now()); + if (isHello(doc)) { + whatHappened[LEGACY_HELLO_COMMAND] = true; + request.reply( + Object.assign({}, mock.HELLO, { + $clusterTime, + arbiterOnly: true + }) + ); + } else if (doc.saslStart) { + whatHappened.saslStart = true; + request.reply({ ok: 0 }); + } + }); + + connect(test.connectOptions, err => { + try { + expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true); + expect(whatHappened).to.not.have.property('saslStart'); + } catch (_err) { + err = _err; + } + + done(err); + }); + }); + }); + + context('when creating a connection', () => { + let server; + let connectOptions; + let connection: Connection; + + beforeEach(async () => { + server = await mock.createServer(); + server.setMessageHandler(request => { + if (isHello(request.document)) { + request.reply(mock.HELLO); + } + }); + connectOptions = { + ...CONNECT_DEFAULTS, + hostAddress: server.hostAddress() as HostAddress, + socketTimeoutMS: 15000 + }; + + connection = await promisify(callback => + //@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence + connect(connectOptions, callback) + )(); + }); + + afterEach(async () => { + connection.destroy({ force: true }); + await mock.cleanup(); + }); + + it('creates a connection with an infinite timeout', async () => { + expect(connection.stream).to.have.property('timeout', 0); + }); + + it('connection instance has property socketTimeoutMS equal to the value passed in the connectOptions', async () => { + expect(connection).to.have.property('socketTimeoutMS', 15000); + }); + + context('when the provided cancellation token emits cancel', () => { + it('interrupts the connection with an error', async () => { + // set no response handler for mock server, effectively black hole requests + server.setMessageHandler(() => null); + + const cancellationToken = new CancellationToken(); + // Make sure the cancel listener is added before emitting cancel + cancellationToken.addListener('newListener', () => { + process.nextTick(() => { + cancellationToken.emit('cancel'); + }); + }); + + const error = await promisify(callback => + connect( + { + ...connectOptions, + // Ensure these timeouts do not fire first + socketTimeoutMS: 5000, + connectTimeoutMS: 5000, + cancellationToken + }, + //@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence + callback + ) + )().catch(error => error); + + expect(error, error.stack).to.match(/connection establishment was cancelled/); + }); + }); + + context('when connecting takes longer than connectTimeoutMS', () => { + it('interrupts the connection with an error', async () => { + // set no response handler for mock server, effectively black hole requests + server.setMessageHandler(() => null); + + const error = await promisify(callback => + //@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence + connect({ ...connectOptions, connectTimeoutMS: 5 }, callback) + )().catch(error => error); + + expect(error).to.match(/timed out/); + }); + }); + }); + + it('should emit `MongoNetworkError` for network errors', function (done) { + connect({ hostAddress: new HostAddress('non-existent:27018') }, err => { + expect(err).to.be.instanceOf(MongoNetworkError); + done(); + }); + }); + + context('prepareHandshakeDocument', () => { + const prepareHandshakeDocument = promisify(prepareHandshakeDocumentCb); + + context('when serverApi.version is present', () => { + const options = {}; + const authContext = { + connection: { serverApi: { version: '1' } }, + options + }; + + it('sets the hello parameter to true', async () => { + const handshakeDocument = await prepareHandshakeDocument(authContext); + expect(handshakeDocument).to.have.property('hello', true); + }); + }); + + context('when serverApi is not present', () => { + const options = {}; + const authContext = { + connection: {}, + options + }; + + it('sets the legacy hello parameter to true', async () => { + const handshakeDocument = await prepareHandshakeDocument(authContext); + expect(handshakeDocument).to.have.property(LEGACY_HELLO_COMMAND, true); + }); + }); + + context('loadBalanced option', () => { + context('when loadBalanced is not set as an option', () => { + it('does not set loadBalanced on the handshake document', async () => { + const options = {}; + const authContext = { + connection: {}, + options + }; + const handshakeDocument = await prepareHandshakeDocument(authContext); + expect(handshakeDocument).not.to.have.property('loadBalanced'); + }); + }); + + context('when loadBalanced is set to false', () => { + it('does not set loadBalanced on the handshake document', async () => { + const options = { + loadBalanced: false + }; + const authContext = { + connection: {}, + options + }; + const handshakeDocument = await prepareHandshakeDocument(authContext); + expect(handshakeDocument).not.to.have.property('loadBalanced'); + }); + }); + + context('when loadBalanced is set to true', () => { + it('does set loadBalanced on the handshake document', async () => { + const options = { + loadBalanced: true + }; + const authContext = { + connection: {}, + options + }; + const handshakeDocument = await prepareHandshakeDocument(authContext); + expect(handshakeDocument).to.have.property('loadBalanced', true); + }); + }); + }); + }); +}); diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 5c8d872bb8..75512d522c 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -4,11 +4,12 @@ import { Socket } from 'net'; import * as sinon from 'sinon'; import { Readable } from 'stream'; import { setTimeout } from 'timers'; +import { promisify } from 'util'; import { BinMsg } from '../../../src/cmap/commands'; import { connect } from '../../../src/cmap/connect'; import { Connection, hasSessionSupport } from '../../../src/cmap/connection'; -import { MessageStream } from '../../../src/cmap/message_stream'; +import { MessageStream, OperationDescription } from '../../../src/cmap/message_stream'; import { MongoNetworkTimeoutError, MongoRuntimeError } from '../../../src/error'; import { isHello, ns } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; @@ -24,8 +25,15 @@ const connectionOptionsDefaults = { loadBalanced: false }; -/** The absolute minimum socket API needed by Connection as of writing this test */ +/** + * The absolute minimum socket API needed by these tests + * + * The driver has a greater API requirement for sockets detailed in: NODE-4785 + */ class FakeSocket extends EventEmitter { + destroyed = false; + writableEnded: boolean; + timeout = 0; address() { // is never called } @@ -41,6 +49,29 @@ class FakeSocket extends EventEmitter { get remotePort() { return 123; } + setTimeout(timeout) { + this.timeout = timeout; + } +} + +class InputStream extends Readable { + writableEnded: boolean; + timeout = 0; + + constructor(options?) { + super(options); + } + + end(cb) { + this.writableEnded = true; + if (typeof cb === 'function') { + process.nextTick(cb); + } + } + + setTimeout(timeout) { + this.timeout = timeout; + } } describe('new Connection()', function () { @@ -170,7 +201,7 @@ describe('new Connection()', function () { context('when multiple hellos exist on the stream', function () { let callbackSpy; - const inputStream = new Readable(); + const inputStream = new InputStream(); const document = { ok: 1 }; const last = { isWritablePrimary: true }; @@ -367,6 +398,95 @@ describe('new Connection()', function () { }); }); }); + + context('when sending commands on a connection', () => { + const CONNECT_DEFAULTS = { + id: 1, + tls: false, + generation: 1, + monitorCommands: false, + metadata: {} as ClientMetadata, + loadBalanced: false + }; + let server; + let connectOptions; + let connection: Connection; + let streamSetTimeoutSpy; + + beforeEach(async () => { + server = await mock.createServer(); + server.setMessageHandler(request => { + if (isHello(request.document)) { + request.reply(mock.HELLO); + } + }); + connectOptions = { + ...CONNECT_DEFAULTS, + hostAddress: server.hostAddress() as HostAddress, + socketTimeoutMS: 15000 + }; + + connection = await promisify(callback => + //@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence + connect(connectOptions, callback) + )(); + + streamSetTimeoutSpy = sinon.spy(connection.stream, 'setTimeout'); + }); + + afterEach(async () => { + connection.destroy({ force: true }); + sinon.restore(); + await mock.cleanup(); + }); + + it('sets timeout specified on class before writing to the socket', async () => { + await promisify(callback => + connection.command(ns('admin.$cmd'), { hello: 1 }, {}, callback) + )(); + expect(streamSetTimeoutSpy).to.have.been.calledWith(15000); + }); + + it('sets timeout specified on options before writing to the socket', async () => { + await promisify(callback => + connection.command(ns('admin.$cmd'), { hello: 1 }, { socketTimeoutMS: 2000 }, callback) + )(); + expect(streamSetTimeoutSpy).to.have.been.calledWith(2000); + }); + + it('clears timeout after getting a message if moreToCome=false', async () => { + connection.stream.setTimeout(1); + const msg = generateOpMsgBuffer({ hello: 1 }); + const msgHeader = { + length: msg.readInt32LE(0), + requestId: 1, + responseTo: 0, + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + msgBody.writeInt32LE(0, 0); // OPTS_MORE_TO_COME + connection.onMessage(new BinMsg(msg, msgHeader, msgBody)); + // timeout is still reset + expect(connection.stream).to.have.property('timeout', 0); + }); + + it('does not clear timeout after getting a message if moreToCome=true', async () => { + connection.stream.setTimeout(1); + const msg = generateOpMsgBuffer({ hello: 1 }); + const msgHeader = { + length: msg.readInt32LE(0), + requestId: 1, + responseTo: 0, + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + msgBody.writeInt32LE(2, 0); // OPTS_MORE_TO_COME + connection[getSymbolFrom(connection, 'queue')].set(0, { cb: () => null }); + connection.onMessage(new BinMsg(msg, msgHeader, msgBody)); + // timeout is still set + expect(connection.stream).to.have.property('timeout', 1); + }); + }); }); describe('onTimeout()', () => {