Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changeStream support #97

Merged
merged 44 commits into from
Feb 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
de872a6
Merge branch 'master' into development
domodwyer Aug 8, 2017
1519fd3
Merge branch 'master' into development
domodwyer Aug 15, 2017
454da02
add DropAllIndexes() method (#25)
feliixx Aug 30, 2017
93aaa6e
readme: credit @feliixx for #25 (#26)
domodwyer Aug 30, 2017
165af68
send metadata during handshake (#28)
feliixx Sep 6, 2017
a76b1a0
Update README to add appName (#32)
domodwyer Sep 6, 2017
25200e4
add method CreateView() (#33)
feliixx Sep 11, 2017
1f4c10f
readme: credit @feliixx in the README (#36)
domodwyer Sep 11, 2017
934a190
Don't panic on indexed int64 fields (#23)
domodwyer Sep 11, 2017
b37e3c1
Add collation option to collection.Create() (#37)
feliixx Sep 13, 2017
10876f5
support the $changeStream aggregation in 3.6+
Aug 2, 2017
b82ca4c
changestreams: fix import path
domodwyer Sep 15, 2017
aead58f
Test against MongoDB 3.4.x (#35)
feliixx Sep 15, 2017
5b7a419
MGO-142 implement changestream status functions for err, close and
Sep 14, 2017
950ed5a
Introduce constants for BSON element types (#41)
bozaro Sep 20, 2017
d21a525
bson.Unmarshal returns time in UTC (#42)
Sep 28, 2017
9d743b4
readme: add missing features / credit
domodwyer Sep 28, 2017
c86ed84
Merge pull request #45 from globalsign/feature/update-readme
Sep 28, 2017
97bd0cd
fix golint, go vet and gofmt warnings (#44)
feliixx Oct 6, 2017
fd79249
readme: credit @feliixx (#46)
domodwyer Oct 9, 2017
dba7b4c
Fix GetBSON() method usage (#40)
bozaro Oct 11, 2017
12fb1c2
readme: credit @bozaro (#47)
domodwyer Oct 11, 2017
199dc25
Improve cursorData struct unmarshaling speed (#49)
bozaro Oct 19, 2017
345ab0b
readme: credit @bozaro and @idy (#53)
domodwyer Oct 19, 2017
0454966
do not lock while writing to a socket (#52) (#54)
domodwyer Oct 19, 2017
663dfe5
Add proper DN construction (#55)
csucu Nov 2, 2017
7cd0b89
reduce memory allocation in bulk op (#56)
feliixx Nov 3, 2017
ea8e8e6
readme: credit @feliixx (#58)
domodwyer Nov 6, 2017
90c056c
MongoDB 3.6: implement the new wire protocol (#61)
feliixx Dec 12, 2017
1ac9b5d
Merge branch 'master' into development
domodwyer Dec 12, 2017
a104bfb
Recover from persistent "i/o timeout" or "Closed explicitly" pool err…
bachue Dec 27, 2017
f9be6c5
development: revert #61 (#73)
domodwyer Jan 9, 2018
138ba2f
readme: credit @bachue (#74)
domodwyer Jan 9, 2018
9acbd68
auth: add an example for x509 authentication (#75)
domodwyer Jan 9, 2018
eeedc17
session: add example concurrent usage (#78)
domodwyer Jan 15, 2018
88cedcd
Brings in a patch on having flusher not suppress errors. (#81)
jameinel Jan 25, 2018
90ad51b
Fallback to JSON tags when BSON tag isn't present (#91)
steve-gray Jan 31, 2018
9b03c58
Merge branch 'development' into feature/changestream
peterdeka Feb 6, 2018
4eb6ac9
Added maxAwaitTimeMS support to ChangeStream so Next() times out. Add…
peterdeka Feb 6, 2018
467c79f
Enabled journaling in test harness db config that made the ChangeStre…
peterdeka Feb 6, 2018
c8cbfa8
Better refactored harness daemons env file --nojournal removal. Added…
peterdeka Feb 6, 2018
5534387
Changed stream iterator timeout handling from driver to DB
peterdeka Feb 8, 2018
f341232
Merge branch 'development' into feature/changestream
peterdeka Feb 12, 2018
cebee9a
Renamed Pipe MaxTimeMS to SetMaxTime and changed parameter to a time.…
peterdeka Feb 14, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
357 changes: 357 additions & 0 deletions changestreams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,357 @@
package mgo

import (
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/globalsign/mgo/bson"
)

type FullDocument string

const (
Default = "default"
UpdateLookup = "updateLookup"
)

type ChangeStream struct {
iter *Iter
isClosed bool
options ChangeStreamOptions
pipeline interface{}
resumeToken *bson.Raw
collection *Collection
readPreference *ReadPreference
err error
m sync.Mutex
sessionCopied bool
}

type ChangeStreamOptions struct {

// FullDocument controls the amount of data that the server will return when
// returning a changes document.
FullDocument FullDocument

// ResumeAfter specifies the logical starting point for the new change stream.
ResumeAfter *bson.Raw

// MaxAwaitTimeMS specifies the maximum amount of time for the server to wait
// on new documents to satisfy a change stream query.
MaxAwaitTimeMS time.Duration

// BatchSize specifies the number of documents to return per batch.
BatchSize int

// Collation specifies the way the server should collate returned data.
//TODO Collation *Collation
}

var errMissingResumeToken = errors.New("resume token missing from result")

// Watch constructs a new ChangeStream capable of receiving continuing data
// from the database.
func (coll *Collection) Watch(pipeline interface{},
options ChangeStreamOptions) (*ChangeStream, error) {

if pipeline == nil {
pipeline = []bson.M{}
}

csPipe := constructChangeStreamPipeline(pipeline, options)
pipe := coll.Pipe(&csPipe)
if options.MaxAwaitTimeMS > 0 {
pipe.SetMaxTime(options.MaxAwaitTimeMS)
}
if options.BatchSize > 0 {
pipe.Batch(options.BatchSize)
}
pIter := pipe.Iter()

// check that there was no issue creating the iterator.
// this will fail immediately with an error from the server if running against
// a standalone.
if err := pIter.Err(); err != nil {
return nil, err
}

pIter.isChangeStream = true
return &ChangeStream{
iter: pIter,
collection: coll,
resumeToken: nil,
options: options,
pipeline: pipeline,
}, nil
}

// Next retrieves the next document from the change stream, blocking if necessary.
// Next returns true if a document was successfully unmarshalled into result,
// and false if an error occured. When Next returns false, the Err method should
// be called to check what error occurred during iteration. If there were no events
// available (ErrNotFound), the Err method returns nil so the user can retry the invocaton.
//
// For example:
//
// pipeline := []bson.M{}
//
// changeStream := collection.Watch(pipeline, ChangeStreamOptions{})
// for changeStream.Next(&changeDoc) {
// fmt.Printf("Change: %v\n", changeDoc)
// }
//
// if err := changeStream.Close(); err != nil {
// return err
// }
//
// If the pipeline used removes the _id field from the result, Next will error
// because the _id field is needed to resume iteration when an error occurs.
//
func (changeStream *ChangeStream) Next(result interface{}) bool {
// the err field is being constantly overwritten and we don't want the user to
// attempt to read it at this point so we lock.
changeStream.m.Lock()

defer changeStream.m.Unlock()

// if we are in a state of error, then don't continue.
if changeStream.err != nil {
return false
}

if changeStream.isClosed {
changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream")
return false
}

var err error

// attempt to fetch the change stream result.
err = changeStream.fetchResultSet(result)
if err == nil {
return true
}

// if we get no results we return false with no errors so the user can call Next
// again, resuming is not needed as the iterator is simply timed out as no events happened.
// The user will call Timeout in order to understand if this was the case.
if err == ErrNotFound {
return false
}

// check if the error is resumable
if !isResumableError(err) {
// error is not resumable, give up and return it to the user.
changeStream.err = err
return false
}

// try to resume.
err = changeStream.resume()
if err != nil {
// we've not been able to successfully resume and should only try once,
// so we give up.
changeStream.err = err
return false
}

// we've successfully resumed the changestream.
// try to fetch the next result.
err = changeStream.fetchResultSet(result)
if err != nil {
changeStream.err = err
return false
}

return true
}

// Err returns nil if no errors happened during iteration, or the actual
// error otherwise.
func (changeStream *ChangeStream) Err() error {
changeStream.m.Lock()
defer changeStream.m.Unlock()
return changeStream.err
}

// Close kills the server cursor used by the iterator, if any, and returns
// nil if no errors happened during iteration, or the actual error otherwise.
func (changeStream *ChangeStream) Close() error {
changeStream.m.Lock()
defer changeStream.m.Unlock()
changeStream.isClosed = true
err := changeStream.iter.Close()
if err != nil {
changeStream.err = err
}
if changeStream.sessionCopied {
changeStream.iter.session.Close()
changeStream.sessionCopied = false
}
return err
}

// ResumeToken returns a copy of the current resume token held by the change stream.
// This token should be treated as an opaque token that can be provided to instantiate
// a new change stream.
func (changeStream *ChangeStream) ResumeToken() *bson.Raw {
changeStream.m.Lock()
defer changeStream.m.Unlock()
if changeStream.resumeToken == nil {
return nil
}
var tokenCopy = *changeStream.resumeToken
return &tokenCopy
}

// Timeout returns true if the last call of Next returned false because of an iterator timeout.
func (changeStream *ChangeStream) Timeout() bool {
return changeStream.iter.Timeout()
}

func constructChangeStreamPipeline(pipeline interface{},
options ChangeStreamOptions) interface{} {
pipelinev := reflect.ValueOf(pipeline)

// ensure that the pipeline passed in is a slice.
if pipelinev.Kind() != reflect.Slice {
panic("pipeline argument must be a slice")
}

// construct the options to be used by the change notification
// pipeline stage.
changeStreamStageOptions := bson.M{}

if options.FullDocument != "" {
changeStreamStageOptions["fullDocument"] = options.FullDocument
}
if options.ResumeAfter != nil {
changeStreamStageOptions["resumeAfter"] = options.ResumeAfter
}

changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions}

pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1)

// insert the change notification pipeline stage at the beginning of the
// aggregation.
pipeOfInterfaces[0] = changeStreamStage

// convert the passed in slice to a slice of interfaces.
for i := 0; i < pipelinev.Len(); i++ {
pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface()
}
var pipelineAsInterface interface{} = pipeOfInterfaces
return pipelineAsInterface
}

func (changeStream *ChangeStream) resume() error {
// copy the information for the new socket.

// Thanks to Copy() future uses will acquire a new socket against the newly selected DB.
newSession := changeStream.iter.session.Copy()

// fetch the cursor from the iterator and use it to run a killCursors
// on the connection.
cursorId := changeStream.iter.op.cursorId
err := runKillCursorsOnSession(newSession, cursorId)
if err != nil {
return err
}

// change out the old connection to the database with the new connection.
if changeStream.sessionCopied {
changeStream.collection.Database.Session.Close()
}
changeStream.collection.Database.Session = newSession
changeStream.sessionCopied = true

opts := changeStream.options
if changeStream.resumeToken != nil {
opts.ResumeAfter = changeStream.resumeToken
}
// make a new pipeline containing the resume token.
changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts)

// generate the new iterator with the new connection.
newPipe := changeStream.collection.Pipe(changeStreamPipeline)
changeStream.iter = newPipe.Iter()
if err := changeStream.iter.Err(); err != nil {
return err
}
changeStream.iter.isChangeStream = true
return nil
}

// fetchResumeToken unmarshals the _id field from the document, setting an error
// on the changeStream if it is unable to.
func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error {
changeStreamResult := struct {
ResumeToken *bson.Raw `bson:"_id,omitempty"`
}{}

err := rawResult.Unmarshal(&changeStreamResult)
if err != nil {
return err
}

if changeStreamResult.ResumeToken == nil {
return errMissingResumeToken
}

changeStream.resumeToken = changeStreamResult.ResumeToken
return nil
}

func (changeStream *ChangeStream) fetchResultSet(result interface{}) error {
rawResult := bson.Raw{}

// fetch the next set of documents from the cursor.
gotNext := changeStream.iter.Next(&rawResult)
err := changeStream.iter.Err()
if err != nil {
return err
}

if !gotNext && err == nil {
// If the iter.Err() method returns nil despite us not getting a next batch,
// it is becuase iter.Err() silences this case.
return ErrNotFound
}

// grab the resumeToken from the results
if err := changeStream.fetchResumeToken(&rawResult); err != nil {
return err
}

// put the raw results into the data structure the user provided.
if err := rawResult.Unmarshal(result); err != nil {
return err
}
return nil
}

func isResumableError(err error) bool {
_, isQueryError := err.(*QueryError)
// if it is not a database error OR it is a database error,
// but the error is a notMaster error
//and is not a missingResumeToken error (caused by the user provided pipeline)
return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken)
}

func runKillCursorsOnSession(session *Session, cursorId int64) error {
socket, err := session.acquireSocket(true)
if err != nil {
return err
}
err = socket.Query(&killCursorsOp{[]int64{cursorId}})
if err != nil {
return err
}
socket.Release()

return nil
}
Loading