Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(change_stream): do not check isGetMore if error[mongoErrorContextSymbol] is undefined #1720

Merged
merged 2 commits into from
Jun 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 1 addition & 32 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

const EventEmitter = require('events');
const inherits = require('util').inherits;
const MongoNetworkError = require('mongodb-core').MongoNetworkError;
const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol;
const GET_MORE_NON_RESUMABLE_CODES = require('./error_codes').GET_MORE_NON_RESUMABLE_CODES;
const isResumableError = require('./error').isResumableError;

var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];

Expand Down Expand Up @@ -298,35 +296,6 @@ ChangeStream.prototype.stream = function(options) {
return this.cursor.stream(options);
};

// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error:
//
// An error is considered resumable if it meets any of the following criteria:
// - any error encountered which is not a server error (e.g. a timeout error or network error)
// - any server error response from a getMore command excluding those containing the following error codes
// - Interrupted: 11601
// - CappedPositionLost: 136
// - CursorKilled: 237
// - a server error response with an error message containing the substring "not master" or "node is recovering"
//
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.

function isGetMoreError(error) {
return !!error[mongoErrorContextSymbol].isGetMore;
}

function isResumableError(error) {
if (!isGetMoreError(error)) {
return false;
}

return !!(
error instanceof MongoNetworkError ||
!GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.message.match(/not master/) ||
error.message.match(/node is recovering/)
);
}

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
var processNewChange = function(self, err, change, callback) {
// Handle errors
Expand Down
43 changes: 43 additions & 0 deletions lib/error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'use strict';

const MongoNetworkError = require('mongodb-core').MongoNetworkError;
const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol;

const GET_MORE_NON_RESUMABLE_CODES = new Set([
136, // CappedPositionLost
237, // CursorKilled
11601 // Interrupted
]);

// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error:
//
// An error is considered resumable if it meets any of the following criteria:
// - any error encountered which is not a server error (e.g. a timeout error or network error)
// - any server error response from a getMore command excluding those containing the following error codes
// - Interrupted: 11601
// - CappedPositionLost: 136
// - CursorKilled: 237
// - a server error response with an error message containing the substring "not master" or "node is recovering"
//
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.

function isGetMoreError(error) {
if (error[mongoErrorContextSymbol]) {
return error[mongoErrorContextSymbol].isGetMore;
}
}

function isResumableError(error) {
if (!isGetMoreError(error)) {
return false;
}

return !!(
error instanceof MongoNetworkError ||
!GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.message.match(/not master/) ||
error.message.match(/node is recovering/)
);
}

module.exports = { GET_MORE_NON_RESUMABLE_CODES, isResumableError };
9 changes: 0 additions & 9 deletions lib/error_codes.js

This file was deleted.

18 changes: 14 additions & 4 deletions test/unit/change_stream_resume_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const MongoClient = require('../../lib/mongo_client');
const ObjectId = require('../../index').ObjectId;
const Timestamp = require('../../index').Timestamp;
const Long = require('../../index').Long;
const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error_codes').GET_MORE_NON_RESUMABLE_CODES;
const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error').GET_MORE_NON_RESUMABLE_CODES;
const isResumableError = require('../../lib/error').isResumableError;

describe('Change Stream Resume Tests', function() {
const test = {};
Expand Down Expand Up @@ -126,15 +127,15 @@ describe('Change Stream Resume Tests', function() {
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
},
{
description: `should resume on an error that says "not master"`,
description: `should resume on an error that says 'not master'`,
passing: true,
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
firstGetMore: req => req.reply({ ok: 0, errmsg: 'not master' }),
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
},
{
description: `should resume on an error that says "node is recovering"`,
description: `should resume on an error that says 'node is recovering'`,
passing: true,
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
Expand Down Expand Up @@ -175,14 +176,17 @@ describe('Change Stream Resume Tests', function() {
test.server = server;
});
});

afterEach(done => changeStream.close(() => client.close(() => mock.cleanup(done))));

configs.forEach(config => {
it(config.description, {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: function() {
test.server.setMessageHandler(makeServerHandler(config));
client = new MongoClient(`mongodb://${test.server.uri()}`, { socketTimeoutMS: 300 });
client = new MongoClient(`mongodb://${test.server.uri()}`, {
socketTimeoutMS: 300
});
return client
.connect()
.then(client => client.db('test'))
Expand Down Expand Up @@ -210,3 +214,9 @@ describe('Change Stream Resume Tests', function() {
});
});
});

describe('Change Stream Resume Error Tests', function() {
it('should properly process errors that lack the `mongoErrorContextSymbol`', function() {
expect(() => isResumableError(new Error())).to.not.throw();
});
});