only preserve params when ?next= is not specified

This commit is contained in:
Min RK
2020-11-17 11:00:37 +01:00
parent 553ee26312
commit 87e4f458fb
2 changed files with 99 additions and 8 deletions

View File

@@ -634,6 +634,12 @@ class BaseHandler(RequestHandler):
next_url, next_url,
) )
# this is where we know if next_url is coming from ?next= param or we are using a default url
if next_url:
next_url_from_param = True
else:
next_url_from_param = False
if not next_url: if not next_url:
# custom default URL, usually passed because user landed on that page but was not logged in # custom default URL, usually passed because user landed on that page but was not logged in
if default: if default:
@@ -659,7 +665,10 @@ class BaseHandler(RequestHandler):
else: else:
next_url = url_path_join(self.hub.base_url, 'home') next_url = url_path_join(self.hub.base_url, 'home')
next_url = self.append_query_parameters(next_url, exclude=['next']) if not next_url_from_param:
# when a request made with ?next=... assume all the params have already been encoded
# otherwise, preserve params from the current request across the redirect
next_url = self.append_query_parameters(next_url, exclude=['next'])
return next_url return next_url
def append_query_parameters(self, url, exclude=None): def append_query_parameters(self, url, exclude=None):

View File

@@ -31,7 +31,7 @@ async def test_root_no_auth(app):
url = ujoin(public_host(app), app.hub.base_url) url = ujoin(public_host(app), app.hub.base_url)
r = await async_requests.get(url) r = await async_requests.get(url)
r.raise_for_status() r.raise_for_status()
assert r.url == ujoin(url, 'login') assert r.url == url_concat(ujoin(url, 'login'), dict(next=app.hub.base_url))
async def test_root_auth(app): async def test_root_auth(app):
@@ -622,9 +622,16 @@ async def test_login_strip(app):
(False, '//other.domain', '', None), (False, '//other.domain', '', None),
(False, '///other.domain/triple', '', None), (False, '///other.domain/triple', '', None),
(False, '\\\\other.domain/backslashes', '', None), (False, '\\\\other.domain/backslashes', '', None),
# params are handled correctly # params are handled correctly (ignored if ?next= specified)
(True, '/hub/admin', 'hub/admin?left=1&right=2', [('left', 1), ('right', 2)]), (
(False, '/hub/admin', 'hub/admin?left=1&right=2', [('left', 1), ('right', 2)]), True,
'/hub/admin?left=1&right=2',
'hub/admin?left=1&right=2',
{"left": "abc"},
),
(False, '/hub/admin', 'hub/admin', [('left', 1), ('right', 2)]),
(True, '', '', {"keep": "yes"}),
(False, '', '', {"keep": "yes"}),
], ],
) )
async def test_login_redirect(app, running, next_url, location, params): async def test_login_redirect(app, running, next_url, location, params):
@@ -633,10 +640,15 @@ async def test_login_redirect(app, running, next_url, location, params):
if location: if location:
location = ujoin(app.base_url, location) location = ujoin(app.base_url, location)
elif running: elif running:
# location not specified,
location = user.url location = user.url
if params:
location = url_concat(location, params)
else: else:
# use default url # use default url
location = ujoin(app.base_url, 'hub/spawn') location = ujoin(app.base_url, 'hub/spawn')
if params:
location = url_concat(location, params)
url = 'login' url = 'login'
if params: if params:
@@ -655,7 +667,73 @@ async def test_login_redirect(app, running, next_url, location, params):
r = await get_page(url, app, cookies=cookies, allow_redirects=False) r = await get_page(url, app, cookies=cookies, allow_redirects=False)
r.raise_for_status() r.raise_for_status()
assert r.status_code == 302 assert r.status_code == 302
assert location == r.headers['Location'] assert r.headers["Location"] == location
@pytest.mark.parametrize(
'location, next, extra_params',
[
(
"{base_url}hub/spawn?a=5",
None,
{"a": "5"},
), # no ?next= given, preserve params
("/x", "/x", {"a": "5"}), # ?next=given, params ignored
(
"/x?b=10",
"/x?b=10",
{"a": "5"},
), # ?next=given with params, additional params ignored
],
)
async def test_next_url(app, user, location, next, extra_params):
params = {}
if extra_params:
params.update(extra_params)
if next:
params["next"] = next
url = url_concat("/", params)
cookies = await app.login_user("monster")
# location can be a string template
location = location.format(base_url=app.base_url)
r = await get_page(url, app, cookies=cookies, allow_redirects=False)
r.raise_for_status()
assert r.status_code == 302
assert r.headers["Location"] == location
async def test_next_url_params_sequence(app, user):
"""Test each step of / -> login -> spawn
and whether they preserve url params
"""
params = {"xyz": "5"}
# first request: root page, with params, not logged in
r = await get_page("/?xyz=5", app, allow_redirects=False)
r.raise_for_status()
location = r.headers["Location"]
# next page: login
cookies = await app.login_user(user.name)
assert location == url_concat(
ujoin(app.base_url, "/hub/login"), {"next": ujoin(app.base_url, "/hub/?xyz=5")}
)
r = await async_requests.get(
public_host(app) + location, cookies=cookies, allow_redirects=False
)
r.raise_for_status()
location = r.headers["Location"]
# after login, redirect back
assert location == ujoin(app.base_url, "/hub/?xyz=5")
r = await async_requests.get(
public_host(app) + location, cookies=cookies, allow_redirects=False
)
r.raise_for_status()
location = r.headers["Location"]
assert location == ujoin(app.base_url, "/hub/spawn?xyz=5")
async def test_auto_login(app, request): async def test_auto_login(app, request):
@@ -669,14 +747,18 @@ async def test_auto_login(app, request):
) )
# no auto_login: end up at /hub/login # no auto_login: end up at /hub/login
r = await async_requests.get(base_url) r = await async_requests.get(base_url)
assert r.url == public_url(app, path='hub/login') assert r.url == url_concat(
public_url(app, path="hub/login"), {"next": app.hub.base_url}
)
# enable auto_login: redirect from /hub/login to /hub/dummy # enable auto_login: redirect from /hub/login to /hub/dummy
authenticator = Authenticator(auto_login=True) authenticator = Authenticator(auto_login=True)
authenticator.login_url = lambda base_url: ujoin(base_url, 'dummy') authenticator.login_url = lambda base_url: ujoin(base_url, 'dummy')
with mock.patch.dict(app.tornado_settings, {'authenticator': authenticator}): with mock.patch.dict(app.tornado_settings, {'authenticator': authenticator}):
r = await async_requests.get(base_url) r = await async_requests.get(base_url)
assert r.url == public_url(app, path='hub/dummy') assert r.url == url_concat(
public_url(app, path="hub/dummy"), {"next": app.hub.base_url}
)
async def test_auto_login_logout(app): async def test_auto_login_logout(app):