From c821ecc977fd6b4e439546dc75a3864c8ef6cdd4 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 30 Jan 2024 15:11:28 +0000 Subject: [PATCH] graphql-ws: optimise the "resolve" routine (#548) graphql-ws: optimise the "resolve" routine * Partially addresses https://github.com/cylc/cylc-uiserver/issues/547 * The resolve routine is implmented recursively in the graphql-ws library. * Because the function is async this results in a large number of async tasks being created when the library is used with large, deeply nested schema. * Async tasks have an overhead, above that of regular function calls. * For the example in https://github.com/cylc/cylc-uiserver/issues/547 this resulted in over 10 seconds of overheads. * websockets: avoid duplicate resolve call Due to inheritance, we were calling `resolve(execution_result.data)` twice. --------- Co-authored-by: Tim Pillinger <26465611+wxtim@users.noreply.github.com> --- changes.d/548.feat.md | 1 + cylc/uiserver/websockets/resolve.py | 67 +++++++++++++++++++++++++++++ cylc/uiserver/websockets/tornado.py | 18 ++++++-- 3 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 changes.d/548.feat.md create mode 100644 cylc/uiserver/websockets/resolve.py diff --git a/changes.d/548.feat.md b/changes.d/548.feat.md new file mode 100644 index 00000000..e1803848 --- /dev/null +++ b/changes.d/548.feat.md @@ -0,0 +1 @@ +Improve the performance of the GraphQL server. diff --git a/cylc/uiserver/websockets/resolve.py b/cylc/uiserver/websockets/resolve.py new file mode 100644 index 00000000..71bb8f23 --- /dev/null +++ b/cylc/uiserver/websockets/resolve.py @@ -0,0 +1,67 @@ +# MIT License +# +# Copyright (c) 2017, Syrus Akbary +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +This file contains an implementation of "resolve" derived from the one +found in the graphql-ws library with the above license. + +This is temporary code until the change makes its way upstream. +""" + +# NOTE: transient dependency from graphql-ws purposefully not +# reflected in cylc-uiserver dependencies +from promise import Promise + +from graphql_ws.base_async import is_awaitable + + +async def resolve( + data, + _container=None, + _key=None, +): + """ + Wait on any awaitable children of a data element and resolve + any Promises. + """ + stack = [(data, _container, _key)] + + while stack: + _data, _container, _key = stack.pop() + + if is_awaitable(_data): + _data = await _data + if isinstance(_data, Promise): + _data = _data.value + if _container is not None: + _container[_key] = _data + if isinstance(_data, dict): + items = _data.items() + elif isinstance(_data, list): + items = enumerate(_data) + else: + items = None + if items is not None: + stack.extend([ + (child, _data, key) + for key, child in items + ]) diff --git a/cylc/uiserver/websockets/tornado.py b/cylc/uiserver/websockets/tornado.py index fcabac34..aaf2e285 100644 --- a/cylc/uiserver/websockets/tornado.py +++ b/cylc/uiserver/websockets/tornado.py @@ -13,9 +13,8 @@ from asyncio.queues import QueueEmpty from tornado.websocket import WebSocketClosedError from graphql.execution.middleware import MiddlewareManager -from graphql_ws.base import ConnectionClosedException +from graphql_ws.base import ConnectionClosedException, BaseSubscriptionServer from graphql_ws.base_async import ( - resolve, BaseAsyncConnectionContext, BaseAsyncSubscriptionServer ) @@ -29,6 +28,9 @@ from typing import Union, Awaitable, Any, List, Tuple, Dict, Optional from cylc.uiserver.authorise import AuthorizationMiddleware +from cylc.uiserver.websockets.resolve import resolve + + setup_observable_extension() NO_MSG_DELAY = 1.0 @@ -163,4 +165,14 @@ async def send_execution_result(self, connection_context, op_id, execution_resul await resolve(execution_result.data) request_context = connection_context.request_context await request_context['resolvers'].flow_delta_processed(request_context, op_id) - await super().send_execution_result(connection_context, op_id, execution_result) + else: + await resolve(execution_result.data) + + # NOTE: skip TornadoSubscriptionServer.send_execution_result because it + # calls "resolve" then invokes BaseSubscriptionServer.send_execution_result + await BaseSubscriptionServer.send_execution_result( + self, + connection_context, + op_id, + execution_result, + )