Skip to content

Commit

Permalink
fix(change_stream): do not check isGetMore if error[mongoErrorContext…
Browse files Browse the repository at this point in the history
…Symbol] is undefined (#1720)

Fixes NODE-1494
  • Loading branch information
kvwalker authored and mbroadst committed Jun 18, 2018
1 parent ecf1394 commit b2c8129
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 45 deletions.
33 changes: 1 addition & 32 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

const EventEmitter = require('events');
const inherits = require('util').inherits;
const MongoNetworkError = require('mongodb-core').MongoNetworkError;
const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol;
const GET_MORE_NON_RESUMABLE_CODES = require('./error_codes').GET_MORE_NON_RESUMABLE_CODES;
const isResumableError = require('./error').isResumableError;

var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];

Expand Down Expand Up @@ -298,35 +296,6 @@ ChangeStream.prototype.stream = function(options) {
return this.cursor.stream(options);
};

// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error:
//
// An error is considered resumable if it meets any of the following criteria:
// - any error encountered which is not a server error (e.g. a timeout error or network error)
// - any server error response from a getMore command excluding those containing the following error codes
// - Interrupted: 11601
// - CappedPositionLost: 136
// - CursorKilled: 237
// - a server error response with an error message containing the substring "not master" or "node is recovering"
//
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.

function isGetMoreError(error) {
return !!error[mongoErrorContextSymbol].isGetMore;
}

function isResumableError(error) {
if (!isGetMoreError(error)) {
return false;
}

return !!(
error instanceof MongoNetworkError ||
!GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.message.match(/not master/) ||
error.message.match(/node is recovering/)
);
}

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
var processNewChange = function(self, err, change, callback) {
// Handle errors
Expand Down
43 changes: 43 additions & 0 deletions lib/error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'use strict';

const MongoNetworkError = require('mongodb-core').MongoNetworkError;
const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol;

const GET_MORE_NON_RESUMABLE_CODES = new Set([
136, // CappedPositionLost
237, // CursorKilled
11601 // Interrupted
]);

// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error:
//
// An error is considered resumable if it meets any of the following criteria:
// - any error encountered which is not a server error (e.g. a timeout error or network error)
// - any server error response from a getMore command excluding those containing the following error codes
// - Interrupted: 11601
// - CappedPositionLost: 136
// - CursorKilled: 237
// - a server error response with an error message containing the substring "not master" or "node is recovering"
//
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.

function isGetMoreError(error) {
if (error[mongoErrorContextSymbol]) {
return error[mongoErrorContextSymbol].isGetMore;
}
}

function isResumableError(error) {
if (!isGetMoreError(error)) {
return false;
}

return !!(
error instanceof MongoNetworkError ||
!GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.message.match(/not master/) ||
error.message.match(/node is recovering/)
);
}

module.exports = { GET_MORE_NON_RESUMABLE_CODES, isResumableError };
9 changes: 0 additions & 9 deletions lib/error_codes.js

This file was deleted.

18 changes: 14 additions & 4 deletions test/unit/change_stream_resume_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const MongoClient = require('../../lib/mongo_client');
const ObjectId = require('../../index').ObjectId;
const Timestamp = require('../../index').Timestamp;
const Long = require('../../index').Long;
const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error_codes').GET_MORE_NON_RESUMABLE_CODES;
const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error').GET_MORE_NON_RESUMABLE_CODES;
const isResumableError = require('../../lib/error').isResumableError;

describe('Change Stream Resume Tests', function() {
const test = {};
Expand Down Expand Up @@ -126,15 +127,15 @@ describe('Change Stream Resume Tests', function() {
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
},
{
description: `should resume on an error that says "not master"`,
description: `should resume on an error that says 'not master'`,
passing: true,
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
firstGetMore: req => req.reply({ ok: 0, errmsg: 'not master' }),
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
},
{
description: `should resume on an error that says "node is recovering"`,
description: `should resume on an error that says 'node is recovering'`,
passing: true,
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
Expand Down Expand Up @@ -175,14 +176,17 @@ describe('Change Stream Resume Tests', function() {
test.server = server;
});
});

afterEach(done => changeStream.close(() => client.close(() => mock.cleanup(done))));

configs.forEach(config => {
it(config.description, {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: function() {
test.server.setMessageHandler(makeServerHandler(config));
client = new MongoClient(`mongodb://${test.server.uri()}`, { socketTimeoutMS: 300 });
client = new MongoClient(`mongodb://${test.server.uri()}`, {
socketTimeoutMS: 300
});
return client
.connect()
.then(client => client.db('test'))
Expand Down Expand Up @@ -210,3 +214,9 @@ describe('Change Stream Resume Tests', function() {
});
});
});

describe('Change Stream Resume Error Tests', function() {
it('should properly process errors that lack the `mongoErrorContextSymbol`', function() {
expect(() => isResumableError(new Error())).to.not.throw();
});
});

0 comments on commit b2c8129

Please sign in to comment.