Skip to content

Commit

Permalink
feat: introduce AbstractCursor and its concrete subclasses
Browse files Browse the repository at this point in the history
This change introduces a fundamental redesign of the cursor types
in the driver. The first change is to add a new `AbstractCursor`
type, which is only concerned with iterating a cursor (using
`getMore`) once it has been initialized. The `_initialize` method
must be implemented by subclasses. The concrete subclasses are
generally builders for `find` and `aggregate` commands, each
providing their own custom initialization method.

NODE-2809
  • Loading branch information
mbroadst committed Nov 12, 2020
1 parent cf5c865 commit 4d30352
Show file tree
Hide file tree
Showing 52 changed files with 2,004 additions and 2,894 deletions.
14 changes: 12 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
"bson-ext": "^2.0.0"
},
"dependencies": {
"@types/lodash": "^4.14.164",
"bl": "^2.2.1",
"bson": "^4.0.4",
"denque": "^1.4.1"
"denque": "^1.4.1",
"lodash": "^4.17.20"
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.1",
Expand Down
113 changes: 70 additions & 43 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import {
relayEvents,
Expand All @@ -21,6 +20,14 @@ import type { CollationOptions } from './cmap/wire_protocol/write_command';
import { MongoClient } from './mongo_client';
import { Db } from './db';
import { Collection } from './collection';
import type { Readable } from 'stream';
import {
AbstractCursor,
AbstractCursorOptions,
CursorStreamOptions
} from './cursor/abstract_cursor';
import type { ClientSession } from './sessions';
import { executeOperation, ExecutionResult } from './operations/execute_operation';

const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');
Expand Down Expand Up @@ -162,13 +169,6 @@ interface UpdateDescription {
removedFields: string[];
}

/** @internal */
export class ChangeStreamStream extends CursorStream {
constructor(cursor: ChangeStreamCursor) {
super(cursor);
}
}

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @public
Expand All @@ -183,7 +183,7 @@ export class ChangeStream extends EventEmitter {
closed: boolean;
streamOptions?: CursorStreamOptions;
[kResumeQueue]: Denque;
[kCursorStream]?: CursorStream;
[kCursorStream]?: Readable;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -252,13 +252,13 @@ export class ChangeStream extends EventEmitter {

this.on('removeListener', eventName => {
if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
this[kCursorStream]?.removeAllListeners(CursorStream.DATA);
this[kCursorStream]?.removeAllListeners('data');
}
});
}

/** @internal */
get cursorStream(): CursorStream | undefined {
get cursorStream(): Readable | undefined {
return this[kCursorStream];
}

Expand Down Expand Up @@ -325,7 +325,7 @@ export class ChangeStream extends EventEmitter {
* Return a modified Readable stream including a possible transform method.
* @throws MongoError if this.cursor is undefined
*/
stream(options?: CursorStreamOptions): ChangeStreamStream {
stream(options?: CursorStreamOptions): Readable {
this.streamOptions = options;
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to stream');
Expand All @@ -335,28 +335,34 @@ export class ChangeStream extends EventEmitter {
}

/** @public */
export interface ChangeStreamCursorOptions extends CursorOptions {
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
startAtOperationTime?: OperationTime;
resumeAfter?: ResumeToken;
startAfter?: boolean;
}

/** @internal */
export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamCursorOptions> {
export class ChangeStreamCursor extends AbstractCursor {
_resumeToken: ResumeToken;
startAtOperationTime?: OperationTime;
hasReceived?: boolean;
resumeAfter: ResumeToken;
startAfter: ResumeToken;
options: ChangeStreamCursorOptions;

postBatchResumeToken?: Document;
pipeline: Document[];

constructor(
topology: Topology,
operation: AggregateOperation,
options: ChangeStreamCursorOptions
namespace: MongoDBNamespace,
pipeline: Document[] = [],
options: ChangeStreamCursorOptions = {}
) {
super(topology, operation, options);
super(topology, namespace, options);

options = options || {};
this.pipeline = pipeline;
this.options = options;
this._resumeToken = null;
this.startAtOperationTime = options.startAtOperationTime;

Expand Down Expand Up @@ -421,18 +427,28 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
}
}

_initializeCursor(callback: Callback): void {
super._initializeCursor((err, response) => {
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(
{ s: { namespace: this.namespace } },
this.pipeline,
{
...this.cursorOptions,
...this.options,
session
}
);

executeOperation(this.topology, aggregateOperation, (err, response) => {
if (err || response == null) {
callback(err, response);
return;
return callback(err);
}

const server = aggregateOperation.server;
if (
this.startAtOperationTime == null &&
this.resumeAfter == null &&
this.startAfter == null &&
maxWireVersion(this.server) >= 7
maxWireVersion(server) >= 7
) {
this.startAtOperationTime = response.operationTime;
}
Expand All @@ -441,15 +457,16 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC

this.emit('init', response);
this.emit('response');
callback(err, response);

// TODO: NODE-2882
callback(undefined, { server, session, response });
});
}

_getMore(callback: Callback): void {
super._getMore((err, response) => {
_getMore(batchSize: number, callback: Callback): void {
super._getMore(batchSize, (err, response) => {
if (err) {
callback(err);
return;
return callback(err);
}

this._processBatch('nextBatch', response);
Expand All @@ -466,26 +483,32 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
* @internal
*/
function createChangeStreamCursor(
self: ChangeStream,
changeStream: ChangeStream,
options: ChangeStreamOptions
): ChangeStreamCursor {
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
changeStream.pipeline
);

const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const changeStreamCursor = new ChangeStreamCursor(
getTopology(self.parent),
new AggregateOperation(self.parent, pipeline, options),
getTopology(changeStream.parent),
changeStream.namespace,
pipeline,
cursorOptions
);

relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
relayEvents(changeStreamCursor, changeStream, ['resumeTokenChanged', 'end', 'close']);
if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
streamEvents(changeStream, changeStreamCursor);
}

if (self.listenerCount(ChangeStream.CHANGE) > 0) streamEvents(self, changeStreamCursor);
return changeStreamCursor;
}

Expand Down Expand Up @@ -532,24 +555,24 @@ function waitForTopologyConnected(
}

function closeWithError(changeStream: ChangeStream, error: AnyError, callback?: Callback): void {
if (!callback) changeStream.emit(ChangeStream.ERROR, error);
if (!callback) {
changeStream.emit(ChangeStream.ERROR, error);
}

changeStream.close(() => callback && callback(error));
}

function streamEvents(changeStream: ChangeStream, cursor: ChangeStreamCursor): void {
const stream = changeStream[kCursorStream] || cursor.stream();
changeStream[kCursorStream] = stream;
stream.on(CursorStream.DATA, change => processNewChange(changeStream, change));
stream.on(CursorStream.ERROR, error => processError(changeStream, error));
stream.on('data', change => processNewChange(changeStream, change));
stream.on('error', error => processError(changeStream, error));
}

function endStream(changeStream: ChangeStream): void {
const cursorStream = changeStream[kCursorStream];
if (cursorStream) {
[CursorStream.DATA, CursorStream.CLOSE, CursorStream.END, CursorStream.ERROR].forEach(event =>
cursorStream.removeAllListeners(event)
);

['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
cursorStream.destroy();
}

Expand Down Expand Up @@ -605,7 +628,10 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca

// otherwise, raise an error and close the change stream
function unresumableError(err: AnyError) {
if (!callback) changeStream.emit(ChangeStream.ERROR, err);
if (!callback) {
changeStream.emit(ChangeStream.ERROR, err);
}

changeStream.close(() => processResumeQueue(changeStream, err));
}

Expand Down Expand Up @@ -676,6 +702,7 @@ function processResumeQueue(changeStream: ChangeStream, err?: Error) {
request(new MongoError('Change Stream is not open.'));
return;
}

request(err, changeStream.cursor);
}
}
5 changes: 5 additions & 0 deletions src/cmap/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { ConnectionPool, ConnectionPoolOptions } from './connection_pool';
import type { Connection } from './connection';
import type { Document } from '../bson';
import type { AnyError } from '../error';
import { cloneDeep } from 'lodash';

/**
* The base export class for all monitoring events published from the connection pool
Expand Down Expand Up @@ -394,6 +395,10 @@ function extractCommand(command: WriteProtocolMessageType): Document {
}

function extractReply(command: WriteProtocolMessageType, reply?: Document) {
if (reply) {
reply = cloneDeep(reply);
}

if (command instanceof KillCursor) {
return {
ok: 1,
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/wire_protocol/get_more.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface GetMoreOptions extends CommandOptions {
batchSize?: number;
maxTimeMS?: number;
maxAwaitTimeMS?: number;
comment?: Document;
comment?: Document | string;
}

export function getMore(
Expand Down
19 changes: 16 additions & 3 deletions src/cmap/wire_protocol/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@ export function query(
}

const readPreference = getReadPreference(cmd, options);
const findCmd = prepareFindCommand(server, ns, cmd);
let findCmd = prepareFindCommand(server, ns, cmd);

// If we have explain, we need to rewrite the find command
// to wrap it in the explain command
if (typeof options.explain === 'boolean' && options.explain === true) {
findCmd = {
explain: findCmd
};
}

// NOTE: This actually modifies the passed in cmd, and our code _depends_ on this
// side-effect. Change this ASAP
Expand Down Expand Up @@ -192,10 +200,15 @@ function prepareLegacyFindQuery(
if (cmd.maxScan) findCmd['$maxScan'] = cmd.maxScan;
if (cmd.min) findCmd['$min'] = cmd.min;
if (cmd.max) findCmd['$max'] = cmd.max;
if (typeof cmd.showDiskLoc !== 'undefined') findCmd['$showDiskLoc'] = cmd.showDiskLoc;
if (typeof cmd.showDiskLoc !== 'undefined') {
findCmd['$showDiskLoc'] = cmd.showDiskLoc;
} else if (typeof cmd.showRecordId !== 'undefined') {
findCmd['$showDiskLoc'] = cmd.showRecordId;
}

if (cmd.comment) findCmd['$comment'] = cmd.comment;
if (cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS;
if (cmd.explain) {
if (options.explain) {
// nToReturn must be 0 (match all) or negative (match N and close cursor)
// nToReturn > 0 will give explain results equivalent to limit(0)
numberToReturn = -Math.abs(cmd.limit || 0);
Expand Down
Loading

0 comments on commit 4d30352

Please sign in to comment.