From f548bd7da7a551b1b2bdf59222564455a3f7a2ca Mon Sep 17 00:00:00 2001 From: Dan Aprahamian Date: Wed, 10 Apr 2019 10:08:14 -0400 Subject: [PATCH] fix(BulkOp): run unordered bulk ops in serial UnorderedBulkWrites that use either retryableWrites or transactions and require more than one batch have a chance to fail if the batches arrive at the server out of order. We will now conform to other drivers by executing batches serially. Fixes NODE-1934 --- lib/bulk/unordered.js | 73 ++++++++++++++----------------------------- 1 file changed, 24 insertions(+), 49 deletions(-) diff --git a/lib/bulk/unordered.js b/lib/bulk/unordered.js index c3eeceea5c..0f19822579 100644 --- a/lib/bulk/unordered.js +++ b/lib/bulk/unordered.js @@ -139,19 +139,25 @@ class UnorderedBulkOperation extends BulkOperationBase { options = ret.options; callback = ret.callback; - return executeOperation(this.s.topology, executeBatches, [this, options, callback]); + return executeOperation(this.s.topology, executeCommands, [this, options, callback]); } } /** - * Execute the command + * Execute next write command in a chain * - * @param {UnorderedBulkOperation} bulkOperation - * @param {object} batch + * @param {OrderedBulkOperation} bulkOperation * @param {object} options * @param {function} callback */ -function executeBatch(bulkOperation, batch, options, callback) { +function executeCommands(bulkOperation, options, callback) { + if (bulkOperation.s.batches.length === 0) { + return handleCallback(callback, null, new BulkWriteResult(bulkOperation.s.bulkResult)); + } + + // Ordered execution of the command + const batch = bulkOperation.s.batches.shift(); + function resultHandler(err, result) { // Error is a driver related error not a bulk op error, terminate if (((err && err.driver) || (err && err.message)) && !(err instanceof MongoWriteConcernError)) { @@ -161,54 +167,23 @@ function executeBatch(bulkOperation, batch, options, callback) { // If we have and error if (err) err.ok = 0; if (err instanceof MongoWriteConcernError) { - return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, false, err, callback); + return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, true, err, callback); } - handleCallback( - callback, - null, - mergeBatchResults(false, batch, bulkOperation.s.bulkResult, err, result) - ); - } - bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback); -} + // Merge the results together + const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult); + const mergeResult = mergeBatchResults(true, batch, bulkOperation.s.bulkResult, err, result); + if (mergeResult != null) { + return handleCallback(callback, null, writeResult); + } -/** - * Execute all the commands - * - * @param {UnorderedBulkOperation} bulkOperation - * @param {object} options - * @param {function} callback - */ -function executeBatches(bulkOperation, options, callback) { - let numberOfCommandsToExecute = bulkOperation.s.batches.length; - let hasErrored = false; - // Execute over all the batches - for (let i = 0; i < bulkOperation.s.batches.length; i++) { - executeBatch(bulkOperation, bulkOperation.s.batches[i], options, function(err) { - if (hasErrored) { - return; - } - - if (err) { - hasErrored = true; - return handleCallback(callback, err); - } - // Count down the number of commands left to execute - numberOfCommandsToExecute = numberOfCommandsToExecute - 1; - - // Execute - if (numberOfCommandsToExecute === 0) { - // Driver level error - if (err) return handleCallback(callback, err); - - const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult); - if (bulkOperation.handleWriteError(callback, writeResult)) return; - - return handleCallback(callback, null, writeResult); - } - }); + if (bulkOperation.handleWriteError(callback, writeResult)) return; + + // Execute the next command in line + executeCommands(bulkOperation, options, callback); } + + bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback); } /**