diff --git a/mongo/client.go b/mongo/client.go index 41112fd6fc..039583372e 100644 --- a/mongo/client.go +++ b/mongo/client.go @@ -877,6 +877,9 @@ func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels, } wc := c.writeConcern + if bwo.WriteConcern != nil { + wc = bwo.WriteConcern + } if sess.TransactionRunning() { wc = nil } @@ -905,7 +908,7 @@ func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels, op.errorsOnly = true } err = op.execute(ctx) - return op.result, replaceErrors(err) + return &op.result, replaceErrors(err) } // newLogger will use the LoggerOptions to create an internal logger and publish diff --git a/mongo/client_bulk_write.go b/mongo/client_bulk_write.go index da9d1422a0..ba79069b42 100644 --- a/mongo/client_bulk_write.go +++ b/mongo/client_bulk_write.go @@ -10,6 +10,7 @@ import ( "context" "strconv" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsoncodec" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/description" @@ -35,27 +36,11 @@ type clientBulkWrite struct { selector description.ServerSelector writeConcern *writeconcern.WriteConcern - result *ClientBulkWriteResult + result ClientBulkWriteResult } func (bw *clientBulkWrite) execute(ctx context.Context) error { - batches := &driver.Batches{ - Ordered: bw.ordered, - } - op := operation.NewCommandFn(bw.command).Batches(batches). - Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.client.monitor). - ServerSelector(bw.selector).ClusterClock(bw.client.clock). - Database("admin"). - Deployment(bw.client.deployment).Crypt(bw.client.cryptFLE). - ServerAPI(bw.client.serverAPI).Timeout(bw.client.timeout). - Logger(bw.client.logger).Authenticator(bw.client.authenticator) - err := op.Execute(ctx) - bw.result = newClientBulkWriteResult(op.Result()) - return err -} - -func (bw *clientBulkWrite) command(dst []byte, desc description.SelectedServer) ([]byte, error) { - dst = bsoncore.AppendInt32Element(dst, "bulkWrite", 1) + docs := make([]bsoncore.Document, len(bw.models)) nsMap := make(map[string]int) var nsList []string getNsIndex := func(namespace string) int { @@ -68,22 +53,31 @@ func (bw *clientBulkWrite) command(dst []byte, desc description.SelectedServer) return nsIdx } } - var err error - var idx int32 - idx, dst = bsoncore.AppendArrayElementStart(dst, "ops") + resMap := make([]interface{}, len(bw.models)) + insIdMap := make(map[int]interface{}) for i, v := range bw.models { var doc bsoncore.Document + var err error var nsIdx int switch model := v.(type) { - case ClientInsertOneModel: + case *ClientInsertOneModel: nsIdx = getNsIndex(model.Namespace) - doc, err = createClientInsertDoc(int32(nsIdx), model.Document, bw.client.bsonOpts, bw.client.registry) + if bw.result.InsertResults == nil { + bw.result.InsertResults = make(map[int64]ClientInsertResult) + } + resMap[i] = bw.result.InsertResults + var id interface{} + id, doc, err = createClientInsertDoc(int32(nsIdx), model.Document, bw.client.bsonOpts, bw.client.registry) if err != nil { break } - doc, _, err = ensureID(doc, primitive.NilObjectID, bw.client.bsonOpts, bw.client.registry) - case ClientUpdateOneModel: + insIdMap[i] = id + case *ClientUpdateOneModel: nsIdx = getNsIndex(model.Namespace) + if bw.result.UpdateResults == nil { + bw.result.UpdateResults = make(map[int64]ClientUpdateResult) + } + resMap[i] = bw.result.UpdateResults doc, err = createClientUpdateDoc( int32(nsIdx), model.Filter, @@ -96,8 +90,12 @@ func (bw *clientBulkWrite) command(dst []byte, desc description.SelectedServer) true, bw.client.bsonOpts, bw.client.registry) - case ClientUpdateManyModel: + case *ClientUpdateManyModel: nsIdx = getNsIndex(model.Namespace) + if bw.result.UpdateResults == nil { + bw.result.UpdateResults = make(map[int64]ClientUpdateResult) + } + resMap[i] = bw.result.UpdateResults doc, err = createClientUpdateDoc( int32(nsIdx), model.Filter, @@ -110,8 +108,12 @@ func (bw *clientBulkWrite) command(dst []byte, desc description.SelectedServer) true, bw.client.bsonOpts, bw.client.registry) - case ClientReplaceOneModel: + case *ClientReplaceOneModel: nsIdx = getNsIndex(model.Namespace) + if bw.result.UpdateResults == nil { + bw.result.UpdateResults = make(map[int64]ClientUpdateResult) + } + resMap[i] = bw.result.UpdateResults doc, err = createClientUpdateDoc( int32(nsIdx), model.Filter, @@ -124,76 +126,139 @@ func (bw *clientBulkWrite) command(dst []byte, desc description.SelectedServer) false, bw.client.bsonOpts, bw.client.registry) - case ClientDeleteOneModel: + case *ClientDeleteOneModel: nsIdx = getNsIndex(model.Namespace) + if bw.result.DeleteResults == nil { + bw.result.DeleteResults = make(map[int64]ClientDeleteResult) + } + resMap[i] = bw.result.DeleteResults doc, err = createClientDeleteDoc( int32(nsIdx), model.Filter, model.Collation, model.Hint, - true, + false, bw.client.bsonOpts, bw.client.registry) - case ClientDeleteManyModel: + case *ClientDeleteManyModel: nsIdx = getNsIndex(model.Namespace) + if bw.result.DeleteResults == nil { + bw.result.DeleteResults = make(map[int64]ClientDeleteResult) + } + resMap[i] = bw.result.DeleteResults doc, err = createClientDeleteDoc( int32(nsIdx), model.Filter, model.Collation, model.Hint, - false, + true, bw.client.bsonOpts, bw.client.registry) } if err != nil { - return nil, err + return err } - dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(i), doc) + docs[i] = doc } - dst, err = bsoncore.AppendArrayEnd(dst, idx) + batches := &driver.Batches{ + Identifier: "ops", + Documents: docs, + Ordered: bw.ordered, + } + op := operation.NewCommandFn(bw.newCommand(nsList)).Batches(batches). + Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.client.monitor). + ServerSelector(bw.selector).ClusterClock(bw.client.clock). + Database("admin"). + Deployment(bw.client.deployment).Crypt(bw.client.cryptFLE). + ServerAPI(bw.client.serverAPI).Timeout(bw.client.timeout). + Logger(bw.client.logger).Authenticator(bw.client.authenticator) + err := op.Execute(ctx) if err != nil { - return nil, err + return err } - - idx, dst = bsoncore.AppendArrayElementStart(dst, "nsInfo") - for i, v := range nsList { - doc, err := marshal(struct { - Namespace string `bson:"ns"` - }{v}, bw.client.bsonOpts, bw.client.registry) - if err != nil { - return nil, err + var res struct { + Cursor struct { + FirstBatch []bson.Raw } - dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(i), doc) + NDeleted int32 + NInserted int32 + NMatched int32 + NModified int32 + NUpserted int32 } - dst, err = bsoncore.AppendArrayEnd(dst, idx) + rawRes := op.Result() + err = bson.Unmarshal(rawRes, &res) if err != nil { - return nil, err - } - - if bw.errorsOnly { - dst = bsoncore.AppendBooleanElement(dst, "errorsOnly", bw.errorsOnly) - } - if bw.bypassDocumentValidation != nil && (desc.WireVersion != nil && desc.WireVersion.Includes(4)) { - dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *bw.bypassDocumentValidation) + return err } - if bw.comment != nil { - comment, err := marshalValue(bw.comment, bw.client.bsonOpts, bw.client.registry) - if err != nil { - return nil, err + bw.result.DeletedCount = int64(res.NDeleted) + bw.result.InsertedCount = int64(res.NInserted) + bw.result.MatchedCount = int64(res.NMatched) + bw.result.ModifiedCount = int64(res.NModified) + bw.result.UpsertedCount = int64(res.NUpserted) + for i, cur := range res.Cursor.FirstBatch { + switch res := resMap[i].(type) { + case map[int64]ClientDeleteResult: + if err = appendDeleteResult(cur, res); err != nil { + return err + } + case map[int64]ClientInsertResult: + if err = appendInsertResult(cur, res, insIdMap); err != nil { + return err + } + case map[int64]ClientUpdateResult: + if err = appendUpdateResult(cur, res); err != nil { + return err + } } - dst = bsoncore.AppendValueElement(dst, "comment", comment) - } - if bw.ordered != nil { - dst = bsoncore.AppendBooleanElement(dst, "ordered", *bw.ordered) } - if bw.let != nil { - let, err := marshal(bw.let, bw.client.bsonOpts, bw.client.registry) + return nil +} + +func (bw *clientBulkWrite) newCommand(nsList []string) func([]byte, description.SelectedServer) ([]byte, error) { + return func(dst []byte, desc description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendInt32Element(dst, "bulkWrite", 1) + + var idx int32 + var err error + idx, dst = bsoncore.AppendArrayElementStart(dst, "nsInfo") + for i, v := range nsList { + doc, err := marshal(struct { + Namespace string `bson:"ns"` + }{v}, bw.client.bsonOpts, bw.client.registry) + if err != nil { + return nil, err + } + dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(i), doc) + } + dst, err = bsoncore.AppendArrayEnd(dst, idx) if err != nil { return nil, err } - dst = bsoncore.AppendDocumentElement(dst, "let", let) + + dst = bsoncore.AppendBooleanElement(dst, "errorsOnly", bw.errorsOnly) + if bw.bypassDocumentValidation != nil && (desc.WireVersion != nil && desc.WireVersion.Includes(4)) { + dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *bw.bypassDocumentValidation) + } + if bw.comment != nil { + comment, err := marshalValue(bw.comment, bw.client.bsonOpts, bw.client.registry) + if err != nil { + return nil, err + } + dst = bsoncore.AppendValueElement(dst, "comment", comment) + } + if bw.ordered != nil { + dst = bsoncore.AppendBooleanElement(dst, "ordered", *bw.ordered) + } + if bw.let != nil { + let, err := marshal(bw.let, bw.client.bsonOpts, bw.client.registry) + if err != nil { + return nil, err + } + dst = bsoncore.AppendDocumentElement(dst, "let", let) + } + return dst, nil } - return dst, nil } func createClientInsertDoc( @@ -201,17 +266,22 @@ func createClientInsertDoc( document interface{}, bsonOpts *options.BSONOptions, registry *bsoncodec.Registry, -) (bsoncore.Document, error) { - uidx, insertDoc := bsoncore.AppendDocumentStart(nil) +) (interface{}, bsoncore.Document, error) { + uidx, doc := bsoncore.AppendDocumentStart(nil) - insertDoc = bsoncore.AppendInt32Element(insertDoc, "update", namespace) + doc = bsoncore.AppendInt32Element(doc, "insert", namespace) f, err := marshal(document, bsonOpts, registry) if err != nil { - return nil, err + return nil, nil, err } - insertDoc = bsoncore.AppendDocumentElement(insertDoc, "document", f) - - return bsoncore.AppendDocumentEnd(insertDoc, uidx) + var id interface{} + f, id, err = ensureID(f, primitive.NilObjectID, bsonOpts, registry) + if err != nil { + return nil, nil, err + } + doc = bsoncore.AppendDocumentElement(doc, "document", f) + doc, err = bsoncore.AppendDocumentEnd(doc, uidx) + return id, doc, err } func createClientUpdateDoc( @@ -242,10 +312,7 @@ func createClientUpdateDoc( return nil, err } doc = bsoncore.AppendValueElement(doc, "updateMods", u) - - if multi { - doc = bsoncore.AppendBooleanElement(doc, "multi", multi) - } + doc = bsoncore.AppendBooleanElement(doc, "multi", multi) if arrayFilters != nil { reg := registry @@ -299,10 +366,7 @@ func createClientDeleteDoc( return nil, err } doc = bsoncore.AppendDocumentElement(doc, "filter", f) - - if multi { - doc = bsoncore.AppendBooleanElement(doc, "multi", multi) - } + doc = bsoncore.AppendBooleanElement(doc, "multi", multi) if collation != nil { doc = bsoncore.AppendDocumentElement(doc, "collation", collation.ToDocument()) @@ -319,3 +383,46 @@ func createClientDeleteDoc( } return bsoncore.AppendDocumentEnd(doc, didx) } + +func appendDeleteResult(cur bson.Raw, m map[int64]ClientDeleteResult) error { + var res struct { + Idx int32 + N int32 + } + if err := bson.Unmarshal(cur, &res); err != nil { + return err + } + m[int64(res.Idx)] = ClientDeleteResult{int64(res.N)} + return nil +} + +func appendInsertResult(cur bson.Raw, m map[int64]ClientInsertResult, insIdMap map[int]interface{}) error { + var res struct { + Idx int32 + } + if err := bson.Unmarshal(cur, &res); err != nil { + return err + } + m[int64(res.Idx)] = ClientInsertResult{insIdMap[int(res.Idx)]} + return nil +} + +func appendUpdateResult(cur bson.Raw, m map[int64]ClientUpdateResult) error { + var res struct { + Idx int32 + N int32 + NModified int32 + Upserted struct { + Id interface{} `bson:"_id"` + } + } + if err := bson.Unmarshal(cur, &res); err != nil { + return err + } + m[int64(res.Idx)] = ClientUpdateResult{ + MatchedCount: int64(res.N), + ModifiedCount: int64(res.NModified), + UpsertedID: res.Upserted.Id, + } + return nil +} diff --git a/mongo/client_bulk_write_models.go b/mongo/client_bulk_write_models.go index b389019bb2..c14e37451f 100644 --- a/mongo/client_bulk_write_models.go +++ b/mongo/client_bulk_write_models.go @@ -15,7 +15,7 @@ type ClientWriteModels struct { models []interface{} } -func (m *ClientWriteModels) AppendInsertOne(models ...ClientInsertOneModel) *ClientWriteModels { +func (m *ClientWriteModels) AppendInsertOne(models ...*ClientInsertOneModel) *ClientWriteModels { if m == nil { m = &ClientWriteModels{} } @@ -25,7 +25,7 @@ func (m *ClientWriteModels) AppendInsertOne(models ...ClientInsertOneModel) *Cli return m } -func (m *ClientWriteModels) AppendUpdateOne(models ...ClientUpdateOneModel) *ClientWriteModels { +func (m *ClientWriteModels) AppendUpdateOne(models ...*ClientUpdateOneModel) *ClientWriteModels { if m == nil { m = &ClientWriteModels{} } @@ -35,7 +35,7 @@ func (m *ClientWriteModels) AppendUpdateOne(models ...ClientUpdateOneModel) *Cli return m } -func (m *ClientWriteModels) AppendUpdateMany(models ...ClientUpdateManyModel) *ClientWriteModels { +func (m *ClientWriteModels) AppendUpdateMany(models ...*ClientUpdateManyModel) *ClientWriteModels { if m == nil { m = &ClientWriteModels{} } @@ -45,7 +45,7 @@ func (m *ClientWriteModels) AppendUpdateMany(models ...ClientUpdateManyModel) *C return m } -func (m *ClientWriteModels) AppendReplaceOne(models ...ClientReplaceOneModel) *ClientWriteModels { +func (m *ClientWriteModels) AppendReplaceOne(models ...*ClientReplaceOneModel) *ClientWriteModels { if m == nil { m = &ClientWriteModels{} } @@ -55,7 +55,7 @@ func (m *ClientWriteModels) AppendReplaceOne(models ...ClientReplaceOneModel) *C return m } -func (m *ClientWriteModels) AppendDeleteOne(models ...ClientDeleteOneModel) *ClientWriteModels { +func (m *ClientWriteModels) AppendDeleteOne(models ...*ClientDeleteOneModel) *ClientWriteModels { if m == nil { m = &ClientWriteModels{} } @@ -65,7 +65,7 @@ func (m *ClientWriteModels) AppendDeleteOne(models ...ClientDeleteOneModel) *Cli return m } -func (m *ClientWriteModels) AppendDeleteMany(models ...ClientDeleteManyModel) *ClientWriteModels { +func (m *ClientWriteModels) AppendDeleteMany(models ...*ClientDeleteManyModel) *ClientWriteModels { if m == nil { m = &ClientWriteModels{} } diff --git a/mongo/errors.go b/mongo/errors.go index 2732a968ae..e20af03e95 100644 --- a/mongo/errors.go +++ b/mongo/errors.go @@ -643,7 +643,7 @@ func (bwe ClientBulkWriteException) Error() string { causes = append(causes, "write errors: "+joinBatchErrors(errs)) } if bwe.PartialResult != nil { - causes = append(causes, "top level error: "+bwe.PartialResult.String()) + causes = append(causes, fmt.Sprintf("result: %v", bwe.PartialResult)) } message := "bulk write exception: " diff --git a/mongo/integration/unified/client_operation_execution.go b/mongo/integration/unified/client_operation_execution.go index 5a69e77b1e..0a4c381638 100644 --- a/mongo/integration/unified/client_operation_execution.go +++ b/mongo/integration/unified/client_operation_execution.go @@ -9,8 +9,10 @@ package unified import ( "context" "fmt" + "strconv" "time" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/internal/bsonutil" "go.mongodb.org/mongo-driver/mongo" @@ -167,3 +169,323 @@ func executeListDatabases(ctx context.Context, operation *operation, nameOnly bo Build() return newDocumentResult(raw, nil), nil } + +func executeClientBulkWrite(ctx context.Context, operation *operation) (*operationResult, error) { + client, err := entities(ctx).client(operation.Object) + if err != nil { + return nil, err + } + + wirteModels := &mongo.ClientWriteModels{} + opts := options.ClientBulkWrite() + + elems, err := operation.Arguments.Elements() + if err != nil { + return nil, err + } + for _, elem := range elems { + key := elem.Key() + val := elem.Value() + + switch key { + case "models": + models, err := val.Array().Values() + if err != nil { + return nil, err + } + for _, m := range models { + model := m.Document().Index(0) + err = appendClientBulkWriteModel(model.Key(), model.Value().Document(), wirteModels) + if err != nil { + return nil, err + } + } + case "bypassDocumentValidation": + opts.SetBypassDocumentValidation(val.Boolean()) + case "comment": + opts.SetComment(val) + case "let": + opts.SetLet(val.Document()) + case "ordered": + opts.SetOrdered(val.Boolean()) + case "verboseResults": + opts.SetVerboseResults(val.Boolean()) + case "writeConcern": + var wc writeConcern + bson.Unmarshal(val.Value, &wc) + c, err := wc.toWriteConcernOption() + if err != nil { + return nil, err + } + opts.SetWriteConcern(c) + default: + return nil, fmt.Errorf("unrecognized bulkWrite option %q", key) + } + } + + res, err := client.BulkWrite(ctx, wirteModels, opts) + raw := emptyCoreDocument + if res != nil { + rawBuilder := bsoncore.NewDocumentBuilder(). + AppendInt64("deletedCount", res.DeletedCount). + AppendInt64("insertedCount", res.InsertedCount). + AppendInt64("matchedCount", res.MatchedCount). + AppendInt64("modifiedCount", res.ModifiedCount). + AppendInt64("upsertedCount", res.UpsertedCount) + + var resBuilder *bsoncore.DocumentBuilder + + resBuilder = bsoncore.NewDocumentBuilder() + for k, v := range res.DeleteResults { + resBuilder.AppendDocument(strconv.Itoa(int(k)), + bsoncore.NewDocumentBuilder(). + AppendInt64("deletedCount", v.DeletedCount). + Build(), + ) + } + rawBuilder.AppendDocument("deleteResults", resBuilder.Build()) + + resBuilder = bsoncore.NewDocumentBuilder() + for k, v := range res.InsertResults { + t, d, err := bson.MarshalValue(v.InsertedID) + if err != nil { + return nil, err + } + resBuilder.AppendDocument(strconv.Itoa(int(k)), + bsoncore.NewDocumentBuilder(). + AppendValue("insertedId", bsoncore.Value{Type: t, Data: d}). + Build(), + ) + } + rawBuilder.AppendDocument("insertResults", resBuilder.Build()) + + resBuilder = bsoncore.NewDocumentBuilder() + for k, v := range res.UpdateResults { + b := bsoncore.NewDocumentBuilder(). + AppendInt64("matchedCount", v.MatchedCount). + AppendInt64("modifiedCount", v.ModifiedCount) + if v.UpsertedID != nil { + t, d, err := bson.MarshalValue(v.UpsertedID) + if err != nil { + return nil, err + } + b.AppendValue("upsertedId", bsoncore.Value{Type: t, Data: d}) + } + resBuilder.AppendDocument(strconv.Itoa(int(k)), b.Build()) + } + rawBuilder.AppendDocument("updateResults", resBuilder.Build()) + + raw = rawBuilder.Build() + } + return newDocumentResult(raw, err), nil +} + +func appendClientBulkWriteModel(key string, value bson.Raw, model *mongo.ClientWriteModels) error { + switch key { + case "insertOne": + m, err := createClientInsertOneModel(value) + if err != nil { + return err + } + model.AppendInsertOne(m) + case "updateOne": + m, err := createClientUpdateOneModel(value) + if err != nil { + return err + } + model.AppendUpdateOne(m) + case "updateMany": + m, err := createClientUpdateManyModel(value) + if err != nil { + return err + } + model.AppendUpdateMany(m) + case "replaceOne": + m, err := createClientReplaceOneModel(value) + if err != nil { + return err + } + model.AppendReplaceOne(m) + case "deleteOne": + m, err := createClientDeleteOneModel(value) + if err != nil { + return err + } + model.AppendDeleteOne(m) + case "deleteMany": + m, err := createClientDeleteManyModel(value) + if err != nil { + return err + } + model.AppendDeleteMany(m) + } + return nil +} + +func createClientInsertOneModel(value bson.Raw) (*mongo.ClientInsertOneModel, error) { + var v struct { + Namespace string + Document bson.Raw + } + err := bson.Unmarshal(value, &v) + if err != nil { + return nil, err + } + return &mongo.ClientInsertOneModel{ + Namespace: v.Namespace, + Document: v.Document, + }, nil +} + +func createClientUpdateOneModel(value bson.Raw) (*mongo.ClientUpdateOneModel, error) { + var v struct { + Namespace string + Filter bson.Raw + Update interface{} + ArrayFilters []interface{} + Collation *options.Collation + Hint *bson.RawValue + Upsert *bool + } + err := bson.Unmarshal(value, &v) + if err != nil { + return nil, err + } + var hint interface{} + if v.Hint != nil { + hint, err = createHint(*v.Hint) + if err != nil { + return nil, err + } + } + model := &mongo.ClientUpdateOneModel{ + Namespace: v.Namespace, + Filter: v.Filter, + Update: v.Update, + Collation: v.Collation, + Hint: hint, + Upsert: v.Upsert, + } + if len(v.ArrayFilters) > 0 { + model.ArrayFilters = &options.ArrayFilters{Filters: v.ArrayFilters} + } + return model, nil + +} + +func createClientUpdateManyModel(value bson.Raw) (*mongo.ClientUpdateManyModel, error) { + var v struct { + Namespace string + Filter bson.Raw + Update interface{} + ArrayFilters []interface{} + Collation *options.Collation + Hint *bson.RawValue + Upsert *bool + } + err := bson.Unmarshal(value, &v) + if err != nil { + return nil, err + } + var hint interface{} + if v.Hint != nil { + hint, err = createHint(*v.Hint) + if err != nil { + return nil, err + } + } + model := &mongo.ClientUpdateManyModel{ + Namespace: v.Namespace, + Filter: v.Filter, + Update: v.Update, + Collation: v.Collation, + Hint: hint, + Upsert: v.Upsert, + } + if len(v.ArrayFilters) > 0 { + model.ArrayFilters = &options.ArrayFilters{Filters: v.ArrayFilters} + } + return model, nil +} + +func createClientReplaceOneModel(value bson.Raw) (*mongo.ClientReplaceOneModel, error) { + var v struct { + Namespace string + Filter bson.Raw + Replacement bson.Raw + Collation *options.Collation + Hint *bson.RawValue + Upsert *bool + } + err := bson.Unmarshal(value, &v) + if err != nil { + return nil, err + } + var hint interface{} + if v.Hint != nil { + hint, err = createHint(*v.Hint) + if err != nil { + return nil, err + } + } + return &mongo.ClientReplaceOneModel{ + Namespace: v.Namespace, + Filter: v.Filter, + Replacement: v.Replacement, + Collation: v.Collation, + Hint: hint, + Upsert: v.Upsert, + }, nil +} + +func createClientDeleteOneModel(value bson.Raw) (*mongo.ClientDeleteOneModel, error) { + var v struct { + Namespace string + Filter bson.Raw + Collation *options.Collation + Hint *bson.RawValue + } + err := bson.Unmarshal(value, &v) + if err != nil { + return nil, err + } + var hint interface{} + if v.Hint != nil { + hint, err = createHint(*v.Hint) + if err != nil { + return nil, err + } + } + return &mongo.ClientDeleteOneModel{ + Namespace: v.Namespace, + Filter: v.Filter, + Collation: v.Collation, + Hint: hint, + }, nil +} + +func createClientDeleteManyModel(value bson.Raw) (*mongo.ClientDeleteManyModel, error) { + var v struct { + Namespace string + Filter bson.Raw + Collation *options.Collation + Hint *bson.RawValue + } + err := bson.Unmarshal(value, &v) + if err != nil { + return nil, err + } + var hint interface{} + if v.Hint != nil { + hint, err = createHint(*v.Hint) + if err != nil { + return nil, err + } + } + return &mongo.ClientDeleteManyModel{ + Namespace: v.Namespace, + Filter: v.Filter, + Collation: v.Collation, + Hint: hint, + }, nil +} diff --git a/mongo/integration/unified/operation.go b/mongo/integration/unified/operation.go index 59aa36ae8c..179cf16793 100644 --- a/mongo/integration/unified/operation.go +++ b/mongo/integration/unified/operation.go @@ -148,6 +148,8 @@ func (op *operation) run(ctx context.Context, loopDone <-chan struct{}) (*operat return executeAggregate(ctx, op) case "bulkWrite": return executeBulkWrite(ctx, op) + case "clientBulkWrite": + return executeClientBulkWrite(ctx, op) case "countDocuments": return executeCountDocuments(ctx, op) case "createFindCursor": diff --git a/mongo/results.go b/mongo/results.go index 3681b28e12..0088e734fc 100644 --- a/mongo/results.go +++ b/mongo/results.go @@ -11,7 +11,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" "go.mongodb.org/mongo-driver/x/mongo/driver/operation" ) @@ -33,27 +32,17 @@ type ClientBulkWriteResult struct { UpsertedCount int64 // A map of operation index to the _id of each inserted document. - InsertResults map[int64]CientInsertOneResult + InsertResults map[int64]ClientInsertResult // A map of operation index to the _id of each updated document. UpdateResults map[int64]ClientUpdateResult // A map of operation index to the _id of each deleted document. DeleteResults map[int64]ClientDeleteResult - - raw bsoncore.Document -} - -func newClientBulkWriteResult(result bsoncore.Document) *ClientBulkWriteResult { - return &ClientBulkWriteResult{raw: result} -} - -func (result *ClientBulkWriteResult) String() string { - return result.raw.String() } -// CientInsertOneResult is the result type returned by a client-level bulk write of InsertOne operation. -type CientInsertOneResult struct { +// ClientInsertResult is the result type returned by a client-level bulk write of InsertOne operation. +type ClientInsertResult struct { // The _id of the inserted document. A value generated by the driver will be of type primitive.ObjectID. InsertedID interface{} } @@ -67,7 +56,7 @@ type ClientUpdateResult struct { // ClientDeleteResult is the result type returned by a client-level bulk write DeleteOne and DeleteMany operation. type ClientDeleteResult struct { - DeletedCount int64 // `bson:"n"` // The number of documents deleted. + DeletedCount int64 // The number of documents deleted. } // BulkWriteResult is the result type returned by a BulkWrite operation.