diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 06aeacf1f5..12a41c7ab1 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -343,6 +343,8 @@ export class MongoClient extends TypedEventEmitter { topology?: Topology; /** @internal */ readonly mongoLogger: MongoLogger; + /** @internal */ + private connectionLock?: Promise; /** * The consolidate, parsed, transformed and merged options. @@ -447,54 +449,66 @@ export class MongoClient extends TypedEventEmitter { } return maybeCallback(async () => { - if (this.topology && this.topology.isConnected()) { + if (this.connectionLock) { + return this.connectionLock; + } + try { + this.connectionLock = this._connect(); + await this.connectionLock; return this; + } finally { + this.connectionLock = undefined; } + }, callback); + } - const options = this[kOptions]; + private async _connect(): Promise { + if (this.topology && this.topology.isConnected()) { + return this; + } - if (typeof options.srvHost === 'string') { - const hosts = await resolveSRVRecord(options); + const options = this[kOptions]; - for (const [index, host] of hosts.entries()) { - options.hosts[index] = host; - } + if (typeof options.srvHost === 'string') { + const hosts = await resolveSRVRecord(options); + + for (const [index, host] of hosts.entries()) { + options.hosts[index] = host; } + } - const topology = new Topology(options.hosts, options); - // Events can be emitted before initialization is complete so we have to - // save the reference to the topology on the client ASAP if the event handlers need to access it - this.topology = topology; - topology.client = this; + const topology = new Topology(options.hosts, options); + // Events can be emitted before initialization is complete so we have to + // save the reference to the topology on the client ASAP if the event handlers need to access it + this.topology = topology; + topology.client = this; - topology.once(Topology.OPEN, () => this.emit('open', this)); + topology.once(Topology.OPEN, () => this.emit('open', this)); - for (const event of MONGO_CLIENT_EVENTS) { - topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any))); - } + for (const event of MONGO_CLIENT_EVENTS) { + topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any))); + } - const topologyConnect = async () => { - try { - await promisify(callback => topology.connect(options, callback))(); - } catch (error) { - topology.close({ force: true }); - throw error; - } - }; - - if (this.autoEncrypter) { - const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback)); - await initAutoEncrypter(); - await topologyConnect(); - await options.encrypter.connectInternalClient(); - } else { - await topologyConnect(); + const topologyConnect = async () => { + try { + await promisify(callback => topology.connect(options, callback))(); + } catch (error) { + topology.close({ force: true }); + throw error; } + }; - return this; - }, callback); - } + if (this.autoEncrypter) { + const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback)); + await initAutoEncrypter(); + await topologyConnect(); + await options.encrypter.connectInternalClient(); + } else { + await topologyConnect(); + } + return this; + } /** * Close the db and its underlying connections * diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index e0e005a899..e70b0348a6 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -516,6 +516,65 @@ describe('class MongoClient', function () { ); }); + context('concurrent #connect()', () => { + let client: MongoClient; + let topologyOpenEvents; + + /** Keep track number of call to client connect to close as many as connect (otherwise leak_checker hook will failed) */ + let clientConnectCounter: number; + + /** + * Wrap the connect method of the client to keep track + * of number of times connect is called + */ + async function clientConnect() { + if (!client) { + return; + } + clientConnectCounter++; + return client.connect(); + } + + beforeEach(async function () { + client = this.configuration.newClient(); + topologyOpenEvents = []; + clientConnectCounter = 0; + client.on('open', event => topologyOpenEvents.push(event)); + }); + + afterEach(async function () { + // close `clientConnectCounter` times + const clientClosePromises = Array.from({ length: clientConnectCounter }, () => + client.close() + ); + await Promise.all(clientClosePromises); + }); + + it('parallel client connect calls only create one topology', async function () { + await Promise.all([clientConnect(), clientConnect(), clientConnect()]); + + expect(topologyOpenEvents).to.have.lengthOf(1); + expect(client.topology?.isConnected()).to.be.true; + }); + + it('when connect rejects lock is released regardless', async function () { + const internalConnectStub = sinon.stub(client, '_connect' as keyof MongoClient); + internalConnectStub.onFirstCall().rejects(new Error('cannot connect')); + + // first call rejected to simulate a connection failure + const error = await clientConnect().catch(error => error); + expect(error).to.match(/cannot connect/); + + internalConnectStub.restore(); + + // second call should connect + await clientConnect(); + + expect(topologyOpenEvents).to.have.lengthOf(1); + expect(client.topology?.isConnected()).to.be.true; + }); + }); + context('#close()', () => { let client: MongoClient; let db: Db;