diff --git a/src/collection.ts b/src/collection.ts index 017a8bdfed..c3a9fab721 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -19,7 +19,7 @@ import ordered = require('./bulk/ordered'); import ChangeStream = require('./change_stream'); import { WriteConcern } from './write_concern'; import ReadConcern = require('./read_concern'); -import { AggregationCursor, CommandCursor } from './cursor'; +import { AggregationCursor, CommandCursor, Cursor } from './cursor'; import AggregateOperation = require('./operations/aggregate'); import BulkWriteOperation = require('./operations/bulk_write'); import CountDocumentsOperation = require('./operations/count_documents'); @@ -1594,7 +1594,8 @@ Collection.prototype.find = deprecateOptions( decorateWithCollation(findCommand, this, options); - const cursor = this.s.topology.cursor( + const cursor = new Cursor( + this.s.topology, new FindOperation(this, this.s.namespace, findCommand, newOptions), newOptions ); diff --git a/src/cursor/command_cursor.ts b/src/cursor/command_cursor.ts index 54d92c6adb..8a0f1e2b54 100644 --- a/src/cursor/command_cursor.ts +++ b/src/cursor/command_cursor.ts @@ -2,6 +2,7 @@ import { ReadPreference } from '../read_preference'; import { MongoError } from '../error'; import Cursor = require('./cursor'); import { CursorState } from './core_cursor'; +import type { OperationBase } from '../operations/operation'; /** * @file The **CommandCursor** class is an internal class that embodies a @@ -55,12 +56,11 @@ import { CursorState } from './core_cursor'; class CommandCursor extends Cursor { /** * @param {any} topology - * @param {any} ns - * @param {any} cmd + * @param {any} operation * @param {any} [options] */ - constructor(topology: any, ns: any, cmd: any, options?: any) { - super(topology, ns, cmd, options); + constructor(topology: any, operation: OperationBase, options?: any) { + super(topology, operation, options); } /** diff --git a/src/cursor/core_cursor.ts b/src/cursor/core_cursor.ts index bed0162338..e9cb7e931e 100644 --- a/src/cursor/core_cursor.ts +++ b/src/cursor/core_cursor.ts @@ -1,12 +1,12 @@ import Logger = require('../logger'); import { ReadPreference } from '../read_preference'; -import { handleCallback, collationNotSupported, MongoDBNamespace } from '../utils'; +import { handleCallback, MongoDBNamespace } from '../utils'; import executeOperation = require('../operations/execute_operation'); import { Readable } from 'stream'; -import { OperationBase } from '../operations/operation'; import { MongoError, MongoNetworkError } from '../error'; import { Long } from '../bson'; import type { BSONSerializeOptions } from '../types'; +import type { OperationBase } from '../operations/operation'; export interface InternalCursorState extends BSONSerializeOptions { [key: string]: any; @@ -80,9 +80,8 @@ class CoreCursor extends Readable { * Create a new core `Cursor` instance. * **NOTE** Not to be instantiated directly * - * @param {any} topology The server topology instance. - * @param {any} ns The MongoDB fully qualified namespace (ex: db1.collection1) - * @param {{object}|Long} cmd The selector (can be a command or a cursorId) + * @param {Topology} topology The server topology instance. + * @param {OperationBase} operation The operation to run against the cluster * @param {object} [options=null] Optional settings. * @param {object} [options.batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/| find command documentation} and {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. * @param {Array} [options.documents=[]] Initial documents list for cursor @@ -90,36 +89,24 @@ class CoreCursor extends Readable { * @param {Function} [options.transforms.query] Transform the value returned from the initial query * @param {Function} [options.transforms.doc] Transform each document returned from Cursor.prototype._next */ - constructor(topology: any, ns: any, cmd: any, options?: any) { + constructor(topology: any, operation: OperationBase, options?: any) { super({ objectMode: true }); options = options || {}; - if (ns instanceof OperationBase) { - this.operation = ns; - ns = this.operation.ns.toString(); - options = this.operation.options; - cmd = this.operation.cmd ? this.operation.cmd : {}; - } - - // Cursor pool - this.pool = null; - // Cursor server - this.server = null; - - // Do we have a not connected handler - this.disconnectHandler = options.disconnectHandler; + const cmd = operation.cmd ? operation.cmd : {}; // Set local values - this.ns = ns; - this.namespace = MongoDBNamespace.fromString(ns); + this.operation = operation; + this.ns = this.operation.ns.toString(); + this.namespace = MongoDBNamespace.fromString(this.ns); this.cmd = cmd; - this.options = options; + this.options = this.operation.options; this.topology = topology; // All internal state this.cursorState = { cursorId: null, - cmd, + cmd: this.cmd, documents: options.documents || [], cursorIndex: 0, dead: false, @@ -218,14 +205,9 @@ class CoreCursor extends Readable { nextFunction(this, callback); } - /** - * Clone the cursor - * - * @function - * @returns {CoreCursor} - */ - clone(): CoreCursor { - return this.topology.cursor(this.ns, this.cmd, this.options); + /** Clone the cursor */ + clone(): this { + return new (this.constructor as any)(this.topology, this.operation, this.options); } /** @@ -552,89 +534,29 @@ class CoreCursor extends Readable { done(null, result); }; - if (cursor.operation) { - if (cursor.logger.isDebug()) { - cursor.logger.debug( - `issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify( - cursor.query - )}]` - ); - } - - executeOperation(cursor.topology, cursor.operation, (err?: any, result?: any) => { - if (err) { - done(err); - return; - } - - cursor.server = cursor.operation.server; - cursor.cursorState.init = true; - - // NOTE: this is a special internal method for cloning a cursor, consider removing - if (cursor.cursorState.cursorId != null) { - return done(); - } - - queryCallback(err, result); - }); - - return; - } - - // Very explicitly choose what is passed to selectServer - const serverSelectOptions = {} as any; - if (cursor.cursorState.session) { - serverSelectOptions.session = cursor.cursorState.session; - } - - if (cursor.operation) { - serverSelectOptions.readPreference = cursor.operation.readPreference; - } else if (cursor.options.readPreference) { - serverSelectOptions.readPreference = cursor.options.readPreference; + if (cursor.logger.isDebug()) { + cursor.logger.debug( + `issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify( + cursor.query + )}]` + ); } - return cursor.topology.selectServer(serverSelectOptions, (err?: any, server?: any) => { + executeOperation(cursor.topology, cursor.operation, (err?: any, result?: any) => { if (err) { - const disconnectHandler = cursor.disconnectHandler; - if (disconnectHandler != null) { - return disconnectHandler.addObjectAndMethod( - 'cursor', - cursor, - 'next', - [callback], - callback - ); - } - - return callback(err); + done(err); + return; } - cursor.server = server; + cursor.server = cursor.operation.server; cursor.cursorState.init = true; - if (collationNotSupported(cursor.server, cursor.cmd)) { - return callback(new MongoError(`server ${cursor.server.name} does not support collation`)); - } // NOTE: this is a special internal method for cloning a cursor, consider removing if (cursor.cursorState.cursorId != null) { return done(); } - if (cursor.logger.isDebug()) { - cursor.logger.debug( - `issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify( - cursor.query - )}]` - ); - } - - if (cursor.cmd.find != null) { - server.query(cursor.ns, cursor.cmd, cursor.cursorState, cursor.options, queryCallback); - return; - } - - const commandOptions = Object.assign({ session: cursor.cursorState.session }, cursor.options); - server.command(cursor.ns, cursor.cmd, commandOptions, queryCallback); + queryCallback(err, result); }); } } diff --git a/src/cursor/cursor.ts b/src/cursor/cursor.ts index 53de76e2a5..2895038585 100644 --- a/src/cursor/cursor.ts +++ b/src/cursor/cursor.ts @@ -9,6 +9,7 @@ import { handleCallback, maybePromise, formattedOrderClause } from '../utils'; import executeOperation = require('../operations/execute_operation'); import { each } from '../operations/cursor_ops'; import CountOperation = require('../operations/count'); +import type { OperationBase } from '../operations/operation'; /** * @file The **Cursor** class is an internal class that embodies a cursor on MongoDB @@ -97,12 +98,11 @@ const fields = ['numberOfRetries', 'tailableRetryInterval']; class Cursor extends CoreCursor { /** * @param {any} topology - * @param {any} ns - * @param {any} [cmd] + * @param {any} operation * @param {any} [options] */ - constructor(topology: any, ns: any, cmd?: any, options?: any) { - super(topology, ns, cmd, options); + constructor(topology: any, operation: OperationBase, options?: any) { + super(topology, operation, options); options = options || {}; if (this.operation) { @@ -690,13 +690,6 @@ class Cursor extends CoreCursor { * @param {(object|null|boolean)} result The result object if the command was executed successfully. */ - /** - * Clone the cursor - * - * @function external:CoreCursor#clone - * @returns {Cursor} - */ - /** * Resets the cursor * diff --git a/src/operations/command.ts b/src/operations/command.ts index ba9b4e1bc8..33945ae092 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -51,8 +51,12 @@ class CommandOperation extends OperationBase { this.writeConcern = resolveWriteConcern(propertyProvider, this.options); this.explain = false; + if (options && typeof options.fullResponse === 'boolean') { + this.fullResponse = options.fullResponse; + } + if (operationOptions && typeof operationOptions.fullResponse === 'boolean') { - this.fullResponse = true; + this.fullResponse = operationOptions.fullResponse; } // TODO: A lot of our code depends on having the read preference in the options. This should diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 03c71aa0d4..174b4bbc6d 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -1,3 +1,5 @@ +import type { Document } from '../types'; + const Aspect = { READ_OPERATION: Symbol('READ_OPERATION'), WRITE_OPERATION: Symbol('WRITE_OPERATION'), @@ -14,6 +16,7 @@ const Aspect = { */ class OperationBase { options: any; + cmd?: Document; constructor(options: any) { this.options = Object.assign({}, options); diff --git a/src/operations/run_command.ts b/src/operations/run_command.ts index ec51b7af87..4990ee15ec 100644 --- a/src/operations/run_command.ts +++ b/src/operations/run_command.ts @@ -9,7 +9,11 @@ import type { Server } from '../sdam/server'; class RunCommandOperation extends CommandOperation { command: any; - constructor(parent: MongoClient | Db | Collection, command: any, options: any) { + constructor( + parent: MongoClient | Db | Collection | { s: { namespace: MongoDBNamespace } }, + command: any, + options: any + ) { super(parent, options); this.command = command; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 3805612033..8dd558b128 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -1,4 +1,4 @@ -import { emitDeprecatedOptionWarning, ClientMetadata } from '../utils'; +import { emitDeprecatedOptionWarning, ClientMetadata, MongoDBNamespace } from '../utils'; import Denque = require('denque'); import { EventEmitter } from 'events'; import { ReadPreference } from '../read_preference'; @@ -39,6 +39,7 @@ import type { CloseOptions } from '../cmap/connection_pool'; import type { Logger } from '..'; import type { DestroyOptions } from '../cmap/connection'; import type { CommandOptions } from '../cmap/wire_protocol/command'; +import { RunCommandOperation } from '../operations/run_command'; // Global state let globalTopologyCounter = 0; @@ -718,7 +719,14 @@ export class Topology extends EventEmitter { const CursorClass = options.cursorFactory ?? this.s.Cursor; ReadPreference.translate(options); - return new CursorClass(topology, ns, cmd, options); + return new CursorClass( + topology, + new RunCommandOperation({ s: { namespace: MongoDBNamespace.fromString(ns) } }, cmd, { + fullResponse: true, + ...options + }), + options + ); } get clientMetadata(): ClientMetadata { diff --git a/test/functional/core/cursor.test.js b/test/functional/core/cursor.test.js index cd7dc76b06..67bfeae927 100644 --- a/test/functional/core/cursor.test.js +++ b/test/functional/core/cursor.test.js @@ -11,7 +11,7 @@ describe('Cursor tests', function () { it('Should iterate cursor', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -38,8 +38,8 @@ describe('Cursor tests', function () { // Execute find var cursor = topology.cursor(ns, { - find: ns, - query: {}, + find: 'cursor1', + filter: {}, batchSize: 2 }); @@ -67,7 +67,7 @@ describe('Cursor tests', function () { it('Should iterate cursor but readBuffered', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -95,8 +95,8 @@ describe('Cursor tests', function () { // Execute find const cursor = topology.cursor(ns, { - find: ns, - query: {}, + find: 'cursor2', + filter: {}, batchSize: 5 }); @@ -127,7 +127,7 @@ describe('Cursor tests', function () { it('Should callback exhausted cursor with error', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -154,7 +154,7 @@ describe('Cursor tests', function () { expect(results.result.n).to.equal(1); // Execute find - const cursor = topology.cursor(ns, { find: ns, query: {}, batchSize: 5 }); + const cursor = topology.cursor(ns, { find: 'cursor3', filter: {}, batchSize: 5 }); // Execute next cursor._next((nextCursorErr, nextCursorD) => { @@ -183,7 +183,7 @@ describe('Cursor tests', function () { it('Should force a getMore call to happen', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -210,7 +210,7 @@ describe('Cursor tests', function () { expect(results.result.n).to.equal(3); // Execute find - const cursor = topology.cursor(ns, { find: ns, query: {}, batchSize: 2 }); + const cursor = topology.cursor(ns, { find: 'cursor4', filter: {}, batchSize: 2 }); // Execute next cursor._next((nextCursorErr, nextCursorD) => { @@ -239,7 +239,7 @@ describe('Cursor tests', function () { it('Should force a getMore call to happen then call killCursor', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -266,7 +266,7 @@ describe('Cursor tests', function () { expect(results.result.n).to.equal(3); // Execute find - const cursor = topology.cursor(ns, { find: ns, query: {}, batchSize: 2 }); + const cursor = topology.cursor(ns, { find: 'cursor4', filter: {}, batchSize: 2 }); // Execute next cursor._next((nextCursorErr, nextCursorD) => { @@ -301,7 +301,7 @@ describe('Cursor tests', function () { // Skipped due to usage of the topology manager it.skip('Should fail cursor correctly after server restart', { metadata: { - requires: { topology: ['single'] } + requires: { topology: ['single'], mongodb: '>=3.2' } }, test: function (done) { @@ -323,7 +323,7 @@ describe('Cursor tests', function () { expect(results.result.n).to.equal(3); // Execute find - var cursor = _server.cursor(ns, { find: ns, query: {}, batchSize: 2 }); + var cursor = _server.cursor(ns, { find: 'cursor5', filter: {}, batchSize: 2 }); // Execute next cursor._next(function (nextCursorErr, nextCursorD) { @@ -359,7 +359,7 @@ describe('Cursor tests', function () { // NOTE: a notoriously flakey test, needs rewriting // Commented out to stop before task from running and breaking auth tests // it.skip('should not hang if autoReconnect=false and pools sockets all timed out', { - // metadata: { requires: { topology: ['single'] } }, + // metadata: { requires: { topology: ['single'], mongodb: '>=3.2' } }, // test: function(done) { // var configuration = this.configuration, // Server = require('../../../src/core/topologies/server'), @@ -389,8 +389,8 @@ describe('Cursor tests', function () { // // Execute slow find // var cursor = server.cursor(ns, { - // find: ns, - // query: { $where: 'sleep(250) || true' }, + // find: 'cursor7', + // filter: { $where: 'sleep(250) || true' }, // batchSize: 1 // }); @@ -399,8 +399,8 @@ describe('Cursor tests', function () { // expect(err).to.exist; // cursor = server.cursor(ns, { - // find: ns, - // query: {}, + // find: 'cursor7', + // filter: {}, // batchSize: 1 // }); diff --git a/test/functional/core/extend_cursor.test.js b/test/functional/core/extend_cursor.test.js index 752165f814..a009409612 100644 --- a/test/functional/core/extend_cursor.test.js +++ b/test/functional/core/extend_cursor.test.js @@ -6,7 +6,7 @@ const { CoreCursor } = require('../../../src/cursor'); describe('Extend cursor tests', function () { it('should correctly extend the cursor with custom implementation', { metadata: { - requires: { topology: ['single'] } + requires: { topology: ['single'], mongodb: '>=3.2' } }, test: function (done) { @@ -67,7 +67,7 @@ describe('Extend cursor tests', function () { expect(results.result.n).to.equal(3); // Execute find - const cursor = topology.cursor(ns, { find: ns, query: {} }); + const cursor = topology.cursor(ns, { find: 'inserts_extend_cursors', filter: {} }); // Force a single // Logger.setLevel('debug'); diff --git a/test/functional/core/tailable_cursor.test.js b/test/functional/core/tailable_cursor.test.js index 2eae8580e2..4ae93e3c87 100644 --- a/test/functional/core/tailable_cursor.test.js +++ b/test/functional/core/tailable_cursor.test.js @@ -9,7 +9,7 @@ describe('Tailable cursor tests', function () { it('should correctly perform awaitdata', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -46,8 +46,8 @@ describe('Tailable cursor tests', function () { // Execute find const cursor = topology.cursor(ns, { - find: ns, - query: {}, + find: 'cursor_tailable', + filter: {}, batchSize: 2, tailable: true, awaitData: true diff --git a/test/functional/core/undefined.test.js b/test/functional/core/undefined.test.js index 682b7a73d3..9094cc30e1 100644 --- a/test/functional/core/undefined.test.js +++ b/test/functional/core/undefined.test.js @@ -7,7 +7,7 @@ const { ObjectId } = require('bson'); describe('A server', function () { it('should correctly execute insert culling undefined', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -40,8 +40,8 @@ describe('A server', function () { // Execute find var cursor = topology.cursor(ns, { - find: f('%s.insert1', self.configuration.db), - query: { _id: objectId }, + find: 'insert1', + filter: { _id: objectId }, batchSize: 2 }); @@ -63,7 +63,7 @@ describe('A server', function () { it('should correctly execute update culling undefined', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -100,8 +100,8 @@ describe('A server', function () { // Execute find const cursor = topology.cursor(ns, { - find: f('%s.update1', self.configuration.db), - query: { _id: objectId }, + find: 'update1', + filter: { _id: objectId }, batchSize: 2 });