diff --git a/.gitignore b/.gitignore index 9f4fa6d20..9a3120f6b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ _harness - +.vscode \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 43cc9c2b6..78991a8c1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,8 +3,8 @@ language: go go_import_path: github.com/globalsign/mgo go: - - 1.8.x - 1.9.x + - 1.10.x env: global: @@ -29,7 +29,7 @@ install: - go get gopkg.in/check.v1 - go get gopkg.in/yaml.v2 - go get gopkg.in/tomb.v2 - - go get github.com/golang/lint/golint + - go get github.com/golang/lint before_script: - golint ./... | grep -v 'ID' | cat diff --git a/README.md b/README.md index d0f39d267..6c87fa905 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,16 @@ The MongoDB driver for Go ------------------------- -This fork has had a few improvements by ourselves as well as several PR's merged from the original mgo repo that are currently awaiting review. Changes are mostly geared towards performance improvements and bug fixes, though a few new features have been added. +This fork has had a few improvements by ourselves as well as several PR's merged from the original mgo repo that are currently awaiting review. +Changes are mostly geared towards performance improvements and bug fixes, though a few new features have been added. Further PR's (with tests) are welcome, but please maintain backwards compatibility. +Detailed documentation of the API is available at +[GoDoc](https://godoc.org/github.com/globalsign/mgo). + +A [sub-package](https://godoc.org/github.com/globalsign/mgo/bson) that implements the [BSON](http://bsonspec.org) specification is also included, and may be used independently of the driver. 
+ ## Changes * Fixes attempting to authenticate before every query ([details](https://github.com/go-mgo/mgo/issues/254)) * Removes bulk update / delete batch size limitations ([details](https://github.com/go-mgo/mgo/issues/288)) @@ -15,13 +21,13 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2)) * Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5)) * Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7)) -* Support for partial indexes ([detials](https://github.com/domodwyer/mgo/commit/5efe8eccb028238d93c222828cae4806aeae9f51)) -* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464)) +* Support for partial indexes ([details](https://github.com/domodwyer/mgo/commit/5efe8eccb028238d93c222828cae4806aeae9f51)) +* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464)) * Integration tests run against MongoDB 3.2 & 3.4 releases ([details](https://github.com/globalsign/mgo/pull/4), [more](https://github.com/globalsign/mgo/pull/24), [more](https://github.com/globalsign/mgo/pull/35)) * Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16)) * Fixes cursor timeouts ([details](https://jira.mongodb.org/browse/SERVER-24899)) * Support index hints and timeouts for count queries ([details](https://github.com/globalsign/mgo/pull/17)) -* Don't panic when handling indexed `int64` fields ([detials](https://github.com/go-mgo/mgo/issues/475)) +* Don't panic when handling indexed `int64` fields ([details](https://github.com/go-mgo/mgo/issues/475)) * Supports dropping all indexes on a collection ([details](https://github.com/globalsign/mgo/pull/25)) * Annotates log entries/profiler output with optional appName on 3.4+ 
([details](https://github.com/globalsign/mgo/pull/28)) * Support for read-only [views](https://docs.mongodb.com/manual/core/views/) in 3.4+ ([details](https://github.com/globalsign/mgo/pull/33)) @@ -37,13 +43,21 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Use JSON tags when no explicit BSON are tags set ([details](https://github.com/globalsign/mgo/pull/91)) * Support [$changeStream](https://docs.mongodb.com/manual/changeStreams/) tailing on 3.6+ ([details](https://github.com/globalsign/mgo/pull/97)) * Fix deadlock in cluster synchronisation ([details](https://github.com/globalsign/mgo/issues/120)) +* Implement `maxIdleTimeout` for pooled connections ([details](https://github.com/globalsign/mgo/pull/116)) +* Connection pool waiting improvements ([details](https://github.com/globalsign/mgo/pull/115)) +* Fixes BSON encoding for `$in` and friends ([details](https://github.com/globalsign/mgo/pull/128)) +* Add BSON stream encoders ([details](https://github.com/globalsign/mgo/pull/127)) +* Add integer map key support in the BSON encoder ([details](https://github.com/globalsign/mgo/pull/140)) +* Support aggregation [collations](https://docs.mongodb.com/manual/reference/collation/) ([details](https://github.com/globalsign/mgo/pull/144)) --- ### Thanks to +* @aksentyev * @bachue * @bozaro * @BenLubar +* @carldunham * @carter2000 * @cezarsa * @drichelson @@ -51,13 +65,17 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * @eaglerayp * @feliixx * @fmpwizard +* @gazoon +* @gnawux * @idy * @jameinel +* @johnlawsharrison * @KJTsanaktsidis -* @gazoon * @mapete94 +* @maxnoel +* @mcspring * @peterdeka * @Reenjii * @smoya * @steve-gray -* @wgallagher \ No newline at end of file +* @wgallagher diff --git a/bson/README.md b/bson/README.md new file mode 100644 index 000000000..5c5819e61 --- /dev/null +++ b/bson/README.md @@ -0,0 +1,12 @@ 
+[![GoDoc](https://godoc.org/github.com/globalsign/mgo/bson?status.svg)](https://godoc.org/github.com/globalsign/mgo/bson) + +An Implementation of BSON for Go +-------------------------------- + +Package bson is an implementation of the [BSON specification](http://bsonspec.org) for Go. + +While the BSON package implements the BSON spec as faithfully as possible, there +is some MongoDB specific behaviour (such as map keys `$in`, `$all`, etc) in the +`bson` package. The priority is for backwards compatibility for the `mgo` +driver, though fixes for obviously buggy behaviour is welcome (and features, etc +behind feature flags). diff --git a/bson/bson_test.go b/bson/bson_test.go index db72d8a06..406ede6ae 100644 --- a/bson/bson_test.go +++ b/bson/bson_test.go @@ -34,9 +34,9 @@ import ( "errors" "net/url" "reflect" + "strings" "testing" "time" - "strings" "github.com/globalsign/mgo/bson" . "gopkg.in/check.v1" @@ -111,6 +111,10 @@ var sampleItems = []testItemType{ {bson.M{"BSON": []interface{}{"awesome", float64(5.05), 1986}}, "1\x00\x00\x00\x04BSON\x00&\x00\x00\x00\x020\x00\x08\x00\x00\x00" + "awesome\x00\x011\x00333333\x14@\x102\x00\xc2\x07\x00\x00\x00\x00"}, + {bson.M{"slice": []uint8{1, 2}}, + "\x13\x00\x00\x00\x05slice\x00\x02\x00\x00\x00\x00\x01\x02\x00"}, + {bson.M{"slice": []byte{1, 2}}, + "\x13\x00\x00\x00\x05slice\x00\x02\x00\x00\x00\x00\x01\x02\x00"}, } func (s *S) TestMarshalSampleItems(c *C) { @@ -343,6 +347,27 @@ func (s *S) TestOneWayMarshalItems(c *C) { } } +// -------------------------------------------------------------------------- +// Some ops marshaling operations which would encode []uint8 or []byte in array. 
+ +var arrayOpsMarshalItems = []testItemType{ + {bson.M{"_": bson.M{"$in": []uint8{1, 2}}}, + "\x03_\x00\x1d\x00\x00\x00\x04$in\x00\x13\x00\x00\x00\x100\x00\x01\x00\x00\x00\x101\x00\x02\x00\x00\x00\x00\x00"}, + {bson.M{"_": bson.M{"$nin": []uint8{1, 2}}}, + "\x03_\x00\x1e\x00\x00\x00\x04$nin\x00\x13\x00\x00\x00\x100\x00\x01\x00\x00\x00\x101\x00\x02\x00\x00\x00\x00\x00"}, + {bson.M{"_": bson.M{"$all": []uint8{1, 2}}}, + "\x03_\x00\x1e\x00\x00\x00\x04$all\x00\x13\x00\x00\x00\x100\x00\x01\x00\x00\x00\x101\x00\x02\x00\x00\x00\x00\x00"}, +} + +func (s *S) TestArrayOpsMarshalItems(c *C) { + for i, item := range arrayOpsMarshalItems { + data, err := bson.Marshal(item.obj) + c.Assert(err, IsNil) + c.Assert(string(data), Equals, wrapInDoc(item.data), + Commentf("Failed on item %d", i)) + } +} + // -------------------------------------------------------------------------- // Two-way tests for user-defined structures using the samples // from bsonspec.org. @@ -582,6 +607,8 @@ func (s *S) TestMarshalOneWayItems(c *C) { // -------------------------------------------------------------------------- // One-way unmarshaling tests. +type intAlias int + var unmarshalItems = []testItemType{ // Field is private. Should not attempt to unmarshal it. {&struct{ priv byte }{}, @@ -636,6 +663,14 @@ var unmarshalItems = []testItemType{ // Decode a doc within a doc in to a slice within a doc; shouldn't error {&struct{ Foo []string }{}, "\x03\x66\x6f\x6f\x00\x05\x00\x00\x00\x00"}, + + // int key maps + {map[int]string{10: "s"}, + "\x0210\x00\x02\x00\x00\x00s\x00"}, + + //// event if type is alias to int + {map[intAlias]string{10: "s"}, + "\x0210\x00\x02\x00\x00\x00s\x00"}, } func (s *S) TestUnmarshalOneWayItems(c *C) { @@ -713,11 +748,6 @@ var unmarshalErrorItems = []unmarshalErrorType{ "\x10name\x00\x08\x00\x00\x00", "Duplicated key 'name' in struct bson_test.structWithDupKeys"}, - // Non-string map key. 
- {map[int]interface{}{}, - "\x10name\x00\x08\x00\x00\x00", - "BSON map must have string keys. Got: map\\[int\\]interface \\{\\}"}, - {nil, "\xEEname\x00", "Unknown element kind \\(0xEE\\)"}, @@ -733,6 +763,11 @@ var unmarshalErrorItems = []unmarshalErrorType{ {nil, "\x08\x62\x00\x02", "encoded boolean must be 1 or 0, found 2"}, + + // Non-string and not numeric map key. + {map[bool]interface{}{true: 1}, + "\x10true\x00\x01\x00\x00\x00", + "BSON map must have string or decimal keys. Got: map\\[bool\\]interface \\{\\}"}, } func (s *S) TestUnmarshalErrorItems(c *C) { @@ -1136,8 +1171,8 @@ type inlineBadKeyMap struct { M map[int]int `bson:",inline"` } type inlineUnexported struct { - M map[string]interface{} `bson:",inline"` - unexported `bson:",inline"` + M map[string]interface{} `bson:",inline"` + unexported `bson:",inline"` } type unexported struct { A int @@ -1194,11 +1229,11 @@ func (s ifaceSlice) GetBSON() (interface{}, error) { type ( MyString string - MyBytes []byte - MyBool bool - MyD []bson.DocElem - MyRawD []bson.RawDocElem - MyM map[string]interface{} + MyBytes []byte + MyBool bool + MyD []bson.DocElem + MyRawD []bson.RawDocElem + MyM map[string]interface{} ) var ( diff --git a/bson/decode.go b/bson/decode.go index e71eac23f..658856add 100644 --- a/bson/decode.go +++ b/bson/decode.go @@ -176,9 +176,6 @@ func (d *decoder) readDocTo(out reflect.Value) { switch outk { case reflect.Map: keyType = outt.Key() - if keyType.Kind() != reflect.String { - panic("BSON map must have string keys. 
Got: " + outt.String()) - } if keyType != typeString { convertKey = true } @@ -240,7 +237,42 @@ func (d *decoder) readDocTo(out reflect.Value) { if d.readElemTo(e, kind) { k := reflect.ValueOf(name) if convertKey { - k = k.Convert(keyType) + mapKeyType := out.Type().Key() + mapKeyKind := mapKeyType.Kind() + + switch mapKeyKind { + case reflect.Int: + fallthrough + case reflect.Int8: + fallthrough + case reflect.Int16: + fallthrough + case reflect.Int32: + fallthrough + case reflect.Int64: + fallthrough + case reflect.Uint: + fallthrough + case reflect.Uint8: + fallthrough + case reflect.Uint16: + fallthrough + case reflect.Uint32: + fallthrough + case reflect.Uint64: + fallthrough + case reflect.Float32: + fallthrough + case reflect.Float64: + parsed := d.parseMapKeyAsFloat(k, mapKeyKind) + k = reflect.ValueOf(parsed) + case reflect.String: + mapKeyType = keyType + default: + panic("BSON map must have string or decimal keys. Got: " + outt.String()) + } + + k = k.Convert(mapKeyType) } out.SetMapIndex(k, e) } @@ -276,6 +308,16 @@ func (d *decoder) readDocTo(out reflect.Value) { d.docType = docType } +func (decoder) parseMapKeyAsFloat(k reflect.Value, mapKeyKind reflect.Kind) float64 { + parsed, err := strconv.ParseFloat(k.String(), 64) + if err != nil { + panic("Map key is defined to be a decimal type (" + mapKeyKind.String() + ") but got error " + + err.Error()) + } + + return parsed +} + func (d *decoder) readArrayDocTo(out reflect.Value) { end := int(d.readInt32()) end += d.i - 4 diff --git a/bson/encode.go b/bson/encode.go index f307c31ec..7e0b84d77 100644 --- a/bson/encode.go +++ b/bson/encode.go @@ -60,6 +60,15 @@ var ( typeTimeDuration = reflect.TypeOf(time.Duration(0)) ) +var ( + // spec for []uint8 or []byte encoding + arrayOps = map[string]bool{ + "$in": true, + "$nin": true, + "$all": true, + } +) + const itoaCacheSize = 32 const ( @@ -194,7 +203,7 @@ func (e *encoder) addDoc(v reflect.Value) { func (e *encoder) addMap(v reflect.Value) { for _, k := range 
v.MapKeys() { - e.addElem(k.String(), v.MapIndex(k), false) + e.addElem(fmt.Sprint(k), v.MapIndex(k), false) } } @@ -423,8 +432,13 @@ func (e *encoder) addElem(name string, v reflect.Value, minSize bool) { vt := v.Type() et := vt.Elem() if et.Kind() == reflect.Uint8 { - e.addElemName(0x05, name) - e.addBinary(0x00, v.Bytes()) + if arrayOps[name] { + e.addElemName(0x04, name) + e.addDoc(v) + } else { + e.addElemName(0x05, name) + e.addBinary(0x00, v.Bytes()) + } } else if et == typeDocElem || et == typeRawDocElem { e.addElemName(0x03, name) e.addDoc(v) @@ -436,16 +450,21 @@ func (e *encoder) addElem(name string, v reflect.Value, minSize bool) { case reflect.Array: et := v.Type().Elem() if et.Kind() == reflect.Uint8 { - e.addElemName(0x05, name) - if v.CanAddr() { - e.addBinary(0x00, v.Slice(0, v.Len()).Interface().([]byte)) + if arrayOps[name] { + e.addElemName(0x04, name) + e.addDoc(v) } else { - n := v.Len() - e.addInt32(int32(n)) - e.addBytes(0x00) - for i := 0; i < n; i++ { - el := v.Index(i) - e.addBytes(byte(el.Uint())) + e.addElemName(0x05, name) + if v.CanAddr() { + e.addBinary(0x00, v.Slice(0, v.Len()).Interface().([]byte)) + } else { + n := v.Len() + e.addInt32(int32(n)) + e.addBytes(0x00) + for i := 0; i < n; i++ { + el := v.Index(i) + e.addBytes(byte(el.Uint())) + } } } } else { diff --git a/bson/stream.go b/bson/stream.go new file mode 100644 index 000000000..466528457 --- /dev/null +++ b/bson/stream.go @@ -0,0 +1,90 @@ +package bson + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" +) + +const ( + // MinDocumentSize is the size of the smallest possible valid BSON document: + // an int32 size header + 0x00 (end of document). + MinDocumentSize = 5 + + // MaxDocumentSize is the largest possible size for a BSON document allowed by MongoDB, + // that is, 16 MiB (see https://docs.mongodb.com/manual/reference/limits/). 
+ MaxDocumentSize = 16777216 +) + +// ErrInvalidDocumentSize is an error returned when a BSON document's header +// contains a size smaller than MinDocumentSize or greater than MaxDocumentSize. +type ErrInvalidDocumentSize struct { + DocumentSize int32 +} + +func (e ErrInvalidDocumentSize) Error() string { + return fmt.Sprintf("invalid document size %d", e.DocumentSize) +} + +// A Decoder reads and decodes BSON values from an input stream. +type Decoder struct { + source io.Reader +} + +// NewDecoder returns a new Decoder that reads from source. +// It does not add any extra buffering, and may not read data from source beyond the BSON values requested. +func NewDecoder(source io.Reader) *Decoder { + return &Decoder{source: source} +} + +// Decode reads the next BSON-encoded value from its input and stores it in the value pointed to by v. +// See the documentation for Unmarshal for details about the conversion of BSON into a Go value. +func (dec *Decoder) Decode(v interface{}) (err error) { + // BSON documents start with their size as a *signed* int32. + var docSize int32 + if err = binary.Read(dec.source, binary.LittleEndian, &docSize); err != nil { + return + } + + if docSize < MinDocumentSize || docSize > MaxDocumentSize { + return ErrInvalidDocumentSize{DocumentSize: docSize} + } + + docBuffer := bytes.NewBuffer(make([]byte, 0, docSize)) + if err = binary.Write(docBuffer, binary.LittleEndian, docSize); err != nil { + return + } + + // docSize is the *full* document's size (including the 4-byte size header, + // which has already been read). + if _, err = io.CopyN(docBuffer, dec.source, int64(docSize-4)); err != nil { + return + } + + // Let Unmarshal handle the rest. + defer handleErr(&err) + return Unmarshal(docBuffer.Bytes(), v) +} + +// An Encoder encodes and writes BSON values to an output stream. +type Encoder struct { + target io.Writer +} + +// NewEncoder returns a new Encoder that writes to target. 
+func NewEncoder(target io.Writer) *Encoder { + return &Encoder{target: target} +} + +// Encode encodes v to BSON, and if successful writes it to the Encoder's output stream. +// See the documentation for Marshal for details about the conversion of Go values to BSON. +func (enc *Encoder) Encode(v interface{}) error { + data, err := Marshal(v) + if err != nil { + return err + } + + _, err = enc.target.Write(data) + return err +} diff --git a/bson/stream_test.go b/bson/stream_test.go new file mode 100644 index 000000000..14acbe3c5 --- /dev/null +++ b/bson/stream_test.go @@ -0,0 +1,77 @@ +package bson_test + +import ( + "bytes" + + "github.com/globalsign/mgo/bson" + . "gopkg.in/check.v1" +) + +var invalidSizeDocuments = [][]byte{ + // Empty document + []byte{}, + // Incomplete header + []byte{0x04}, + // Negative size + []byte{0xff, 0xff, 0xff, 0xff}, + // Full, valid size header but too small (less than 5 bytes) + []byte{0x04, 0x00, 0x00, 0x00}, + // Valid header, valid size but incomplete document + []byte{0xff, 0x00, 0x00, 0x00, 0x00}, + // Too big + []byte{0xff, 0xff, 0xff, 0x7f}, +} + +// Reusing sampleItems from bson_test + +func (s *S) TestEncodeSampleItems(c *C) { + for i, item := range sampleItems { + buf := bytes.NewBuffer(nil) + enc := bson.NewEncoder(buf) + + err := enc.Encode(item.obj) + c.Assert(err, IsNil) + c.Assert(string(buf.Bytes()), Equals, item.data, Commentf("Failed on item %d", i)) + } +} + +func (s *S) TestDecodeSampleItems(c *C) { + for i, item := range sampleItems { + buf := bytes.NewBuffer([]byte(item.data)) + dec := bson.NewDecoder(buf) + + value := bson.M{} + err := dec.Decode(&value) + c.Assert(err, IsNil) + c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i)) + } +} + +func (s *S) TestStreamRoundTrip(c *C) { + buf := bytes.NewBuffer(nil) + enc := bson.NewEncoder(buf) + + for _, item := range sampleItems { + err := enc.Encode(item.obj) + c.Assert(err, IsNil) + } + + // Ensure that everything that was encoded is 
decodable in the same order. + dec := bson.NewDecoder(buf) + for i, item := range sampleItems { + value := bson.M{} + err := dec.Decode(&value) + c.Assert(err, IsNil) + c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i)) + } +} + +func (s *S) TestDecodeDocumentTooSmall(c *C) { + for i, item := range invalidSizeDocuments { + buf := bytes.NewBuffer(item) + dec := bson.NewDecoder(buf) + value := bson.M{} + err := dec.Decode(&value) + c.Assert(err, NotNil, Commentf("Failed on invalid size item %d", i)) + } +} diff --git a/cluster.go b/cluster.go index d639a4193..4e54c5d81 100644 --- a/cluster.go +++ b/cluster.go @@ -48,21 +48,23 @@ import ( type mongoCluster struct { sync.RWMutex - serverSynced sync.Cond - userSeeds []string - dynaSeeds []string - servers mongoServers - masters mongoServers - references int - syncing bool - direct bool - failFast bool - syncCount uint - setName string - cachedIndex map[string]bool - sync chan bool - dial dialer - appName string + serverSynced sync.Cond + userSeeds []string + dynaSeeds []string + servers mongoServers + masters mongoServers + references int + syncing bool + direct bool + failFast bool + syncCount uint + setName string + cachedIndex map[string]bool + sync chan bool + dial dialer + appName string + minPoolSize int + maxIdleTimeMS int } func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster { @@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() { func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer { cluster.RLock() server := cluster.servers.Search(tcpaddr.String()) + minPoolSize := cluster.minPoolSize + maxIdleTimeMS := cluster.maxIdleTimeMS cluster.RUnlock() if server != nil { return server } - return newServer(addr, tcpaddr, cluster.sync, cluster.dial) + return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS) } func resolveAddr(addr string) (*net.TCPAddr, error) 
{ @@ -614,9 +618,17 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) { // true, it will attempt to return a socket to a slave server. If it is // false, the socket will necessarily be to a master server. func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) { + return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0) +} + +// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is +// true, it will attempt to return a socket to a slave server. If it is +// false, the socket will necessarily be to a master server. +func (cluster *mongoCluster) AcquireSocketWithPoolTimeout( + mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration, +) (s *mongoSocket, err error) { var started time.Time var syncCount uint - warnedLimit := false for { cluster.RLock() for { @@ -658,14 +670,10 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout continue } - s, abended, err := server.AcquireSocket(poolLimit, socketTimeout) - if err == errPoolLimit { - if !warnedLimit { - warnedLimit = true - log("WARNING: Per-server connection limit reached.") - } - time.Sleep(100 * time.Millisecond) - continue + s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout) + if err == errPoolTimeout { + // No need to remove servers from the topology if acquiring a socket fails for this reason. 
+ return nil, err } if err != nil { cluster.removeServer(server) diff --git a/cluster_test.go b/cluster_test.go index a0a197048..be11dc1a7 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1583,6 +1583,9 @@ func (s *S) TestPoolLimitSimple(c *C) { } defer session.Close() + // So we can measure the stats for the blocking operation + mgo.ResetStats() + // Put one socket in use. c.Assert(session.Ping(), IsNil) @@ -1603,6 +1606,11 @@ func (s *S) TestPoolLimitSimple(c *C) { session.Refresh() delay := <-done c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + stats := mgo.GetStats() + c.Assert(stats.TimesSocketAcquired, Equals, 2) + c.Assert(stats.TimesWaitedForPool, Equals, 1) + c.Assert(stats.PoolTimeouts, Equals, 0) + c.Assert(stats.TotalPoolWaitTime > 300*time.Millisecond, Equals, true) } } @@ -1649,6 +1657,40 @@ func (s *S) TestPoolLimitMany(c *C) { c.Assert(delay < 6e9, Equals, true) } +func (s *S) TestPoolLimitTimeout(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + session.SetPoolTimeout(1 * time.Second) + session.SetPoolLimit(1) + + mgo.ResetStats() + + // Put one socket in use. + c.Assert(session.Ping(), IsNil) + + // Now block trying to get another one due to the pool limit. 
+ copy := session.Copy() + defer copy.Close() + started := time.Now() + err = copy.Ping() + delay := time.Since(started) + + c.Assert(delay > 900*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + c.Assert(delay < 1100*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + c.Assert(strings.Contains(err.Error(), "could not acquire connection within pool timeout"), Equals, true, Commentf("Error: %s", err)) + stats := mgo.GetStats() + c.Assert(stats.PoolTimeouts, Equals, 1) + c.Assert(stats.TimesSocketAcquired, Equals, 1) + c.Assert(stats.TimesWaitedForPool, Equals, 1) + c.Assert(stats.TotalPoolWaitTime > 900*time.Millisecond, Equals, true) + c.Assert(stats.TotalPoolWaitTime < 1100*time.Millisecond, Equals, true) +} + func (s *S) TestSetModeEventualIterBug(c *C) { session1, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) diff --git a/coarse_time.go b/coarse_time.go new file mode 100644 index 000000000..e54dd17cf --- /dev/null +++ b/coarse_time.go @@ -0,0 +1,62 @@ +package mgo + +import ( + "sync" + "sync/atomic" + "time" +) + +// coarseTimeProvider provides a periodically updated (approximate) time value to +// amortise the cost of frequent calls to time.Now. +// +// A read throughput increase of ~6% was measured when using coarseTimeProvider with the +// high-precision event timer (HPET) on FreeBSD 11.1 and Go 1.10.1 after merging +// #116. +// +// Calling Now returns a time.Time that is updated at the configured interval, +// however due to scheduling the value may be marginally older than expected. +// +// coarseTimeProvider is safe for concurrent use. +type coarseTimeProvider struct { + once sync.Once + stop chan struct{} + last atomic.Value +} + +// Now returns the most recently acquired time.Time value. +func (t *coarseTimeProvider) Now() time.Time { + return t.last.Load().(time.Time) +} + +// Close stops the periodic update of t. +// +// Any subsequent calls to Now will return the same value forever. 
+func (t *coarseTimeProvider) Close() { + t.once.Do(func() { + close(t.stop) + }) +} + +// newcoarseTimeProvider returns a coarseTimeProvider configured to update at granularity. +func newcoarseTimeProvider(granularity time.Duration) *coarseTimeProvider { + t := &coarseTimeProvider{ + stop: make(chan struct{}), + } + + t.last.Store(time.Now()) + + go func() { + ticker := time.NewTicker(granularity) + for { + select { + case <-t.stop: + ticker.Stop() + return + case <-ticker.C: + t.last.Store(time.Now()) + } + } + }() + + return t +} diff --git a/coarse_time_test.go b/coarse_time_test.go new file mode 100644 index 000000000..5c98fc65d --- /dev/null +++ b/coarse_time_test.go @@ -0,0 +1,23 @@ +package mgo + +import ( + "testing" + "time" +) + +func TestCoarseTimeProvider(t *testing.T) { + t.Parallel() + + const granularity = 50 * time.Millisecond + + ct := newcoarseTimeProvider(granularity) + defer ct.Close() + + start := ct.Now().Unix() + time.Sleep(time.Second) + + got := ct.Now().Unix() + if got <= start { + t.Fatalf("got %d, expected at least %d", got, start) + } +} diff --git a/dbtest/dbserver.go b/dbtest/dbserver.go index b74280801..2fadaf764 100644 --- a/dbtest/dbserver.go +++ b/dbtest/dbserver.go @@ -69,6 +69,8 @@ func (dbs *DBServer) start() { dbs.server.Stderr = &dbs.output err = dbs.server.Start() if err != nil { + // print error to facilitate troubleshooting as the panic will be caught in a panic handler + fmt.Fprintf(os.Stderr, "mongod failed to start: %v\n",err) panic(err) } dbs.tomb.Go(dbs.monitor) diff --git a/dbtest/dbserver_test.go b/dbtest/dbserver_test.go index b3cc45a8a..e3abb1817 100644 --- a/dbtest/dbserver_test.go +++ b/dbtest/dbserver_test.go @@ -77,9 +77,7 @@ func (s *S) TestStop(c *C) { server.Stop() // Server should not be running anymore. 
- session, err = mgo.DialWithTimeout(addr, 500*time.Millisecond) - c.Assert(err, IsNil) - + session, _ = mgo.DialWithTimeout(addr, 500*time.Millisecond) if session != nil { session.Close() c.Fatalf("Stop did not stop the server") diff --git a/doc.go b/doc.go index 859fd9b8d..f3f373bf4 100644 --- a/doc.go +++ b/doc.go @@ -1,9 +1,8 @@ -// Package mgo offers a rich MongoDB driver for Go. +// Package mgo (pronounced as "mango") offers a rich MongoDB driver for Go. // -// Details about the mgo project (pronounced as "mango") are found -// in its web page: +// Detailed documentation of the API is available at GoDoc: // -// http://labix.org/mgo +// https://godoc.org/github.com/globalsign/mgo // // Usage of the driver revolves around the concept of sessions. To // get started, obtain a session using the Dial function: @@ -26,6 +25,11 @@ // of its life time, so its resources may be put back in the pool or // collected, depending on the case. // +// There is a sub-package that provides support for BSON, which can be +// used by itself as well: +// +// https://godoc.org/github.com/globalsign/mgo/bson +// // For more details, see the documentation for the types and methods. // package mgo diff --git a/example_test.go b/example_test.go index bf7982a46..d176d5f5c 100644 --- a/example_test.go +++ b/example_test.go @@ -90,12 +90,12 @@ func ExampleCredential_x509Authentication() { func ExampleSession_concurrency() { // This example shows the best practise for concurrent use of a mgo session. - // + // // Internally mgo maintains a connection pool, dialling new connections as - // required. - // + // required. + // // Some general suggestions: - // - Define a struct holding the original session, database name and + // - Define a struct holding the original session, database name and // collection name instead of passing them explicitly. // - Define an interface abstracting your data access instead of exposing // mgo to your application code directly. 
@@ -107,7 +107,7 @@ func ExampleSession_concurrency() { // Copy the session - if needed this will dial a new connection which // can later be reused. - // + // // Calling close returns the connection to the pool. conn := session.Copy() defer conn.Close() @@ -133,4 +133,31 @@ func ExampleSession_concurrency() { wg.Wait() session.Close() -} \ No newline at end of file +} + +func ExampleDial_usingSSL() { + // To connect via TLS/SSL (enforced for MongoDB Atlas for example) requires + // configuring the dialer to use a TLS connection: + url := "mongodb://localhost:40003" + + tlsConfig := &tls.Config{ + // This can be configured to use a private root CA - see the Credential + // x509 Authentication example. + // + // Please don't set InsecureSkipVerify to true - it makes using TLS + // pointless and is never the right answer! + } + + dialInfo, err := ParseURL(url) + dialInfo.DialServer = func(addr *ServerAddr) (net.Conn, error) { + return tls.Dial("tcp", addr.String(), tlsConfig) + } + + session, err := DialWithInfo(dialInfo) + if err != nil { + panic(err) + } + + // Use session as normal + session.Close() +} diff --git a/server.go b/server.go index 7ad955255..f34624f74 100644 --- a/server.go +++ b/server.go @@ -36,6 +36,18 @@ import ( "github.com/globalsign/mgo/bson" ) +// coarseTime is used to amortise the cost of querying the timecounter (possibly +// incurring a syscall too) when setting a socket.lastTimeUsed value which +// happens frequently in the hot-path. +// +// The lastTimeUsed value may be skewed by at least 25ms (see +// coarseTimeProvider). +var coarseTime *coarseTimeProvider + +func init() { + coarseTime = newcoarseTimeProvider(25 * time.Millisecond) +} + // --------------------------------------------------------------------------- // Mongo server encapsulation. 
@@ -55,6 +67,9 @@ type mongoServer struct { pingCount uint32 closed bool abended bool + minPoolSize int + maxIdleTimeMS int + poolWaiter *sync.Cond } type dialer struct { @@ -76,21 +91,28 @@ type mongoServerInfo struct { var defaultServerInfo mongoServerInfo -func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer { +func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { server := &mongoServer{ - Addr: addr, - ResolvedAddr: tcpaddr.String(), - tcpaddr: tcpaddr, - sync: sync, - dial: dial, - info: &defaultServerInfo, - pingValue: time.Hour, // Push it back before an actual ping. + Addr: addr, + ResolvedAddr: tcpaddr.String(), + tcpaddr: tcpaddr, + sync: syncChan, + dial: dial, + info: &defaultServerInfo, + pingValue: time.Hour, // Push it back before an actual ping. + minPoolSize: minPoolSize, + maxIdleTimeMS: maxIdleTimeMS, } + server.poolWaiter = sync.NewCond(server) go server.pinger(true) + if maxIdleTimeMS != 0 { + go server.poolShrinker() + } return server } var errPoolLimit = errors.New("per-server connection limit reached") +var errPoolTimeout = errors.New("could not acquire connection within pool timeout") var errServerClosed = errors.New("server was closed") // AcquireSocket returns a socket for communicating with the server. @@ -102,6 +124,21 @@ var errServerClosed = errors.New("server was closed") // use in this server is greater than the provided limit, errPoolLimit is // returned. func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) { + return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond) +} + +// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_ +// return errPoolLimit. Instead, it will block waiting for a socket to become available. 
If poolTimeout +// should elapse before a socket is available, it will return errPoolTimeout. +func (server *mongoServer) AcquireSocketWithBlocking( + poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration, +) (socket *mongoSocket, abended bool, err error) { + return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout) +} + +func (server *mongoServer) acquireSocketInternal( + poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration, +) (socket *mongoSocket, abended bool, err error) { for { server.Lock() abended = server.abended @@ -109,11 +146,58 @@ func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) ( server.Unlock() return nil, abended, errServerClosed } - n := len(server.unusedSockets) - if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit { - server.Unlock() - return nil, false, errPoolLimit + if poolLimit > 0 { + if shouldBlock { + // Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout + // with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout, + // and fail if it is blown. + // Yes, this is a spurious wakeup, but we can't do a directed signal without having one condition + // variable per waiter, which would involve loop traversal in the RecycleSocket + // method. + // We also can't use the approach of turning a condition variable into a channel outlined in + // https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine. + waitDone := make(chan struct{}) + timeoutHit := false + if poolTimeout > 0 { + go func() { + select { + case <-waitDone: + case <-time.After(poolTimeout): + // timeoutHit is part of the wait condition, so needs to be changed under mutex. 
+							server.Lock()
+							defer server.Unlock()
+							timeoutHit = true
+							server.poolWaiter.Broadcast()
+						}
+					}()
+				}
+				timeSpentWaiting := time.Duration(0)
+				for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
+					// We only count time spent in Wait(), and not time evaluating the entire loop,
+					// so that in the happy non-blocking path where the condition above evaluates true
+					// first time, we record a nice round zero wait time.
+					waitStart := time.Now()
+					// unlocks server mutex, waits, and locks again. Thus, the condition
+					// above is evaluated only when the lock is held.
+					server.poolWaiter.Wait()
+					timeSpentWaiting += time.Since(waitStart)
+				}
+				close(waitDone)
+				if timeoutHit {
+					server.Unlock()
+					stats.noticePoolTimeout(timeSpentWaiting)
+					return nil, false, errPoolTimeout
+				}
+				// Record that we fetched a connection out of a socket list and how long we spent waiting
+				stats.noticeSocketAcquisition(timeSpentWaiting)
+			} else {
+				if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
+					server.Unlock()
+					return nil, false, errPoolLimit
+				}
+			}
+		}
+		n := len(server.unusedSockets)
 		if n > 0 {
 			socket = server.unusedSockets[n-1]
 			server.unusedSockets[n-1] = nil // Help GC.
@@ -221,8 +305,17 @@ func (server *mongoServer) close(waitForIdle bool) {
 func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
 	server.Lock()
 	if !server.closed {
+		socket.lastTimeUsed = coarseTime.Now() // A rough approximation of the current time - see coarseTime
 		server.unusedSockets = append(server.unusedSockets, socket)
 	}
+	// If anybody is waiting for a connection, they should try now.
+	// Note that this _has_ to be broadcast, not signal; the signature of AcquireSocket
+	// and AcquireSocketWithBlocking allow the caller to specify the max number of connections,
+	// rather than that being an intrinsic property of the connection pool (I assume to ensure
+	// that there is always a connection available for replset topology discovery). 
Thus, once + // a connection is returned to the pool, _every_ waiter needs to check if the connection count + // is underneath their particular value for poolLimit. + server.poolWaiter.Broadcast() server.Unlock() } @@ -346,6 +439,53 @@ func (server *mongoServer) pinger(loop bool) { } } +func (server *mongoServer) poolShrinker() { + ticker := time.NewTicker(1 * time.Minute) + for _ = range ticker.C { + if server.closed { + ticker.Stop() + return + } + server.Lock() + unused := len(server.unusedSockets) + if unused < server.minPoolSize { + server.Unlock() + continue + } + now := time.Now() + end := 0 + reclaimMap := map[*mongoSocket]struct{}{} + // Because the acquisition and recycle are done at the tail of array, + // the head is always the oldest unused socket. + for _, s := range server.unusedSockets[:unused-server.minPoolSize] { + if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) { + break + } + end++ + reclaimMap[s] = struct{}{} + } + tbr := server.unusedSockets[:end] + if end > 0 { + next := make([]*mongoSocket, unused-end) + copy(next, server.unusedSockets[end:]) + server.unusedSockets = next + remainSockets := []*mongoSocket{} + for _, s := range server.liveSockets { + if _, ok := reclaimMap[s]; !ok { + remainSockets = append(remainSockets, s) + } + } + server.liveSockets = remainSockets + stats.conn(-1*end, server.info.Master) + } + server.Unlock() + + for _, s := range tbr { + s.Close() + } + } +} + type mongoServerSlice []*mongoServer func (s mongoServerSlice) Len() int { diff --git a/session.go b/session.go index 0fae61498..5b98154f1 100644 --- a/session.go +++ b/session.go @@ -92,6 +92,7 @@ type Session struct { syncTimeout time.Duration sockTimeout time.Duration poolLimit int + poolTimeout time.Duration consistency Mode creds []Credential dialCred *Credential @@ -271,6 +272,16 @@ const ( // Defines the per-server socket pool limit. Defaults to 4096. // See Session.SetPoolLimit for details. 
 //
+// minPoolSize=
+//
+// Defines the per-server socket pool minimum size. Defaults to 0.
+//
+// maxIdleTimeMS=
+//
+// The maximum number of milliseconds that a connection can remain idle in the pool
+// before being removed and closed. If maxIdleTimeMS is 0, connections will never be
+// closed due to inactivity.
+//
 // appName=
 //
 // The identifier of this client application. This parameter is used to
@@ -322,6 +333,8 @@ func ParseURL(url string) (*DialInfo, error) {
 	appName := ""
 	readPreferenceMode := Primary
 	var readPreferenceTagSets []bson.D
+	minPoolSize := 0
+	maxIdleTimeMS := 0
 	for _, opt := range uinfo.options {
 		switch opt.key {
 		case "authSource":
@@ -368,6 +381,22 @@ func ParseURL(url string) (*DialInfo, error) {
 				doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
 			}
 			readPreferenceTagSets = append(readPreferenceTagSets, doc)
+		case "minPoolSize":
+			minPoolSize, err = strconv.Atoi(opt.value)
+			if err != nil {
+				return nil, errors.New("bad value for minPoolSize: " + opt.value)
+			}
+			if minPoolSize < 0 {
+				return nil, errors.New("bad value (negtive) for minPoolSize: " + opt.value)
+			}
+		case "maxIdleTimeMS":
+			maxIdleTimeMS, err = strconv.Atoi(opt.value)
+			if err != nil {
+				return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
+			}
+			if maxIdleTimeMS < 0 {
+				return nil, errors.New("bad value (negtive) for maxIdleTimeMS: " + opt.value)
+			}
 		case "connect":
 			if opt.value == "direct" {
 				direct = true
@@ -402,6 +431,8 @@ func ParseURL(url string) (*DialInfo, error) {
 			TagSets: readPreferenceTagSets,
 		},
 		ReplicaSetName: setName,
+		MinPoolSize:    minPoolSize,
+		MaxIdleTimeMS:  maxIdleTimeMS,
 	}
 	return &info, nil
 }
@@ -456,6 +487,11 @@ type DialInfo struct {
 	// See Session.SetPoolLimit for details.
 	PoolLimit int
 
+	// PoolTimeout defines max time to wait for a connection to become available
+	// if the pool limit is reached. Defaults to zero, which means forever.
+	// See Session.SetPoolTimeout for details
+	PoolTimeout time.Duration
+
 	// The identifier of the client application which ran the operation.
 	AppName string
 
@@ -475,6 +511,14 @@ type DialInfo struct {
 	// cluster and establish connections with further servers too.
 	Direct bool
 
+	// MinPoolSize defines the minimum number of connections in the connection pool.
+	// Defaults to 0.
+	MinPoolSize int
+
+	// The maximum number of milliseconds that a connection can remain idle in the pool
+	// before being removed and closed.
+	MaxIdleTimeMS int
+
 	// DialServer optionally specifies the dial function for establishing
 	// connections with the MongoDB servers.
 	DialServer func(addr *ServerAddr) (net.Conn, error)
@@ -554,6 +598,14 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
 	if info.PoolLimit > 0 {
 		session.poolLimit = info.PoolLimit
 	}
+
+	cluster.minPoolSize = info.MinPoolSize
+	cluster.maxIdleTimeMS = info.MaxIdleTimeMS
+
+	if info.PoolTimeout > 0 {
+		session.poolTimeout = info.PoolTimeout
+	}
+
 	cluster.Release()
 
 	// People get confused when we return a session that is not actually
@@ -669,6 +721,7 @@ func copySession(session *Session, keepCreds bool) (s *Session) {
 		syncTimeout: session.syncTimeout,
 		sockTimeout: session.sockTimeout,
 		poolLimit:   session.poolLimit,
+		poolTimeout: session.poolTimeout,
 		consistency: session.consistency,
 		creds:       creds,
 		dialCred:    session.dialCred,
@@ -2009,6 +2062,16 @@ func (s *Session) SetPoolLimit(limit int) {
 	s.m.Unlock()
 }
 
+// SetPoolTimeout sets the maximum time connection attempts will wait to reuse
+// an existing connection from the pool if the PoolLimit has been reached. If
+// the value is exceeded, the attempt to use a session will fail with an error.
+// The default value is zero, which means to wait forever with no timeout.
+func (s *Session) SetPoolTimeout(timeout time.Duration) { + s.m.Lock() + s.poolTimeout = timeout + s.m.Unlock() +} + // SetBypassValidation sets whether the server should bypass the registered // validation expressions executed when documents are inserted or modified, // in the interest of preserving invariants in the collection being modified. @@ -2447,6 +2510,7 @@ type Pipe struct { allowDisk bool batchSize int maxTimeMS int64 + collation *Collation } type pipeCmd struct { @@ -2456,6 +2520,7 @@ type pipeCmd struct { Explain bool `bson:",omitempty"` AllowDisk bool `bson:"allowDiskUse,omitempty"` MaxTimeMS int64 `bson:"maxTimeMS,omitempty"` + Collation *Collation `bson:"collation,omitempty"` } type pipeCmdCursor struct { @@ -2476,6 +2541,7 @@ type pipeCmdCursor struct { // http://docs.mongodb.org/manual/applications/aggregation // http://docs.mongodb.org/manual/tutorial/aggregation-examples // + func (c *Collection) Pipe(pipeline interface{}) *Pipe { session := c.Database.Session session.m.RLock() @@ -2509,6 +2575,7 @@ func (p *Pipe) Iter() *Iter { Pipeline: p.pipeline, AllowDisk: p.allowDisk, Cursor: &pipeCmdCursor{p.batchSize}, + Collation: p.collation, } if p.maxTimeMS > 0 { cmd.MaxTimeMS = p.maxTimeMS @@ -2698,6 +2765,23 @@ func (p *Pipe) SetMaxTime(d time.Duration) *Pipe { return p } + +// Collation allows to specify language-specific rules for string comparison, +// such as rules for lettercase and accent marks. +// When specifying collation, the locale field is mandatory; all other collation +// fields are optional +// +// Relevant documentation: +// +// https://docs.mongodb.com/manual/reference/collation/ +// +func (p *Pipe) Collation(collation *Collation) *Pipe { + if collation != nil { + p.collation = collation + } + return p +} + // LastError the error status of the preceding write operation on the current connection. 
// // Relevant documentation: @@ -4866,7 +4950,9 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) { } // Still not good. We need a new socket. - sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit) + sock, err := s.cluster().AcquireSocketWithPoolTimeout( + s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolTimeout, + ) if err != nil { return nil, err } diff --git a/session_test.go b/session_test.go index eb2c812b3..14cb9b1a6 100644 --- a/session_test.go +++ b/session_test.go @@ -30,11 +30,13 @@ import ( "flag" "fmt" "math" + "math/rand" "os" "runtime" "sort" "strconv" "strings" + "sync" "testing" "time" @@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) { } } +func (s *S) TestMinPoolSize(c *C) { + tests := []struct { + url string + size int + fail bool + }{ + {"localhost:40001?minPoolSize=0", 0, false}, + {"localhost:40001?minPoolSize=1", 1, false}, + {"localhost:40001?minPoolSize=-1", -1, true}, + {"localhost:40001?minPoolSize=-.", 0, true}, + } + for _, test := range tests { + info, err := mgo.ParseURL(test.url) + if test.fail { + c.Assert(err, NotNil) + } else { + c.Assert(err, IsNil) + c.Assert(info.MinPoolSize, Equals, test.size) + } + } +} + +func (s *S) TestMaxIdleTimeMS(c *C) { + tests := []struct { + url string + size int + fail bool + }{ + {"localhost:40001?maxIdleTimeMS=0", 0, false}, + {"localhost:40001?maxIdleTimeMS=1", 1, false}, + {"localhost:40001?maxIdleTimeMS=-1", -1, true}, + {"localhost:40001?maxIdleTimeMS=-.", 0, true}, + } + for _, test := range tests { + info, err := mgo.ParseURL(test.url) + if test.fail { + c.Assert(err, NotNil) + } else { + c.Assert(err, IsNil) + c.Assert(info.MaxIdleTimeMS, Equals, test.size) + } + } +} + +func (s *S) TestPoolShrink(c *C) { + if *fast { + c.Skip("-fast") + } + oldSocket := mgo.GetStats().SocketsAlive + + 
session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000") + c.Assert(err, IsNil) + defer session.Close() + + parallel := 10 + res := make(chan error, parallel+1) + wg := &sync.WaitGroup{} + for i := 1; i < parallel; i++ { + wg.Add(1) + go func() { + s := session.Copy() + defer s.Close() + result := struct{}{} + err := s.Run("ping", &result) + + //sleep random time to make the allocate and release in different sequence + time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond) + res <- err + wg.Done() + }() + } + wg.Wait() + stats := mgo.GetStats() + c.Logf("living socket: After queries: %d, before queries: %d", stats.SocketsAlive, oldSocket) + + // give some time for shrink the pool, the tick is set to 1 minute + c.Log("Sleeping... 1 minute to for pool shrinking") + time.Sleep(60 * time.Second) + + stats = mgo.GetStats() + c.Logf("living socket: After shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket) + c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false) +} + func (s *S) TestURLReadPreferenceTags(c *C) { type test struct { url string @@ -4531,6 +4617,36 @@ func (s *S) TestPipeExplain(c *C) { c.Assert(result.Ok, Equals, 1) } +func (s *S) TestPipeCollation(c *C) { + if !s.versionAtLeast(2, 1) { + c.Skip("Pipe only works on 2.1+") + } + if !s.versionAtLeast(3, 3, 12) { + c.Skip("collations being released with 3.4") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + beatles := []string{"John", "RINGO", "George", "Paul"} + for _, n := range beatles { + err := coll.Insert(M{"name": n}) + c.Assert(err, IsNil) + } + + collation := &mgo.Collation{ + Locale: "en", + Strength: 1, // ignore case/diacritics + } + var result []struct{ Name string } + err = coll.Pipe([]M{{"$match": M{"name": "ringo"}}}).Collation(collation).All(&result) + c.Assert(err, IsNil) + c.Assert(len(result), Equals, 1) + c.Assert(result[0].Name, 
Equals, "RINGO") +} + func (s *S) TestBatch1Bug(c *C) { session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) diff --git a/socket.go b/socket.go index a9124b043..ae13e401f 100644 --- a/socket.go +++ b/socket.go @@ -54,6 +54,7 @@ type mongoSocket struct { dead error serverInfo *mongoServerInfo closeAfterIdle bool + lastTimeUsed time.Time // for time based idle socket release sendMeta sync.Once } diff --git a/stats.go b/stats.go index dcbd01045..8cf4ecec1 100644 --- a/stats.go +++ b/stats.go @@ -28,6 +28,7 @@ package mgo import ( "sync" + "time" ) var stats *Stats @@ -77,15 +78,19 @@ func ResetStats() { // // TODO outdated fields ? type Stats struct { - Clusters int - MasterConns int - SlaveConns int - SentOps int - ReceivedOps int - ReceivedDocs int - SocketsAlive int - SocketsInUse int - SocketRefs int + Clusters int + MasterConns int + SlaveConns int + SentOps int + ReceivedOps int + ReceivedDocs int + SocketsAlive int + SocketsInUse int + SocketRefs int + TimesSocketAcquired int + TimesWaitedForPool int + TotalPoolWaitTime time.Duration + PoolTimeouts int } func (stats *Stats) cluster(delta int) { @@ -155,3 +160,25 @@ func (stats *Stats) socketRefs(delta int) { statsMutex.Unlock() } } + +func (stats *Stats) noticeSocketAcquisition(waitTime time.Duration) { + if stats != nil { + statsMutex.Lock() + stats.TimesSocketAcquired++ + stats.TotalPoolWaitTime += waitTime + if waitTime > 0 { + stats.TimesWaitedForPool++ + } + statsMutex.Unlock() + } +} + +func (stats *Stats) noticePoolTimeout(waitTime time.Duration) { + if stats != nil { + statsMutex.Lock() + stats.TimesWaitedForPool++ + stats.PoolTimeouts++ + stats.TotalPoolWaitTime += waitTime + statsMutex.Unlock() + } +}