From 3becdc0a21ec1223414374df600a7382a502bdf1 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Tue, 11 Aug 2020 17:32:22 -0400 Subject: [PATCH] refactor: remove legacy cursor construction We no longer need to support construction of cursors without an operation, so we can remove the logic in the constructor to check for whether one was passed in or not NODE-2763 --- src/collection.ts | 5 +- src/cursor/command_cursor.ts | 8 +- src/cursor/core_cursor.ts | 128 ++++--------------- src/cursor/cursor.ts | 15 +-- src/operations/command.ts | 6 +- src/operations/operation.ts | 3 + src/operations/run_command.ts | 6 +- src/sdam/topology.ts | 12 +- test/functional/core/cursor.test.js | 38 +++--- test/functional/core/extend_cursor.test.js | 4 +- test/functional/core/tailable_cursor.test.js | 6 +- test/functional/core/undefined.test.js | 12 +- 12 files changed, 89 insertions(+), 154 deletions(-) diff --git a/src/collection.ts b/src/collection.ts index 017a8bdfed..c3a9fab721 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -19,7 +19,7 @@ import ordered = require('./bulk/ordered'); import ChangeStream = require('./change_stream'); import { WriteConcern } from './write_concern'; import ReadConcern = require('./read_concern'); -import { AggregationCursor, CommandCursor } from './cursor'; +import { AggregationCursor, CommandCursor, Cursor } from './cursor'; import AggregateOperation = require('./operations/aggregate'); import BulkWriteOperation = require('./operations/bulk_write'); import CountDocumentsOperation = require('./operations/count_documents'); @@ -1594,7 +1594,8 @@ Collection.prototype.find = deprecateOptions( decorateWithCollation(findCommand, this, options); - const cursor = this.s.topology.cursor( + const cursor = new Cursor( + this.s.topology, new FindOperation(this, this.s.namespace, findCommand, newOptions), newOptions ); diff --git a/src/cursor/command_cursor.ts b/src/cursor/command_cursor.ts index 54d92c6adb..8a0f1e2b54 100644 --- a/src/cursor/command_cursor.ts +++ b/src/cursor/command_cursor.ts @@ -2,6 +2,7 @@ import { ReadPreference } from '../read_preference'; import { MongoError } from '../error'; import Cursor = require('./cursor'); import { CursorState } from './core_cursor'; +import type { OperationBase } from '../operations/operation'; /** * @file The **CommandCursor** class is an internal class that embodies a @@ -55,12 +56,11 @@ import { CursorState } from './core_cursor'; class CommandCursor extends Cursor { /** * @param {any} topology - * @param {any} ns - * @param {any} cmd + * @param {any} operation * @param {any} [options] */ - constructor(topology: any, ns: any, cmd: any, options?: any) { - super(topology, ns, cmd, options); + constructor(topology: any, operation: OperationBase, options?: any) { + super(topology, operation, options); } /** diff --git a/src/cursor/core_cursor.ts b/src/cursor/core_cursor.ts index bed0162338..e9cb7e931e 100644 --- a/src/cursor/core_cursor.ts +++ b/src/cursor/core_cursor.ts @@ -1,12 +1,12 @@ import Logger = require('../logger'); import { ReadPreference } from '../read_preference'; -import { handleCallback, collationNotSupported, MongoDBNamespace } from '../utils'; +import { handleCallback, MongoDBNamespace } from '../utils'; import executeOperation = require('../operations/execute_operation'); import { Readable } from 'stream'; -import { OperationBase } from '../operations/operation'; import { MongoError, MongoNetworkError } from '../error'; import { Long } from '../bson'; import type { BSONSerializeOptions } from '../types'; +import type { OperationBase } from '../operations/operation'; export interface InternalCursorState extends BSONSerializeOptions { [key: string]: any; @@ -80,9 +80,8 @@ class CoreCursor extends Readable { * Create a new core `Cursor` instance. * **NOTE** Not to be instantiated directly * - * @param {any} topology The server topology instance. - * @param {any} ns The MongoDB fully qualified namespace (ex: db1.collection1) - * @param {{object}|Long} cmd The selector (can be a command or a cursorId) + * @param {Topology} topology The server topology instance. + * @param {OperationBase} operation The operation to run against the cluster * @param {object} [options=null] Optional settings. * @param {object} [options.batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/| find command documentation} and {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. * @param {Array} [options.documents=[]] Initial documents list for cursor @@ -90,36 +89,24 @@ class CoreCursor extends Readable { * @param {Function} [options.transforms.query] Transform the value returned from the initial query * @param {Function} [options.transforms.doc] Transform each document returned from Cursor.prototype._next */ - constructor(topology: any, ns: any, cmd: any, options?: any) { + constructor(topology: any, operation: OperationBase, options?: any) { super({ objectMode: true }); options = options || {}; - if (ns instanceof OperationBase) { - this.operation = ns; - ns = this.operation.ns.toString(); - options = this.operation.options; - cmd = this.operation.cmd ? this.operation.cmd : {}; - } - - // Cursor pool - this.pool = null; - // Cursor server - this.server = null; - - // Do we have a not connected handler - this.disconnectHandler = options.disconnectHandler; + const cmd = operation.cmd ? operation.cmd : {}; // Set local values - this.ns = ns; - this.namespace = MongoDBNamespace.fromString(ns); + this.operation = operation; + this.ns = this.operation.ns.toString(); + this.namespace = MongoDBNamespace.fromString(this.ns); this.cmd = cmd; - this.options = options; + this.options = this.operation.options; this.topology = topology; // All internal state this.cursorState = { cursorId: null, - cmd, + cmd: this.cmd, documents: options.documents || [], cursorIndex: 0, dead: false, @@ -218,14 +205,9 @@ class CoreCursor extends Readable { nextFunction(this, callback); } - /** - * Clone the cursor - * - * @function - * @returns {CoreCursor} - */ - clone(): CoreCursor { - return this.topology.cursor(this.ns, this.cmd, this.options); + /** Clone the cursor */ + clone(): this { + return new (this.constructor as any)(this.topology, this.operation, this.options); } /** @@ -552,89 +534,29 @@ class CoreCursor extends Readable { done(null, result); }; - if (cursor.operation) { - if (cursor.logger.isDebug()) { - cursor.logger.debug( - `issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify( - cursor.query - )}]` - ); - } - - executeOperation(cursor.topology, cursor.operation, (err?: any, result?: any) => { - if (err) { - done(err); - return; - } - - cursor.server = cursor.operation.server; - cursor.cursorState.init = true; - - // NOTE: this is a special internal method for cloning a cursor, consider removing - if (cursor.cursorState.cursorId != null) { - return done(); - } - - queryCallback(err, result); - }); - - return; - } - - // Very explicitly choose what is passed to selectServer - const serverSelectOptions = {} as any; - if (cursor.cursorState.session) { - serverSelectOptions.session = cursor.cursorState.session; - } - - if (cursor.operation) { - serverSelectOptions.readPreference = cursor.operation.readPreference; - } else if (cursor.options.readPreference) { - serverSelectOptions.readPreference = cursor.options.readPreference; + if (cursor.logger.isDebug()) { + cursor.logger.debug( + `issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify( + cursor.query + )}]` + ); } - return cursor.topology.selectServer(serverSelectOptions, (err?: any, server?: any) => { + executeOperation(cursor.topology, cursor.operation, (err?: any, result?: any) => { if (err) { - const disconnectHandler = cursor.disconnectHandler; - if (disconnectHandler != null) { - return disconnectHandler.addObjectAndMethod( - 'cursor', - cursor, - 'next', - [callback], - callback - ); - } - - return callback(err); + done(err); + return; } - cursor.server = server; + cursor.server = cursor.operation.server; cursor.cursorState.init = true; - if (collationNotSupported(cursor.server, cursor.cmd)) { - return callback(new MongoError(`server ${cursor.server.name} does not support collation`)); - } // NOTE: this is a special internal method for cloning a cursor, consider removing if (cursor.cursorState.cursorId != null) { return done(); } - if (cursor.logger.isDebug()) { - cursor.logger.debug( - `issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify( - cursor.query - )}]` - ); - } - - if (cursor.cmd.find != null) { - server.query(cursor.ns, cursor.cmd, cursor.cursorState, cursor.options, queryCallback); - return; - } - - const commandOptions = Object.assign({ session: cursor.cursorState.session }, cursor.options); - server.command(cursor.ns, cursor.cmd, commandOptions, queryCallback); + queryCallback(err, result); }); } } diff --git a/src/cursor/cursor.ts b/src/cursor/cursor.ts index 53de76e2a5..2895038585 100644 --- a/src/cursor/cursor.ts +++ b/src/cursor/cursor.ts @@ -9,6 +9,7 @@ import { handleCallback, maybePromise, formattedOrderClause } from '../utils'; import executeOperation = require('../operations/execute_operation'); import { each } from '../operations/cursor_ops'; import CountOperation = require('../operations/count'); +import type { OperationBase } from '../operations/operation'; /** * @file The **Cursor** class is an internal class that embodies a cursor on MongoDB @@ -97,12 +98,11 @@ const fields = ['numberOfRetries', 'tailableRetryInterval']; class Cursor extends CoreCursor { /** * @param {any} topology - * @param {any} ns - * @param {any} [cmd] + * @param {any} operation * @param {any} [options] */ - constructor(topology: any, ns: any, cmd?: any, options?: any) { - super(topology, ns, cmd, options); + constructor(topology: any, operation: OperationBase, options?: any) { + super(topology, operation, options); options = options || {}; if (this.operation) { @@ -690,13 +690,6 @@ class Cursor extends CoreCursor { * @param {(object|null|boolean)} result The result object if the command was executed successfully. */ - /** - * Clone the cursor - * - * @function external:CoreCursor#clone - * @returns {Cursor} - */ - /** * Resets the cursor * diff --git a/src/operations/command.ts b/src/operations/command.ts index ba9b4e1bc8..33945ae092 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -51,8 +51,12 @@ class CommandOperation extends OperationBase { this.writeConcern = resolveWriteConcern(propertyProvider, this.options); this.explain = false; + if (options && typeof options.fullResponse === 'boolean') { + this.fullResponse = options.fullResponse; + } + if (operationOptions && typeof operationOptions.fullResponse === 'boolean') { - this.fullResponse = true; + this.fullResponse = operationOptions.fullResponse; } // TODO: A lot of our code depends on having the read preference in the options. This should diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 03c71aa0d4..174b4bbc6d 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -1,3 +1,5 @@ +import type { Document } from '../types'; + const Aspect = { READ_OPERATION: Symbol('READ_OPERATION'), WRITE_OPERATION: Symbol('WRITE_OPERATION'), @@ -14,6 +16,7 @@ const Aspect = { */ class OperationBase { options: any; + cmd?: Document; constructor(options: any) { this.options = Object.assign({}, options); diff --git a/src/operations/run_command.ts b/src/operations/run_command.ts index ec51b7af87..4990ee15ec 100644 --- a/src/operations/run_command.ts +++ b/src/operations/run_command.ts @@ -9,7 +9,11 @@ import type { Server } from '../sdam/server'; class RunCommandOperation extends CommandOperation { command: any; - constructor(parent: MongoClient | Db | Collection, command: any, options: any) { + constructor( + parent: MongoClient | Db | Collection | { s: { namespace: MongoDBNamespace } }, + command: any, + options: any + ) { super(parent, options); this.command = command; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 3805612033..8dd558b128 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -1,4 +1,4 @@ -import { emitDeprecatedOptionWarning, ClientMetadata } from '../utils'; +import { emitDeprecatedOptionWarning, ClientMetadata, MongoDBNamespace } from '../utils'; import Denque = require('denque'); import { EventEmitter } from 'events'; import { ReadPreference } from '../read_preference'; @@ -39,6 +39,7 @@ import type { CloseOptions } from '../cmap/connection_pool'; import type { Logger } from '..'; import type { DestroyOptions } from '../cmap/connection'; import type { CommandOptions } from '../cmap/wire_protocol/command'; +import { RunCommandOperation } from '../operations/run_command'; // Global state let globalTopologyCounter = 0; @@ -718,7 +719,14 @@ export class Topology extends EventEmitter { const CursorClass = options.cursorFactory ?? this.s.Cursor; ReadPreference.translate(options); - return new CursorClass(topology, ns, cmd, options); + return new CursorClass( + topology, + new RunCommandOperation({ s: { namespace: MongoDBNamespace.fromString(ns) } }, cmd, { + fullResponse: true, + ...options + }), + options + ); } get clientMetadata(): ClientMetadata { diff --git a/test/functional/core/cursor.test.js b/test/functional/core/cursor.test.js index cd7dc76b06..67bfeae927 100644 --- a/test/functional/core/cursor.test.js +++ b/test/functional/core/cursor.test.js @@ -11,7 +11,7 @@ describe('Cursor tests', function () { it('Should iterate cursor', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -38,8 +38,8 @@ describe('Cursor tests', function () { // Execute find var cursor = topology.cursor(ns, { - find: ns, - query: {}, + find: 'cursor1', + filter: {}, batchSize: 2 }); @@ -67,7 +67,7 @@ describe('Cursor tests', function () { it('Should iterate cursor but readBuffered', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -95,8 +95,8 @@ describe('Cursor tests', function () { // Execute find const cursor = topology.cursor(ns, { - find: ns, - query: {}, + find: 'cursor2', + filter: {}, batchSize: 5 }); @@ -127,7 +127,7 @@ describe('Cursor tests', function () { it('Should callback exhausted cursor with error', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -154,7 +154,7 @@ describe('Cursor tests', function () { expect(results.result.n).to.equal(1); // Execute find - const cursor = topology.cursor(ns, { find: ns, query: {}, batchSize: 5 }); + const cursor = topology.cursor(ns, { find: 'cursor3', filter: {}, batchSize: 5 }); // Execute next cursor._next((nextCursorErr, nextCursorD) => { @@ -183,7 +183,7 @@ describe('Cursor tests', function () { it('Should force a getMore call to happen', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -210,7 +210,7 @@ describe('Cursor tests', function () { expect(results.result.n).to.equal(3); // Execute find - const cursor = topology.cursor(ns, { find: ns, query: {}, batchSize: 2 }); + const cursor = topology.cursor(ns, { find: 'cursor4', filter: {}, batchSize: 2 }); // Execute next cursor._next((nextCursorErr, nextCursorD) => { @@ -239,7 +239,7 @@ describe('Cursor tests', function () { it('Should force a getMore call to happen then call killCursor', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -266,7 +266,7 @@ describe('Cursor tests', function () { expect(results.result.n).to.equal(3); // Execute find - const cursor = topology.cursor(ns, { find: ns, query: {}, batchSize: 2 }); + const cursor = topology.cursor(ns, { find: 'cursor4', filter: {}, batchSize: 2 }); // Execute next cursor._next((nextCursorErr, nextCursorD) => { @@ -301,7 +301,7 @@ describe('Cursor tests', function () { // Skipped due to usage of the topology manager it.skip('Should fail cursor correctly after server restart', { metadata: { - requires: { topology: ['single'] } + requires: { topology: ['single'], mongodb: '>=3.2' } }, test: function (done) { @@ -323,7 +323,7 @@ describe('Cursor tests', function () { expect(results.result.n).to.equal(3); // Execute find - var cursor = _server.cursor(ns, { find: ns, query: {}, batchSize: 2 }); + var cursor = _server.cursor(ns, { find: 'cursor5', filter: {}, batchSize: 2 }); // Execute next cursor._next(function (nextCursorErr, nextCursorD) { @@ -359,7 +359,7 @@ describe('Cursor tests', function () { // NOTE: a notoriously flakey test, needs rewriting // Commented out to stop before task from running and breaking auth tests // it.skip('should not hang if autoReconnect=false and pools sockets all timed out', { - // metadata: { requires: { topology: ['single'] } }, + // metadata: { requires: { topology: ['single'], mongodb: '>=3.2' } }, // test: function(done) { // var configuration = this.configuration, // Server = require('../../../src/core/topologies/server'), @@ -389,8 +389,8 @@ describe('Cursor tests', function () { // // Execute slow find // var cursor = server.cursor(ns, { - // find: ns, - // query: { $where: 'sleep(250) || true' }, + // find: 'cursor7', + // filter: { $where: 'sleep(250) || true' }, // batchSize: 1 // }); @@ -399,8 +399,8 @@ describe('Cursor tests', function () { // expect(err).to.exist; // cursor = server.cursor(ns, { - // find: ns, - // query: {}, + // find: 'cursor7', + // filter: {}, // batchSize: 1 // }); diff --git a/test/functional/core/extend_cursor.test.js b/test/functional/core/extend_cursor.test.js index 752165f814..a009409612 100644 --- a/test/functional/core/extend_cursor.test.js +++ b/test/functional/core/extend_cursor.test.js @@ -6,7 +6,7 @@ const { CoreCursor } = require('../../../src/cursor'); describe('Extend cursor tests', function () { it('should correctly extend the cursor with custom implementation', { metadata: { - requires: { topology: ['single'] } + requires: { topology: ['single'], mongodb: '>=3.2' } }, test: function (done) { @@ -67,7 +67,7 @@ describe('Extend cursor tests', function () { expect(results.result.n).to.equal(3); // Execute find - const cursor = topology.cursor(ns, { find: ns, query: {} }); + const cursor = topology.cursor(ns, { find: 'inserts_extend_cursors', filter: {} }); // Force a single // Logger.setLevel('debug'); diff --git a/test/functional/core/tailable_cursor.test.js b/test/functional/core/tailable_cursor.test.js index 2eae8580e2..4ae93e3c87 100644 --- a/test/functional/core/tailable_cursor.test.js +++ b/test/functional/core/tailable_cursor.test.js @@ -9,7 +9,7 @@ describe('Tailable cursor tests', function () { it('should correctly perform awaitdata', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -46,8 +46,8 @@ describe('Tailable cursor tests', function () { // Execute find const cursor = topology.cursor(ns, { - find: ns, - query: {}, + find: 'cursor_tailable', + filter: {}, batchSize: 2, tailable: true, awaitData: true diff --git a/test/functional/core/undefined.test.js b/test/functional/core/undefined.test.js index 682b7a73d3..9094cc30e1 100644 --- a/test/functional/core/undefined.test.js +++ b/test/functional/core/undefined.test.js @@ -7,7 +7,7 @@ const { ObjectId } = require('bson'); describe('A server', function () { it('should correctly execute insert culling undefined', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -40,8 +40,8 @@ describe('A server', function () { // Execute find var cursor = topology.cursor(ns, { - find: f('%s.insert1', self.configuration.db), - query: { _id: objectId }, + find: 'insert1', + filter: { _id: objectId }, batchSize: 2 }); @@ -63,7 +63,7 @@ describe('A server', function () { it('should correctly execute update culling undefined', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset', 'sharded'], mongodb: '>=3.2' } }, test: function (done) { @@ -100,8 +100,8 @@ describe('A server', function () { // Execute find const cursor = topology.cursor(ns, { - find: f('%s.update1', self.configuration.db), - query: { _id: objectId }, + find: 'update1', + filter: { _id: objectId }, batchSize: 2 });