Skip to content

Commit

Permalink
feat(change-streams): add support for change streams
Browse files Browse the repository at this point in the history
* Adding collection support

* Setting up basic collections.changes() support

* Adding pipeline support

* Tidying up, adding DB support

* Now showing only messages recieved after the current time

* Fixing bugs in timestamp and database support

* Getting aggregation pipeline support to work

* Refactoring to use promises

* Removing co dependency

* Changing to use Readable stream

* Tidying

* Rearranging arguments

* Error handling and alerts when oplogStream closes

* Using ChangeStream

* Starting ChangeStream tests

* Renaming function

* Creeating close() and resumeToken() menthods

* Tidying console log

* Getting imperative form working

* Linting and tidying up

* Extending test coverage

* Testing closing works properly

* Tidying

* Adding JSDoc details

* Fixing package.json

* Increasing modularity and documentation

* Re-enabling simulator

* Caching resume tokens and erroring if projected out

* Enforcing semicolons

* Support cursor invalidation and testing resume token caching

NODE-1063

* Making resumability work

* Testing resumability using callbacks

* Adding promise test

* Updating to use assert.ifError()

* Using crypto to generate simulator noConflict tokens

* Testing resumeAfter behaviour

* Testing invalidate behaviour

* Fixing minor bugs

* Workaround for bug with unknown cause

* Update to use watch() instead of changes()

* Adding function definition

* Uppercase supportedStages

* JSdoc for .watch()

* Remove resumeToken() method

* Impove input validation

* Change eslint to warn when semicolon not used

* Improve Change Stream documentation

* Support full document lookup

NODE-1059

* Isolate noConflict inside simulator

* Use instanceof to check for errors and replset

* Improve modularity

* Improve tests

* remove disableResume support

* Simplify logic and use instanceof

* Tidy resumeToken

* Restructure aggregation command options

* Connect to real server

* Removing simulated MongoNetworkError support, using Kill instead

* Improve resumability and read preference support

* Renaming changeNotification to change

* Improve stream compatibility and documentation

* Basic stream support

* Document stream method

* Testing for multiple pipes

* Remove Change Stream on Database support

* Improve test reliability and coverage

* Support .resume(), .pause() and resumability

* Make cursorOptions top-level constant

* Remove database support

* add test for multiple pipes

* Remove Db.watch() in tests

* Improve test coverage

* Tweaking Travis config

* Re-enable testing against multiple MongoDB versions

* temporarily use patched topology manager

* Revert "temporarily use patched topology manager"
  • Loading branch information
Sebastian Hallum Clarke authored and mbroadst committed Aug 30, 2017
1 parent 6339625 commit c02d25c
Show file tree
Hide file tree
Showing 11 changed files with 7,552 additions and 5 deletions.
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"plugins": [
],
"rules": {
"no-console":0
"no-console": 0,
"semi": 1
}
}
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ env:
- MONGODB_VERSION=3.0.x
- MONGODB_VERSION=3.2.x
- MONGODB_VERSION=3.4.x
- MONGODB_VERSION=3.5.x
1 change: 1 addition & 0 deletions conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test/functional/authentication_tests.js",
"test/functional/gridfs_stream_tests.js",
"lib/admin.js",
"lib/change_stream.js",
"lib/collection.js",
"lib/cursor.js",
"lib/aggregation_cursor.js",
Expand Down
337 changes: 337 additions & 0 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,337 @@
var EventEmitter = require('events'),
inherits = require('util').inherits,
MongoNetworkError = require('mongodb-core').MongoNetworkError;

var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @class ChangeStream
* @since 3.0.0
* @param {(Db|Collection)} changeDomain The collection against which to create the change stream
* @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
* @param {object} [options=null] Optional settings
* @param {string} [options.fullDocument=none] Allowed values: ‘none’, ‘lookup’. When set to ‘lookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
* @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
* @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
* @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
* @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
* @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
* @fires ChangeStream#close
* @fires ChangeStream#data
* @fires ChangeStream#end
* @fires ChangeStream#error
* @return {ChangeStream} a ChangeStream instance.
*/
var ChangeStream = function(collection, pipeline, options) {
var Collection = require('./collection');

// Ensure the provided collection is actually a collection
if (!(collection instanceof Collection)) {
throw new Error('collection provided to ChangeStream constructor is not an instance of Collection');
}

var self = this;
self.pipeline = pipeline || [];
self.options = options || {};
self.promiseLibrary = collection.s.promiseLibrary;

// Extract namespace and serverConfig from the collection
self.namespace = {
collection: collection.collectionName,
database: collection.s.db.databaseName
};

self.serverConfig = collection.s.db.serverConfig;

// Determine correct read preference
self.options.readPreference = self.options.readPreference || collection.s.readPreference;

// Create contained Change Stream cursor
self.cursor = createChangeStreamCursor(self);

// Listen for any data listeners being added to ChangeStream
self.on('newListener', function(eventName) {
if (eventName === 'data' && self.cursor && self.cursor.listenerCount('data') === 0) {
self.cursor.on('data', function (change) {
processNewChange(self, null, change);
});
}
});

// Listen for all data listeners being removed from ChangeStream
self.on('removeListener', function(eventName){
if (eventName === 'data' && self.listenerCount('data') === 0 && self.cursor) {
self.cursor.removeAllListeners('data');
}
});

};

// Create a new change stream cursor based on self's configuration
var createChangeStreamCursor = function (self) {
if (self.resumeToken) {
self.options.resumeAfter = self.resumeToken;
}

var changeStreamCursor =
buildChangeStreamAggregationCommand(self.serverConfig, self.namespace, self.pipeline, self.resumeToken, self.options);

/**
* Fired for each new matching change in the specified namespace. Attaching a 'data' event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available.
*
* @event ChangeStream#data
* @type {object}
*/
if (self.listenerCount('data') > 0) {
changeStreamCursor.on('data', function (change) {
processNewChange(self, null, change);
});
}

/**
* Change stream close event
*
* @event ChangeStream#close
* @type {null}
*/
changeStreamCursor.on('close', function () {
self.emit('close');
});

/**
* Change stream end event
*
* @event ChangeStream#end
* @type {null}
*/
changeStreamCursor.on('end', function () {
self.emit('end');
});

/**
* Fired when the stream encounters an error.
*
* @event ChangeStream#error
* @type {Error}
*/
changeStreamCursor.on('error', function(error) {
self.emit('error', error);
});

return changeStreamCursor;
};

var buildChangeStreamAggregationCommand = function (serverConfig, namespace, pipeline, resumeToken, options) {
var changeNotificationStageOptions = {};
if (options.fullDocument) {
changeNotificationStageOptions.fullDocument = options.fullDocument;
}

if (resumeToken || options.resumeAfter) {
changeNotificationStageOptions.resumeAfter = resumeToken || options.resumeAfter;
}

// Map cursor options
var cursorOptions = {};
cursorOptionNames.forEach(function(optionName) {
if (options[optionName]) {
cursorOptions[optionName] = options[optionName];
}
});

var changeStreamPipeline = [
{ $changeNotification: changeNotificationStageOptions }
];

changeStreamPipeline = changeStreamPipeline.concat(pipeline);

var command = {
aggregate : namespace.collection,
pipeline : changeStreamPipeline,
cursor: {
batchSize: options.batchSize || 1
}
};

// Create and return the cursor
return serverConfig.cursor(namespace.database + '.' + namespace.collection, command, cursorOptions);
};

/**
* Check if there is any document still available in the Change Stream
* @function ChangeStream.prototype.hasNext
* @param {ChangeStream~resultCallback} [callback] The result callback.
* @throws {MongoError}
* @return {Promise} returns Promise if no callback passed
*/
ChangeStream.prototype.hasNext = function (callback) {
return this.cursor.hasNext(callback);
};

/**
* Get the next available document from the Change Stream, returns null if no more documents are available.
* @function ChangeStream.prototype.next
* @param {ChangeStream~resultCallback} [callback] The result callback.
* @throws {MongoError}
* @return {Promise} returns Promise if no callback passed
*/
ChangeStream.prototype.next = function (callback) {
var self = this;
if (this.isClosed()) {
if (callback) return callback(new Error('Change Stream is not open.'), null);
return self.promiseLibrary.reject(new Error('Change Stream is not open.'));
}
return this.cursor.next().then(function(change) {
return processNewChange(self, null, change, callback);
}).catch(function(err) {
return processNewChange(self, err, null, callback);
});
};

/**
* Is the cursor closed
* @method ChangeStream.prototype.isClosed
* @return {boolean}
*/
ChangeStream.prototype.isClosed = function () {
if (this.cursor) {
return this.cursor.isClosed();
}
return true;
};

/**
* Close the Change Stream
* @method ChangeStream.prototype.close
* @param {ChangeStream~resultCallback} [callback] The result callback.
* @return {Promise} returns Promise if no callback passed
*/
ChangeStream.prototype.close = function (callback) {
if (!this.cursor) {
if (callback) return callback();
return this.promiseLibrary.resolve();
}

// Tidy up the existing cursor
var cursor = this.cursor;
delete this.cursor;
return cursor.close(callback);
};

/**
* This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
* @method
* @param {Writable} destination The destination for writing data
* @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
* @return {null}
*/
ChangeStream.prototype.pipe = function (destination, options) {
if (!this.pipeDestinations) {
this.pipeDestinations = [];
}
this.pipeDestinations.push(destination);
return this.cursor.pipe(destination, options);
};

/**
* This method will remove the hooks set up for a previous pipe() call.
* @param {Writable} [destination] The destination for writing data
* @return {null}
*/
ChangeStream.prototype.unpipe = function (destination) {
if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) {
this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1);
}
return this.cursor.unpipe(destination);
};

/**
* This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
* @return {null}
*/
ChangeStream.prototype.pause = function () {
return this.cursor.pause();
};

/**
* This method will cause the readable stream to resume emitting data events.
* @return {null}
*/
ChangeStream.prototype.resume = function () {
return this.cursor.resume();
};

/**
* Return a modified Readable stream including a possible transform method.
* @method
* @param {object} [options=null] Optional settings.
* @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream.
* @return {Cursor}
*/
ChangeStream.prototype.stream = function (options) {
this.streamOptions = options;
return this.cursor.stream(options);
};

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
var processNewChange = function (self, err, change, callback) {
// Handle errors
if (err) {
// Handle resumable MongoNetworkErrors
if (err instanceof MongoNetworkError && !self.attemptingResume) {
self.attemptingResume = true;
return self.cursor.close(function(closeErr) {
if (closeErr) {
if (callback) return callback(err, null);
return self.promiseLibrary.reject(err);
}

// Establish a new cursor
self.cursor = createChangeStreamCursor(self);

// Attempt to reconfigure piping
if (self.pipeDestinations) {
var cursorStream = self.cursor.stream(self.streamOptions);
for (var pipeDestination in self.pipeDestinations) {
cursorStream.pipe(pipeDestination);
}
}

// Attempt the next() operation again
if (callback) return self.next(callback);
return self.next();
});
}

if (typeof callback === 'function') return callback(err, null);
if (self.listenerCount('error')) return self.emit('error', err);
return self.promiseLibrary.reject(err);
}
self.attemptingResume = false;

// Cache the resume token if it is present. If it is not present return an error.
if (!change || !change._id) {
var noResumeTokenError = new Error('A change stream document has been recieved that lacks a resume token (_id).');
if (typeof callback === 'function') return callback(noResumeTokenError, null);
if (self.listenerCount('error')) return self.emit('error', noResumeTokenError);
return self.promiseLibrary.reject(noResumeTokenError);
}
self.resumeToken = change._id;

// Return the change
if (typeof callback === 'function') return callback(err, change);
if (self.listenerCount('data')) return self.emit('data', change);
return self.promiseLibrary.resolve(change);
};

/**
* The callback format for results
* @callback ChangeStream~resultCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {(object|null)} result The result object if the command was executed successfully.
*/

inherits(ChangeStream, EventEmitter);

module.exports = ChangeStream;
Loading

0 comments on commit c02d25c

Please sign in to comment.