diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 94a87abfc37..31baff9232f 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -17,6 +17,7 @@ const kTopology = Symbol('topology'); const kSession = Symbol('session'); const kOptions = Symbol('options'); const kTransform = Symbol('transform'); +const kInitialized = Symbol('iniitalized'); const kClosed = Symbol('closed'); const kKilled = Symbol('killed'); @@ -77,6 +78,7 @@ export abstract class AbstractCursor extends EventEmitter { [kDocuments]: Document[]; [kTopology]: Topology; [kTransform]?: (doc: Document) => Document; + [kInitialized]: boolean; [kClosed]: boolean; [kKilled]: boolean; [kOptions]: InternalAbstractCursorOptions; @@ -94,6 +96,7 @@ export abstract class AbstractCursor extends EventEmitter { this[kTopology] = topology; this[kNamespace] = namespace; this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230 + this[kInitialized] = false; this[kClosed] = false; this[kKilled] = false; this[kOptions] = { @@ -386,6 +389,7 @@ export abstract class AbstractCursor extends EventEmitter { * @param value - The flag boolean value. */ addCursorFlag(flag: CursorFlag, value: boolean): this { + assertUninitialized(this); if (!CURSOR_FLAGS.includes(flag)) { throw new MongoError(`flag ${flag} is not one of ${CURSOR_FLAGS}`); } @@ -404,6 +408,7 @@ export abstract class AbstractCursor extends EventEmitter { * @param transform - The mapping transformation method. */ map(transform: (doc: Document) => Document): this { + assertUninitialized(this); const oldTransform = this[kTransform]; if (oldTransform) { this[kTransform] = doc => { @@ -422,6 +427,7 @@ export abstract class AbstractCursor extends EventEmitter { * @param readPreference - The new read preference for the cursor. */ setReadPreference(readPreference: ReadPreferenceLike): this { + assertUninitialized(this); if (readPreference instanceof ReadPreference) { this[kOptions].readPreference = readPreference; } else if (typeof readPreference === 'string') { @@ -439,6 +445,7 @@ export abstract class AbstractCursor extends EventEmitter { * @param value - Number of milliseconds to wait before aborting the query. */ maxTimeMS(value: number): this { + assertUninitialized(this); if (typeof value !== 'number') { throw new TypeError('maxTimeMS must be a number'); } @@ -453,6 +460,7 @@ export abstract class AbstractCursor extends EventEmitter { * @param value - The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/|find command documentation}. */ batchSize(value: number): this { + assertUninitialized(this); if (this[kOptions].tailable) { throw new MongoError('Tailable cursors do not support batchSize'); } @@ -568,6 +576,9 @@ function next(cursor: AbstractCursor, callback: Callback): void } } + // the cursor is now initialized, even if an error occurred or it is dead + cursor[kInitialized] = true; + if (err || cursorIsDead(cursor)) { return cleanupCursor(cursor, () => callback(err, nextDocument(cursor))); } @@ -622,6 +633,13 @@ function cleanupCursor(cursor: AbstractCursor, callback: Callback): void { } } +/** @internal */ +export function assertUninitialized(cursor: AbstractCursor): void { + if (cursor[kInitialized]) { + throw new MongoError('Cursor is already initialized'); + } +} + function makeCursorStream(cursor: AbstractCursor) { const readable = new Readable({ objectMode: true, diff --git a/src/cursor/aggregation_cursor.ts b/src/cursor/aggregation_cursor.ts index ba3633d5643..fee6b6d3096 100644 --- a/src/cursor/aggregation_cursor.ts +++ b/src/cursor/aggregation_cursor.ts @@ -1,5 +1,5 @@ import { AggregateOperation, AggregateOptions } from '../operations/aggregate'; -import { AbstractCursor } from './abstract_cursor'; +import { AbstractCursor, assertUninitialized } from './abstract_cursor'; import { executeOperation, ExecutionResult } from '../operations/execute_operation'; import type { Document } from '../bson'; import type { Sort } from '../sort'; @@ -80,68 +80,79 @@ export class AggregationCursor extends AbstractCursor { /** Add a group stage to the aggregation pipeline */ group($group: Document): this { - this.pipeline.push({ $group }); + assertUninitialized(this); + this[kPipeline].push({ $group }); return this; } /** Add a limit stage to the aggregation pipeline */ limit($limit: number): this { - this.pipeline.push({ $limit }); + assertUninitialized(this); + this[kPipeline].push({ $limit }); return this; } /** Add a match stage to the aggregation pipeline */ match($match: Document): this { - this.pipeline.push({ $match }); + assertUninitialized(this); + this[kPipeline].push({ $match }); return this; } /** Add a out stage to the aggregation pipeline */ out($out: number): this { - this.pipeline.push({ $out }); + assertUninitialized(this); + this[kPipeline].push({ $out }); return this; } /** Add a project stage to the aggregation pipeline */ project($project: Document): this { - this.pipeline.push({ $project }); + assertUninitialized(this); + this[kPipeline].push({ $project }); return this; } /** Add a lookup stage to the aggregation pipeline */ lookup($lookup: Document): this { - this.pipeline.push({ $lookup }); + assertUninitialized(this); + this[kPipeline].push({ $lookup }); return this; } /** Add a redact stage to the aggregation pipeline */ redact($redact: Document): this { - this.pipeline.push({ $redact }); + assertUninitialized(this); + this[kPipeline].push({ $redact }); return this; } /** Add a skip stage to the aggregation pipeline */ skip($skip: number): this { - this.pipeline.push({ $skip }); + assertUninitialized(this); + this[kPipeline].push({ $skip }); return this; } /** Add a sort stage to the aggregation pipeline */ sort($sort: Sort): this { - this.pipeline.push({ $sort }); + assertUninitialized(this); + this[kPipeline].push({ $sort }); return this; } /** Add a unwind stage to the aggregation pipeline */ unwind($unwind: number): this { - this.pipeline.push({ $unwind }); + assertUninitialized(this); + this[kPipeline].push({ $unwind }); return this; } // deprecated methods /** @deprecated Add a geoNear stage to the aggregation pipeline */ geoNear($geoNear: Document): this { - this.pipeline.push({ $geoNear }); + assertUninitialized(this); + this[kPipeline].push({ $geoNear }); return this; } } diff --git a/src/cursor/find_cursor.ts b/src/cursor/find_cursor.ts index 4c2d4c853c5..3684f91feff 100644 --- a/src/cursor/find_cursor.ts +++ b/src/cursor/find_cursor.ts @@ -9,7 +9,7 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import { formatSort, Sort, SortDirection } from '../sort'; import type { Callback, MongoDBNamespace } from '../utils'; -import { AbstractCursor } from './abstract_cursor'; +import { AbstractCursor, assertUninitialized } from './abstract_cursor'; /** @internal */ const kFilter = Symbol('filter'); @@ -50,7 +50,6 @@ export class FindCursor extends AbstractCursor { /** @internal */ _initialize(session: ClientSession | undefined, callback: Callback): void { - this[kBuiltOptions] = Object.freeze(this[kBuiltOptions]); const findOperation = new FindOperation(undefined, this.namespace, this[kFilter], { ...this[kBuiltOptions], // NOTE: order matters here, we may need to refine this ...this.cursorOptions, @@ -143,6 +142,7 @@ export class FindCursor extends AbstractCursor { /** Set the cursor query */ filter(filter: Document): this { + assertUninitialized(this); this[kFilter] = filter; return this; } @@ -153,6 +153,7 @@ export class FindCursor extends AbstractCursor { * @param hint - If specified, then the query system will only consider plans using the hinted index. */ hint(hint: Hint): this { + assertUninitialized(this); this[kBuiltOptions].hint = hint; return this; } @@ -163,6 +164,7 @@ export class FindCursor extends AbstractCursor { * @param min - Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order. */ min(min: number): this { + assertUninitialized(this); this[kBuiltOptions].min = min; return this; } @@ -173,6 +175,7 @@ export class FindCursor extends AbstractCursor { * @param max - Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order. */ max(max: number): this { + assertUninitialized(this); this[kBuiltOptions].max = max; return this; } @@ -185,6 +188,7 @@ export class FindCursor extends AbstractCursor { * @param value - the returnKey value. */ returnKey(value: boolean): this { + assertUninitialized(this); this[kBuiltOptions].returnKey = value; return this; } @@ -195,6 +199,7 @@ export class FindCursor extends AbstractCursor { * @param value - The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY stye find. */ showRecordId(value: boolean): this { + assertUninitialized(this); this[kBuiltOptions].showRecordId = value; return this; } @@ -206,6 +211,7 @@ export class FindCursor extends AbstractCursor { * @param value - The modifier value. */ addQueryModifier(name: string, value: string | boolean | number | Document): this { + assertUninitialized(this); if (name[0] !== '$') { throw new MongoError(`${name} is not a valid query modifier`); } @@ -268,6 +274,7 @@ export class FindCursor extends AbstractCursor { * @param value - The comment attached to this query. */ comment(value: string): this { + assertUninitialized(this); this[kBuiltOptions].comment = value; return this; } @@ -278,6 +285,7 @@ export class FindCursor extends AbstractCursor { * @param value - Number of milliseconds to wait before aborting the tailed query. */ maxAwaitTimeMS(value: number): this { + assertUninitialized(this); if (typeof value !== 'number') { throw new MongoError('maxAwaitTimeMS must be a number'); } @@ -292,6 +300,7 @@ export class FindCursor extends AbstractCursor { * @param value - Number of milliseconds to wait before aborting the query. */ maxTimeMS(value: number): this { + assertUninitialized(this); if (typeof value !== 'number') { throw new MongoError('maxTimeMS must be a number'); } @@ -306,6 +315,7 @@ export class FindCursor extends AbstractCursor { * @param value - The field projection object. */ project(value: Document): this { + assertUninitialized(this); this[kBuiltOptions].projection = value; return this; } @@ -317,6 +327,7 @@ export class FindCursor extends AbstractCursor { * @param direction - The direction of the sorting (1 or -1). */ sort(sort: Sort | string, direction?: SortDirection): this { + assertUninitialized(this); if (this[kBuiltOptions].tailable) { throw new MongoError('Tailable cursor does not support sorting'); } @@ -331,6 +342,7 @@ export class FindCursor extends AbstractCursor { * @param value - The cursor collation options (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields). */ collation(value: CollationOptions): this { + assertUninitialized(this); this[kBuiltOptions].collation = value; return this; } @@ -341,12 +353,13 @@ export class FindCursor extends AbstractCursor { * @param value - The limit for the cursor query. */ limit(value: number): this { + assertUninitialized(this); if (this[kBuiltOptions].tailable) { throw new MongoError('Tailable cursor does not support limit'); } if (typeof value !== 'number') { - throw new MongoError('limit requires an integer'); + throw new TypeError('limit requires an integer'); } this[kBuiltOptions].limit = value; @@ -359,12 +372,13 @@ export class FindCursor extends AbstractCursor { * @param value - The skip for the cursor query. */ skip(value: number): this { + assertUninitialized(this); if (this[kBuiltOptions].tailable) { throw new MongoError('Tailable cursor does not support skip'); } if (typeof value !== 'number') { - throw new MongoError('skip requires an integer'); + throw new TypeError('skip requires an integer'); } this[kBuiltOptions].skip = value; diff --git a/test/functional/cursor.test.js b/test/functional/cursor.test.js index 9707facd3e5..eacfdf8fe79 100644 --- a/test/functional/cursor.test.js +++ b/test/functional/cursor.test.js @@ -638,7 +638,7 @@ describe('Cursor', function () { expect(err).to.not.exist; expect(() => { cursor.limit(1); - }).to.throw(/not extensible/); + }).to.throw(/Cursor is already initialized/); done(); }); @@ -868,97 +868,6 @@ describe('Cursor', function () { } }); - it('shouldCorrectlyHandleChangesInBatchSizes', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } - }, - - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); - - const db = client.db(configuration.db); - db.createCollection('test_not_multiple_batch_size', (err, collection) => { - expect(err).to.not.exist; - - var records = 6; - var batchSize = 2; - var docs = []; - for (var i = 0; i < records; i++) { - docs.push({ a: i }); - } - - collection.insert(docs, configuration.writeConcernMax(), () => { - expect(err).to.not.exist; - - const cursor = collection.find({}, { batchSize: batchSize }); - this.defer(() => cursor.close()); - - //1st - cursor.next((err, items) => { - expect(err).to.not.exist; - //cursor.items should contain 1 since nextObject already popped one - test.equal(1, cursor.bufferedCount()); - test.ok(items != null); - - //2nd - cursor.next((err, items) => { - expect(err).to.not.exist; - test.equal(0, cursor.bufferedCount()); - test.ok(items != null); - - //test batch size modification on the fly - batchSize = 3; - cursor.batchSize(batchSize); - - //3rd - cursor.next((err, items) => { - expect(err).to.not.exist; - test.equal(2, cursor.bufferedCount()); - test.ok(items != null); - - //4th - cursor.next((err, items) => { - expect(err).to.not.exist; - test.equal(1, cursor.bufferedCount()); - test.ok(items != null); - - //5th - cursor.next((err, items) => { - expect(err).to.not.exist; - test.equal(0, cursor.bufferedCount()); - test.ok(items != null); - - //6th - cursor.next((err, items) => { - expect(err).to.not.exist; - test.equal(0, cursor.bufferedCount()); - test.ok(items != null); - - //No more - cursor.next((err, items) => { - expect(err).to.not.exist; - test.ok(items == null); - test.ok(cursor.isClosed()); - done(); - }); - }); - }); - }); - }); - }); - }); - }); - }); - }); - } - }); - it('shouldCorrectlyHandleBatchSize', { // Add a tag that our runner can trigger on // in this case we are setting that node needs to be higher than 0.10.X to run