Skip to content

Commit

Permalink
fix(changeStream): properly handle changeStream event mid-close (mong…
Browse files Browse the repository at this point in the history
…odb#1902)

If a changeStream gets an event while it is in the middle of closing,
a race condition can occur where the event is still processed after
the stream has closed. This commit adds handling for this edge
case by returning an error to callbacks, rejecting promises,
and simply ignoring it in emitter mode.

Fixes NODE-1831
  • Loading branch information
daprahamian authored and kiku-jw committed Feb 11, 2019
1 parent ce11b57 commit 0dd1a3e
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 0 deletions.
14 changes: 14 additions & 0 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,20 @@ function processNewChange(args) {
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;

// If the changeStream is closed, then it should not process a change.
if (changeStream.isClosed()) {
// We do not error in the eventEmitter case.
if (eventEmitter) {
return;
}

const error = new MongoError('ChangeStream is closed');
return typeof callback === 'function'
? callback(error, null)
: changeStream.promiseLibrary.reject(error);
}

const topology = changeStream.topology;
const options = changeStream.cursor.options;

Expand Down
105 changes: 105 additions & 0 deletions test/functional/change_stream_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1677,4 +1677,109 @@ describe('Change Streams', function() {
.then(() => finish(), err => finish(err));
}
});

describe('should properly handle a changeStream event being processed mid-close', function() {
let client, coll;

function write() {
return Promise.resolve()
.then(() => coll.insertOne({ a: 1 }))
.then(() => coll.insertOne({ b: 2 }))
.then(() => coll.insertOne({ c: 3 }));
}

beforeEach(function() {
client = this.configuration.newClient();
return client.connect().then(_client => {
client = _client;
coll = client.db(this.configuration.db).collection('tester');
});
});

afterEach(function() {
coll = undefined;
if (client) {
return client.close().then(() => {
client = undefined;
});
}
});

it('when invoked with promises', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function() {
function read() {
const changeStream = coll.watch();
return Promise.resolve()
.then(() => changeStream.next())
.then(() => changeStream.next())
.then(() => {
const nextP = changeStream.next();

return changeStream.close().then(() => nextP);
});
}

return Promise.all([read(), write()]).then(
() => Promise.reject(new Error('Expected operation to fail with error')),
err => expect(err.message).to.equal('ChangeStream is closed')
);
}
});

it('when invoked with callbacks', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function(done) {
const changeStream = coll.watch();

changeStream.next(() => {
changeStream.next(() => {
changeStream.next(err => {
let _err = null;
try {
expect(err.message).to.equal('ChangeStream is closed');
} catch (e) {
_err = e;
} finally {
done(_err);
}
});
changeStream.close();
});
});

write().catch(() => {});
}
});

it('when invoked using eventEmitter API', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function(done) {
let closed = false;
const close = _err => {
if (closed) {
return;
}
closed = true;
return done(_err);
};

const changeStream = coll.watch();

let counter = 0;
changeStream.on('change', () => {
counter += 1;
if (counter === 2) {
changeStream.close();
setTimeout(() => close());
} else if (counter >= 3) {
close(new Error('Should not have received more than 2 events'));
}
});
changeStream.on('error', err => close(err));

setTimeout(() => write().catch(() => {}));
}
});
});
});

0 comments on commit 0dd1a3e

Please sign in to comment.