diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index c4836eea49..b97a11fd33 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -1,4 +1,4 @@ -import { assert } from 'chai'; +import { assert, expect } from 'chai'; import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON.js'; @@ -851,6 +851,57 @@ describe('Execute: stream directive', () => { ]); }); it('Handles async errors thrown by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + nonNullName + } + } + `); + const result = await complete(document, { + friendList: () => [ + Promise.resolve({ nonNullName: friends[0].name }), + Promise.resolve({ + nonNullName: () => Promise.reject(new Error('Oops')), + }), + Promise.resolve({ nonNullName: friends[1].name }), + ], + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ nonNullName: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['friendList', 1, 'nonNullName'], + }, + ], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ nonNullName: 'Han' }], + path: ['friendList', 2], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles async errors thrown by completeValue after initialCount is reached for a non-nullable list', async () => { const document = parse(` query { nonNullFriendList @stream(initialCount: 1) { @@ -946,6 +997,50 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Handles async errors thrown by completeValue after initialCount is reached from async iterable for a non-nullable list', async () => { + const document = parse(` + query { + nonNullFriendList @stream(initialCount: 1) { + nonNullName + } + } + `); + const result = await complete(document, { + async *nonNullFriendList() { + yield await Promise.resolve({ nonNullName: friends[0].name }); + yield await Promise.resolve({ + nonNullName: () => Promise.reject(new Error('Oops')), + }); + yield await Promise.resolve({ + nonNullName: friends[1].name, + }); /* c8 ignore start */ + } /* c8 ignore stop */, + }); + expectJSON(result).toDeepEqual([ + { + data: { + nonNullFriendList: [{ nonNullName: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: null, + path: ['nonNullFriendList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['nonNullFriendList', 1, 'nonNullName'], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); it('Filters payloads that are nulled', async () => { const document = parse(` query { @@ -961,8 +1056,8 @@ describe('Execute: stream directive', () => { nestedObject: { nonNullScalarField: () => Promise.resolve(null), async *nestedFriendList() { - yield await Promise.resolve(friends[0]); - }, + yield await Promise.resolve(friends[0]); /* c8 ignore start */ + } /* c8 ignore stop */, }, }); expectJSON(result).toDeepEqual({ @@ -1061,9 +1156,6 @@ describe('Execute: stream directive', () => { path: ['nestedObject', 'nestedFriendList', 0], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -1088,8 +1180,8 @@ describe('Execute: stream directive', () => { deeperNestedObject: { nonNullScalarField: () => Promise.resolve(null), async *deeperNestedFriendList() { - yield await Promise.resolve(friends[0]); - }, + yield await Promise.resolve(friends[0]); /* c8 ignore start */ + } /* c8 ignore stop */, }, }, }); @@ -1176,14 +1268,17 @@ describe('Execute: stream directive', () => { it('Returns iterator and ignores errors when stream payloads are filtered', async () => { let returned = false; - let index = 0; + let requested = false; const iterable = { [Symbol.asyncIterator]: () => ({ next: () => { - const friend = friends[index++]; - if (!friend) { - return Promise.resolve({ done: true, value: undefined }); + if (requested) { + /* c8 ignore next 3 */ + // Not reached, iterator should end immediately. + expect.fail('Not reached'); } + requested = true; + const friend = friends[0]; return Promise.resolve({ done: false, value: { @@ -1261,17 +1356,12 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, + hasNext: false, }, }); - const result3 = await iterator.next(); - expectJSON(result3).toDeepEqual({ - done: false, - value: { hasNext: false }, - }); - const result4 = await iterator.next(); - expectJSON(result4).toDeepEqual({ done: true, value: undefined }); + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ done: true, value: undefined }); assert(returned); }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 9f3cd4db74..e7a6bed841 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -2026,36 +2026,23 @@ async function executeStreamIterator( exeContext, }); - const dataPromise = executeStreamIteratorItem( - iterator, - exeContext, - fieldNodes, - info, - itemType, - asyncPayloadRecord, - itemPath, - ); - - asyncPayloadRecord.addItems( - dataPromise - .then(({ value }) => value) - .then( - (value) => [value], - (err) => { - asyncPayloadRecord.errors.push(err); - return null; - }, - ), - ); + let iteration; try { // eslint-disable-next-line no-await-in-loop - const { done } = await dataPromise; - if (done) { - break; - } - } catch (err) { - // entire stream has errored and bubbled upwards + iteration = await executeStreamIteratorItem( + iterator, + exeContext, + fieldNodes, + info, + itemType, + asyncPayloadRecord, + itemPath, + ); + } catch (error) { + asyncPayloadRecord.errors.push(error); filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); + asyncPayloadRecord.addItems(null); + // entire stream has errored and bubbled upwards if (iterator?.return) { iterator.return().catch(() => { // ignore errors @@ -2063,6 +2050,28 @@ async function executeStreamIterator( } return; } + + const { done, value: completedItem } = iteration; + + let completedItems: PromiseOrValue | null>; + if (isPromise(completedItem)) { + completedItems = completedItem.then( + (value) => [value], + (error) => { + asyncPayloadRecord.errors.push(error); + filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); + return null; + }, + ); + } else { + completedItems = [completedItem]; + } + + asyncPayloadRecord.addItems(completedItems); + + if (done) { + break; + } previousAsyncPayloadRecord = asyncPayloadRecord; index++; }