Skip to content

Commit

Permalink
Consistent Publisher error handling with SSE
Browse files Browse the repository at this point in the history
Closes gh-1080
  • Loading branch information
rstoyanchev committed Oct 18, 2024
1 parent cb9afd0 commit 58cce01
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import graphql.ErrorType;
import graphql.ExecutionResult;
import graphql.GraphQLError;
import graphql.GraphqlErrorBuilder;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -86,7 +87,7 @@ protected Mono<ServerResponse> prepareResponse(ServerRequest request, WebGraphQl
if (response.getData() instanceof Publisher) {
resultFlux = Flux.from((Publisher<ExecutionResult>) response.getData())
.map(ExecutionResult::toSpecification)
.onErrorResume(SubscriptionPublisherException.class, (ex) -> Mono.just(ex.toMap()));
.onErrorResume(this::exceptionToResultMap);
}
else {
if (this.logger.isDebugEnabled()) {
Expand All @@ -102,14 +103,25 @@ protected Mono<ServerResponse> prepareResponse(ServerRequest request, WebGraphQl
}

Flux<ServerSentEvent<Map<String, Object>>> sseFlux =
resultFlux.map((event) -> ServerSentEvent.builder(event).event("next").build());
resultFlux.map((event) -> ServerSentEvent.builder(event).event("next").build())
.concatWith(COMPLETE_EVENT);

Mono<ServerResponse> responseMono = ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(BodyInserters.fromServerSentEvents(sseFlux.concatWith(COMPLETE_EVENT)))
.body(BodyInserters.fromServerSentEvents(sseFlux))
.onErrorResume(Throwable.class, (ex) -> ServerResponse.badRequest().build());

return ((this.timeout != null) ? responseMono.timeout(this.timeout) : responseMono);
}

private Mono<Map<String, Object>> exceptionToResultMap(Throwable ex) {
return Mono.just((ex instanceof SubscriptionPublisherException spe) ?
spe.toMap() :
GraphqlErrorBuilder.newError()
.message("Subscription error")
.errorType(org.springframework.graphql.execution.ErrorType.INTERNAL_ERROR)
.build()
.toSpecification());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import graphql.ErrorType;
import graphql.ExecutionResult;
import graphql.GraphQLError;
import graphql.GraphqlErrorBuilder;
import org.reactivestreams.Publisher;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -119,10 +120,10 @@ private SseSubscriber(ServerResponse.SseBuilder sseBuilder) {

@Override
protected void hookOnNext(Map<String, Object> value) {
writeResult(value);
sendNext(value);
}

private void writeResult(Map<String, Object> value) {
private void sendNext(Map<String, Object> value) {
try {
this.sseBuilder.event("next");
this.sseBuilder.data(value);
Expand All @@ -139,18 +140,21 @@ private void cancelWithError(Throwable ex) {

@Override
protected void hookOnError(Throwable ex) {
if (ex instanceof SubscriptionPublisherException spe) {
ExecutionResult result = ExecutionResult.newExecutionResult().errors(spe.getErrors()).build();
writeResult(result.toSpecification());
hookOnComplete();
}
else {
this.sseBuilder.error(ex);
}
sendNext(exceptionToResultMap(ex));
sendComplete();
}

@Override
protected void hookOnComplete() {
private static Map<String, Object> exceptionToResultMap(Throwable ex) {
return ((ex instanceof SubscriptionPublisherException spe) ?
spe.toMap() :
GraphqlErrorBuilder.newError()
.message("Subscription error")
.errorType(org.springframework.graphql.execution.ErrorType.INTERNAL_ERROR)
.build()
.toSpecification());
}

private void sendComplete() {
try {
this.sseBuilder.event("complete").data("");
}
Expand All @@ -160,6 +164,11 @@ protected void hookOnComplete() {
this.sseBuilder.complete();
}

@Override
protected void hookOnComplete() {
sendComplete();
}

static Consumer<ServerResponse.SseBuilder> connect(Flux<Map<String, Object>> resultFlux) {
return (sseBuilder) -> {
SseSubscriber subscriber = new SseSubscriber(sseBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class GraphQlSseHandlerTests {

private static final DataFetcher<?> SEARCH_DATA_FETCHER = env -> {
String author = env.getArgument("author");
return Flux.fromIterable(BookSource.books()).filter((book) -> book.getAuthor().getFullName().contains(author));
return Flux.fromIterable(BookSource.books())
.filter((book) -> book.getAuthor().getFullName().contains(author));
};

private final MockServerHttpRequest httpRequest = MockServerHttpRequest.post("/graphql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
*
* @author Brian Clozel
*/
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
class GraphQlSseHandlerTests {

private static final List<HttpMessageConverter<?>> MESSAGE_READERS =
Expand Down Expand Up @@ -92,7 +93,7 @@ void shouldRejectQueryOperations() throws Exception {
void shouldWriteMultipleEventsForSubscription() throws Exception {
GraphQlSseHandler handler = createSseHandler(SEARCH_DATA_FETCHER);
MockHttpServletRequest request = createServletRequest("""
{ "query": "subscription TestSubscription { bookSearch(author:\\\"Orwell\\\") { id name } }" }
{ "query": "subscription TestSubscription { bookSearch(author:\\"Orwell\\") { id name } }" }
""");
MockHttpServletResponse response = handleAndAwait(request, handler);

Expand All @@ -118,7 +119,7 @@ void shouldWriteEventsAndTerminalError() throws Exception {

GraphQlSseHandler handler = createSseHandler(errorDataFetcher);
MockHttpServletRequest request = createServletRequest("""
{ "query": "subscription TestSubscription { bookSearch(author:\\\"Orwell\\\") { id name } }" }
{ "query": "subscription TestSubscription { bookSearch(author:\\"Orwell\\") { id name } }" }
""");
MockHttpServletResponse response = handleAndAwait(request, handler);

Expand All @@ -140,7 +141,7 @@ void shouldWriteEventsAndTerminalError() throws Exception {
void shouldCancelDataFetcherPublisherWhenWritingFails() throws Exception {
GraphQlSseHandler handler = createSseHandler(SEARCH_DATA_FETCHER);
MockHttpServletRequest servletRequest = createServletRequest("""
{ "query": "subscription TestSubscription { bookSearch(author:\\\"Orwell\\\") { id name } }" }
{ "query": "subscription TestSubscription { bookSearch(author:\\"Orwell\\") { id name } }" }
""");
HttpServletResponse servletResponse = mock(HttpServletResponse.class);
ServletOutputStream outputStream = mock(ServletOutputStream.class);
Expand All @@ -165,7 +166,7 @@ void shouldCancelDataFetcherWhenAsyncTimeout() throws Exception {

GraphQlSseHandler handler = createSseHandler(errorDataFetcher);
MockHttpServletRequest servletRequest = createServletRequest("""
{ "query": "subscription TestSubscription { bookSearch(author:\\\"Orwell\\\") { id name } }" }
{ "query": "subscription TestSubscription { bookSearch(author:\\"Orwell\\") { id name } }" }
""");

MockHttpServletResponse servletResponse = handleRequest(servletRequest, handler);
Expand Down

0 comments on commit 58cce01

Please sign in to comment.