From bb389e69e1ea87d347497dd796b3dd5134b5babf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 1 Mar 2022 01:34:01 +0000 Subject: [PATCH 1/5] fix incorrect unwrapFirstError import this was being imported from the wrong place --- synapse/handlers/message.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a9c964cd7533..ce1fa3c78ea3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -55,8 +55,8 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester -from synapse.util import json_decoder, json_encoder, log_failure -from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError +from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError +from synapse.util.async_helpers import Linearizer, gather_results from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client From c831ba78e2265a348cdc9fdbba9a22eaffe7c056 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 1 Mar 2022 01:33:05 +0000 Subject: [PATCH 2/5] Refactor `concurrently_execute` to use `yieldable_gather_results` --- synapse/util/async_helpers.py | 16 +++------ tests/util/test_async_helpers.py | 59 ++++++++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 3f7299aff7eb..8f8784baed35 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -193,9 +193,9 @@ def __repr__(self) -> str: T = TypeVar("T") -def concurrently_execute( +async def concurrently_execute( func: Callable[[T], Any], args: Iterable[T], limit: int -) -> defer.Deferred: +) -> None: """Executes the function with each argument concurrently while limiting the number of concurrent executions. @@ -221,15 +221,9 @@ async def _concurrently_execute_inner(value: T) -> None: # We use `itertools.islice` to handle the case where the number of args is # less than the limit, avoiding needlessly spawning unnecessary background # tasks. - return make_deferred_yieldable( - defer.gatherResults( - [ - run_in_background(_concurrently_execute_inner, value) - for value in itertools.islice(it, limit) - ], - consumeErrors=True, - ) - ).addErrback(unwrapFirstError) + await yieldable_gather_results( + _concurrently_execute_inner, (value for value in itertools.islice(it, limit)) + ) def yieldable_gather_results( diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py index ab89cab81256..7e6ed12fedcf 100644 --- a/tests/util/test_async_helpers.py +++ b/tests/util/test_async_helpers.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from twisted.internet import defer -from twisted.internet.defer import CancelledError, Deferred +from twisted.internet.defer import CancelledError, Deferred, ensureDeferred from twisted.internet.task import Clock from synapse.logging.context import ( @@ -21,7 +21,11 @@ PreserveLoggingContext, current_context, ) -from synapse.util.async_helpers import ObservableDeferred, timeout_deferred +from synapse.util.async_helpers import ( + ObservableDeferred, + concurrently_execute, + timeout_deferred, +) from tests.unittest import TestCase @@ -171,3 +175,54 @@ def errback(res, deferred_name): ) self.failureResultOf(timing_out_d, defer.TimeoutError) self.assertIs(current_context(), context_one) + + +class _TestException(Exception): + pass + + +class ConcurrentlyExecuteTest(TestCase): + def test_limits_runners(self): + """If we have more tasks than runners, we should get the limit of runners""" + started = 0 + waiters = [] + processed = [] + + async def callback(v): + # when we first enter, bump the start count + nonlocal started + started += 1 + + # record the fact we got an item + processed.append(v) + + # wait for the goahead before returning + d2 = Deferred() + waiters.append(d2) + await d2 + + # set it going + d2 = ensureDeferred(concurrently_execute(callback, [1, 2, 3, 4, 5], 3)) + + # check we got exactly 3 processes + self.assertEqual(started, 3) + self.assertEqual(len(waiters), 3) + + # let one finish + waiters.pop().callback(0) + + # ... which should start another + self.assertEqual(started, 4) + self.assertEqual(len(waiters), 3) + + # we still shouldn't be done + self.assertNoResult(d2) + + # finish the job + while waiters: + waiters.pop().callback(0) + + # check everything got done + self.assertEqual(started, 5) + self.assertCountEqual(processed, [1, 2, 3, 4, 5]) + self.successResultOf(d2) From eb152c890ff39be29e5da3dd1fa05b82bf6542d6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 1 Mar 2022 01:46:06 +0000 Subject: [PATCH 3/5] Improve exception handling in `yieldable_gather_results` Try to avoid swallowing so many stack traces. --- synapse/util/async_helpers.py | 38 +++++++++++++++------- tests/util/test_async_helpers.py | 56 ++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 8f8784baed35..a83296a2292b 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -29,6 +29,7 @@ Hashable, Iterable, Iterator, + List, Optional, Set, Tuple, @@ -51,7 +52,7 @@ make_deferred_yieldable, run_in_background, ) -from synapse.util import Clock, unwrapFirstError +from synapse.util import Clock logger = logging.getLogger(__name__) @@ -226,9 +227,9 @@ async def _concurrently_execute_inner(value: T) -> None: ) -def yieldable_gather_results( - func: Callable, iter: Iterable, *args: Any, **kwargs: Any -) -> defer.Deferred: +async def yieldable_gather_results( + func: Callable[..., Awaitable[T]], iter: Iterable, *args: Any, **kwargs: Any +) -> List[T]: """Executes the function with each argument concurrently. Args: @@ -239,15 +240,30 @@ def yieldable_gather_results( **kwargs: Keyword arguments to be passed to each call to func Returns - Deferred[list]: Resolved when all functions have been invoked, or errors if - one of the function calls fails. + A list containing the results of the function """ - return make_deferred_yieldable( - defer.gatherResults( - [run_in_background(func, item, *args, **kwargs) for item in iter], - consumeErrors=True, + try: + return await make_deferred_yieldable( + defer.gatherResults( + [run_in_background(func, item, *args, **kwargs) for item in iter], + consumeErrors=True, + ) ) - ).addErrback(unwrapFirstError) + except defer.FirstError as dfe: + # unwrap the error from defer.gatherResults. + + # The raised exception's traceback only includes func() etc if + # the 'await' happens before the exception is thrown - ie if the failure + # happens *asynchronously* - otherwise Twisted throws away the traceback as it + # could be large. + # + # We could maybe reconstruct a fake traceback from Failure.frames. Or maybe + # we could throw Twisted into the fires of Mordor. + + # suppress exception chaining, because the FirstError doesn't tell us anything + # very interesting. + assert isinstance(dfe.subFailure.value, BaseException) + raise dfe.subFailure.value from None T1 = TypeVar("T1") diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py index 7e6ed12fedcf..cce8d595fc7e 100644 --- a/tests/util/test_async_helpers.py +++ b/tests/util/test_async_helpers.py @@ -11,9 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import traceback + from twisted.internet import defer from twisted.internet.defer import CancelledError, Deferred, ensureDeferred from twisted.internet.task import Clock +from twisted.python.failure import Failure from synapse.logging.context import ( SENTINEL_CONTEXT, @@ -226,3 +229,56 @@ async def callback(v): self.assertEqual(started, 5) self.assertCountEqual(processed, [1, 2, 3, 4, 5]) self.successResultOf(d2) + + def test_preserves_stacktraces(self): + """Test that the stacktrace from an exception thrown in the callback is preserved""" + d1 = Deferred() + + async def callback(v): + # alas, this doesn't work at all without an await here + await d1 + raise _TestException("bah") + + async def caller(): + try: + await concurrently_execute(callback, [1], 2) + except _TestException as e: + tb = traceback.extract_tb(e.__traceback__) + # we expect to see "caller", "concurrently_execute" and "callback". + self.assertEqual(tb[0].name, "caller") + self.assertEqual(tb[1].name, "concurrently_execute") + self.assertEqual(tb[-1].name, "callback") + else: + self.fail("No exception thrown") + + d2 = ensureDeferred(caller()) + d1.callback(0) + self.successResultOf(d2) + + def test_preserves_stacktraces_on_preformed_failure(self): + """Test that the stacktrace on a Failure returned by the callback is preserved""" + d1 = Deferred() + f = Failure(_TestException("bah")) + + async def callback(v): + # alas, this doesn't work at all without an await here + await d1 + await defer.fail(f) + + async def caller(): + try: + await concurrently_execute(callback, [1], 2) + except _TestException as e: + tb = traceback.extract_tb(e.__traceback__) + # we expect to see "caller", "concurrently_execute", "callback", + # and some magic from inside ensureDeferred that happens when .fail + # is called. + self.assertEqual(tb[0].name, "caller") + self.assertEqual(tb[1].name, "concurrently_execute") + self.assertEqual(tb[-2].name, "callback") + else: + self.fail("No exception thrown") + + d2 = ensureDeferred(caller()) + d1.callback(0) + self.successResultOf(d2) From 95d2bcb5f3c20906bc764f49048c6f2541d92ac2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 1 Mar 2022 01:47:01 +0000 Subject: [PATCH 4/5] mark unwrapFirstError deprecated --- synapse/util/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 511f52534b3c..58b4220ff355 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -81,7 +81,9 @@ def _handle_frozendict(obj: Any) -> Dict[Any, Any]: def unwrapFirstError(failure: Failure) -> Failure: - # defer.gatherResults and DeferredLists wrap failures. + # Deprecated: you probably just want to catch defer.FirstError and reraise + # the subFailure's value, which will do a better job of preserving stacktraces. + # (actually, you probably want to use yieldable_gather_results anyway) failure.trap(defer.FirstError) return failure.value.subFailure # type: ignore[union-attr] # Issue in Twisted's annotations From 467d710f161b39faf02374689a483ceeb08f89c4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 1 Mar 2022 01:50:43 +0000 Subject: [PATCH 5/5] changelog --- changelog.d/12109.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12109.misc diff --git a/changelog.d/12109.misc b/changelog.d/12109.misc new file mode 100644 index 000000000000..3295e49f43b9 --- /dev/null +++ b/changelog.d/12109.misc @@ -0,0 +1 @@ +Improve exception handling for concurrent execution.