Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-4683): make ChangeStream an async iterable #3454

Merged
merged 22 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,26 @@ export class ChangeStream<
}, callback);
}

async *[Symbol.asyncIterator](): AsyncGenerator<TChange, void, void> {
if (this.closed) {
return;
}

try {
// Change streams run indefinitely as long as errors are resumable
// So the only loop breaking condition is if `next()` throws
while (true) {
yield await this.next();
}
} finally {
try {
await this.close();
} catch (error) {
andymina marked this conversation as resolved.
Show resolved Hide resolved
// we're not concerned with errors from close()
}
}
}

/** Is the cursor closed */
get closed(): boolean {
return this[kClosed] || this.cursor.closed;
Expand Down
320 changes: 296 additions & 24 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import { promisify } from 'util';
import {
AbstractCursor,
ChangeStream,
ChangeStreamDocument,
ChangeStreamOptions,
Collection,
CommandStartedEvent,
Db,
Long,
MongoAPIError,
MongoChangeStreamError,
MongoClient,
MongoServerError,
Expand All @@ -41,6 +41,9 @@ const initIteratorMode = async (cs: ChangeStream) => {
return;
};

const is4_2Server = (serverVersion: string) =>
gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0');

// Define the pipeline processing changes
const pipeline = [
{ $addFields: { addedField: 'This is a field added using $addFields' } },
Expand Down Expand Up @@ -70,6 +73,7 @@ describe('Change Streams', function () {
});

afterEach(async () => {
sinon.restore();
await changeStream.close();
await client.close();
await mock.cleanup();
Expand Down Expand Up @@ -949,31 +953,175 @@ describe('Change Streams', function () {
'This test only worked because of timing, changeStream.close does not remove the change listener';
});

describe('#tryNext()', function () {
it('should return null on single iteration of empty cursor', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
const doc = await changeStream.tryNext();
expect(doc).to.be.null;
}
describe('iterator api', function () {
describe('#tryNext()', function () {
it('should return null on single iteration of empty cursor', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
const doc = await changeStream.tryNext();
expect(doc).to.be.null;
}
});

it('should iterate a change stream until first empty batch', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
// tryNext doesn't send the initial agg, just checks the driver document batch cache
const firstTry = await changeStream.tryNext();
expect(firstTry).to.be.null;

await initIteratorMode(changeStream);
await collection.insertOne({ a: 42 });

const secondTry = await changeStream.tryNext();
expect(secondTry).to.be.an('object');

const thirdTry = await changeStream.tryNext();
expect(thirdTry).to.be.null;
}
});
});

it('should iterate a change stream until first empty batch', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
// tryNext doesn't send the initial agg, just checks the driver document batch cache
const firstTry = await changeStream.tryNext();
expect(firstTry).to.be.null;
describe('#asyncIterator', function () {
it(
'can iterate through changes',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();
andymina marked this conversation as resolved.
Show resolved Hide resolved

await initIteratorMode(changeStream);
await collection.insertOne({ a: 42 });
const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }];
await collection.insertMany(docs);

const secondTry = await changeStream.tryNext();
expect(secondTry).to.be.an('object');
for (const doc of docs) {
const change = await changeStreamIterator.next();
const { fullDocument } = change.value;
expect(fullDocument.city).to.equal(doc.city);
}
}
);

const thirdTry = await changeStream.tryNext();
expect(thirdTry).to.be.null;
}
it(
'should close the change stream when return is called',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }];
await collection.insertMany(docs);

await changeStreamIterator.next();
await changeStreamIterator.return();
expect(changeStream.closed).to.be.true;
expect(changeStream.cursor.closed).to.be.true;
}
);

it(
'should close the change stream when an error is thrown',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

const unresumableErrorCode = 1000;
await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
} as FailPoint);

await collection.insertOne({ city: 'New York City' });
try {
await changeStreamIterator.next();
expect.fail(
'Change stream did not throw unresumable error and did not produce any events'
);
} catch (error) {
expect(changeStream.closed).to.be.true;
expect(changeStream.cursor.closed).to.be.true;
}
}
);

it(
'should not produce events on closed stream',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([]);
changeStream.close();

const changeStreamIterator = changeStream[Symbol.asyncIterator]();
const change = await changeStreamIterator.next();

expect(change.value).to.be.undefined;
}
);

it(
'cannot be used with emitter-based iteration',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([]);
changeStream.on('change', sinon.stub());
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

const error = await changeStreamIterator.next().catch(e => e);
expect(error).to.be.instanceOf(MongoAPIError);
}
);

it(
'can be used with raw iterator API',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

const docs = [{ city: 'Los Angeles' }, { city: 'Miami' }];
await collection.insertMany(docs);

await changeStream.next();

try {
const change = await changeStreamIterator.next();
expect(change.value).to.not.be.undefined;

const { fullDocument } = change.value;
expect(fullDocument.city).to.equal(docs[1].city);
} catch (error) {
expect.fail('Async could not be used with raw iterator API');
}
}
);

it(
'ignores errors thrown from close',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

sinon.stub(changeStream.cursor, 'close').throws(new MongoAPIError('testing'));

try {
await changeStreamIterator.return();
} catch (error) {
expect.fail('Async iterator threw an error on close');
}
}
);
});
});

Expand Down Expand Up @@ -1651,9 +1799,6 @@ describe('ChangeStream resumability', function () {
{ error: 'CursorNotFound', code: 43, message: 'cursor not found' }
];

const is4_2Server = (serverVersion: string) =>
gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0');

beforeEach(function () {
assert(this.currentTest != null);
if (
Expand Down Expand Up @@ -2198,6 +2343,133 @@ describe('ChangeStream resumability', function () {
);
});
});

andymina marked this conversation as resolved.
Show resolved Hide resolved
context('#asyncIterator', function () {
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

await collection.insertOne({ city: 'New York City' });
await changeStreamIterator.next();

expect(aggregateEvents).to.have.lengthOf(2);
}
);
}

for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '<4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

// on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response.
// This means that a resume token isn't cached until the first change has been iterated.
// In order to test the resume, we need to ensure that at least one document has
// been iterated so we have a resume token to resume on.
await collection.insertOne({ city: 'New York City' });
await changeStreamIterator.next();

const mock = sinon
.stub(changeStream.cursor, '_getMore')
.callsFake((_batchSize, callback) => {
mock.restore();
const error = new MongoServerError({ message });
error.code = code;
callback(error);
});

await collection.insertOne({ city: 'New York City' });
await changeStreamIterator.next();

expect(aggregateEvents).to.have.lengthOf(2);
}
);
}

it(
andymina marked this conversation as resolved.
Show resolved Hide resolved
'maintains change stream options on resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([], changeStreamResumeOptions);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: resumableErrorCodes[0].code,
errmsg: resumableErrorCodes[0].message
}
} as FailPoint);

expect(changeStream.cursor)
.to.have.property('options')
.that.containSubset(changeStreamResumeOptions);

await collection.insertOne({ city: 'New York City' });
await changeStreamIterator.next();

expect(changeStream.cursor)
.to.have.property('options')
.that.containSubset(changeStreamResumeOptions);
}
);

context('when the error is not a resumable error', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

const unresumableErrorCode = 1000;
await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
} as FailPoint);

await collection.insertOne({ city: 'New York City' });

const error = await changeStreamIterator.next().catch(e => e);
expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
}
);
});
});
});

describe('event emitter based iteration', function () {
Expand Down