Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5316): prevent parallel topology creation in MongoClient.connect #3696

Merged
merged 4 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 46 additions & 35 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
topology?: Topology;
/** @internal */
readonly mongoLogger: MongoLogger;
/** @internal */
private connectionLock?: Promise<this>;

/**
* The consolidate, parsed, transformed and merged options.
Expand Down Expand Up @@ -447,54 +449,63 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
}

return maybeCallback(async () => {
if (this.topology && this.topology.isConnected()) {
try {
this.connectionLock = this.connectionLock ?? this._connect();
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
await this.connectionLock;
return this;
} finally {
this.connectionLock = undefined;
}
}, callback);
}

const options = this[kOptions];
private async _connect(): Promise<this> {
if (this.topology && this.topology.isConnected()) {
return this;
}

if (typeof options.srvHost === 'string') {
const hosts = await resolveSRVRecord(options);
const options = this[kOptions];

for (const [index, host] of hosts.entries()) {
options.hosts[index] = host;
}
if (typeof options.srvHost === 'string') {
const hosts = await resolveSRVRecord(options);

for (const [index, host] of hosts.entries()) {
options.hosts[index] = host;
}
}

const topology = new Topology(options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
this.topology = topology;
topology.client = this;
const topology = new Topology(options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
this.topology = topology;
topology.client = this;

topology.once(Topology.OPEN, () => this.emit('open', this));
topology.once(Topology.OPEN, () => this.emit('open', this));

for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any)));
}
for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any)));
}

const topologyConnect = async () => {
try {
await promisify(callback => topology.connect(options, callback))();
} catch (error) {
topology.close({ force: true });
throw error;
}
};

if (this.autoEncrypter) {
const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback));
await initAutoEncrypter();
await topologyConnect();
await options.encrypter.connectInternalClient();
} else {
await topologyConnect();
const topologyConnect = async () => {
try {
await promisify(callback => topology.connect(options, callback))();
} catch (error) {
topology.close({ force: true });
throw error;
}
};

return this;
}, callback);
}
if (this.autoEncrypter) {
const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback));
await initAutoEncrypter();
await topologyConnect();
await options.encrypter.connectInternalClient();
} else {
await topologyConnect();
}

return this;
}
/**
* Close the db and its underlying connections
*
Expand Down
59 changes: 59 additions & 0 deletions test/integration/node-specific/mongo_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,65 @@ describe('class MongoClient', function () {
);
});

context('concurrent #connect()', () => {
let client: MongoClient;
let topologyOpenEvents;

/** Keep track number of call to client connect to close as many as connect (otherwise leak_checker hook will failed) */
let clientConnectCounter: number;

/**
* Wrap the connect method of the client to keep track
* of number of times connect is called
*/
async function clientConnect() {
if (!client) {
return;
}
clientConnectCounter++;
return client.connect();
}

beforeEach(async function () {
client = this.configuration.newClient();
topologyOpenEvents = [];
clientConnectCounter = 0;
client.on('open', event => topologyOpenEvents.push(event));
});

afterEach(async function () {
// close `clientConnectCounter` times
const clientClosePromises = Array.from({ length: clientConnectCounter }, () =>
client.close()
);
await Promise.all(clientClosePromises);
});

it('parallel client connect calls only create one topology', async function () {
await Promise.all([clientConnect(), clientConnect(), clientConnect()]);

expect(topologyOpenEvents).to.have.lengthOf(1);
expect(client.topology?.isConnected()).to.be.true;
});

it('when connect rejects lock is released regardless', async function () {
const internalConnectStub = sinon.stub(client, '_connect' as keyof MongoClient);
internalConnectStub.onFirstCall().rejects(new Error('cannot connect'));

// first call rejected to simulate a connection failure
const error = await clientConnect().catch(error => error);
expect(error).to.match(/cannot connect/);

internalConnectStub.restore();

// second call should connect
await clientConnect();

expect(topologyOpenEvents).to.have.lengthOf(1);
expect(client.topology?.isConnected()).to.be.true;
});
});

context('#close()', () => {
let client: MongoClient;
let db: Db;
Expand Down