Skip to content

Commit

Permalink
fix(ChangeStream): whitelist change stream resumable errors
Browse files Browse the repository at this point in the history
  - Changes which errors are considered resumable on change streams,
    adding support for the new ResumableChangeStreamError label.
  - Removes ElectionInProgress (216) from ResumableChangeStreamError.
  - Updates ChangeStream prose tests which described startAfter
    behavior for unsupported server versions.
  - Fixes use of startAfter/resumeAfter when resuming from an
    invalidate event. Implement prose tests #17 and #18.

NODE-2478
NODE-2522
  • Loading branch information
emadum authored Apr 10, 2020
1 parent e7007d2 commit f4bf912
Show file tree
Hide file tree
Showing 15 changed files with 6,092 additions and 699 deletions.
60 changes: 34 additions & 26 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ class ChangeStreamCursor extends Cursor {
['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);

if (this.resumeToken) {
result.resumeAfter = this.resumeToken;
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
}
Expand All @@ -305,6 +307,26 @@ class ChangeStreamCursor extends Cursor {
return result;
}

cacheResumeToken(resumeToken) {
if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
this.resumeToken = this.cursorState.postBatchResumeToken;
} else {
this.resumeToken = resumeToken;
}
this.hasReceived = true;
}

_processBatch(batchName, response) {
const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor[batchName].length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
}

_initializeCursor(callback) {
super._initializeCursor((err, result) => {
if (err) {
Expand All @@ -323,15 +345,9 @@ class ChangeStreamCursor extends Cursor {
this.startAtOperationTime = response.operationTime;
}

const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor.firstBatch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
this._processBatch('firstBatch', response);

this.emit('init', result);
this.emit('response');
callback(err, result);
});
Expand All @@ -344,15 +360,9 @@ class ChangeStreamCursor extends Cursor {
return;
}

const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor.nextBatch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
this._processBatch('nextBatch', response);

this.emit('more', response);
this.emit('response');
callback(err, response);
});
Expand All @@ -374,6 +384,7 @@ function createChangeStreamCursor(self, options) {

const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor(
self.topology,
new AggregateOperation(self.parent, pipeline, options),
Expand Down Expand Up @@ -472,9 +483,10 @@ function processNewChange(args) {
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;
const cursor = changeStream.cursor;

// If the changeStream is closed, then it should not process a change.
if (changeStream.isClosed()) {
// If the cursor is null, then it should not process a change.
if (cursor == null) {
// We do not error in the eventEmitter case.
if (eventEmitter) {
return;
Expand All @@ -486,12 +498,12 @@ function processNewChange(args) {
: changeStream.promiseLibrary.reject(error);
}

const cursor = changeStream.cursor;
const topology = changeStream.topology;
const options = changeStream.cursor.options;
const wireVersion = maxWireVersion(cursor.server);

if (error) {
if (isResumableError(error) && !changeStream.attemptingResume) {
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;

// stop listening to all events from old cursor
Expand Down Expand Up @@ -557,11 +569,7 @@ function processNewChange(args) {
}

// cache the resume token
if (cursor.bufferedCount() === 0 && cursor.cursorState.postBatchResumeToken) {
cursor.resumeToken = cursor.cursorState.postBatchResumeToken;
} else {
cursor.resumeToken = change._id;
}
cursor.cacheResumeToken(change._id);

// wipe the startAtOperationTime if there was one so that there won't be a conflict
// between resumeToken and startAtOperationTime if we need to reconnect the cursor
Expand Down
33 changes: 29 additions & 4 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,26 @@ const GET_MORE_NON_RESUMABLE_CODES = new Set([
237, // CursorKilled
11601 // Interrupted
]);
// From spec@https://github.com/mongodb/specifications/blob/f93d78191f3db2898a59013a7ed5650352ef6da8/source/change-streams/change-streams.rst#resumable-error
const GET_MORE_RESUMABLE_CODES = new Set([
6, // HostUnreachable
7, // HostNotFound
89, // NetworkTimeout
91, // ShutdownInProgress
189, // PrimarySteppedDown
262, // ExceededTimeLimit
9001, // SocketException
10107, // NotMaster
11600, // InterruptedAtShutdown
11602, // InterruptedDueToReplStateChange
13435, // NotMasterNoSlaveOk
13436, // NotMasterOrSecondary
63, // StaleShardVersion
150, // StaleEpoch
13388, // StaleConfig
234, // RetryChangeStream
133 // FailedToSatisfyReadPreference
]);

/**
* Creates a new MongoError
Expand Down Expand Up @@ -329,7 +349,7 @@ function isGetMoreError(error) {
}
}

function isResumableError(error) {
function isResumableError(error, wireVersion) {
if (!isGetMoreError(error)) {
return false;
}
Expand All @@ -338,14 +358,19 @@ function isResumableError(error) {
return true;
}

return !(
GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.hasErrorLabel('NonRetryableChangeStreamError')
if (wireVersion >= 9) {
return error.hasErrorLabel('ResumableChangeStreamError');
}

return (
GET_MORE_RESUMABLE_CODES.has(error.code) &&
!error.hasErrorLabel('NonResumableChangeStreamError')
);
}

module.exports = {
GET_MORE_NON_RESUMABLE_CODES,
GET_MORE_RESUMABLE_CODES,
MongoError,
MongoNetworkError,
MongoParseError,
Expand Down
24 changes: 13 additions & 11 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -772,22 +772,24 @@ function relayEvents(listener, emitter, events) {
* @param {(Topology|Server)} topologyOrServer
*/
function maxWireVersion(topologyOrServer) {
if (topologyOrServer.ismaster) {
return topologyOrServer.ismaster.maxWireVersion;
}
if (topologyOrServer) {
if (topologyOrServer.ismaster) {
return topologyOrServer.ismaster.maxWireVersion;
}

if (typeof topologyOrServer.lastIsMaster === 'function') {
const lastIsMaster = topologyOrServer.lastIsMaster();
if (lastIsMaster) {
return lastIsMaster.maxWireVersion;
if (typeof topologyOrServer.lastIsMaster === 'function') {
const lastIsMaster = topologyOrServer.lastIsMaster();
if (lastIsMaster) {
return lastIsMaster.maxWireVersion;
}
}
}

if (topologyOrServer.description) {
return topologyOrServer.description.maxWireVersion;
if (topologyOrServer.description) {
return topologyOrServer.description.maxWireVersion;
}
}

return null;
return 0;
}

/*
Expand Down
Loading

0 comments on commit f4bf912

Please sign in to comment.