Use force_close and concurrency restriction #3733
Conversation
Hello @bmbouter! Thanks for submitting the PR.
You mention "force_full" by accident in the commit message but nbd
    if semaphore:
        self.semaphore = semaphore
    else:
        self.semaphore = asyncio.Semaphore()  # This will always be acquired
This could be simplified to self.semaphore = semaphore or asyncio.Semaphore(),
but this way might be clearer to readers; up to you.
I was trying to remember that idiom! Thanks @dralley
I like your simplified idiom personally :)
Actually, I'm going to keep it as is. Even though I like the shortened idiom, this form lets me keep the comment, which I think is helpful since it might not be obvious why a throwaway Semaphore is being created.
Without having tested it, LGTM. I'll let someone else review + test though, since I don't want to tear up my environment.
Do we have a way to regression-test this? Would it make sense to do so? Perhaps we could write a test and just have it be skipped by default, if it would take too long to run every time?
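For readers following along, the two equivalent forms discussed above look roughly like this (a minimal sketch; `BaseDownloader` and the `semaphore` argument follow the diff, the rest is illustrative):

```python
import asyncio

class BaseDownloader:
    def __init__(self, semaphore=None):
        # Equivalent short form: self.semaphore = semaphore or asyncio.Semaphore()
        if semaphore:
            self.semaphore = semaphore
        else:
            self.semaphore = asyncio.Semaphore()  # This will always be acquired
```

The explicit if/else leaves room for the comment explaining why a fresh, otherwise-unused Semaphore is created when none is passed in.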
Codecov Report
@@           Coverage Diff           @@
##           master   #3733   +/-  ##
=====================================
  Coverage    54.9%   54.9%
=====================================
  Files          64      64
  Lines        2781    2781
=====================================
  Hits         1527    1527
  Misses       1254    1254

Continue to review the full report at Codecov.
@dralley when you say regression test, do you mean a pulp smash test? Also, this PR has several aspects, so we need to think about testing each one at some point. Overall, one option would be to make a dummy aiohttp server that serves HTTP 1.1 with session continuation back to Pulp's downloaders at test time, and then verify that the client disconnects even though the server tells it to keep going. I'm going to file two Pulp Test issues so those functional tests are tracked.
Now that concurrency restriction is in the downloaders themselves, I believe
Actually, rather than including the name change in this PR, I'm going to make a follow-on issue. That will give us a chance to pick the best name together before we make the change. I'll file an issue today.
@@ -54,6 +54,10 @@ class HttpDownloader(BaseDownloader):
     allow for an active download to be arbitrarily long, while still detecting dead or closed
     sessions even when TCPKeepAlive is disabled.

     If a session is not provided, the one created will disable HTTP session force TCP closure with
     each request.
s/will disable HTTP session force TCP closure with each request/will force TCP connection closure after each request/
fixed in next push, ty
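The corrected docstring wording ("will force TCP connection closure after each request") corresponds to building the session with a connector configured for forced closure. A minimal sketch, assuming aiohttp is available (the function names here are hypothetical, not Pulp's actual factory code):

```python
import asyncio
import aiohttp

async def make_force_close_session():
    # force_close=True tells aiohttp to close the TCP connection after each
    # request instead of keeping it alive for reuse on later requests.
    connector = aiohttp.TCPConnector(force_close=True)
    return aiohttp.ClientSession(connector=connector)

async def demo():
    session = await make_force_close_session()
    try:
        # The connector exposes the setting via its force_close property.
        return session.connector.force_close
    finally:
        await session.close()
```

This avoids relying on the remote server honoring keep-alive semantics, which is the dead-session problem the PR is working around.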
download concurrency rename ticket is here: https://pulp.plan.io/issues/4114
Here are the two testing tickets I wrote up:
Thank you!
I think there is one problem with the current implementation. Otherwise, LGTM.
@@ -158,7 +163,8 @@ def __init__(self, url, session=None, auth=None, proxy=None, proxy_auth=None,
         return DownloadResult(path=self.path, artifact_attributes=self.artifact_attributes,
                               url=self.url)

-    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError, max_tries=10, giveup=giveup)
+    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
Not sure, but doesn't this essentially have the same behavior as before? The backoff and waiting are implemented as a decorator, i.e. they happen outside of the method, while the semaphore is not acquired.
I think the semaphore must be acquired outside of the run function. My idea would be to implement the limiting mechanism in the base class:
    async def run(self, extra_data=None):
        async with self.semaphore:
            return await self._run(extra_data)
And the actual downloader would have:
    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
                          max_tries=10, giveup=http_giveup)
    async def _run(self, extra_data=None):
        ...
just as before.
This has the additional benefit that the actual Downloader does not need to re-implement the limiting mechanism.
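gmbnomis's suggestion can be sketched end to end with plain asyncio (the class and method names follow the comment above; the downloader body is a stand-in for the real, backoff-decorated download code):

```python
import asyncio

class BaseDownloader:
    """Base class that enforces the concurrency limit around _run()."""

    def __init__(self, semaphore=None):
        self.semaphore = semaphore or asyncio.Semaphore()

    async def run(self, extra_data=None):
        # Acquire the semaphore in the base class, outside the subclass's
        # (decorated) method, so every downloader gets concurrency
        # restriction without re-implementing it.
        async with self.semaphore:
            return await self._run(extra_data)

class FileDownloader(BaseDownloader):
    async def _run(self, extra_data=None):
        # Stand-in body; the real method would carry the
        # @backoff.on_exception(...) decorator and do the actual download.
        await asyncio.sleep(0)
        return "finished"
```

Because `run()` owns the semaphore, subclasses only implement `_run()`, and the retry decorator wraps `_run()` rather than the limiting logic itself.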
@gmbnomis indeed! Thank you again for your astute observation. I'm going to rework this now, and I'm going to file a test to ensure we're testing backoff correctly in core's functional tests.
I filed this test issue so that we would ensure this with each pulp_smash run on core. https://pulp.plan.io/issues/4118
The use of force_close was inspired by: aio-libs/aiohttp#2867 (comment)

- Causes Factory and Downloaders to use force_close, which causes the TCP connection to close with each request.
- Ups the default concurrency to 20.
- Moves the concurrency restriction feature to the Downloaders and updates HttpDownloader and FileDownloader to use it.
- Stops using aiohttp for concurrency restriction because it's done in the downloaders.
- The Factory now configures the concurrency restriction using the value on the remote.
- Creates a _run() method that subclassed Downloaders now need to implement instead of run().

https://pulp.plan.io/issues/4036
https://pulp.plan.io/issues/4075
closes #4036
closes #4075

I pushed what I rewrote and just tested. It performed well for me and also implements the suggested changes. Feel free to take a look.