mirror of
https://github.com/jupyterhub/jupyterhub.git
synced 2025-10-11 12:03:00 +00:00
No more need for mock roles
This commit is contained in:
@@ -78,13 +78,6 @@ def mock_open_session(username, service, encoding):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def mock_role(app, role='admin', name=None):
|
|
||||||
scopes = get_scopes(role)
|
|
||||||
if name is not None:
|
|
||||||
scopes = [scope.format(username=name) for scope in scopes]
|
|
||||||
return mock.patch.dict(app.tornado_settings, {'mock_scopes': scopes})
|
|
||||||
|
|
||||||
|
|
||||||
class MockSpawner(SimpleLocalProcessSpawner):
|
class MockSpawner(SimpleLocalProcessSpawner):
|
||||||
"""Base mock spawner
|
"""Base mock spawner
|
||||||
|
|
||||||
|
@@ -18,7 +18,6 @@ from .. import roles
|
|||||||
from ..objects import Server
|
from ..objects import Server
|
||||||
from ..utils import url_path_join as ujoin
|
from ..utils import url_path_join as ujoin
|
||||||
from ..utils import utcnow
|
from ..utils import utcnow
|
||||||
from .mocking import mock_role
|
|
||||||
from .mocking import public_host
|
from .mocking import public_host
|
||||||
from .mocking import public_url
|
from .mocking import public_url
|
||||||
from .utils import add_user
|
from .utils import add_user
|
||||||
@@ -181,8 +180,7 @@ async def test_get_users(app):
|
|||||||
{'name': 'user', 'admin': False, 'roles': ['user'], 'last_activity': None}
|
{'name': 'user', 'admin': False, 'roles': ['user'], 'last_activity': None}
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
with mock_role(app, 'user'):
|
r = await api_request(app, 'users', headers=auth_header(db, 'user'))
|
||||||
r = await api_request(app, 'users', headers=auth_header(db, 'user'))
|
|
||||||
assert r.status_code == 403
|
assert r.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
@@ -284,12 +282,11 @@ async def test_get_self(app):
|
|||||||
assert model['name'] == u.name
|
assert model['name'] == u.name
|
||||||
|
|
||||||
# invalid auth gets 403
|
# invalid auth gets 403
|
||||||
with mock_role(app, 'user'):
|
r = await api_request(
|
||||||
r = await api_request(
|
app,
|
||||||
app,
|
'user',
|
||||||
'user',
|
headers={'Authorization': 'token notvalid'},
|
||||||
headers={'Authorization': 'token notvalid'},
|
)
|
||||||
)
|
|
||||||
assert r.status_code == 403
|
assert r.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
@@ -314,12 +311,11 @@ async def test_add_user(app):
|
|||||||
async def test_get_user(app):
|
async def test_get_user(app):
|
||||||
name = 'user'
|
name = 'user'
|
||||||
_ = await api_request(app, 'users', name, headers=auth_header(app.db, name))
|
_ = await api_request(app, 'users', name, headers=auth_header(app.db, name))
|
||||||
with mock_role(app, role=name, name=name):
|
r = await api_request(
|
||||||
r = await api_request(
|
app,
|
||||||
app,
|
'users',
|
||||||
'users',
|
name,
|
||||||
name,
|
)
|
||||||
)
|
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
|
||||||
user = normalize_user(r.json())
|
user = normalize_user(r.json())
|
||||||
@@ -518,15 +514,14 @@ async def test_user_set_auth_state(app, auth_state_enabled):
|
|||||||
assert user.name == name
|
assert user.name == name
|
||||||
user_auth_state = await user.get_auth_state()
|
user_auth_state = await user.get_auth_state()
|
||||||
assert user_auth_state is None
|
assert user_auth_state is None
|
||||||
with mock_role(app, 'user'):
|
r = await api_request(
|
||||||
r = await api_request(
|
app,
|
||||||
app,
|
'users',
|
||||||
'users',
|
name,
|
||||||
name,
|
method='patch',
|
||||||
method='patch',
|
data=json.dumps({'auth_state': auth_state}),
|
||||||
data=json.dumps({'auth_state': auth_state}),
|
headers=auth_header(app.db, name),
|
||||||
headers=auth_header(app.db, name),
|
)
|
||||||
)
|
|
||||||
assert r.status_code == 403
|
assert r.status_code == 403
|
||||||
user_auth_state = await user.get_auth_state()
|
user_auth_state = await user.get_auth_state()
|
||||||
assert user_auth_state is None
|
assert user_auth_state is None
|
||||||
@@ -1348,16 +1343,15 @@ async def test_token_authenticator_noauth(app):
|
|||||||
"""Create a token for a user relying on Authenticator.authenticate and no auth header"""
|
"""Create a token for a user relying on Authenticator.authenticate and no auth header"""
|
||||||
name = 'user'
|
name = 'user'
|
||||||
data = {'auth': {'username': name, 'password': name}}
|
data = {'auth': {'username': name, 'password': name}}
|
||||||
with mock_role(app, 'admin'):
|
r = await api_request(
|
||||||
r = await api_request(
|
app,
|
||||||
app,
|
'users',
|
||||||
'users',
|
name,
|
||||||
name,
|
'tokens',
|
||||||
'tokens',
|
method='post',
|
||||||
method='post',
|
data=json.dumps(data) if data else None,
|
||||||
data=json.dumps(data) if data else None,
|
noauth=True,
|
||||||
noauth=True,
|
)
|
||||||
)
|
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
reply = r.json()
|
reply = r.json()
|
||||||
assert 'token' in reply
|
assert 'token' in reply
|
||||||
@@ -1372,16 +1366,15 @@ async def test_token_authenticator_dict_noauth(app):
|
|||||||
app.authenticator.auth_state = {'who': 'cares'}
|
app.authenticator.auth_state = {'who': 'cares'}
|
||||||
name = 'user'
|
name = 'user'
|
||||||
data = {'auth': {'username': name, 'password': name}}
|
data = {'auth': {'username': name, 'password': name}}
|
||||||
with mock_role(app, 'user'):
|
r = await api_request(
|
||||||
r = await api_request(
|
app,
|
||||||
app,
|
'users',
|
||||||
'users',
|
name,
|
||||||
name,
|
'tokens',
|
||||||
'tokens',
|
method='post',
|
||||||
method='post',
|
data=json.dumps(data) if data else None,
|
||||||
data=json.dumps(data) if data else None,
|
noauth=True,
|
||||||
noauth=True,
|
)
|
||||||
)
|
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
reply = r.json()
|
reply = r.json()
|
||||||
assert 'token' in reply
|
assert 'token' in reply
|
||||||
@@ -1405,8 +1398,7 @@ async def test_token_list(app, as_user, for_user, status):
|
|||||||
if for_user != 'missing':
|
if for_user != 'missing':
|
||||||
for_user_obj = add_user(app.db, app, name=for_user)
|
for_user_obj = add_user(app.db, app, name=for_user)
|
||||||
headers = {'Authorization': 'token %s' % u.new_api_token()}
|
headers = {'Authorization': 'token %s' % u.new_api_token()}
|
||||||
with mock_role(app, role=as_user, name=as_user):
|
r = await api_request(app, 'users', for_user, 'tokens', headers=headers)
|
||||||
r = await api_request(app, 'users', for_user, 'tokens', headers=headers)
|
|
||||||
assert r.status_code == status
|
assert r.status_code == status
|
||||||
if status != 200:
|
if status != 200:
|
||||||
return
|
return
|
||||||
@@ -1417,10 +1409,9 @@ async def test_token_list(app, as_user, for_user, status):
|
|||||||
assert all(token['user'] == for_user for token in reply['oauth_tokens'])
|
assert all(token['user'] == for_user for token in reply['oauth_tokens'])
|
||||||
# validate individual token ids
|
# validate individual token ids
|
||||||
for token in reply['api_tokens'] + reply['oauth_tokens']:
|
for token in reply['api_tokens'] + reply['oauth_tokens']:
|
||||||
with mock_role(app, role=as_user, name=as_user):
|
r = await api_request(
|
||||||
r = await api_request(
|
app, 'users', for_user, 'tokens', token['id'], headers=headers
|
||||||
app, 'users', for_user, 'tokens', token['id'], headers=headers
|
)
|
||||||
)
|
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
reply = r.json()
|
reply = r.json()
|
||||||
assert normalize_token(reply) == normalize_token(token)
|
assert normalize_token(reply) == normalize_token(token)
|
||||||
@@ -1603,8 +1594,7 @@ async def test_get_services(app, mockservice_url):
|
|||||||
'display': True,
|
'display': True,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
with mock_role(app, 'user'):
|
r = await api_request(app, 'services', headers=auth_header(db, 'user'))
|
||||||
r = await api_request(app, 'services', headers=auth_header(db, 'user'))
|
|
||||||
assert r.status_code == 403
|
assert r.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
@@ -1647,16 +1637,14 @@ async def test_root_api(app):
|
|||||||
if app.internal_ssl:
|
if app.internal_ssl:
|
||||||
kwargs['cert'] = (app.internal_ssl_cert, app.internal_ssl_key)
|
kwargs['cert'] = (app.internal_ssl_cert, app.internal_ssl_key)
|
||||||
kwargs["verify"] = app.internal_ssl_ca
|
kwargs["verify"] = app.internal_ssl_ca
|
||||||
with mock_role(app):
|
r = await api_request(app, bypass_proxy=True)
|
||||||
r = await api_request(app, bypass_proxy=True)
|
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
expected = {'version': jupyterhub.__version__}
|
expected = {'version': jupyterhub.__version__}
|
||||||
assert r.json() == expected
|
assert r.json() == expected
|
||||||
|
|
||||||
|
|
||||||
async def test_info(app):
|
async def test_info(app):
|
||||||
with mock_role(app):
|
r = await api_request(app, 'info')
|
||||||
r = await api_request(app, 'info')
|
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
data = r.json()
|
data = r.json()
|
||||||
assert data['version'] == jupyterhub.__version__
|
assert data['version'] == jupyterhub.__version__
|
||||||
@@ -1686,14 +1674,13 @@ async def test_info(app):
|
|||||||
|
|
||||||
async def test_update_activity_403(app, user, admin_user):
|
async def test_update_activity_403(app, user, admin_user):
|
||||||
token = user.new_api_token()
|
token = user.new_api_token()
|
||||||
with mock_role(app, 'user'):
|
r = await api_request(
|
||||||
r = await api_request(
|
app,
|
||||||
app,
|
"users/{}/activity".format(admin_user.name),
|
||||||
"users/{}/activity".format(admin_user.name),
|
headers={"Authorization": "token {}".format(token)},
|
||||||
headers={"Authorization": "token {}".format(token)},
|
data="{}",
|
||||||
data="{}",
|
method="post",
|
||||||
method="post",
|
)
|
||||||
)
|
|
||||||
assert r.status_code == 403
|
assert r.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user