Skip to content

Commit

Permalink
Revert "fix(NODE-3521): remove session support checks"
Browse files Browse the repository at this point in the history
This reverts commit a0dfc5b.
  • Loading branch information
durran committed Feb 28, 2022
1 parent ef6894d commit b0ca0b6
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 12 deletions.
8 changes: 7 additions & 1 deletion src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (deprecationErrors != null) finalCmd.apiDeprecationErrors = deprecationErrors;
}

if (session) {
if (hasSessionSupport(this) && session) {
if (
session.clusterTime &&
clusterTime &&
Expand Down Expand Up @@ -683,6 +683,12 @@ export class CryptoConnection extends Connection {
}
}

/** @internal */
export function hasSessionSupport(conn: Connection): boolean {
const description = conn.description;
return description.logicalSessionTimeoutMinutes != null || !!description.loadBalanced;
}

function supportsOpMsg(conn: Connection) {
const description = conn.description;
if (description == null) {
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ function next<T>(cursor: AbstractCursor, blocking: boolean, callback: Callback<T

if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
if (cursor[kSession] == null) {
if (cursor[kSession] == null && cursor[kTopology].hasSessionSupport()) {
cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: false });
}

Expand Down
20 changes: 13 additions & 7 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,19 @@ export function executeOperation<
// that are not explicitly provided with a session.
let session: ClientSession | undefined = operation.session;
let owner: symbol | undefined;
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return cb(new MongoExpiredSessionError('Use of expired sessions is not permitted'));
} else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
return cb(new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later'));
if (topology.hasSessionSupport()) {
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return cb(new MongoExpiredSessionError('Use of expired sessions is not permitted'));
} else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
return cb(new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later'));
}
} else if (session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return cb(new MongoCompatibilityError('Current topology does not support sessions'));
}

try {
Expand Down
7 changes: 7 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,13 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return !this.description.hasDataBearingServers;
}

/**
* @returns Whether sessions are supported on the current topology
*/
hasSessionSupport(): boolean {
return this.loadBalanced || this.description.logicalSessionTimeoutMinutes != null;
}

/** Start a logical session */
startSession(options: ClientSessionOptions, clientOptions?: MongoOptions): ClientSession {
const session = new ClientSession(this, this.s.sessionPool, options, clientOptions);
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ export function executeLegacyOperation(
let session: ClientSession;
let opOptions: any;
let owner: any;
if (!options.skipSessions) {
if (!options.skipSessions && topology.hasSessionSupport()) {
opOptions = args[args.length - 2];
if (opOptions == null || opOptions.session == null) {
owner = Symbol();
Expand Down
91 changes: 90 additions & 1 deletion test/unit/assorted/sessions_client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const expect = require('chai').expect;
const mock = require('../../tools/mongodb-mock/index');
const { ReplSetFixture } = require('../../tools/common');
const { isHello } = require('../../../src/utils');
const { MongoClient } = require('../../../src');

Expand All @@ -15,7 +16,95 @@ describe('Sessions - client/unit', function () {
});
});

it('should return a client session when requested', function (done) {
it('should not throw a synchronous exception if sessions are not supported', function () {
test.server.setMessageHandler(request => {
var doc = request.document;
if (isHello(doc)) {
request.reply(Object.assign({}, mock.HELLO));
} else if (doc.endSessions) {
request.reply({ ok: 1 });
}
});

const client = new MongoClient(`mongodb://${test.server.uri()}/test`);
return client.connect().then(() => {
expect(() => client.startSession()).to.not.throw(
'Current topology does not support sessions'
);
return client.close();
});
});

it('should throw an exception if sessions are not supported on some servers', function () {
const replicaSetMock = new ReplSetFixture();
let testClient;
return replicaSetMock
.setup({ doNotInitHandlers: true })
.then(() => {
replicaSetMock.firstSecondaryServer.setMessageHandler(request => {
var doc = request.document;
if (isHello(doc)) {
const hello = replicaSetMock.firstSecondaryStates[0];
hello.logicalSessionTimeoutMinutes = 20;
request.reply(hello);
} else if (doc.endSessions) {
request.reply({ ok: 1 });
}
});

replicaSetMock.secondSecondaryServer.setMessageHandler(request => {
var doc = request.document;
if (isHello(doc)) {
const hello = replicaSetMock.secondSecondaryStates[0];
hello.logicalSessionTimeoutMinutes = 10;
request.reply(hello);
} else if (doc.endSessions) {
request.reply({ ok: 1 });
}
});

replicaSetMock.arbiterServer.setMessageHandler(request => {
var doc = request.document;
if (isHello(doc)) {
const hello = replicaSetMock.arbiterStates[0];
hello.logicalSessionTimeoutMinutes = 30;
request.reply(hello);
} else if (doc.endSessions) {
request.reply({ ok: 1 });
}
});

replicaSetMock.primaryServer.setMessageHandler(request => {
var doc = request.document;
if (isHello(doc)) {
const hello = replicaSetMock.primaryStates[0];
hello.logicalSessionTimeoutMinutes = null;
request.reply(hello);
} else if (doc.endSessions) {
request.reply({ ok: 1 });
}
});

return replicaSetMock.uri();
})
.then(uri => {
testClient = new MongoClient(uri);
return testClient.connect();
})
.then(client => {
const session = client.startSession();
return client.db().collection('t').insertOne({ a: 1 }, { session });
})
.then(() => {
expect.fail('Expected an error to be thrown about not supporting sessions');
})
.catch(error => {
expect(error.message).to.equal('Current topology does not support sessions');
})
.finally(() => (testClient ? testClient.close() : null));
});

it('should return a client session when requested if the topology supports it', function (done) {
test.server.setMessageHandler(request => {
var doc = request.document;
if (isHello(doc)) {
Expand Down
48 changes: 47 additions & 1 deletion test/unit/cmap/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

const mock = require('../../tools/mongodb-mock/index');
const { connect } = require('../../../src/cmap/connect');
const { Connection } = require('../../../src/cmap/connection');
const { Connection, hasSessionSupport } = require('../../../src/cmap/connection');
const { expect } = require('chai');
const { Socket } = require('net');
const { ns, isHello } = require('../../../src/utils');
const { getSymbolFrom } = require('../../tools/utils');

Expand Down Expand Up @@ -106,4 +107,49 @@ describe('Connection - unit/cmap', function () {
done();
});
});

describe('.hasSessionSupport', function () {
let connection;
const stream = new Socket();

context('when logicalSessionTimeoutMinutes is present', function () {
beforeEach(function () {
connection = new Connection(stream, {
hostAddress: server.hostAddress(),
logicalSessionTimeoutMinutes: 5
});
});

it('returns true', function () {
expect(hasSessionSupport(connection)).to.be.true;
});
});

context('when logicalSessionTimeoutMinutes is not present', function () {
context('when in load balancing mode', function () {
beforeEach(function () {
connection = new Connection(stream, {
hostAddress: server.hostAddress(),
loadBalanced: true
});
});

it('returns true', function () {
expect(hasSessionSupport(connection)).to.be.true;
});
});

context('when not in load balancing mode', function () {
beforeEach(function () {
connection = new Connection(stream, {
hostAddress: server.hostAddress()
});
});

it('returns false', function () {
expect(hasSessionSupport(connection)).to.be.false;
});
});
});
});
});

0 comments on commit b0ca0b6

Please sign in to comment.