Skip to content

Commit

Permalink
add cursor initialization checks
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Nov 13, 2020
1 parent da9d687 commit 6dac300
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 108 deletions.
18 changes: 18 additions & 0 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const kTopology = Symbol('topology');
const kSession = Symbol('session');
const kOptions = Symbol('options');
const kTransform = Symbol('transform');
const kInitialized = Symbol('iniitalized');
const kClosed = Symbol('closed');
const kKilled = Symbol('killed');

Expand Down Expand Up @@ -77,6 +78,7 @@ export abstract class AbstractCursor extends EventEmitter {
[kDocuments]: Document[];
[kTopology]: Topology;
[kTransform]?: (doc: Document) => Document;
[kInitialized]: boolean;
[kClosed]: boolean;
[kKilled]: boolean;
[kOptions]: InternalAbstractCursorOptions;
Expand All @@ -94,6 +96,7 @@ export abstract class AbstractCursor extends EventEmitter {
this[kTopology] = topology;
this[kNamespace] = namespace;
this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
this[kInitialized] = false;
this[kClosed] = false;
this[kKilled] = false;
this[kOptions] = {
Expand Down Expand Up @@ -386,6 +389,7 @@ export abstract class AbstractCursor extends EventEmitter {
* @param value - The flag boolean value.
*/
addCursorFlag(flag: CursorFlag, value: boolean): this {
assertUninitialized(this);
if (!CURSOR_FLAGS.includes(flag)) {
throw new MongoError(`flag ${flag} is not one of ${CURSOR_FLAGS}`);
}
Expand All @@ -404,6 +408,7 @@ export abstract class AbstractCursor extends EventEmitter {
* @param transform - The mapping transformation method.
*/
map(transform: (doc: Document) => Document): this {
assertUninitialized(this);
const oldTransform = this[kTransform];
if (oldTransform) {
this[kTransform] = doc => {
Expand All @@ -422,6 +427,7 @@ export abstract class AbstractCursor extends EventEmitter {
* @param readPreference - The new read preference for the cursor.
*/
setReadPreference(readPreference: ReadPreferenceLike): this {
assertUninitialized(this);
if (readPreference instanceof ReadPreference) {
this[kOptions].readPreference = readPreference;
} else if (typeof readPreference === 'string') {
Expand All @@ -439,6 +445,7 @@ export abstract class AbstractCursor extends EventEmitter {
* @param value - Number of milliseconds to wait before aborting the query.
*/
maxTimeMS(value: number): this {
assertUninitialized(this);
if (typeof value !== 'number') {
throw new TypeError('maxTimeMS must be a number');
}
Expand All @@ -453,6 +460,7 @@ export abstract class AbstractCursor extends EventEmitter {
* @param value - The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/|find command documentation}.
*/
batchSize(value: number): this {
assertUninitialized(this);
if (this[kOptions].tailable) {
throw new MongoError('Tailable cursors do not support batchSize');
}
Expand Down Expand Up @@ -568,6 +576,9 @@ function next(cursor: AbstractCursor, callback: Callback<Document | null>): void
}
}

// the cursor is now initialized, even if an error occurred or it is dead
cursor[kInitialized] = true;

if (err || cursorIsDead(cursor)) {
return cleanupCursor(cursor, () => callback(err, nextDocument(cursor)));
}
Expand Down Expand Up @@ -622,6 +633,13 @@ function cleanupCursor(cursor: AbstractCursor, callback: Callback): void {
}
}

/** @internal */
export function assertUninitialized(cursor: AbstractCursor): void {
if (cursor[kInitialized]) {
throw new MongoError('Cursor is already initialized');
}
}

function makeCursorStream(cursor: AbstractCursor) {
const readable = new Readable({
objectMode: true,
Expand Down
35 changes: 23 additions & 12 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AggregateOperation, AggregateOptions } from '../operations/aggregate';
import { AbstractCursor } from './abstract_cursor';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import type { Document } from '../bson';
import type { Sort } from '../sort';
Expand Down Expand Up @@ -80,68 +80,79 @@ export class AggregationCursor extends AbstractCursor {

/** Add a group stage to the aggregation pipeline */
group($group: Document): this {
this.pipeline.push({ $group });
assertUninitialized(this);
this[kPipeline].push({ $group });
return this;
}

/** Add a limit stage to the aggregation pipeline */
limit($limit: number): this {
this.pipeline.push({ $limit });
assertUninitialized(this);
this[kPipeline].push({ $limit });
return this;
}

/** Add a match stage to the aggregation pipeline */
match($match: Document): this {
this.pipeline.push({ $match });
assertUninitialized(this);
this[kPipeline].push({ $match });
return this;
}

/** Add a out stage to the aggregation pipeline */
out($out: number): this {
this.pipeline.push({ $out });
assertUninitialized(this);
this[kPipeline].push({ $out });
return this;
}

/** Add a project stage to the aggregation pipeline */
project($project: Document): this {
this.pipeline.push({ $project });
assertUninitialized(this);
this[kPipeline].push({ $project });
return this;
}

/** Add a lookup stage to the aggregation pipeline */
lookup($lookup: Document): this {
this.pipeline.push({ $lookup });
assertUninitialized(this);
this[kPipeline].push({ $lookup });
return this;
}

/** Add a redact stage to the aggregation pipeline */
redact($redact: Document): this {
this.pipeline.push({ $redact });
assertUninitialized(this);
this[kPipeline].push({ $redact });
return this;
}

/** Add a skip stage to the aggregation pipeline */
skip($skip: number): this {
this.pipeline.push({ $skip });
assertUninitialized(this);
this[kPipeline].push({ $skip });
return this;
}

/** Add a sort stage to the aggregation pipeline */
sort($sort: Sort): this {
this.pipeline.push({ $sort });
assertUninitialized(this);
this[kPipeline].push({ $sort });
return this;
}

/** Add a unwind stage to the aggregation pipeline */
unwind($unwind: number): this {
this.pipeline.push({ $unwind });
assertUninitialized(this);
this[kPipeline].push({ $unwind });
return this;
}

// deprecated methods
/** @deprecated Add a geoNear stage to the aggregation pipeline */
geoNear($geoNear: Document): this {
this.pipeline.push({ $geoNear });
assertUninitialized(this);
this[kPipeline].push({ $geoNear });
return this;
}
}
22 changes: 18 additions & 4 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { formatSort, Sort, SortDirection } from '../sort';
import type { Callback, MongoDBNamespace } from '../utils';
import { AbstractCursor } from './abstract_cursor';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';

/** @internal */
const kFilter = Symbol('filter');
Expand Down Expand Up @@ -50,7 +50,6 @@ export class FindCursor extends AbstractCursor {

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
this[kBuiltOptions] = Object.freeze(this[kBuiltOptions]);
const findOperation = new FindOperation(undefined, this.namespace, this[kFilter], {
...this[kBuiltOptions], // NOTE: order matters here, we may need to refine this
...this.cursorOptions,
Expand Down Expand Up @@ -143,6 +142,7 @@ export class FindCursor extends AbstractCursor {

/** Set the cursor query */
filter(filter: Document): this {
assertUninitialized(this);
this[kFilter] = filter;
return this;
}
Expand All @@ -153,6 +153,7 @@ export class FindCursor extends AbstractCursor {
* @param hint - If specified, then the query system will only consider plans using the hinted index.
*/
hint(hint: Hint): this {
assertUninitialized(this);
this[kBuiltOptions].hint = hint;
return this;
}
Expand All @@ -163,6 +164,7 @@ export class FindCursor extends AbstractCursor {
* @param min - Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order.
*/
min(min: number): this {
assertUninitialized(this);
this[kBuiltOptions].min = min;
return this;
}
Expand All @@ -173,6 +175,7 @@ export class FindCursor extends AbstractCursor {
* @param max - Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order.
*/
max(max: number): this {
assertUninitialized(this);
this[kBuiltOptions].max = max;
return this;
}
Expand All @@ -185,6 +188,7 @@ export class FindCursor extends AbstractCursor {
* @param value - the returnKey value.
*/
returnKey(value: boolean): this {
assertUninitialized(this);
this[kBuiltOptions].returnKey = value;
return this;
}
Expand All @@ -195,6 +199,7 @@ export class FindCursor extends AbstractCursor {
* @param value - The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY stye find.
*/
showRecordId(value: boolean): this {
assertUninitialized(this);
this[kBuiltOptions].showRecordId = value;
return this;
}
Expand All @@ -206,6 +211,7 @@ export class FindCursor extends AbstractCursor {
* @param value - The modifier value.
*/
addQueryModifier(name: string, value: string | boolean | number | Document): this {
assertUninitialized(this);
if (name[0] !== '$') {
throw new MongoError(`${name} is not a valid query modifier`);
}
Expand Down Expand Up @@ -268,6 +274,7 @@ export class FindCursor extends AbstractCursor {
* @param value - The comment attached to this query.
*/
comment(value: string): this {
assertUninitialized(this);
this[kBuiltOptions].comment = value;
return this;
}
Expand All @@ -278,6 +285,7 @@ export class FindCursor extends AbstractCursor {
* @param value - Number of milliseconds to wait before aborting the tailed query.
*/
maxAwaitTimeMS(value: number): this {
assertUninitialized(this);
if (typeof value !== 'number') {
throw new MongoError('maxAwaitTimeMS must be a number');
}
Expand All @@ -292,6 +300,7 @@ export class FindCursor extends AbstractCursor {
* @param value - Number of milliseconds to wait before aborting the query.
*/
maxTimeMS(value: number): this {
assertUninitialized(this);
if (typeof value !== 'number') {
throw new MongoError('maxTimeMS must be a number');
}
Expand All @@ -306,6 +315,7 @@ export class FindCursor extends AbstractCursor {
* @param value - The field projection object.
*/
project(value: Document): this {
assertUninitialized(this);
this[kBuiltOptions].projection = value;
return this;
}
Expand All @@ -317,6 +327,7 @@ export class FindCursor extends AbstractCursor {
* @param direction - The direction of the sorting (1 or -1).
*/
sort(sort: Sort | string, direction?: SortDirection): this {
assertUninitialized(this);
if (this[kBuiltOptions].tailable) {
throw new MongoError('Tailable cursor does not support sorting');
}
Expand All @@ -331,6 +342,7 @@ export class FindCursor extends AbstractCursor {
* @param value - The cursor collation options (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
*/
collation(value: CollationOptions): this {
assertUninitialized(this);
this[kBuiltOptions].collation = value;
return this;
}
Expand All @@ -341,12 +353,13 @@ export class FindCursor extends AbstractCursor {
* @param value - The limit for the cursor query.
*/
limit(value: number): this {
assertUninitialized(this);
if (this[kBuiltOptions].tailable) {
throw new MongoError('Tailable cursor does not support limit');
}

if (typeof value !== 'number') {
throw new MongoError('limit requires an integer');
throw new TypeError('limit requires an integer');
}

this[kBuiltOptions].limit = value;
Expand All @@ -359,12 +372,13 @@ export class FindCursor extends AbstractCursor {
* @param value - The skip for the cursor query.
*/
skip(value: number): this {
assertUninitialized(this);
if (this[kBuiltOptions].tailable) {
throw new MongoError('Tailable cursor does not support skip');
}

if (typeof value !== 'number') {
throw new MongoError('skip requires an integer');
throw new TypeError('skip requires an integer');
}

this[kBuiltOptions].skip = value;
Expand Down
Loading

0 comments on commit 6dac300

Please sign in to comment.