From 5644ddfec9a68469ddcf06b36f296adf114a41fd Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 18 Feb 2024 15:35:32 +0200 Subject: [PATCH] incremental delivery rewrite --- src/execution/IncrementalPublisher.ts | 990 ++++++------ src/execution/__tests__/defer-test.ts | 50 +- src/execution/__tests__/executor-test.ts | 51 + src/execution/__tests__/stream-test.ts | 107 +- src/execution/buildFieldPlan.ts | 109 +- src/execution/collectFields.ts | 16 +- src/execution/execute.ts | 1328 ++++++++++------- .../rules/SingleFieldSubscriptionsRule.ts | 16 +- 8 files changed, 1433 insertions(+), 1234 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 1ca31acb88..ffde6f70b8 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -1,6 +1,8 @@ +import { isPromise } from '../jsutils/isPromise.js'; import type { ObjMap } from '../jsutils/ObjMap.js'; import type { Path } from '../jsutils/Path.js'; import { pathToArray } from '../jsutils/Path.js'; +import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { @@ -8,13 +10,7 @@ import type { GraphQLFormattedError, } from '../error/GraphQLError.js'; -import type { GroupedFieldSet } from './buildFieldPlan.js'; - -interface IncrementalUpdate> { - pending: ReadonlyArray; - incremental: ReadonlyArray>; - completed: ReadonlyArray; -} +import type { DeferUsageSet } from './buildFieldPlan.js'; /** * The result of GraphQL execution. @@ -78,7 +74,10 @@ export interface FormattedInitialIncrementalExecutionResult< export interface SubsequentIncrementalExecutionResult< TData = unknown, TExtensions = ObjMap, -> extends Partial> { +> { + pending?: ReadonlyArray; + incremental?: ReadonlyArray>; + completed?: ReadonlyArray; hasNext: boolean; extensions?: TExtensions; } @@ -94,12 +93,15 @@ export interface FormattedSubsequentIncrementalExecutionResult< extensions?: TExtensions; } +interface RawDeferResult> { + errors?: ReadonlyArray; + data: TData; +} + export interface IncrementalDeferResult< TData = ObjMap, TExtensions = ObjMap, -> { - errors?: ReadonlyArray; - data: TData; +> extends RawDeferResult { id: string; subPath?: ReadonlyArray; extensions?: TExtensions; @@ -116,12 +118,15 @@ export interface FormattedIncrementalDeferResult< extensions?: TExtensions; } -export interface IncrementalStreamResult< - TData = Array, - TExtensions = ObjMap, -> { +interface RawStreamItemsResult> { errors?: ReadonlyArray; items: TData; +} + +export interface IncrementalStreamResult< + TData = ReadonlyArray, + TExtensions = ObjMap, +> extends RawStreamItemsResult { id: string; subPath?: ReadonlyArray; extensions?: TExtensions; @@ -166,214 +171,198 @@ export interface FormattedCompletedResult { errors?: ReadonlyArray; } +export function buildIncrementalResponse( + context: IncrementalPublisherContext, + result: ObjMap, + errors: ReadonlyArray | undefined, + futures: ReadonlyArray, +): ExperimentalIncrementalExecutionResults { + const incrementalPublisher = new IncrementalPublisher(context); + return incrementalPublisher.buildResponse(result, errors, futures); +} + +interface IncrementalPublisherContext { + cancellableStreams?: Set | undefined; +} + /** * This class is used to publish incremental results to the client, enabling semi-concurrent * execution while preserving result order. * - * The internal publishing state is managed as follows: - * - * '_released': the set of Subsequent Result records that are ready to be sent to the client, - * i.e. their parents have completed and they have also completed. - * - * `_pending`: the set of Subsequent Result records that are definitely pending, i.e. their - * parents have completed so that they can no longer be filtered. This includes all Subsequent - * Result records in `released`, as well as the records that have not yet completed. - * * @internal */ -export class IncrementalPublisher { - private _nextId = 0; - private _released: Set; +class IncrementalPublisher { + private _context: IncrementalPublisherContext; + private _nextId: number; private _pending: Set; - + private _completedResultQueue: Array; + private _newPending: Set; + private _incremental: Array; + private _completed: Array; // these are assigned within the Promise executor called synchronously within the constructor private _signalled!: Promise; private _resolve!: () => void; - constructor() { - this._released = new Set(); + constructor(context: IncrementalPublisherContext) { + this._context = context; + this._nextId = 0; this._pending = new Set(); + this._completedResultQueue = []; + this._newPending = new Set(); + this._incremental = []; + this._completed = []; this._reset(); } - reportNewDeferFragmentRecord( - deferredFragmentRecord: DeferredFragmentRecord, - parentIncrementalResultRecord: - | InitialResultRecord - | DeferredFragmentRecord - | StreamItemsRecord, - ): void { - parentIncrementalResultRecord.children.add(deferredFragmentRecord); - } + buildResponse( + data: ObjMap, + errors: ReadonlyArray | undefined, + futures: ReadonlyArray, + ): ExperimentalIncrementalExecutionResults { + this._addFutures(futures); + this._pruneEmpty(); - reportNewDeferredGroupedFieldSetRecord( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - ): void { - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - deferredFragmentRecord._pending.add(deferredGroupedFieldSetRecord); - deferredFragmentRecord.deferredGroupedFieldSetRecords.add( - deferredGroupedFieldSetRecord, - ); - } + const pending = this._pendingSourcesToResults(); + + const initialResult: InitialIncrementalExecutionResult = + errors === undefined + ? { data, pending, hasNext: true } + : { errors, data, pending, hasNext: true }; + + return { + initialResult, + subsequentResults: this._subscribe(), + }; } - reportNewStreamItemsRecord( - streamItemsRecord: StreamItemsRecord, - parentIncrementalDataRecord: IncrementalDataRecord, - ): void { - if (isDeferredGroupedFieldSetRecord(parentIncrementalDataRecord)) { - for (const parent of parentIncrementalDataRecord.deferredFragmentRecords) { - parent.children.add(streamItemsRecord); + private _addFutures(futures: ReadonlyArray): void { + for (const future of futures) { + if (isDeferredGroupedFieldSetRecord(future)) { + for (const deferredFragmentRecord of future.deferredFragmentRecords) { + this._addDeferredFragmentRecord(deferredFragmentRecord); + } + + const result = future.result; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => { + this._enqueueCompletedDeferredGroupedFieldSet(resolved); + }); + } else { + this._enqueueCompletedDeferredGroupedFieldSet(result); + } + + continue; } - } else { - parentIncrementalDataRecord.children.add(streamItemsRecord); - } - } - completeDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - data: ObjMap, - ): void { - deferredGroupedFieldSetRecord.data = data; - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - deferredFragmentRecord._pending.delete(deferredGroupedFieldSetRecord); - if (deferredFragmentRecord._pending.size === 0) { - this.completeDeferredFragmentRecord(deferredFragmentRecord); + const streamRecord = future.streamRecord; + if (streamRecord.id === undefined) { + this._newPending.add(streamRecord); } - } - } - markErroredDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - error: GraphQLError, - ): void { - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - deferredFragmentRecord.errors.push(error); - this.completeDeferredFragmentRecord(deferredFragmentRecord); + const result = future.getResult(); + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => { + this._enqueueCompletedStreamItems(resolved); + }); + } else { + this._enqueueCompletedStreamItems(result); + } } } - completeDeferredFragmentRecord( + private _addDeferredFragmentRecord( deferredFragmentRecord: DeferredFragmentRecord, ): void { - this._release(deferredFragmentRecord); - } - - completeStreamItemsRecord( - streamItemsRecord: StreamItemsRecord, - items: Array, - ) { - streamItemsRecord.items = items; - streamItemsRecord.isCompleted = true; - this._release(streamItemsRecord); - } + const parent = deferredFragmentRecord.parent; + if (parent === undefined) { + if (deferredFragmentRecord.id !== undefined) { + return; + } - markErroredStreamItemsRecord( - streamItemsRecord: StreamItemsRecord, - error: GraphQLError, - ) { - streamItemsRecord.streamRecord.errors.push(error); - this.setIsFinalRecord(streamItemsRecord); - streamItemsRecord.isCompleted = true; - streamItemsRecord.streamRecord.earlyReturn?.().catch(() => { - // ignore error - }); - this._release(streamItemsRecord); - } + this._newPending.add(deferredFragmentRecord); + return; + } - setIsFinalRecord(streamItemsRecord: StreamItemsRecord) { - streamItemsRecord.isFinalRecord = true; - } + if (parent.children.has(deferredFragmentRecord)) { + return; + } - setIsCompletedAsyncIterator(streamItemsRecord: StreamItemsRecord) { - streamItemsRecord.isCompletedAsyncIterator = true; - this.setIsFinalRecord(streamItemsRecord); - } + parent.children.add(deferredFragmentRecord); - addFieldError( - incrementalDataRecord: IncrementalDataRecord, - error: GraphQLError, - ) { - incrementalDataRecord.errors.push(error); + this._addDeferredFragmentRecord(parent); } - buildDataResponse( - initialResultRecord: InitialResultRecord, - data: ObjMap | null, - ): ExecutionResult | ExperimentalIncrementalExecutionResults { - const pendingSources = this._publish(initialResultRecord.children); - - const errors = initialResultRecord.errors; - const initialResult = errors.length === 0 ? { data } : { errors, data }; - if (pendingSources.size > 0) { - return { - initialResult: { - ...initialResult, - pending: this._pendingSourcesToResults(pendingSources), - hasNext: true, - }, - subsequentResults: this._subscribe(), - }; + private _pruneEmpty() { + const maybeEmptyNewPending = this._newPending; + this._newPending = new Set(); + for (const node of maybeEmptyNewPending) { + if (isDeferredFragmentRecord(node)) { + if (node.deferredGroupedFieldSetRecords.length > 0) { + this._newPending.add(node); + continue; + } + for (const child of node.children) { + this._addNonEmptyNewPending(child); + } + } else { + this._newPending.add(node); + } } - return initialResult; } - buildErrorResponse( - initialResultRecord: InitialResultRecord, - error: GraphQLError, - ): ExecutionResult { - const errors = initialResultRecord.errors; - errors.push(error); - return { data: null, errors }; + private _addNonEmptyNewPending( + deferredFragmentRecord: DeferredFragmentRecord, + ): void { + if (deferredFragmentRecord.deferredGroupedFieldSetRecords.length > 0) { + this._newPending.add(deferredFragmentRecord); + return; + } + /* c8 ignore next 5 */ + // TODO: add test case for this, if when skipping an empty deferred fragment, the empty fragment has nested children. + for (const child of deferredFragmentRecord.children) { + this._addNonEmptyNewPending(child); + } } - filter( - nullPath: Path | undefined, - erroringIncrementalDataRecord: IncrementalDataRecord, + private _enqueueCompletedDeferredGroupedFieldSet( + result: DeferredGroupedFieldSetResult, ): void { - const nullPathArray = pathToArray(nullPath); - - const streams = new Set(); - - const children = this._getChildren(erroringIncrementalDataRecord); - const descendants = this._getDescendants(children); - - for (const child of descendants) { - if (!this._nullsChildSubsequentResultRecord(child, nullPathArray)) { - continue; - } - - child.filtered = true; - - if (isStreamItemsRecord(child)) { - streams.add(child.streamRecord); + let hasPendingParent = false; + for (const deferredFragmentRecord of result.deferredFragmentRecords) { + if (deferredFragmentRecord.id !== undefined) { + hasPendingParent = true; } + deferredFragmentRecord.results.push(result); + } + if (hasPendingParent) { + this._completedResultQueue.push(result); + this._trigger(); } + } - streams.forEach((stream) => { - stream.earlyReturn?.().catch(() => { - // ignore error - }); - }); + private _enqueueCompletedStreamItems(result: StreamItemsResult): void { + this._completedResultQueue.push(result); + this._trigger(); } - private _pendingSourcesToResults( - pendingSources: ReadonlySet, - ): Array { + private _pendingSourcesToResults(): Array { const pendingResults: Array = []; - for (const pendingSource of pendingSources) { - pendingSource.pendingSent = true; - const id = this._getNextId(); + for (const pendingSource of this._newPending) { + const id = String(this._getNextId()); + this._pending.add(pendingSource); pendingSource.id = id; const pendingResult: PendingResult = { id, - path: pendingSource.path, + path: pathToArray(pendingSource.path), }; if (pendingSource.label !== undefined) { pendingResult.label = pendingSource.label; } pendingResults.push(pendingResult); } + this._newPending.clear(); return pendingResults; } @@ -391,47 +380,71 @@ export class IncrementalPublisher { const _next = async (): Promise< IteratorResult > => { - // eslint-disable-next-line no-constant-condition - while (true) { - if (isDone) { - return { value: undefined, done: true }; - } + while (!isDone) { + let pending: Array = []; + + let completedResult: FutureResult | undefined; + while ( + (completedResult = this._completedResultQueue.shift()) !== undefined + ) { + if (isDeferredGroupedFieldSetResult(completedResult)) { + this._handleCompletedDeferredGroupedFieldSet(completedResult); + } else { + this._handleCompletedStreamItems(completedResult); + } - for (const item of this._released) { - this._pending.delete(item); + pending = [...pending, ...this._pendingSourcesToResults()]; } - const released = this._released; - this._released = new Set(); - const result = this._getIncrementalResult(released); + if (this._incremental.length > 0 || this._completed.length > 0) { + const hasNext = this._pending.size > 0; - if (this._pending.size === 0) { - isDone = true; - } + if (!hasNext) { + isDone = true; + } + + const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = + { hasNext }; + + if (pending.length > 0) { + subsequentIncrementalExecutionResult.pending = pending; + } + if (this._incremental.length > 0) { + subsequentIncrementalExecutionResult.incremental = + this._incremental; + } + if (this._completed.length > 0) { + subsequentIncrementalExecutionResult.completed = this._completed; + } + + this._incremental = []; + this._completed = []; - if (result !== undefined) { - return { value: result, done: false }; + return { value: subsequentIncrementalExecutionResult, done: false }; } // eslint-disable-next-line no-await-in-loop await this._signalled; } + + await returnStreamIterators().catch(() => { + // ignore errors + }); + + return { value: undefined, done: true }; }; const returnStreamIterators = async (): Promise => { - const streams = new Set(); - const descendants = this._getDescendants(this._pending); - for (const subsequentResultRecord of descendants) { - if (isStreamItemsRecord(subsequentResultRecord)) { - streams.add(subsequentResultRecord.streamRecord); - } + const cancellableStreams = this._context.cancellableStreams; + if (cancellableStreams === undefined) { + return; } const promises: Array> = []; - streams.forEach((streamRecord) => { - if (streamRecord.earlyReturn) { + for (const streamRecord of cancellableStreams) { + if (streamRecord.earlyReturn !== undefined) { promises.push(streamRecord.earlyReturn()); } - }); + } await Promise.all(promises); }; @@ -475,385 +488,374 @@ export class IncrementalPublisher { this._signalled = signalled; } - private _introduce(item: SubsequentResultRecord) { - this._pending.add(item); - } - - private _release(item: SubsequentResultRecord): void { - if (this._pending.has(item)) { - this._released.add(item); - this._trigger(); + private _handleCompletedDeferredGroupedFieldSet( + deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, + ): void { + if ( + isNonReconcilableDeferredGroupedFieldSetResult( + deferredGroupedFieldSetResult, + ) + ) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + const id = deferredFragmentRecord.id; + if (id !== undefined) { + this._completed.push({ + id, + errors: deferredGroupedFieldSetResult.errors, + }); + this._pending.delete(deferredFragmentRecord); + } + } + return; } - } - - private _push(item: SubsequentResultRecord): void { - this._released.add(item); - this._pending.add(item); - this._trigger(); - } - - private _getIncrementalResult( - completedRecords: ReadonlySet, - ): SubsequentIncrementalExecutionResult | undefined { - const { pending, incremental, completed } = - this._processPending(completedRecords); - - const hasNext = this._pending.size > 0; - if (incremental.length === 0 && completed.length === 0 && hasNext) { - return undefined; + for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + deferredFragmentRecord.reconcilableResults.push( + deferredGroupedFieldSetResult, + ); } - const result: SubsequentIncrementalExecutionResult = { hasNext }; - if (pending.length) { - result.pending = pending; + if (deferredGroupedFieldSetResult.futures) { + this._addFutures(deferredGroupedFieldSetResult.futures); } - if (incremental.length) { - result.incremental = incremental; - } - if (completed.length) { - result.completed = completed; - } - - return result; - } - private _processPending( - completedRecords: ReadonlySet, - ): IncrementalUpdate { - const newPendingSources = new Set(); - const incrementalResults: Array = []; - const completedResults: Array = []; - for (const subsequentResultRecord of completedRecords) { - this._publish(subsequentResultRecord.children, newPendingSources); - if (isStreamItemsRecord(subsequentResultRecord)) { - if (subsequentResultRecord.isFinalRecord) { - newPendingSources.delete(subsequentResultRecord.streamRecord); - completedResults.push( - this._completedRecordToResult(subsequentResultRecord.streamRecord), - ); - } - if (subsequentResultRecord.isCompletedAsyncIterator) { - // async iterable resolver just finished but there may be pending payloads - continue; - } - if (subsequentResultRecord.streamRecord.errors.length > 0) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + const id = deferredFragmentRecord.id; + // TODO: add test case for this. + // Presumably, this can occur if an error causes a fragment to be completed early, + // while an asynchronous deferred grouped field set result is enqueued. + /* c8 ignore next 3 */ + if (id === undefined) { + continue; + } + const fragmentResults = deferredFragmentRecord.reconcilableResults; + if ( + deferredFragmentRecord.deferredGroupedFieldSetRecords.length !== + fragmentResults.length + ) { + continue; + } + for (const fragmentResult of fragmentResults) { + if (fragmentResult.sent) { continue; } - const incrementalResult: IncrementalStreamResult = { - // safe because `items` is always defined when the record is completed - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - items: subsequentResultRecord.items!, - // safe because `id` is defined once the stream has been released as pending - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - id: subsequentResultRecord.streamRecord.id!, - }; - if (subsequentResultRecord.errors.length > 0) { - incrementalResult.errors = subsequentResultRecord.errors; - } - incrementalResults.push(incrementalResult); - } else { - newPendingSources.delete(subsequentResultRecord); - completedResults.push( - this._completedRecordToResult(subsequentResultRecord), + fragmentResult.sent = true; + const { bestId, subPath } = this._getBestIdAndSubPath( + id, + deferredFragmentRecord, + fragmentResult, ); - if (subsequentResultRecord.errors.length > 0) { - continue; + const incrementalEntry: IncrementalDeferResult = { + ...fragmentResult.result, + id: bestId, + }; + if (subPath !== undefined) { + incrementalEntry.subPath = subPath; } - for (const deferredGroupedFieldSetRecord of subsequentResultRecord.deferredGroupedFieldSetRecords) { - if (!deferredGroupedFieldSetRecord.sent) { - deferredGroupedFieldSetRecord.sent = true; - const incrementalResult: IncrementalDeferResult = - this._getIncrementalDeferResult(deferredGroupedFieldSetRecord); - if (deferredGroupedFieldSetRecord.errors.length > 0) { - incrementalResult.errors = deferredGroupedFieldSetRecord.errors; - } - incrementalResults.push(incrementalResult); + this._incremental.push(incrementalEntry); + } + this._completed.push({ id }); + this._pending.delete(deferredFragmentRecord); + for (const child of deferredFragmentRecord.children) { + this._newPending.add(child); + for (const childResult of child.results) { + if (!isPromise(childResult)) { + this._completedResultQueue.push(childResult); } } } } - return { - pending: this._pendingSourcesToResults(newPendingSources), - incremental: incrementalResults, - completed: completedResults, - }; + this._pruneEmpty(); } - private _getIncrementalDeferResult( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - ): IncrementalDeferResult { - const { data, deferredFragmentRecords } = deferredGroupedFieldSetRecord; - let maxLength: number | undefined; - let idWithLongestPath: string | undefined; - for (const deferredFragmentRecord of deferredFragmentRecords) { - const id = deferredFragmentRecord.id; - // TODO: add test - /* c8 ignore next 3 */ - if (id === undefined) { - continue; + private _handleCompletedStreamItems( + streamItemsResult: StreamItemsResult, + ): void { + const streamRecord = streamItemsResult.streamRecord; + const id = streamRecord.id; + // TODO: Consider adding invariant or non-null assertion, as this should never happen. Since the stream is converted into a linked list + // for ordering purposes, if an entry errors, additional entries will not be processed. + /* c8 ignore next 3 */ + if (id === undefined) { + return; + } + if (streamItemsResult.result === undefined) { + this._completed.push({ id }); + this._pending.delete(streamRecord); + const cancellableStreams = this._context.cancellableStreams; + if (cancellableStreams !== undefined) { + cancellableStreams.delete(streamRecord); } - const length = deferredFragmentRecord.path.length; - if (maxLength === undefined || length > maxLength) { - maxLength = length; - idWithLongestPath = id; + } else if (streamItemsResult.result === null) { + this._completed.push({ + id, + errors: streamItemsResult.errors, + }); + this._pending.delete(streamRecord); + const cancellableStreams = this._context.cancellableStreams; + if (cancellableStreams !== undefined) { + cancellableStreams.delete(streamRecord); } - } - const subPath = deferredGroupedFieldSetRecord.path.slice(maxLength); - const incrementalDeferResult: IncrementalDeferResult = { - // safe because `data``is always defined when the record is completed - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - data: data!, - // safe because `id` is always defined once the fragment has been released - // as pending and at least one fragment has been completed, so must have been - // released as pending - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - id: idWithLongestPath!, - }; - - if (subPath.length > 0) { - incrementalDeferResult.subPath = subPath; - } + streamRecord.earlyReturn?.().catch(() => { + /* c8 ignore next 1 */ + // ignore error + }); + } else { + const incrementalEntry: IncrementalStreamResult = { + id, + ...streamItemsResult.result, + }; - return incrementalDeferResult; - } + this._incremental.push(incrementalEntry); - private _completedRecordToResult( - completedRecord: DeferredFragmentRecord | StreamRecord, - ): CompletedResult { - const result: CompletedResult = { - // safe because `id` is defined once the stream has been released as pending - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - id: completedRecord.id!, - }; - if (completedRecord.errors.length > 0) { - result.errors = completedRecord.errors; + if (streamItemsResult.futures) { + this._addFutures(streamItemsResult.futures); + this._pruneEmpty(); + } } - return result; } - private _publish( - subsequentResultRecords: ReadonlySet, - pendingSources = new Set(), - ): Set { - const emptyRecords: Array = []; - - for (const subsequentResultRecord of subsequentResultRecords) { - if (subsequentResultRecord.filtered) { - continue; - } - if (isStreamItemsRecord(subsequentResultRecord)) { - if (subsequentResultRecord.isCompleted) { - this._push(subsequentResultRecord); - } else { - this._introduce(subsequentResultRecord); - } + private _getBestIdAndSubPath( + initialId: string, + initialDeferredFragmentRecord: DeferredFragmentRecord, + deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, + ): { bestId: string; subPath: ReadonlyArray | undefined } { + let maxLength = pathToArray(initialDeferredFragmentRecord.path).length; + let bestId = initialId; - const stream = subsequentResultRecord.streamRecord; - if (!stream.pendingSent) { - pendingSources.add(stream); - } + for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + if (deferredFragmentRecord === initialDeferredFragmentRecord) { continue; } - - if (subsequentResultRecord._pending.size > 0) { - this._introduce(subsequentResultRecord); - } else if ( - subsequentResultRecord.deferredGroupedFieldSetRecords.size === 0 - ) { - emptyRecords.push(subsequentResultRecord); + const id = deferredFragmentRecord.id; + // TODO: add test case for when an fragment has not been released, but might be processed for the shortest path. + /* c8 ignore next 3 */ + if (id === undefined) { continue; - } else { - this._push(subsequentResultRecord); } - - if (!subsequentResultRecord.pendingSent) { - pendingSources.add(subsequentResultRecord); + const fragmentPath = pathToArray(deferredFragmentRecord.path); + const length = fragmentPath.length; + if (length > maxLength) { + maxLength = length; + bestId = id; } } - - for (const emptyRecord of emptyRecords) { - this._publish(emptyRecord.children, pendingSources); - } - - return pendingSources; + const subPath = deferredGroupedFieldSetResult.path.slice(maxLength); + return { + bestId, + subPath: subPath.length > 0 ? subPath : undefined, + }; } +} - private _getChildren( - erroringIncrementalDataRecord: IncrementalDataRecord, - ): ReadonlySet { - const children = new Set(); - if (isDeferredGroupedFieldSetRecord(erroringIncrementalDataRecord)) { - for (const erroringIncrementalResultRecord of erroringIncrementalDataRecord.deferredFragmentRecords) { - for (const child of erroringIncrementalResultRecord.children) { - children.add(child); - } - } - } else { - for (const child of erroringIncrementalDataRecord.children) { - children.add(child); - } - } - return children; - } +export function isDeferredFragmentRecord( + subsequentResultRecord: SubsequentResultRecord, +): subsequentResultRecord is DeferredFragmentRecord { + return subsequentResultRecord instanceof DeferredFragmentRecord; +} - private _getDescendants( - children: ReadonlySet, - descendants = new Set(), - ): ReadonlySet { - for (const child of children) { - descendants.add(child); - this._getDescendants(child.children, descendants); - } - return descendants; - } +export function isDeferredGroupedFieldSetRecord( + future: Future, +): future is DeferredGroupedFieldSetRecord { + return future instanceof DeferredGroupedFieldSetRecord; +} - private _nullsChildSubsequentResultRecord( - subsequentResultRecord: SubsequentResultRecord, - nullPath: ReadonlyArray, - ): boolean { - const incrementalDataRecords = isStreamItemsRecord(subsequentResultRecord) - ? [subsequentResultRecord] - : subsequentResultRecord.deferredGroupedFieldSetRecords; - - for (const incrementalDataRecord of incrementalDataRecords) { - if (this._matchesPath(incrementalDataRecord.path, nullPath)) { - return true; - } - } +export interface IncrementalContext { + deferUsageSet: DeferUsageSet | undefined; + path: Path | undefined; + errors?: Map | undefined; + futures?: Array | undefined; +} - return false; - } +export type DeferredGroupedFieldSetResult = + | ReconcilableDeferredGroupedFieldSetResult + | NonReconcilableDeferredGroupedFieldSetResult; - private _matchesPath( - testPath: ReadonlyArray, - basePath: ReadonlyArray, - ): boolean { - for (let i = 0; i < basePath.length; i++) { - if (basePath[i] !== testPath[i]) { - // testPath points to a path unaffected at basePath - return false; - } - } - return true; - } +export function isDeferredGroupedFieldSetResult( + subsequentResult: DeferredGroupedFieldSetResult | StreamItemsResult, +): subsequentResult is DeferredGroupedFieldSetResult { + return 'deferredFragmentRecords' in subsequentResult; } -function isDeferredGroupedFieldSetRecord( - incrementalDataRecord: unknown, -): incrementalDataRecord is DeferredGroupedFieldSetRecord { - return incrementalDataRecord instanceof DeferredGroupedFieldSetRecord; +interface ReconcilableDeferredGroupedFieldSetResult { + deferredFragmentRecords: ReadonlyArray; + path: Array; + result: RawDeferResult; + futures?: ReadonlyArray | undefined; + sent?: true | undefined; } -function isStreamItemsRecord( - subsequentResultRecord: unknown, -): subsequentResultRecord is StreamItemsRecord { - return subsequentResultRecord instanceof StreamItemsRecord; +interface NonReconcilableDeferredGroupedFieldSetResult { + result: null; + errors: ReadonlyArray; + deferredFragmentRecords: ReadonlyArray; + path: Array; } -/** @internal */ -export class InitialResultRecord { - errors: Array; - children: Set; - constructor() { - this.errors = []; - this.children = new Set(); - } +export function isNonReconcilableDeferredGroupedFieldSetResult( + deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, +): deferredGroupedFieldSetResult is NonReconcilableDeferredGroupedFieldSetResult { + return deferredGroupedFieldSetResult.result === null; } /** @internal */ export class DeferredGroupedFieldSetRecord { - path: ReadonlyArray; + path: Path | undefined; deferredFragmentRecords: ReadonlyArray; - groupedFieldSet: GroupedFieldSet; - shouldInitiateDefer: boolean; - errors: Array; - data: ObjMap | undefined; - sent: boolean; + result: PromiseOrValue; constructor(opts: { path: Path | undefined; + deferUsageSet: DeferUsageSet; deferredFragmentRecords: ReadonlyArray; - groupedFieldSet: GroupedFieldSet; - shouldInitiateDefer: boolean; + executor: ( + incrementalContext: IncrementalContext, + ) => PromiseOrValue; }) { - this.path = pathToArray(opts.path); - this.deferredFragmentRecords = opts.deferredFragmentRecords; - this.groupedFieldSet = opts.groupedFieldSet; - this.shouldInitiateDefer = opts.shouldInitiateDefer; - this.errors = []; - this.sent = false; + const { path, deferUsageSet, deferredFragmentRecords, executor } = opts; + this.path = path; + this.deferredFragmentRecords = deferredFragmentRecords; + + const incrementalContext: IncrementalContext = { + deferUsageSet, + path, + }; + + for (const deferredFragmentRecord of deferredFragmentRecords) { + deferredFragmentRecord.deferredGroupedFieldSetRecords.push(this); + } + + this.result = deferredFragmentRecords.some( + (deferredFragmentRecord) => deferredFragmentRecord.id !== undefined, + ) + ? executor(incrementalContext) + : Promise.resolve().then(() => executor(incrementalContext)); } } /** @internal */ export class DeferredFragmentRecord { - path: ReadonlyArray; + path: Path | undefined; label: string | undefined; - id: string | undefined; - children: Set; - deferredGroupedFieldSetRecords: Set; - errors: Array; - filtered: boolean; - pendingSent?: boolean; - _pending: Set; - - constructor(opts: { path: Path | undefined; label: string | undefined }) { - this.path = pathToArray(opts.path); + deferredGroupedFieldSetRecords: Array; + results: Array; + reconcilableResults: Array; + parent: DeferredFragmentRecord | undefined; + children: Set; + id?: string | undefined; + + constructor(opts: { + path: Path | undefined; + label: string | undefined; + parent: DeferredFragmentRecord | undefined; + }) { + this.path = opts.path; this.label = opts.label; + this.deferredGroupedFieldSetRecords = []; + this.results = []; + this.reconcilableResults = []; + this.parent = opts.parent; this.children = new Set(); - this.filtered = false; - this.deferredGroupedFieldSetRecords = new Set(); - this.errors = []; - this._pending = new Set(); } } /** @internal */ export class StreamRecord { label: string | undefined; - path: ReadonlyArray; - id: string | undefined; - errors: Array; - earlyReturn?: (() => Promise) | undefined; - pendingSent?: boolean; + path: Path; + earlyReturn: (() => Promise) | undefined; + id?: string | undefined; constructor(opts: { label: string | undefined; path: Path; earlyReturn?: (() => Promise) | undefined; }) { - this.label = opts.label; - this.path = pathToArray(opts.path); - this.errors = []; - this.earlyReturn = opts.earlyReturn; + const { label, path, earlyReturn } = opts; + this.label = label; + this.path = path; + this.earlyReturn = earlyReturn; } } +interface NonReconcilableStreamItemsResult { + streamRecord: StreamRecord; + result: null; + errors: ReadonlyArray; +} + +interface NonTerminatingStreamItemsResult { + streamRecord: StreamRecord; + result: RawStreamItemsResult; + futures?: ReadonlyArray | undefined; +} + +interface TerminatingStreamItemsResult { + streamRecord: StreamRecord; + result?: never; + futures?: never; + errors?: never; +} + +export type StreamItemsResult = + | NonReconcilableStreamItemsResult + | NonTerminatingStreamItemsResult + | TerminatingStreamItemsResult; + +export function isNonTerminatingStreamItemsResult( + streamItemsResult: StreamItemsResult, +): streamItemsResult is NonTerminatingStreamItemsResult { + return streamItemsResult.result != null; +} + /** @internal */ export class StreamItemsRecord { - errors: Array; streamRecord: StreamRecord; - path: ReadonlyArray; - items: Array | undefined; - children: Set; - isFinalRecord?: boolean; - isCompletedAsyncIterator?: boolean; - isCompleted: boolean; - filtered: boolean; - - constructor(opts: { streamRecord: StreamRecord; path: Path | undefined }) { - this.streamRecord = opts.streamRecord; - this.path = pathToArray(opts.path); - this.children = new Set(); - this.errors = []; - this.isCompleted = false; - this.filtered = false; + nextStreamItems: StreamItemsRecord | undefined; + + private _result: PromiseOrValue; + + constructor(opts: { + streamRecord: StreamRecord; + itemPath?: Path | undefined; + executor: ( + incrementalContext: IncrementalContext, + ) => PromiseOrValue; + }) { + const { streamRecord, itemPath, executor } = opts; + this.streamRecord = streamRecord; + const incrementalContext: IncrementalContext = { + deferUsageSet: undefined, + path: itemPath, + }; + + this._result = executor(incrementalContext); + } + + getResult(): PromiseOrValue { + if (isPromise(this._result)) { + return this._result.then((resolved) => + this._prependNextStreamItems(resolved), + ); + } + + return this._prependNextStreamItems(this._result); + } + + private _prependNextStreamItems( + result: StreamItemsResult, + ): StreamItemsResult { + return isNonTerminatingStreamItemsResult(result) && + this.nextStreamItems !== undefined + ? { + ...result, + futures: [this.nextStreamItems, ...(result.futures ?? [])], + } + : result; } } -export type IncrementalDataRecord = - | InitialResultRecord - | DeferredGroupedFieldSetRecord - | StreamItemsRecord; +export type Future = DeferredGroupedFieldSetRecord | StreamItemsRecord; + +export type FutureResult = DeferredGroupedFieldSetResult | StreamItemsResult; -type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord; +type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index d03570270a..03bf8126c6 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -367,12 +367,6 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { friends: [{ name: 'Han' }, { name: 'Leia' }, { name: 'C-3PO' }], @@ -380,7 +374,7 @@ describe('Execute: defer directive', () => { id: '1', }, ], - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -674,8 +668,8 @@ describe('Execute: defer directive', () => { hero: {}, }, pending: [ - { id: '0', path: [], label: 'DeferName' }, - { id: '1', path: ['hero'], label: 'DeferID' }, + { id: '0', path: ['hero'], label: 'DeferID' }, + { id: '1', path: [], label: 'DeferName' }, ], hasNext: true, }, @@ -685,17 +679,17 @@ describe('Execute: defer directive', () => { data: { id: '1', }, - id: '1', + id: '0', }, { data: { name: 'Luke', }, - id: '0', + id: '1', subPath: ['hero'], }, ], - completed: [{ id: '1' }, { id: '0' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -983,37 +977,27 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [{ id: '1', path: ['hero', 'nestedObject'] }], + pending: [ + { id: '1', path: ['hero', 'nestedObject'] }, + { id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }, + ], incremental: [ { data: { bar: 'bar' }, id: '0', subPath: ['nestedObject', 'deeperObject'], }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - pending: [{ id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }], - incremental: [ { data: { baz: 'baz' }, id: '1', subPath: ['deeperObject'], }, - ], - hasNext: true, - completed: [{ id: '1' }], - }, - { - incremental: [ { data: { bak: 'bak' }, id: '2', }, ], - completed: [{ id: '2' }], + completed: [{ id: '0' }, { id: '1' }, { id: '2' }], hasNext: false, }, ]); @@ -1132,8 +1116,8 @@ describe('Execute: defer directive', () => { }, }, pending: [ - { id: '0', path: [] }, - { id: '1', path: ['a', 'b'] }, + { id: '0', path: ['a', 'b'] }, + { id: '1', path: [] }, ], hasNext: true, }, @@ -1141,14 +1125,14 @@ describe('Execute: defer directive', () => { incremental: [ { data: { e: { f: 'f' } }, - id: '1', + id: '0', }, { data: { g: { h: 'h' } }, - id: '0', + id: '1', }, ], - completed: [{ id: '1' }, { id: '0' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -1277,6 +1261,7 @@ describe('Execute: defer directive', () => { }, ], completed: [ + { id: '0' }, { id: '1', errors: [ @@ -1288,7 +1273,6 @@ describe('Execute: defer directive', () => { }, ], }, - { id: '0' }, ], hasNext: false, }, diff --git a/src/execution/__tests__/executor-test.ts b/src/execution/__tests__/executor-test.ts index c29b4ae60d..de33f8c91b 100644 --- a/src/execution/__tests__/executor-test.ts +++ b/src/execution/__tests__/executor-test.ts @@ -635,6 +635,57 @@ describe('Execute: Handles basic execution tasks', () => { expect(isAsyncResolverFinished).to.equal(true); }); + it('handles async bubbling errors combined with non-bubbling errors', async () => { + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + asyncNonNullError: { + type: new GraphQLNonNull(GraphQLString), + async resolve() { + await resolveOnNextTick(); + return null; + }, + }, + asyncError: { + type: GraphQLString, + async resolve() { + await resolveOnNextTick(); + throw new Error('Oops'); + }, + }, + }, + }), + }); + + // Order is important here, as the nullable error should resolve first + const document = parse(` + { + asyncError + asyncNonNullError + } + `); + + const result = execute({ schema, document }); + + expectJSON(await result).toDeepEqual({ + data: null, + errors: [ + { + message: 'Oops', + locations: [{ line: 3, column: 9 }], + path: ['asyncError'], + }, + { + message: + 'Cannot return null for non-nullable field Query.asyncNonNullError.', + locations: [{ line: 4, column: 9 }], + path: ['asyncNonNullError'], + }, + ], + }); + }); + it('Full response path is included for non-nullable fields', () => { const A: GraphQLObjectType = new GraphQLObjectType({ name: 'A', diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 6e1928f945..22be4e4d35 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -146,11 +146,10 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [{ items: ['banana'], id: '0' }], - hasNext: true, - }, - { - incremental: [{ items: ['coconut'], id: '0' }], + incremental: [ + { items: ['banana'], id: '0' }, + { items: ['coconut'], id: '0' }, + ], completed: [{ id: '0' }], hasNext: false, }, @@ -170,15 +169,11 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [{ items: ['apple'], id: '0' }], - hasNext: true, - }, - { - incremental: [{ items: ['banana'], id: '0' }], - hasNext: true, - }, - { - incremental: [{ items: ['coconut'], id: '0' }], + incremental: [ + { items: ['apple'], id: '0' }, + { items: ['banana'], id: '0' }, + { items: ['coconut'], id: '0' }, + ], completed: [{ id: '0' }], hasNext: false, }, @@ -228,11 +223,6 @@ describe('Execute: stream directive', () => { items: ['banana'], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: ['coconut'], id: '0', @@ -297,11 +287,6 @@ describe('Execute: stream directive', () => { items: [['banana', 'banana', 'banana']], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [['coconut', 'coconut', 'coconut']], id: '0', @@ -1470,12 +1455,17 @@ describe('Execute: stream directive', () => { }, ], }, + ], + completed: [{ id: '0' }], + hasNext: true, + }, + { + incremental: [ { items: [{ name: 'Luke' }], id: '1', }, ], - completed: [{ id: '0' }], hasNext: true, }, { @@ -1954,40 +1944,49 @@ describe('Execute: stream directive', () => { hasNext: true, }); - const result2Promise = iterator.next(); - resolveIterableCompletion(null); - const result2 = await result2Promise; + const result2 = await iterator.next(); expectJSON(result2).toDeepEqual({ value: { - pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ { data: { name: 'Luke' }, id: '0', }, + ], + completed: [{ id: '0' }], + hasNext: true, + }, + done: false, + }); + + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + value: { + pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], + incremental: [ { items: [{ id: '2' }], id: '1', }, ], - completed: [{ id: '0' }], hasNext: true, }, done: false, }); - - const result3Promise = iterator.next(); - resolveSlowField('Han'); - const result3 = await result3Promise; - expectJSON(result3).toDeepEqual({ + const result4Promise = iterator.next(); + resolveIterableCompletion(null); + const result4 = await result4Promise; + expectJSON(result4).toDeepEqual({ value: { completed: [{ id: '1' }], hasNext: true, }, done: false, }); - const result4 = await iterator.next(); - expectJSON(result4).toDeepEqual({ + const result5Promise = iterator.next(); + resolveSlowField('Han'); + const result5 = await result5Promise; + expectJSON(result5).toDeepEqual({ value: { incremental: [ { @@ -2000,8 +1999,8 @@ describe('Execute: stream directive', () => { }, done: false, }); - const result5 = await iterator.next(); - expectJSON(result5).toDeepEqual({ + const result6 = await iterator.next(); + expectJSON(result6).toDeepEqual({ value: undefined, done: true, }); @@ -2060,16 +2059,11 @@ describe('Execute: stream directive', () => { const result2 = await result2Promise; expectJSON(result2).toDeepEqual({ value: { - pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ { data: { name: 'Luke' }, id: '0', }, - { - items: [{ id: '2' }], - id: '1', - }, ], completed: [{ id: '0' }], hasNext: true, @@ -2080,13 +2074,13 @@ describe('Execute: stream directive', () => { const result3 = await iterator.next(); expectJSON(result3).toDeepEqual({ value: { + pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ { - data: { name: 'Han' }, - id: '2', + items: [{ id: '2' }], + id: '1', }, ], - completed: [{ id: '2' }], hasNext: true, }, done: false, @@ -2096,14 +2090,29 @@ describe('Execute: stream directive', () => { const result4 = await result4Promise; expectJSON(result4).toDeepEqual({ value: { - completed: [{ id: '1' }], - hasNext: false, + incremental: [ + { + data: { name: 'Han' }, + id: '2', + }, + ], + completed: [{ id: '2' }], + hasNext: true, }, done: false, }); const result5 = await iterator.next(); expectJSON(result5).toDeepEqual({ + value: { + completed: [{ id: '1' }], + hasNext: false, + }, + done: false, + }); + + const result6 = await iterator.next(); + expectJSON(result6).toDeepEqual({ value: undefined, done: true, }); diff --git a/src/execution/buildFieldPlan.ts b/src/execution/buildFieldPlan.ts index 390e2cf813..d29ae94cde 100644 --- a/src/execution/buildFieldPlan.ts +++ b/src/execution/buildFieldPlan.ts @@ -1,63 +1,39 @@ import { getBySet } from '../jsutils/getBySet.js'; import { isSameSet } from '../jsutils/isSameSet.js'; -import type { DeferUsage, FieldDetails } from './collectFields.js'; +import type { + DeferUsage, + FieldGroup, + GroupedFieldSet, +} from './collectFields.js'; export type DeferUsageSet = ReadonlySet; -export interface FieldGroup { - fields: ReadonlyArray; - deferUsages?: DeferUsageSet | undefined; -} - -export type GroupedFieldSet = Map; - -export interface NewGroupedFieldSetDetails { +export interface FieldPlan { groupedFieldSet: GroupedFieldSet; - shouldInitiateDefer: boolean; + newGroupedFieldSets: Map; } export function buildFieldPlan( - fields: Map>, + originalGroupedFieldSet: GroupedFieldSet, parentDeferUsages: DeferUsageSet = new Set(), -): { - groupedFieldSet: GroupedFieldSet; - newGroupedFieldSetDetailsMap: Map; -} { - const groupedFieldSet = new Map< - string, - { - fields: Array; - deferUsages: DeferUsageSet; - } - >(); +): FieldPlan { + const groupedFieldSet = new Map(); - const newGroupedFieldSetDetailsMap = new Map< - DeferUsageSet, - { - groupedFieldSet: Map< - string, - { - fields: Array; - deferUsages: DeferUsageSet; - } - >; - shouldInitiateDefer: boolean; - } - >(); + const newGroupedFieldSets = new Map>(); const map = new Map< string, { deferUsageSet: DeferUsageSet; - fieldDetailsList: ReadonlyArray; + fieldGroup: FieldGroup; } >(); - for (const [responseKey, fieldDetailsList] of fields) { + for (const [responseKey, fieldGroup] of originalGroupedFieldSet) { const deferUsageSet = new Set(); let inOriginalResult = false; - for (const fieldDetails of fieldDetailsList) { + for (const fieldDetails of fieldGroup) { const deferUsage = fieldDetails.deferUsage; if (deferUsage === undefined) { inOriginalResult = true; @@ -77,65 +53,26 @@ export function buildFieldPlan( } }); } - map.set(responseKey, { deferUsageSet, fieldDetailsList }); + map.set(responseKey, { deferUsageSet, fieldGroup }); } - for (const [responseKey, { deferUsageSet, fieldDetailsList }] of map) { + for (const [responseKey, { deferUsageSet, fieldGroup }] of map) { if (isSameSet(deferUsageSet, parentDeferUsages)) { - let fieldGroup = groupedFieldSet.get(responseKey); - if (fieldGroup === undefined) { - fieldGroup = { - fields: [], - deferUsages: deferUsageSet, - }; - groupedFieldSet.set(responseKey, fieldGroup); - } - fieldGroup.fields.push(...fieldDetailsList); + groupedFieldSet.set(responseKey, fieldGroup); continue; } - let newGroupedFieldSetDetails = getBySet( - newGroupedFieldSetDetailsMap, - deferUsageSet, - ); - let newGroupedFieldSet; - if (newGroupedFieldSetDetails === undefined) { - newGroupedFieldSet = new Map< - string, - { - fields: Array; - deferUsages: DeferUsageSet; - knownDeferUsages: DeferUsageSet; - } - >(); - - newGroupedFieldSetDetails = { - groupedFieldSet: newGroupedFieldSet, - shouldInitiateDefer: Array.from(deferUsageSet).some( - (deferUsage) => !parentDeferUsages.has(deferUsage), - ), - }; - newGroupedFieldSetDetailsMap.set( - deferUsageSet, - newGroupedFieldSetDetails, - ); - } else { - newGroupedFieldSet = newGroupedFieldSetDetails.groupedFieldSet; - } - let fieldGroup = newGroupedFieldSet.get(responseKey); - if (fieldGroup === undefined) { - fieldGroup = { - fields: [], - deferUsages: deferUsageSet, - }; - newGroupedFieldSet.set(responseKey, fieldGroup); + let newGroupedFieldSet = getBySet(newGroupedFieldSets, deferUsageSet); + if (newGroupedFieldSet === undefined) { + newGroupedFieldSet = new Map(); + newGroupedFieldSets.set(deferUsageSet, newGroupedFieldSet); } - fieldGroup.fields.push(...fieldDetailsList); + newGroupedFieldSet.set(responseKey, fieldGroup); } return { groupedFieldSet, - newGroupedFieldSetDetailsMap, + newGroupedFieldSets, }; } diff --git a/src/execution/collectFields.ts b/src/execution/collectFields.ts index 03ba5efde6..d411ff3f77 100644 --- a/src/execution/collectFields.ts +++ b/src/execution/collectFields.ts @@ -36,6 +36,10 @@ export interface FieldDetails { deferUsage: DeferUsage | undefined; } +export type FieldGroup = ReadonlyArray; + +export type GroupedFieldSet = ReadonlyMap; + interface CollectFieldsContext { schema: GraphQLSchema; fragments: ObjMap; @@ -61,7 +65,7 @@ export function collectFields( runtimeType: GraphQLObjectType, operation: OperationDefinitionNode, ): { - fields: Map>; + groupedFieldSet: GroupedFieldSet; newDeferUsages: ReadonlyArray; } { const groupedFieldSet = new AccumulatorMap(); @@ -81,7 +85,7 @@ export function collectFields( groupedFieldSet, newDeferUsages, ); - return { fields: groupedFieldSet, newDeferUsages }; + return { groupedFieldSet, newDeferUsages }; } /** @@ -101,9 +105,9 @@ export function collectSubfields( variableValues: { [variable: string]: unknown }, operation: OperationDefinitionNode, returnType: GraphQLObjectType, - fieldDetails: ReadonlyArray, + fieldGroup: FieldGroup, ): { - fields: Map>; + groupedFieldSet: GroupedFieldSet; newDeferUsages: ReadonlyArray; } { const context: CollectFieldsContext = { @@ -117,7 +121,7 @@ export function collectSubfields( const subGroupedFieldSet = new AccumulatorMap(); const newDeferUsages: Array = []; - for (const fieldDetail of fieldDetails) { + for (const fieldDetail of fieldGroup) { const node = fieldDetail.node; if (node.selectionSet) { collectFieldsImpl( @@ -131,7 +135,7 @@ export function collectSubfields( } return { - fields: subGroupedFieldSet, + groupedFieldSet: subGroupedFieldSet, newDeferUsages, }; } diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 370186f552..742457be21 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -47,25 +47,30 @@ import { GraphQLStreamDirective } from '../type/directives.js'; import type { GraphQLSchema } from '../type/schema.js'; import { assertValidSchema } from '../type/validate.js'; +import type { DeferUsageSet, FieldPlan } from './buildFieldPlan.js'; +import { buildFieldPlan } from './buildFieldPlan.js'; import type { - DeferUsageSet, + DeferUsage, FieldGroup, GroupedFieldSet, - NewGroupedFieldSetDetails, -} from './buildFieldPlan.js'; -import { buildFieldPlan } from './buildFieldPlan.js'; -import type { DeferUsage, FieldDetails } from './collectFields.js'; -import { collectFields, collectSubfields } from './collectFields.js'; +} from './collectFields.js'; +import { + collectFields, + collectSubfields as _collectSubfields, +} from './collectFields.js'; import type { + DeferredGroupedFieldSetResult, ExecutionResult, ExperimentalIncrementalExecutionResults, - IncrementalDataRecord, + Future, + IncrementalContext, + StreamItemsResult, } from './IncrementalPublisher.js'; import { + buildIncrementalResponse, DeferredFragmentRecord, DeferredGroupedFieldSetRecord, - IncrementalPublisher, - InitialResultRecord, + isDeferredGroupedFieldSetRecord, StreamItemsRecord, StreamRecord, } from './IncrementalPublisher.js'; @@ -81,31 +86,42 @@ import { // so just disable it for entire file. /** - * A memoized function for building subfield plans with regard to the return - * type. Memoizing ensures the subfield plans are not repeatedly calculated, which + * A memoized collection of relevant subfields with regard to the return + * type. Memoizing ensures the subfields are not repeatedly calculated, which * saves overhead when resolving lists of values. */ -const buildSubFieldPlan = memoize3( +const collectSubfields = memoize3( ( exeContext: ExecutionContext, returnType: GraphQLObjectType, fieldGroup: FieldGroup, - ) => { - const { fields: subFields, newDeferUsages } = collectSubfields( + ) => + _collectSubfields( exeContext.schema, exeContext.fragments, exeContext.variableValues, exeContext.operation, returnType, - fieldGroup.fields, - ); - return { - ...buildFieldPlan(subFields, fieldGroup.deferUsages), - newDeferUsages, - }; - }, + fieldGroup, + ), ); +const buildSubFieldPlan = ( + originalGroupedFieldSet: GroupedFieldSet, + deferUsageSet: DeferUsageSet | undefined, +) => { + let fieldPlan = ( + originalGroupedFieldSet as unknown as { _fieldPlan: FieldPlan } + )._fieldPlan; + if (fieldPlan !== undefined) { + return fieldPlan; + } + fieldPlan = buildFieldPlan(originalGroupedFieldSet, deferUsageSet); + (originalGroupedFieldSet as unknown as { _fieldPlan: FieldPlan })._fieldPlan = + fieldPlan; + return fieldPlan; +}; + /** * Terminology * @@ -142,7 +158,9 @@ export interface ExecutionContext { fieldResolver: GraphQLFieldResolver; typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; - incrementalPublisher: IncrementalPublisher; + errors?: Map | undefined; + cancellableStreams?: Set | undefined; + futures?: Array; } export interface ExecutionArgs { @@ -251,22 +269,179 @@ function executeImpl( // Errors from sub-fields of a NonNull type may propagate to the top level, // at which point we still log the error and null the parent field, which // in this case is the entire response. - const incrementalPublisher = exeContext.incrementalPublisher; - const initialResultRecord = new InitialResultRecord(); try { - const data = executeOperation(exeContext, initialResultRecord); + let data: PromiseOrValue>; + const { operation, schema, fragments, variableValues, rootValue } = + exeContext; + const rootType = schema.getRootType(operation.operation); + if (rootType == null) { + throw new GraphQLError( + `Schema is not configured to execute ${operation.operation} operation.`, + { nodes: operation }, + ); + } + + const { groupedFieldSet: nonPartitionedGroupedFieldSet, newDeferUsages } = + collectFields(schema, fragments, variableValues, rootType, operation); + if (newDeferUsages.length === 0) { + data = executeRootGroupedFieldSet( + exeContext, + operation.operation, + rootType, + rootValue, + nonPartitionedGroupedFieldSet, + undefined, + ); + } else { + const { groupedFieldSet, newGroupedFieldSets } = buildFieldPlan( + nonPartitionedGroupedFieldSet, + ); + + const newDeferMap = addNewDeferredFragments(newDeferUsages, new Map()); + + data = executeRootGroupedFieldSet( + exeContext, + operation.operation, + rootType, + rootValue, + groupedFieldSet, + newDeferMap, + ); + + if (newGroupedFieldSets.size > 0) { + const newDeferredGroupedFieldSetRecords = + executeDeferredGroupedFieldSets( + exeContext, + rootType, + rootValue, + undefined, + newGroupedFieldSets, + newDeferMap, + ); + + addFutures(exeContext, newDeferredGroupedFieldSetRecords); + } + } + if (isPromise(data)) { return data.then( - (resolved) => - incrementalPublisher.buildDataResponse(initialResultRecord, resolved), - (error) => - incrementalPublisher.buildErrorResponse(initialResultRecord, error), + (resolved) => buildDataResponse(exeContext, resolved), + (error) => ({ + data: null, + errors: withError(exeContext.errors, error), + }), ); } - return incrementalPublisher.buildDataResponse(initialResultRecord, data); + return buildDataResponse(exeContext, data); } catch (error) { - return incrementalPublisher.buildErrorResponse(initialResultRecord, error); + return { data: null, errors: withError(exeContext.errors, error) }; + } +} + +function addFutures( + context: ExecutionContext | IncrementalContext, + newFutures: ReadonlyArray, +): void { + const futures = context.futures; + if (futures === undefined) { + context.futures = [...newFutures]; + return; + } + futures.push(...newFutures); +} + +function addFuture( + context: ExecutionContext | IncrementalContext, + newFuture: Future, +): void { + const futures = context.futures; + if (futures === undefined) { + context.futures = [newFuture]; + return; + } + futures.push(newFuture); +} + +function withError( + errors: ReadonlyMap | undefined, + error: GraphQLError, +): ReadonlyArray { + return errors === undefined ? [error] : [...errors.values(), error]; +} + +function buildDataResponse( + exeContext: ExecutionContext, + data: ObjMap, +): ExecutionResult | ExperimentalIncrementalExecutionResults { + const { errors, futures } = exeContext; + if (futures === undefined) { + return buildSingleResult(data, errors); + } + + if (errors === undefined) { + return buildIncrementalResponse(exeContext, data, undefined, futures); + } + + const filteredFutures = filterFutures(undefined, errors, futures); + + if (filteredFutures.length === 0) { + return buildSingleResult(data, errors); + } + + return buildIncrementalResponse( + exeContext, + data, + Array.from(errors.values()), + filteredFutures, + ); +} + +function buildSingleResult( + data: ObjMap, + errors: ReadonlyMap | undefined, +): ExecutionResult { + return errors !== undefined + ? { errors: Array.from(errors.values()), data } + : { data }; +} + +function filterFutures( + initialPath: Path | undefined, + errors: ReadonlyMap, + futures: ReadonlyArray, +): ReadonlyArray { + const filteredFutures: Array = []; + for (const future of futures) { + let currentPath: Path | undefined = isDeferredGroupedFieldSetRecord(future) + ? future.path + : future.streamRecord.path; + + if (errors.has(currentPath)) { + continue; + } + + const paths: Array = [currentPath]; + let filtered = false; + while (currentPath !== initialPath) { + // Because currentPath leads to initialPath or is undefined, and the + // loop will exit if initialPath is undefined, currentPath must be + // defined. + // TODO: Consider, however, adding an invariant. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + currentPath = currentPath!.prev; + if (errors.has(currentPath)) { + filtered = true; + break; + } + paths.push(currentPath); + } + + if (!filtered) { + filteredFutures.push(future); + } } + + return filteredFutures; } /** @@ -370,7 +545,6 @@ export function buildExecutionContext( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, - incrementalPublisher: new IncrementalPublisher(), }; } @@ -381,105 +555,51 @@ function buildPerEventExecutionContext( return { ...exeContext, rootValue: payload, + errors: undefined, }; } -/** - * Implements the "Executing operations" section of the spec. - */ -function executeOperation( +function executeRootGroupedFieldSet( exeContext: ExecutionContext, - initialResultRecord: InitialResultRecord, + operation: OperationTypeNode, + rootType: GraphQLObjectType, + rootValue: unknown, + groupedFieldSet: GroupedFieldSet, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue> { - const { - operation, - schema, - fragments, - variableValues, - rootValue, - incrementalPublisher, - } = exeContext; - const rootType = schema.getRootType(operation.operation); - if (rootType == null) { - throw new GraphQLError( - `Schema is not configured to execute ${operation.operation} operation.`, - { nodes: operation }, - ); - } - - const { fields, newDeferUsages } = collectFields( - schema, - fragments, - variableValues, - rootType, - operation, - ); - const { groupedFieldSet, newGroupedFieldSetDetailsMap } = - buildFieldPlan(fields); - - const newDeferMap = addNewDeferredFragments( - incrementalPublisher, - newDeferUsages, - initialResultRecord, - ); - - const path = undefined; - - const newDeferredGroupedFieldSetRecords = addNewDeferredGroupedFieldSets( - incrementalPublisher, - newGroupedFieldSetDetailsMap, - newDeferMap, - path, - ); - - let result; - switch (operation.operation) { + switch (operation) { case OperationTypeNode.QUERY: - result = executeFields( + return executeFields( exeContext, rootType, rootValue, - path, + undefined, groupedFieldSet, - initialResultRecord, - newDeferMap, + undefined, + deferMap, ); - break; case OperationTypeNode.MUTATION: - result = executeFieldsSerially( + return executeFieldsSerially( exeContext, rootType, rootValue, - path, + undefined, groupedFieldSet, - initialResultRecord, - newDeferMap, + deferMap, ); - break; case OperationTypeNode.SUBSCRIPTION: // TODO: deprecate `subscribe` and move all logic here // Temporary solution until we finish merging execute and subscribe together - result = executeFields( + return executeFields( exeContext, rootType, rootValue, - path, + undefined, groupedFieldSet, - initialResultRecord, - newDeferMap, + undefined, + deferMap, ); } - - executeDeferredGroupedFieldSets( - exeContext, - rootType, - rootValue, - path, - newDeferredGroupedFieldSetRecords, - newDeferMap, - ); - - return result; } /** @@ -492,8 +612,7 @@ function executeFieldsSerially( sourceValue: unknown, path: Path | undefined, groupedFieldSet: GroupedFieldSet, - incrementalDataRecord: InitialResultRecord, - deferMap: ReadonlyMap, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue> { return promiseReduce( groupedFieldSet, @@ -505,15 +624,15 @@ function executeFieldsSerially( sourceValue, fieldGroup, fieldPath, - incrementalDataRecord, + undefined, deferMap, ); if (result === undefined) { return results; } if (isPromise(result)) { - return result.then((resolvedResult) => { - results[responseName] = resolvedResult; + return result.then((resolved) => { + results[responseName] = resolved; return results; }); } @@ -534,8 +653,8 @@ function executeFields( sourceValue: unknown, path: Path | undefined, groupedFieldSet: GroupedFieldSet, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue> { const results = Object.create(null); let containsPromise = false; @@ -549,7 +668,7 @@ function executeFields( sourceValue, fieldGroup, fieldPath, - incrementalDataRecord, + incrementalContext, deferMap, ); @@ -565,12 +684,12 @@ function executeFields( // Ensure that any promises returned by other fields are handled, as they may also reject. return promiseForObject(results).finally(() => { throw error; - }); + }) as never; } throw error; } - // If there are no promises, we can just return the object + // If there are no promises, we can just return the object and any futures if (!containsPromise) { return results; } @@ -582,7 +701,7 @@ function executeFields( } function toNodes(fieldGroup: FieldGroup): ReadonlyArray { - return fieldGroup.fields.map((fieldDetails) => fieldDetails.node); + return fieldGroup.map((fieldDetails) => fieldDetails.node); } /** @@ -597,10 +716,10 @@ function executeField( source: unknown, fieldGroup: FieldGroup, path: Path, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue { - const fieldName = fieldGroup.fields[0].node.name.value; + const fieldName = fieldGroup[0].node.name.value; const fieldDef = exeContext.schema.getField(parentType, fieldName); if (!fieldDef) { return; @@ -624,7 +743,7 @@ function executeField( // TODO: find a way to memoize, in case this field is within a List type. const args = getArgumentValues( fieldDef, - fieldGroup.fields[0].node, + fieldGroup[0].node, exeContext.variableValues, ); @@ -643,7 +762,7 @@ function executeField( info, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); } @@ -655,7 +774,7 @@ function executeField( info, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); @@ -669,9 +788,8 @@ function executeField( returnType, fieldGroup, path, - incrementalDataRecord, + incrementalContext, ); - exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; }); } @@ -683,9 +801,8 @@ function executeField( returnType, fieldGroup, path, - incrementalDataRecord, + incrementalContext, ); - exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } } @@ -723,7 +840,7 @@ function handleFieldError( returnType: GraphQLOutputType, fieldGroup: FieldGroup, path: Path, - incrementalDataRecord: IncrementalDataRecord, + incrementalContext: IncrementalContext | undefined, ): void { const error = locatedError(rawError, toNodes(fieldGroup), pathToArray(path)); @@ -735,7 +852,13 @@ function handleFieldError( // Otherwise, error protection is applied, logging the error and resolving // a null value for this field if one is encountered. - exeContext.incrementalPublisher.addFieldError(incrementalDataRecord, error); + const context = incrementalContext ?? exeContext; + let errors = context.errors; + if (errors === undefined) { + errors = new Map(); + context.errors = errors; + } + errors.set(path, error); } /** @@ -766,8 +889,8 @@ function completeValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue { // If result is an Error, throw a located error. if (result instanceof Error) { @@ -784,7 +907,7 @@ function completeValue( info, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); if (completed === null) { @@ -809,7 +932,7 @@ function completeValue( info, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); } @@ -830,7 +953,7 @@ function completeValue( info, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); } @@ -844,7 +967,7 @@ function completeValue( info, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); } @@ -863,8 +986,8 @@ async function completePromisedValue( info: GraphQLResolveInfo, path: Path, result: Promise, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): Promise { try { const resolved = await result; @@ -875,7 +998,7 @@ async function completePromisedValue( info, path, resolved, - incrementalDataRecord, + incrementalContext, deferMap, ); if (isPromise(completed)) { @@ -889,9 +1012,8 @@ async function completePromisedValue( returnType, fieldGroup, path, - incrementalDataRecord, + incrementalContext, ); - exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } } @@ -913,19 +1035,17 @@ function getStreamUsage( // TODO: add test for this case (a streamed list nested under a list). /* c8 ignore next 7 */ - if ( - (fieldGroup as unknown as { _streamUsage: StreamUsage })._streamUsage !== - undefined - ) { - return (fieldGroup as unknown as { _streamUsage: StreamUsage }) - ._streamUsage; + let streamUsage = (fieldGroup as unknown as { _streamUsage: StreamUsage }) + ._streamUsage; + if (streamUsage !== undefined) { + return streamUsage; } // validation only allows equivalent streams on multiple fields, so it is // safe to only check the first fieldNode for the stream directive const stream = getDirectiveValues( GraphQLStreamDirective, - fieldGroup.fields[0].node, + fieldGroup[0].node, exeContext.variableValues, ); @@ -952,17 +1072,15 @@ function getStreamUsage( '`@stream` directive not supported on subscription operations. Disable `@stream` by setting the `if` argument to `false`.', ); - const streamedFieldGroup: FieldGroup = { - fields: fieldGroup.fields.map((fieldDetails) => ({ - node: fieldDetails.node, - deferUsage: undefined, - })), - }; + const streamedReadonlyArray: FieldGroup = fieldGroup.map((fieldDetails) => ({ + node: fieldDetails.node, + deferUsage: undefined, + })); - const streamUsage = { + streamUsage = { initialCount: stream.initialCount, label: typeof stream.label === 'string' ? stream.label : undefined, - fieldGroup: streamedFieldGroup, + fieldGroup: streamedReadonlyArray, }; (fieldGroup as unknown as { _streamUsage: StreamUsage })._streamUsage = @@ -981,8 +1099,8 @@ async function completeAsyncIteratorValue( info: GraphQLResolveInfo, path: Path, asyncIterator: AsyncIterator, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): Promise> { const streamUsage = getStreamUsage(exeContext, fieldGroup, path); let containsPromise = false; @@ -991,27 +1109,38 @@ async function completeAsyncIteratorValue( // eslint-disable-next-line no-constant-condition while (true) { if (streamUsage && index >= streamUsage.initialCount) { - const earlyReturn = asyncIterator.return; const streamRecord = new StreamRecord({ label: streamUsage.label, path, - earlyReturn: - earlyReturn === undefined - ? undefined - : earlyReturn.bind(asyncIterator), + earlyReturn: asyncIterator.return?.bind(asyncIterator), }); - // eslint-disable-next-line @typescript-eslint/no-floating-promises - executeStreamAsyncIterator( + + if (exeContext.cancellableStreams === undefined) { + exeContext.cancellableStreams = new Set(); + } + exeContext.cancellableStreams.add(streamRecord); + + const firstStreamItems = firstAsyncStreamItems( + streamRecord, + path, index, + toNodes(fieldGroup), asyncIterator, - exeContext, - streamUsage.fieldGroup, - info, - itemType, - path, - incrementalDataRecord, - streamRecord, + (currentItemPath, currentItem, currentIncrementalContext) => + completeStreamItems( + streamRecord, + currentItemPath, + currentItem, + exeContext, + currentIncrementalContext, + streamUsage.fieldGroup, + info, + itemType, + ), ); + + const context = incrementalContext ?? exeContext; + addFuture(context, firstStreamItems); break; } @@ -1036,7 +1165,7 @@ async function completeAsyncIteratorValue( fieldGroup, info, itemPath, - incrementalDataRecord, + incrementalContext, deferMap, ) ) { @@ -1044,6 +1173,7 @@ async function completeAsyncIteratorValue( } index += 1; } + return containsPromise ? Promise.all(completedResults) : completedResults; } @@ -1058,8 +1188,8 @@ function completeListValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue> { const itemType = returnType.ofType; @@ -1073,7 +1203,7 @@ function completeListValue( info, path, asyncIterator, - incrementalDataRecord, + incrementalContext, deferMap, ); } @@ -1089,34 +1219,46 @@ function completeListValue( // This is specified as a simple map, however we're optimizing the path // where the list contains no Promises by avoiding creating another Promise. let containsPromise = false; - let currentParents = incrementalDataRecord; const completedResults: Array = []; let index = 0; - let streamRecord: StreamRecord | undefined; - for (const item of result) { - // No need to modify the info object containing the path, - // since from here on it is not ever accessed by resolver functions. - const itemPath = addPath(path, index, undefined); + const iterator = result[Symbol.iterator](); + let iteration = iterator.next(); + while (!iteration.done) { + const item = iteration.value; if (streamUsage && index >= streamUsage.initialCount) { - if (streamRecord === undefined) { - streamRecord = new StreamRecord({ label: streamUsage.label, path }); - } - currentParents = executeStreamField( + const streamRecord = new StreamRecord({ + label: streamUsage.label, path, - itemPath, - item, - exeContext, - streamUsage.fieldGroup, - info, - itemType, - currentParents, + }); + + const firstStreamItems = firstSyncStreamItems( streamRecord, + item, + index, + iterator, + (currentItemPath, currentItem, currentIncrementalContext) => + completeStreamItems( + streamRecord, + currentItemPath, + currentItem, + exeContext, + currentIncrementalContext, + streamUsage.fieldGroup, + info, + itemType, + ), ); - index++; - continue; + + const context = incrementalContext ?? exeContext; + addFuture(context, firstStreamItems); + break; } + // No need to modify the info object containing the path, + // since from here on it is not ever accessed by resolver functions. + const itemPath = addPath(path, index, undefined); + if ( completeListItemValue( item, @@ -1126,7 +1268,7 @@ function completeListValue( fieldGroup, info, itemPath, - incrementalDataRecord, + incrementalContext, deferMap, ) ) { @@ -1134,12 +1276,8 @@ function completeListValue( } index++; - } - if (streamRecord !== undefined) { - exeContext.incrementalPublisher.setIsFinalRecord( - currentParents as StreamItemsRecord, - ); + iteration = iterator.next(); } return containsPromise ? Promise.all(completedResults) : completedResults; @@ -1158,8 +1296,8 @@ function completeListItemValue( fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemPath: Path, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): boolean { if (isPromise(item)) { completedResults.push( @@ -1170,7 +1308,7 @@ function completeListItemValue( info, itemPath, item, - incrementalDataRecord, + incrementalContext, deferMap, ), ); @@ -1186,7 +1324,7 @@ function completeListItemValue( info, itemPath, item, - incrementalDataRecord, + incrementalContext, deferMap, ); @@ -1201,11 +1339,7 @@ function completeListItemValue( itemType, fieldGroup, itemPath, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter( - itemPath, - incrementalDataRecord, + incrementalContext, ); return null; }), @@ -1222,9 +1356,8 @@ function completeListItemValue( itemType, fieldGroup, itemPath, - incrementalDataRecord, + incrementalContext, ); - exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); completedResults.push(null); } @@ -1260,8 +1393,8 @@ function completeAbstractValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue> { const resolveTypeFn = returnType.resolveType ?? exeContext.typeResolver; const contextValue = exeContext.contextValue; @@ -1283,7 +1416,7 @@ function completeAbstractValue( info, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ), ); @@ -1303,7 +1436,7 @@ function completeAbstractValue( info, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); } @@ -1373,8 +1506,8 @@ function completeObjectValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue> { // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather @@ -1393,7 +1526,7 @@ function completeObjectValue( fieldGroup, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); }); @@ -1410,7 +1543,7 @@ function completeObjectValue( fieldGroup, path, result, - incrementalDataRecord, + incrementalContext, deferMap, ); } @@ -1444,46 +1577,25 @@ function invalidReturnTypeError( * */ function addNewDeferredFragments( - incrementalPublisher: IncrementalPublisher, newDeferUsages: ReadonlyArray, - incrementalDataRecord: IncrementalDataRecord, - deferMap?: ReadonlyMap, + newDeferMap: Map, path?: Path | undefined, ): ReadonlyMap { - if (newDeferUsages.length === 0) { - // Given no DeferUsages, return the existing map, creating one if necessary. - return deferMap ?? new Map(); - } - - // Create a copy of the old map. - const newDeferMap = - deferMap === undefined - ? new Map() - : new Map(deferMap); - // For each new deferUsage object: for (const newDeferUsage of newDeferUsages) { const parentDeferUsage = newDeferUsage.parentDeferUsage; - // If the parent defer usage is not defined, the parent result record is either: - // - the InitialResultRecord, or - // - a StreamItemsRecord, as `@defer` may be nested under `@stream`. const parent = parentDeferUsage === undefined - ? (incrementalDataRecord as InitialResultRecord | StreamItemsRecord) + ? undefined : deferredFragmentRecordFromDeferUsage(parentDeferUsage, newDeferMap); // Instantiate the new record. const deferredFragmentRecord = new DeferredFragmentRecord({ path, label: newDeferUsage.label, - }); - - // Report the new record to the Incremental Publisher. - incrementalPublisher.reportNewDeferFragmentRecord( - deferredFragmentRecord, parent, - ); + }); // Update the map. newDeferMap.set(newDeferUsage, deferredFragmentRecord); @@ -1500,74 +1612,71 @@ function deferredFragmentRecordFromDeferUsage( return deferMap.get(deferUsage)!; } -function addNewDeferredGroupedFieldSets( - incrementalPublisher: IncrementalPublisher, - newGroupedFieldSetDetailsMap: Map, - deferMap: ReadonlyMap, - path?: Path | undefined, -): ReadonlyArray { - const newDeferredGroupedFieldSetRecords: Array = - []; - - for (const [ - deferUsageSet, - { groupedFieldSet, shouldInitiateDefer }, - ] of newGroupedFieldSetDetailsMap) { - const deferredFragmentRecords = getDeferredFragmentRecords( - deferUsageSet, - deferMap, - ); - const deferredGroupedFieldSetRecord = new DeferredGroupedFieldSetRecord({ - path, - deferredFragmentRecords, - groupedFieldSet, - shouldInitiateDefer, - }); - incrementalPublisher.reportNewDeferredGroupedFieldSetRecord( - deferredGroupedFieldSetRecord, - ); - newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); - } - - return newDeferredGroupedFieldSetRecords; -} - -function getDeferredFragmentRecords( - deferUsages: DeferUsageSet, - deferMap: ReadonlyMap, -): ReadonlyArray { - return Array.from(deferUsages).map((deferUsage) => - deferredFragmentRecordFromDeferUsage(deferUsage, deferMap), - ); -} - function collectAndExecuteSubfields( exeContext: ExecutionContext, returnType: GraphQLObjectType, fieldGroup: FieldGroup, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, ): PromiseOrValue> { // Collect sub-fields to execute to complete this value. - const { groupedFieldSet, newGroupedFieldSetDetailsMap, newDeferUsages } = - buildSubFieldPlan(exeContext, returnType, fieldGroup); + const { groupedFieldSet: nonPartitionedGroupedFieldSet, newDeferUsages } = + collectSubfields(exeContext, returnType, fieldGroup); - const incrementalPublisher = exeContext.incrementalPublisher; + if (newDeferUsages.length === 0) { + if (deferMap === undefined) { + return executeFields( + exeContext, + returnType, + result, + path, + nonPartitionedGroupedFieldSet, + incrementalContext, + undefined, + ); + } - const newDeferMap = addNewDeferredFragments( - incrementalPublisher, - newDeferUsages, - incrementalDataRecord, - deferMap, - path, + const { groupedFieldSet, newGroupedFieldSets } = buildSubFieldPlan( + nonPartitionedGroupedFieldSet, + incrementalContext?.deferUsageSet, + ); + + const subFields = executeFields( + exeContext, + returnType, + result, + path, + groupedFieldSet, + incrementalContext, + deferMap, + ); + + if (newGroupedFieldSets.size > 0) { + const newDeferredGroupedFieldSetRecords = executeDeferredGroupedFieldSets( + exeContext, + returnType, + result, + path, + newGroupedFieldSets, + deferMap, + ); + + const context = incrementalContext ?? exeContext; + addFutures(context, newDeferredGroupedFieldSetRecords); + } + return subFields; + } + + const { groupedFieldSet, newGroupedFieldSets } = buildSubFieldPlan( + nonPartitionedGroupedFieldSet, + incrementalContext?.deferUsageSet, ); - const newDeferredGroupedFieldSetRecords = addNewDeferredGroupedFieldSets( - incrementalPublisher, - newGroupedFieldSetDetailsMap, - newDeferMap, + const newDeferMap = addNewDeferredFragments( + newDeferUsages, + new Map(deferMap), path, ); @@ -1577,19 +1686,23 @@ function collectAndExecuteSubfields( result, path, groupedFieldSet, - incrementalDataRecord, + incrementalContext, newDeferMap, ); - executeDeferredGroupedFieldSets( - exeContext, - returnType, - result, - path, - newDeferredGroupedFieldSetRecords, - newDeferMap, - ); + if (newGroupedFieldSets.size > 0) { + const newDeferredGroupedFieldSetRecords = executeDeferredGroupedFieldSets( + exeContext, + returnType, + result, + path, + newGroupedFieldSets, + newDeferMap, + ); + const context = incrementalContext ?? exeContext; + addFutures(context, newDeferredGroupedFieldSetRecords); + } return subFields; } @@ -1806,7 +1919,7 @@ function executeSubscription( ); } - const { fields } = collectFields( + const { groupedFieldSet } = collectFields( schema, fragments, variableValues, @@ -1814,15 +1927,15 @@ function executeSubscription( operation, ); - const firstRootField = fields.entries().next().value as [ + const firstRootField = groupedFieldSet.entries().next().value as [ string, - ReadonlyArray, + FieldGroup, ]; - const [responseName, fieldDetailsList] = firstRootField; - const fieldName = fieldDetailsList[0].node.name.value; + const [responseName, fieldGroup] = firstRootField; + const fieldName = fieldGroup[0].node.name.value; const fieldDef = schema.getField(rootType, fieldName); - const fieldNodes = fieldDetailsList.map((fieldDetails) => fieldDetails.node); + const fieldNodes = fieldGroup.map((fieldDetails) => fieldDetails.node); if (!fieldDef) { throw new GraphQLError( `The subscription field "${fieldName}" is not defined.`, @@ -1890,132 +2003,344 @@ function executeDeferredGroupedFieldSets( parentType: GraphQLObjectType, sourceValue: unknown, path: Path | undefined, - newDeferredGroupedFieldSetRecords: ReadonlyArray, + newGroupedFieldSets: Map, deferMap: ReadonlyMap, -): void { - for (const deferredGroupedFieldSetRecord of newDeferredGroupedFieldSetRecords) { - if (deferredGroupedFieldSetRecord.shouldInitiateDefer) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - Promise.resolve().then(() => +): ReadonlyArray { + const newDeferredGroupedFieldSetRecords: Array = + []; + + for (const [deferUsageSet, groupedFieldSet] of newGroupedFieldSets) { + const deferredFragmentRecords = getDeferredFragmentRecords( + deferUsageSet, + deferMap, + ); + + const deferredGroupedFieldSetRecord = new DeferredGroupedFieldSetRecord({ + path, + deferUsageSet, + deferredFragmentRecords, + executor: (incrementalContext) => executeDeferredGroupedFieldSet( + deferredFragmentRecords, exeContext, parentType, sourceValue, path, - deferredGroupedFieldSetRecord, + groupedFieldSet, + incrementalContext, deferMap, ), - ); - continue; - } + }); - executeDeferredGroupedFieldSet( - exeContext, - parentType, - sourceValue, - path, - deferredGroupedFieldSetRecord, - deferMap, - ); + newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); } + + return newDeferredGroupedFieldSetRecords; } function executeDeferredGroupedFieldSet( + deferredFragmentRecords: ReadonlyArray, exeContext: ExecutionContext, parentType: GraphQLObjectType, sourceValue: unknown, path: Path | undefined, - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + groupedFieldSet: GroupedFieldSet, + incrementalContext: IncrementalContext, deferMap: ReadonlyMap, -): void { +): PromiseOrValue { + let data; try { - const incrementalResult = executeFields( + data = executeFields( exeContext, parentType, sourceValue, path, - deferredGroupedFieldSetRecord.groupedFieldSet, - deferredGroupedFieldSetRecord, + groupedFieldSet, + incrementalContext, deferMap, ); + } catch (error) { + return { + deferredFragmentRecords, + path: pathToArray(path), + result: null, + errors: withError(incrementalContext.errors, error), + }; + } - if (isPromise(incrementalResult)) { - incrementalResult.then( - (resolved) => - exeContext.incrementalPublisher.completeDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord, - resolved, - ), - (error) => - exeContext.incrementalPublisher.markErroredDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord, - error, - ), - ); - return; - } - - exeContext.incrementalPublisher.completeDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord, - incrementalResult, + if (isPromise(data)) { + return data.then( + (resolved) => + buildDeferredGroupedFieldSetResult( + incrementalContext, + deferredFragmentRecords, + path, + resolved, + ), + (error) => ({ + deferredFragmentRecords, + path: pathToArray(path), + result: null, + errors: withError(incrementalContext.errors, error), + }), ); + } + + return buildDeferredGroupedFieldSetResult( + incrementalContext, + deferredFragmentRecords, + path, + data, + ); +} + +function buildDeferredGroupedFieldSetResult( + incrementalContext: IncrementalContext, + deferredFragmentRecords: ReadonlyArray, + path: Path | undefined, + data: ObjMap, +): DeferredGroupedFieldSetResult { + const { errors, futures } = incrementalContext; + if (futures === undefined) { + return { + deferredFragmentRecords, + path: pathToArray(path), + result: + errors === undefined + ? { + data, + } + : { data, errors: [...errors.values()] }, + }; + } + + if (errors === undefined) { + return { + deferredFragmentRecords, + path: pathToArray(path), + result: { data }, + futures, + }; + } + + return { + deferredFragmentRecords, + path: pathToArray(path), + result: { data, errors: [...errors.values()] }, + futures: filterFutures(path, errors, futures), + }; +} + +function getDeferredFragmentRecords( + deferUsages: DeferUsageSet, + deferMap: ReadonlyMap, +): ReadonlyArray { + return Array.from(deferUsages).map((deferUsage) => + deferredFragmentRecordFromDeferUsage(deferUsage, deferMap), + ); +} + +function firstSyncStreamItems( + streamRecord: StreamRecord, + initialItem: PromiseOrValue, + initialIndex: number, + iterator: Iterator, + executor: ( + itemPath: Path, + item: PromiseOrValue, + incrementalContext: IncrementalContext, + ) => PromiseOrValue, +): StreamItemsRecord { + const path = streamRecord.path; + const initialPath = addPath(path, initialIndex, undefined); + + const firstStreamItems = new StreamItemsRecord({ + streamRecord, + itemPath: initialPath, + executor: (incrementalContext) => + Promise.resolve().then(() => { + const firstResult = executor( + initialPath, + initialItem, + incrementalContext, + ); + let currentIndex = initialIndex; + let currentStreamItems = firstStreamItems; + let iteration = iterator.next(); + while (!iteration.done) { + const item = iteration.value; + currentIndex++; + + const currentPath = addPath(path, currentIndex, undefined); + + const nextStreamItems = new StreamItemsRecord({ + streamRecord, + itemPath: currentPath, + executor: (nextIncrementalContext) => + executor(currentPath, item, nextIncrementalContext), + }); + + currentStreamItems.nextStreamItems = nextStreamItems; + currentStreamItems = nextStreamItems; + iteration = iterator.next(); + } + + currentStreamItems.nextStreamItems = new StreamItemsRecord({ + streamRecord, + executor: () => ({ streamRecord }), + }); + + return firstResult; + }), + }); + return firstStreamItems; +} + +function firstAsyncStreamItems( + streamRecord: StreamRecord, + path: Path, + initialIndex: number, + nodes: ReadonlyArray, + asyncIterator: AsyncIterator, + executor: ( + itemPath: Path, + item: PromiseOrValue, + incrementalContext: IncrementalContext, + ) => PromiseOrValue, +): StreamItemsRecord { + const initialPath = addPath(path, initialIndex, undefined); + const firstStreamItems: StreamItemsRecord = new StreamItemsRecord({ + streamRecord, + itemPath: initialPath, + executor: (incrementalContext) => + Promise.resolve().then(() => + getNextAsyncStreamItemsResult( + streamRecord, + firstStreamItems, + path, + initialIndex, + incrementalContext, + nodes, + asyncIterator, + executor, + ), + ), + }); + return firstStreamItems; +} + +async function getNextAsyncStreamItemsResult( + streamRecord: StreamRecord, + initialStreamItemsRecord: StreamItemsRecord, + path: Path, + index: number, + incrementalContext: IncrementalContext, + nodes: ReadonlyArray, + asyncIterator: AsyncIterator, + executor: ( + itemPath: Path, + item: PromiseOrValue, + incrementalContext: IncrementalContext, + ) => PromiseOrValue, +): Promise { + let iteration; + try { + iteration = await asyncIterator.next(); } catch (error) { - exeContext.incrementalPublisher.markErroredDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord, - error, - ); + return { + streamRecord, + result: null, + errors: [locatedError(error, nodes, pathToArray(path))], + }; + } + + if (iteration.done) { + return { streamRecord }; } + + const itemPath = addPath(path, index, undefined); + + const result = executor(itemPath, iteration.value, incrementalContext); + + const nextStreamItems: StreamItemsRecord = nextAsyncStreamItems( + streamRecord, + path, + itemPath, + index, + nodes, + asyncIterator, + executor, + ); + initialStreamItemsRecord.nextStreamItems = nextStreamItems; + + return result; } -function executeStreamField( +function nextAsyncStreamItems( + streamRecord: StreamRecord, path: Path, + initialPath: Path, + initialIndex: number, + nodes: ReadonlyArray, + asyncIterator: AsyncIterator, + executor: ( + itemPath: Path, + item: PromiseOrValue, + incrementalContext: IncrementalContext, + ) => PromiseOrValue, +): StreamItemsRecord { + const nextStreamItems: StreamItemsRecord = new StreamItemsRecord({ + streamRecord, + itemPath: initialPath, + executor: (incrementalContext) => + Promise.resolve().then(() => + getNextAsyncStreamItemsResult( + streamRecord, + nextStreamItems, + path, + initialIndex + 1, + incrementalContext, + nodes, + asyncIterator, + executor, + ), + ), + }); + return nextStreamItems; +} + +function completeStreamItems( + streamRecord: StreamRecord, itemPath: Path, - item: PromiseOrValue, + item: unknown, exeContext: ExecutionContext, + incrementalContext: IncrementalContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, - incrementalDataRecord: IncrementalDataRecord, - streamRecord: StreamRecord, -): StreamItemsRecord { - const incrementalPublisher = exeContext.incrementalPublisher; - const streamItemsRecord = new StreamItemsRecord({ - streamRecord, - path: itemPath, - }); - incrementalPublisher.reportNewStreamItemsRecord( - streamItemsRecord, - incrementalDataRecord, - ); - +): PromiseOrValue { if (isPromise(item)) { - completePromisedValue( + return completePromisedValue( exeContext, itemType, fieldGroup, info, itemPath, item, - streamItemsRecord, + incrementalContext, new Map(), ).then( - (value) => - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - value, - ]), - (error) => { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord( - streamItemsRecord, - error, - ); - }, + (resolvedItem) => + buildStreamItemsResult(incrementalContext, streamRecord, resolvedItem), + (error) => ({ + streamRecord, + result: null, + errors: withError(incrementalContext.errors, error), + }), ); - - return streamItemsRecord; } - let completedItem: PromiseOrValue; + let completedItem; try { try { completedItem = completeValue( @@ -2025,7 +2350,7 @@ function executeStreamField( info, itemPath, item, - streamItemsRecord, + incrementalContext, new Map(), ); } catch (rawError) { @@ -2035,19 +2360,20 @@ function executeStreamField( itemType, fieldGroup, itemPath, - streamItemsRecord, + incrementalContext, ); completedItem = null; - incrementalPublisher.filter(itemPath, streamItemsRecord); } } catch (error) { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord(streamItemsRecord, error); - return streamItemsRecord; + return { + streamRecord, + result: null, + errors: withError(incrementalContext.errors, error), + }; } if (isPromise(completedItem)) { - completedItem + return completedItem .then(undefined, (rawError) => { handleFieldError( rawError, @@ -2055,178 +2381,66 @@ function executeStreamField( itemType, fieldGroup, itemPath, - streamItemsRecord, + incrementalContext, ); - incrementalPublisher.filter(itemPath, streamItemsRecord); return null; }) .then( - (value) => - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - value, - ]), - (error) => { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord( - streamItemsRecord, - error, - ); - }, + (resolvedItem) => + buildStreamItemsResult( + incrementalContext, + streamRecord, + resolvedItem, + ), + (error) => ({ + streamRecord, + result: null, + errors: withError(incrementalContext.errors, error), + }), ); - - return streamItemsRecord; } - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ + return buildStreamItemsResult( + incrementalContext, + streamRecord, completedItem, - ]); - return streamItemsRecord; -} - -async function executeStreamAsyncIteratorItem( - asyncIterator: AsyncIterator, - exeContext: ExecutionContext, - fieldGroup: FieldGroup, - info: GraphQLResolveInfo, - itemType: GraphQLOutputType, - streamItemsRecord: StreamItemsRecord, - itemPath: Path, -): Promise> { - let item; - try { - const iteration = await asyncIterator.next(); - if (streamItemsRecord.streamRecord.errors.length > 0) { - return { done: true, value: undefined }; - } - if (iteration.done) { - exeContext.incrementalPublisher.setIsCompletedAsyncIterator( - streamItemsRecord, - ); - return { done: true, value: undefined }; - } - item = iteration.value; - } catch (rawError) { - throw locatedError( - rawError, - toNodes(fieldGroup), - streamItemsRecord.streamRecord.path, - ); - } - let completedItem; - try { - completedItem = completeValue( - exeContext, - itemType, - fieldGroup, - info, - itemPath, - item, - streamItemsRecord, - new Map(), - ); - - if (isPromise(completedItem)) { - completedItem = completedItem.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - streamItemsRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, streamItemsRecord); - return null; - }); - } - return { done: false, value: completedItem }; - } catch (rawError) { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - streamItemsRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, streamItemsRecord); - return { done: false, value: null }; - } + ); } -async function executeStreamAsyncIterator( - initialIndex: number, - asyncIterator: AsyncIterator, - exeContext: ExecutionContext, - fieldGroup: FieldGroup, - info: GraphQLResolveInfo, - itemType: GraphQLOutputType, - path: Path, - incrementalDataRecord: IncrementalDataRecord, +function buildStreamItemsResult( + incrementalContext: IncrementalContext, streamRecord: StreamRecord, -): Promise { - const incrementalPublisher = exeContext.incrementalPublisher; - let index = initialIndex; - let currentIncrementalDataRecord = incrementalDataRecord; - // eslint-disable-next-line no-constant-condition - while (true) { - const itemPath = addPath(path, index, undefined); - const streamItemsRecord = new StreamItemsRecord({ + completedItem: unknown, +): StreamItemsResult { + const { errors, futures } = incrementalContext; + if (futures === undefined) { + return { streamRecord, - path: itemPath, - }); - incrementalPublisher.reportNewStreamItemsRecord( - streamItemsRecord, - currentIncrementalDataRecord, - ); - - let iteration; - try { - // eslint-disable-next-line no-await-in-loop - iteration = await executeStreamAsyncIteratorItem( - asyncIterator, - exeContext, - fieldGroup, - info, - itemType, - streamItemsRecord, - itemPath, - ); - } catch (error) { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord( - streamItemsRecord, - error, - ); - return; - } - - const { done, value: completedItem } = iteration; - - if (isPromise(completedItem)) { - completedItem.then( - (value) => - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - value, - ]), - (error) => { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord( - streamItemsRecord, - error, - ); - }, - ); - } else { - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - completedItem, - ]); - } + result: + errors === undefined + ? { items: [completedItem] } + : { + items: [completedItem], + errors: [...errors.values()], + }, + }; + } - if (done) { - break; - } - currentIncrementalDataRecord = streamItemsRecord; - index++; + if (errors === undefined) { + return { + streamRecord, + result: { items: [completedItem] }, + futures, + }; } + + const path = incrementalContext.path; + return { + streamRecord, + result: { + items: [completedItem], + errors: [...errors.values()], + }, + futures: filterFutures(path, errors, futures), + }; } diff --git a/src/validation/rules/SingleFieldSubscriptionsRule.ts b/src/validation/rules/SingleFieldSubscriptionsRule.ts index 06d9545fbc..700bc0bda7 100644 --- a/src/validation/rules/SingleFieldSubscriptionsRule.ts +++ b/src/validation/rules/SingleFieldSubscriptionsRule.ts @@ -10,15 +10,13 @@ import type { import { Kind } from '../../language/kinds.js'; import type { ASTVisitor } from '../../language/visitor.js'; -import type { FieldDetails } from '../../execution/collectFields.js'; +import type { FieldGroup } from '../../execution/collectFields.js'; import { collectFields } from '../../execution/collectFields.js'; import type { ValidationContext } from '../ValidationContext.js'; -function toNodes( - fieldDetailsList: ReadonlyArray, -): ReadonlyArray { - return fieldDetailsList.map((fieldDetails) => fieldDetails.node); +function toNodes(fieldGroup: FieldGroup): ReadonlyArray { + return fieldGroup.map((fieldDetails) => fieldDetails.node); } /** @@ -49,15 +47,15 @@ export function SingleFieldSubscriptionsRule( fragments[definition.name.value] = definition; } } - const { fields } = collectFields( + const { groupedFieldSet } = collectFields( schema, fragments, variableValues, subscriptionType, node, ); - if (fields.size > 1) { - const fieldGroups = [...fields.values()]; + if (groupedFieldSet.size > 1) { + const fieldGroups = [...groupedFieldSet.values()]; const extraFieldGroups = fieldGroups.slice(1); const extraFieldSelections = extraFieldGroups.flatMap( (fieldGroup) => toNodes(fieldGroup), @@ -71,7 +69,7 @@ export function SingleFieldSubscriptionsRule( ), ); } - for (const fieldGroup of fields.values()) { + for (const fieldGroup of groupedFieldSet.values()) { const fieldName = toNodes(fieldGroup)[0].name.value; if (fieldName.startsWith('__')) { context.reportError(