Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4069): remove 'default' from options for fullDocument field in change stream options #3169

Merged
merged 9 commits into from
Mar 16, 2022
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
"check:socks5": "mocha --config test/manual/mocharc.json test/manual/socks5.test.ts",
"check:csfle": "mocha --config test/mocha_mongodb.json test/integration/client-side-encryption",
"check:snappy": "mocha test/unit/assorted/snappy.test.js",
"fix:eslint": "npm run check:eslint -- --fix",
"prepare": "node etc/prepare.js",
"preview:docs": "ts-node etc/docs/preview.ts",
"release": "standard-version -i HISTORY.md",
Expand Down
63 changes: 36 additions & 27 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,20 @@ const kClosed = Symbol('closed');
/** @internal */
const kMode = Symbol('mode');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
CHANGE_STREAM_OPTIONS
);
const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
'startAfter',
'startAtOperationTime',
'fullDocument'
] as const;

const CURSOR_OPTIONS = [
'batchSize',
'maxAwaitTimeMS',
'collation',
'readPreference',
...CHANGE_STREAM_OPTIONS
] as const;

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
Expand All @@ -68,6 +78,8 @@ export interface ResumeOptions {
maxAwaitTimeMS?: number;
collation?: CollationOptions;
readPreference?: ReadPreference;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
}

/**
Expand All @@ -94,7 +106,7 @@ export interface PipeOptions {
* @public
*/
export interface ChangeStreamOptions extends AggregateOptions {
/** Allowed values: ‘default’, ‘updateLookup. When set to updateLookup, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
/** Allowed values: 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
fullDocument?: string;
/** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
maxAwaitTimeMS?: number;
Expand Down Expand Up @@ -446,22 +458,18 @@ export class ChangeStreamCursor<TSchema extends Document = Document> extends Abs
}

get resumeOptions(): ResumeOptions {
const result = {} as ResumeOptions;
for (const optionName of CURSOR_OPTIONS) {
if (Reflect.has(this.options, optionName)) {
Reflect.set(result, optionName, Reflect.get(this.options, optionName));
}
}
const result: ResumeOptions = applyKnownOptions(this.options, CURSOR_OPTIONS);

if (this.resumeToken || this.startAtOperationTime) {
['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key =>
Reflect.deleteProperty(result, key)
);
for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) {
Reflect.deleteProperty(result, key);
}

if (this.resumeToken) {
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
Reflect.set(result, resumeKey, this.resumeToken);

result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
}
Expand Down Expand Up @@ -568,25 +576,25 @@ function setIsIterator<TSchema>(changeStream: ChangeStream<TSchema>): void {
}
changeStream[kMode] = 'iterator';
}

/**
* Create a new change stream cursor based on self's configuration
* @internal
*/
function createChangeStreamCursor<TSchema>(
changeStream: ChangeStream<TSchema>,
options: ChangeStreamOptions
options: ChangeStreamOptions | ResumeOptions
): ChangeStreamCursor<TSchema> {
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
const changeStreamStageOptions = applyKnownOptions(options, CHANGE_STREAM_OPTIONS);
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
changeStream.pipeline
);

const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const cursorOptions: ChangeStreamCursorOptions = applyKnownOptions(options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor<TSchema>(
getTopology(changeStream.parent),
changeStream.namespace,
Expand All @@ -605,16 +613,17 @@ function createChangeStreamCursor<TSchema>(
return changeStreamCursor;
}

function applyKnownOptions(target: Document, source: Document, optionNames: string[]) {
optionNames.forEach(name => {
if (source[name]) {
target[name] = source[name];
function applyKnownOptions(source: Document, options: ReadonlyArray<string>) {
const result: Document = {};

for (const option of options) {
if (source[option]) {
result[option] = source[option];
}
});
}

return target;
return result;
}

interface TopologyWaitOptions {
start?: number;
timeout?: number;
Expand Down
80 changes: 80 additions & 0 deletions test/integration/change-streams/change_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,86 @@ describe('Change Streams', function () {
});
afterEach(async () => await mock.cleanup());

context('ChangeStreamCursor options', function () {
let client, db, collection;

beforeEach(async function () {
client = await this.configuration.newClient().connect();
db = client.db('db');
collection = db.collection('collection');
});

afterEach(async function () {
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('fullDocument', () => {
it('does not set fullDocument if no value is provided', function () {
const changeStream = client.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument'
);
});

it('does not validate the value passed in for the `fullDocument` property', function () {
const changeStream = client.watch([], { fullDocument: 'invalid value' });

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument',
'invalid value'
);
});

it('assigns `fullDocument` to the correct value if it is passed as an option', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = client.watch([], { fullDocument: 'updateLookup' });

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument',
'updateLookup'
);
});
});

context('allChangesForCluster', () => {
it('assigns `allChangesForCluster` to `true` if the ChangeStream.type is Cluster', function () {
const changeStream = client.watch();

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster',
true
);
});

it('does not assign `allChangesForCluster` if the ChangeStream.type is Db', function () {
const changeStream = db.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster'
);
});

it('does not assign `allChangesForCluster` if the ChangeStream.type is Collection', function () {
const changeStream = collection.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster'
);
});
});

it('ignores any invalid option values', function () {
const changeStream = collection.watch([], { invalidOption: true });

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.invalidOption'
);
});
});

it('should close the listeners after the cursor is closed', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },

Expand Down
9 changes: 9 additions & 0 deletions test/types/change_stream.test-d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { expectError } from 'tsd';

import type { ChangeStreamOptions } from '../../src';

declare const changeStreamOptions: ChangeStreamOptions;

// The change stream spec says that we cannot throw an error for invalid values to `fullDocument`
// for future compatability. This means we must leave `fullDocument` as type string.
expectError<'updateLookup' | undefined>(changeStreamOptions.fullDocument);
dariakp marked this conversation as resolved.
Show resolved Hide resolved