Skip to content

Commit

Permalink
feat(NODE-3255): add minPoolSizeCheckIntervalMS option to connection …
Browse files Browse the repository at this point in the history
…pool (#3429)
  • Loading branch information
dariakp authored Sep 30, 2022
1 parent 6aeff81 commit 5f34ad0
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 21 deletions.
13 changes: 11 additions & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
waitQueueTimeoutMS: number;
/** If we are in load balancer mode. */
loadBalanced: boolean;
/** @internal */
minPoolSizeCheckFrequencyMS?: number;
}

/** @internal */
Expand Down Expand Up @@ -234,6 +236,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
maxConnecting: options.maxConnecting ?? 2,
maxIdleTimeMS: options.maxIdleTimeMS ?? 0,
waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,
minPoolSizeCheckFrequencyMS: options.minPoolSizeCheckFrequencyMS ?? 100,
autoEncrypter: options.autoEncrypter,
metadata: options.metadata
});
Expand Down Expand Up @@ -683,12 +686,18 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
if (this[kPoolState] === PoolState.ready) {
clearTimeout(this[kMinPoolSizeTimer]);
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10);
this[kMinPoolSizeTimer] = setTimeout(
() => this.ensureMinPoolSize(),
this.options.minPoolSizeCheckFrequencyMS
);
}
});
} else {
clearTimeout(this[kMinPoolSizeTimer]);
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100);
this[kMinPoolSizeTimer] = setTimeout(
() => this.ensureMinPoolSize(),
this.options.minPoolSizeCheckFrequencyMS
);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { expect } from 'chai';
import { setTimeout } from 'timers';
import { promisify } from 'util';
import { on } from 'events';

import { CommandStartedEvent } from '../../../src';
import { Collection } from '../../../src/collection';
import { MongoClient } from '../../../src/mongo_client';
import { sleep } from '../../tools/utils';

const failPoint = {
configureFailPoint: 'failCommand',
Expand All @@ -25,21 +25,14 @@ async function runTaskGroup(collection: Collection, count: 10 | 100 | 1000) {
}
}

async function ensurePoolIsFull(client: MongoClient) {
async function ensurePoolIsFull(client: MongoClient): Promise<boolean> {
let connectionCount = 0;
const onConnectionCreated = () => connectionCount++;
client.on('connectionCreated', onConnectionCreated);

// 250ms should be plenty of time to fill the connection pool,
// but just in case we'll loop a couple of times.
for (let i = 0; connectionCount < POOL_SIZE * 2 && i < 10; ++i) {
await promisify(setTimeout)(250);
}

client.removeListener('connectionCreated', onConnectionCreated);

if (connectionCount !== POOL_SIZE * 2) {
throw new Error('Connection pool did not fill up');
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _event of on(client, 'connectionCreated')) {
connectionCount++;
if (connectionCount === POOL_SIZE * 2) {
break;
}
}
}

Expand Down Expand Up @@ -82,7 +75,10 @@ describe('operationCount-based Selection Within Latency Window - Prose Test', fu
await client.connect();

// Step 4: Using CMAP events, ensure the client's connection pools for both mongoses have been saturated
await poolIsFullPromise;
const poolIsFull = Promise.race([poolIsFullPromise, sleep(30 * 1000)]);
if (!poolIsFull) {
throw new Error('Timed out waiting for connection pool to fill to minPoolSize');
}

seeds = client.topology.s.seedlist.map(address => address.toString());

Expand Down
7 changes: 5 additions & 2 deletions test/tools/cmap_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
const poolOptions = test.poolOptions || {};
expect(CMAP_POOL_OPTION_NAMES).to.include.members(Object.keys(poolOptions));

// TODO(NODE-3255): update condition to only remove option if set to -1
let minPoolSizeCheckFrequencyMS;
if (poolOptions.backgroundThreadIntervalMS) {
if (poolOptions.backgroundThreadIntervalMS !== -1) {
minPoolSizeCheckFrequencyMS = poolOptions.backgroundThreadIntervalMS;
}
delete poolOptions.backgroundThreadIntervalMS;
}

Expand All @@ -373,7 +376,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
const mainThread = threadContext.getThread(MAIN_THREAD_KEY);
mainThread.start();

threadContext.createPool({ ...poolOptions, metadata });
threadContext.createPool({ ...poolOptions, metadata, minPoolSizeCheckFrequencyMS });
// yield control back to the event loop so that the ConnectionPoolCreatedEvent
// has a chance to be fired before any synchronously-emitted events from
// the queued operations
Expand Down
88 changes: 88 additions & 0 deletions test/unit/cmap/connection_pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const { expect } = require('chai');
const { setImmediate } = require('timers');
const { ns, isHello } = require('../../../src/utils');
const { LEGACY_HELLO_COMMAND } = require('../../../src/constants');
const { createTimerSandbox } = require('../timer_sandbox');

describe('Connection Pool', function () {
let server;
Expand Down Expand Up @@ -128,6 +129,93 @@ describe('Connection Pool', function () {
});
});

describe('minPoolSize population', function () {
let clock, timerSandbox;
beforeEach(() => {
timerSandbox = createTimerSandbox();
clock = sinon.useFakeTimers();
});

afterEach(() => {
if (clock) {
timerSandbox.restore();
clock.restore();
clock = undefined;
}
});

it('should respect the minPoolSizeCheckFrequencyMS option', function () {
const pool = new ConnectionPool(server, {
minPoolSize: 2,
minPoolSizeCheckFrequencyMS: 42,
hostAddress: server.hostAddress()
});
const ensureSpy = sinon.spy(pool, 'ensureMinPoolSize');

// return a fake connection that won't get identified as perished
const createConnStub = sinon
.stub(pool, 'createConnection')
.yields(null, { destroy: () => null, generation: 0 });

pool.ready();

// expect ensureMinPoolSize to execute immediately
expect(ensureSpy).to.have.been.calledOnce;
expect(createConnStub).to.have.been.calledOnce;

// check that the successful connection return schedules another run
clock.tick(42);
expect(ensureSpy).to.have.been.calledTwice;
expect(createConnStub).to.have.been.calledTwice;

// check that the 2nd successful connection return schedules another run
// but don't expect to get a new connection since we are at minPoolSize
clock.tick(42);
expect(ensureSpy).to.have.been.calledThrice;
expect(createConnStub).to.have.been.calledTwice;

// check that the next scheduled check runs even after we're at minPoolSize
clock.tick(42);
expect(ensureSpy).to.have.callCount(4);
expect(createConnStub).to.have.been.calledTwice;
});

it('should default minPoolSizeCheckFrequencyMS to 100ms', function () {
const pool = new ConnectionPool(server, {
minPoolSize: 2,
hostAddress: server.hostAddress()
});
const ensureSpy = sinon.spy(pool, 'ensureMinPoolSize');

// return a fake connection that won't get identified as perished
const createConnStub = sinon
.stub(pool, 'createConnection')
.yields(null, { destroy: () => null, generation: 0 });

pool.ready();

// expect ensureMinPoolSize to execute immediately
expect(ensureSpy).to.have.been.calledOnce;
expect(createConnStub).to.have.been.calledOnce;

// check that the successful connection return schedules another run
clock.tick(100);
expect(ensureSpy).to.have.been.calledTwice;
expect(createConnStub).to.have.been.calledTwice;

// check that the 2nd successful connection return schedules another run
// but don't expect to get a new connection since we are at minPoolSize
clock.tick(100);
expect(ensureSpy).to.have.been.calledThrice;
expect(createConnStub).to.have.been.calledTwice;

// check that the next scheduled check runs even after we're at minPoolSize
clock.tick(100);
expect(ensureSpy).to.have.callCount(4);
expect(createConnStub).to.have.been.calledTwice;
});
});

describe('withConnection', function () {
it('should manage a connection for a successful operation', function (done) {
server.setMessageHandler(request => {
Expand Down

0 comments on commit 5f34ad0

Please sign in to comment.