diff --git a/plugin/pulpcore/plugin/download/base.py b/plugin/pulpcore/plugin/download/base.py
index 0fe286a6fa..e366158e98 100644
--- a/plugin/pulpcore/plugin/download/base.py
+++ b/plugin/pulpcore/plugin/download/base.py
@@ -61,7 +61,8 @@ class BaseDownloader:
             ``custom_file_object`` option was specified, otherwise None.
     """

-    def __init__(self, url, custom_file_object=None, expected_digests=None, expected_size=None):
+    def __init__(self, url, custom_file_object=None, expected_digests=None, expected_size=None,
+                 semaphore=None):
         """
         Create a BaseDownloader object. This is expected to be called by all subclasses.

@@ -73,6 +74,8 @@ def __init__(self, url, custom_file_object=None, expected_digests=None, expected_size=None):
             expected_digests (dict): Keyed on the algorithm name provided by hashlib and stores
                 the value of the expected digest. e.g. {'md5': '912ec803b2ce49e4a541068d495ab570'}
             expected_size (int): The number of bytes the download is expected to have.
+            semaphore (asyncio.Semaphore): A semaphore the downloader must acquire before running.
+                Useful for limiting the number of outstanding downloaders in various ways.
         """
         self.url = url
         if custom_file_object:
@@ -83,6 +86,10 @@ def __init__(self, url, custom_file_object=None, expected_digests=None, expected_size=None):
         self.path = self._writer.name
         self.expected_digests = expected_digests
         self.expected_size = expected_size
+        if semaphore:
+            self.semaphore = semaphore
+        else:
+            self.semaphore = asyncio.Semaphore()  # This will always be acquired
         self._digests = {n: hashlib.new(n) for n in Artifact.DIGEST_FIELDS}
         self._size = 0

@@ -203,6 +210,9 @@ async def run(self):
         :class:`~pulpcore.plugin.download.DownloadResult` is usually set to the
         :attr:`~pulpcore.plugin.download.BaseDownloader.artifact_attributes` property value.

+        Additionally, the downloader is expected to acquire self.semaphore before doing any other
+        work. This allows the `semaphore` parameter to restrict the downloader as expected.
+
         Returns:
             :class:`~pulpcore.plugin.download.DownloadResult`

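For plugin writers, the run() contract above boils down to the following minimal sketch of a
hypothetical subclass. MyDownloader, its fake payload, and the comments are illustrative only
and not part of this patch:

    from pulpcore.plugin.download import BaseDownloader, DownloadResult


    class MyDownloader(BaseDownloader):
        """Hypothetical downloader that honors the new semaphore contract."""

        async def run(self, extra_data=None):
            # Acquire self.semaphore before doing any real work. When no semaphore
            # is passed to __init__, a fresh asyncio.Semaphore() is created there,
            # so this first acquire never blocks.
            async with self.semaphore:
                data = b'example bytes'  # stand-in for actually fetching self.url
                self.handle_data(data)   # feeds digest and size bookkeeping
                self.finalize()          # validates expected digests and size
                return DownloadResult(path=self.path,
                                      artifact_attributes=self.artifact_attributes,
                                      url=self.url)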
""" def __init__(self, remote, downloader_overrides=None): @@ -57,16 +64,19 @@ def __init__(self, remote, downloader_overrides=None): self._handler_map = {'https': self._http_or_https, 'http': self._http_or_https, 'file': self._generic} self._session = self._make_aiohttp_session_from_remote() + self._semaphore = asyncio.Semaphore(value=remote.connection_limit) atexit.register(self._session.close) def _make_aiohttp_session_from_remote(self): """ Build a :class:`aiohttp.ClientSession` from the remote's settings and timing settings. + This method is what provides the force_close of the TCP connection with each request. + Returns: :class:`aiohttp.ClientSession` """ - tcp_conn_opts = {} + tcp_conn_opts = {'force_close': True} sslcontext = None if self._remote.ssl_ca_certificate.name: @@ -90,9 +100,6 @@ def _make_aiohttp_session_from_remote(self): if self._remote.ssl_validation: tcp_conn_opts['verify_ssl'] = self._remote.ssl_validation - if self._remote.connection_limit: - tcp_conn_opts['limit'] = self._remote.connection_limit - conn = aiohttp.TCPConnector(**tcp_conn_opts) auth_options = {} @@ -109,6 +116,8 @@ def build(self, url, **kwargs): """ Build a downloader which can optionally verify integrity using either digest or size. + The built downloader also provides concurrency restriction if specified by the remote. + Args: url (str): The download URL. kwargs (dict): All kwargs are passed along to the downloader. At a minimum, these @@ -118,6 +127,7 @@ def build(self, url, **kwargs): subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that is configured with the remote settings. """ + kwargs['semaphore'] = self._semaphore scheme = urlparse(url).scheme.lower() try: builder = self._handler_map[scheme] diff --git a/plugin/pulpcore/plugin/download/file.py b/plugin/pulpcore/plugin/download/file.py index 533da5b581..8966b7bdb8 100644 --- a/plugin/pulpcore/plugin/download/file.py +++ b/plugin/pulpcore/plugin/download/file.py @@ -36,18 +36,22 @@ async def run(self, extra_data=None): """ Read, validate, and compute digests on the `url`. This is a coroutine. + This method supports restricting the number of concurrent downloaders by acquiring the + self.semaphore immediately before any downloading is done. + This method provides the same return object type and documented in :meth:`~pulpcore.plugin.download.BaseDownloader.run`. Args: extra_data (dict): Extra data passed to the downloader. """ - async with aiofiles.open(self._path, 'rb') as f_handle: - while True: - chunk = await f_handle.read(1048576) # 1 megabyte - if not chunk: - self.finalize() - break # the reading is done - self.handle_data(chunk) - return DownloadResult(path=self._path, artifact_attributes=self.artifact_attributes, - url=self.url) + async with self.semaphore: + async with aiofiles.open(self._path, 'rb') as f_handle: + while True: + chunk = await f_handle.read(1048576) # 1 megabyte + if not chunk: + self.finalize() + break # the reading is done + self.handle_data(chunk) + return DownloadResult(path=self._path, artifact_attributes=self.artifact_attributes, + url=self.url) diff --git a/plugin/pulpcore/plugin/download/http.py b/plugin/pulpcore/plugin/download/http.py index 0ea975e57c..8857162e70 100644 --- a/plugin/pulpcore/plugin/download/http.py +++ b/plugin/pulpcore/plugin/download/http.py @@ -12,7 +12,7 @@ logging.getLogger('backoff').addHandler(logging.StreamHandler()) -def giveup(exc): +def http_giveup(exc): """ Inspect a raised exception and determine if we should give up. 
diff --git a/plugin/pulpcore/plugin/download/file.py b/plugin/pulpcore/plugin/download/file.py
index 533da5b581..8966b7bdb8 100644
--- a/plugin/pulpcore/plugin/download/file.py
+++ b/plugin/pulpcore/plugin/download/file.py
@@ -36,18 +36,22 @@ async def run(self, extra_data=None):
         """
         Read, validate, and compute digests on the `url`. This is a coroutine.

+        This method supports restricting the number of concurrent downloaders by acquiring
+        self.semaphore immediately before any downloading is done.
+
         This method provides the same return object type and documented in
         :meth:`~pulpcore.plugin.download.BaseDownloader.run`.

         Args:
             extra_data (dict): Extra data passed to the downloader.
         """
-        async with aiofiles.open(self._path, 'rb') as f_handle:
-            while True:
-                chunk = await f_handle.read(1048576)  # 1 megabyte
-                if not chunk:
-                    self.finalize()
-                    break  # the reading is done
-                self.handle_data(chunk)
-        return DownloadResult(path=self._path, artifact_attributes=self.artifact_attributes,
-                              url=self.url)
+        async with self.semaphore:
+            async with aiofiles.open(self._path, 'rb') as f_handle:
+                while True:
+                    chunk = await f_handle.read(1048576)  # 1 megabyte
+                    if not chunk:
+                        self.finalize()
+                        break  # the reading is done
+                    self.handle_data(chunk)
+            return DownloadResult(path=self._path, artifact_attributes=self.artifact_attributes,
+                                  url=self.url)
diff --git a/plugin/pulpcore/plugin/download/http.py b/plugin/pulpcore/plugin/download/http.py
index 0ea975e57c..8857162e70 100644
--- a/plugin/pulpcore/plugin/download/http.py
+++ b/plugin/pulpcore/plugin/download/http.py
@@ -12,7 +12,7 @@
 logging.getLogger('backoff').addHandler(logging.StreamHandler())


-def giveup(exc):
+def http_giveup(exc):
     """
     Inspect a raised exception and determine if we should give up.

@@ -54,6 +54,10 @@ class HttpDownloader(BaseDownloader):
     allow for an active download to be arbitrarily long, while still detecting dead or closed
     sessions even when TCPKeepAlive is disabled.

+    If a session is not provided, the one created will force TCP connection closure after each
+    request. This is done for compatibility, due to issues with the session continuation
+    implementations of various servers.
+
     `aiohttp.ClientSession` objects allows you to configure options that will apply to all
     downloaders using that session such as auth, timeouts, headers, etc. For more info on these
     options see the `aiohttp.ClientSession` docs for more information:
@@ -128,7 +132,8 @@ def __init__(self, url, session=None, auth=None, proxy=None, proxy_auth=None,
             self._close_session_on_finalize = False
         else:
             timeout = aiohttp.ClientTimeout(total=None, sock_connect=600, sock_read=600)
-            self.session = aiohttp.ClientSession(timeout=timeout)
+            conn = aiohttp.TCPConnector(force_close=True)
+            self.session = aiohttp.ClientSession(connector=conn, timeout=timeout)
             self._close_session_on_finalize = True
         self.auth = auth
         self.proxy = proxy
@@ -158,7 +163,8 @@ async def _handle_response(self, response):
         return DownloadResult(path=self.path, artifact_attributes=self.artifact_attributes,
                               url=self.url)

-    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError, max_tries=10, giveup=giveup)
+    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
+                          max_tries=10, giveup=http_giveup)
     async def run(self, extra_data=None):
         """
         Download, validate, and compute digests on the `url`. This is a coroutine.
@@ -167,16 +173,20 @@ async def run(self, extra_data=None):
         some 5XX errors. It retries with exponential backoff 10 times before allowing a final
         exception to be raised.

+        This method supports restricting the number of concurrent downloaders by acquiring
+        self.semaphore immediately before any downloading is done.
+
         This method provides the same return object type and documented in
         :meth:`~pulpcore.plugin.download.BaseDownloader.run`.

         Args:
             extra_data (dict): Extra data passed by the downloader.
         """
-        async with self.session.get(self.url) as response:
-            response.raise_for_status()
-            to_return = await self._handle_response(response)
-        await response.release()
-        if self._close_session_on_finalize:
-            await self.session.close()
-        return to_return
+        async with self.semaphore:
+            async with self.session.get(self.url) as response:
+                response.raise_for_status()
+                to_return = await self._handle_response(response)
+            await response.release()
+            if self._close_session_on_finalize:
+                await self.session.close()
+            return to_return
diff --git a/pulpcore/pulpcore/app/models/repository.py b/pulpcore/pulpcore/app/models/repository.py
index 850e65f2be..29f717c4d5 100644
--- a/pulpcore/pulpcore/app/models/repository.py
+++ b/pulpcore/pulpcore/app/models/repository.py
@@ -104,7 +104,7 @@ def tls_storage_path(self, name):
     username = models.TextField(blank=True)
     password = models.TextField(blank=True)
     last_synced = models.DateTimeField(blank=True, null=True)
-    connection_limit = models.PositiveIntegerField(default=5)
+    connection_limit = models.PositiveIntegerField(default=20)

     class Meta:
         default_related_name = 'remotes'
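The new default of 20 (up from 5) leans on ordinary asyncio.Semaphore behavior. A
self-contained sketch, with an arbitrary task count and sleep duration, showing that a
semaphore sized like connection_limit caps peak concurrency the same way the shared
factory semaphore does:

    import asyncio


    async def fake_download(sem, active, peak):
        async with sem:
            active[0] += 1
            peak[0] = max(peak[0], active[0])
            await asyncio.sleep(0.01)  # stand-in for network or disk I/O
            active[0] -= 1


    async def main():
        sem = asyncio.Semaphore(20)  # mirrors the new connection_limit default
        active, peak = [0], [0]
        await asyncio.gather(*(fake_download(sem, active, peak) for _ in range(100)))
        print('peak concurrency:', peak[0])  # 20, not 100


    asyncio.run(main())  # Python 3.7+; older loops can use run_until_complete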