Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4834): ensure that MessageStream is destroyed when connections are destroyed #3482

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4130597
fix(NODE-4834): Add calls to destroy kMessageStream
W-A-James Dec 5, 2022
4c10235
test(NODE-4834): WIP - Start work on unit tests
W-A-James Dec 5, 2022
b341938
fix(NODE-4834): Add guards to onError, onClose, onTimeout
W-A-James Dec 6, 2022
032de2d
test(NODE-4834): WIP progress on unit tests
W-A-James Dec 6, 2022
f75e202
style(NODE-4834): Revert style changes
W-A-James Dec 6, 2022
586c410
test(NODE-4834): Remove unit test 'only' annotations
W-A-James Dec 6, 2022
7a8d807
test(NODE-4834): move test to more appropriate position
W-A-James Dec 6, 2022
0c804a6
style(NODE-4834): Remove todo comment
W-A-James Dec 6, 2022
4e5a9bb
docs(NODE-4834): Add tsdoc comments for clarification on closed and d…
W-A-James Dec 6, 2022
5ccb807
style(NODE-4834): Eslint fixes
W-A-James Dec 6, 2022
770d050
test(NODE-4834): Change test descriptions to match testing standards …
W-A-James Dec 7, 2022
92d577d
docs(NODE-4834): Update inline documentation to reference state rathe…
W-A-James Dec 7, 2022
1ada8ca
fix(NODE-4834): Address review comments
W-A-James Dec 8, 2022
21dc8b4
test(NODE-4834): FakeSocket now simulates IO delay
W-A-James Dec 8, 2022
f5f12f5
style(NODE-4834): Revert unintended style change
W-A-James Dec 8, 2022
1e5d874
test(NODE-4834): Ensure that FakeSocket.end also sets this.writableEn…
W-A-James Dec 8, 2022
f940281
fix(NODE-4834): Update Connection.destroy and event handler logic to …
W-A-James Dec 12, 2022
927b883
test(NODE-4834): Add unit tests to check that options are treated cor…
W-A-James Dec 12, 2022
bf1c8f0
fix(NODE-4834): Address eslint fixes
W-A-James Dec 12, 2022
452c1f0
docs(NODE-4834): Remove irrelevant docs
W-A-James Dec 12, 2022
dcda86c
style(NODE-4834): eslint fixes
W-A-James Dec 12, 2022
c74bff8
refactor(NODE-4834): Make all paths through Connection.destroy async
W-A-James Dec 12, 2022
7993c4e
fix(NODE-4834): Address review comments
W-A-James Dec 12, 2022
e5c8e0a
test(NODE-4834): Update old test
W-A-James Dec 12, 2022
6540197
test(NODE-4834): remove 'only' annotation
W-A-James Dec 12, 2022
7d43afb
test(NODE-4834): Fix failing tests
W-A-James Dec 13, 2022
2cdcc21
test(NODE-4834): remove debug messages
W-A-James Dec 13, 2022
8d66f61
fix: only invoke callback when it is defined
nbbeeken Dec 13, 2022
44d5b05
fix: lint
nbbeeken Dec 13, 2022
860d741
rm: prints
nbbeeken Dec 13, 2022
3b59a12
test(NODE-4834): Update tests to be more coherent
W-A-James Dec 15, 2022
50fc706
test(NODE-4834): Reorganize tests
W-A-James Dec 15, 2022
a11a4f6
test(NODE-4834): remove to.not.have.been.calledOnce instances
W-A-James Dec 15, 2022
23755ca
test(NODE-4834): Separate tests more cleanly
W-A-James Dec 16, 2022
878c764
test(NODE-4834): Split up tests more cleanly
W-A-James Dec 16, 2022
995700b
test(NODE-4834): fix test name typo
W-A-James Dec 16, 2022
c0d0189
Merge branch 'main' into NODE-4834/destroy_message_stream_when_connec…
nbbeeken Dec 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
address: string;
socketTimeoutMS: number;
monitorCommands: boolean;
/** Indicates that the connection (including underlying TCP socket) has been closed. */
closed: boolean;
/** Indicates that the connection has been explicitly destroyed. In the destroyed state, the connection is also closed */
destroyed: boolean;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
lastHelloMS?: number;
serverApi?: ServerApi;
Expand Down Expand Up @@ -309,11 +311,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kLastUseTime] = now();
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @remarks
* onError is called when an error is propagated up from the socket behind
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
* kStream to kMessageStream. This occurs prior to the closing of the socket,
* so the resource must be return to the operating system here.
*/
onError(error: Error) {
if (this.closed) {
if (this.closed || this.destroyed) {
return;
}

this[kMessageStream].destroy(error);
this[kStream].destroy(error);

this.closed = true;
Expand All @@ -326,10 +335,16 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.emit(Connection.CLOSE);
}

/**
* @remarks
* onClose is called when the socket underlying kStream has been closed and
* the resource is returned to the operating system
*/
onClose() {
if (this.closed) {
if (this.closed || this.destroyed) {
return;
}
this[kMessageStream].destroy();

this.closed = true;

Expand All @@ -342,12 +357,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.emit(Connection.CLOSE);
}

/**
* @remarks
* onTimeout is called when the tcp socket underlying kStream times out. This
* occurs prior to the closing of the socket, so the resource must be returned
* to the operating system here */
onTimeout() {
if (this.closed) {
if (this.closed || this.destroyed) {
return;
}

this[kDelayedTimeoutId] = setTimeout(() => {
this[kMessageStream].destroy();
this[kStream].destroy();

this.closed = true;
Expand Down Expand Up @@ -468,32 +489,28 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.removeAllListeners(Connection.PINNED);
this.removeAllListeners(Connection.UNPINNED);
baileympearson marked this conversation as resolved.
Show resolved Hide resolved

options = Object.assign({ force: false }, options);
if (this[kStream] == null || this.destroyed) {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}
this[kMessageStream].destroy();

return;
options = Object.assign({ force: false }, options);
if (this.destroyed) {
return callback?.();
}

if (options.force) {
this[kStream].destroy();
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}

return;
return callback?.();
}

this[kStream].end(() => {
if (this[kStream].writableEnded) {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}
});
return callback?.();
} else {
this[kStream].end(() => {
this.destroyed = true;
callback?.();
});
addaleax marked this conversation as resolved.
Show resolved Hide resolved
}
}

command(
Expand Down
127 changes: 126 additions & 1 deletion test/unit/cmap/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const connectionOptionsDefaults = {

/** The absolute minimum socket API needed by Connection as of writing this test */
class FakeSocket extends EventEmitter {
writableEnded: boolean;
address() {
// is never called
}
Expand All @@ -34,6 +35,11 @@ class FakeSocket extends EventEmitter {
}
destroy() {
// is called, has no side effects
this.writableEnded = true;
}
end(cb) {
// nextTick to simulate I/O delay
process.nextTick(cb);
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
}
get remoteAddress() {
return 'iLoveJavaScript';
Expand Down Expand Up @@ -389,7 +395,7 @@ describe('new Connection()', function () {
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
const messageStreamSymbol = getSymbolFrom(connection, 'messageStream');
kDelayedTimeoutId = getSymbolFrom(connection, 'delayedTimeoutId');
messageStream = connection[messageStreamSymbol];
messageStream = sinon.spy(connection[messageStreamSymbol]);
});

afterEach(() => {
Expand Down Expand Up @@ -433,6 +439,85 @@ describe('new Connection()', function () {
expect(connection).to.have.property('closed', false);
expect(connection).to.have.property(kDelayedTimeoutId, null);
});

it('destroys the message stream and socket', () => {
expect(connection).to.have.property(kDelayedTimeoutId, null);

driverSocket.emit('timeout');

clock.tick(1);

expect(connection.onTimeout).to.have.been.calledOnce;
expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass);

expect(messageStream.destroy).to.have.been.calledOnce;
expect(driverSocket.destroy).to.have.been.calledOnce;
});
});

describe('onError()', () => {
let connection: sinon.SinonSpiedInstance<Connection>;
let clock: sinon.SinonFakeTimers;
let timerSandbox: sinon.SinonFakeTimers;
let driverSocket: sinon.SinonSpiedInstance<FakeSocket>;
let messageStream: MessageStream;
beforeEach(() => {
timerSandbox = createTimerSandbox();
clock = sinon.useFakeTimers();
driverSocket = sinon.spy(new FakeSocket());
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
const messageStreamSymbol = getSymbolFrom(connection, 'messageStream');
messageStream = sinon.spy(connection[messageStreamSymbol]);
});

afterEach(() => {
timerSandbox.restore();
clock.restore();
});

it('destroys the message stream and socket', () => {
messageStream.emit('error');

clock.tick(1);

expect(connection.onError).to.have.been.calledOnce;

expect(messageStream.destroy).to.have.been.calledOnce;
expect(driverSocket.destroy).to.have.been.calledOnce;
});
});

describe('onClose()', () => {
let connection: sinon.SinonSpiedInstance<Connection>;
let clock: sinon.SinonFakeTimers;
let timerSandbox: sinon.SinonFakeTimers;
let driverSocket: sinon.SinonSpiedInstance<FakeSocket>;
let messageStream: MessageStream;
beforeEach(() => {
timerSandbox = createTimerSandbox();
clock = sinon.useFakeTimers();

driverSocket = sinon.spy(new FakeSocket());
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
const messageStreamSymbol = getSymbolFrom(connection, 'messageStream');
messageStream = sinon.spy(connection[messageStreamSymbol]);
});

afterEach(() => {
timerSandbox.restore();
clock.restore();
});

it('destroys the message stream', () => {
driverSocket.emit('close');

clock.tick(1);

expect(connection.onClose).to.have.been.calledOnce;
expect(messageStream.destroy).to.have.been.calledOnce;
});
});

describe('.hasSessionSupport', function () {
Expand Down Expand Up @@ -486,4 +571,44 @@ describe('new Connection()', function () {
});
});
});

describe('destroy()', () => {
let connection: sinon.SinonSpiedInstance<Connection>;
let clock: sinon.SinonFakeTimers;
let timerSandbox: sinon.SinonFakeTimers;
let driverSocket: sinon.SinonSpiedInstance<FakeSocket>;
let messageStream: MessageStream;
beforeEach(() => {
timerSandbox = createTimerSandbox();
clock = sinon.useFakeTimers();

driverSocket = sinon.spy(new FakeSocket());
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
const messageStreamSymbol = getSymbolFrom(connection, 'messageStream');
messageStream = sinon.spy(connection[messageStreamSymbol]);
});

afterEach(() => {
timerSandbox.restore();
clock.restore();
});

it('ends the tcp socket and destroys the messageStream', () => {
connection.destroy();
clock.tick(1);
expect(messageStream.destroy).to.have.been.calledOnce;
expect(driverSocket.end).to.have.been.calledOnce;
});

it('does not call stream.end after onClose, onTimeout, or onError', () => {
messageStream.emit('error');
clock.tick(1);
expect(connection.onError).to.have.been.calledOnce;
expect(driverSocket.destroy).to.have.been.calledOnce;
connection.destroy();
clock.tick(1);
expect(driverSocket.end).to.not.have.been.called;
});
});
});