Skip to content

Commit

Permalink
refactor(NODE-4632): async await in MongoClient, ClientSession, and A…
Browse files Browse the repository at this point in the history
…bstractCursor (mongodb#3428)
  • Loading branch information
nbbeeken authored and ZLY201 committed Nov 5, 2022
1 parent 8b139e4 commit 355454a
Show file tree
Hide file tree
Showing 28 changed files with 672 additions and 896 deletions.
56 changes: 25 additions & 31 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Readable } from 'stream';
import { promisify } from 'util';

import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
Expand Down Expand Up @@ -239,7 +238,7 @@ export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
operationType: 'insert';
/** This key will contain the document being inserted */
fullDocument: TSchema;
/** Namespace the insert event occured on */
/** Namespace the insert event occurred on */
ns: ChangeStreamNameSpace;
}

Expand All @@ -262,7 +261,7 @@ export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
fullDocument?: TSchema;
/** Contains a description of updated and removed fields in this operation */
updateDescription: UpdateDescription<TSchema>;
/** Namespace the update event occured on */
/** Namespace the update event occurred on */
ns: ChangeStreamNameSpace;
/**
* Contains the pre-image of the modified or deleted document if the
Expand All @@ -285,7 +284,7 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
operationType: 'replace';
/** The fullDocument of a replace event represents the document after the insert of the replacement document */
fullDocument: TSchema;
/** Namespace the replace event occured on */
/** Namespace the replace event occurred on */
ns: ChangeStreamNameSpace;
/**
* Contains the pre-image of the modified or deleted document if the
Expand All @@ -307,7 +306,7 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'delete';
/** Namespace the delete event occured on */
/** Namespace the delete event occurred on */
ns: ChangeStreamNameSpace;
/**
* Contains the pre-image of the modified or deleted document if the
Expand All @@ -328,7 +327,7 @@ export interface ChangeStreamDropDocument
ChangeStreamDocumentCollectionUUID {
/** Describes the type of operation represented in this change notification */
operationType: 'drop';
/** Namespace the drop event occured on */
/** Namespace the drop event occurred on */
ns: ChangeStreamNameSpace;
}

Expand All @@ -343,7 +342,7 @@ export interface ChangeStreamRenameDocument
operationType: 'rename';
/** The new name for the `ns.coll` collection */
to: { db: string; coll: string };
/** The "from" namespace that the rename occured on */
/** The "from" namespace that the rename occurred on */
ns: ChangeStreamNameSpace;
}

Expand Down Expand Up @@ -918,36 +917,31 @@ export class ChangeStream<
}
}

/**
* @internal
*
* TODO(NODE-4320): promisify selectServer and refactor this code to be async
*
* we promisify _processErrorIteratorModeCallback until we have a promisifed version of selectServer.
*/
// eslint-disable-next-line @typescript-eslint/unbound-method
private _processErrorIteratorMode = promisify(this._processErrorIteratorModeCallback);

/** @internal */
private _processErrorIteratorModeCallback(changeStreamError: AnyError, callback: Callback) {
private async _processErrorIteratorMode(changeStreamError: AnyError) {
if (this[kClosed]) {
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
return callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
}

if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
this.cursor.close().catch(() => null);

const topology = getTopology(this.parent);
topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => {
// if the topology can't reconnect, close the stream
if (serverSelectionError) return this.close(() => callback(changeStreamError));
if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
try {
await this.close();
} catch {
// ignore errors from close
}
throw changeStreamError;
}

this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
callback();
});
} else {
this.close(() => callback(changeStreamError));
await this.cursor.close().catch(() => null);
const topology = getTopology(this.parent);
try {
await topology.selectServerAsync(this.cursor.readPreference, {});
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
} catch {
// if the topology can't reconnect, close the stream
await this.close();
throw changeStreamError;
}
}
}
6 changes: 3 additions & 3 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ export class Collection<TSchema extends Document = Document> {
* Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
*
* @remarks
* watch() accepts two generic arguments for distinct usecases:
* watch() accepts two generic arguments for distinct use cases:
* - The first is to override the schema that may be defined for this specific collection
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
* @example
Expand Down Expand Up @@ -1603,7 +1603,7 @@ export class Collection<TSchema extends Document = Document> {
*
* @throws MongoNotConnectedError
* @remarks
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implemenation.
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implementation.
* However, `collection.bulkWrite()` provides an equivalent API that does not require prior connecting.
*/
initializeUnorderedBulkOp(options?: BulkWriteOptions): UnorderedBulkOperation {
Expand All @@ -1615,7 +1615,7 @@ export class Collection<TSchema extends Document = Document> {
*
* @throws MongoNotConnectedError
* @remarks
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implemenation.
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implementation.
* However, `collection.bulkWrite()` provides an equivalent API that does not require prior connecting.
*/
initializeOrderedBulkOp(options?: BulkWriteOptions): OrderedBulkOperation {
Expand Down
152 changes: 75 additions & 77 deletions src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import { ReadPreference, ReadPreferenceMode } from './read_preference';
import type { TagSet } from './sdam/server_description';
import {
AnyOptions,
Callback,
DEFAULT_PK_FACTORY,
emitWarning,
emitWarningOnce,
Expand Down Expand Up @@ -70,97 +69,89 @@ function matchesParentDomain(srvAddress: string, parentDomain: string): boolean
* @param uri - The connection string to parse
* @param options - Optional user provided connection string options
*/
export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostAddress[]>): void {
export async function resolveSRVRecord(options: MongoOptions): Promise<HostAddress[]> {
if (typeof options.srvHost !== 'string') {
return callback(new MongoAPIError('Option "srvHost" must not be empty'));
throw new MongoAPIError('Option "srvHost" must not be empty');
}

if (options.srvHost.split('.').length < 3) {
// TODO(NODE-3484): Replace with MongoConnectionStringError
return callback(new MongoAPIError('URI must include hostname, domain name, and tld'));
throw new MongoAPIError('URI must include hostname, domain name, and tld');
}

// Resolve the SRV record and use the result as the list of hosts to connect to.
const lookupAddress = options.srvHost;
dns.resolveSrv(`_${options.srvServiceName}._tcp.${lookupAddress}`, (err, addresses) => {
if (err) return callback(err);
const addresses = await dns.promises.resolveSrv(
`_${options.srvServiceName}._tcp.${lookupAddress}`
);

if (addresses.length === 0) {
return callback(new MongoAPIError('No addresses found at host'));
}
if (addresses.length === 0) {
throw new MongoAPIError('No addresses found at host');
}

for (const { name } of addresses) {
if (!matchesParentDomain(name, lookupAddress)) {
return callback(new MongoAPIError('Server record does not share hostname with parent URI'));
}
for (const { name } of addresses) {
if (!matchesParentDomain(name, lookupAddress)) {
throw new MongoAPIError('Server record does not share hostname with parent URI');
}
}

const hostAddresses = addresses.map(r =>
HostAddress.fromString(`${r.name}:${r.port ?? 27017}`)
);
const hostAddresses = addresses.map(r => HostAddress.fromString(`${r.name}:${r.port ?? 27017}`));

validateLoadBalancedOptions(hostAddresses, options, true);

const lbError = validateLoadBalancedOptions(hostAddresses, options, true);
if (lbError) {
return callback(lbError);
// Resolve TXT record and add options from there if they exist.
let record;
try {
record = await dns.promises.resolveTxt(lookupAddress);
} catch (error) {
if (error.code !== 'ENODATA' && error.code !== 'ENOTFOUND') {
throw error;
}
return hostAddresses;
}

// Resolve TXT record and add options from there if they exist.
dns.resolveTxt(lookupAddress, (err, record) => {
if (err) {
if (err.code !== 'ENODATA' && err.code !== 'ENOTFOUND') {
return callback(err);
}
} else {
if (record.length > 1) {
return callback(new MongoParseError('Multiple text records not allowed'));
}
if (record.length > 1) {
throw new MongoParseError('Multiple text records not allowed');
}

const txtRecordOptions = new URLSearchParams(record[0].join(''));
const txtRecordOptionKeys = [...txtRecordOptions.keys()];
if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) {
return callback(
new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`)
);
}
const txtRecordOptions = new URLSearchParams(record[0].join(''));
const txtRecordOptionKeys = [...txtRecordOptions.keys()];
if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) {
throw new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`);
}

if (VALID_TXT_RECORDS.some(option => txtRecordOptions.get(option) === '')) {
return callback(new MongoParseError('Cannot have empty URI params in DNS TXT Record'));
}
if (VALID_TXT_RECORDS.some(option => txtRecordOptions.get(option) === '')) {
throw new MongoParseError('Cannot have empty URI params in DNS TXT Record');
}

const source = txtRecordOptions.get('authSource') ?? undefined;
const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined;
const loadBalanced = txtRecordOptions.get('loadBalanced') ?? undefined;

if (
!options.userSpecifiedAuthSource &&
source &&
options.credentials &&
!AUTH_MECHS_AUTH_SRC_EXTERNAL.has(options.credentials.mechanism)
) {
options.credentials = MongoCredentials.merge(options.credentials, { source });
}
const source = txtRecordOptions.get('authSource') ?? undefined;
const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined;
const loadBalanced = txtRecordOptions.get('loadBalanced') ?? undefined;

if (!options.userSpecifiedReplicaSet && replicaSet) {
options.replicaSet = replicaSet;
}
if (
!options.userSpecifiedAuthSource &&
source &&
options.credentials &&
!AUTH_MECHS_AUTH_SRC_EXTERNAL.has(options.credentials.mechanism)
) {
options.credentials = MongoCredentials.merge(options.credentials, { source });
}

if (loadBalanced === 'true') {
options.loadBalanced = true;
}
if (!options.userSpecifiedReplicaSet && replicaSet) {
options.replicaSet = replicaSet;
}

if (options.replicaSet && options.srvMaxHosts > 0) {
return callback(new MongoParseError('Cannot combine replicaSet option with srvMaxHosts'));
}
if (loadBalanced === 'true') {
options.loadBalanced = true;
}

const lbError = validateLoadBalancedOptions(hostAddresses, options, true);
if (lbError) {
return callback(lbError);
}
}
if (options.replicaSet && options.srvMaxHosts > 0) {
throw new MongoParseError('Cannot combine replicaSet option with srvMaxHosts');
}

callback(undefined, hostAddresses);
});
});
validateLoadBalancedOptions(hostAddresses, options, true);

return hostAddresses;
}

/**
Expand Down Expand Up @@ -442,10 +433,8 @@ export function parseOptions(
PromiseProvider.set(options.promiseLibrary);
}

const lbError = validateLoadBalancedOptions(hosts, mongoOptions, isSRV);
if (lbError) {
throw lbError;
}
validateLoadBalancedOptions(hosts, mongoOptions, isSRV);

if (mongoClient && mongoOptions.autoEncryption) {
Encrypter.checkForMongoCrypt();
mongoOptions.encrypter = new Encrypter(mongoClient, uri, options);
Expand Down Expand Up @@ -522,24 +511,33 @@ export function parseOptions(
return mongoOptions;
}

/**
* #### Throws if LB mode is true:
* - hosts contains more than one host
* - there is a replicaSet name set
* - directConnection is set
* - if srvMaxHosts is used when an srv connection string is passed in
*
* @throws MongoParseError
*/
function validateLoadBalancedOptions(
hosts: HostAddress[] | string[],
mongoOptions: MongoOptions,
isSrv: boolean
): MongoParseError | undefined {
): void {
if (mongoOptions.loadBalanced) {
if (hosts.length > 1) {
return new MongoParseError(LB_SINGLE_HOST_ERROR);
throw new MongoParseError(LB_SINGLE_HOST_ERROR);
}
if (mongoOptions.replicaSet) {
return new MongoParseError(LB_REPLICA_SET_ERROR);
throw new MongoParseError(LB_REPLICA_SET_ERROR);
}
if (mongoOptions.directConnection) {
return new MongoParseError(LB_DIRECT_CONNECTION_ERROR);
throw new MongoParseError(LB_DIRECT_CONNECTION_ERROR);
}

if (isSrv && mongoOptions.srvMaxHosts > 0) {
return new MongoParseError('Cannot limit srv hosts with loadBalanced enabled');
throw new MongoParseError('Cannot limit srv hosts with loadBalanced enabled');
}
}
return;
Expand Down
Loading

0 comments on commit 355454a

Please sign in to comment.