Skip to content

Commit

Permalink
fix(ChangeStream): make CursorNotFound error resumable
Browse files Browse the repository at this point in the history
  • Loading branch information
emadum authored Jul 14, 2020
1 parent 19d7deb commit 3d8ac55
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 22 deletions.
7 changes: 6 additions & 1 deletion src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const GET_MORE_RESUMABLE_CODES = new Set([
150, // StaleEpoch
13388, // StaleConfig
234, // RetryChangeStream
133 // FailedToSatisfyReadPreference
133, // FailedToSatisfyReadPreference
43 // CursorNotFound
]);

/**
Expand Down Expand Up @@ -394,6 +395,10 @@ function isResumableError(error?: any, wireVersion?: any) {
}

if (wireVersion >= 9) {
// DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable
if (error.code === 43) {
return true;
}
return error.hasErrorLabel('ResumableChangeStreamError');
}

Expand Down
14 changes: 14 additions & 0 deletions test/spec/change-stream/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,20 @@ Although synchronous drivers must provide a `non-blocking mode of iteration <../

If the test expects an error and one was not thrown by either creating the change stream or executing the test's operations, iterating the change stream once allows for an error to be thrown by a ``getMore`` command. If the test does not expect any error, the change stream should be iterated only until it returns as many result documents as are expected by the test.

Testing on Sharded Clusters
---------------------------

When writing data on sharded clusters, majority-committed data does not always show up in the response of the first
``getMore`` command after the data is written. This is because in sharded clusters, no data from shard A may be returned
until all other shard reports an entry that sorts after the change in shard A.

To account for this, drivers MUST NOT rely on change stream documents in certain batches. For example, if expecting two
documents in a change stream, these may not be part of the same ``getMore`` response, or even be produced in two
subsequent ``getMore`` responses. Drivers MUST allow for a ``getMore`` to produce empty batches when testing on a
sharded cluster. By default, this can take up to 10 seconds, but can be controlled by enabling the ``writePeriodicNoops``
server parameter and configuring the ``periodNoopIntervalSecs`` parameter. Choosing lower values allows for running
change stream tests with smaller timeouts.

Prose Tests
===========

Expand Down
19 changes: 5 additions & 14 deletions test/spec/change-stream/change-streams-errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,12 @@
],
"result": {
"error": {
"code": 280,
"errorLabels": [
"NonResumableChangeStreamError"
]
"code": 280
}
}
},
{
"description": "change stream errors on MaxTimeMSExpired",
"description": "change stream errors on ElectionInProgress",
"minServerVersion": "4.2",
"failPoint": {
"configureFailPoint": "failCommand",
Expand All @@ -121,7 +118,7 @@
"failCommands": [
"getMore"
],
"errorCode": 50,
"errorCode": 216,
"closeConnection": false
}
},
Expand All @@ -130,13 +127,7 @@
"replicaset",
"sharded"
],
"changeStreamPipeline": [
{
"$project": {
"_id": 0
}
}
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
Expand All @@ -152,7 +143,7 @@
],
"result": {
"error": {
"code": 50
"code": 216
}
}
}
Expand Down
11 changes: 4 additions & 7 deletions test/spec/change-stream/change-streams-errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,21 @@ tests:
result:
error:
code: 280
errorLabels: [ "NonResumableChangeStreamError" ]
-
description: change stream errors on MaxTimeMSExpired
description: change stream errors on ElectionInProgress
minServerVersion: "4.2"
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["getMore"]
errorCode: 50 # An error code that's not on the old blacklist or whitelist
errorCode: 216 # An error code that's not on the old blacklist or whitelist
closeConnection: false
target: collection
topology:
- replicaset
- sharded
changeStreamPipeline:
-
$project: { _id: 0 }
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
Expand All @@ -101,4 +98,4 @@ tests:
z: 3
result:
error:
code: 50
code: 216
96 changes: 96 additions & 0 deletions test/spec/change-stream/change-streams-resume-whitelist.json
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,102 @@
}
]
}
},
{
"description": "change stream resumes after CursorNotFound",
"minServerVersion": "4.2",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"getMore"
],
"errorCode": 43,
"closeConnection": false
}
},
"target": "collection",
"topology": [
"replicaset",
"sharded"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
},
{
"command_started_event": {
"command": {
"getMore": 42,
"collection": "test"
},
"command_name": "getMore",
"database_name": "change-stream-tests"
}
},
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"_id": "42",
"documentKey": "42",
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
}
]
}
}
]
}
66 changes: 66 additions & 0 deletions test/spec/change-stream/change-streams-resume-whitelist.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1105,3 +1105,69 @@ tests:
fullDocument:
x:
$numberInt: "1"
-
# CursorNotFound is special-cased to be resumable regardless of server versions or error labels, so this test has
# no maxWireVersion.
description: "change stream resumes after CursorNotFound"
minServerVersion: "4.2"
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["getMore"]
errorCode: 43
closeConnection: false
target: collection
topology:
- replicaset
- sharded
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 1
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream: {}
command_name: aggregate
database_name: *database_name
-
command_started_event:
command:
getMore: 42
collection: *collection_name
command_name: getMore
database_name: *database_name
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream: {}
command_name: aggregate
database_name: *database_name
result:
success:
-
_id: "42"
documentKey: "42"
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "1"

0 comments on commit 3d8ac55

Please sign in to comment.