Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize completion loops #4053

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 188 additions & 6 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1091,10 +1091,97 @@ async function completeAsyncIteratorValue(
undefined,
];
let index = 0;
const streamUsage = getStreamUsage(exeContext, fieldGroup, path);
// eslint-disable-next-line no-constant-condition
while (true) {
if (streamUsage && index >= streamUsage.initialCount) {
const itemPath = addPath(path, index, undefined);
let iteration;
try {
// eslint-disable-next-line no-await-in-loop
iteration = await asyncIterator.next();
} catch (rawError) {
throw locatedError(rawError, toNodes(fieldGroup), pathToArray(path));
}

// TODO: add test case for stream returning done before initialCount
/* c8 ignore next 3 */
if (iteration.done) {
break;
}

const item = iteration.value;
// TODO: add tests for stream backed by asyncIterator that returns a promise
/* c8 ignore start */
if (isPromise(item)) {
completedResults.push(
completePromisedListItemValue(
item,
graphqlWrappedResult,
exeContext,
itemType,
fieldGroup,
info,
itemPath,
incrementalContext,
deferMap,
),
);
containsPromise = true;
} else if (
/* c8 ignore stop */
completeListItemValue(
item,
completedResults,
graphqlWrappedResult,
exeContext,
itemType,
fieldGroup,
info,
itemPath,
incrementalContext,
deferMap,
)
// TODO: add tests for stream backed by asyncIterator that completes to a promise
/* c8 ignore start */
) {
containsPromise = true;
}
/* c8 ignore stop */
index++;
}

return containsPromise
? /* c8 ignore start */ Promise.all(completedResults).then((resolved) => [
resolved,
graphqlWrappedResult[1],
])
: /* c8 ignore stop */ graphqlWrappedResult;
}

/**
* Complete a async iterator value by completing the result and calling
* recursively until all the results are completed.
*/
async function completeAsyncIteratorValueWithPossibleStream(
exeContext: ExecutionContext,
itemType: GraphQLOutputType,
fieldGroup: FieldGroup,
info: GraphQLResolveInfo,
path: Path,
asyncIterator: AsyncIterator<unknown>,
streamUsage: StreamUsage,
incrementalContext: IncrementalContext | undefined,
deferMap: ReadonlyMap<DeferUsage, DeferredFragmentRecord> | undefined,
): Promise<GraphQLWrappedResult<ReadonlyArray<unknown>>> {
let containsPromise = false;
const completedResults: Array<unknown> = [];
const graphqlWrappedResult: GraphQLWrappedResult<Array<unknown>> = [
completedResults,
[],
];
let index = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
if (index >= streamUsage.initialCount) {
const returnFn = asyncIterator.return;
let streamRecord: SubsequentResultRecord | CancellableStreamRecord;
if (returnFn === undefined) {
Expand Down Expand Up @@ -1208,17 +1295,32 @@ function completeListValue(
deferMap: ReadonlyMap<DeferUsage, DeferredFragmentRecord> | undefined,
): PromiseOrValue<GraphQLWrappedResult<ReadonlyArray<unknown>>> {
const itemType = returnType.ofType;
const streamUsage = getStreamUsage(exeContext, fieldGroup, path);

if (isAsyncIterable(result)) {
const asyncIterator = result[Symbol.asyncIterator]();

return completeAsyncIteratorValue(
if (streamUsage === undefined) {
return completeAsyncIteratorValue(
exeContext,
itemType,
fieldGroup,
info,
path,
asyncIterator,
incrementalContext,
deferMap,
);
}

return completeAsyncIteratorValueWithPossibleStream(
exeContext,
itemType,
fieldGroup,
info,
path,
asyncIterator,
streamUsage,
incrementalContext,
deferMap,
);
Expand All @@ -1230,13 +1332,27 @@ function completeListValue(
);
}

return completeIterableValue(
if (streamUsage === undefined) {
return completeIterableValue(
exeContext,
itemType,
fieldGroup,
info,
path,
result,
incrementalContext,
deferMap,
);
}

return completeIterableValueWithPossibleStream(
exeContext,
itemType,
fieldGroup,
info,
path,
result,
streamUsage,
incrementalContext,
deferMap,
);
Expand All @@ -1261,13 +1377,79 @@ function completeIterableValue(
undefined,
];
let index = 0;
const streamUsage = getStreamUsage(exeContext, fieldGroup, path);
for (const item of items) {
// No need to modify the info object containing the path,
// since from here on it is not ever accessed by resolver functions.
const itemPath = addPath(path, index, undefined);

if (isPromise(item)) {
completedResults.push(
completePromisedListItemValue(
item,
graphqlWrappedResult,
exeContext,
itemType,
fieldGroup,
info,
itemPath,
incrementalContext,
deferMap,
),
);
containsPromise = true;
} else if (
completeListItemValue(
item,
completedResults,
graphqlWrappedResult,
exeContext,
itemType,
fieldGroup,
info,
itemPath,
incrementalContext,
deferMap,
)
) {
containsPromise = true;
}
index++;
}

return containsPromise
? Promise.all(completedResults).then((resolved) => [
resolved,
graphqlWrappedResult[1],
])
: graphqlWrappedResult;
}

function completeIterableValueWithPossibleStream(
exeContext: ExecutionContext,
itemType: GraphQLOutputType,
fieldGroup: FieldGroup,
info: GraphQLResolveInfo,
path: Path,
items: Iterable<unknown>,
streamUsage: StreamUsage,
incrementalContext: IncrementalContext | undefined,
deferMap: ReadonlyMap<DeferUsage, DeferredFragmentRecord> | undefined,
): PromiseOrValue<GraphQLWrappedResult<ReadonlyArray<unknown>>> {
// This is specified as a simple map, however we're optimizing the path
// where the list contains no Promises by avoiding creating another Promise.
let containsPromise = false;
const completedResults: Array<unknown> = [];
const graphqlWrappedResult: GraphQLWrappedResult<Array<unknown>> = [
completedResults,
[],
];
let index = 0;
const iterator = items[Symbol.iterator]();
let iteration = iterator.next();
while (!iteration.done) {
const item = iteration.value;

if (streamUsage && index >= streamUsage.initialCount) {
if (index >= streamUsage.initialCount) {
const streamRecord: SubsequentResultRecord = {
label: streamUsage.label,
path,
Expand Down
Loading