Skip to content

Commit

Permalink
refactor: remove legacy cursor construction
Browse files Browse the repository at this point in the history
We no longer need to support construction of cursors without an
operation, so we can remove the logic in the constructor to check
for whether one was passed in or not

NODE-2763
  • Loading branch information
mbroadst committed Aug 11, 2020
1 parent 1a882fc commit 3becdc0
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 154 deletions.
5 changes: 3 additions & 2 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import ordered = require('./bulk/ordered');
import ChangeStream = require('./change_stream');
import { WriteConcern } from './write_concern';
import ReadConcern = require('./read_concern');
import { AggregationCursor, CommandCursor } from './cursor';
import { AggregationCursor, CommandCursor, Cursor } from './cursor';
import AggregateOperation = require('./operations/aggregate');
import BulkWriteOperation = require('./operations/bulk_write');
import CountDocumentsOperation = require('./operations/count_documents');
Expand Down Expand Up @@ -1594,7 +1594,8 @@ Collection.prototype.find = deprecateOptions(

decorateWithCollation(findCommand, this, options);

const cursor = this.s.topology.cursor(
const cursor = new Cursor(
this.s.topology,
new FindOperation(this, this.s.namespace, findCommand, newOptions),
newOptions
);
Expand Down
8 changes: 4 additions & 4 deletions src/cursor/command_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ReadPreference } from '../read_preference';
import { MongoError } from '../error';
import Cursor = require('./cursor');
import { CursorState } from './core_cursor';
import type { OperationBase } from '../operations/operation';

/**
* @file The **CommandCursor** class is an internal class that embodies a
Expand Down Expand Up @@ -55,12 +56,11 @@ import { CursorState } from './core_cursor';
class CommandCursor extends Cursor {
/**
* @param {any} topology
* @param {any} ns
* @param {any} cmd
* @param {any} operation
* @param {any} [options]
*/
constructor(topology: any, ns: any, cmd: any, options?: any) {
super(topology, ns, cmd, options);
constructor(topology: any, operation: OperationBase, options?: any) {
super(topology, operation, options);
}

/**
Expand Down
128 changes: 25 additions & 103 deletions src/cursor/core_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import Logger = require('../logger');
import { ReadPreference } from '../read_preference';
import { handleCallback, collationNotSupported, MongoDBNamespace } from '../utils';
import { handleCallback, MongoDBNamespace } from '../utils';
import executeOperation = require('../operations/execute_operation');
import { Readable } from 'stream';
import { OperationBase } from '../operations/operation';
import { MongoError, MongoNetworkError } from '../error';
import { Long } from '../bson';
import type { BSONSerializeOptions } from '../types';
import type { OperationBase } from '../operations/operation';

export interface InternalCursorState extends BSONSerializeOptions {
[key: string]: any;
Expand Down Expand Up @@ -80,46 +80,33 @@ class CoreCursor extends Readable {
* Create a new core `Cursor` instance.
* **NOTE** Not to be instantiated directly
*
* @param {any} topology The server topology instance.
* @param {any} ns The MongoDB fully qualified namespace (ex: db1.collection1)
* @param {{object}|Long} cmd The selector (can be a command or a cursorId)
* @param {Topology} topology The server topology instance.
* @param {OperationBase} operation The operation to run against the cluster
* @param {object} [options=null] Optional settings.
* @param {object} [options.batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/| find command documentation} and {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
* @param {Array} [options.documents=[]] Initial documents list for cursor
* @param {object} [options.transforms=null] Transform methods for the cursor results
* @param {Function} [options.transforms.query] Transform the value returned from the initial query
* @param {Function} [options.transforms.doc] Transform each document returned from Cursor.prototype._next
*/
constructor(topology: any, ns: any, cmd: any, options?: any) {
constructor(topology: any, operation: OperationBase, options?: any) {
super({ objectMode: true });
options = options || {};

if (ns instanceof OperationBase) {
this.operation = ns;
ns = this.operation.ns.toString();
options = this.operation.options;
cmd = this.operation.cmd ? this.operation.cmd : {};
}

// Cursor pool
this.pool = null;
// Cursor server
this.server = null;

// Do we have a not connected handler
this.disconnectHandler = options.disconnectHandler;
const cmd = operation.cmd ? operation.cmd : {};

// Set local values
this.ns = ns;
this.namespace = MongoDBNamespace.fromString(ns);
this.operation = operation;
this.ns = this.operation.ns.toString();
this.namespace = MongoDBNamespace.fromString(this.ns);
this.cmd = cmd;
this.options = options;
this.options = this.operation.options;
this.topology = topology;

// All internal state
this.cursorState = {
cursorId: null,
cmd,
cmd: this.cmd,
documents: options.documents || [],
cursorIndex: 0,
dead: false,
Expand Down Expand Up @@ -218,14 +205,9 @@ class CoreCursor extends Readable {
nextFunction(this, callback);
}

/**
* Clone the cursor
*
* @function
* @returns {CoreCursor}
*/
clone(): CoreCursor {
return this.topology.cursor(this.ns, this.cmd, this.options);
/** Clone the cursor */
clone(): this {
return new (this.constructor as any)(this.topology, this.operation, this.options);
}

/**
Expand Down Expand Up @@ -552,89 +534,29 @@ class CoreCursor extends Readable {
done(null, result);
};

if (cursor.operation) {
if (cursor.logger.isDebug()) {
cursor.logger.debug(
`issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify(
cursor.query
)}]`
);
}

executeOperation(cursor.topology, cursor.operation, (err?: any, result?: any) => {
if (err) {
done(err);
return;
}

cursor.server = cursor.operation.server;
cursor.cursorState.init = true;

// NOTE: this is a special internal method for cloning a cursor, consider removing
if (cursor.cursorState.cursorId != null) {
return done();
}

queryCallback(err, result);
});

return;
}

// Very explicitly choose what is passed to selectServer
const serverSelectOptions = {} as any;
if (cursor.cursorState.session) {
serverSelectOptions.session = cursor.cursorState.session;
}

if (cursor.operation) {
serverSelectOptions.readPreference = cursor.operation.readPreference;
} else if (cursor.options.readPreference) {
serverSelectOptions.readPreference = cursor.options.readPreference;
if (cursor.logger.isDebug()) {
cursor.logger.debug(
`issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify(
cursor.query
)}]`
);
}

return cursor.topology.selectServer(serverSelectOptions, (err?: any, server?: any) => {
executeOperation(cursor.topology, cursor.operation, (err?: any, result?: any) => {
if (err) {
const disconnectHandler = cursor.disconnectHandler;
if (disconnectHandler != null) {
return disconnectHandler.addObjectAndMethod(
'cursor',
cursor,
'next',
[callback],
callback
);
}

return callback(err);
done(err);
return;
}

cursor.server = server;
cursor.server = cursor.operation.server;
cursor.cursorState.init = true;
if (collationNotSupported(cursor.server, cursor.cmd)) {
return callback(new MongoError(`server ${cursor.server.name} does not support collation`));
}

// NOTE: this is a special internal method for cloning a cursor, consider removing
if (cursor.cursorState.cursorId != null) {
return done();
}

if (cursor.logger.isDebug()) {
cursor.logger.debug(
`issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify(
cursor.query
)}]`
);
}

if (cursor.cmd.find != null) {
server.query(cursor.ns, cursor.cmd, cursor.cursorState, cursor.options, queryCallback);
return;
}

const commandOptions = Object.assign({ session: cursor.cursorState.session }, cursor.options);
server.command(cursor.ns, cursor.cmd, commandOptions, queryCallback);
queryCallback(err, result);
});
}
}
Expand Down
15 changes: 4 additions & 11 deletions src/cursor/cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { handleCallback, maybePromise, formattedOrderClause } from '../utils';
import executeOperation = require('../operations/execute_operation');
import { each } from '../operations/cursor_ops';
import CountOperation = require('../operations/count');
import type { OperationBase } from '../operations/operation';

/**
* @file The **Cursor** class is an internal class that embodies a cursor on MongoDB
Expand Down Expand Up @@ -97,12 +98,11 @@ const fields = ['numberOfRetries', 'tailableRetryInterval'];
class Cursor extends CoreCursor {
/**
* @param {any} topology
* @param {any} ns
* @param {any} [cmd]
* @param {any} operation
* @param {any} [options]
*/
constructor(topology: any, ns: any, cmd?: any, options?: any) {
super(topology, ns, cmd, options);
constructor(topology: any, operation: OperationBase, options?: any) {
super(topology, operation, options);

options = options || {};
if (this.operation) {
Expand Down Expand Up @@ -690,13 +690,6 @@ class Cursor extends CoreCursor {
* @param {(object|null|boolean)} result The result object if the command was executed successfully.
*/

/**
* Clone the cursor
*
* @function external:CoreCursor#clone
* @returns {Cursor}
*/

/**
* Resets the cursor
*
Expand Down
6 changes: 5 additions & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ class CommandOperation extends OperationBase {
this.writeConcern = resolveWriteConcern(propertyProvider, this.options);
this.explain = false;

if (options && typeof options.fullResponse === 'boolean') {
this.fullResponse = options.fullResponse;
}

if (operationOptions && typeof operationOptions.fullResponse === 'boolean') {
this.fullResponse = true;
this.fullResponse = operationOptions.fullResponse;
}

// TODO: A lot of our code depends on having the read preference in the options. This should
Expand Down
3 changes: 3 additions & 0 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { Document } from '../types';

const Aspect = {
READ_OPERATION: Symbol('READ_OPERATION'),
WRITE_OPERATION: Symbol('WRITE_OPERATION'),
Expand All @@ -14,6 +16,7 @@ const Aspect = {
*/
class OperationBase {
options: any;
cmd?: Document;

constructor(options: any) {
this.options = Object.assign({}, options);
Expand Down
6 changes: 5 additions & 1 deletion src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import type { Server } from '../sdam/server';
class RunCommandOperation extends CommandOperation {
command: any;

constructor(parent: MongoClient | Db | Collection, command: any, options: any) {
constructor(
parent: MongoClient | Db | Collection | { s: { namespace: MongoDBNamespace } },
command: any,
options: any
) {
super(parent, options);
this.command = command;
}
Expand Down
12 changes: 10 additions & 2 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { emitDeprecatedOptionWarning, ClientMetadata } from '../utils';
import { emitDeprecatedOptionWarning, ClientMetadata, MongoDBNamespace } from '../utils';
import Denque = require('denque');
import { EventEmitter } from 'events';
import { ReadPreference } from '../read_preference';
Expand Down Expand Up @@ -39,6 +39,7 @@ import type { CloseOptions } from '../cmap/connection_pool';
import type { Logger } from '..';
import type { DestroyOptions } from '../cmap/connection';
import type { CommandOptions } from '../cmap/wire_protocol/command';
import { RunCommandOperation } from '../operations/run_command';

// Global state
let globalTopologyCounter = 0;
Expand Down Expand Up @@ -718,7 +719,14 @@ export class Topology extends EventEmitter {
const CursorClass = options.cursorFactory ?? this.s.Cursor;
ReadPreference.translate(options);

return new CursorClass(topology, ns, cmd, options);
return new CursorClass(
topology,
new RunCommandOperation({ s: { namespace: MongoDBNamespace.fromString(ns) } }, cmd, {
fullResponse: true,
...options
}),
options
);
}

get clientMetadata(): ClientMetadata {
Expand Down
Loading

0 comments on commit 3becdc0

Please sign in to comment.