diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 8d42bd9d17..8cb819aecf 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -7,14 +7,13 @@ import { hasAtomicOperators, Callback, MongoDBNamespace, - maxWireVersion, getTopology, resolveOptions } from '../utils'; import { executeOperation } from '../operations/execute_operation'; import { InsertOperation } from '../operations/insert'; -import { UpdateOperation, UpdateStatement } from '../operations/update'; -import { DeleteOperation, DeleteStatement } from '../operations/delete'; +import { UpdateOperation, UpdateStatement, makeUpdateStatement } from '../operations/update'; +import { DeleteOperation, DeleteStatement, makeDeleteStatement } from '../operations/delete'; import { WriteConcern } from '../write_concern'; import type { Collection } from '../collection'; import type { Topology } from '../sdam/topology'; @@ -727,97 +726,66 @@ export class FindOperators { /** Add a multiple update operation to the bulk operation */ update(updateDocument: Document): BulkOperationBase { - if (!this.bulkOperation.s.currentOp) { - this.bulkOperation.s.currentOp = {}; - } - - // Perform upsert - const upsert = - typeof this.bulkOperation.s.currentOp.upsert === 'boolean' - ? this.bulkOperation.s.currentOp.upsert - : false; - - // Establish the update command - const document: Document = { - q: this.bulkOperation.s.currentOp.selector, - u: updateDocument, - multi: true, - upsert: upsert - }; - - if (updateDocument.hint) { - document.hint = updateDocument.hint; - } - - // Clear out current Op - this.bulkOperation.s.currentOp = undefined; - return this.bulkOperation.addToOperationsList(BatchType.UPDATE, document); + const currentOp = buildCurrentOp(this.bulkOperation); + return this.bulkOperation.addToOperationsList( + BatchType.UPDATE, + makeUpdateStatement(currentOp.selector, updateDocument, { + ...currentOp, + multi: true + }) + ); } /** Add a single update operation to the bulk operation */ updateOne(updateDocument: Document): BulkOperationBase { - if (!this.bulkOperation.s.currentOp) { - this.bulkOperation.s.currentOp = {}; - } - - // Perform upsert - const upsert = - typeof this.bulkOperation.s.currentOp.upsert === 'boolean' - ? this.bulkOperation.s.currentOp.upsert - : false; - - // Establish the update command - const document: Document = { - q: this.bulkOperation.s.currentOp.selector, - u: updateDocument, - multi: false, - upsert: upsert - }; - - if (updateDocument.hint) { - document.hint = updateDocument.hint; - } - if (!hasAtomicOperators(updateDocument)) { throw new TypeError('Update document requires atomic operators'); } - // Clear out current Op - this.bulkOperation.s.currentOp = undefined; - return this.bulkOperation.addToOperationsList(BatchType.UPDATE, document); + const currentOp = buildCurrentOp(this.bulkOperation); + return this.bulkOperation.addToOperationsList( + BatchType.UPDATE, + makeUpdateStatement(currentOp.selector, updateDocument, { ...currentOp, multi: false }) + ); } /** Add a replace one operation to the bulk operation */ replaceOne(replacement: Document): BulkOperationBase { - if (!this.bulkOperation.s.currentOp) { - this.bulkOperation.s.currentOp = {}; + if (hasAtomicOperators(replacement)) { + throw new TypeError('Replacement document must not use atomic operators'); } - // Perform upsert - const upsert = - typeof this.bulkOperation.s.currentOp.upsert === 'boolean' - ? this.bulkOperation.s.currentOp.upsert - : false; - - // Establish the update command - const document: Document = { - q: this.bulkOperation.s.currentOp.selector, - u: replacement, - multi: false, - upsert: upsert - }; + const currentOp = buildCurrentOp(this.bulkOperation); + return this.bulkOperation.addToOperationsList( + BatchType.UPDATE, + makeUpdateStatement(currentOp.selector, replacement, { ...currentOp, multi: false }) + ); + } - if (replacement.hint) { - document.hint = replacement.hint; - } + /** Add a delete one operation to the bulk operation */ + deleteOne(): BulkOperationBase { + const currentOp = buildCurrentOp(this.bulkOperation); + return this.bulkOperation.addToOperationsList( + BatchType.DELETE, + makeDeleteStatement(currentOp.selector, { ...currentOp, limit: 1 }) + ); + } - if (hasAtomicOperators(replacement)) { - throw new TypeError('Replacement document must not use atomic operators'); - } + /** Add a delete many operation to the bulk operation */ + delete(): BulkOperationBase { + const currentOp = buildCurrentOp(this.bulkOperation); + return this.bulkOperation.addToOperationsList( + BatchType.DELETE, + makeDeleteStatement(currentOp.selector, { ...currentOp, limit: 0 }) + ); + } - // Clear out current Op - this.bulkOperation.s.currentOp = undefined; - return this.bulkOperation.addToOperationsList(BatchType.UPDATE, document); + removeOne(): BulkOperationBase { + return this.deleteOne(); + } + + remove(): BulkOperationBase { + return this.delete(); } /** Upsert modifier for update bulk operation, noting that this operation is an upsert. */ @@ -830,46 +798,14 @@ export class FindOperators { return this; } - /** Add a delete one operation to the bulk operation */ - deleteOne(): BulkOperationBase { - if (!this.bulkOperation.s.currentOp) { - this.bulkOperation.s.currentOp = {}; - } - - // Establish the update command - const document = { - q: this.bulkOperation.s.currentOp.selector, - limit: 1 - }; - - // Clear out current Op - this.bulkOperation.s.currentOp = undefined; - return this.bulkOperation.addToOperationsList(BatchType.DELETE, document); - } - - /** Add a delete many operation to the bulk operation */ - delete(): BulkOperationBase { + /** Specifies the collation for the query condition. */ + collation(collation: CollationOptions): this { if (!this.bulkOperation.s.currentOp) { this.bulkOperation.s.currentOp = {}; } - // Establish the update command - const document = { - q: this.bulkOperation.s.currentOp.selector, - limit: 0 - }; - - // Clear out current Op - this.bulkOperation.s.currentOp = undefined; - return this.bulkOperation.addToOperationsList(BatchType.DELETE, document); - } - - removeOne(): BulkOperationBase { - return this.deleteOne(); - } - - remove(): BulkOperationBase { - return this.delete(); + this.bulkOperation.s.currentOp.collation = collation; + return this; } } @@ -1136,32 +1072,45 @@ export abstract class BulkOperationBase { if ('replaceOne' in op || 'updateOne' in op || 'updateMany' in op) { if ('replaceOne' in op) { - const updateStatement = makeUpdateStatement(this.s.topology, op.replaceOne, false); + if ('q' in op.replaceOne) { + throw new TypeError('Raw operations are not allowed'); + } + const updateStatement = makeUpdateStatement( + op.replaceOne.filter, + op.replaceOne.replacement, + { ...op.replaceOne, multi: false } + ); if (hasAtomicOperators(updateStatement.u)) { throw new TypeError('Replacement document must not use atomic operators'); } - - return this.addToOperationsList( - BatchType.UPDATE, - makeUpdateStatement(this.s.topology, op.replaceOne, false) - ); + return this.addToOperationsList(BatchType.UPDATE, updateStatement); } if ('updateOne' in op) { - const updateStatement = makeUpdateStatement(this.s.topology, op.updateOne, false); + if ('q' in op.updateOne) { + throw new TypeError('Raw operations are not allowed'); + } + const updateStatement = makeUpdateStatement(op.updateOne.filter, op.updateOne.update, { + ...op.updateOne, + multi: false + }); if (!hasAtomicOperators(updateStatement.u)) { throw new TypeError('Update document requires atomic operators'); } - return this.addToOperationsList(BatchType.UPDATE, updateStatement); } if ('updateMany' in op) { - const updateStatement = makeUpdateStatement(this.s.topology, op.updateMany, true); + if ('q' in op.updateMany) { + throw new TypeError('Raw operations are not allowed'); + } + const updateStatement = makeUpdateStatement(op.updateMany.filter, op.updateMany.update, { + ...op.updateMany, + multi: true + }); if (!hasAtomicOperators(updateStatement.u)) { throw new TypeError('Update document requires atomic operators'); } - return this.addToOperationsList(BatchType.UPDATE, updateStatement); } } @@ -1169,28 +1118,34 @@ export abstract class BulkOperationBase { if ('removeOne' in op) { return this.addToOperationsList( BatchType.DELETE, - makeDeleteStatement(this.s.topology, op.removeOne, false) + makeDeleteStatement(op.removeOne.filter, { ...op.removeOne, limit: 1 }) ); } if ('removeMany' in op) { return this.addToOperationsList( BatchType.DELETE, - makeDeleteStatement(this.s.topology, op.removeMany, true) + makeDeleteStatement(op.removeMany.filter, { ...op.removeMany, limit: 0 }) ); } if ('deleteOne' in op) { + if ('q' in op.deleteOne) { + throw new TypeError('Raw operations are not allowed'); + } return this.addToOperationsList( BatchType.DELETE, - makeDeleteStatement(this.s.topology, op.deleteOne, false) + makeDeleteStatement(op.deleteOne.filter, { ...op.deleteOne, limit: 1 }) ); } if ('deleteMany' in op) { + if ('q' in op.deleteMany) { + throw new TypeError('Raw operations are not allowed'); + } return this.addToOperationsList( BatchType.DELETE, - makeDeleteStatement(this.s.topology, op.deleteMany, true) + makeDeleteStatement(op.deleteMany.filter, { ...op.deleteMany, limit: 0 }) ); } @@ -1328,94 +1283,6 @@ function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean { return false; } -function makeUpdateStatement( - topology: Topology, - model: ReplaceOneModel | UpdateOneModel | UpdateManyModel, - multi: boolean -): UpdateStatement { - // NOTE: legacy support for a raw statement, consider removing - if (isUpdateStatement(model)) { - if ('collation' in model && maxWireVersion(topology) < 5) { - throw new TypeError('Topology does not support collation'); - } - - return model as UpdateStatement; - } - - const statement: UpdateStatement = { - q: model.filter, - u: 'update' in model ? model.update : model.replacement, - multi, - upsert: 'upsert' in model ? model.upsert : false - }; - - if ('collation' in model) { - if (maxWireVersion(topology) < 5) { - throw new TypeError('Topology does not support collation'); - } - - statement.collation = model.collation; - } - - if ('arrayFilters' in model) { - // TODO: this check should be done at command construction against a connection, not a topology - if (maxWireVersion(topology) < 6) { - throw new TypeError('arrayFilters are only supported on MongoDB 3.6+'); - } - - statement.arrayFilters = model.arrayFilters; - } - - if ('hint' in model) { - statement.hint = model.hint; - } - - return statement; -} - -function isUpdateStatement(model: Document): model is UpdateStatement { - return 'q' in model; -} - -function makeDeleteStatement( - topology: Topology, - model: DeleteOneModel | DeleteManyModel, - multi: boolean -): DeleteStatement { - // NOTE: legacy support for a raw statement, consider removing - if (isDeleteStatement(model)) { - if ('collation' in model && maxWireVersion(topology) < 5) { - throw new TypeError('Topology does not support collation'); - } - - model.limit = multi ? 0 : 1; - return model as DeleteStatement; - } - - const statement: DeleteStatement = { - q: model.filter, - limit: multi ? 0 : 1 - }; - - if ('collation' in model) { - if (maxWireVersion(topology) < 5) { - throw new TypeError('Topology does not support collation'); - } - - statement.collation = model.collation; - } - - if ('hint' in model) { - statement.hint = model.hint; - } - - return statement; -} - -function isDeleteStatement(model: Document): model is DeleteStatement { - return 'q' in model; -} - function isInsertBatch(batch: Batch): boolean { return batch.batchType === BatchType.INSERT; } @@ -1427,3 +1294,10 @@ function isUpdateBatch(batch: Batch): batch is Batch { function isDeleteBatch(batch: Batch): batch is Batch { return batch.batchType === BatchType.DELETE; } + +function buildCurrentOp(bulkOp: BulkOperationBase): Document { + let { currentOp } = bulkOp.s; + bulkOp.s.currentOp = undefined; + if (!currentOp) currentOp = {}; + return currentOp; +} diff --git a/src/collection.ts b/src/collection.ts index d486ad8d44..8105b3281f 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -333,6 +333,8 @@ export class Collection { * ```js * { insertOne: { document: { a: 1 } } } * + * { insertMany: [{ g: 1 }, { g: 2 }]} + * * { updateOne: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } } * * { updateMany: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } } @@ -345,6 +347,7 @@ export class Collection { * * { replaceOne: { filter: {c:3}, replacement: {c:4}, upsert:true}} *``` + * Please note that raw operations are no longer accepted as of driver version 4.0. * * If documents passed in do not contain the **_id** field, * one will be added to each of the documents missing it by the driver, mutating the document. This behavior diff --git a/src/operations/delete.ts b/src/operations/delete.ts index ab1c3057e1..b709e67a78 100644 --- a/src/operations/delete.ts +++ b/src/operations/delete.ts @@ -1,6 +1,6 @@ import { defineAspects, Aspect, Hint } from './operation'; import { CommandOperation, CommandOperationOptions, CollationOptions } from './command'; -import { Callback, maxWireVersion, MongoDBNamespace } from '../utils'; +import { Callback, maxWireVersion, MongoDBNamespace, collationNotSupported } from '../utils'; import type { Document } from '../bson'; import type { Server } from '../sdam/server'; import type { Collection } from '../collection'; @@ -88,6 +88,12 @@ export class DeleteOperation extends CommandOperation { } } + const statementWithCollation = this.statements.find(statement => !!statement.collation); + if (statementWithCollation && collationNotSupported(server, statementWithCollation)) { + callback(new MongoError(`server ${server.name} does not support collation`)); + return; + } + super.executeCommand(server, session, command, callback); } } @@ -132,7 +138,7 @@ export class DeleteManyOperation extends DeleteOperation { } } -function makeDeleteStatement( +export function makeDeleteStatement( filter: Document, options: DeleteOptions & { limit?: number } ): DeleteStatement { diff --git a/src/operations/update.ts b/src/operations/update.ts index 2132259c52..197c934ab8 100644 --- a/src/operations/update.ts +++ b/src/operations/update.ts @@ -97,7 +97,11 @@ export class UpdateOperation extends CommandOperation { command.bypassDocumentValidation = options.bypassDocumentValidation; } - if (collationNotSupported(server, options)) { + const statementWithCollation = this.statements.find(statement => !!statement.collation); + if ( + collationNotSupported(server, options) || + (statementWithCollation && collationNotSupported(server, statementWithCollation)) + ) { callback(new MongoError(`server ${server.name} does not support collation`)); return; } @@ -115,6 +119,11 @@ export class UpdateOperation extends CommandOperation { return; } + if (this.statements.some(statement => !!statement.arrayFilters) && maxWireVersion(server) < 6) { + callback(new MongoError('arrayFilters are only supported on MongoDB 3.6+')); + return; + } + super.executeCommand(server, session, command, callback); } } @@ -247,7 +256,7 @@ export class ReplaceOneOperation extends UpdateOperation { } } -function makeUpdateStatement( +export function makeUpdateStatement( filter: Document, update: Document, options: UpdateOptions & { multi?: boolean } diff --git a/test/functional/apm.test.js b/test/functional/apm.test.js index f9d55c704c..59eb929284 100644 --- a/test/functional/apm.test.js +++ b/test/functional/apm.test.js @@ -483,9 +483,9 @@ describe('APM', function () { .collection('apm_test_3') .bulkWrite( [ - { insertOne: { a: 1 } }, - { updateOne: { q: { a: 2 }, u: { $set: { a: 2 } }, upsert: true } }, - { deleteOne: { q: { c: 1 } } } + { insertOne: { document: { a: 1 } } }, + { updateOne: { filter: { a: 2 }, update: { $set: { a: 2 } }, upsert: true } }, + { deleteOne: { filter: { c: 1 } } } ], { ordered: true } ) diff --git a/test/functional/bulk.test.js b/test/functional/bulk.test.js index 69c9315607..836d44d980 100644 --- a/test/functional/bulk.test.js +++ b/test/functional/bulk.test.js @@ -1,5 +1,11 @@ 'use strict'; -const { withClient, withClientV2, setupDatabase, ignoreNsNotFound } = require('./shared'); +const { + withClient, + withClientV2, + withMonitoredClient, + setupDatabase, + ignoreNsNotFound +} = require('./shared'); const test = require('./shared').assert; const { MongoError } = require('../../src/error'); const { Long } = require('../../src'); @@ -1841,4 +1847,67 @@ describe('Bulk', function () { }); }) ); + + it('should apply collation via FindOperators', { + metadata: { requires: { mongodb: '>= 3.4' } }, + test: withMonitoredClient(['update', 'delete'], function (client, events, done) { + const locales = ['fr', 'de', 'es']; + const bulk = client.db().collection('coll').initializeOrderedBulkOp(); + + // updates + bulk + .find({ b: 1 }) + .collation({ locale: locales[0] }) + .updateOne({ $set: { b: 2 } }); + bulk + .find({ b: 2 }) + .collation({ locale: locales[1] }) + .update({ $set: { b: 3 } }); + bulk.find({ b: 3 }).collation({ locale: locales[2] }).replaceOne({ b: 2 }); + + // deletes + bulk.find({ b: 2 }).collation({ locale: locales[0] }).removeOne(); + bulk.find({ b: 1 }).collation({ locale: locales[1] }).remove(); + + bulk.execute(err => { + expect(err).to.not.exist; + expect(events).to.be.an('array').with.length.at.least(1); + expect(events[0]).property('commandName').to.equal('update'); + const updateCommand = events[0].command; + expect(updateCommand).property('updates').to.be.an('array').with.length(3); + updateCommand.updates.forEach((statement, idx) => { + expect(statement).property('collation').to.eql({ locale: locales[idx] }); + }); + expect(events[1]).property('commandName').to.equal('delete'); + const deleteCommand = events[1].command; + expect(deleteCommand).property('deletes').to.be.an('array').with.length(2); + deleteCommand.deletes.forEach((statement, idx) => { + expect(statement).property('collation').to.eql({ locale: locales[idx] }); + }); + client.close(done); + }); + }) + }); + + it('should throw an error if raw operations are passed to bulkWrite', function () { + const client = this.configuration.newClient(); + return client.connect().then(() => { + this.defer(() => client.close()); + + const coll = client.db().collection('single_bulk_write_error'); + return coll + .bulkWrite([ + { updateOne: { q: { a: 2 }, u: { $set: { a: 2 } }, upsert: true } }, + { deleteOne: { q: { c: 1 } } } + ]) + .then( + () => { + throw new Error('expected a bulk error'); + }, + err => { + expect(err).to.match(/Raw operations are not allowed/); + } + ); + }); + }); }); diff --git a/test/functional/collations.test.js b/test/functional/collations.test.js index 99350cdf67..7df51d7255 100644 --- a/test/functional/collations.test.js +++ b/test/functional/collations.test.js @@ -536,13 +536,13 @@ describe('Collation', function () { [ { updateOne: { - q: { a: 2 }, - u: { $set: { a: 2 } }, + filter: { a: 2 }, + update: { $set: { a: 2 } }, upsert: true, collation: { caseLevel: true } } }, - { deleteOne: { q: { c: 1 } } } + { deleteOne: { filter: { c: 1 } } } ], { ordered: true } ) @@ -559,7 +559,7 @@ describe('Collation', function () { } }); - it('Successfully fail bulkWrite due to unsupported collation', { + it('Successfully fail bulkWrite due to unsupported collation in update', { metadata: { requires: { generators: true, topology: 'single' } }, test: function () { const configuration = this.configuration; @@ -588,13 +588,63 @@ describe('Collation', function () { [ { updateOne: { - q: { a: 2 }, - u: { $set: { a: 2 } }, + filter: { a: 2 }, + update: { $set: { a: 2 } }, upsert: true, collation: { caseLevel: true } } }, - { deleteOne: { q: { c: 1 } } } + { deleteOne: { filter: { c: 1 } } } + ], + { ordered: true } + ) + .then(() => { + throw new Error('should not succeed'); + }) + .catch(err => { + expect(err).to.exist; + expect(err.message).to.match(/does not support collation/); + }) + .then(() => client.close()); + }); + } + }); + + it('Successfully fail bulkWrite due to unsupported collation in delete', { + metadata: { requires: { generators: true, topology: 'single' } }, + test: function () { + const configuration = this.configuration; + const client = configuration.newClient(`mongodb://${testContext.server.uri()}/test`); + const primary = [Object.assign({}, mock.DEFAULT_ISMASTER, { maxWireVersion: 4 })]; + + testContext.server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(primary[0]); + } else if (doc.update) { + request.reply({ ok: 1 }); + } else if (doc.delete) { + request.reply({ ok: 1 }); + } else if (doc.endSessions) { + request.reply({ ok: 1 }); + } + }); + + return client.connect().then(() => { + const db = client.db(configuration.db); + + return db + .collection('test') + .bulkWrite( + [ + { + updateOne: { + filter: { a: 2 }, + update: { $set: { a: 2 } }, + upsert: true + } + }, + { deleteOne: { filter: { c: 1 }, collation: { caseLevel: true } } } ], { ordered: true } ) diff --git a/test/functional/crud_api.test.js b/test/functional/crud_api.test.js index 3e8847c19a..6c99c4faca 100644 --- a/test/functional/crud_api.test.js +++ b/test/functional/crud_api.test.js @@ -357,12 +357,12 @@ describe('CRUD API', function () { db.collection('t2_5').bulkWrite( [ - { insertOne: { a: 1 } }, + { insertOne: { document: { a: 1 } } }, { insertMany: [{ g: 1 }, { g: 2 }] }, - { updateOne: { q: { a: 2 }, u: { $set: { a: 2 } }, upsert: true } }, - { updateMany: { q: { a: 2 }, u: { $set: { a: 2 } }, upsert: true } }, - { deleteOne: { q: { c: 1 } } }, - { deleteMany: { q: { c: 1 } } } + { updateOne: { filter: { a: 2 }, update: { $set: { a: 2 } }, upsert: true } }, + { updateMany: { filter: { a: 2 }, update: { $set: { a: 2 } }, upsert: true } }, + { deleteOne: { filter: { c: 1 } } }, + { deleteMany: { filter: { c: 1 } } } ], { ordered: false, writeConcern: { w: 1 } }, function (err, r) { @@ -442,12 +442,12 @@ describe('CRUD API', function () { db.collection('t2_7').bulkWrite( [ - { insertOne: { a: 1 } }, + { insertOne: { document: { a: 1 } } }, { insertMany: [{ g: 1 }, { g: 2 }] }, - { updateOne: { q: { a: 2 }, u: { $set: { a: 2 } }, upsert: true } }, - { updateMany: { q: { a: 2 }, u: { $set: { a: 2 } }, upsert: true } }, - { deleteOne: { q: { c: 1 } } }, - { deleteMany: { q: { c: 1 } } } + { updateOne: { filter: { a: 2 }, update: { $set: { a: 2 } }, upsert: true } }, + { updateMany: { filter: { a: 2 }, update: { $set: { a: 2 } }, upsert: true } }, + { deleteOne: { filter: { c: 1 } } }, + { deleteMany: { filter: { c: 1 } } } ], { ordered: true, writeConcern: { w: 1 } }, function (err, r) {