Skip to content

Commit

Permalink
fix(NODE-6242): close becomes true after calling close when documents…
Browse files Browse the repository at this point in the history
… still remain (#4161)
  • Loading branch information
nbbeeken committed Jun 27, 2024
1 parent fb724eb commit e3d70c3
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 10 deletions.
22 changes: 19 additions & 3 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ export abstract class AbstractCursor<
}
}

/**
* The cursor has no id until it receives a response from the initial cursor creating command.
*
* It is non-zero for as long as the database has an open cursor.
*
* The initiating command may receive a zero id if the entire result is in the `firstBatch`.
*/
get id(): Long | undefined {
return this.cursorId ?? undefined;
}
Expand Down Expand Up @@ -249,10 +256,17 @@ export abstract class AbstractCursor<
this.cursorSession = clientSession;
}

/**
* The cursor is closed and all remaining locally buffered documents have been iterated.
*/
get closed(): boolean {
return this.isClosed;
return this.isClosed && (this.documents?.length ?? 0) === 0;
}

/**
* A `killCursors` command was attempted on this cursor.
* This is performed if the cursor id is non zero.
*/
get killed(): boolean {
return this.isKilled;
}
Expand Down Expand Up @@ -294,7 +308,7 @@ export abstract class AbstractCursor<
return;
}

if (this.closed && (this.documents?.length ?? 0) === 0) {
if (this.closed) {
return;
}

Expand Down Expand Up @@ -752,9 +766,11 @@ export abstract class AbstractCursor<
!session.hasEnded
) {
this.isKilled = true;
const cursorId = this.cursorId;
this.cursorId = Long.ZERO;
await executeOperation(
this.cursorClient,
new KillCursorsOperation(this.cursorId, this.cursorNamespace, this.selectedServer, {
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
})
);
Expand Down
3 changes: 2 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,8 @@ describe('Change Streams', function () {
await changeStreamIterator.next();
await changeStreamIterator.return();
expect(changeStream.closed).to.be.true;
expect(changeStream.cursor).property('closed', true);
expect(changeStream.cursor).property('isClosed', true);
expect(changeStream.cursor).nested.property('session.hasEnded', true);
}
);

Expand Down
68 changes: 65 additions & 3 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type FindCursor,
MongoAPIError,
type MongoClient,
MongoCursorExhaustedError,
MongoServerError
} from '../../mongodb';

Expand Down Expand Up @@ -193,7 +194,9 @@ describe('class AbstractCursor', function () {
const error = await cursor.toArray().catch(e => e);

expect(error).be.instanceOf(MongoAPIError);
expect(cursor.closed).to.be.true;
expect(cursor.id.isZero()).to.be.true;
// The first batch exhausted the cursor, the only thing to clean up is the session
expect(cursor.session.hasEnded).to.be.true;
});
});

Expand Down Expand Up @@ -225,7 +228,9 @@ describe('class AbstractCursor', function () {
}
} catch (error) {
expect(error).to.be.instanceOf(MongoAPIError);
expect(cursor.closed).to.be.true;
expect(cursor.id.isZero()).to.be.true;
// The first batch exhausted the cursor, the only thing to clean up is the session
expect(cursor.session.hasEnded).to.be.true;
}
});
});
Expand Down Expand Up @@ -259,7 +264,9 @@ describe('class AbstractCursor', function () {

const error = await cursor.forEach(iterator).catch(e => e);
expect(error).to.be.instanceOf(MongoAPIError);
expect(cursor.closed).to.be.true;
expect(cursor.id.isZero()).to.be.true;
// The first batch exhausted the cursor, the only thing to clean up is the session
expect(cursor.session.hasEnded).to.be.true;
});
});
});
Expand Down Expand Up @@ -299,4 +306,59 @@ describe('class AbstractCursor', function () {
expect(error).to.be.instanceof(MongoServerError);
});
});

describe('cursor end state', function () {
let client: MongoClient;
let cursor: FindCursor;

beforeEach(async function () {
client = this.configuration.newClient();
const test = client.db().collection('test');
await test.deleteMany({});
await test.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]);
});

afterEach(async function () {
await cursor.close();
await client.close();
});

describe('when the last batch has been received', () => {
it('has a zero id and is not closed and is never killed', async function () {
cursor = client.db().collection('test').find({});
expect(cursor).to.have.property('closed', false);
await cursor.tryNext();
expect(cursor.id.isZero()).to.be.true;
expect(cursor).to.have.property('closed', false);
expect(cursor).to.have.property('killed', false);
});
});

describe('when the last document has been iterated', () => {
it('has a zero id and is closed and is never killed', async function () {
cursor = client.db().collection('test').find({});
await cursor.next();
await cursor.next();
await cursor.next();
await cursor.next();
expect(await cursor.next()).to.be.null;
expect(cursor.id.isZero()).to.be.true;
expect(cursor).to.have.property('closed', true);
expect(cursor).to.have.property('killed', false);
});
});

describe('when some documents have been iterated and the cursor is closed', () => {
it('has a zero id and is not closed and is killed', async function () {
cursor = client.db().collection('test').find({}, { batchSize: 2 });
await cursor.next();
await cursor.close();
expect(cursor).to.have.property('closed', false);
expect(cursor).to.have.property('killed', true);
expect(cursor.id.isZero()).to.be.true;
const error = await cursor.next().catch(error => error);
expect(error).to.be.instanceOf(MongoCursorExhaustedError);
});
});
});
});
7 changes: 4 additions & 3 deletions test/integration/node-specific/cursor_async_iterator.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ describe('Cursor Async Iterator Tests', function () {
}

expect(count).to.equal(1);
expect(cursor.closed).to.be.true;
expect(cursor.killed).to.be.true;
});

it('cleans up cursor when breaking out of for await of loops', async function () {
Expand All @@ -106,7 +106,8 @@ describe('Cursor Async Iterator Tests', function () {
break;
}

expect(cursor.closed).to.be.true;
// The expectation is that we have "cleaned" up the cursor on the server side
expect(cursor.killed).to.be.true;
});

it('returns when attempting to reuse the cursor after a break', async function () {
Expand All @@ -118,7 +119,7 @@ describe('Cursor Async Iterator Tests', function () {
break;
}

expect(cursor.closed).to.be.true;
expect(cursor.killed).to.be.true;

for await (const doc of cursor) {
expect.fail('Async generator returns immediately if cursor is closed', doc);
Expand Down

0 comments on commit e3d70c3

Please sign in to comment.