diff --git a/changestreams.go b/changestreams.go index 75cd5b1d4..c19ded317 100644 --- a/changestreams.go +++ b/changestreams.go @@ -39,6 +39,37 @@ type ChangeStreamOptions struct { Collation *Collation } +// Watch constructs a new ChangeStream capable of receiving continuing data +// from the database. +func (coll *Collection) Watch(pipeline interface{}, + options ChangeStreamOptions) (*ChangeStream, error) { + + if pipeline == nil { + pipeline = []bson.M{} + } + + pipe := constructChangeStreamPipeline(pipeline, options) + + pIter := coll.Pipe(&pipe).Iter() + + // check that there was no issue creating the iterator. + // this will fail immediately with an error from the server if running against + // a standalone. + if err := pIter.Err(); err != nil { + return nil, err + } + + pIter.isChangeStream = true + + return &ChangeStream{ + iter: pIter, + collection: coll, + resumeToken: nil, + options: options, + pipeline: pipeline, + }, nil +} + // Next retrieves the next document from the change stream, blocking if necessary. // Next returns true if a document was successfully unmarshalled into result, // and false if an error occured. When Next returns false, the Err method should