Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Attempt to figure out what's going on with timeouts #3857

Merged
merged 4 commits into from
Sep 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3857.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor some HTTP timeout code.
98 changes: 42 additions & 56 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from prometheus_client import Counter
from signedjson.sign import sign_json

from twisted.internet import defer, protocol, reactor
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, HTTPConnectionPool
Expand All @@ -40,10 +40,8 @@
HttpResponseException,
SynapseError,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.logcontext import make_deferred_yieldable

logger = logging.getLogger(__name__)
Expand All @@ -66,13 +64,14 @@

class MatrixFederationEndpointFactory(object):
    """Creates endpoints for federation destinations from request URIs.

    Args:
        hs: the homeserver; its reactor and TLS client options factory are
            captured once here and reused for every endpoint.
    """

    def __init__(self, hs):
        # Take the reactor from the homeserver rather than the global
        # twisted reactor, so whichever reactor the homeserver was built
        # with is the one used for outbound connections.
        self.reactor = hs.get_reactor()
        self.tls_client_options_factory = hs.tls_client_options_factory

    def endpointForURI(self, uri):
        """Return the endpoint to use for ``uri``.

        Args:
            uri: URI object whose ``netloc`` attribute is an ASCII-encoded
                byte string naming the destination.

        Returns:
            Whatever ``matrix_federation_endpoint`` returns for the
            destination, using a fixed ``timeout=10``.
        """
        destination = uri.netloc.decode('ascii')

        # Only the ``self.reactor`` argument line is kept: having both it and
        # the older global-``reactor`` line in the same call places positional
        # arguments after a keyword argument, which is a SyntaxError.
        return matrix_federation_endpoint(
            self.reactor, destination, timeout=10,
            tls_client_options_factory=self.tls_client_options_factory
        )

Expand All @@ -90,6 +89,7 @@ def __init__(self, hs):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
reactor = hs.get_reactor()
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
Expand All @@ -100,6 +100,7 @@ def __init__(self, hs):
self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii')
self._next_id = 1
self.default_timeout = 60

def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
return urllib.parse.urlunparse(
Expand Down Expand Up @@ -143,6 +144,11 @@ def _request(self, destination, method, path,
(May also fail with plenty of other Exceptions for things like DNS
failures, connection failures, SSL failures.)
"""
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout

if (
self.hs.config.federation_domain_whitelist is not None and
destination not in self.hs.config.federation_domain_whitelist
Expand Down Expand Up @@ -215,13 +221,9 @@ def _request(self, destination, method, path,
headers=Headers(headers_dict),
data=data,
agent=self.agent,
reactor=self.hs.get_reactor()
)
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
response = yield make_deferred_yieldable(
request_deferred,
)
Expand Down Expand Up @@ -261,6 +263,13 @@ def _request(self, destination, method, path,
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)

logger.debug(
"{%s} Waiting %s before sending to %s...",
txn_id,
delay,
destination
)

yield self.clock.sleep(delay)
retries_left -= 1
else:
Expand All @@ -279,10 +288,9 @@ def _request(self, destination, method, path,
# :'(
# Update transactions table?
with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.content(response),
timeout,
)
d = treq.content(response)
d.addTimeout(_sec_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
)
Expand Down Expand Up @@ -396,10 +404,9 @@ def put_json(self, destination, path, args={}, data={},
check_content_type_is_json(response.headers)

with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
defer.returnValue(body)

@defer.inlineCallbacks
Expand Down Expand Up @@ -449,10 +456,14 @@ def post_json(self, destination, path, data={}, long_retries=False,
check_content_type_is_json(response.headers)

with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
d = treq.json_content(response)
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout

d.addTimeout(_sec_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)

defer.returnValue(body)

Expand Down Expand Up @@ -504,10 +515,9 @@ def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
check_content_type_is_json(response.headers)

with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)

defer.returnValue(body)

Expand Down Expand Up @@ -554,10 +564,9 @@ def delete_json(self, destination, path, long_retries=False,
check_content_type_is_json(response.headers)

with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)

defer.returnValue(body)

Expand Down Expand Up @@ -599,38 +608,15 @@ def get_file(self, destination, path, output_stream, args={},

try:
with logcontext.PreserveLoggingContext():
length = yield self._timeout_deferred(
_readBodyToFile(
response, output_stream, max_size
),
)
d = _readBodyToFile(response, output_stream, max_size)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
length = yield make_deferred_yieldable(d)
except Exception:
logger.exception("Failed to download body")
raise

defer.returnValue((length, headers))

def _timeout_deferred(self, deferred, timeout_ms=None):
    """Attach a timeout to `deferred` and hand the same deferred back.

    Args:
        deferred (Deferred): the deferred to put a timeout on.
        timeout_ms (int|None): Timeout in milliseconds. Any falsy value
            (None or 0) selects the 60 second default.

    Returns:
        Deferred: the `deferred` object that was passed in.
    """
    if timeout_ms:
        # Convert to seconds; the float literal keeps the division
        # fractional.
        timeout_secs = timeout_ms / 1000.
    else:
        timeout_secs = 60

    add_timeout_to_deferred(
        deferred,
        timeout_secs,
        self.hs.get_reactor(),
        cancelled_to_request_timed_out_error,
    )

    return deferred


class _ReadBodyToFileProtocol(protocol.Protocol):
def __init__(self, stream, deferred, max_size):
Expand Down
157 changes: 157 additions & 0 deletions tests/http/test_fedclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from mock import Mock

from twisted.internet.defer import TimeoutError
from twisted.internet.error import ConnectingCancelledError, DNSLookupError
from twisted.web.client import ResponseNeverReceived

from synapse.http.matrixfederationclient import MatrixFederationHttpClient

from tests.unittest import HomeserverTestCase


class FederationClientTests(HomeserverTestCase):
    """Checks which error (or response) MatrixFederationHttpClient hands back
    at each stage of a request: DNS lookup, connecting, waiting for headers
    and waiting for the body.
    """

    def make_homeserver(self, reactor, clock):
        """Build the test homeserver, with its TLS client options factory
        set to None.
        """
        hs = self.setup_test_homeserver(reactor=reactor, clock=clock)
        hs.tls_client_options_factory = None
        return hs

    def prepare(self, reactor, clock, homeserver):
        """Create the client under test and register a lookup entry for
        "testserv".
        """
        self.cl = MatrixFederationHttpClient(self.hs)
        # Only "testserv" gets an entry; test_dns_error relies on
        # "testserv2" being absent.
        self.reactor.lookups["testserv"] = "1.2.3.4"

    def _assert_connecting_to_testserv(self):
        """Assert exactly one TCP connection attempt, aimed at 1.2.3.4:8008."""
        clients = self.reactor.tcpClients
        self.assertEqual(len(clients), 1)
        self.assertEqual(clients[0][0], '1.2.3.4')
        self.assertEqual(clients[0][1], 8008)

    def _make_connection(self):
        """Build the protocol for the first pending TCP client, connect it to
        a Mock transport, and return the protocol.
        """
        conn = Mock()
        clients = self.reactor.tcpClients
        client = clients[0][2].buildProtocol(None)
        client.makeConnection(conn)
        return client

    def test_dns_error(self):
        """
        If the DNS lookup fails, the error bubbles up to the caller.
        """
        d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000)
        self.pump()

        f = self.failureResultOf(d)
        self.assertIsInstance(f.value, DNSLookupError)

    def test_client_never_connect(self):
        """
        If the HTTP request is not connected and is timed out, it'll give a
        ConnectingCancelledError.
        """
        d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)

        self.pump()

        # Nothing happened yet
        self.assertFalse(d.called)

        # Make sure treq is trying to connect
        self._assert_connecting_to_testserv()

        # Deferred is still without a result
        self.assertFalse(d.called)

        # Push by enough to time it out (timeout is 10000ms)
        self.reactor.advance(10.5)
        f = self.failureResultOf(d)

        self.assertIsInstance(f.value, ConnectingCancelledError)

    def test_client_connect_no_response(self):
        """
        If the HTTP request is connected, but gets no response before being
        timed out, it'll give a ResponseNeverReceived.
        """
        d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)

        self.pump()

        # Nothing happened yet
        self.assertFalse(d.called)

        # Make sure treq is trying to connect
        self._assert_connecting_to_testserv()

        self._make_connection()

        # Deferred is still without a result
        self.assertFalse(d.called)

        # Push by enough to time it out (timeout is 10000ms)
        self.reactor.advance(10.5)
        f = self.failureResultOf(d)

        self.assertIsInstance(f.value, ResponseNeverReceived)

    def test_client_gets_headers(self):
        """
        Once the client gets the headers, _request returns successfully.
        """
        d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)

        self.pump()

        client = self._make_connection()

        # Deferred does not have a result
        self.assertFalse(d.called)

        # Send it the HTTP response
        client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n")

        # We should get a successful response
        r = self.successResultOf(d)
        self.assertEqual(r.code, 200)

    def test_client_headers_no_body(self):
        """
        If the HTTP request is connected and the response headers arrive, but
        the body is not received before the timeout, it'll give a
        TimeoutError.
        """
        d = self.cl.post_json("testserv:8008", "foo/bar", timeout=10000)

        self.pump()

        client = self._make_connection()

        # Deferred does not have a result
        self.assertFalse(d.called)

        # Send it the HTTP response headers only; no body follows
        client.dataReceived(
            (b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n"
             b"Server: Fake\r\n\r\n")
        )

        # Push by enough to time it out (timeout is 10000ms)
        self.reactor.advance(10.5)
        f = self.failureResultOf(d)

        self.assertIsInstance(f.value, TimeoutError)
Loading