cors: handle mismatched implicit/explicit ports in host header

http://host:80 should match http://host

cors tests are parametrized to make it easier to add more cases
This commit is contained in:
Min RK
2021-12-02 11:01:53 +01:00
parent 85e37e7f8c
commit ced941a6aa
2 changed files with 83 additions and 78 deletions

View File

@@ -9,6 +9,7 @@ from datetime import timedelta
from unittest import mock
from urllib.parse import quote
from urllib.parse import urlparse
from urllib.parse import urlunparse
from pytest import fixture
from pytest import mark
@@ -65,7 +66,15 @@ async def test_auth_api(app):
assert r.status_code == 403
async def test_cors_checks(request, app):
@mark.parametrize(
"content_type, status",
[
("text/plain", 403),
# accepted, but invalid
("application/json; charset=UTF-8", 400),
],
)
async def test_post_content_type(app, content_type, status):
url = ujoin(public_host(app), app.hub.base_url)
host = urlparse(url).netloc
# add admin user
@@ -74,42 +83,6 @@ async def test_cors_checks(request, app):
user = add_user(app.db, name='admin', admin=True)
cookies = await app.login_user('admin')
r = await api_request(
app, 'users', headers={'Authorization': '', 'Referer': 'null'}, cookies=cookies
)
assert r.status_code == 403
r = await api_request(
app,
'users',
headers={
'Authorization': '',
'Referer': 'http://attack.com/csrf/vulnerability',
},
cookies=cookies,
)
assert r.status_code == 403
r = await api_request(
app,
'users',
headers={'Authorization': '', 'Referer': url, 'Host': host},
cookies=cookies,
)
assert r.status_code == 200
r = await api_request(
app,
'users',
headers={
'Authorization': '',
'Referer': ujoin(url, 'foo/bar/baz/bat'),
'Host': host,
},
cookies=cookies,
)
assert r.status_code == 200
r = await api_request(
app,
'users',
@@ -117,24 +90,62 @@ async def test_cors_checks(request, app):
data='{}',
headers={
"Authorization": "",
"Content-Type": "text/plain",
"Content-Type": content_type,
},
cookies=cookies,
)
assert r.status_code == 403
assert r.status_code == status
r = await api_request(
app,
'users',
method='post',
data='{}',
headers={
"Authorization": "",
"Content-Type": "application/json; charset=UTF-8",
},
cookies=cookies,
)
assert r.status_code == 400 # accepted, but invalid
@mark.parametrize(
"host, referer, status",
[
('$host', '$url', 200),
(None, None, 200),
(None, 'null', 403),
(None, 'http://attack.com/csrf/vulnerability', 403),
('$host', {"path": "/user/someuser"}, 403),
('$host', {"path": "{path}/foo/bar/subpath"}, 200),
# mismatch host
("mismatch.com", "$url", 403),
# explicit host, matches
("fake.example", {"netloc": "fake.example"}, 200),
# explicit port, matches implicit port
("fake.example:80", {"netloc": "fake.example"}, 200),
# explicit port, mismatch
("fake.example:81", {"netloc": "fake.example"}, 403),
# implicit ports, mismatch proto
("fake.example", {"netloc": "fake.example", "scheme": "https"}, 403),
],
)
async def test_cors_check(request, app, host, referer, status):
url = ujoin(public_host(app), app.hub.base_url)
real_host = urlparse(url).netloc
if host == "$host":
host = real_host
if referer == '$url':
referer = url
elif isinstance(referer, dict):
parsed_url = urlparse(url)
# apply {}
url_ns = {key: getattr(parsed_url, key) for key in parsed_url._fields}
for key, value in referer.items():
referer[key] = value.format(**url_ns)
referer = urlunparse(parsed_url._replace(**referer))
# disable default auth header, cors is for cookie auth
headers = {"Authorization": ""}
if host is not None:
headers['X-Forwarded-Host'] = host
if referer is not None:
headers['Referer'] = referer
# add admin user
user = find_user(app.db, 'admin')
if user is None:
user = add_user(app.db, name='admin', admin=True)
cookies = await app.login_user('admin')
# test custom forwarded_host_header behavior
app.forwarded_host_header = 'X-Forwarded-Host'
@@ -148,28 +159,10 @@ async def test_cors_checks(request, app):
r = await api_request(
app,
'users',
headers={
'Authorization': '',
'Referer': url,
'Host': host,
'X-Forwarded-Host': 'example.com',
},
headers=headers,
cookies=cookies,
)
assert r.status_code == 403
r = await api_request(
app,
'users',
headers={
'Authorization': '',
'Referer': url,
'Host': host,
'X-Forwarded-Host': host,
},
cookies=cookies,
)
assert r.status_code == 200
assert r.status_code == status
# --------------