Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify auth header destiny during the redirect #5850

Merged
merged 2 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/client_advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ For *text/plain* ::

await session.post(url, data='Привет, Мир!')

.. note::
``Authorization`` header will be removed if you get redirected
to a different host or protocol.

Custom Cookies
--------------

Expand Down
83 changes: 69 additions & 14 deletions tests/test_client_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import json
import pathlib
import socket
import ssl
from unittest import mock

import pytest
from async_generator import async_generator, yield_
from multidict import MultiDict
from yarl import URL

import aiohttp
from aiohttp import Fingerprint, ServerFingerprintMismatch, hdrs, web
Expand Down Expand Up @@ -2327,25 +2329,75 @@ async def test_creds_in_auth_and_url() -> None:
await session.close()


async def test_drop_auth_on_redirect_to_other_host(aiohttp_server) -> None:
async def srv1(request):
assert request.host == "host1.com"
@pytest.fixture
def create_server_for_url_and_handler(aiohttp_server, tls_certificate_authority):
def create(url, srv):
app = web.Application()
app.router.add_route("GET", url.path, srv)

kwargs = {}
if url.scheme == "https":
cert = tls_certificate_authority.issue_cert(
url.host, "localhost", "127.0.0.1"
)
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
cert.configure_cert(ssl_ctx)
kwargs["ssl"] = ssl_ctx
return aiohttp_server(app, **kwargs)

return create


@pytest.mark.parametrize(
["url_from", "url_to"],
[
["http://host1.com/path1", "http://host2.com/path2"],
["http://host1.com/path1", "https://host1.com/path1"],
["https://host1.com/path1", "http://host1.com/path2"],
],
ids=(
"entirely different hosts",
"http -> https",
"https -> http",
),
)
async def test_drop_auth_on_redirect_to_other_host(
create_server_for_url_and_handler,
url_from,
url_to,
) -> None:
url_from, url_to = URL(url_from), URL(url_to)

async def srv_from(request):
assert request.host == url_from.host
assert request.headers["Authorization"] == "Basic dXNlcjpwYXNz"
raise web.HTTPFound("http://host2.com/path2")
raise web.HTTPFound(url_to)

async def srv2(request):
assert request.host == "host2.com"
assert "Authorization" not in request.headers
async def srv_to(request):
assert request.host == url_to.host
assert "Authorization" not in request.headers, "Header wasn't dropped"
return web.Response()

app = web.Application()
app.router.add_route("GET", "/path1", srv1)
app.router.add_route("GET", "/path2", srv2)
server_from = await create_server_for_url_and_handler(url_from, srv_from)
server_to = await create_server_for_url_and_handler(url_to, srv_to)

server = await aiohttp_server(app)
assert (
url_from.host != url_to.host or server_from.scheme != server_to.scheme
), "Invalid test case, host or scheme must differ"

protocol_port_map = {
"http": 80,
"https": 443,
}
etc_hosts = {
(url_from.host, protocol_port_map[server_from.scheme]): server_from,
(url_to.host, protocol_port_map[server_to.scheme]): server_to,
}

class FakeResolver(AbstractResolver):
async def resolve(self, host, port=0, family=socket.AF_INET):
server = etc_hosts[(host, port)]

return [
{
"hostname": host,
Expand All @@ -2360,14 +2412,17 @@ async def resolve(self, host, port=0, family=socket.AF_INET):
async def close(self):
pass

connector = aiohttp.TCPConnector(resolver=FakeResolver())
connector = aiohttp.TCPConnector(resolver=FakeResolver(), ssl=False)

async with aiohttp.ClientSession(connector=connector) as client:
resp = await client.get(
"http://host1.com/path1", auth=aiohttp.BasicAuth("user", "pass")
url_from,
auth=aiohttp.BasicAuth("user", "pass"),
)
assert resp.status == 200
resp = await client.get(
"http://host1.com/path1", headers={"Authorization": "Basic dXNlcjpwYXNz"}
url_from,
headers={"Authorization": "Basic dXNlcjpwYXNz"},
)
assert resp.status == 200

Expand Down