Skip to content

Commit

Permalink
accept abortSignal in execute method and check if signal is aborted w…
Browse files Browse the repository at this point in the history
…hen resolving every field
  • Loading branch information
igrlk committed Feb 11, 2023
1 parent f201681 commit f8b148f
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 0 deletions.
126 changes: 126 additions & 0 deletions src/execution/__tests__/executor-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1313,4 +1313,130 @@ describe('Execute: Handles basic execution tasks', () => {
expect(result).to.deep.equal({ data: { foo: { bar: 'bar' } } });
expect(possibleTypes).to.deep.equal([fooObject]);
});

describe('Abort execution', () => {
it('stops execution and throws an error when signal is aborted', async () => {
/**
* This test has 3 resolvers nested in each other.
* Every resolve function waits 200ms before returning data.
*
* The test waits for the first resolver and half of the 2nd resolver execution time (200ms + 100ms)
* and then aborts the execution.
*
* 2nd resolver execution finishes, and we then expect to not execute the 3rd resolver
* and to get an error about aborted operation.
*/

const WAIT_MS_BEFORE_RESOLVING = 200;
const ABORT_IN_MS_AFTER_STARTING_EXECUTION =
WAIT_MS_BEFORE_RESOLVING + WAIT_MS_BEFORE_RESOLVING / 2;

const schema = new GraphQLSchema({
query: new GraphQLObjectType({
name: 'Query',
fields: {
resolvesIn500ms: {
type: new GraphQLObjectType({
name: 'ResolvesIn500ms',
fields: {
resolvesIn400ms: {
type: new GraphQLObjectType({
name: 'ResolvesIn400ms',
fields: {
shouldNotBeResolved: {
type: GraphQLString,
resolve: () => {
throw new Error('This should not be executed!');
},
},
},
}),
resolve: () =>
new Promise((resolve) => {
setTimeout(() => resolve({}), WAIT_MS_BEFORE_RESOLVING);
}),
},
},
}),
resolve: () =>
new Promise((resolve) => {
setTimeout(() => resolve({}), WAIT_MS_BEFORE_RESOLVING);
}),
},
},
}),
});
const document = parse(`
query {
resolvesIn500ms {
resolvesIn400ms {
shouldNotBeResolved
}
}
}
`);

const abortController = new AbortController();
const executionPromise = execute({
schema,
document,
signal: abortController.signal,
});

setTimeout(
() => abortController.abort(),
ABORT_IN_MS_AFTER_STARTING_EXECUTION,
);

const result = await executionPromise;
expect(result.errors?.[0].message).to.eq(
'Execution aborted. Reason: AbortError: This operation was aborted',
);
expect(result.data).to.eql({
resolvesIn500ms: { resolvesIn400ms: null },
});
});

const abortMessageTestInputs = [
{ message: 'Aborted from somewhere', reason: 'Aborted from somewhere' },
{ message: undefined, reason: 'AbortError: This operation was aborted' },
];

for (const { message, reason } of abortMessageTestInputs) {
it('aborts with "Reason:" in the error message', async () => {
const schema = new GraphQLSchema({
query: new GraphQLObjectType({
name: 'Query',
fields: {
a: {
type: GraphQLString,
resolve: () =>
new Promise((resolve) => {
setTimeout(() => resolve({}), 100);
}),
},
},
}),
});

const document = parse(`
query { a }
`);

const abortController = new AbortController();
const executionPromise = execute({
schema,
document,
signal: abortController.signal,
});

abortController.abort(message);

const { errors } = await executionPromise;
expect(errors?.[0].message).to.eq(
`Execution aborted. Reason: ${reason}`,
);
});
}
});
});
43 changes: 43 additions & 0 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ export interface ExecutionContext {
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
errors: Array<GraphQLError>;
subsequentPayloads: Set<AsyncPayloadRecord>;
signal: Maybe<{
isAborted: boolean;
instance: AbortSignal;
}>;
}

/**
Expand Down Expand Up @@ -261,6 +265,7 @@ export interface ExecutionArgs {
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
typeResolver?: Maybe<GraphQLTypeResolver<any, any>>;
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
signal?: AbortSignal;
}

const UNEXPECTED_EXPERIMENTAL_DIRECTIVES =
Expand Down Expand Up @@ -337,9 +342,29 @@ export function experimentalExecuteIncrementally(
return executeImpl(exeContext);
}

function subscribeToAbortSignal(exeContext: ExecutionContext): {
unsubscribeFromAbortSignal: () => void;
} {
const onAbort = () => {
if ('signal' in exeContext && exeContext.signal) {
exeContext.signal.isAborted = true;
}
};

exeContext.signal?.instance.addEventListener('abort', onAbort);

return {
unsubscribeFromAbortSignal: () => {
exeContext.signal?.instance.removeEventListener('abort', onAbort);
},
};
}

function executeImpl(
exeContext: ExecutionContext,
): PromiseOrValue<ExecutionResult | ExperimentalIncrementalExecutionResults> {
const { unsubscribeFromAbortSignal } = subscribeToAbortSignal(exeContext);

// Return a Promise that will eventually resolve to the data described by
// The "Response" section of the GraphQL specification.
//
Expand All @@ -356,6 +381,8 @@ function executeImpl(
if (isPromise(result)) {
return result.then(
(data) => {
unsubscribeFromAbortSignal();

const initialResult = buildResponse(data, exeContext.errors);
if (exeContext.subsequentPayloads.size > 0) {
return {
Expand All @@ -369,11 +396,16 @@ function executeImpl(
return initialResult;
},
(error) => {
unsubscribeFromAbortSignal();

exeContext.errors.push(error);
return buildResponse(null, exeContext.errors);
},
);
}

unsubscribeFromAbortSignal();

const initialResult = buildResponse(result, exeContext.errors);
if (exeContext.subsequentPayloads.size > 0) {
return {
Expand All @@ -386,6 +418,7 @@ function executeImpl(
}
return initialResult;
} catch (error) {
unsubscribeFromAbortSignal();
exeContext.errors.push(error);
return buildResponse(null, exeContext.errors);
}
Expand Down Expand Up @@ -440,6 +473,7 @@ export function buildExecutionContext(
fieldResolver,
typeResolver,
subscribeFieldResolver,
signal,
} = args;

// If the schema used for execution is invalid, throw an error.
Expand Down Expand Up @@ -505,6 +539,7 @@ export function buildExecutionContext(
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
subsequentPayloads: new Set(),
errors: [],
signal: signal ? { instance: signal, isAborted: false } : null,
};
}

Expand Down Expand Up @@ -838,6 +873,14 @@ function completeValue(
result: unknown,
asyncPayloadRecord?: AsyncPayloadRecord,
): PromiseOrValue<unknown> {
if (exeContext.signal?.isAborted) {
throw new GraphQLError(
`Execution aborted. Reason: ${
exeContext.signal.instance.reason ?? 'Unknown.'
}`,
);
}

// If result is an Error, throw a located error.
if (result instanceof Error) {
throw result;
Expand Down

0 comments on commit f8b148f

Please sign in to comment.