mirror of
https://github.com/jupyterhub/jupyterhub.git
synced 2025-10-17 15:03:02 +00:00
Merge pull request #3397 from 0mar/roles_interface
Refactor scopes tests
This commit is contained in:
@@ -1927,7 +1927,7 @@ class JupyterHub(Application):
|
||||
# make sure that on no admin situation, all roles are reset
|
||||
admin_role = orm.Role.find(db, name='admin')
|
||||
if not admin_role.users:
|
||||
app_log.info(
|
||||
app_log.warning(
|
||||
"No admin users found; assuming hub upgrade. Initializing default roles for all entities"
|
||||
)
|
||||
# make sure all users, services and tokens have at least one role (update with default)
|
||||
|
@@ -371,7 +371,7 @@ def _token_allowed_role(db, token, role):
|
||||
raw_owner_scopes = {
|
||||
scope.split('!', 1)[0] if '!' in scope else scope for scope in owner_scopes
|
||||
}
|
||||
if (raw_extra_scopes).issubset(raw_owner_scopes):
|
||||
if raw_extra_scopes.issubset(raw_owner_scopes):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
@@ -89,6 +89,10 @@ class MockAPIHandler:
|
||||
self.request = mock.Mock(spec=HTTPServerRequest)
|
||||
self.request.path = '/path'
|
||||
|
||||
def set_scopes(self, *scopes):
|
||||
self.raw_scopes = set(scopes)
|
||||
self.parsed_scopes = parse_scopes(self.raw_scopes)
|
||||
|
||||
@needs_scope('users')
|
||||
def user_thing(self, user_name):
|
||||
return True
|
||||
@@ -116,6 +120,12 @@ class MockAPIHandler:
|
||||
return True
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_handler():
|
||||
obj = MockAPIHandler()
|
||||
return obj
|
||||
|
||||
|
||||
@mark.parametrize(
|
||||
"scopes, method, arguments, is_allowed",
|
||||
[
|
||||
@@ -169,12 +179,10 @@ class MockAPIHandler:
|
||||
(['users!user=gob'], 'other_thing', ('maeby',), True),
|
||||
],
|
||||
)
|
||||
def test_scope_method_access(scopes, method, arguments, is_allowed):
|
||||
obj = MockAPIHandler()
|
||||
obj.current_user = mock.Mock(name=arguments[0])
|
||||
obj.raw_scopes = set(scopes)
|
||||
obj.parsed_scopes = parse_scopes(obj.raw_scopes)
|
||||
api_call = getattr(obj, method)
|
||||
def test_scope_method_access(mock_handler, scopes, method, arguments, is_allowed):
|
||||
mock_handler.current_user = mock.Mock(name=arguments[0])
|
||||
mock_handler.set_scopes(*scopes)
|
||||
api_call = getattr(mock_handler, method)
|
||||
if is_allowed:
|
||||
assert api_call(*arguments)
|
||||
else:
|
||||
@@ -182,31 +190,18 @@ def test_scope_method_access(scopes, method, arguments, is_allowed):
|
||||
api_call(*arguments)
|
||||
|
||||
|
||||
def test_double_scoped_method_succeeds():
|
||||
obj = MockAPIHandler()
|
||||
obj.current_user = mock.Mock(name='lucille')
|
||||
obj.raw_scopes = {'users', 'read:services'}
|
||||
obj.parsed_scopes = parse_scopes(obj.raw_scopes)
|
||||
assert obj.secret_thing()
|
||||
def test_double_scoped_method_succeeds(mock_handler):
|
||||
mock_handler.current_user = mock.Mock(name='lucille')
|
||||
mock_handler.set_scopes('users', 'read:services')
|
||||
mock_handler.parsed_scopes = parse_scopes(mock_handler.raw_scopes)
|
||||
assert mock_handler.secret_thing()
|
||||
|
||||
|
||||
def test_double_scoped_method_denials():
|
||||
obj = MockAPIHandler()
|
||||
obj.current_user = mock.Mock(name='lucille2')
|
||||
obj.raw_scopes = {'users', 'read:groups'}
|
||||
obj.parsed_scopes = parse_scopes(obj.raw_scopes)
|
||||
def test_double_scoped_method_denials(mock_handler):
|
||||
mock_handler.current_user = mock.Mock(name='lucille2')
|
||||
mock_handler.set_scopes('users', 'read:groups')
|
||||
with pytest.raises(web.HTTPError):
|
||||
obj.secret_thing()
|
||||
|
||||
|
||||
def generate_test_role(user_name, scopes, role_name='test'):
|
||||
role = {
|
||||
'name': role_name,
|
||||
'description': '',
|
||||
'users': [user_name],
|
||||
'scopes': scopes,
|
||||
}
|
||||
return role
|
||||
mock_handler.secret_thing()
|
||||
|
||||
|
||||
@mark.parametrize(
|
||||
@@ -239,7 +234,6 @@ async def test_expand_groups(app, user_name, in_group, status_code):
|
||||
app.db.add(group)
|
||||
if in_group and user not in group.users:
|
||||
group.users.append(user)
|
||||
kind = 'users'
|
||||
roles.update_roles(app.db, user, roles=['test'])
|
||||
roles.strip_role(app.db, user, 'user')
|
||||
app.db.commit()
|
||||
@@ -247,6 +241,8 @@ async def test_expand_groups(app, user_name, in_group, status_code):
|
||||
app, 'users', user_name, headers=auth_header(app.db, user_name)
|
||||
)
|
||||
assert r.status_code == status_code
|
||||
app.db.delete(group)
|
||||
app.db.commit()
|
||||
|
||||
|
||||
async def test_by_fake_user(app):
|
||||
@@ -262,79 +258,127 @@ async def test_by_fake_user(app):
|
||||
err_message = "No access to resources or resources not found"
|
||||
|
||||
|
||||
async def test_request_fake_user(app):
|
||||
user_name = 'buster'
|
||||
fake_user = 'annyong'
|
||||
user = add_user(app.db, name=user_name)
|
||||
test_role = generate_test_role(user_name, ['read:users!group=stuff'])
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
@pytest.fixture
|
||||
def create_temp_role(app):
|
||||
"""Generate a temporary role with certain scopes.
|
||||
Convenience function that provides setup, database handling and teardown"""
|
||||
temp_roles = []
|
||||
index = [1]
|
||||
|
||||
def temp_role_creator(scopes, role_name=None):
|
||||
if not role_name:
|
||||
role_name = f'temp_role_{index[0]}'
|
||||
index[0] += 1
|
||||
temp_role = orm.Role(name=role_name, scopes=list(scopes))
|
||||
temp_roles.append(temp_role)
|
||||
app.db.add(temp_role)
|
||||
app.db.commit()
|
||||
return temp_role
|
||||
|
||||
yield temp_role_creator
|
||||
for role in temp_roles:
|
||||
app.db.delete(role)
|
||||
app.db.commit()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def create_user_with_scopes(app, create_temp_role):
|
||||
"""Generate a temporary user with specific scopes.
|
||||
Convenience function that provides setup, database handling and teardown"""
|
||||
temp_users = []
|
||||
counter = 0
|
||||
get_role = create_temp_role
|
||||
|
||||
def temp_user_creator(*scopes):
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
name = f"temp_user_{counter}"
|
||||
role = get_role(scopes)
|
||||
orm_user = orm.User(name=name)
|
||||
app.db.add(orm_user)
|
||||
app.db.commit()
|
||||
temp_users.append(orm_user)
|
||||
roles.update_roles(app.db, orm_user, roles=[role.name])
|
||||
return app.users[orm_user.id]
|
||||
|
||||
yield temp_user_creator
|
||||
for user in temp_users:
|
||||
app.users.delete(user)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def create_service_with_scopes(app, create_temp_role):
|
||||
"""Generate a temporary service with specific scopes.
|
||||
Convenience function that provides setup, database handling and teardown"""
|
||||
temp_service = []
|
||||
counter = 0
|
||||
role_function = create_temp_role
|
||||
|
||||
def temp_service_creator(*scopes):
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
name = f"temp_service_{counter}"
|
||||
role = role_function(scopes)
|
||||
app.services.append({'name': name})
|
||||
app.init_services()
|
||||
orm_service = orm.Service.find(app.db, name)
|
||||
app.db.commit()
|
||||
roles.update_roles(app.db, orm_service, roles=[role.name])
|
||||
return orm_service
|
||||
|
||||
yield temp_service_creator
|
||||
for service in temp_service:
|
||||
app.db.delete(service)
|
||||
app.db.commit()
|
||||
|
||||
|
||||
async def test_request_fake_user(app, create_user_with_scopes):
|
||||
fake_user = 'annyong'
|
||||
user = create_user_with_scopes('read:users!group=stuff')
|
||||
r = await api_request(
|
||||
app, 'users', fake_user, headers=auth_header(app.db, user_name)
|
||||
app, 'users', fake_user, headers=auth_header(app.db, user.name)
|
||||
)
|
||||
assert r.status_code == 404
|
||||
# Consistency between no user and user not accessible
|
||||
assert r.json()['message'] == err_message
|
||||
|
||||
|
||||
async def test_refuse_exceeding_token_permissions(app):
|
||||
user_name = 'abed'
|
||||
user = add_user(app.db, name=user_name)
|
||||
add_user(app.db, name='user')
|
||||
api_token = user.new_api_token()
|
||||
exceeding_role = generate_test_role(user_name, ['read:users'], 'exceeding_role')
|
||||
roles.create_role(app.db, exceeding_role)
|
||||
roles.grant_role(app.db, entity=user.api_tokens[0], rolename='exceeding_role')
|
||||
app.db.commit()
|
||||
headers = {'Authorization': 'token %s' % api_token}
|
||||
r = await api_request(app, 'users', headers=headers)
|
||||
assert r.status_code == 200
|
||||
result_names = {user['name'] for user in r.json()}
|
||||
assert result_names == {user_name}
|
||||
async def test_refuse_exceeding_token_permissions(
|
||||
app, create_user_with_scopes, create_temp_role
|
||||
):
|
||||
user = create_user_with_scopes('self')
|
||||
user.new_api_token()
|
||||
create_temp_role(['admin:users'], 'exceeding_role')
|
||||
with pytest.raises(ValueError):
|
||||
roles.update_roles(app.db, entity=user.api_tokens[0], roles=['exceeding_role'])
|
||||
|
||||
|
||||
async def test_exceeding_user_permissions(app):
|
||||
user_name = 'abed'
|
||||
user = add_user(app.db, name=user_name)
|
||||
add_user(app.db, name='user')
|
||||
async def test_exceeding_user_permissions(
|
||||
app, create_user_with_scopes, create_temp_role
|
||||
):
|
||||
user = create_user_with_scopes('read:users:groups')
|
||||
api_token = user.new_api_token()
|
||||
orm_api_token = orm.APIToken.find(app.db, token=api_token)
|
||||
reader_role = generate_test_role(user_name, ['read:users'], 'reader_role')
|
||||
subreader_role = generate_test_role(
|
||||
user_name, ['read:users:groups'], 'subreader_role'
|
||||
)
|
||||
roles.create_role(app.db, reader_role)
|
||||
roles.create_role(app.db, subreader_role)
|
||||
app.db.commit()
|
||||
roles.update_roles(app.db, user, roles=['reader_role'])
|
||||
roles.update_roles(app.db, orm_api_token, roles=['subreader_role'])
|
||||
orm_api_token.roles.remove(orm.Role.find(app.db, name='token'))
|
||||
app.db.commit()
|
||||
|
||||
create_temp_role(['read:users'], 'reader_role')
|
||||
roles.grant_role(app.db, orm_api_token, rolename='reader_role')
|
||||
headers = {'Authorization': 'token %s' % api_token}
|
||||
r = await api_request(app, 'users', headers=headers)
|
||||
assert r.status_code == 200
|
||||
keys = {key for user in r.json() for key in user.keys()}
|
||||
assert 'groups' in keys
|
||||
assert 'last_activity' not in keys
|
||||
roles.strip_role(app.db, user, 'reader_role')
|
||||
|
||||
|
||||
async def test_user_service_separation(app, mockservice_url):
|
||||
async def test_user_service_separation(app, mockservice_url, create_temp_role):
|
||||
name = mockservice_url.name
|
||||
user = add_user(app.db, name=name)
|
||||
|
||||
reader_role = generate_test_role(name, ['read:users'], 'reader_role')
|
||||
subreader_role = generate_test_role(name, ['read:users:groups'], 'subreader_role')
|
||||
roles.create_role(app.db, reader_role)
|
||||
roles.create_role(app.db, subreader_role)
|
||||
app.db.commit()
|
||||
create_temp_role(['read:users'], 'reader_role')
|
||||
create_temp_role(['read:users:groups'], 'subreader_role')
|
||||
roles.update_roles(app.db, user, roles=['subreader_role'])
|
||||
roles.update_roles(app.db, mockservice_url.orm, roles=['reader_role'])
|
||||
user.roles.remove(orm.Role.find(app.db, name='user'))
|
||||
api_token = user.new_api_token()
|
||||
app.db.commit()
|
||||
headers = {'Authorization': 'token %s' % api_token}
|
||||
r = await api_request(app, 'users', headers=headers)
|
||||
assert r.status_code == 200
|
||||
@@ -343,33 +387,22 @@ async def test_user_service_separation(app, mockservice_url):
|
||||
assert 'last_activity' not in keys
|
||||
|
||||
|
||||
async def test_request_user_outside_group(app):
|
||||
user_name = 'buster'
|
||||
fake_user = 'hello'
|
||||
user = add_user(app.db, name=user_name)
|
||||
add_user(app.db, name=fake_user)
|
||||
test_role = generate_test_role(user_name, ['read:users!group=stuff'])
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
roles.strip_role(app.db, entity=user, rolename='user')
|
||||
app.db.commit()
|
||||
async def test_request_user_outside_group(app, create_user_with_scopes):
|
||||
outside_user = 'hello'
|
||||
user = create_user_with_scopes('read:users!group=stuff')
|
||||
add_user(app.db, name=outside_user)
|
||||
r = await api_request(
|
||||
app, 'users', fake_user, headers=auth_header(app.db, user_name)
|
||||
app, 'users', outside_user, headers=auth_header(app.db, user.name)
|
||||
)
|
||||
assert r.status_code == 404
|
||||
# Consistency between no user and user not accessible
|
||||
assert r.json()['message'] == err_message
|
||||
|
||||
|
||||
async def test_user_filter(app):
|
||||
user_name = 'rita'
|
||||
user = add_user(app.db, name=user_name)
|
||||
app.db.commit()
|
||||
scopes = ['read:users!user=lindsay', 'read:users!user=gob', 'read:users!user=oscar']
|
||||
test_role = generate_test_role(user, scopes)
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
roles.strip_role(app.db, entity=user, rolename='user')
|
||||
async def test_user_filter(app, create_user_with_scopes):
|
||||
user = create_user_with_scopes(
|
||||
'read:users!user=lindsay', 'read:users!user=gob', 'read:users!user=oscar'
|
||||
)
|
||||
name_in_scope = {'lindsay', 'oscar', 'gob'}
|
||||
outside_scope = {'maeby', 'marta'}
|
||||
group_name = 'bluth'
|
||||
@@ -378,17 +411,19 @@ async def test_user_filter(app):
|
||||
group = orm.Group(name=group_name)
|
||||
app.db.add(group)
|
||||
for name in name_in_scope | outside_scope:
|
||||
user = add_user(app.db, name=name)
|
||||
group_user = add_user(app.db, name=name)
|
||||
if name not in group.users:
|
||||
group.users.append(user)
|
||||
group.users.append(group_user)
|
||||
app.db.commit()
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user_name))
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user.name))
|
||||
assert r.status_code == 200
|
||||
result_names = {user['name'] for user in r.json()}
|
||||
assert result_names == name_in_scope
|
||||
app.db.delete(group)
|
||||
app.db.commit()
|
||||
|
||||
|
||||
async def test_service_filter(app):
|
||||
async def test_service_filter(app, create_user_with_scopes):
|
||||
services = [
|
||||
{'name': 'cull_idle', 'api_token': 'some-token'},
|
||||
{'name': 'user_service', 'api_token': 'some-other-token'},
|
||||
@@ -396,124 +431,90 @@ async def test_service_filter(app):
|
||||
for service in services:
|
||||
app.services.append(service)
|
||||
app.init_services()
|
||||
user_name = 'buster'
|
||||
user = add_user(app.db, name=user_name)
|
||||
app.db.commit()
|
||||
test_role = generate_test_role(user, ['read:services!service=cull_idle'])
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
r = await api_request(app, 'services', headers=auth_header(app.db, user_name))
|
||||
user = create_user_with_scopes('read:services!service=cull_idle')
|
||||
r = await api_request(app, 'services', headers=auth_header(app.db, user.name))
|
||||
assert r.status_code == 200
|
||||
service_names = set(r.json().keys())
|
||||
assert service_names == {'cull_idle'}
|
||||
|
||||
|
||||
async def test_user_filter_with_group(app):
|
||||
# Move role setup to setup method?
|
||||
user_name = 'sally'
|
||||
user = add_user(app.db, name=user_name)
|
||||
external_user_name = 'britta'
|
||||
add_user(app.db, name=external_user_name)
|
||||
test_role = generate_test_role(user_name, ['read:users!group=sitwell'])
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
|
||||
name_set = {'sally', 'stan'}
|
||||
async def test_user_filter_with_group(app, create_user_with_scopes):
|
||||
group_name = 'sitwell'
|
||||
user1 = create_user_with_scopes(f'read:users!group={group_name}')
|
||||
user2 = create_user_with_scopes('self')
|
||||
external_user = create_user_with_scopes('self')
|
||||
name_set = {user1.name, user2.name}
|
||||
group = orm.Group.find(app.db, name=group_name)
|
||||
if not group:
|
||||
group = orm.Group(name=group_name)
|
||||
app.db.add(group)
|
||||
for name in name_set:
|
||||
user = add_user(app.db, name=name)
|
||||
if name not in group.users:
|
||||
group.users.append(user)
|
||||
for user in {user1, user2}:
|
||||
group.users.append(user)
|
||||
app.db.commit()
|
||||
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user_name))
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user1.name))
|
||||
assert r.status_code == 200
|
||||
result_names = {user['name'] for user in r.json()}
|
||||
assert result_names == name_set
|
||||
assert external_user_name not in result_names
|
||||
assert external_user.name not in result_names
|
||||
app.db.delete(group)
|
||||
app.db.commit()
|
||||
|
||||
|
||||
async def test_group_scope_filter(app):
|
||||
user_name = 'rollerblade'
|
||||
user = add_user(app.db, name=user_name)
|
||||
scopes = ['read:groups!group=sitwell', 'read:groups!group=bluth']
|
||||
test_role = generate_test_role(user_name, scopes)
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
|
||||
group_set = {'sitwell', 'bluth', 'austero'}
|
||||
for group_name in group_set:
|
||||
async def test_group_scope_filter(app, create_user_with_scopes):
|
||||
in_groups = {'sitwell', 'bluth'}
|
||||
out_groups = {'austero'}
|
||||
user = create_user_with_scopes(
|
||||
*(f'read:groups!group={group}' for group in in_groups)
|
||||
)
|
||||
for group_name in in_groups | out_groups:
|
||||
group = orm.Group.find(app.db, name=group_name)
|
||||
if not group:
|
||||
group = orm.Group(name=group_name)
|
||||
app.db.add(group)
|
||||
app.db.commit()
|
||||
r = await api_request(app, 'groups', headers=auth_header(app.db, user_name))
|
||||
r = await api_request(app, 'groups', headers=auth_header(app.db, user.name))
|
||||
assert r.status_code == 200
|
||||
result_names = {user['name'] for user in r.json()}
|
||||
assert result_names == {'sitwell', 'bluth'}
|
||||
|
||||
|
||||
async def test_vertical_filter(app):
|
||||
user_name = 'lindsey'
|
||||
user = add_user(app.db, name=user_name)
|
||||
test_role = generate_test_role(user_name, ['read:users:name'])
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
roles.strip_role(app.db, entity=user, rolename='user')
|
||||
assert result_names == in_groups
|
||||
for group_name in in_groups | out_groups:
|
||||
group = orm.Group.find(app.db, name=group_name)
|
||||
app.db.delete(group)
|
||||
app.db.commit()
|
||||
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user_name))
|
||||
|
||||
async def test_vertical_filter(app, create_user_with_scopes):
|
||||
user = create_user_with_scopes('read:users:name')
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user.name))
|
||||
assert r.status_code == 200
|
||||
allowed_keys = {'name', 'kind'}
|
||||
assert set([key for user in r.json() for key in user.keys()]) == allowed_keys
|
||||
|
||||
|
||||
async def test_stacked_vertical_filter(app):
|
||||
user_name = 'user'
|
||||
user = add_user(app.db, name=user_name)
|
||||
test_role = generate_test_role(
|
||||
user_name, ['read:users:activity', 'read:users:servers']
|
||||
)
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
roles.strip_role(app.db, entity=user, rolename='user')
|
||||
app.db.commit()
|
||||
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user_name))
|
||||
async def test_stacked_vertical_filter(app, create_user_with_scopes):
|
||||
user = create_user_with_scopes('read:users:activity', 'read:users:servers')
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user.name))
|
||||
assert r.status_code == 200
|
||||
allowed_keys = {'name', 'kind', 'servers', 'last_activity'}
|
||||
result_model = set([key for user in r.json() for key in user.keys()])
|
||||
assert result_model == allowed_keys
|
||||
|
||||
|
||||
async def test_cross_filter(app):
|
||||
user_name = 'abed'
|
||||
user = add_user(app.db, name=user_name)
|
||||
test_role = generate_test_role(
|
||||
user_name, ['read:users:activity', 'read:users!user=abed']
|
||||
)
|
||||
roles.create_role(app.db, test_role)
|
||||
roles.grant_role(app.db, entity=user, rolename='test')
|
||||
roles.strip_role(app.db, entity=user, rolename='user')
|
||||
app.db.commit()
|
||||
async def test_cross_filter(app, create_user_with_scopes):
|
||||
user = create_user_with_scopes('read:users:activity', 'self')
|
||||
new_users = {'britta', 'jeff', 'annie'}
|
||||
for new_user_name in new_users:
|
||||
add_user(app.db, name=new_user_name)
|
||||
app.db.commit()
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user_name))
|
||||
r = await api_request(app, 'users', headers=auth_header(app.db, user.name))
|
||||
assert r.status_code == 200
|
||||
restricted_keys = {'name', 'kind', 'last_activity'}
|
||||
key_in_full_model = 'created'
|
||||
for user in r.json():
|
||||
if user['name'] == user_name:
|
||||
assert key_in_full_model in user
|
||||
for model_user in r.json():
|
||||
if model_user['name'] == user.name:
|
||||
assert key_in_full_model in model_user
|
||||
else:
|
||||
assert set(user.keys()) == restricted_keys
|
||||
assert set(model_user.keys()) == restricted_keys
|
||||
|
||||
|
||||
@mark.parametrize(
|
||||
@@ -523,13 +524,13 @@ async def test_cross_filter(app):
|
||||
('services', False),
|
||||
],
|
||||
)
|
||||
async def test_metascope_self_expansion(app, kind, has_user_scopes):
|
||||
Class = orm.get_class(kind)
|
||||
orm_obj = Class(name=f'test_{kind}')
|
||||
app.db.add(orm_obj)
|
||||
app.db.commit()
|
||||
test_role = orm.Role(name='test_role', scopes=['self'])
|
||||
orm_obj.roles.append(test_role)
|
||||
async def test_metascope_self_expansion(
|
||||
app, kind, has_user_scopes, create_user_with_scopes, create_service_with_scopes
|
||||
):
|
||||
if kind == 'users':
|
||||
orm_obj = create_user_with_scopes('self')
|
||||
else:
|
||||
orm_obj = create_service_with_scopes('self')
|
||||
# test expansion of user/service scopes
|
||||
scopes = roles.expand_roles_to_scopes(orm_obj)
|
||||
assert bool(scopes) == has_user_scopes
|
||||
@@ -538,14 +539,10 @@ async def test_metascope_self_expansion(app, kind, has_user_scopes):
|
||||
orm_obj.new_api_token()
|
||||
token_scopes = get_scopes_for(orm_obj.api_tokens[0])
|
||||
assert bool(token_scopes) == has_user_scopes
|
||||
app.db.delete(orm_obj)
|
||||
app.db.delete(test_role)
|
||||
app.db.commit()
|
||||
|
||||
|
||||
async def test_metascope_all_expansion(app):
|
||||
user = add_user(app.db, name='user')
|
||||
scope_set = {scope for role in user.roles for scope in role.scopes}
|
||||
async def test_metascope_all_expansion(app, create_user_with_scopes):
|
||||
user = create_user_with_scopes('self')
|
||||
user.new_api_token()
|
||||
token = user.api_tokens[0]
|
||||
# Check 'all' expansion
|
||||
@@ -622,7 +619,7 @@ async def test_server_state_access(
|
||||
service.roles.append(role)
|
||||
app.db.commit()
|
||||
api_token = service.new_api_token()
|
||||
app.init_roles()
|
||||
await app.init_roles()
|
||||
headers = {'Authorization': 'token %s' % api_token}
|
||||
r = await api_request(app, 'users', username, headers=headers)
|
||||
r.raise_for_status()
|
||||
|
Reference in New Issue
Block a user