Skip to content

Commit

Permalink
refactor connector keep-live timer
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikolay Kim committed Feb 6, 2017
1 parent b1c5f47 commit 964921d
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 93 deletions.
15 changes: 11 additions & 4 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,13 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
self._request_class = request_class
self._response_class = response_class
self._ws_response_class = ws_response_class
self._time_service = (
time_service
if time_service is not None
else TimeService(self._loop))

if time_service is not None:
self._time_service_owner = False
self._time_service = time_service
else:
self._time_service_owner = True
self._time_service = TimeService(self._loop)

def __del__(self, _warnings=warnings):
if not self.closed:
Expand Down Expand Up @@ -486,6 +489,10 @@ def close(self):
if not self.closed:
self._connector.close()
self._connector = None

if self._time_service_owner:
self._time_service.close()

ret = helpers.create_future(self._loop)
ret.set_result(None)
return ret
Expand Down
8 changes: 4 additions & 4 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ def __aenter__(self):

@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
yield from self.release()
else:
self.close()
# similar to _RequestContextManager, we do not need to check
# for exceptions, response object can closes connection
# is state is broken
yield from self.release()
80 changes: 39 additions & 41 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from collections import defaultdict
from hashlib import md5, sha1, sha256
from itertools import chain
from math import ceil
from types import MappingProxyType

from yarl import URL
Expand Down Expand Up @@ -112,8 +111,7 @@ class BaseConnector(object):
_source_traceback = None

def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel,
force_close=False, limit=20,
loop=None):
force_close=False, limit=20, time_service=None, loop=None):

if force_close:
if keepalive_timeout is not None and \
Expand All @@ -122,7 +120,7 @@ def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel,
'be set if force_close is True')
else:
if keepalive_timeout is sentinel:
keepalive_timeout = 30
keepalive_timeout = 15.0

if loop is None:
loop = asyncio.get_event_loop()
Expand All @@ -135,18 +133,29 @@ def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel,
self._acquired = defaultdict(set)
self._conn_timeout = conn_timeout
self._keepalive_timeout = keepalive_timeout
self._cleanup_handle = None
self._force_close = force_close
self._limit = limit
self._waiters = defaultdict(list)

if time_service is not None:
self._time_service_owner = False
self._time_service = time_service
else:
self._time_service_owner = True
self._time_service = helpers.TimeService(loop)

self._loop = loop
self._factory = functools.partial(
aiohttp.StreamProtocol, loop=loop,
disconnect_error=ServerDisconnectedError)

self.cookies = SimpleCookie()

self._cleanup_handle = None
if (keepalive_timeout is not sentinel and
keepalive_timeout is not None):
self._cleanup()

def __del__(self, _warnings=warnings):
if self._closed:
return
Expand Down Expand Up @@ -197,43 +206,29 @@ def _cleanup(self):
"""Cleanup unused transports."""
if self._cleanup_handle:
self._cleanup_handle.cancel()
self._cleanup_handle = None

now = self._loop.time()
now = self._time_service.loop_time()

connections = {}
timeout = self._keepalive_timeout
if self._conns:
connections = {}
deadline = now - self._keepalive_timeout
for key, conns in self._conns.items():
alive = []
for transport, proto, use_time in conns:
if transport is not None:
if proto.is_connected():
if use_time - deadline < 0:
transport.close()
else:
alive.append((transport, proto, use_time))

for key, conns in self._conns.items():
alive = []
for transport, proto, t0 in conns:
if transport is not None:
if proto and not proto.is_connected():
transport = None
else:
delta = t0 + self._keepalive_timeout - now
if delta < 0:
transport.close()
transport = None
elif delta < timeout:
timeout = delta

if transport is not None:
alive.append((transport, proto, t0))
if alive:
connections[key] = alive

if connections:
self._cleanup_handle = self._loop.call_at(
ceil(now + timeout), self._cleanup)

self._conns = connections

def _start_cleanup_task(self):
if self._cleanup_handle is None:
now = self._loop.time()
self._cleanup_handle = self._loop.call_at(
ceil(now + self._keepalive_timeout), self._cleanup)
if alive:
connections[key] = alive

self._conns = connections

self._cleanup_handle = self._time_service.call_later(
self._keepalive_timeout / 2.0, self._cleanup)

def close(self):
"""Close all opened transports."""
Expand All @@ -247,6 +242,9 @@ def close(self):
if self._loop.is_closed():
return ret

if self._time_service_owner:
self._time_service.close()

for key, data in self._conns.items():
for transport, proto, t0 in data:
transport.close()
Expand Down Expand Up @@ -330,6 +328,7 @@ def _get(self, key):
conns = self._conns[key]
except KeyError:
return None, None

t1 = self._loop.time()
while conns:
transport, proto, t0 = conns.pop()
Expand All @@ -342,6 +341,7 @@ def _get(self, key):
# The very last connection was reclaimed: drop the key
del self._conns[key]
return transport, proto

# No more connections: drop the key
del self._conns[key]
return None, None
Expand Down Expand Up @@ -398,8 +398,6 @@ def _release(self, key, req, transport, protocol, *, should_close=False):
conns.append((transport, protocol, self._loop.time()))
reader.unset_parser()

self._start_cleanup_task()

@asyncio.coroutine
def _create_connection(self, req):
raise NotImplementedError()
Expand Down
43 changes: 26 additions & 17 deletions aiohttp/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ def __init__(self, loop, *, interval=1.0):
self._cb = loop.call_at(self._loop_time + self._interval, self._on_cb)
self._scheduled = []

def stop(self):
def close(self):
if self._cb:
self._cb.cancel()

Expand Down Expand Up @@ -673,6 +673,9 @@ def strtime(self):
self._strtime = s = self._format_date_time()
return self._strtime

def loop_time(self):
return self._loop_time

def call_later(self, delay, callback, *args):
"""Arrange for a callback to be called at a given time.
Expand Down Expand Up @@ -703,43 +706,49 @@ def timeout(self, timeout):
timeout - value in seconds or None to disable timeout logic
"""
return LowresTimeout(timeout, self, self._loop)
when = self._loop_time + timeout if timeout is not None else 0.0

ctx = _TimeServiceTimeoutContext(when, self._loop)

if timeout is not None:
heapq.heappush(self._scheduled, ctx)

return ctx

class LowresTimeout:

class _TimeServiceTimeoutContext(TimerHandle):
""" Low resolution timeout context manager """

def __init__(self, timeout, time_service, loop):
self._loop = loop
self._timeout = timeout
self._time_service = time_service
def __init__(self, when, loop):
super().__init__(when, self.cancel, (), loop)

self._task = None
self._cancelled = False
self._cancel_handler = None

def __enter__(self):
self._task = asyncio.Task.current_task(loop=self._loop)
if self._task is None:
self._cancelled = True
raise RuntimeError('Timeout context manager should be used '
'inside a task')
if self._timeout is not None:
self._cancel_handler = self._time_service.call_later(
self._timeout, self._cancel_task)

return self

def __exit__(self, exc_type, exc_val, exc_tb):
self._task = None

if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
raise asyncio.TimeoutError from None
if self._timeout is not None:
self._cancel_handler.cancel()
self._cancel_handler = None

def _cancel_task(self):
self._cancelled = self._task.cancel()
self._cancelled = True

def cancel(self):
if not self._cancelled:
if self._task is not None:
self._task.cancel()
self._task = None

self._cancelled = True


class HeadersMixin:
Expand Down
2 changes: 1 addition & 1 deletion aiohttp/web_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def shutdown(self, timeout=None):
coros = [conn.shutdown(timeout) for conn in self._connections]
yield from asyncio.gather(*coros, loop=self._loop)
self._connections.clear()
self._time_service.stop()
self._time_service.close()

finish_connections = shutdown

Expand Down
Loading

0 comments on commit 964921d

Please sign in to comment.