Skip to content

Commit

Permalink
refactor!: only store topology on MongoClient (#2594)
Browse files Browse the repository at this point in the history
A topology is now only stored on MongoClient and not on Db or
Collection. These classes now store a parent and access the 
topology through that parent. External topology accessors have
been removed, and an internal `getTopology` util has been
added. Methods that require a topology can now run into cases
where the topology is undefined because the client is closed; an 
error is thrown in these cases.

NODE-2850
  • Loading branch information
HanaPearlman authored Nov 6, 2020
1 parent dcb1d20 commit 33fa6b2
Show file tree
Hide file tree
Showing 16 changed files with 183 additions and 211 deletions.
12 changes: 6 additions & 6 deletions src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from './operations/list_databases';
import { executeOperation } from './operations/execute_operation';
import { RunCommandOperation, RunCommandOptions } from './operations/run_command';
import type { Callback } from './utils';
import { Callback, getTopology } from './utils';
import type { Document } from './bson';
import type { CommandOperationOptions } from './operations/command';
import type { Db } from './db';
Expand Down Expand Up @@ -83,7 +83,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new RunCommandOperation(this.s.db, command, options),
callback
);
Expand Down Expand Up @@ -207,7 +207,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new AddUserOperation(this.s.db, username, password, options),
callback
);
Expand All @@ -233,7 +233,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new RemoveUserOperation(this.s.db, username, options),
callback
);
Expand Down Expand Up @@ -263,7 +263,7 @@ export class Admin {
options = options || {};

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new ValidateCollectionOperation(this, collectionName, options),
callback
);
Expand All @@ -287,7 +287,7 @@ export class Admin {
options = options || {};

return executeOperation(
this.s.db.s.topology,
getTopology(this.s.db),
new ListDatabasesOperation(this.s.db, options),
callback
);
Expand Down
5 changes: 3 additions & 2 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import {
hasAtomicOperators,
Callback,
MongoDBNamespace,
maxWireVersion
maxWireVersion,
getTopology
} from '../utils';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
Expand Down Expand Up @@ -904,7 +905,7 @@ export abstract class BulkOperationBase {
// determine whether bulkOperation is ordered or unordered
this.isOrdered = isOrdered;

const topology = collection.s.topology;
const topology = getTopology(collection);
options = options == null ? {} : options;
// TODO Bring from driver information in isMaster
// Get the namespace for the write operations
Expand Down
27 changes: 11 additions & 16 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@ import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { loadCollection, loadDb, loadMongoClient } from './dynamic_loaders';
import {
relayEvents,
maxWireVersion,
calculateDurationInMs,
now,
maybePromise,
MongoDBNamespace,
Callback
Callback,
getTopology
} from './utils';
import type { ReadPreference } from './read_preference';
import type { Timestamp, Document } from './bson';
import type { Topology } from './sdam/topology';
import type { OperationParent } from './operations/command';
import type { CollationOptions } from './cmap/wire_protocol/write_command';
import { MongoClient } from './mongo_client';
import { Db } from './db';
import { Collection } from './collection';

const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');

Expand Down Expand Up @@ -172,10 +176,9 @@ export class ChangeStreamStream extends CursorStream {
export class ChangeStream extends EventEmitter {
pipeline: Document[];
options: ChangeStreamOptions;
parent: OperationParent;
parent: MongoClient | Db | Collection;
namespace: MongoDBNamespace;
type: symbol;
topology: Topology;
cursor?: ChangeStreamCursor;
closed: boolean;
streamOptions?: CursorStreamOptions;
Expand Down Expand Up @@ -212,31 +215,23 @@ export class ChangeStream extends EventEmitter {
) {
super();

const Collection = loadCollection();
const Db = loadDb();
const MongoClient = loadMongoClient();

this.pipeline = pipeline;
this.options = options;

this.parent = parent;
this.namespace = parent.s.namespace;
if (parent instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
this.topology = parent.s.db.s.topology;
} else if (parent instanceof Db) {
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
this.topology = parent.s.topology;
} else if (parent instanceof MongoClient) {
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.topology = parent.topology!;
} else {
throw new TypeError(
'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
);
}

this.parent = parent;
this.namespace = parent.s.namespace;
if (!this.options.readPreference && parent.readPreference) {
this.options.readPreference = parent.readPreference;
}
Expand Down Expand Up @@ -483,7 +478,7 @@ function createChangeStreamCursor(
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const changeStreamCursor = new ChangeStreamCursor(
self.topology,
getTopology(self.parent),
new AggregateOperation(self.parent, pipeline, options),
cursorOptions
);
Expand Down Expand Up @@ -593,7 +588,7 @@ function processNewChange(
}

function processError(changeStream: ChangeStream, error: AnyError, callback?: Callback) {
const topology = changeStream.topology;
const topology = getTopology(changeStream.parent);
const cursor = changeStream.cursor;

// If the change stream has been closed explicitly, do not process error.
Expand Down
Loading

0 comments on commit 33fa6b2

Please sign in to comment.