Skip to content

Commit

Permalink
Use force_close and concurrency restriction
Browse files Browse the repository at this point in the history
The use of force_close was inspired by:

aio-libs/aiohttp#2867 (comment)

- Causes Factory and Downloaders to use force_close which causes the TCP
  connection to close with each request.
- Ups the default concurrency to 20
- Moves the concurrency restriction feature to the Downloaders and
  updates HttpDownloader and FileDownloader to use it.
- Stops using aiohttp for concurrency restriction because it's done in
  the downloaders.
- The Factory now configures the concurrency restriction using the value
  on the remote.

https://pulp.plan.io/issues/4036
https://pulp.plan.io/issues/4075

closes #4036
closes #4075
  • Loading branch information
Brian Bouterse committed Nov 1, 2018
1 parent 21a1d7d commit 7dbc5e6
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 27 deletions.
12 changes: 11 additions & 1 deletion plugin/pulpcore/plugin/download/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class BaseDownloader:
``custom_file_object`` option was specified, otherwise None.
"""

def __init__(self, url, custom_file_object=None, expected_digests=None, expected_size=None):
def __init__(self, url, custom_file_object=None, expected_digests=None, expected_size=None,
semaphore=None):
"""
Create a BaseDownloader object. This is expected to be called by all subclasses.
Expand All @@ -73,6 +74,8 @@ def __init__(self, url, custom_file_object=None, expected_digests=None, expected
expected_digests (dict): Keyed on the algorithm name provided by hashlib and stores the
value of the expected digest. e.g. {'md5': '912ec803b2ce49e4a541068d495ab570'}
expected_size (int): The number of bytes the download is expected to have.
semaphore (asyncio.Semaphore): A semaphore the downloader must acquire before running.
Useful for limiting the number of outstanding downloaders in various ways.
"""
self.url = url
if custom_file_object:
Expand All @@ -83,6 +86,10 @@ def __init__(self, url, custom_file_object=None, expected_digests=None, expected
self.path = self._writer.name
self.expected_digests = expected_digests
self.expected_size = expected_size
if semaphore:
self.semaphore = semaphore
else:
self.semaphore = asyncio.Semaphore() # This will always be acquired
self._digests = {n: hashlib.new(n) for n in Artifact.DIGEST_FIELDS}
self._size = 0

Expand Down Expand Up @@ -203,6 +210,9 @@ async def run(self):
:class:`~pulpcore.plugin.download.DownloadResult` is usually set to the
:attr:`~pulpcore.plugin.download.BaseDownloader.artifact_attributes` property value.
Additionally it is expected for the downloader to acquire self.semaphore before doing other
work. This allows the `semaphore` parameter to restrict the downloader as expected.
Returns:
:class:`~pulpcore.plugin.download.DownloadResult`
Expand Down
22 changes: 16 additions & 6 deletions plugin/pulpcore/plugin/download/factory.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import aiohttp
import asyncio
import atexit
import copy
from gettext import gettext as _
import ssl
from urllib.parse import urlparse

import aiohttp

from .http import HttpDownloader
from .file import FileDownloader

Expand All @@ -20,7 +22,8 @@ class DownloaderFactory:
"""
A factory for creating downloader objects that are configured with remote settings.
The DownloadFactory correctly handles SSL settings, basic auth settings, and proxy settings.
The DownloadFactory correctly handles SSL settings, basic auth settings, proxy settings, and
connection limit settings.
It supports handling urls with the `http`, `https`, and `file` protocols. The
``downloader_overrides`` option allows the caller to specify the download class to be used for
Expand All @@ -38,6 +41,10 @@ class DownloaderFactory:
http://aiohttp.readthedocs.io/en/stable/client_quickstart.html#timeouts Behaviorally, it should
allow for an active download to be arbitrarily long, while still detecting dead or closed
sessions even when TCPKeepAlive is disabled.
Also for http and https urls, even though HTTP 1.1 is used, the TCP connection is set up and
closed with each request. This is done for compatibility reasons due to various issues related
to session continuation implementation in various servers.
"""

def __init__(self, remote, downloader_overrides=None):
Expand All @@ -57,16 +64,19 @@ def __init__(self, remote, downloader_overrides=None):
self._handler_map = {'https': self._http_or_https, 'http': self._http_or_https,
'file': self._generic}
self._session = self._make_aiohttp_session_from_remote()
self._semaphore = asyncio.Semaphore(value=remote.connection_limit)
atexit.register(self._session.close)

def _make_aiohttp_session_from_remote(self):
"""
Build a :class:`aiohttp.ClientSession` from the remote's settings and timing settings.
This method is what provides the force_close of the TCP connection with each request.
Returns:
:class:`aiohttp.ClientSession`
"""
tcp_conn_opts = {}
tcp_conn_opts = {'force_close': True}

sslcontext = None
if self._remote.ssl_ca_certificate.name:
Expand All @@ -90,9 +100,6 @@ def _make_aiohttp_session_from_remote(self):
if self._remote.ssl_validation:
tcp_conn_opts['verify_ssl'] = self._remote.ssl_validation

if self._remote.connection_limit:
tcp_conn_opts['limit'] = self._remote.connection_limit

conn = aiohttp.TCPConnector(**tcp_conn_opts)

auth_options = {}
Expand All @@ -109,6 +116,8 @@ def build(self, url, **kwargs):
"""
Build a downloader which can optionally verify integrity using either digest or size.
The built downloader also provides concurrency restriction if specified by the remote.
Args:
url (str): The download URL.
kwargs (dict): All kwargs are passed along to the downloader. At a minimum, these
Expand All @@ -118,6 +127,7 @@ def build(self, url, **kwargs):
subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that
is configured with the remote settings.
"""
kwargs['semaphore'] = self._semaphore
scheme = urlparse(url).scheme.lower()
try:
builder = self._handler_map[scheme]
Expand Down
22 changes: 13 additions & 9 deletions plugin/pulpcore/plugin/download/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,22 @@ async def run(self, extra_data=None):
"""
Read, validate, and compute digests on the `url`. This is a coroutine.
This method supports restricting the number of concurrent downloaders by acquiring the
self.semaphore immediately before any downloading is done.
This method provides the same return object type as documented in
:meth:`~pulpcore.plugin.download.BaseDownloader.run`.
Args:
extra_data (dict): Extra data passed to the downloader.
"""
async with aiofiles.open(self._path, 'rb') as f_handle:
while True:
chunk = await f_handle.read(1048576) # 1 megabyte
if not chunk:
self.finalize()
break # the reading is done
self.handle_data(chunk)
return DownloadResult(path=self._path, artifact_attributes=self.artifact_attributes,
url=self.url)
async with self.semaphore:
async with aiofiles.open(self._path, 'rb') as f_handle:
while True:
chunk = await f_handle.read(1048576) # 1 megabyte
if not chunk:
self.finalize()
break # the reading is done
self.handle_data(chunk)
return DownloadResult(path=self._path, artifact_attributes=self.artifact_attributes,
url=self.url)
30 changes: 20 additions & 10 deletions plugin/pulpcore/plugin/download/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
logging.getLogger('backoff').addHandler(logging.StreamHandler())


def giveup(exc):
def http_giveup(exc):
"""
Inspect a raised exception and determine if we should give up.
Expand Down Expand Up @@ -54,6 +54,10 @@ class HttpDownloader(BaseDownloader):
allow for an active download to be arbitrarily long, while still detecting dead or closed
sessions even when TCPKeepAlive is disabled.
If a session is not provided, the one created will force TCP connection closure after each
request. This is done for compatibility reasons due to various issues related to session
continuation implementation in various servers.
`aiohttp.ClientSession` objects allows you to configure options that will apply to all
downloaders using that session such as auth, timeouts, headers, etc. For more info on these
options see the `aiohttp.ClientSession` docs for more information:
Expand Down Expand Up @@ -128,7 +132,8 @@ def __init__(self, url, session=None, auth=None, proxy=None, proxy_auth=None,
self._close_session_on_finalize = False
else:
timeout = aiohttp.ClientTimeout(total=None, sock_connect=600, sock_read=600)
self.session = aiohttp.ClientSession(timeout=timeout)
conn = aiohttp.TCPConnector(force_close=True)
self.session = aiohttp.ClientSession(connector=conn, timeout=timeout)
self._close_session_on_finalize = True
self.auth = auth
self.proxy = proxy
Expand Down Expand Up @@ -158,7 +163,8 @@ async def _handle_response(self, response):
return DownloadResult(path=self.path, artifact_attributes=self.artifact_attributes,
url=self.url)

@backoff.on_exception(backoff.expo, aiohttp.ClientResponseError, max_tries=10, giveup=giveup)
@backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
max_tries=10, giveup=http_giveup)
async def run(self, extra_data=None):
"""
Download, validate, and compute digests on the `url`. This is a coroutine.
Expand All @@ -167,16 +173,20 @@ async def run(self, extra_data=None):
some 5XX errors. It retries with exponential backoff 10 times before allowing
a final exception to be raised.
This method supports restricting the number of concurrent downloaders by acquiring the
self.semaphore immediately before any downloading is done.
This method provides the same return object type as documented in
:meth:`~pulpcore.plugin.download.BaseDownloader.run`.
Args:
extra_data (dict): Extra data passed by the downloader.
"""
async with self.session.get(self.url) as response:
response.raise_for_status()
to_return = await self._handle_response(response)
await response.release()
if self._close_session_on_finalize:
await self.session.close()
return to_return
async with self.semaphore:
async with self.session.get(self.url) as response:
response.raise_for_status()
to_return = await self._handle_response(response)
await response.release()
if self._close_session_on_finalize:
await self.session.close()
return to_return
2 changes: 1 addition & 1 deletion pulpcore/pulpcore/app/models/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def tls_storage_path(self, name):
username = models.TextField(blank=True)
password = models.TextField(blank=True)
last_synced = models.DateTimeField(blank=True, null=True)
connection_limit = models.PositiveIntegerField(default=5)
connection_limit = models.PositiveIntegerField(default=20)

class Meta:
default_related_name = 'remotes'
Expand Down

0 comments on commit 7dbc5e6

Please sign in to comment.