Files
jupyterhub/jupyterhub/tests/test_scopes.py
2024-02-07 08:34:39 +01:00

1366 lines
42 KiB
Python

"""Test scopes for API handlers"""
import types
from operator import itemgetter
from unittest import mock
import pytest
from pytest import mark
from tornado import web
from .. import orm, roles, scopes
from .._memoize import FrozenDict
from ..apihandlers import APIHandler
from ..scopes import (
Scope,
_expand_self_scope,
_intersect_expanded_scopes,
_resolve_requested_scopes,
expand_scopes,
get_scopes_for,
has_scope,
identify_scopes,
needs_scope,
parse_scopes,
)
from .utils import add_user, api_request, auth_header
def get_handler_with_scopes(scopes):
    """Return a mock ``APIHandler`` that checks scopes with the real ``has_scope``.

    ``scopes`` is run through ``parse_scopes`` and stored on the mock as
    ``parsed_scopes``; ``APIHandler.has_scope`` is bound to the mock as a method.
    """
    fake_handler = mock.Mock(spec=APIHandler)
    fake_handler.parsed_scopes = parse_scopes(scopes)
    fake_handler.has_scope = types.MethodType(APIHandler.has_scope, fake_handler)
    return fake_handler
def test_scope_constructor():
    """``parse_scopes`` returns a FrozenDict and gathers the ``user`` filters of a scope."""
    first_user, second_user = 'george', 'michael'
    raw_scopes = ['users']
    raw_scopes += [f'read:users!user={name}' for name in (first_user, second_user)]

    parsed = parse_scopes(raw_scopes)

    assert isinstance(parsed, FrozenDict)
    assert 'read:users' in parsed
    assert parsed['users']
    assert set(parsed['read:users']['user']) == {first_user, second_user}
def test_scope_precendence():
    """A scope given both filtered and unfiltered parses to ``Scope.ALL``."""
    parsed = parse_scopes(['read:users!user=maeby', 'read:users'])
    assert parsed['read:users'] == Scope.ALL
def test_scope_check_present():
    """A handler holding ``read:users`` passes both the plain and a user-filtered check."""
    handler = get_handler_with_scopes(['read:users'])
    for wanted in ('read:users', 'read:users!user=maeby'):
        assert handler.has_scope(wanted)
def test_scope_check_not_present():
    """A handler limited to ``!user=maeby`` fails unfiltered and other-entity checks."""
    handler = get_handler_with_scopes(['read:users!user=maeby'])
    denied_checks = (
        'read:users',
        'read:users!user=gob',
        'read:users!server=gob/server',
    )
    for denied in denied_checks:
        assert not handler.has_scope(denied)
def test_scope_filters():
    """Filtered checks pass when ``read:users`` is held unfiltered and with filters."""
    held_scopes = ['read:users', 'read:users!group=bluths', 'read:users!user=maeby']
    handler = get_handler_with_scopes(held_scopes)
    for wanted in ('read:users!group=bluth', 'read:users!user=maeby'):
        assert handler.has_scope(wanted)
def test_scope_parse_server_name():
    """A ``!server=user/server`` filter is matched by the identical scope string."""
    server_scope = 'servers!server=maeby/server1'
    handler = get_handler_with_scopes([server_scope, 'read:users!user=maeby'])
    assert handler.has_scope(server_scope)
class MockAPIHandler:
    """Minimal handler stand-in whose methods are guarded by ``needs_scope``.

    Each ``*_thing`` method simply returns ``True``; all access control comes
    from the decorator(s) applied to it.
    """

    def __init__(self):
        self.db = None
        self.expanded_scopes = {'users'}
        self.parsed_scopes = {}
        request = mock.Mock(spec=APIHandler)
        request.path = '/path'
        self.request = request

    def set_scopes(self, *scopes):
        """Replace the handler's scopes and re-parse them."""
        expanded = set(scopes)
        self.expanded_scopes = expanded
        self.parsed_scopes = parse_scopes(expanded)

    @needs_scope('users')
    def user_thing(self, user_name):
        """Guarded by ``users``; takes a ``user_name`` argument."""
        return True

    @needs_scope('servers')
    def server_thing(self, user_name, server_name):
        """Guarded by ``servers``; takes ``user_name`` and ``server_name``."""
        return True

    @needs_scope('read:groups')
    def group_thing(self, group_name):
        """Guarded by ``read:groups``; takes a ``group_name`` argument."""
        return True

    @needs_scope('read:services')
    def service_thing(self, service_name):
        """Guarded by ``read:services``; takes a ``service_name`` argument."""
        return True

    @needs_scope('users')
    def other_thing(self, non_filter_argument):
        """Guarded by ``users``; its only argument is not a filter name."""
        # Rely on inner vertical filtering
        return True

    @needs_scope('users')
    @needs_scope('read:services')
    def secret_thing(self):
        """Guarded by both ``users`` and ``read:services``."""
        return True
@pytest.fixture
def mock_handler():
    """Provide a fresh ``MockAPIHandler`` for each test."""
    return MockAPIHandler()
@mark.parametrize(
    "scopes, method, arguments, is_allowed",
    [
        (['users'], 'user_thing', ('user',), True),
        (['users'], 'user_thing', ('michael',), True),
        ([''], 'user_thing', ('michael',), False),
        (['read:users'], 'user_thing', ('gob',), False),
        (['read:users'], 'user_thing', ('michael',), False),
        (['users!user=george'], 'user_thing', ('george',), True),
        (['users!user=george'], 'user_thing', ('fake_user',), False),
        (['users!user=george'], 'user_thing', ('oscar',), False),
        (['users!user=george', 'users!user=oscar'], 'user_thing', ('oscar',), True),
        (['servers'], 'server_thing', ('user1', 'server_1'), True),
        (['servers'], 'server_thing', ('user1', ''), True),
        (['servers'], 'server_thing', ('user1', None), True),
        (
            ['servers!server=maeby/bluth'],
            'server_thing',
            ('maeby', 'bluth'),
            True,
        ),
        (['servers!server=maeby/bluth'], 'server_thing', ('gob', 'bluth'), False),
        (
            ['servers!server=maeby/bluth'],
            'server_thing',
            ('maybe', 'bluth2'),
            False,
        ),
        (['read:services'], 'service_thing', ('service1',), True),
        (
            ['users!user=george', 'read:groups!group=bluths'],
            'group_thing',
            ('bluths',),
            True,
        ),
        (
            ['users!user=george', 'read:groups!group=bluths'],
            'group_thing',
            ('george',),
            False,
        ),
        (
            ['groups!group=george', 'read:groups!group=bluths'],
            'group_thing',
            ('george',),
            False,
        ),
        (['users'], 'other_thing', ('gob',), True),
        (['read:users'], 'other_thing', ('gob',), False),
        (['users!user=gob'], 'other_thing', ('gob',), True),
        (['users!user=gob'], 'other_thing', ('maeby',), True),
    ],
)
def test_scope_method_access(mock_handler, scopes, method, arguments, is_allowed):
    """Call a ``needs_scope``-guarded method and check it is allowed or refused.

    Parameters:
        scopes: scopes handed to ``mock_handler.set_scopes``.
        method: name of the ``MockAPIHandler`` method to call.
        arguments: positional arguments for that method.
        is_allowed: when true the call must return a truthy value; otherwise
            it must raise ``web.HTTPError``.
    """
    # Mock(name=...) only labels the mock; it does not create a `.name`
    # attribute. Assign it afterwards so current_user.name is the real string.
    current_user = mock.Mock()
    current_user.name = arguments[0]
    mock_handler.current_user = current_user
    mock_handler.set_scopes(*scopes)
    api_call = getattr(mock_handler, method)
    if is_allowed:
        assert api_call(*arguments)
    else:
        with pytest.raises(web.HTTPError):
            api_call(*arguments)
def test_double_scoped_method_succeeds(mock_handler):
    """A method behind two ``needs_scope`` decorators runs when both scopes are held."""
    # Mock(name=...) only labels the mock; set `.name` explicitly so the
    # current user really carries the string name.
    current_user = mock.Mock()
    current_user.name = 'lucille'
    mock_handler.current_user = current_user
    # set_scopes() already refreshes parsed_scopes, so no manual re-parse is needed
    mock_handler.set_scopes('users', 'read:services')
    assert mock_handler.secret_thing()
def test_double_scoped_method_denials(mock_handler):
    """A method behind two ``needs_scope`` decorators raises when one scope is missing."""
    # Mock(name=...) only labels the mock; set `.name` explicitly so the
    # current user really carries the string name.
    current_user = mock.Mock()
    current_user.name = 'lucille2'
    mock_handler.current_user = current_user
    mock_handler.set_scopes('users', 'read:groups')
    with pytest.raises(web.HTTPError):
        mock_handler.secret_thing()
@mark.parametrize(
    "user_name, in_group, status_code",
    [
        ('martha', False, 200),
        ('michael', True, 200),
        ('gob', True, 200),
        ('tobias', False, 404),
        ('ann', False, 404),
    ],
)
async def test_expand_groups(app, user_name, in_group, status_code):
    """Request the user's own model with a role filtered by user and by group.

    The user gets only the ``test`` role (the ``user`` role is stripped), is
    optionally added to the ``bluth`` group, and the status of
    ``GET users/<user_name>`` is compared with ``status_code``.
    """
    db = app.db
    role_name = 'test'
    group_name = 'bluth'
    roles.create_role(
        db,
        {
            'name': role_name,
            'description': '',
            'users': [user_name],
            'scopes': [
                'read:users!user=martha',
                'read:users!group=bluth',
                'read:groups',
            ],
        },
    )
    user = add_user(db, name=user_name)

    group = orm.Group.find(db, name=group_name)
    if not group:
        group = orm.Group(name=group_name)
        db.add(group)
    if in_group and user not in group.users:
        group.users.append(user)

    roles.update_roles(db, user, roles=[role_name])
    roles.strip_role(db, user, 'user')
    db.commit()

    response = await api_request(
        app, 'users', user_name, headers=auth_header(db, user_name)
    )
    assert response.status_code == status_code

    db.delete(group)
    db.commit()
async def test_by_fake_user(app):
    """Credentials created for a user that is then deleted get a 403 on ``GET users``."""
    name = 'shade'
    doomed_user = add_user(app.db, name=name)
    # build the header while the user still exists, then remove the user
    stale_headers = auth_header(app.db, name)
    app.users.delete(doomed_user)
    app.db.commit()

    response = await api_request(app, 'users', headers=stale_headers)
    assert response.status_code == 403
# Expected `message` field of the 404 responses checked in
# test_request_fake_user and test_request_user_outside_group.
err_message = "No access to resources or resources not found"
async def test_request_fake_user(app, create_user_with_scopes):
    """Requesting a user this test never creates yields 404 with ``err_message``."""
    missing_name = 'annyong'
    requester = create_user_with_scopes('read:users!group=stuff')
    headers = auth_header(app.db, requester.name)

    response = await api_request(app, 'users', missing_name, headers=headers)

    assert response.status_code == 404
    # Consistency between no user and user not accessible
    assert response.json()['message'] == err_message
async def test_refuse_exceeding_token_permissions(
    app, create_user_with_scopes, create_temp_role
):
    """Updating a token of a ``self``-scoped user to ``admin:users`` raises ValueError."""
    owner = create_user_with_scopes('self')
    owner.new_api_token()
    first_token = owner.api_tokens[0]
    with pytest.raises(ValueError):
        first_token.update_scopes(["admin:users"])
async def test_exceeding_user_permissions(
    app,
    create_user_with_scopes,
):
    """Extra scopes written onto a token do not widen the listed user models.

    The owner holds ``list:users`` and ``read:users:groups``; ``read:users`` is
    appended to the stored token scopes, yet the listing still shows
    ``groups`` and still hides ``last_activity``.
    """
    owner = create_user_with_scopes('list:users', 'read:users:groups')
    api_token = owner.new_api_token()
    orm_api_token = orm.APIToken.find(app.db, token=api_token)
    # store scopes user does not have
    orm_api_token.scopes = [*orm_api_token.scopes, 'list:users', 'read:users']

    response = await api_request(
        app, 'users', headers={'Authorization': 'token %s' % api_token}
    )
    assert response.status_code == 200

    seen_keys = {key for model in response.json() for key in model.keys()}
    assert 'groups' in seen_keys
    assert 'last_activity' not in seen_keys
async def test_user_service_separation(app, mockservice_url, create_temp_role):
    """A user sharing a service's name is limited to the user's own role.

    The service gets ``reader_role`` and the same-named user gets
    ``subreader_role``; a request with the user's token shows ``groups`` but
    not ``last_activity``.
    """
    db = app.db
    shared_name = mockservice_url.name
    user = add_user(db, name=shared_name)

    create_temp_role(['read:users', 'list:users'], 'reader_role')
    create_temp_role(['read:users:groups', 'list:users'], 'subreader_role')
    roles.update_roles(db, user, roles=['subreader_role'])
    roles.update_roles(db, mockservice_url.orm, roles=['reader_role'])
    default_role = orm.Role.find(db, name='user')
    user.roles.remove(default_role)

    api_token = user.new_api_token()
    response = await api_request(
        app, 'users', headers={'Authorization': 'token %s' % api_token}
    )
    assert response.status_code == 200

    seen_keys = {key for model in response.json() for key in model.keys()}
    assert 'groups' in seen_keys
    assert 'last_activity' not in seen_keys
async def test_request_user_outside_group(app, create_user_with_scopes):
    """An existing user outside the requester's group filter yields 404 with ``err_message``."""
    outsider_name = 'hello'
    requester = create_user_with_scopes('read:users!group=stuff')
    add_user(app.db, name=outsider_name)
    headers = auth_header(app.db, requester.name)

    response = await api_request(app, 'users', outsider_name, headers=headers)

    assert response.status_code == 404
    # Consistency between no user and user not accessible
    assert response.json()['message'] == err_message
async def test_user_filter(app, create_user_with_scopes):
    """``list:users!user=...`` filters restrict the user listing to the named users.

    Five users are put in one group; the requester holds ``list:users`` for
    three of them and the listing must contain exactly those three names.
    """
    name_in_scope = {'lindsay', 'oscar', 'gob'}
    user = create_user_with_scopes(
        *(f'list:users!user={name}' for name in name_in_scope)
    )
    outside_scope = {'maeby', 'marta'}
    group_name = 'bluth'
    group = orm.Group.find(app.db, name=group_name)
    if not group:
        group = orm.Group(name=group_name)
        app.db.add(group)
    for name in name_in_scope | outside_scope:
        group_user = add_user(app.db, name=name)
        # Compare the user object, not its name: group.users is filled with
        # user objects, so a bare string is never found in it and the guard
        # would always append.
        if group_user not in group.users:
            group.users.append(group_user)
    app.db.commit()
    r = await api_request(app, 'users', headers=auth_header(app.db, user.name))
    assert r.status_code == 200
    result_names = {model['name'] for model in r.json()}
    assert result_names == name_in_scope
    app.db.delete(group)
    app.db.commit()
async def test_service_filter(app, create_user_with_scopes):
    """``list:services!service=cull_idle`` lists only the ``cull_idle`` service."""
    extra_services = [
        {'name': 'cull_idle', 'api_token': 'some-token'},
        {'name': 'user_service', 'api_token': 'some-other-token'},
    ]
    for service_config in extra_services:
        app.services.append(service_config)
    app.init_services()

    requester = create_user_with_scopes('list:services!service=cull_idle')
    response = await api_request(
        app, 'services', headers=auth_header(app.db, requester.name)
    )

    assert response.status_code == 200
    listed = set(response.json().keys())
    assert listed == {'cull_idle'}
async def test_user_filter_with_group(app, create_user_with_scopes):
    """``list:users!group=...`` lists the group's members and nobody else."""
    db = app.db
    group_name = 'sitwell'
    lister = create_user_with_scopes(f'list:users!group={group_name}')
    member = create_user_with_scopes('self')
    external_user = create_user_with_scopes('self')
    expected_names = {lister.name, member.name}

    group = orm.Group.find(db, name=group_name)
    if not group:
        group = orm.Group(name=group_name)
        db.add(group)
    for group_member in {lister, member}:
        group.users.append(group_member.orm_user)
    db.commit()

    response = await api_request(app, 'users', headers=auth_header(db, lister.name))
    assert response.status_code == 200
    result_names = {model['name'] for model in response.json()}
    assert result_names == expected_names
    assert external_user.name not in result_names

    db.delete(group)
    db.commit()
async def test_group_scope_filter(app, create_user_with_scopes):
    """``list:groups!group=...`` filters restrict the group listing to those groups."""
    db = app.db
    in_groups = {'sitwell', 'bluth'}
    out_groups = {'austero'}
    requester = create_user_with_scopes(
        *(f'list:groups!group={group}' for group in in_groups)
    )

    # make sure every group exists before listing
    for group_name in in_groups | out_groups:
        if not orm.Group.find(db, name=group_name):
            db.add(orm.Group(name=group_name))
    db.commit()

    response = await api_request(
        app, 'groups', headers=auth_header(db, requester.name)
    )
    assert response.status_code == 200
    result_names = {model['name'] for model in response.json()}
    assert result_names == in_groups

    for group_name in in_groups | out_groups:
        db.delete(orm.Group.find(db, name=group_name))
    db.commit()
async def test_vertical_filter(app, create_user_with_scopes):
    """With only ``list:users`` the listed models expose just name, kind and admin."""
    requester = create_user_with_scopes('list:users')
    response = await api_request(
        app, 'users', headers=auth_header(app.db, requester.name)
    )
    assert response.status_code == 200

    seen_keys = {key for model in response.json() for key in model.keys()}
    allowed_keys = {'name', 'kind', 'admin'}
    assert seen_keys == allowed_keys
async def test_stacked_vertical_filter(app, create_user_with_scopes):
    """Adding activity and groups read scopes adds exactly those keys to every model."""
    requester = create_user_with_scopes(
        'list:users', 'read:users:activity', 'read:users:groups'
    )
    response = await api_request(
        app, 'users', headers=auth_header(app.db, requester.name)
    )
    assert response.status_code == 200

    allowed_keys = {'admin', 'name', 'kind', 'groups', 'last_activity'}
    for model in response.json():
        assert set(model) == allowed_keys
async def test_cross_filter(app, create_user_with_scopes):
    """The requester's own model includes ``created``; other models only the restricted keys."""
    requester = create_user_with_scopes('read:users:activity', 'self', 'list:users')
    for other_name in {'britta', 'jeff', 'annie'}:
        add_user(app.db, name=other_name)
    app.db.commit()

    response = await api_request(
        app, 'users', headers=auth_header(app.db, requester.name)
    )
    assert response.status_code == 200

    restricted_keys = {'admin', 'name', 'kind', 'last_activity'}
    key_in_full_model = 'created'
    for model in response.json():
        if model['name'] != requester.name:
            assert set(model.keys()) == restricted_keys
        else:
            assert key_in_full_model in model
@mark.parametrize(
    "kind, has_user_scopes",
    [
        ('users', True),
        ('services', False),
    ],
)
async def test_metascope_self_expansion(
    app, kind, has_user_scopes, create_user_with_scopes, create_service_with_scopes
):
    """``self`` expands to a non-empty scope set for users and an empty one for services.

    The same expectation is checked for the owner and for a token created
    with ``new_api_token()``.
    """
    if kind == 'users':
        owner = create_user_with_scopes('self').orm_user
    else:
        owner = create_service_with_scopes('self')

    # test expansion of user/service scopes
    owner_scopes = get_scopes_for(owner)
    assert isinstance(owner_scopes, frozenset)
    assert bool(owner_scopes) == has_user_scopes

    # test expansion of token scopes
    owner.new_api_token()
    first_token = owner.api_tokens[0]
    assert bool(get_scopes_for(first_token)) == has_user_scopes
async def test_metascope_inherit_expansion(app, create_user_with_scopes):
    """An ``inherit`` token resolves to its owner's scopes; emptied scopes stay within them."""
    owner = create_user_with_scopes('self')
    owner.new_api_token(scopes=["inherit"])
    token = owner.api_tokens[0]

    # Check 'inherit' expansion
    inherited = get_scopes_for(token)
    owned = get_scopes_for(owner)
    assert owned == inherited

    # Check no roles means no permissions
    token.scopes = []
    app.db.commit()
    emptied = get_scopes_for(token)
    assert isinstance(emptied, frozenset)
    assert emptied.issubset(identify_scopes(owner.orm_user))
@mark.parametrize(
    "scopes, can_stop ,num_servers, keys_in, keys_out",
    [
        (['read:servers!user=almond'], False, 2, {'name'}, {'state'}),
        (['admin:users', 'read:users'], False, 0, set(), set()),
        (
            ['read:servers!group=nuts', 'servers'],
            True,
            2,
            {'name'},
            {'state'},
        ),
        (
            ['admin:server_state', 'read:servers'],
            False,
            2,
            {'name', 'state'},
            set(),
        ),
        (
            [
                'read:servers!server=almond/bianca',
                'admin:server_state!server=almond/bianca',
            ],
            False,
            1,
            {'name', 'state'},
            set(),
        ),
    ],
)
async def test_server_state_access(
    app,
    create_user_with_scopes,
    create_service_with_scopes,
    scopes,
    can_stop,
    num_servers,
    keys_in,
    keys_out,
):
    """Check which server-model keys a service sees and whether it may stop a server.

    User ``almond`` (member of group ``nuts``) gets two named servers. A
    service holding ``scopes`` then reads the user model and tries to delete
    the first server.

    Parameters:
        scopes: scopes granted to the service (next to a fixed name scope).
        can_stop: whether the delete request must answer 204 instead of 403.
        num_servers: number of server models expected in the user model.
        keys_in: keys every server model must contain.
        keys_out: keys no server model may contain.
    """
    named_server_settings = {
        'allow_named_servers': True,
        'named_server_limit_per_user': 2,
    }
    with mock.patch.dict(app.tornado_settings, named_server_settings):
        user = create_user_with_scopes('self', name='almond')
        group_name = 'nuts'
        group = orm.Group.find(app.db, name=group_name)
        if not group:
            group = orm.Group(name=group_name)
            app.db.add(group)
        group.users.append(user.orm_user)
        app.db.commit()

        server_names = ['bianca', 'terry']
        for server_name in server_names:
            await api_request(
                app, 'users', user.name, 'servers', server_name, method='post'
            )

        service = create_service_with_scopes("read:users:name!user=bianca", *scopes)
        headers = {'Authorization': 'token %s' % service.new_api_token()}

        # can I get the user model?
        r = await api_request(app, 'users', user.name, headers=headers)
        cannot_read_user_model = not (num_servers > 1 or 'read:users' in scopes)
        if cannot_read_user_model:
            assert r.status_code == 404
        else:
            r.raise_for_status()
            user_model = r.json()
            if num_servers > 1:
                assert 'servers' in user_model
                server_models = user_model['servers']
                assert len(server_models) == num_servers
                for server_model in server_models.values():
                    assert keys_in.issubset(server_model)
                    assert keys_out.isdisjoint(server_model)
            else:
                assert 'servers' not in user_model

        # can I stop the first server?
        r = await api_request(
            app,
            'users',
            user.name,
            'servers',
            server_names[0],
            method='delete',
            headers=headers,
        )
        expected_status = 204 if can_stop else 403
        assert r.status_code == expected_status

        app.db.delete(group)
        app.db.commit()
@mark.parametrize(
    "name, user_scopes, token_scopes, intersection_scopes",
    [
        (
            'no_filter',
            ['users:activity'],
            ['users:activity'],
            {'users:activity', 'read:users:activity'},
        ),
        (
            'valid_own_filter',
            ['read:users:activity'],
            ['read:users:activity!user'],
            {'read:users:activity!user=temp_user_1'},
        ),
        (
            'valid_other_filter',
            ['read:users:activity'],
            ['read:users:activity!user=otheruser'],
            {'read:users:activity!user=otheruser'},
        ),
        (
            'no_filter_owner_filter',
            ['read:users:activity!user'],
            ['read:users:activity'],
            {'read:users:activity!user=temp_user_1'},
        ),
        (
            'valid_own_filter',
            ['read:users:activity!user'],
            ['read:users:activity!user'],
            {'read:users:activity!user=temp_user_1'},
        ),
        (
            'invalid_filter',
            ['read:users:activity!user'],
            ['read:users:activity!user=otheruser'],
            set(),
        ),
        (
            'subscopes_cross_filter',
            ['users!user=x'],
            ['read:users:name'],
            {'read:users:name!user=x'},
        ),
        (
            'multiple_user_filter',
            ['users!user=x', 'users!user=y'],
            ['read:users:name!user=x'],
            {'read:users:name!user=x'},
        ),
        (
            'no_intersection_group_user',
            ['users!group=y'],
            ['users!user=x'],
            set(),
        ),
        (
            'no_intersection_user_server',
            ['servers!user=y'],
            ['servers!server=x'],
            set(),
        ),
        (
            'users_and_groups_both',
            ['users!group=x', 'users!user=y'],
            ['read:users:name!group=x', 'read:users!user=y'],
            {
                'read:users:name!group=x',
                'read:users!user=y',
                'read:users:name!user=y',
                'read:users:groups!user=y',
                'read:users:activity!user=y',
            },
        ),
        (
            'users_and_groups_user_only',
            ['users!group=x', 'users!user=y'],
            ['read:users:name!group=z', 'read:users!user=y'],
            {
                'read:users!user=y',
                'read:users:name!user=y',
                'read:users:groups!user=y',
                'read:users:activity!user=y',
            },
        ),
    ],
)
async def test_resolve_token_permissions(
    app,
    create_user_with_scopes,
    create_temp_role,
    name,
    user_scopes,
    token_scopes,
    intersection_scopes,
):
    """A token's resolved scopes equal ``intersection_scopes`` for the given owner/token scopes.

    Parameters:
        name: label of the case; not used in the test body.
        user_scopes: scopes the token owner is created with.
        token_scopes: scopes of the temporary role assigned to the token.
        intersection_scopes: scope set ``get_scopes_for`` must return for the token.
    """
    orm_user = create_user_with_scopes(*user_scopes).orm_user
    # ensure user has full permissions when token is created
    # to create tokens with permissions exceeding their owner
    roles.grant_role(app.db, orm_user, "admin")
    create_temp_role(token_scopes, 'active-posting')
    api_token = orm_user.new_api_token(roles=['active-posting'])
    orm_api_token = orm.APIToken.find(app.db, token=api_token)
    # drop admin so that filtering can be applied
    roles.strip_role(app.db, orm_user, "admin")

    # resolve once; the owner/token expansions previously computed here were
    # never read, so only the value under test is fetched
    token_retained_scopes = get_scopes_for(orm_api_token)
    assert token_retained_scopes == intersection_scopes
@mark.parametrize(
    "scopes, model_keys",
    [
        (
            {'read:services'},
            {
                'command',
                'name',
                'kind',
                'info',
                'display',
                'pid',
                'admin',
                'prefix',
                'url',
            },
        ),
        (
            {'read:roles:services', 'read:services:name'},
            {'name', 'kind', 'roles', 'admin'},
        ),
        ({'read:services:name'}, {'name', 'kind', 'admin'}),
    ],
)
async def test_service_model_filtering(
    app, scopes, model_keys, create_user_with_scopes, create_service_with_scopes
):
    """The keys of a service model depend on the requester's ``scopes``."""
    requester = create_user_with_scopes(*scopes, name='teddy')
    target_service = create_service_with_scopes()
    headers = auth_header(app.db, requester.name)

    response = await api_request(app, 'services', target_service.name, headers=headers)

    assert response.status_code == 200
    assert model_keys == response.json().keys()
@mark.parametrize(
    "scopes, model_keys",
    [
        (
            {'read:groups'},
            {
                'name',
                'kind',
                'users',
                'properties',
            },
        ),
        (
            {'read:roles:groups', 'read:groups:name'},
            {'name', 'kind', 'roles'},
        ),
        ({'read:groups:name'}, {'name', 'kind'}),
    ],
)
async def test_group_model_filtering(
    app, scopes, model_keys, create_user_with_scopes, create_service_with_scopes
):
    """The keys of a group model depend on the requester's ``scopes``."""
    db = app.db
    requester = create_user_with_scopes(*scopes, name='teddy')
    group_name = 'baker_street'
    group = orm.Group.find(db, name=group_name)
    if not group:
        group = orm.Group(name=group_name)
        db.add(group)
        db.commit()

    response = await api_request(
        app, 'groups', group_name, headers=auth_header(db, requester.name)
    )
    assert response.status_code == 200
    assert model_keys == response.json().keys()

    db.delete(group)
    db.commit()
async def test_roles_access(app, create_service_with_scopes, create_user_with_scopes):
    """``read:roles:users`` exposes kind, name, roles and admin of another user."""
    target = add_user(app.db, name='miranda')
    reader = create_user_with_scopes('read:roles:users')
    headers = auth_header(app.db, reader.name)

    response = await api_request(app, 'users', target.name, headers=headers)

    assert response.status_code == 200
    model_keys = {'kind', 'name', 'roles', 'admin'}
    assert model_keys == response.json().keys()
@pytest.mark.parametrize(
    "left, right, expected, should_warn",
    [
        (set(), set(), set(), False),
        (set(), {"users"}, set(), False),
        # no warning if users and groups only on the same side
        (
            {"users!user=x", "users!group=y"},
            set(),
            set(),
            False,
        ),
        # no warning if users are on both sizes
        (
            {"users!user=x", "users!user=y", "users!group=y"},
            {"users!user=x"},
            {"users!user=x"},
            False,
        ),
        # no warning if users and groups are both defined
        # on both sides
        (
            {"users!user=x", "users!group=y"},
            {"users!user=x", "users!group=y", "users!user=z"},
            {"users!user=x", "users!group=y"},
            False,
        ),
        # warn if there's a user on one side and a group on the other
        # which *may* intersect
        (
            {"users!group=y", "users!user=z"},
            {"users!user=x"},
            set(),
            True,
        ),
        # same for group->server
        (
            {"users!group=y", "users!user=z"},
            {"users!server=x/y"},
            set(),
            True,
        ),
        # this one actually shouldn't warn because server=x/y is under user=x,
        # but we don't need to overcomplicate things just for a warning
        (
            {"users!group=y", "users!user=x"},
            {"users!server=x/y"},
            {"users!server=x/y"},
            True,
        ),
        # resolves server under user, without warning
        (
            {"read:servers!user=abc"},
            {"read:servers!server=abc/xyz"},
            {"read:servers!server=abc/xyz"},
            False,
        ),
        # user->server, no match
        (
            {"read:servers!user=abc"},
            {"read:servers!server=abcd/xyz"},
            set(),
            False,
        ),
    ],
)
def test_intersect_expanded_scopes(left, right, expected, should_warn, recwarn):
    """Intersect two expanded scope sets in both argument orders.

    Parameters:
        left, right: the scope sets to intersect.
        expected: the intersection required in either order.
        should_warn: whether each call is expected to emit exactly one warning.
        recwarn: pytest's warning recorder.
    """
    expected_warnings = 1 if should_warn else 0
    # run every test in both directions, to ensure symmetry of the inputs
    for a, b in [(left, right), (right, left)]:
        # empty the recorder so the count below reflects this direction only
        recwarn.clear()
        # pass the loop variables: using left/right here would run the same
        # direction twice and never exercise the swapped order
        intersection = _intersect_expanded_scopes(set(a), set(b))
        assert intersection == set(expected)
        assert len(recwarn) == expected_warnings
@pytest.mark.parametrize(
    "left, right, expected, groups",
    [
        (
            ["users!group=gx"],
            ["users!user=ux"],
            ["users!user=ux"],
            {"gx": ["ux"]},
        ),
        (
            ["read:users!group=gx"],
            ["read:users!user=nosuchuser"],
            [],
            {},
        ),
        (
            ["read:users!group=gx"],
            ["read:users!server=nosuchuser/server"],
            [],
            {},
        ),
        (
            ["read:users!group=gx"],
            ["read:users!server=ux/server"],
            ["read:users!server=ux/server"],
            {"gx": ["ux"]},
        ),
        (
            ["read:users!group=gx"],
            ["read:users!server=ux/server", "read:users!user=uy"],
            ["read:users!server=ux/server"],
            {"gx": ["ux"], "gy": ["uy"]},
        ),
        (
            ["read:users!group=gy"],
            ["read:users!server=ux/server", "read:users!user=uy"],
            ["read:users!user=uy"],
            {"gx": ["ux"], "gy": ["uy"]},
        ),
        (
            # make sure the group > user > server hierarchy
            # is managed
            ["read:servers!server=ux/server", "read:servers!group=gy"],
            ["read:servers!server=uy/server", "read:servers!user=ux"],
            ["read:servers!server=ux/server", "read:servers!server=uy/server"],
            {"gx": ["ux"], "gy": ["uy"]},
        ),
        (
            # make sure the group > user hierarchy
            # is managed
            ["read:servers!user=ux", "read:servers!group=gy"],
            ["read:servers!user=uy", "read:servers!group=gx"],
            ["read:servers!user=ux", "read:servers!user=uy"],
            {"gx": ["ux"], "gy": ["uy"]},
        ),
    ],
)
def test_intersect_groups(request, db, left, right, expected, groups):
    """Intersect scope sets with a database available, in both argument orders.

    Parameters:
        left, right: scope lists (or a single scope string) to intersect.
        expected: the intersection required in either order.
        groups: mapping of group name to member user names to create first.
    """
    if isinstance(left, str):
        left = {left}
    if isinstance(right, str):
        right = {right}

    # if we have a db connection, we can actually resolve
    created = []
    for groupname, members in groups.items():
        group = orm.Group.find(db, name=groupname)
        if not group:
            group = orm.Group(name=groupname)
            db.add(group)
            created.append(group)
        db.commit()
        for username in members:
            user = orm.User.find(db, name=username)
            if user is None:
                user = orm.User(name=username)
                db.add(user)
                created.append(user)
            user.groups.append(group)
    db.commit()

    def _cleanup():
        # remove only the objects this test added
        for obj in created:
            db.delete(obj)
        db.commit()

    request.addfinalizer(_cleanup)

    # run every test in both directions, to ensure symmetry of the inputs
    for a, b in [(left, right), (right, left)]:
        # pass the loop variables: using left/right here would run the same
        # direction twice and never exercise the swapped order
        intersection = _intersect_expanded_scopes(set(a), set(b), db)
        assert intersection == set(expected)
@mark.user
@mark.parametrize(
    "scopes, expected",
    [
        ("list:users", ['in-1', 'in-2', 'out-1', 'out-2', 'admin', 'user']),
        ("read:users", 403),
        ("list:users!server=irrelevant", 403),
        ("list:users!user=nosuchuser", []),
        ("list:users!group=nosuchgroup", []),
        ("list:users!user=out-2", ['out-2']),
        ("list:users!group=GROUP", ['in-1', 'in-2']),
        (
            ["list:users!group=GROUP", "list:users!user=out-2"],
            ['in-1', 'in-2', 'out-2'],
        ),
    ],
)
async def test_list_users_filter(
    app, group, create_service_with_scopes, scopes, expected
):
    """List users as a service and compare with the expected names or status.

    ``expected`` is either an HTTP status code (int) or the list of user
    names the listing must contain. ``GROUP`` in a scope is replaced by the
    name of the ``group`` fixture.
    """
    # create users:
    for index in (1, 2):
        member = add_user(app.db, app, name=f'in-{index}')
        group.users.append(member.orm_user)
        add_user(app.db, app, name=f'out-{index}')
    app.db.commit()

    scope_list = [scopes] if isinstance(scopes, str) else scopes
    # in-group are in the group
    # out-group are not in the group
    scope_list = [
        s.replace("GROUP", group.name).replace("IN", "ingroup") for s in scope_list
    ]

    orm_service = create_service_with_scopes(*scope_list)
    token = orm_service.new_api_token()
    r = await api_request(app, 'users', headers={"Authorization": f"token {token}"})

    if isinstance(expected, int):
        assert r.status_code == expected
        return

    r.raise_for_status()
    expected_models = []
    for name in sorted(expected):
        expected_models.append(
            {
                'name': name,
                'admin': name == 'admin',
                'kind': 'user',
            }
        )
    assert sorted(r.json(), key=itemgetter('name')) == expected_models
@mark.group
@mark.parametrize(
    "scopes, expected",
    [
        ("list:groups", ['group1', 'group2', 'group3']),
        ("read:groups", 403),
        ("list:groups!user=irrelevant", 403),
        ("list:groups!group=nosuchgroup", []),
        ("list:groups!group=group1", ['group1']),
        (
            ["list:groups!group=group1", "list:groups!group=group2"],
            ['group1', 'group2'],
        ),
        (
            # prefix match shouldn't match!
            "list:groups!group=group",
            [],
        ),
    ],
)
async def test_list_groups_filter(
    request, app, create_service_with_scopes, scopes, expected
):
    """List groups as a service and compare with the expected names or status.

    ``expected`` is either an HTTP status code (int) or the list of group
    names the listing must contain.
    """
    # create groups:
    groups = []
    for index in (1, 2, 3):
        new_group = orm.Group(name=f'group{index}')
        groups.append(new_group)
        app.db.add(new_group)
    app.db.commit()

    def cleanup_groups():
        # drop the three groups again once the test is over
        for created_group in groups:
            app.db.delete(created_group)
        app.db.commit()

    request.addfinalizer(cleanup_groups)

    scope_list = [scopes] if isinstance(scopes, str) else scopes
    orm_service = create_service_with_scopes(*scope_list)
    token = orm_service.new_api_token()
    r = await api_request(app, 'groups', headers={"Authorization": f"token {token}"})

    if isinstance(expected, int):
        assert r.status_code == expected
        return

    r.raise_for_status()
    expected_models = []
    for name in sorted(expected):
        expected_models.append(
            {
                'name': name,
                'kind': 'group',
            }
        )
    assert sorted(r.json(), key=itemgetter('name')) == expected_models
@pytest.mark.parametrize(
    "custom_scopes",
    [
        {"custom:okay": {"description": "simple custom scope"}},
        {
            "custom:parent": {
                "description": "parent",
                "subscopes": ["custom:child"],
            },
            "custom:child": {"description": "child"},
        },
        {
            "custom:extra": {
                "description": "I have extra info",
                "extra": "warn about me",
            }
        },
    ],
)
def test_custom_scopes(preserve_scopes, custom_scopes):
    """Valid custom scope definitions are registered unchanged and can be described."""
    scopes.define_custom_scopes(custom_scopes)

    registered = scopes.scope_definitions
    for scope_name in custom_scopes:
        assert scope_name in registered
        assert registered[scope_name] == custom_scopes[scope_name]

    # make sure describe works after registering custom scopes
    scopes.describe_raw_scopes(list(custom_scopes))
@pytest.mark.parametrize(
    "custom_scopes",
    [
        {
            "read:users": {
                "description": "Can't override",
            },
        },
        {
            "custom:empty": {},
        },
        {
            "notcustom:prefix": {"descroption": "bad prefix"},
        },
        {
            "custom:!illegal": {"descroption": "bad character"},
        },
        {
            "custom:badsubscope": {
                "description": "non-custom subscope not allowed",
                "subscopes": [
                    "read:users",
                ],
            },
        },
        {
            "custom:nosubscope": {
                "description": "subscope not defined",
                "subscopes": [
                    "custom:undefined",
                ],
            },
        },
        {
            "custom:badsubscope": {
                "description": "subscope not a list",
                "subscopes": "custom:notalist",
            },
            "custom:notalist": {
                "description": "the subscope",
            },
        },
    ],
)
def test_custom_scopes_bad(preserve_scopes, custom_scopes):
    """Invalid custom scope definitions raise ValueError and leave the registry as it was."""
    with pytest.raises(ValueError):
        scopes.define_custom_scopes(custom_scopes)
    # the failed registration must not have changed the definitions
    assert scopes.scope_definitions == preserve_scopes
async def test_user_filter_expansion(app, create_user_with_scopes):
    """A token given the ``self`` expansion with bare ``!user`` filters matches a default token."""
    # turn !user=ignored into !user
    # Mimic the role 'self' based on '!user' filter for tokens
    bare_filter_scopes = []
    for expanded_scope in _expand_self_scope('ignored'):
        bare_filter_scopes.append(expanded_scope.partition("=")[0])

    owner = create_user_with_scopes('self')
    owner.new_api_token(scopes=bare_filter_scopes)
    owner.new_api_token()

    manual_token, auto_token = owner.api_tokens[0], owner.api_tokens[1]
    manual_scope_set = get_scopes_for(manual_token)
    auto_scope_set = get_scopes_for(auto_token)
    assert manual_scope_set == auto_scope_set
@pytest.mark.parametrize(
    "scopes, expected",
    [
        ("read:users:name!user", ["read:users:name!user={user}"]),
        (
            "users:activity!user",
            [
                "read:users:activity!user={user}",
                "users:activity!user={user}",
            ],
        ),
        ("self", ["*"]),
        (["access:services", "access:services!service=x"], ["access:services"]),
        ("access:services!service", ["access:services!service={service}"]),
        ("access:servers!server", ["access:servers!server={server}"]),
    ],
)
def test_expand_scopes(app, user, scopes, expected, mockservice_external):
    """``expand_scopes`` fills bare filters from the owner and oauth client.

    ``{user}``, ``{service}`` and ``{server}`` in the parametrized strings are
    formatted with the fixtures' names; ``"*"`` in ``expected`` stands for the
    result of ``_expand_self_scope`` for the user.
    """
    scope_list = [scopes] if isinstance(scopes, str) else scopes
    db = app.db
    service = mockservice_external
    spawner_name = "salmon"
    server_name = f"{user.name}/{spawner_name}"

    if 'server' not in str(scope_list):
        oauth_client = service.oauth_client
    else:
        # attach a fresh oauth client to the user's spawner
        oauth_client = orm.OAuthClient()
        db.add(oauth_client)
        spawner = user.spawners[spawner_name]
        spawner.orm_spawner.oauth_client = oauth_client
        db.commit()
        assert oauth_client.spawner is spawner.orm_spawner
    assert oauth_client is not None

    def _fill(templates):
        filled = set()
        for template in templates:
            filled.add(
                template.format(
                    service=service.name, server=server_name, user=user.name
                )
            )
        return filled

    requested = _fill(scope_list)
    wanted = _fill(expected)
    if "*" in wanted:
        wanted.remove("*")
        wanted.update(_expand_self_scope(user.name))

    expanded = expand_scopes(requested, owner=user.orm_user, oauth_client=oauth_client)
    assert isinstance(expanded, frozenset)
    assert sorted(expanded) == sorted(wanted)
@pytest.mark.parametrize(
    "requested_scopes, have_scopes, expected_allowed, expected_disallowed",
    [
        (
            ["read:users:name!user"],
            ["read:users:name!user={user}"],
            ["read:users:name!user"],
            [],
        ),
        (
            ["read:servers!server"],
            ["read:servers!user"],
            ["read:servers!server"],
            [],
        ),
        (
            ["read:servers!server={server}"],
            ["read:servers"],
            ["read:servers!server={server}"],
            [],
        ),
        (
            ["admin:servers!server"],
            ["read:servers"],
            ["read:servers!server={server}"],
            ["admin:servers!server"],
        ),
        (
            ["admin:servers", "read:users"],
            ["read:users"],
            ["read:users"],
            ["admin:servers"],
        ),
    ],
)
def test_resolve_requested_scopes(
    app,
    user,
    group,
    requested_scopes,
    have_scopes,
    expected_allowed,
    expected_disallowed,
    mockservice_external,
):
    """``_resolve_requested_scopes`` splits a request into allowed and disallowed scopes.

    ``{user}``, ``{service}`` and ``{server}`` in the parametrized strings are
    formatted with the fixtures' names before the call and the comparison.
    """
    if isinstance(requested_scopes, str):
        requested_scopes = [requested_scopes]
    db = app.db
    service = mockservice_external
    spawner_name = "salmon"
    server_name = f"{user.name}/{spawner_name}"

    if '!server' not in str(requested_scopes + have_scopes):
        oauth_client = service.oauth_client
    else:
        # attach a fresh oauth client to the user's spawner
        oauth_client = orm.OAuthClient()
        db.add(oauth_client)
        spawner = user.spawners[spawner_name]
        spawner.orm_spawner.oauth_client = oauth_client
        db.commit()
        assert oauth_client.spawner is spawner.orm_spawner
    assert oauth_client is not None

    def _fill(templates):
        filled = set()
        for template in templates:
            filled.add(
                template.format(
                    service=service.name, server=server_name, user=user.name
                )
            )
        return filled

    requested = _fill(requested_scopes)
    held = _fill(have_scopes)
    want_allowed = _fill(expected_allowed)
    want_disallowed = _fill(expected_disallowed)

    allowed, disallowed = _resolve_requested_scopes(
        requested,
        held,
        user=user.orm_user,
        client=oauth_client,
        db=db,
    )
    assert allowed == want_allowed
    assert disallowed == want_disallowed
@pytest.mark.parametrize(
    "scope, have_scopes, ok",
    [
        # exact matches
        ("read:users", "read:users", True),
        ("read:users!user=USER", "read:users!user=USER", True),
        ("read:servers!server=USER/x", "read:servers!server=USER/x", True),
        ("read:groups!group=GROUP", "read:groups!group=GROUP", True),
        # subscopes
        ("read:users:name", "read:users", True),
        # subscopes with matching filter
        ("read:users:name!user=USER", "read:users!user=USER", True),
        ("read:users!user=USER", "read:users!group=GROUP", True),
        ("read:users!user=USER", "read:users", True),
        ("read:servers!server=USER/x", "read:servers", True),
        ("read:servers!server=USER/x", "servers!server=USER/x", True),
        ("read:servers!server=USER/x", "servers!user=USER", True),
        ("read:servers!server=USER/x", "servers!group=GROUP", True),
        # shouldn't match
        ("read:users", "read:users!user=USER", False),
        ("read:users", "read:users!user=USER", False),
        ("read:users!user=USER", "read:users!user=other", False),
        ("read:users!user=USER", "read:users!group=other", False),
        ("read:servers!server=USER/x", "servers!server=other/x", False),
        ("read:servers!server=USER/x", "servers!user=other", False),
        ("read:servers!server=USER/x", "servers!group=other", False),
        ("servers!server=USER/x", "read:servers!server=USER/x", False),
    ],
)
def test_has_scope(app, user, group, scope, have_scopes, ok):
    """``has_scope`` with a db returns ``ok`` for ``scope`` given ``have_scopes``.

    ``USER`` and ``GROUP`` are replaced by the fixture names; the user is put
    in the group first. ``have_scopes`` is a comma-separated string.
    """
    db = app.db
    user.groups.append(group)
    db.commit()

    # order matters: GROUP is substituted before USER
    substitutions = (("GROUP", group.name), ("USER", user.name))

    def _sub(text):
        for placeholder, value in substitutions:
            text = text.replace(placeholder, value)
        return text

    wanted = _sub(scope)
    held = [_sub(item) for item in have_scopes.split(",")]
    parsed_scopes = parse_scopes(expand_scopes(held))
    assert has_scope(wanted, parsed_scopes, db=db) == ok
@pytest.mark.parametrize(
    "scope, have_scopes, ok",
    [
        ("read:users", "read:users", True),
        ("read:users", "read:users!user=x", True),
        ("read:users", "read:groups!user=x", False),
        ("read:users!user=x", "read:users!group=y", ValueError),
    ],
)
def test_has_scope_post_filter(scope, have_scopes, ok):
    """``has_scope(..., post_filter=True)`` returns ``ok`` or raises it when it is an exception type."""
    held = have_scopes.split(",")
    if ok in (True, False):
        assert has_scope(scope, held, post_filter=True) == ok
        return
    # anything else in the table is an exception class that must be raised
    with pytest.raises(ok):
        has_scope(scope, held, post_filter=True)