Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4834): ensure that MessageStream is destroyed when connections are destroyed #3482

Merged
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4130597
fix(NODE-4834): Add calls to destroy kMessageStream
W-A-James Dec 5, 2022
4c10235
test(NODE-4834): WIP - Start work on unit tests
W-A-James Dec 5, 2022
b341938
fix(NODE-4834): Add guards to onError, onClose, onTimeout
W-A-James Dec 6, 2022
032de2d
test(NODE-4834): WIP progress on unit tests
W-A-James Dec 6, 2022
f75e202
style(NODE-4834): Revert style changes
W-A-James Dec 6, 2022
586c410
test(NODE-4834): Remove unit test 'only' annotations
W-A-James Dec 6, 2022
7a8d807
test(NODE-4834): move test to more appropriate position
W-A-James Dec 6, 2022
0c804a6
style(NODE-4834): Remove todo comment
W-A-James Dec 6, 2022
4e5a9bb
docs(NODE-4834): Add tsdoc comments for clarification on closed and d…
W-A-James Dec 6, 2022
5ccb807
style(NODE-4834): Eslint fixes
W-A-James Dec 6, 2022
770d050
test(NODE-4834): Change test descriptions to match testing standards …
W-A-James Dec 7, 2022
92d577d
docs(NODE-4834): Update inline documentation to reference state rathe…
W-A-James Dec 7, 2022
1ada8ca
fix(NODE-4834): Address review comments
W-A-James Dec 8, 2022
21dc8b4
test(NODE-4834): FakeSocket now simulates IO delay
W-A-James Dec 8, 2022
f5f12f5
style(NODE-4834): Revert unintended style change
W-A-James Dec 8, 2022
1e5d874
test(NODE-4834): Ensure that FakeSocket.end also sets this.writableEn…
W-A-James Dec 8, 2022
f940281
fix(NODE-4834): Update Connection.destroy and event handler logic to …
W-A-James Dec 12, 2022
927b883
test(NODE-4834): Add unit tests to check that options are treated cor…
W-A-James Dec 12, 2022
bf1c8f0
fix(NODE-4834): Address eslint fixes
W-A-James Dec 12, 2022
452c1f0
docs(NODE-4834): Remove irrelevant docs
W-A-James Dec 12, 2022
dcda86c
style(NODE-4834): eslint fixes
W-A-James Dec 12, 2022
c74bff8
refactor(NODE-4834): Make all paths through Connection.destroy async
W-A-James Dec 12, 2022
7993c4e
fix(NODE-4834): Address review comments
W-A-James Dec 12, 2022
e5c8e0a
test(NODE-4834): Update old test
W-A-James Dec 12, 2022
6540197
test(NODE-4834): remove 'only' annotation
W-A-James Dec 12, 2022
7d43afb
test(NODE-4834): Fix failing tests
W-A-James Dec 13, 2022
2cdcc21
test(NODE-4834): remove debug messages
W-A-James Dec 13, 2022
8d66f61
fix: only invoke callback when it is defined
nbbeeken Dec 13, 2022
44d5b05
fix: lint
nbbeeken Dec 13, 2022
860d741
rm: prints
nbbeeken Dec 13, 2022
3b59a12
test(NODE-4834): Update tests to be more coherent
W-A-James Dec 15, 2022
50fc706
test(NODE-4834): Reorganize tests
W-A-James Dec 15, 2022
a11a4f6
test(NODE-4834): remove to.not.have.been.calledOnce instances
W-A-James Dec 15, 2022
23755ca
test(NODE-4834): Separate tests more cleanly
W-A-James Dec 16, 2022
878c764
test(NODE-4834): Split up tests more cleanly
W-A-James Dec 16, 2022
995700b
test(NODE-4834): fix test name typo
W-A-James Dec 16, 2022
c0d0189
Merge branch 'main' into NODE-4834/destroy_message_stream_when_connec…
nbbeeken Dec 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ function performInitialHandshake(
) {
const callback: Callback<Document> = function (err, ret) {
if (err && conn) {
conn.destroy();
conn.destroy({ force: false });
}
_callback(err, ret);
};
Expand Down
53 changes: 16 additions & 37 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export interface ConnectionOptions
/** @public */
export interface DestroyOptions {
/** Force the destruction. */
force?: boolean;
force: boolean;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}

/** @public */
Expand All @@ -170,8 +170,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
address: string;
socketTimeoutMS: number;
monitorCommands: boolean;
/** Indicates that the connection (including underlying TCP socket) has been closed. */
closed: boolean;
destroyed: boolean;
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
Expand Down Expand Up @@ -220,7 +220,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.monitorCommands = options.monitorCommands;
this.serverApi = options.serverApi;
this.closed = false;
this.destroyed = false;
this[kHello] = null;
this[kClusterTime] = null;

Expand Down Expand Up @@ -313,10 +312,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (this.closed) {
return;
}

this[kStream].destroy(error);

this.closed = true;
this.destroy({ force: false });

for (const op of this[kQueue].values()) {
op.cb(error);
Expand All @@ -330,8 +326,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (this.closed) {
return;
}

this.closed = true;
this.destroy({ force: false });

const message = `connection ${this.id} to ${this.address} closed`;
for (const op of this[kQueue].values()) {
Expand All @@ -348,9 +343,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

this[kDelayedTimeoutId] = setTimeout(() => {
this[kStream].destroy();

this.closed = true;
this.destroy({ force: false });

const message = `connection ${this.id} to ${this.address} timed out`;
const beforeHandshake = this.hello == null;
Expand Down Expand Up @@ -459,41 +452,27 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
callback(undefined, message.documents[0]);
}

destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
}

destroy(options: DestroyOptions, callback?: Callback): void {
this.removeAllListeners(Connection.PINNED);
this.removeAllListeners(Connection.UNPINNED);
baileympearson marked this conversation as resolved.
Show resolved Hide resolved

options = Object.assign({ force: false }, options);
if (this[kStream] == null || this.destroyed) {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}

return;
}
this[kMessageStream].destroy();
this.closed = true;

if (options.force) {
this[kStream].destroy();
this.destroyed = true;
if (typeof callback === 'function') {
callback();
if (callback) {
return process.nextTick(callback);
}

return;
}

this[kStream].end(() => {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
if (!this[kStream].writableEnded) {
this[kStream].end(callback);
} else {
if (callback) {
return process.nextTick(callback);
}
});
}
}

command(
Expand Down
4 changes: 2 additions & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy(options, cb);
conn.destroy({ force: !!options.force }, cb);
},
err => {
this[kConnections].clear();
Expand Down Expand Up @@ -591,7 +591,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionClosedEvent(this, connection, reason)
);
// destroy the connection
process.nextTick(() => connection.destroy());
process.nextTick(() => connection.destroy({ force: false }));
}

private connectionIsStale(connection: Connection) {
Expand Down
5 changes: 4 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {

/** Destroy the server connection */
destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') (callback = options), (options = {});
if (typeof options === 'function') {
callback = options;
options = { force: false };
}
options = Object.assign({}, { force: false }, options);

if (this.s.state === STATE_CLOSED) {
Expand Down
17 changes: 4 additions & 13 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,26 +484,17 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Close this topology */
close(callback: Callback): void;
close(options: CloseOptions): void;
close(options: CloseOptions, callback: Callback): void;
close(options?: CloseOptions | Callback, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = {};
}

if (typeof options === 'boolean') {
options = { force: options };
}
options = options ?? {};
close(options?: CloseOptions, callback?: Callback): void {
options = options ?? { force: false };

if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return callback?.();
}

const destroyedServers = Array.from(this.s.servers.values(), server => {
return promisify(destroyServer)(server, this, options as CloseOptions);
return promisify(destroyServer)(server, this, { force: !!options?.force });
});

Promise.all(destroyedServers)
Expand Down Expand Up @@ -765,7 +756,7 @@ function destroyServer(
options?: DestroyOptions,
callback?: Callback
) {
options = options ?? {};
options = options ?? { force: false };
for (const event of LOCAL_SERVER_EVENTS) {
server.removeAllListeners(event);
}
Expand Down
69 changes: 20 additions & 49 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { ReadPreference } = require('../../../src/read_preference');
const { ServerType } = require('../../../src/sdam/common');
const { formatSort } = require('../../../src/sort');
const { getSymbolFrom } = require('../../tools/utils');
const { MongoExpiredSessionError } = require('../../../src/error');

describe('Cursor', function () {
before(function () {
Expand Down Expand Up @@ -1905,61 +1906,31 @@ describe('Cursor', function () {
}
});

it('should close dead tailable cursors', {
metadata: {
os: '!win32' // NODE-2943: timeout on windows
},

test: function (done) {
// http://www.mongodb.org/display/DOCS/Tailable+Cursors

const configuration = this.configuration;
client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());

const db = client.db(configuration.db);
const options = { capped: true, size: 10000000 };
db.createCollection(
'test_if_dead_tailable_cursors_close',
options,
function (err, collection) {
expect(err).to.not.exist;
it('closes cursors when client is closed even if it has not been exhausted', async function () {
await client
.db()
.dropCollection('test_cleanup_tailable')
.catch(() => null);

let closeCount = 0;
const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
collection.insertMany(docs, { w: 'majority', wtimeoutMS: 5000 }, err => {
expect(err).to.not.exist;

const cursor = collection.find({}, { tailable: true, awaitData: true });
const stream = cursor.stream();
const collection = await client
.db()
.createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 });

stream.resume();

var validator = () => {
closeCount++;
if (closeCount === 2) {
done();
}
};
// insert only 2 docs in capped coll of 3
await collection.insertMany([{ a: 1 }, { a: 1 }]);

// we validate that the stream "ends" either cleanly or with an error
stream.on('end', validator);
stream.on('error', validator);
const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 });

cursor.on('close', validator);
await cursor.next();
await cursor.next();
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
// will block for maxAwaitTimeMS (except we are closing the client)
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);

const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
collection.insertMany(docs, err => {
expect(err).to.not.exist;
await client.close();
expect(cursor).to.have.property('killed', true);

setTimeout(() => client.close());
});
});
}
);
});
}
const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(MongoExpiredSessionError);
});

it('shouldAwaitData', {
Expand Down
18 changes: 13 additions & 5 deletions test/integration/node-specific/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ describe('Topology', function () {
const states = [];
topology.on('stateChanged', (_, newState) => states.push(newState));
topology.connect(err => {
expect(err).to.not.exist;
topology.close(err => {
try {
expect(err).to.not.exist;
expect(topology.isDestroyed()).to.be.true;
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
done();
} catch (error) {
done(error);
}
topology.close({}, err => {
try {
expect(err).to.not.exist;
expect(topology.isDestroyed()).to.be.true;
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
done();
} catch (error) {
done(error);
}
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {

afterEach(function (done) {
if (context.topology) {
context.topology.close(done);
context.topology.close({}, done);
} else {
done();
}
Expand Down
2 changes: 1 addition & 1 deletion test/unit/assorted/server_selection_spec_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ function executeServerSelectionTest(testDefinition, testDone) {
});

function done(err) {
topology.close(e => testDone(e || err));
topology.close({}, e => testDone(e || err));
}

topology.connect(err => {
Expand Down
Loading