From 9883993a14cd7a67ed0439eced63db7c53836d88 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 16 Sep 2022 09:08:49 -0400 Subject: [PATCH] refactor(NODE-4637): clean up async interval (#3411) --- src/index.ts | 3 +- src/sdam/monitor.ts | 152 ++++++++- src/sdam/topology.ts | 11 +- src/utils.ts | 119 ------- test/unit/sdam/monitor.test.js | 258 --------------- test/unit/sdam/monitor.test.ts | 558 +++++++++++++++++++++++++++++++++ test/unit/utils.test.ts | 292 ----------------- 7 files changed, 704 insertions(+), 689 deletions(-) delete mode 100644 test/unit/sdam/monitor.test.js create mode 100644 test/unit/sdam/monitor.test.ts diff --git a/src/index.ts b/src/index.ts index 04bc2f9b2a..6dc03ef7c5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -431,6 +431,8 @@ export type { ClusterTime, TimerQueue } from './sdam/common'; export type { Monitor, MonitorEvents, + MonitorInterval, + MonitorIntervalOptions, MonitorOptions, MonitorPrivate, RTTPinger, @@ -475,7 +477,6 @@ export type { ClientMetadataOptions, EventEmitterWithState, HostAddress, - InterruptibleAsyncInterval, MongoDBNamespace } from './utils'; export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern'; diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 47268a4a41..c455b58c8c 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -6,15 +6,8 @@ import { Connection, ConnectionOptions } from '../cmap/connection'; import { LEGACY_HELLO_COMMAND } from '../constants'; import { MongoError, MongoErrorLabel } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; -import type { Callback, InterruptibleAsyncInterval } from '../utils'; -import { - calculateDurationInMs, - EventEmitterWithState, - makeInterruptibleAsyncInterval, - makeStateMachine, - now, - ns -} from '../utils'; +import type { Callback } from '../utils'; +import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils'; import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common'; import { ServerHeartbeatFailedEvent, @@ -87,7 +80,7 @@ export class Monitor extends TypedEventEmitter { [kConnection]?: Connection; [kCancellationToken]: CancellationToken; /** @internal */ - [kMonitorId]?: InterruptibleAsyncInterval; + [kMonitorId]?: MonitorInterval; [kRTTPinger]?: RTTPinger; get connection(): Connection | undefined { @@ -150,9 +143,9 @@ export class Monitor extends TypedEventEmitter { // start const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS; const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS; - this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), { - interval: heartbeatFrequencyMS, - minInterval: minHeartbeatFrequencyMS, + this[kMonitorId] = new MonitorInterval(monitorServer(this), { + heartbeatFrequencyMS: heartbeatFrequencyMS, + minHeartbeatFrequencyMS: minHeartbeatFrequencyMS, immediate: true }); } @@ -180,9 +173,9 @@ export class Monitor extends TypedEventEmitter { // restart monitoring const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS; const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS; - this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), { - interval: heartbeatFrequencyMS, - minInterval: minHeartbeatFrequencyMS + this[kMonitorId] = new MonitorInterval(monitorServer(this), { + heartbeatFrequencyMS: heartbeatFrequencyMS, + minHeartbeatFrequencyMS: minHeartbeatFrequencyMS }); } @@ -466,3 +459,130 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { measureAndReschedule(); }); } + +/** + * @internal + */ +export interface MonitorIntervalOptions { + /** The interval to execute a method on */ + heartbeatFrequencyMS: number; + /** A minimum interval that must elapse before the method is called */ + minHeartbeatFrequencyMS: number; + /** Whether the method should be called immediately when the interval is started */ + immediate: boolean; + + /** + * Only used for testing unreliable timer environments + * @internal + */ + clock: () => number; +} + +/** + * @internal + */ +export class MonitorInterval { + fn: (callback: Callback) => void; + timerId: NodeJS.Timeout | undefined; + lastCallTime: number; + isExpeditedCheckScheduled = false; + stopped = false; + + heartbeatFrequencyMS: number; + minHeartbeatFrequencyMS: number; + clock: () => number; + + constructor(fn: (callback: Callback) => void, options: Partial = {}) { + this.fn = fn; + this.lastCallTime = 0; + + this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000; + this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500; + this.clock = typeof options.clock === 'function' ? options.clock : now; + + if (options.immediate) { + this._executeAndReschedule(); + } else { + this.lastCallTime = this.clock(); + this._reschedule(undefined); + } + } + + wake() { + const currentTime = this.clock(); + const nextScheduledCallTime = this.lastCallTime + this.heartbeatFrequencyMS; + const timeUntilNextCall = nextScheduledCallTime - currentTime; + + // For the streaming protocol: there is nothing obviously stopping this + // interval from being woken up again while we are waiting "infinitely" + // for `fn` to be called again`. Since the function effectively + // never completes, the `timeUntilNextCall` will continue to grow + // negatively unbounded, so it will never trigger a reschedule here. + + // This is possible in virtualized environments like AWS Lambda where our + // clock is unreliable. In these cases the timer is "running" but never + // actually completes, so we want to execute immediately and then attempt + // to reschedule. + if (timeUntilNextCall < 0) { + this._executeAndReschedule(); + return; + } + + // debounce multiple calls to wake within the `minInterval` + if (this.isExpeditedCheckScheduled) { + return; + } + + // reschedule a call as soon as possible, ensuring the call never happens + // faster than the `minInterval` + if (timeUntilNextCall > this.minHeartbeatFrequencyMS) { + this._reschedule(this.minHeartbeatFrequencyMS); + this.isExpeditedCheckScheduled = true; + } + } + + stop() { + this.stopped = true; + if (this.timerId) { + clearTimeout(this.timerId); + this.timerId = undefined; + } + + this.lastCallTime = 0; + this.isExpeditedCheckScheduled = false; + } + + toString() { + return JSON.stringify(this); + } + + toJSON() { + return { + timerId: this.timerId != null ? 'set' : 'cleared', + lastCallTime: this.lastCallTime, + isExpeditedCheckScheduled: this.isExpeditedCheckScheduled, + stopped: this.stopped, + heartbeatFrequencyMS: this.heartbeatFrequencyMS, + minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS + }; + } + + private _reschedule(ms?: number) { + if (this.stopped) return; + if (this.timerId) { + clearTimeout(this.timerId); + } + + this.timerId = setTimeout(this._executeAndReschedule, ms || this.heartbeatFrequencyMS); + } + + private _executeAndReschedule = () => { + this.isExpeditedCheckScheduled = false; + this.lastCallTime = this.clock(); + + this.fn(err => { + if (err) throw err; + this._reschedule(this.heartbeatFrequencyMS); + }); + }; +} diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index b4ffcae556..61ec49fe53 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -386,7 +386,9 @@ export class Topology extends TypedEventEmitter { } /** Initiate server connect */ - connect(options?: ConnectOptions, callback?: Callback): void { + connect(callback: Callback): void; + connect(options: ConnectOptions, callback: Callback): void; + connect(options?: ConnectOptions | Callback, callback?: Callback): void { if (typeof options === 'function') (callback = options), (options = {}); options = options ?? {}; if (this.s.state === STATE_CONNECTED) { @@ -468,7 +470,10 @@ export class Topology extends TypedEventEmitter { } /** Close this topology */ - close(options?: CloseOptions, callback?: Callback): void { + close(callback: Callback): void; + close(options: CloseOptions): void; + close(options: CloseOptions, callback: Callback): void; + close(options?: CloseOptions | Callback, callback?: Callback): void { if (typeof options === 'function') { callback = options; options = {}; @@ -484,7 +489,7 @@ export class Topology extends TypedEventEmitter { } const destroyedServers = Array.from(this.s.servers.values(), server => { - return promisify(destroyServer)(server, this, options); + return promisify(destroyServer)(server, this, options as CloseOptions); }); Promise.all(destroyedServers) diff --git a/src/utils.ts b/src/utils.ts index 017b4012eb..82796ae17a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,7 +1,6 @@ import * as crypto from 'crypto'; import type { SrvRecord } from 'dns'; import * as os from 'os'; -import { clearTimeout, setTimeout } from 'timers'; import { URL } from 'url'; import { Document, ObjectId, resolveBSONOptions } from './bson'; @@ -774,124 +773,6 @@ export function calculateDurationInMs(started: number): number { return elapsed < 0 ? 0 : elapsed; } -export interface InterruptibleAsyncIntervalOptions { - /** The interval to execute a method on */ - interval: number; - /** A minimum interval that must elapse before the method is called */ - minInterval: number; - /** Whether the method should be called immediately when the interval is started */ - immediate: boolean; - - /** - * Only used for testing unreliable timer environments - * @internal - */ - clock: () => number; -} - -/** @internal */ -export interface InterruptibleAsyncInterval { - wake(): void; - stop(): void; -} - -/** - * Creates an interval timer which is able to be woken up sooner than - * the interval. The timer will also debounce multiple calls to wake - * ensuring that the function is only ever called once within a minimum - * interval window. - * @internal - * - * @param fn - An async function to run on an interval, must accept a `callback` as its only parameter - */ -export function makeInterruptibleAsyncInterval( - fn: (callback: Callback) => void, - options?: Partial -): InterruptibleAsyncInterval { - let timerId: NodeJS.Timeout | undefined; - let lastCallTime: number; - let cannotBeExpedited = false; - let stopped = false; - - options = options ?? {}; - const interval = options.interval || 1000; - const minInterval = options.minInterval || 500; - const immediate = typeof options.immediate === 'boolean' ? options.immediate : false; - const clock = typeof options.clock === 'function' ? options.clock : now; - - function wake() { - const currentTime = clock(); - const nextScheduledCallTime = lastCallTime + interval; - const timeUntilNextCall = nextScheduledCallTime - currentTime; - - // For the streaming protocol: there is nothing obviously stopping this - // interval from being woken up again while we are waiting "infinitely" - // for `fn` to be called again`. Since the function effectively - // never completes, the `timeUntilNextCall` will continue to grow - // negatively unbounded, so it will never trigger a reschedule here. - - // This is possible in virtualized environments like AWS Lambda where our - // clock is unreliable. In these cases the timer is "running" but never - // actually completes, so we want to execute immediately and then attempt - // to reschedule. - if (timeUntilNextCall < 0) { - executeAndReschedule(); - return; - } - - // debounce multiple calls to wake within the `minInterval` - if (cannotBeExpedited) { - return; - } - - // reschedule a call as soon as possible, ensuring the call never happens - // faster than the `minInterval` - if (timeUntilNextCall > minInterval) { - reschedule(minInterval); - cannotBeExpedited = true; - } - } - - function stop() { - stopped = true; - if (timerId) { - clearTimeout(timerId); - timerId = undefined; - } - - lastCallTime = 0; - cannotBeExpedited = false; - } - - function reschedule(ms?: number) { - if (stopped) return; - if (timerId) { - clearTimeout(timerId); - } - - timerId = setTimeout(executeAndReschedule, ms || interval); - } - - function executeAndReschedule() { - cannotBeExpedited = false; - lastCallTime = clock(); - - fn(err => { - if (err) throw err; - reschedule(interval); - }); - } - - if (immediate) { - executeAndReschedule(); - } else { - lastCallTime = clock(); - reschedule(undefined); - } - - return { wake, stop }; -} - /** @internal */ export function hasAtomicOperators(doc: Document | Document[]): boolean { if (Array.isArray(doc)) { diff --git a/test/unit/sdam/monitor.test.js b/test/unit/sdam/monitor.test.js deleted file mode 100644 index 40d5f0a7fe..0000000000 --- a/test/unit/sdam/monitor.test.js +++ /dev/null @@ -1,258 +0,0 @@ -'use strict'; -const { setTimeout } = require('timers'); -const mock = require('../../tools/mongodb-mock/index'); -const { ServerType } = require('../../../src/sdam/common'); -const { Topology } = require('../../../src/sdam/topology'); -const { Monitor } = require('../../../src/sdam/monitor'); -const { expect } = require('chai'); -const { ServerDescription } = require('../../../src/sdam/server_description'); -const { LEGACY_HELLO_COMMAND } = require('../../../src/constants'); -const { isHello } = require('../../../src/utils'); - -class MockServer { - constructor(options) { - this.s = { pool: { generation: 1 } }; - this.description = new ServerDescription(`${options.host}:${options.port}`); - this.description.type = ServerType.Unknown; - } -} - -describe('monitoring', function () { - let mockServer; - - after(() => mock.cleanup()); - beforeEach(function () { - return mock.createServer().then(server => (mockServer = server)); - }); - - // TODO(NODE-3819): Unskip flaky tests. - it.skip('should record roundTripTime', function (done) { - mockServer.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(Object.assign({}, mock.HELLO)); - } else if (doc.endSessions) { - request.reply({ ok: 1 }); - } - }); - - // set `heartbeatFrequencyMS` to 250ms to force a quick monitoring check, and wait 500ms to validate below - const topology = new Topology(mockServer.hostAddress(), { heartbeatFrequencyMS: 250 }); - topology.connect(err => { - expect(err).to.not.exist; - - setTimeout(() => { - expect(topology).property('description').property('servers').to.have.length(1); - - const serverDescription = Array.from(topology.description.servers.values())[0]; - expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); - - topology.close(done); - }, 500); - }); - }).skipReason = 'TODO(NODE-3819): Unskip flaky tests'; - - // TODO(NODE-3600): Unskip flaky test - it.skip('should recover on error during initial connect', function (done) { - // This test should take ~1s because initial server selection fails and an immediate check - // is requested. If the behavior of the immediate check is broken, the test will take ~10s - // to complete. We want to ensure validation of the immediate check behavior, and therefore - // hardcode the test timeout to 2s. - this.timeout(2000); - - let acceptConnections = false; - mockServer.setMessageHandler(request => { - if (!acceptConnections) { - request.connection.destroy(); - return; - } - - const doc = request.document; - if (isHello(doc)) { - request.reply(Object.assign({}, mock.HELLO)); - } else if (doc.endSessions) { - request.reply({ ok: 1 }); - } - }); - - setTimeout(() => { - acceptConnections = true; - }, 250); - - const topology = new Topology(mockServer.hostAddress(), {}); - topology.connect(err => { - expect(err).to.not.exist; - expect(topology).property('description').property('servers').to.have.length(1); - - const serverDescription = Array.from(topology.description.servers.values())[0]; - expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); - - topology.close(done); - }); - }).skipReason = 'TODO(NODE-3600): Unskip flaky tests'; - - describe('Monitor', function () { - let monitor; - - beforeEach(() => { - monitor = null; - }); - - afterEach(() => { - if (monitor) { - monitor.close(); - } - }); - - it('should connect and issue an initial server check', function (done) { - mockServer.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(Object.assign({}, mock.HELLO)); - } - }); - - const server = new MockServer(mockServer.address()); - monitor = new Monitor(server, {}); - - monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure'))); - monitor.on('serverHeartbeatSucceeded', () => { - expect(monitor.connection.isMonitoringConnection).to.be.true; - done(); - }); - monitor.connect(); - }); - - it('should ignore attempts to connect when not already closed', function (done) { - mockServer.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(Object.assign({}, mock.HELLO)); - } - }); - - const server = new MockServer(mockServer.address()); - monitor = new Monitor(server, {}); - - monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure'))); - monitor.on('serverHeartbeatSucceeded', () => done()); - monitor.connect(); - monitor.connect(); - }); - - it('should not initiate another check if one is in progress', function (done) { - mockServer.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - setTimeout(() => request.reply(Object.assign({}, mock.HELLO)), 250); - } - }); - - const server = new MockServer(mockServer.address()); - monitor = new Monitor(server, {}); - - const startedEvents = []; - monitor.on('serverHeartbeatStarted', event => startedEvents.push(event)); - monitor.on('close', () => { - expect(startedEvents).to.have.length(2); - done(); - }); - - monitor.connect(); - monitor.once('serverHeartbeatSucceeded', () => { - monitor.requestCheck(); - monitor.requestCheck(); - monitor.requestCheck(); - monitor.requestCheck(); - monitor.requestCheck(); - - const minHeartbeatFrequencyMS = 500; - setTimeout(() => { - // wait for minHeartbeatFrequencyMS, then request a check and verify another check occurred - monitor.once('serverHeartbeatSucceeded', () => { - monitor.close(); - }); - - monitor.requestCheck(); - }, minHeartbeatFrequencyMS); - }); - }); - - it('should not close the monitor on a failed heartbeat', function (done) { - let helloCount = 0; - mockServer.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - helloCount++; - if (helloCount === 2) { - request.reply({ ok: 0, errmsg: 'forced from mock server' }); - return; - } - - if (helloCount === 3) { - request.connection.destroy(); - return; - } - - request.reply(mock.HELLO); - } - }); - - const server = new MockServer(mockServer.address()); - server.description = new ServerDescription(server.description.hostAddress); - monitor = new Monitor(server, { - heartbeatFrequencyMS: 250, - minHeartbeatFrequencyMS: 50 - }); - - const events = []; - monitor.on('serverHeartbeatFailed', event => events.push(event)); - - let successCount = 0; - monitor.on('serverHeartbeatSucceeded', () => { - if (successCount++ === 2) { - monitor.close(); - } - }); - - monitor.on('close', () => { - expect(events).to.have.length(2); - done(); - }); - - monitor.connect(); - }); - - it('should upgrade to hello from legacy hello when initial handshake contains helloOk', function (done) { - const docs = []; - mockServer.setMessageHandler(request => { - const doc = request.document; - docs.push(doc); - if (docs.length === 2) { - expect(docs[0]).to.have.property(LEGACY_HELLO_COMMAND, true); - expect(docs[0]).to.have.property('helloOk', true); - expect(docs[1]).to.have.property('hello', true); - done(); - } else if (isHello(doc)) { - setTimeout(() => request.reply(Object.assign({ helloOk: true }, mock.HELLO)), 250); - } - }); - - const server = new MockServer(mockServer.address()); - monitor = new Monitor(server, {}); - - monitor.connect(); - monitor.once('serverHeartbeatSucceeded', () => { - const minHeartbeatFrequencyMS = 500; - setTimeout(() => { - // wait for minHeartbeatFrequencyMS, then request a check and verify another check occurred - monitor.once('serverHeartbeatSucceeded', () => { - monitor.close(); - }); - - monitor.requestCheck(); - }, minHeartbeatFrequencyMS); - }); - }); - }); -}); diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts new file mode 100644 index 0000000000..987040ef8d --- /dev/null +++ b/test/unit/sdam/monitor.test.ts @@ -0,0 +1,558 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; +import { setTimeout } from 'timers'; + +import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; +import { ServerType } from '../../../src/sdam/common'; +import { ServerHeartbeatFailedEvent, ServerHeartbeatStartedEvent } from '../../../src/sdam/events'; +import { Monitor, MonitorInterval } from '../../../src/sdam/monitor'; +import { ServerDescription } from '../../../src/sdam/server_description'; +import { Topology } from '../../../src/sdam/topology'; +import { isHello } from '../../../src/utils'; +import * as mock from '../../tools/mongodb-mock/index'; +import { createTimerSandbox } from '../timer_sandbox'; + +class MockServer { + s: any; + description: ServerDescription; + constructor(options) { + this.s = { pool: { generation: 1 } }; + this.description = new ServerDescription(`${options.host}:${options.port}`); + this.description.type = ServerType.Unknown; + } +} + +describe('monitoring', function () { + let mockServer; + + after(() => mock.cleanup()); + beforeEach(function () { + return mock.createServer().then(server => (mockServer = server)); + }); + + // TODO(NODE-3819): Unskip flaky tests. + it.skip('should record roundTripTime', function (done) { + mockServer.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply(Object.assign({}, mock.HELLO)); + } else if (doc.endSessions) { + request.reply({ ok: 1 }); + } + }); + + // set `heartbeatFrequencyMS` to 250ms to force a quick monitoring check, and wait 500ms to validate below + const topology = new Topology(mockServer.hostAddress(), { heartbeatFrequencyMS: 250 } as any); + topology.connect(err => { + expect(err).to.not.exist; + + setTimeout(() => { + expect(topology).property('description').property('servers').to.have.length(1); + + const serverDescription = Array.from(topology.description.servers.values())[0]; + expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); + + topology.close(done as any); + }, 500); + }); + }).skipReason = 'TODO(NODE-3819): Unskip flaky tests'; + + // TODO(NODE-3600): Unskip flaky test + it.skip('should recover on error during initial connect', function (done) { + // This test should take ~1s because initial server selection fails and an immediate check + // is requested. If the behavior of the immediate check is broken, the test will take ~10s + // to complete. We want to ensure validation of the immediate check behavior, and therefore + // hardcode the test timeout to 2s. + this.timeout(2000); + + let acceptConnections = false; + mockServer.setMessageHandler(request => { + if (!acceptConnections) { + request.connection.destroy(); + return; + } + + const doc = request.document; + if (isHello(doc)) { + request.reply(Object.assign({}, mock.HELLO)); + } else if (doc.endSessions) { + request.reply({ ok: 1 }); + } + }); + + setTimeout(() => { + acceptConnections = true; + }, 250); + + const topology = new Topology(mockServer.hostAddress(), {}); + topology.connect(err => { + expect(err).to.not.exist; + expect(topology).property('description').property('servers').to.have.length(1); + + const serverDescription = Array.from(topology.description.servers.values())[0]; + expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); + + topology.close(done); + }); + }).skipReason = 'TODO(NODE-3600): Unskip flaky tests'; + + describe('Monitor', function () { + let monitor; + + beforeEach(() => { + monitor = null; + }); + + afterEach(() => { + if (monitor) { + monitor.close(); + } + }); + + it('should connect and issue an initial server check', function (done) { + mockServer.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply(Object.assign({}, mock.HELLO)); + } + }); + + const server = new MockServer(mockServer.address()); + monitor = new Monitor(server as any, {} as any); + + monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure'))); + monitor.on('serverHeartbeatSucceeded', () => { + expect(monitor.connection.isMonitoringConnection).to.be.true; + done(); + }); + monitor.connect(); + }); + + it('should ignore attempts to connect when not already closed', function (done) { + mockServer.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply(Object.assign({}, mock.HELLO)); + } + }); + + const server = new MockServer(mockServer.address()); + monitor = new Monitor(server as any, {} as any); + + monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure'))); + monitor.on('serverHeartbeatSucceeded', () => done()); + monitor.connect(); + monitor.connect(); + }); + + it('should not initiate another check if one is in progress', function (done) { + mockServer.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + setTimeout(() => request.reply(Object.assign({}, mock.HELLO)), 250); + } + }); + + const server = new MockServer(mockServer.address()); + monitor = new Monitor(server as any, {} as any); + + const startedEvents: ServerHeartbeatStartedEvent[] = []; + monitor.on('serverHeartbeatStarted', event => startedEvents.push(event)); + monitor.on('close', () => { + expect(startedEvents).to.have.length(2); + done(); + }); + + monitor.connect(); + monitor.once('serverHeartbeatSucceeded', () => { + monitor.requestCheck(); + monitor.requestCheck(); + monitor.requestCheck(); + monitor.requestCheck(); + monitor.requestCheck(); + + const minHeartbeatFrequencyMS = 500; + setTimeout(() => { + // wait for minHeartbeatFrequencyMS, then request a check and verify another check occurred + monitor.once('serverHeartbeatSucceeded', () => { + monitor.close(); + }); + + monitor.requestCheck(); + }, minHeartbeatFrequencyMS); + }); + }); + + it('should not close the monitor on a failed heartbeat', function (done) { + let helloCount = 0; + mockServer.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + helloCount++; + if (helloCount === 2) { + request.reply({ ok: 0, errmsg: 'forced from mock server' }); + return; + } + + if (helloCount === 3) { + request.connection.destroy(); + return; + } + + request.reply(mock.HELLO); + } + }); + + const server = new MockServer(mockServer.address()); + server.description = new ServerDescription(server.description.hostAddress); + monitor = new Monitor( + server as any, + { + heartbeatFrequencyMS: 250, + minHeartbeatFrequencyMS: 50 + } as any + ); + + const events: ServerHeartbeatFailedEvent[] = []; + monitor.on('serverHeartbeatFailed', event => events.push(event)); + + let successCount = 0; + monitor.on('serverHeartbeatSucceeded', () => { + if (successCount++ === 2) { + monitor.close(); + } + }); + + monitor.on('close', () => { + expect(events).to.have.length(2); + done(); + }); + + monitor.connect(); + }); + + it('should upgrade to hello from legacy hello when initial handshake contains helloOk', function (done) { + const docs: any[] = []; + mockServer.setMessageHandler(request => { + const doc = request.document; + docs.push(doc); + if (docs.length === 2) { + expect(docs[0]).to.have.property(LEGACY_HELLO_COMMAND, true); + expect(docs[0]).to.have.property('helloOk', true); + expect(docs[1]).to.have.property('hello', true); + done(); + } else if (isHello(doc)) { + setTimeout(() => request.reply(Object.assign({ helloOk: true }, mock.HELLO)), 250); + } + }); + + const server = new MockServer(mockServer.address()); + monitor = new Monitor(server as any, {} as any); + + monitor.connect(); + monitor.once('serverHeartbeatSucceeded', () => { + const minHeartbeatFrequencyMS = 500; + setTimeout(() => { + // wait for minHeartbeatFrequencyMS, then request a check and verify another check occurred + monitor.once('serverHeartbeatSucceeded', () => { + monitor.close(); + }); + + monitor.requestCheck(); + }, minHeartbeatFrequencyMS); + }); + }); + }); + + describe('class MonitorInterval', function () { + let timerSandbox, clock, executor, fnSpy; + + beforeEach(function () { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers(); + fnSpy = sinon.spy(cb => { + cb(); + }); + }); + + afterEach(function () { + if (executor) { + executor.stop(); + } + clock.restore(); + timerSandbox.restore(); + }); + + context('when the immediate option is provided', function () { + it('executes the function immediately and schedules the next execution on the interval', function () { + executor = new MonitorInterval(fnSpy, { + immediate: true, + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30 + }); + // expect immediate invocation + expect(fnSpy.calledOnce).to.be.true; + // advance clock by less than the scheduled interval to ensure we don't execute early + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + // advance clock to the interval + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + }); + + context('when the immediate option is not provided', function () { + it('executes the function on the provided interval', function () { + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30 + }); + // advance clock by less than the scheduled interval to ensure we don't execute early + clock.tick(29); + expect(fnSpy.callCount).to.equal(0); + // advance clock to the interval + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // advance clock by the interval + clock.tick(30); + expect(fnSpy.calledTwice).to.be.true; + }); + }); + + describe('#wake', function () { + context('when the time until next call is negative', () => { + // somehow we missed the execution, due to an unreliable clock + + it('should execute immediately and schedule the next execution on the interval if this is the first wake', () => { + let fakeClockHasTicked = false; + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30, + clock: () => { + if (fakeClockHasTicked) { + return 81; + } + fakeClockHasTicked = true; + return 50; + } + }); + + // tick the environment clock by a smaller amount than the interval + clock.tick(2); + // sanity check to make sure we haven't called execute yet + expect(fnSpy.callCount).to.equal(0); + executor.wake(); + // expect immediate execution since expected next call time was 50 + 30 = 80, but the clock shows 81 + expect(fnSpy.calledOnce).to.be.true; + // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + // move forward by the full interval to make sure the scheduled call executes + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute immediately and schedule the next execution on the interval if this is a repeated wake and the current execution is not rescheduled', () => { + let fakeClockTickCount = 0; + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30, + clock: () => { + if (fakeClockTickCount === 0) { + // on init, return arbitrary starting time + fakeClockTickCount++; + return 50; + } + if (fakeClockTickCount === 1) { + // expected execution time is 80 + // on first wake return a time so less than minInterval is left and no need to reschedule + fakeClockTickCount++; + return 71; + } + return 81; + } + }); + + // tick the clock by a small amount before and after the wake to make sure no unexpected async things are happening + clock.tick(11); + executor.wake(); + clock.tick(5); + expect(fnSpy.callCount).to.equal(0); + // call our second wake that gets the overdue timer, so expect immediate execution + executor.wake(); + expect(fnSpy.calledOnce).to.be.true; + // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + // move forward by the full interval to make sure the scheduled call executes + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute immediately and schedule the next execution on the interval if this is a repeated wake even if the current execution is rescheduled', () => { + let fakeClockTickCount = 0; + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30, + clock: () => { + if (fakeClockTickCount === 0) { + // on init, return arbitrary starting time + fakeClockTickCount++; + return 50; + } + if (fakeClockTickCount === 1) { + // expected execution time is 80 + // on first wake return a time so that more than minInterval is left + fakeClockTickCount++; + return 61; + } + return 81; + } + }); + + // tick the clock by a small amount before and after the wake to make sure no unexpected async things are happening + clock.tick(2); + executor.wake(); + clock.tick(9); + expect(fnSpy.callCount).to.equal(0); + // call our second wake that gets the overdue timer, so expect immediate execution + executor.wake(); + expect(fnSpy.calledOnce).to.be.true; + // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + // move forward by the full interval to make sure the scheduled call executes + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + }); + + context('when the time until next call is less than the minInterval', () => { + // we can't make it go any faster, so we should let the scheduled execution run + + it('should execute on the interval if this is the first wake', () => { + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30 + }); + // tick the environment clock so that less than minInterval is left + clock.tick(21); + executor.wake(); + // move forward to just before exepected execution time + clock.tick(8); + expect(fnSpy.callCount).to.equal(0); + // move forward to the full interval to make sure the scheduled call executes + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute on the original interval if this is a repeated wake and the current execution is not rescheduled', () => { + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30 + }); + // tick the environment clock so that less than minInterval is left + clock.tick(21); + executor.wake(); + // tick the environment clock some more so that the next wake is called at a different time + clock.tick(2); + executor.wake(); + // tick to just before the expected execution time + clock.tick(6); + expect(fnSpy.callCount).to.equal(0); + // tick up to 20 for the expected execution + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute on the minInterval from the first wake if this is a repeated wake and the current execution is rescheduled', () => { + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30 + }); + // tick the environment clock so that more than minInterval is left + clock.tick(13); + executor.wake(); + // the first wake should move up the execution to occur at 23 ticks from the start + // we tick 8 to get to 21, so that less than minInterval is left on the original interval expected execution + clock.tick(8); + executor.wake(); + // now we tick to just before the rescheduled execution time + clock.tick(1); + expect(fnSpy.callCount).to.equal(0); + // tick up to 23 for the expected execution + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + }); + + context('when the time until next call is more than the minInterval', () => { + // expedite the execution to minInterval + + it('should execute on the minInterval if this is the first wake', () => { + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30 + }); + // tick the environment clock so that more than minInterval is left + clock.tick(3); + executor.wake(); + // the first wake should move up the execution to occur at 13 ticks from the start + // we tick to just before the rescheduled execution time + clock.tick(9); + expect(fnSpy.callCount).to.equal(0); + // tick up to 13 for the expected execution + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute on the minInterval from the first wake if this is a repeated wake', () => { + // NOTE: under regular circumstances, if the second wake is early enough to warrant a reschedule + // then the first wake must have already warranted a reschedule + executor = new MonitorInterval(fnSpy, { + minHeartbeatFrequencyMS: 10, + heartbeatFrequencyMS: 30 + }); + // tick the environment clock so that more than minInterval is left + clock.tick(3); + executor.wake(); + // the first wake should move up the execution to occur at 13 ticks from the start + // we tick a bit more so that more than minInterval is still left and call our repeated wake + clock.tick(2); + executor.wake(); + // tick up to just before the expected execution + clock.tick(7); + expect(fnSpy.callCount).to.equal(0); + // now go up to 13 + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + }); + }); + }); +}); diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index fc175ebf92..9b4f7a0118 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1,5 +1,4 @@ import { expect } from 'chai'; -import * as sinon from 'sinon'; import { LEGACY_HELLO_COMMAND } from '../../src/constants'; import { MongoRuntimeError } from '../../src/error'; @@ -8,11 +7,9 @@ import { eachAsync, HostAddress, isHello, - makeInterruptibleAsyncInterval, MongoDBNamespace, shuffle } from '../../src/utils'; -import { createTimerSandbox } from './timer_sandbox'; describe('driver utils', function () { context('eachAsync()', function () { @@ -46,295 +43,6 @@ describe('driver utils', function () { }); }); - describe('#makeInterruptibleAsyncInterval', function () { - let timerSandbox, clock, executor, fnSpy; - - beforeEach(function () { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers(); - fnSpy = sinon.spy(cb => { - cb(); - }); - }); - - afterEach(function () { - if (executor) { - executor.stop(); - } - clock.restore(); - timerSandbox.restore(); - }); - - context('when the immediate option is provided', function () { - it('executes the function immediately and schedules the next execution on the interval', function () { - executor = makeInterruptibleAsyncInterval(fnSpy, { - immediate: true, - minInterval: 10, - interval: 30 - }); - // expect immediate invocation - expect(fnSpy.calledOnce).to.be.true; - // advance clock by less than the scheduled interval to ensure we don't execute early - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - // advance clock to the interval - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - }); - - context('when the immediate option is not provided', function () { - it('executes the function on the provided interval', function () { - executor = makeInterruptibleAsyncInterval(fnSpy, { minInterval: 10, interval: 30 }); - // advance clock by less than the scheduled interval to ensure we don't execute early - clock.tick(29); - expect(fnSpy.callCount).to.equal(0); - // advance clock to the interval - clock.tick(1); - expect(fnSpy.calledOnce).to.be.true; - // advance clock by the interval - clock.tick(30); - expect(fnSpy.calledTwice).to.be.true; - }); - }); - - describe('#wake', function () { - context('when the time until next call is negative', () => { - // somehow we missed the execution, due to an unreliable clock - - it('should execute immediately and schedule the next execution on the interval if this is the first wake', () => { - let fakeClockHasTicked = false; - executor = makeInterruptibleAsyncInterval(fnSpy, { - minInterval: 10, - interval: 30, - clock: () => { - if (fakeClockHasTicked) { - return 81; - } - fakeClockHasTicked = true; - return 50; - } - }); - - // tick the environment clock by a smaller amount than the interval - clock.tick(2); - // sanity check to make sure we haven't called execute yet - expect(fnSpy.callCount).to.equal(0); - executor.wake(); - // expect immediate execution since expected next call time was 50 + 30 = 80, but the clock shows 81 - expect(fnSpy.calledOnce).to.be.true; - // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - // move forward by the full interval to make sure the scheduled call executes - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - - it('should execute immediately and schedule the next execution on the interval if this is a repeated wake and the current execution is not rescheduled', () => { - let fakeClockTickCount = 0; - executor = makeInterruptibleAsyncInterval(fnSpy, { - minInterval: 10, - interval: 30, - clock: () => { - if (fakeClockTickCount === 0) { - // on init, return arbitrary starting time - fakeClockTickCount++; - return 50; - } - if (fakeClockTickCount === 1) { - // expected execution time is 80 - // on first wake return a time so less than minInterval is left and no need to reschedule - fakeClockTickCount++; - return 71; - } - return 81; - } - }); - - // tick the clock by a small amount before and after the wake to make sure no unexpected async things are happening - clock.tick(11); - executor.wake(); - clock.tick(5); - expect(fnSpy.callCount).to.equal(0); - // call our second wake that gets the overdue timer, so expect immediate execution - executor.wake(); - expect(fnSpy.calledOnce).to.be.true; - // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - // move forward by the full interval to make sure the scheduled call executes - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - - it('should execute immediately and schedule the next execution on the interval if this is a repeated wake even if the current execution is rescheduled', () => { - let fakeClockTickCount = 0; - executor = makeInterruptibleAsyncInterval(fnSpy, { - minInterval: 10, - interval: 30, - clock: () => { - if (fakeClockTickCount === 0) { - // on init, return arbitrary starting time - fakeClockTickCount++; - return 50; - } - if (fakeClockTickCount === 1) { - // expected execution time is 80 - // on first wake return a time so that more than minInterval is left - fakeClockTickCount++; - return 61; - } - return 81; - } - }); - - // tick the clock by a small amount before and after the wake to make sure no unexpected async things are happening - clock.tick(2); - executor.wake(); - clock.tick(9); - expect(fnSpy.callCount).to.equal(0); - // call our second wake that gets the overdue timer, so expect immediate execution - executor.wake(); - expect(fnSpy.calledOnce).to.be.true; - // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - // move forward by the full interval to make sure the scheduled call executes - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - }); - - context('when the time until next call is less than the minInterval', () => { - // we can't make it go any faster, so we should let the scheduled execution run - - it('should execute on the interval if this is the first wake', () => { - executor = makeInterruptibleAsyncInterval(fnSpy, { - minInterval: 10, - interval: 30 - }); - // tick the environment clock so that less than minInterval is left - clock.tick(21); - executor.wake(); - // move forward to just before exepected execution time - clock.tick(8); - expect(fnSpy.callCount).to.equal(0); - // move forward to the full interval to make sure the scheduled call executes - clock.tick(1); - expect(fnSpy.calledOnce).to.be.true; - // check to make sure the next execution runs as expected - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - - it('should execute on the original interval if this is a repeated wake and the current execution is not rescheduled', () => { - executor = makeInterruptibleAsyncInterval(fnSpy, { - minInterval: 10, - interval: 30 - }); - // tick the environment clock so that less than minInterval is left - clock.tick(21); - executor.wake(); - // tick the environment clock some more so that the next wake is called at a different time - clock.tick(2); - executor.wake(); - // tick to just before the expected execution time - clock.tick(6); - expect(fnSpy.callCount).to.equal(0); - // tick up to 20 for the expected execution - clock.tick(1); - expect(fnSpy.calledOnce).to.be.true; - // check to make sure the next execution runs as expected - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - - it('should execute on the minInterval from the first wake if this is a repeated wake and the current execution is rescheduled', () => { - executor = makeInterruptibleAsyncInterval(fnSpy, { - minInterval: 10, - interval: 30 - }); - // tick the environment clock so that more than minInterval is left - clock.tick(13); - executor.wake(); - // the first wake should move up the execution to occur at 23 ticks from the start - // we tick 8 to get to 21, so that less than minInterval is left on the original interval expected execution - clock.tick(8); - executor.wake(); - // now we tick to just before the rescheduled execution time - clock.tick(1); - expect(fnSpy.callCount).to.equal(0); - // tick up to 23 for the expected execution - clock.tick(1); - expect(fnSpy.calledOnce).to.be.true; - // check to make sure the next execution runs as expected - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - }); - - context('when the time until next call is more than the minInterval', () => { - // expedite the execution to minInterval - - it('should execute on the minInterval if this is the first wake', () => { - executor = makeInterruptibleAsyncInterval(fnSpy, { - minInterval: 10, - interval: 30 - }); - // tick the environment clock so that more than minInterval is left - clock.tick(3); - executor.wake(); - // the first wake should move up the execution to occur at 13 ticks from the start - // we tick to just before the rescheduled execution time - clock.tick(9); - expect(fnSpy.callCount).to.equal(0); - // tick up to 13 for the expected execution - clock.tick(1); - expect(fnSpy.calledOnce).to.be.true; - // check to make sure the next execution runs as expected - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - - it('should execute on the minInterval from the first wake if this is a repeated wake', () => { - // NOTE: under regular circumstances, if the second wake is early enough to warrant a reschedule - // then the first wake must have already warranted a reschedule - executor = makeInterruptibleAsyncInterval(fnSpy, { - minInterval: 10, - interval: 30 - }); - // tick the environment clock so that more than minInterval is left - clock.tick(3); - executor.wake(); - // the first wake should move up the execution to occur at 13 ticks from the start - // we tick a bit more so that more than minInterval is still left and call our repeated wake - clock.tick(2); - executor.wake(); - // tick up to just before the expected execution - clock.tick(7); - expect(fnSpy.callCount).to.equal(0); - // now go up to 13 - clock.tick(1); - expect(fnSpy.calledOnce).to.be.true; - // check to make sure the next execution runs as expected - clock.tick(29); - expect(fnSpy.calledOnce).to.be.true; - clock.tick(1); - expect(fnSpy.calledTwice).to.be.true; - }); - }); - }); - }); - context('new BufferPool()', function () { it('should report the correct length', function () { const buffer = new BufferPool();