Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Respect the @cancellable flag for ReplicationEndpoints #12700

Merged
merged 4 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12700.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Respect the `@cancellable` flag for `ReplicationEndpoint`s.
21 changes: 19 additions & 2 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
from synapse.http.server import HttpServer
from synapse.http.server import HttpServer, is_method_cancellable
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.opentracing import trace
from synapse.types import JsonDict
Expand Down Expand Up @@ -310,6 +311,12 @@ def register(self, http_server: HttpServer) -> None:
url_args = list(self.PATH_ARGS)
method = self.METHOD

if self.CACHE and is_method_cancellable(self._handle_request):
raise Exception(
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
"is set. The cancellable flag would have no effect."
)

if self.CACHE:
url_args.append("txn_id")

Expand All @@ -324,7 +331,7 @@ def register(self, http_server: HttpServer) -> None:
)

async def _check_auth_and_handle(
self, request: Request, **kwargs: Any
self, request: SynapseRequest, **kwargs: Any
) -> Tuple[int, JsonDict]:
"""Called on new incoming requests when caching is enabled. Checks
if there is a cached response for the request and returns that,
Expand All @@ -340,8 +347,18 @@ async def _check_auth_and_handle(
if self.CACHE:
txn_id = kwargs.pop("txn_id")

# We ignore the `@cancellable` flag, since cancellation wouldn't interupt
# `_handle_request` and `ResponseCache` does not handle cancellation
# correctly yet. In particular, there may be issues to do with logging
# context lifetimes.

return await self.response_cache.wrap(
txn_id, self._handle_request, request, **kwargs
)

# The `@cancellable` decorator may be applied to `_handle_request`. But we
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
# so we have to set up the cancellable flag ourselves.
request.is_render_cancellable = is_method_cancellable(self._handle_request)

return await self._handle_request(request, **kwargs)
13 changes: 13 additions & 0 deletions tests/replication/http/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
106 changes: 106 additions & 0 deletions tests/replication/http/test__base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from http import HTTPStatus
from typing import Tuple

from twisted.web.server import Request

from synapse.api.errors import Codes
from synapse.http.server import JsonResource, cancellable
from synapse.replication.http import REPLICATION_PREFIX
from synapse.replication.http._base import ReplicationEndpoint
from synapse.server import HomeServer
from synapse.types import JsonDict

from tests import unittest
from tests.http.server._base import EndpointCancellationTestHelperMixin


class CancellableReplicationEndpoint(ReplicationEndpoint):
NAME = "cancellable_sleep"
PATH_ARGS = ()
CACHE = False

def __init__(self, hs: HomeServer):
super().__init__(hs)
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload() -> JsonDict:
return {}

@cancellable
async def _handle_request( # type: ignore[override]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was mypy's complaint here, out of interest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember the exact wording. It's because the base class defines _handle_request like so:

    @abc.abstractmethod
    async def _handle_request(
        self, request: Request, **kwargs: Any
    ) -> Tuple[int, JsonDict]:

but all derived classes ditch the kwargs in favour of their real arguments, as specified by PATH_ARGS:

    async def _handle_request(  # type: ignore[override]
        self, request: Request, user_id: str, account_data_type: str
    ) -> Tuple[int, JsonDict]:

which doesn't accept arbitrary kwargs, so mypy is unhappy.

self, request: Request
) -> Tuple[int, JsonDict]:
await self.clock.sleep(1.0)
return HTTPStatus.OK, {"result": True}


class UncancellableReplicationEndpoint(ReplicationEndpoint):
NAME = "uncancellable_sleep"
PATH_ARGS = ()
CACHE = False

def __init__(self, hs: HomeServer):
super().__init__(hs)
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload() -> JsonDict:
return {}

async def _handle_request( # type: ignore[override]
self, request: Request
) -> Tuple[int, JsonDict]:
await self.clock.sleep(1.0)
return HTTPStatus.OK, {"result": True}


class ReplicationEndpointCancellationTestCase(
unittest.HomeserverTestCase, EndpointCancellationTestHelperMixin
):
"""Tests for `ReplicationEndpoint` cancellation."""

def create_test_resource(self):
"""Overrides `HomeserverTestCase.create_test_resource`."""
resource = JsonResource(self.hs)

CancellableReplicationEndpoint(self.hs).register(resource)
UncancellableReplicationEndpoint(self.hs).register(resource)

return resource

def test_cancellable_disconnect(self) -> None:
"""Test that handlers with the `@cancellable` flag can be cancelled."""
path = f"{REPLICATION_PREFIX}/{CancellableReplicationEndpoint.NAME}/"
channel = self.make_request("POST", path, await_result=False)
self._test_disconnect(
self.reactor,
channel,
expect_cancellation=True,
expected_body={"error": "Request cancelled", "errcode": Codes.UNKNOWN},
)

def test_uncancellable_disconnect(self) -> None:
"""Test that handlers without the `@cancellable` flag cannot be cancelled."""
path = f"{REPLICATION_PREFIX}/{UncancellableReplicationEndpoint.NAME}/"
channel = self.make_request("POST", path, await_result=False)
self._test_disconnect(
self.reactor,
channel,
expect_cancellation=False,
expected_body={"result": True},
)