Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Nov 15, 2024
1 parent 4d88edf commit 7a76a57
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 97 deletions.
72 changes: 72 additions & 0 deletions internal/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/wiremessage"
Expand Down Expand Up @@ -718,6 +719,77 @@ func TestClient(t *testing.T) {
})
}
})
mtBulkWriteOpts := mtest.NewOptions().MinServerVersion("8.0").AtlasDataLake(false).ClientType(mtest.Pinned)
mt.RunOpts("bulk write with nil filter", mtBulkWriteOpts, func(mt *mtest.T) {
mt.Parallel()

testCases := []struct {
name string
models *mongo.ClientWriteModels
}{
{
name: "DeleteOne",
models: (&mongo.ClientWriteModels{}).AppendDeleteOne("foo", "bar", mongo.NewClientDeleteOneModel()),
},
{
name: "DeleteMany",
models: (&mongo.ClientWriteModels{}).AppendDeleteMany("foo", "bar", mongo.NewClientDeleteManyModel()),
},
{
name: "UpdateOne",
models: (&mongo.ClientWriteModels{}).AppendUpdateOne("foo", "bar", mongo.NewClientUpdateOneModel()),
},
{
name: "UpdateMany",
models: (&mongo.ClientWriteModels{}).AppendUpdateMany("foo", "bar", mongo.NewClientUpdateManyModel()),
},
}
for _, tc := range testCases {
tc := tc

mt.Run(tc.name, func(mt *mtest.T) {
mt.Parallel()

_, err := mt.Client.BulkWrite(context.Background(), tc.models)
require.ErrorContains(mt, err, "filter is required")
})
}
})
mt.RunOpts("bulk write with write concern", mtBulkWriteOpts, func(mt *mtest.T) {
mt.Parallel()

testCases := []struct {
name string
opts *options.ClientBulkWriteOptionsBuilder
want bool
}{
{
name: "unacknowledged",
opts: options.ClientBulkWrite().SetWriteConcern(writeconcern.Unacknowledged()).SetOrdered(false),
want: false,
},
{
name: "acknowledged",
want: true,
},
}
for _, tc := range testCases {
tc := tc

mt.Run(tc.name, func(mt *mtest.T) {
mt.Parallel()

var models *mongo.ClientWriteModels

insertOneModel := mongo.NewClientInsertOneModel().SetDocument(bson.D{{"x", 1}})
models = (&mongo.ClientWriteModels{}).AppendInsertOne("foo", "bar", insertOneModel)
res, err := mt.Client.BulkWrite(context.Background(), models, tc.opts)
require.NoError(mt, err, "BulkWrite error: %v", err)
require.NotNil(mt, res, "expected a ClientBulkWriteResult")
assert.Equal(mt, res.Acknowledged, tc.want, "expected Acknowledged: %v, got: %v", tc.want, res.Acknowledged)
})
}
})
}

func TestClient_BSONOptions(t *testing.T) {
Expand Down
35 changes: 35 additions & 0 deletions internal/integration/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,41 @@ func TestCollection(t *testing.T) {
})
}
})
mt.Run("error on nil filter", func(mt *mtest.T) {
mt.Parallel()

testCases := []struct {
name string
model mongo.WriteModel
}{
{
name: "DeleteOne",
model: mongo.NewDeleteOneModel(),
},
{
name: "DeleteMany",
model: mongo.NewDeleteManyModel(),
},
{
name: "UpdateOne",
model: mongo.NewUpdateOneModel().SetUpdate(bson.D{{"$set", bson.D{{"x", 1}}}}),
},
{
name: "UpdateMany",
model: mongo.NewUpdateManyModel().SetUpdate(bson.D{{"$set", bson.D{{"x", 1}}}}),
},
}
for _, tc := range testCases {
tc := tc

mt.Run(tc.name, func(mt *mtest.T) {
mt.Parallel()

_, err := mt.Coll.BulkWrite(context.Background(), []mongo.WriteModel{tc.model})
assert.ErrorContains(mt, err, "filter is required")
})
}
})
mt.Run("correct model in errors", func(mt *mtest.T) {
models := []mongo.WriteModel{
mongo.NewUpdateOneModel().SetFilter(bson.M{}).SetUpdate(bson.M{
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/unified/client_operation_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati
}

res, err := client.BulkWrite(ctx, wirteModels, opts)
if res == nil {
if res == nil || !res.Acknowledged {
var bwe mongo.ClientBulkWriteException
if !errors.As(err, &bwe) || bwe.PartialResult == nil {
return newDocumentResult(emptyCoreDocument, err), nil
Expand Down
7 changes: 7 additions & 0 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package mongo
import (
"context"
"errors"
"fmt"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo/options"
Expand Down Expand Up @@ -296,6 +297,9 @@ func createDeleteDoc(
) (bsoncore.Document, error) {
f, err := marshal(filter, bsonOpts, registry)
if err != nil {
if filter == nil {
return nil, fmt.Errorf("%w: filter is required", err)
}
return nil, err
}

Expand Down Expand Up @@ -428,6 +432,9 @@ type updateDoc struct {
func (doc updateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.Registry) (bsoncore.Document, error) {
f, err := marshal(doc.filter, bsonOpts, registry)
if err != nil {
if doc.filter == nil {
return nil, fmt.Errorf("%w: filter is required", err)
}
return nil, err
}

Expand Down
12 changes: 12 additions & 0 deletions mongo/bulk_write_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type WriteModel interface {
}

// InsertOneModel is used to insert a single document in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type InsertOneModel struct {
Document interface{}
}
Expand All @@ -40,6 +42,8 @@ func (iom *InsertOneModel) SetDocument(doc interface{}) *InsertOneModel {
func (*InsertOneModel) writeModel() {}

// DeleteOneModel is used to delete at most one document in a BulkWriteOperation.
//
// See corresponding setter methods for documentation.
type DeleteOneModel struct {
Filter interface{}
Collation *options.Collation
Expand Down Expand Up @@ -80,6 +84,8 @@ func (dom *DeleteOneModel) SetHint(hint interface{}) *DeleteOneModel {
func (*DeleteOneModel) writeModel() {}

// DeleteManyModel is used to delete multiple documents in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type DeleteManyModel struct {
Filter interface{}
Collation *options.Collation
Expand Down Expand Up @@ -119,6 +125,8 @@ func (dmm *DeleteManyModel) SetHint(hint interface{}) *DeleteManyModel {
func (*DeleteManyModel) writeModel() {}

// ReplaceOneModel is used to replace at most one document in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type ReplaceOneModel struct {
Collation *options.Collation
Upsert *bool
Expand Down Expand Up @@ -176,6 +184,8 @@ func (rom *ReplaceOneModel) SetUpsert(upsert bool) *ReplaceOneModel {
func (*ReplaceOneModel) writeModel() {}

// UpdateOneModel is used to update at most one document in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type UpdateOneModel struct {
Collation *options.Collation
Upsert *bool
Expand Down Expand Up @@ -241,6 +251,8 @@ func (uom *UpdateOneModel) SetUpsert(upsert bool) *UpdateOneModel {
func (*UpdateOneModel) writeModel() {}

// UpdateManyModel is used to update multiple documents in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type UpdateManyModel struct {
Collation *options.Collation
Upsert *bool
Expand Down
17 changes: 8 additions & 9 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,10 +895,14 @@ func (c *Client) createBaseCursorOptions() driver.CursorOptions {
// BulkWrite performs a client-level bulk write operation.
func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
opts ...options.Lister[options.ClientBulkWriteOptions]) (*ClientBulkWriteResult, error) {
// TODO: Remove once DRIVERS-2888 is implemented.
// TODO(GODRIVER-3403): Remove after support for QE with Client.bulkWrite.
if c.isAutoEncryptionSet {
return nil, errors.New("bulkWrite does not currently support automatic encryption")
}

if models == nil {
return nil, ErrNilValue
}
bwo, err := mongoutil.NewOptions(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -962,14 +966,9 @@ func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
} else if !acknowledged {
return nil, errors.New("cannot request unacknowledged write concern and verbose results")
}
if err = op.execute(ctx); err != nil {
return nil, replaceErrors(err)
}
var results *ClientBulkWriteResult
if acknowledged {
results = &op.result
}
return results, nil
err = op.execute(ctx)
op.result.Acknowledged = acknowledged
return &op.result, replaceErrors(err)
}

// newLogger will use the LoggerOptions to create an internal logger and publish
Expand Down
34 changes: 19 additions & 15 deletions mongo/client_bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"strconv"

Expand Down Expand Up @@ -81,19 +82,16 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
Name: driverutil.BulkWriteOp,
}.Execute(ctx)
var exception *ClientBulkWriteException
switch tt := err.(type) {
case CommandError:

var ce CommandError
if errors.As(err, &ce) {
exception = &ClientBulkWriteException{
TopLevelError: &WriteError{
Code: int(tt.Code),
Message: tt.Message,
Raw: tt.Raw,
Code: int(ce.Code),
Message: ce.Message,
Raw: ce.Raw,
},
}
default:
if errors.Is(err, driver.ErrUnacknowledgedWrite) {
err = nil
}
}
if len(batches.writeConcernErrors) > 0 || len(batches.writeErrors) > 0 {
if exception == nil {
Expand Down Expand Up @@ -521,9 +519,9 @@ func (mb *modelBatches) appendDeleteResult(cur *cursorInfo, raw bson.Raw) bool {
}

if mb.result.DeleteResults == nil {
mb.result.DeleteResults = make(map[int]ClientDeleteResult)
mb.result.DeleteResults = make(map[int]ClientBulkWriteDeleteResult)
}
mb.result.DeleteResults[idx] = ClientDeleteResult{int64(cur.N)}
mb.result.DeleteResults[idx] = ClientBulkWriteDeleteResult{int64(cur.N)}

return true
}
Expand All @@ -540,9 +538,9 @@ func (mb *modelBatches) appendInsertResult(cur *cursorInfo, raw bson.Raw) bool {
}

if mb.result.InsertResults == nil {
mb.result.InsertResults = make(map[int]ClientInsertResult)
mb.result.InsertResults = make(map[int]ClientBulkWriteInsertResult)
}
mb.result.InsertResults[idx] = ClientInsertResult{mb.newIDMap[idx]}
mb.result.InsertResults[idx] = ClientBulkWriteInsertResult{mb.newIDMap[idx]}

return true
}
Expand All @@ -559,9 +557,9 @@ func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
}

if mb.result.UpdateResults == nil {
mb.result.UpdateResults = make(map[int]ClientUpdateResult)
mb.result.UpdateResults = make(map[int]ClientBulkWriteUpdateResult)
}
result := ClientUpdateResult{
result := ClientBulkWriteUpdateResult{
MatchedCount: int64(cur.N),
}
if cur.NModified != nil {
Expand Down Expand Up @@ -624,6 +622,9 @@ func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.

f, err := marshal(d.filter, bsonOpts, registry)
if err != nil {
if d.filter == nil {
return nil, fmt.Errorf("%w: filter is required", err)
}
return nil, err
}
doc = bsoncore.AppendDocumentElement(doc, "filter", f)
Expand Down Expand Up @@ -684,6 +685,9 @@ func (d *clientDeleteDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.

f, err := marshal(d.filter, bsonOpts, registry)
if err != nil {
if d.filter == nil {
return nil, fmt.Errorf("%w: filter is required", err)
}
return nil, err
}
doc = bsoncore.AppendDocumentElement(doc, "filter", f)
Expand Down
Loading

0 comments on commit 7a76a57

Please sign in to comment.