Skip to content

Commit

Permalink
fix(NODE-5636): generate _ids using pkFactory in bulk write operations (
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson authored Mar 8, 2024
1 parent 2348548 commit fbb5059
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 92 deletions.
51 changes: 21 additions & 30 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, ObjectId, resolveBSONOptions } from '../bson';
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
type AnyError,
Expand All @@ -12,6 +12,7 @@ import {
} from '../error';
import type { Filter, OneOrMore, OptionalId, UpdateFilter, WithoutId } from '../mongo_types';
import type { CollationOptions, CommandOperationOptions } from '../operations/command';
import { maybeAddIdToDocuments } from '../operations/common_functions';
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
Expand Down Expand Up @@ -917,7 +918,7 @@ export abstract class BulkOperationBase {
* Create a new OrderedBulkOperation or UnorderedBulkOperation instance
* @internal
*/
constructor(collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
constructor(private collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
// determine whether bulkOperation is ordered or unordered
this.isOrdered = isOrdered;

Expand Down Expand Up @@ -1032,9 +1033,9 @@ export abstract class BulkOperationBase {
* ```
*/
insert(document: Document): BulkOperationBase {
if (document._id == null && !shouldForceServerObjectId(this)) {
document._id = new ObjectId();
}
maybeAddIdToDocuments(this.collection, document, {
forceServerObjectId: this.shouldForceServerObjectId()
});

return this.addToOperationsList(BatchType.INSERT, document);
}
Expand Down Expand Up @@ -1093,21 +1094,16 @@ export abstract class BulkOperationBase {
throw new MongoInvalidArgumentError('Operation must be an object with an operation key');
}
if ('insertOne' in op) {
const forceServerObjectId = shouldForceServerObjectId(this);
if (op.insertOne && op.insertOne.document == null) {
// NOTE: provided for legacy support, but this is a malformed operation
if (forceServerObjectId !== true && (op.insertOne as Document)._id == null) {
(op.insertOne as Document)._id = new ObjectId();
}

return this.addToOperationsList(BatchType.INSERT, op.insertOne);
}
const forceServerObjectId = this.shouldForceServerObjectId();
const document =
op.insertOne && op.insertOne.document == null
? // TODO(NODE-6003): remove support for omitting the `documents` subdocument in bulk inserts
(op.insertOne as Document)
: op.insertOne.document;

if (forceServerObjectId !== true && op.insertOne.document._id == null) {
op.insertOne.document._id = new ObjectId();
}
maybeAddIdToDocuments(this.collection, document, { forceServerObjectId });

return this.addToOperationsList(BatchType.INSERT, op.insertOne.document);
return this.addToOperationsList(BatchType.INSERT, document);
}

if ('replaceOne' in op || 'updateOne' in op || 'updateMany' in op) {
Expand Down Expand Up @@ -1268,6 +1264,13 @@ export abstract class BulkOperationBase {
batchType: BatchType,
document: Document | UpdateStatement | DeleteStatement
): this;

private shouldForceServerObjectId(): boolean {
return (
this.s.options.forceServerObjectId === true ||
this.s.collection.s.db.options?.forceServerObjectId === true
);
}
}

Object.defineProperty(BulkOperationBase.prototype, 'length', {
Expand All @@ -1277,18 +1280,6 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
}
});

function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
if (typeof bulkOperation.s.options.forceServerObjectId === 'boolean') {
return bulkOperation.s.options.forceServerObjectId;
}

if (typeof bulkOperation.s.collection.s.db.options?.forceServerObjectId === 'boolean') {
return bulkOperation.s.collection.s.db.options?.forceServerObjectId;
}

return false;
}

function isInsertBatch(batch: Batch): boolean {
return batch.batchType === BatchType.INSERT;
}
Expand Down
2 changes: 1 addition & 1 deletion src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export interface Auth {

/** @public */
export interface PkFactory {
createPk(): any; // TODO: when js-bson is typed, function should return some BSON type
createPk(): any;
}

/** @public */
Expand Down
21 changes: 16 additions & 5 deletions src/operations/common_functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,37 @@ export async function indexInformation(
return info;
}

export function prepareDocs(
export function maybeAddIdToDocuments(
coll: Collection,
docs: Document[],
options: { forceServerObjectId?: boolean }
): Document[] {
): Document[];
export function maybeAddIdToDocuments(
coll: Collection,
docs: Document,
options: { forceServerObjectId?: boolean }
): Document;
export function maybeAddIdToDocuments(
coll: Collection,
docOrDocs: Document[] | Document,
options: { forceServerObjectId?: boolean }
): Document[] | Document {
const forceServerObjectId =
typeof options.forceServerObjectId === 'boolean'
? options.forceServerObjectId
: coll.s.db.options?.forceServerObjectId;

// no need to modify the docs if server sets the ObjectId
if (forceServerObjectId === true) {
return docs;
return docOrDocs;
}

return docs.map(doc => {
const transform = (doc: Document): Document => {
if (doc._id == null) {
doc._id = coll.s.pkFactory.createPk();
}

return doc;
});
};
return Array.isArray(docOrDocs) ? docOrDocs.map(transform) : transform(docOrDocs);
}
8 changes: 5 additions & 3 deletions src/operations/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { BulkWriteOperation } from './bulk_write';
import { CommandOperation, type CommandOperationOptions } from './command';
import { prepareDocs } from './common_functions';
import { maybeAddIdToDocuments } from './common_functions';
import { AbstractOperation, Aspect, defineAspects } from './operation';

/** @internal */
Expand Down Expand Up @@ -69,7 +69,7 @@ export interface InsertOneResult<TSchema = Document> {

export class InsertOneOperation extends InsertOperation {
constructor(collection: Collection, doc: Document, options: InsertOneOptions) {
super(collection.s.namespace, prepareDocs(collection, [doc], options), options);
super(collection.s.namespace, maybeAddIdToDocuments(collection, [doc], options), options);
}

override async execute(
Expand Down Expand Up @@ -131,7 +131,9 @@ export class InsertManyOperation extends AbstractOperation<InsertManyResult> {
const writeConcern = WriteConcern.fromOptions(options);
const bulkWriteOperation = new BulkWriteOperation(
coll,
prepareDocs(coll, this.docs, options).map(document => ({ insertOne: { document } })),
this.docs.map(document => ({
insertOne: { document }
})),
options
);

Expand Down
72 changes: 71 additions & 1 deletion test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as crypto from 'crypto';

import {
type Collection,
Double,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
Expand Down Expand Up @@ -65,16 +66,85 @@ describe('Bulk', function () {
context('when called with a valid operation', function () {
it('should not throw a MongoInvalidArgument error', async function () {
try {
client.db('test').collection('test').initializeUnorderedBulkOp().raw({ insertOne: {} });
client
.db('test')
.collection('test')
.initializeUnorderedBulkOp()
.raw({ insertOne: { document: {} } });
} catch (error) {
expect(error).not.to.exist;
}
});
});

it('supports the legacy specification (no nested document field)', async function () {
await client
.db('test')
.collection('test')
.initializeUnorderedBulkOp()
// @ts-expect-error Not allowed in TS, but allowed for legacy compat
.raw({ insertOne: { name: 'john doe' } })
.execute();
const result = await client.db('test').collection('test').findOne({ name: 'john doe' });
expect(result).to.exist;
});
});
});

describe('Collection', function () {
describe('when a pkFactory is set on the client', function () {
let client: MongoClient;
const pkFactory = {
count: 0,
createPk: function () {
return new Double(this.count++);
}
};
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient({}, { pkFactory, promoteValues: false });
collection = client.db('integration').collection('pk_factory_tests');
await collection.deleteMany({});
});

afterEach(() => client.close());

it('insertMany() generates _ids using the pkFactory', async function () {
await collection.insertMany([{ name: 'john doe' }]);
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});

it('bulkWrite() generates _ids using the pkFactory', async function () {
await collection.bulkWrite([{ insertOne: { document: { name: 'john doe' } } }]);
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});

it('ordered bulk operations generate _ids using pkFactory', async function () {
await collection.initializeOrderedBulkOp().insert({ name: 'john doe' }).execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});

it('unordered bulk operations generate _ids using pkFactory', async function () {
await collection.initializeUnorderedBulkOp().insert({ name: 'john doe' }).execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});

it('bulkOperation.raw() with the legacy syntax (no nested document field) generates _ids using pkFactory', async function () {
await collection
.initializeOrderedBulkOp()
// @ts-expect-error Not allowed by TS, but still permitted.
.raw({ insertOne: { name: 'john doe' } })
.execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});
});

describe('#insertMany()', function () {
context('when passed an invalid docs argument', function () {
it('should throw a MongoInvalidArgument error', async function () {
Expand Down
29 changes: 28 additions & 1 deletion test/integration/crud/insert.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,31 @@ describe('crud - insert', function () {
await client.close();
});

describe('when a pkFactory is set on the client', function () {
let client;
const pkFactory = {
count: 0,
createPk: function () {
return new Double(this.count++);
}
};
let collection;

beforeEach(async function () {
client = this.configuration.newClient({}, { pkFactory, promoteValues: false });
collection = client.db('integration').collection('pk_factory_tests');
await collection.deleteMany({});
});

afterEach(() => client.close());

it('insertOne() generates _ids using the pkFactory', async function () {
await collection.insertOne({ name: 'john doe' });
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});
});

it('Should correctly execute Collection.prototype.insertOne', function (done) {
const configuration = this.configuration;
let url = configuration.url();
Expand Down Expand Up @@ -135,6 +160,7 @@ describe('crud - insert', function () {
it('insertMany returns the insertedIds and we can look up the documents', async function () {
const db = client.db();
const collection = db.collection('test_multiple_insert');
await collection.deleteMany({});
const docs = [{ a: 1 }, { a: 2 }];

const r = await collection.insertMany(docs);
Expand Down Expand Up @@ -839,6 +865,7 @@ describe('crud - insert', function () {

const db = client.db();
const collection = db.collection('Should_correctly_insert_object_with_timestamps');
await collection.deleteMany({});

const { insertedId } = await collection.insertOne(doc);
expect(insertedId.equals(doc._id)).to.be.true;
Expand Down Expand Up @@ -1700,7 +1727,7 @@ describe('crud - insert', function () {
try {
db.collection(k.toString());
test.fail(false);
} catch (err) { } // eslint-disable-line
} catch (err) {} // eslint-disable-line

client.close(done);
});
Expand Down
51 changes: 0 additions & 51 deletions test/integration/crud/pk_factory.test.js

This file was deleted.

0 comments on commit fbb5059

Please sign in to comment.