Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-3083): support aggregate writes on secondaries #3022

Merged
merged 7 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export interface CommandOptions extends BSONSerializeOptions {
session?: ClientSession;
documentsReturnedIn?: string;
noResponse?: boolean;
omitReadPreference?: boolean;
dariakp marked this conversation as resolved.
Show resolved Hide resolved

// FIXME: NODE-2802
willRetryWrite?: boolean;
Expand Down
3 changes: 1 addition & 2 deletions src/operations/aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { CommandOperation, CommandOperationOptions, CollationOptions } from './command';
import { ReadPreference } from '../read_preference';
import { MongoInvalidArgumentError } from '../error';
import { maxWireVersion, MongoDBNamespace } from '../utils';
import { Aspect, defineAspects, Hint } from './operation';
Expand Down Expand Up @@ -65,7 +64,7 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
}

if (this.hasWriteStage) {
this.readPreference = ReadPreference.primary;
this.trySecondaryWrite = true;
dariakp marked this conversation as resolved.
Show resolved Hide resolved
}

if (this.explain && this.writeConcern) {
Expand Down
5 changes: 5 additions & 0 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { Server } from '../sdam/server';
import type { BSONSerializeOptions, Document } from '../bson';
import type { ReadConcernLike } from './../read_concern';
import { Explain, ExplainOptions } from '../explain';
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';

const SUPPORTS_WRITE_CONCERN_AND_COLLATION = 5;

Expand Down Expand Up @@ -126,6 +127,10 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
Object.assign(cmd, { readConcern: this.readConcern });
}

if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
options.omitReadPreference = true;
}

if (options.collation && serverWireVersion < SUPPORTS_WRITE_CONCERN_AND_COLLATION) {
callback(
new MongoCompatibilityError(
Expand Down
15 changes: 13 additions & 2 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import type { Document } from '../bson';
import { supportsRetryableWrites } from '../utils';
import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
Expand Down Expand Up @@ -150,6 +151,16 @@ function executeWithServerSelection(
session.unpin();
}

let selector: ReadPreference | ServerSelector;

// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
if (operation.trySecondaryWrite) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
} else {
selector = readPreference;
}

const serverSelectionOptions = { session };
function callbackWithRetry(err?: any, result?: any) {
if (err == null) {
Expand Down Expand Up @@ -182,7 +193,7 @@ function executeWithServerSelection(
}

// select a new server, and attempt to retry the operation
topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => {
topology.selectServer(selector, serverSelectionOptions, (e?: any, server?: any) => {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
if (
e ||
(operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) ||
Expand Down Expand Up @@ -227,7 +238,7 @@ function executeWithServerSelection(
}

// select a server, and execute the operation against it
topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => {
topology.selectServer(selector, serverSelectionOptions, (err?: any, server?: any) => {
if (err) {
callback(err);
return;
Expand Down
3 changes: 3 additions & 0 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface OperationOptions extends BSONSerializeOptions {

/** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */
bypassPinningCheck?: boolean;
omitReadPreference?: boolean;
}

/** @internal */
Expand All @@ -49,6 +50,7 @@ export abstract class AbstractOperation<TResult = any> {
readPreference: ReadPreference;
server!: Server;
bypassPinningCheck: boolean;
trySecondaryWrite: boolean;

// BSON serialization options
bsonOptions?: BSONSerializeOptions;
Expand All @@ -72,6 +74,7 @@ export abstract class AbstractOperation<TResult = any> {

this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;
}

abstract execute(server: Server, session: ClientSession, callback: Callback<TResult>): void;
Expand Down
8 changes: 8 additions & 0 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,14 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// Clone the options
const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });

// There are cases where we need to flag the read preference not to get sent in
// the command, such as pre-5.0 servers attempting to perform an aggregate write
// with a non-primary read preference. In this case the effective read preference
// (primary) is not the same as the provided and must be removed completely.
if (finalOptions.omitReadPreference) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
delete finalOptions.readPreference;
}

// error if collation not supported
if (collationNotSupported(this, cmd)) {
callback(new MongoCompatibilityError(`Server ${this.name} does not support collation`));
Expand Down
25 changes: 25 additions & 0 deletions src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import type { ServerDescription, TagSet } from './server_description';
const IDLE_WRITE_PERIOD = 10000;
const SMALLEST_MAX_STALENESS_SECONDS = 90;

// Minimum version to try writes on secondaries.
export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
dariakp marked this conversation as resolved.
Show resolved Hide resolved

/** @public */
export type ServerSelector = (
topologyDescription: TopologyDescription,
Expand All @@ -28,6 +31,28 @@ export function writableServerSelector(): ServerSelector {
);
}

/**
* Returns a server selector that uses a read preference to select a
* server potentially for a write on a secondary.
*/
export function secondaryWritableServerSelector(
dariakp marked this conversation as resolved.
Show resolved Hide resolved
wireVersion?: number,
readPreference?: ReadPreference
): ServerSelector {
// If server version < 5.0, read preference always primary.
// If server version >= 5.0...
// - If read preference is supplied, use that.
// - If no read preference is supplied, use primary.
if (
!readPreference ||
!wireVersion ||
(wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)
) {
return readPreferenceServerSelector(ReadPreference.primary);
}
return readPreferenceServerSelector(readPreference);
}

/**
* Reduces the passed in array of servers by the rules of the "Max Staleness" specification
* found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst
Expand Down
4 changes: 4 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return result;
}

get commonWireVersion(): number | undefined {
return this.description.commonWireVersion;
}

get logicalSessionTimeoutMinutes(): number | undefined {
return this.description.logicalSessionTimeoutMinutes;
}
Expand Down
6 changes: 1 addition & 5 deletions test/functional/crud_spec.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -424,15 +424,11 @@ describe('CRUD spec v1', function () {
}
});

// TODO: Unskip when implementing NODE-3083.
const SKIP = ['aggregate-write-readPreference', 'db-aggregate-write-readPreference'];

describe('CRUD unified', function () {
for (const crudSpecTest of loadSpecTests('crud/unified')) {
expect(crudSpecTest).to.exist;
const testDescription = String(crudSpecTest.description);
const spec = SKIP.includes(testDescription) ? context.skip : context;
spec(testDescription, function () {
context(testDescription, function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
for (const test of crudSpecTest.tests) {
it(String(test.description), {
metadata: { sessions: { skipLeakTests: true } },
Expand Down
8 changes: 5 additions & 3 deletions test/spec/crud/unified/aggregate-write-readPreference.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "aggregate-write-readPreference",
"schemaVersion": "1.3",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "3.6",
Expand Down Expand Up @@ -90,7 +90,8 @@
"description": "Aggregate with $out includes read preference for 5.0+ server",
"runOnRequirements": [
{
"minServerVersion": "5.0"
"minServerVersion": "5.0",
"serverless": "forbid"
dariakp marked this conversation as resolved.
Show resolved Hide resolved
}
],
"operations": [
Expand Down Expand Up @@ -181,7 +182,8 @@
"runOnRequirements": [
{
"minServerVersion": "4.2",
"maxServerVersion": "4.4.99"
"maxServerVersion": "4.4.99",
"serverless": "forbid"
}
],
"operations": [
Expand Down
4 changes: 3 additions & 1 deletion test/spec/crud/unified/aggregate-write-readPreference.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
description: aggregate-write-readPreference

schemaVersion: '1.3'
schemaVersion: '1.4'

runOnRequirements:
# 3.6+ non-standalone is needed to utilize $readPreference in OP_MSG
Expand Down Expand Up @@ -59,6 +59,7 @@ tests:
- description: "Aggregate with $out includes read preference for 5.0+ server"
runOnRequirements:
- minServerVersion: "5.0"
serverless: "forbid"
operations:
- object: *collection0
name: aggregate
Expand Down Expand Up @@ -91,6 +92,7 @@ tests:
# drivers may avoid inheriting a client-level read concern for pre-4.2.
- minServerVersion: "4.2"
maxServerVersion: "4.4.99"
serverless: "forbid"
operations:
- object: *collection0
name: aggregate
Expand Down
6 changes: 4 additions & 2 deletions test/spec/crud/unified/db-aggregate-write-readPreference.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
"description": "Database-level aggregate with $out includes read preference for 5.0+ server",
"runOnRequirements": [
{
"minServerVersion": "5.0"
"minServerVersion": "5.0",
"serverless": "forbid"
}
],
"operations": [
Expand Down Expand Up @@ -158,7 +159,8 @@
"runOnRequirements": [
{
"minServerVersion": "4.2",
"maxServerVersion": "4.4.99"
"maxServerVersion": "4.4.99",
"serverless": "forbid"
}
],
"operations": [
Expand Down
2 changes: 2 additions & 0 deletions test/spec/crud/unified/db-aggregate-write-readPreference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tests:
- description: "Database-level aggregate with $out includes read preference for 5.0+ server"
runOnRequirements:
- minServerVersion: "5.0"
serverless: "forbid"
operations:
- object: *database0
name: aggregate
Expand Down Expand Up @@ -85,6 +86,7 @@ tests:
# drivers may avoid inheriting a client-level read concern for pre-4.2.
- minServerVersion: "4.2"
maxServerVersion: "4.4.99"
serverless: "forbid"
operations:
- object: *database0
name: aggregate
Expand Down
72 changes: 72 additions & 0 deletions test/unit/operations/aggregate.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
'use strict';

const { expect } = require('chai');
const { AggregateOperation } = require('../../../src/operations/aggregate');

describe('AggregateOperation', function () {
const db = 'test';

describe('#constructor', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
context('when out is in the options', function () {
const operation = new AggregateOperation(db, [], { out: 'test', dbName: db });

it('sets trySecondaryWrite to true', function () {
expect(operation.trySecondaryWrite).to.be.true;
});
});

context('when $out is the last stage', function () {
const operation = new AggregateOperation(db, [{ $out: 'test' }], { dbName: db });
dariakp marked this conversation as resolved.
Show resolved Hide resolved

it('sets trySecondaryWrite to true', function () {
expect(operation.trySecondaryWrite).to.be.true;
});
});

context('when $out is not the last stage', function () {
const operation = new AggregateOperation(db, [{ $out: 'test' }, { $project: { name: 1 } }], {
dbName: db
});

it('sets trySecondaryWrite to false', function () {
expect(operation.trySecondaryWrite).to.be.false;
});
});

context('when $merge is the last stage', function () {
const operation = new AggregateOperation(db, [{ $merge: { into: 'test' } }], { dbName: db });

it('sets trySecondaryWrite to true', function () {
expect(operation.trySecondaryWrite).to.be.true;
});
});

context('when $merge is not the last stage', function () {
const operation = new AggregateOperation(
db,
[{ $merge: { into: 'test' } }, { $project: { name: 1 } }],
{ dbName: db }
);

it('sets trySecondaryWrite to false', function () {
expect(operation.trySecondaryWrite).to.be.false;
});
});

context('when no writable stages in empty pipeline', function () {
const operation = new AggregateOperation(db, [], { dbName: db });

it('sets trySecondaryWrite to false', function () {
expect(operation.trySecondaryWrite).to.be.false;
});
});

context('when no writable stages', function () {
const operation = new AggregateOperation(db, [{ $project: { name: 1 } }], { dbName: db });

it('sets trySecondaryWrite to false', function () {
expect(operation.trySecondaryWrite).to.be.false;
});
});
});
});
Loading