mirror of
https://github.com/jupyterhub/jupyterhub.git
synced 2025-10-09 19:13:03 +00:00
Merge branch 'rbac' into additional_scopes
This commit is contained in:
@@ -40,6 +40,7 @@ securityDefinitions:
|
||||
read:groups: Read-only access to groups
|
||||
admin:groups: Grants access to create/delete groups
|
||||
read:services: Read-only access to services
|
||||
read:hub: Read-only access to detailed information about JupyterHub
|
||||
proxy: Grants access to proxy's routing table, syncing and notifying about a new proxy
|
||||
shutdown: Grants access to shutdown the Hub
|
||||
security: # global security, do we want to keep only the apiKey (token: []), change to only oauth2 (with scope all) or have both (either can be used)?
|
||||
@@ -70,6 +71,9 @@ paths:
|
||||
/info:
|
||||
get:
|
||||
summary: Get detailed info about JupyterHub
|
||||
security:
|
||||
- oauth2:
|
||||
- read:hub
|
||||
description: |
|
||||
Detailed JupyterHub information, including Python version,
|
||||
JupyterHub's version and executable path,
|
||||
@@ -980,6 +984,11 @@ definitions:
|
||||
description: The names of users who are members of this group
|
||||
items:
|
||||
type: string
|
||||
roles:
|
||||
type: array
|
||||
description: The names of roles this group has
|
||||
items:
|
||||
type: string
|
||||
Service:
|
||||
type: object
|
||||
properties:
|
||||
|
@@ -320,9 +320,15 @@ class APIHandler(BaseHandler):
|
||||
# todo: Remove once we replace admin flag with role check
|
||||
return model
|
||||
|
||||
_user_model_types = {'name': str, 'admin': bool, 'groups': list, 'auth_state': dict}
|
||||
_user_model_types = {
|
||||
'name': str,
|
||||
'admin': bool,
|
||||
'groups': list,
|
||||
'roles': list,
|
||||
'auth_state': dict,
|
||||
}
|
||||
|
||||
_group_model_types = {'name': str, 'users': list}
|
||||
_group_model_types = {'name': str, 'users': list, 'roles': list}
|
||||
|
||||
def _check_model(self, model, model_types, name):
|
||||
"""Check a model provided by a REST API request
|
||||
|
@@ -334,7 +334,8 @@ class JupyterHub(Application):
|
||||
'scopes': ['users', 'groups'],
|
||||
'users': ['cyclops', 'gandalf'],
|
||||
'services': [],
|
||||
'tokens': []
|
||||
'tokens': [],
|
||||
'groups': []
|
||||
}
|
||||
]
|
||||
|
||||
@@ -1852,21 +1853,24 @@ class JupyterHub(Application):
|
||||
async def init_roles(self):
|
||||
"""Load default and predefined roles into the database"""
|
||||
db = self.db
|
||||
role_bearers = ['users', 'services', 'tokens']
|
||||
# tokens are added separately
|
||||
role_bearers = ['users', 'services', 'groups']
|
||||
|
||||
# load default roles
|
||||
self.log.debug('Loading default roles to database')
|
||||
default_roles = roles.get_default_roles()
|
||||
for role in default_roles:
|
||||
roles.create_role(db, role)
|
||||
|
||||
# load predefined roles from config file
|
||||
self.log.debug('Loading predefined roles from config file to database')
|
||||
for predef_role in self.load_roles:
|
||||
roles.create_role(db, predef_role)
|
||||
# add users, services and/or tokens
|
||||
# add users, services, and/or groups,
|
||||
# tokens need to be checked for permissions
|
||||
for bearer in role_bearers:
|
||||
if bearer in predef_role.keys():
|
||||
for bname in predef_role[bearer]:
|
||||
|
||||
if bearer == 'users':
|
||||
bname = self.authenticator.normalize_username(bname)
|
||||
if not (
|
||||
@@ -1883,19 +1887,21 @@ class JupyterHub(Application):
|
||||
roles.grant_role(
|
||||
db, entity=orm_obj, rolename=predef_role['name']
|
||||
)
|
||||
|
||||
# make sure that on no admin situation, all roles are reset
|
||||
admin_role = orm.Role.find(db, name='admin')
|
||||
if not admin_role.users:
|
||||
app_log.info(
|
||||
"No admin users found; assuming hub upgrade. Initializing default roles for all entities"
|
||||
)
|
||||
# make sure all users, services and tokens have at least one role (update with default)
|
||||
for bearer in role_bearers:
|
||||
Class = orm.get_class(bearer)
|
||||
for obj in db.query(Class):
|
||||
# if len(obj.roles) < 1: # todo: Should I check if some roles are already assigned?
|
||||
roles.assign_default_roles(db, entity=obj)
|
||||
db.commit()
|
||||
roles.check_for_default_roles(db, bearer)
|
||||
|
||||
# now add roles to tokens if their owner's permissions allow
|
||||
roles.add_predef_roles_tokens(db, self.load_roles)
|
||||
|
||||
# check tokens for default roles
|
||||
roles.check_for_default_roles(db, bearer='tokens')
|
||||
|
||||
async def _add_tokens(self, token_dict, kind):
|
||||
"""Add tokens for users or services to the database"""
|
||||
|
@@ -167,6 +167,14 @@ api_token_role_map = Table(
|
||||
Column('role_id', ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
|
||||
)
|
||||
|
||||
# group:role many:many mapping table
|
||||
group_role_map = Table(
|
||||
'group_role_map',
|
||||
Base.metadata,
|
||||
Column('group_id', ForeignKey('groups.id', ondelete='CASCADE'), primary_key=True),
|
||||
Column('role_id', ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
|
||||
)
|
||||
|
||||
|
||||
class Role(Base):
|
||||
"""User Roles"""
|
||||
@@ -179,6 +187,7 @@ class Role(Base):
|
||||
users = relationship('User', secondary='user_role_map', backref='roles')
|
||||
services = relationship('Service', secondary='service_role_map', backref='roles')
|
||||
tokens = relationship('APIToken', secondary='api_token_role_map', backref='roles')
|
||||
groups = relationship('Group', secondary='group_role_map', backref='roles')
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s %s (%s) - scopes: %s>" % (
|
||||
|
@@ -3,6 +3,9 @@
|
||||
# Distributed under the terms of the Modified BSD License.
|
||||
from itertools import chain
|
||||
|
||||
from sqlalchemy import func
|
||||
from tornado.log import app_log
|
||||
|
||||
from . import orm
|
||||
|
||||
|
||||
@@ -35,7 +38,9 @@ def get_default_roles():
|
||||
{
|
||||
'name': 'server',
|
||||
'description': 'Post activity only',
|
||||
'scopes': ['users:activity!user=username'],
|
||||
'scopes': [
|
||||
'users:activity'
|
||||
], # TO DO - fix scope to refer to only self once implemented
|
||||
},
|
||||
{
|
||||
'name': 'token',
|
||||
@@ -103,13 +108,29 @@ def _get_scope_hierarchy():
|
||||
return scopes
|
||||
|
||||
|
||||
def expand_scope(scopename):
|
||||
def horizontal_filter(func):
|
||||
"""Decorator to account for horizontal filtering in scope syntax"""
|
||||
|
||||
def ignore(scopename):
|
||||
# temporarily remove horizontal filtering if present
|
||||
scopename, mark, hor_filter = scopename.partition('!')
|
||||
expanded_scope = func(scopename)
|
||||
# add the filter back
|
||||
full_expanded_scope = {scope + mark + hor_filter for scope in expanded_scope}
|
||||
|
||||
return full_expanded_scope
|
||||
|
||||
return ignore
|
||||
|
||||
|
||||
@horizontal_filter
|
||||
def _expand_scope(scopename):
|
||||
"""Returns a set of all subscopes"""
|
||||
|
||||
scopes = _get_scope_hierarchy()
|
||||
subscopes = [scopename]
|
||||
|
||||
def expand_subscopes(index):
|
||||
def _expand_subscopes(index):
|
||||
|
||||
more_subscopes = list(
|
||||
filter(lambda scope: scope in scopes.keys(), subscopes[index:])
|
||||
@@ -122,9 +143,9 @@ def expand_scope(scopename):
|
||||
# record the index from where it should check for "subscopes of sub-subscopes"
|
||||
index_for_sssc = len(subscopes)
|
||||
# check for "subscopes of subscopes"
|
||||
expand_subscopes(index=1)
|
||||
_expand_subscopes(index=1)
|
||||
# check for "subscopes of sub-subscopes"
|
||||
expand_subscopes(index=index_for_sssc)
|
||||
_expand_subscopes(index=index_for_sssc)
|
||||
|
||||
expanded_scope = set(subscopes)
|
||||
|
||||
@@ -134,6 +155,16 @@ def expand_scope(scopename):
|
||||
def expand_roles_to_scopes(orm_object):
|
||||
"""Get the scopes listed in the roles of the User/Service/Group/Token"""
|
||||
scopes = _get_subscopes(*orm_object.roles)
|
||||
"""Get the scopes listed in the roles of the User/Service/Group/Token
|
||||
If User, take into account the user's groups roles as well"""
|
||||
|
||||
pass_roles = orm_object.roles
|
||||
if isinstance(orm_object, orm.User):
|
||||
groups_roles = []
|
||||
for group in orm_object.groups:
|
||||
groups_roles.extend(group.roles)
|
||||
pass_roles.extend(groups_roles)
|
||||
scopes = _get_subscopes(*pass_roles)
|
||||
if 'self' in scopes:
|
||||
scopes.remove('self')
|
||||
if isinstance(orm_object, orm.User) or hasattr(orm_object, 'orm_user'):
|
||||
@@ -149,16 +180,57 @@ def _get_subscopes(*args):
|
||||
for role in args:
|
||||
scope_list.extend(role.scopes)
|
||||
|
||||
scopes = set(chain.from_iterable(list(map(expand_scope, scope_list))))
|
||||
scopes = set(chain.from_iterable(list(map(_expand_scope, scope_list))))
|
||||
|
||||
return scopes
|
||||
|
||||
|
||||
def _check_scopes(*args):
|
||||
"""Check if provided scopes exist"""
|
||||
|
||||
allowed_scopes = _get_scope_hierarchy()
|
||||
allowed_filters = ['!user=', '!service=', '!group=', '!server=']
|
||||
subscopes = set(
|
||||
chain.from_iterable([x for x in allowed_scopes.values() if x is not None])
|
||||
)
|
||||
|
||||
for scope in args:
|
||||
# check the ! filters
|
||||
if '!' in scope:
|
||||
if any(filter in scope for filter in allowed_filters):
|
||||
scope = scope.split('!', 1)[0]
|
||||
else:
|
||||
raise NameError(
|
||||
'Scope filter %r in scope %r does not exist',
|
||||
scope.split('!', 1)[1],
|
||||
scope,
|
||||
)
|
||||
# check if the actual scope syntax exists
|
||||
if scope not in allowed_scopes.keys() and scope not in subscopes:
|
||||
raise NameError('Scope %r does not exist', scope)
|
||||
|
||||
|
||||
def _overwrite_role(role, role_dict):
|
||||
"""Overwrites role's description and/or scopes with role_dict if role not 'admin'"""
|
||||
|
||||
for attr in role_dict.keys():
|
||||
if attr == 'description' or attr == 'scopes':
|
||||
if role.name == 'admin' and role_dict[attr] != getattr(role, attr):
|
||||
raise ValueError(
|
||||
'admin role description or scopes cannot be overwritten'
|
||||
)
|
||||
else:
|
||||
setattr(role, attr, role_dict[attr])
|
||||
app_log.info('Role %r %r attribute has been changed', role.name, attr)
|
||||
|
||||
|
||||
def create_role(db, role_dict):
|
||||
"""Adds a new role to database or modifies an existing one"""
|
||||
|
||||
default_roles = get_default_roles()
|
||||
|
||||
if 'name' not in role_dict.keys():
|
||||
raise ValueError('Role must have a name')
|
||||
raise KeyError('Role definition must have a name')
|
||||
else:
|
||||
name = role_dict['name']
|
||||
role = orm.Role.find(db, name)
|
||||
@@ -166,21 +238,45 @@ def create_role(db, role_dict):
|
||||
description = role_dict.get('description')
|
||||
scopes = role_dict.get('scopes')
|
||||
|
||||
# check if the provided scopes exist
|
||||
if scopes:
|
||||
_check_scopes(*scopes)
|
||||
|
||||
if role is None:
|
||||
if not scopes:
|
||||
app_log.warning('Warning: New defined role %s has no scopes', name)
|
||||
|
||||
role = orm.Role(name=name, description=description, scopes=scopes)
|
||||
db.add(role)
|
||||
if role_dict not in default_roles:
|
||||
app_log.info('Role %s added to database', name)
|
||||
else:
|
||||
if description:
|
||||
role.description = description
|
||||
if scopes:
|
||||
role.scopes = scopes
|
||||
_overwrite_role(role, role_dict)
|
||||
|
||||
db.commit()
|
||||
|
||||
|
||||
def remove_role(db, rolename):
|
||||
"""Removes a role from database"""
|
||||
|
||||
# default roles are not removable
|
||||
default_roles = get_default_roles()
|
||||
if any(role['name'] == rolename for role in default_roles):
|
||||
raise ValueError('Default role %r cannot be removed', rolename)
|
||||
|
||||
role = orm.Role.find(db, rolename)
|
||||
if role:
|
||||
db.delete(role)
|
||||
db.commit()
|
||||
app_log.info('Role %s has been deleted', rolename)
|
||||
else:
|
||||
raise NameError('Cannot remove role %r that does not exist', rolename)
|
||||
|
||||
|
||||
def existing_only(func):
|
||||
"""Decorator for checking if objects and roles exist"""
|
||||
|
||||
def check_existence(db, entity, rolename):
|
||||
def _check_existence(db, entity, rolename):
|
||||
role = orm.Role.find(db, rolename)
|
||||
if entity is None:
|
||||
raise ValueError(
|
||||
@@ -191,23 +287,44 @@ def existing_only(func):
|
||||
else:
|
||||
func(db, entity, role)
|
||||
|
||||
return check_existence
|
||||
return _check_existence
|
||||
|
||||
|
||||
@existing_only
|
||||
def grant_role(db, entity, rolename):
|
||||
"""Adds a role for users, services or tokens"""
|
||||
if isinstance(entity, orm.APIToken):
|
||||
entity_repr = entity
|
||||
else:
|
||||
entity_repr = entity.name
|
||||
|
||||
if rolename not in entity.roles:
|
||||
entity.roles.append(rolename)
|
||||
db.commit()
|
||||
app_log.info(
|
||||
'Adding role %s for %s: %s',
|
||||
rolename.name,
|
||||
type(entity).__name__,
|
||||
entity_repr,
|
||||
)
|
||||
|
||||
|
||||
@existing_only
|
||||
def strip_role(db, entity, rolename):
|
||||
"""Removes a role for users, services or tokens"""
|
||||
if isinstance(entity, orm.APIToken):
|
||||
entity_repr = entity
|
||||
else:
|
||||
entity_repr = entity.name
|
||||
if rolename in entity.roles:
|
||||
entity.roles.remove(rolename)
|
||||
db.commit()
|
||||
app_log.info(
|
||||
'Removing role %s for %s: %s',
|
||||
rolename.name,
|
||||
type(entity).__name__,
|
||||
entity_repr,
|
||||
)
|
||||
|
||||
|
||||
def _switch_default_role(db, obj, admin):
|
||||
@@ -228,20 +345,56 @@ def _switch_default_role(db, obj, admin):
|
||||
add_and_remove(db, obj, admin_role, user_role)
|
||||
|
||||
|
||||
def _token_allowed_role(db, token, role):
|
||||
|
||||
"""Returns True if token allowed to have requested role through
|
||||
comparing the requested scopes with the set of token's owner scopes"""
|
||||
|
||||
standard_permissions = {'all', 'read:all'}
|
||||
|
||||
token_scopes = _get_subscopes(role)
|
||||
extra_scopes = token_scopes - standard_permissions
|
||||
# ignore horizontal filters
|
||||
raw_extra_scopes = {
|
||||
scope.split('!', 1)[0] if '!' in scope else scope for scope in extra_scopes
|
||||
}
|
||||
# find the owner and their roles
|
||||
owner = None
|
||||
if token.user_id:
|
||||
owner = db.query(orm.User).get(token.user_id)
|
||||
elif token.service_id:
|
||||
owner = db.query(orm.Service).get(token.service_id)
|
||||
if owner:
|
||||
owner_scopes = expand_roles_to_scopes(owner)
|
||||
# ignore horizontal filters
|
||||
raw_owner_scopes = {
|
||||
scope.split('!', 1)[0] if '!' in scope else scope for scope in owner_scopes
|
||||
}
|
||||
if (raw_extra_scopes).issubset(raw_owner_scopes):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
raise ValueError('Owner the token %r not found', token)
|
||||
|
||||
|
||||
def assign_default_roles(db, entity):
|
||||
"""Assigns the default roles to an entity:
|
||||
users and services get 'user' role, or admin role if they have admin flag
|
||||
Tokens get 'token' role"""
|
||||
default_token_role = orm.Role.find(db, 'token')
|
||||
# tokens can have only 'token' role as default
|
||||
# assign the default only for tokens
|
||||
if isinstance(entity, orm.APIToken):
|
||||
if isinstance(entity, orm.Group):
|
||||
pass
|
||||
elif isinstance(entity, orm.APIToken):
|
||||
app_log.debug('Assigning default roles to tokens')
|
||||
if not entity.roles and (entity.user or entity.service) is not None:
|
||||
default_token_role.tokens.append(entity)
|
||||
app_log.info('Added role %s to token %s', default_token_role.name, entity)
|
||||
db.commit()
|
||||
# users and services can have 'user' or 'admin' roles as default
|
||||
else:
|
||||
# todo: when we deprecate admin flag: replace with role check
|
||||
app_log.debug('Assigning default roles to %s', type(entity).__name__)
|
||||
_switch_default_role(db, entity, entity.admin)
|
||||
|
||||
|
||||
@@ -252,27 +405,22 @@ def update_roles(db, entity, roles):
|
||||
if isinstance(entity, orm.APIToken):
|
||||
role = orm.Role.find(db, rolename)
|
||||
if role:
|
||||
# compare the requested role permissions with the owner's permissions (scopes)
|
||||
token_scopes = _get_subscopes(role)
|
||||
extra_scopes = token_scopes - standard_permissions
|
||||
# find the owner and their roles
|
||||
owner = None
|
||||
if entity.user_id:
|
||||
owner = db.query(orm.User).get(entity.user_id)
|
||||
elif entity.service_id:
|
||||
owner = db.query(orm.Service).get(entity.service_id)
|
||||
if owner:
|
||||
owner_scopes = expand_roles_to_scopes(owner)
|
||||
if extra_scopes.issubset(owner_scopes):
|
||||
app_log.debug(
|
||||
'Checking token permissions against requested role %s', rolename
|
||||
)
|
||||
if _token_allowed_role(db, entity, role):
|
||||
role.tokens.append(entity)
|
||||
app_log.info('Adding role %s for token: %s', role.name, entity)
|
||||
else:
|
||||
raise ValueError(
|
||||
'Requested token role %r has more permissions than the token owner: [%s]'
|
||||
% (rolename, ",".join(extra_scopes - owner_scopes))
|
||||
'Requested token role %r of %r has more permissions than the token owner',
|
||||
rolename,
|
||||
entity,
|
||||
)
|
||||
else:
|
||||
raise NameError('Role %r does not exist' % rolename)
|
||||
else:
|
||||
app_log.debug('Assigning default roles to %s', type(entity).__name__)
|
||||
grant_role(db, entity=entity, rolename=rolename)
|
||||
|
||||
|
||||
@@ -283,4 +431,5 @@ def mock_roles(app, name, kind):
|
||||
default_roles = get_default_roles()
|
||||
for role in default_roles:
|
||||
create_role(app.db, role)
|
||||
app_log.info('Assigning default roles to mocked %s: %s', kind[:-1], name)
|
||||
assign_default_roles(db=app.db, entity=obj)
|
||||
|
@@ -1455,7 +1455,7 @@ async def test_groups_list(app):
|
||||
r = await api_request(app, 'groups')
|
||||
r.raise_for_status()
|
||||
reply = r.json()
|
||||
assert reply == [{'kind': 'group', 'name': 'alphaflight', 'users': []}]
|
||||
assert reply == [{'kind': 'group', 'name': 'alphaflight', 'users': [], 'roles': []}]
|
||||
|
||||
|
||||
@mark.group
|
||||
@@ -1490,7 +1490,12 @@ async def test_group_get(app):
|
||||
r = await api_request(app, 'groups/alphaflight')
|
||||
r.raise_for_status()
|
||||
reply = r.json()
|
||||
assert reply == {'kind': 'group', 'name': 'alphaflight', 'users': ['sasquatch']}
|
||||
assert reply == {
|
||||
'kind': 'group',
|
||||
'name': 'alphaflight',
|
||||
'users': ['sasquatch'],
|
||||
'roles': [],
|
||||
}
|
||||
|
||||
|
||||
@mark.group
|
||||
|
@@ -2,8 +2,11 @@
|
||||
# Copyright (c) Jupyter Development Team.
|
||||
# Distributed under the terms of the Modified BSD License.
|
||||
import json
|
||||
from itertools import chain
|
||||
|
||||
import pytest
|
||||
from pytest import mark
|
||||
from tornado.log import app_log
|
||||
|
||||
from .. import orm
|
||||
from .. import roles
|
||||
@@ -30,6 +33,10 @@ def test_orm_roles(db):
|
||||
db.add(service_role)
|
||||
db.commit()
|
||||
|
||||
group_role = orm.Role(name='group', scopes=['read:users'])
|
||||
db.add(group_role)
|
||||
db.commit()
|
||||
|
||||
user = orm.User(name='falafel')
|
||||
db.add(user)
|
||||
db.commit()
|
||||
@@ -38,23 +45,33 @@ def test_orm_roles(db):
|
||||
db.add(service)
|
||||
db.commit()
|
||||
|
||||
group = orm.Group(name='fast-food')
|
||||
db.add(group)
|
||||
db.commit()
|
||||
|
||||
assert user_role.users == []
|
||||
assert user_role.services == []
|
||||
assert user_role.groups == []
|
||||
assert service_role.users == []
|
||||
assert service_role.services == []
|
||||
assert service_role.groups == []
|
||||
assert user.roles == []
|
||||
assert service.roles == []
|
||||
assert group.roles == []
|
||||
|
||||
user_role.users.append(user)
|
||||
service_role.services.append(service)
|
||||
group_role.groups.append(group)
|
||||
db.commit()
|
||||
assert user_role.users == [user]
|
||||
assert user.roles == [user_role]
|
||||
assert service_role.services == [service]
|
||||
assert service.roles == [service_role]
|
||||
assert group_role.groups == [group]
|
||||
assert group.roles == [group_role]
|
||||
|
||||
# check token creation without specifying its role
|
||||
# assigns it the default 'user' role
|
||||
# assigns it the default 'token' role
|
||||
token = user.new_api_token()
|
||||
user_token = orm.APIToken.find(db, token=token)
|
||||
assert user_token in token_role.tokens
|
||||
@@ -75,12 +92,18 @@ def test_orm_roles(db):
|
||||
db.delete(service_token)
|
||||
db.commit()
|
||||
assert service_token not in service_role.tokens
|
||||
# check deleting the 'service' role removes it from service roles
|
||||
# check deleting the service_role removes it from service.roles
|
||||
db.delete(service_role)
|
||||
db.commit()
|
||||
assert service.roles == []
|
||||
# check deleting the group removes it from group_roles
|
||||
db.delete(group)
|
||||
db.commit()
|
||||
assert group_role.groups == []
|
||||
|
||||
# clean up db
|
||||
db.delete(service)
|
||||
db.delete(group_role)
|
||||
db.commit()
|
||||
|
||||
|
||||
@@ -176,6 +199,10 @@ def test_orm_roles_delete_cascade(db):
|
||||
),
|
||||
(['read:users:servers'], {'read:users:servers'}),
|
||||
(['admin:groups'], {'admin:groups'}),
|
||||
(
|
||||
['users:tokens!group=hobbits'],
|
||||
{'users:tokens!group=hobbits', 'read:users:tokens!group=hobbits'},
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_get_subscopes(db, scopes, subscopes):
|
||||
@@ -187,6 +214,7 @@ def test_get_subscopes(db, scopes, subscopes):
|
||||
db.delete(role)
|
||||
|
||||
|
||||
@mark.role
|
||||
async def test_load_default_roles(tmpdir, request):
|
||||
"""Test loading default roles in app.py"""
|
||||
kwargs = {}
|
||||
@@ -198,10 +226,161 @@ async def test_load_default_roles(tmpdir, request):
|
||||
db = hub.db
|
||||
await hub.init_roles()
|
||||
# test default roles loaded to database
|
||||
assert orm.Role.find(db, 'user') is not None
|
||||
assert orm.Role.find(db, 'admin') is not None
|
||||
assert orm.Role.find(db, 'server') is not None
|
||||
assert orm.Role.find(db, 'token') is not None
|
||||
default_roles = roles.get_default_roles()
|
||||
for role in default_roles:
|
||||
assert orm.Role.find(db, role['name']) is not None
|
||||
|
||||
|
||||
@mark.role
|
||||
@mark.parametrize(
|
||||
"role, role_def, response_type, response",
|
||||
[
|
||||
(
|
||||
'new-role',
|
||||
{
|
||||
'name': 'new-role',
|
||||
'description': 'Some description',
|
||||
'scopes': ['groups'],
|
||||
},
|
||||
'info',
|
||||
app_log.info('Role new-role added to database'),
|
||||
),
|
||||
('no_name', {'scopes': ['users']}, 'error', KeyError),
|
||||
(
|
||||
'no_scopes',
|
||||
{'name': 'no-permissions'},
|
||||
'warning',
|
||||
app_log.warning('Warning: New defined role no-permissions has no scopes'),
|
||||
),
|
||||
(
|
||||
'admin',
|
||||
{'name': 'admin', 'scopes': ['admin:users']},
|
||||
'error',
|
||||
ValueError,
|
||||
),
|
||||
(
|
||||
'admin',
|
||||
{'name': 'admin', 'description': 'New description'},
|
||||
'error',
|
||||
ValueError,
|
||||
),
|
||||
(
|
||||
'user',
|
||||
{'name': 'user', 'scopes': ['read:users:name']},
|
||||
'info',
|
||||
app_log.info('Role user scopes attribute has been changed'),
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_adding_new_roles(
|
||||
tmpdir, request, role, role_def, response_type, response
|
||||
):
|
||||
"""Test raising errors and warnings when creating new roles"""
|
||||
|
||||
kwargs = {'load_roles': [role_def]}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
if ssl_enabled:
|
||||
kwargs['internal_certs_location'] = str(tmpdir)
|
||||
hub = MockHub(**kwargs)
|
||||
hub.init_db()
|
||||
db = hub.db
|
||||
|
||||
if response_type == 'error':
|
||||
with pytest.raises(response):
|
||||
await hub.init_roles()
|
||||
|
||||
elif response_type == 'warning' or response_type == 'info':
|
||||
with pytest.warns(response):
|
||||
await hub.init_roles()
|
||||
role = orm.Role.find(db, role_def['name'])
|
||||
assert role is not None
|
||||
if 'description' in role_def.keys():
|
||||
assert role.description == role_def['description']
|
||||
if 'scopes' in role_def.keys():
|
||||
assert role.scopes == role_def['scopes']
|
||||
|
||||
|
||||
@mark.role
|
||||
@mark.parametrize(
|
||||
"role_type, rolename, response_type, response",
|
||||
[
|
||||
(
|
||||
'existing',
|
||||
'test-role1',
|
||||
'info',
|
||||
app_log.info('Role user scopes attribute has been changed'),
|
||||
),
|
||||
('non-existing', 'test-role2', 'error', NameError),
|
||||
('default', 'user', 'error', ValueError),
|
||||
],
|
||||
)
|
||||
async def test_delete_roles(db, role_type, rolename, response_type, response):
|
||||
"""Test raising errors and info when deleting roles"""
|
||||
|
||||
if response_type == 'info':
|
||||
# add the role to db
|
||||
test_role = orm.Role(name=rolename)
|
||||
db.add(test_role)
|
||||
db.commit()
|
||||
check_role = orm.Role.find(db, rolename)
|
||||
assert check_role is not None
|
||||
# check the role is deleted and info raised
|
||||
with pytest.warns(response):
|
||||
roles.remove_role(db, rolename)
|
||||
check_role = orm.Role.find(db, rolename)
|
||||
assert check_role is None
|
||||
|
||||
elif response_type == 'error':
|
||||
with pytest.raises(response):
|
||||
roles.remove_role(db, rolename)
|
||||
|
||||
|
||||
@mark.role
|
||||
@mark.parametrize(
|
||||
"role, response",
|
||||
[
|
||||
(
|
||||
{
|
||||
'name': 'test-scopes-1',
|
||||
'scopes': [
|
||||
'users',
|
||||
'users!user=charlie',
|
||||
'admin:groups',
|
||||
'read:users:tokens',
|
||||
],
|
||||
},
|
||||
'existing',
|
||||
),
|
||||
({'name': 'test-scopes-2', 'scopes': ['uses']}, NameError),
|
||||
({'name': 'test-scopes-3', 'scopes': ['users:activities']}, NameError),
|
||||
({'name': 'test-scopes-4', 'scopes': ['groups!goup=class-A']}, NameError),
|
||||
],
|
||||
)
|
||||
async def test_scope_existence(tmpdir, request, role, response):
|
||||
"""Test checking of scopes provided in role definitions"""
|
||||
kwargs = {'load_roles': [role]}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
if ssl_enabled:
|
||||
kwargs['internal_certs_location'] = str(tmpdir)
|
||||
hub = MockHub(**kwargs)
|
||||
hub.init_db()
|
||||
db = hub.db
|
||||
|
||||
if response == 'existing':
|
||||
roles.add_role(db, role)
|
||||
added_role = orm.Role.find(db, role['name'])
|
||||
assert added_role is not None
|
||||
assert added_role.scopes == role['scopes']
|
||||
|
||||
elif response == NameError:
|
||||
with pytest.raises(response):
|
||||
roles.add_role(db, role)
|
||||
added_role = orm.Role.find(db, role['name'])
|
||||
assert added_role is None
|
||||
|
||||
# delete the tested roles
|
||||
if added_role:
|
||||
roles.remove_role(db, added_role.name)
|
||||
|
||||
|
||||
@mark.role
|
||||
@@ -214,12 +393,6 @@ async def test_load_roles_users(tmpdir, request):
|
||||
'scopes': ['users', 'groups'],
|
||||
'users': ['cyclops', 'gandalf'],
|
||||
},
|
||||
{
|
||||
'name': 'user',
|
||||
'description': 'Only read access',
|
||||
'scopes': ['read:all'],
|
||||
'users': ['bilbo'],
|
||||
},
|
||||
]
|
||||
kwargs = {'load_roles': roles_to_load}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
@@ -233,12 +406,8 @@ async def test_load_roles_users(tmpdir, request):
|
||||
await hub.init_users()
|
||||
await hub.init_roles()
|
||||
|
||||
# test if the 'user' role has been overwritten and assigned
|
||||
user_role = orm.Role.find(db, 'user')
|
||||
admin_role = orm.Role.find(db, 'admin')
|
||||
assert user_role is not None
|
||||
assert user_role.scopes == ['read:all']
|
||||
|
||||
user_role = orm.Role.find(db, 'user')
|
||||
# test if every user has a role (and no duplicates)
|
||||
# and admins have admin role
|
||||
for user in db.query(orm.User):
|
||||
@@ -256,40 +425,51 @@ async def test_load_roles_users(tmpdir, request):
|
||||
cyclops_user = orm.User.find(db, name='cyclops')
|
||||
assert teacher_role in cyclops_user.roles
|
||||
|
||||
# delete the test roles
|
||||
for role in roles_to_load:
|
||||
roles.remove_role(db, role['name'])
|
||||
|
||||
|
||||
@mark.role
|
||||
async def test_load_roles_services(tmpdir, request):
|
||||
services = [
|
||||
{'name': 'cull_idle', 'api_token': 'some-token'},
|
||||
{'name': 'idle-culler', 'api_token': 'some-token'},
|
||||
{'name': 'user_service', 'api_token': 'some-other-token'},
|
||||
{'name': 'admin_service', 'api_token': 'secret-token', 'admin': True},
|
||||
{'name': 'admin_service', 'api_token': 'secret-token'},
|
||||
]
|
||||
service_tokens = {
|
||||
'some-token': 'idle-culler',
|
||||
'some-other-token': 'user_service',
|
||||
'secret-token': 'admin_service',
|
||||
}
|
||||
roles_to_load = [
|
||||
{
|
||||
'name': 'culler',
|
||||
'name': 'idle-culler',
|
||||
'description': 'Cull idle servers',
|
||||
'scopes': ['users:servers', 'admin:servers'],
|
||||
'services': ['cull_idle'],
|
||||
'scopes': [
|
||||
'read:users:name',
|
||||
'read:users:activity',
|
||||
'read:users:servers',
|
||||
'users:servers',
|
||||
],
|
||||
'services': ['idle-culler'],
|
||||
},
|
||||
]
|
||||
kwargs = {'load_roles': roles_to_load}
|
||||
kwargs = {
|
||||
'load_roles': roles_to_load,
|
||||
'services': services,
|
||||
'service_tokens': service_tokens,
|
||||
}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
if ssl_enabled:
|
||||
kwargs['internal_certs_location'] = str(tmpdir)
|
||||
hub = MockHub(**kwargs)
|
||||
hub.init_db()
|
||||
db = hub.db
|
||||
# clean db of previous services and add testing ones
|
||||
for service in db.query(orm.Service):
|
||||
db.delete(service)
|
||||
db.commit()
|
||||
for service in services:
|
||||
orm_service = orm.Service.find(db, name=service['name'])
|
||||
if orm_service is None:
|
||||
# not found, create a new one
|
||||
orm_service = orm.Service(name=service['name'])
|
||||
db.add(orm_service)
|
||||
orm_service.admin = service.get('admin', False)
|
||||
await hub.init_api_tokens()
|
||||
# make 'admin_service' admin
|
||||
admin_service = orm.Service.find(db, 'admin_service')
|
||||
admin_service.admin = True
|
||||
db.commit()
|
||||
await hub.init_roles()
|
||||
|
||||
@@ -298,9 +478,9 @@ async def test_load_roles_services(tmpdir, request):
|
||||
user_role = orm.Role.find(db, name='user')
|
||||
|
||||
# test if predefined roles loaded and assigned
|
||||
culler_role = orm.Role.find(db, name='culler')
|
||||
cull_idle = orm.Service.find(db, name='cull_idle')
|
||||
assert culler_role in cull_idle.roles
|
||||
culler_role = orm.Role.find(db, name='idle-culler')
|
||||
culler_service = orm.Service.find(db, name='idle-culler')
|
||||
assert culler_role in culler_service.roles
|
||||
|
||||
# test if every service has a role (and no duplicates)
|
||||
for service in db.query(orm.Service):
|
||||
@@ -320,38 +500,83 @@ async def test_load_roles_services(tmpdir, request):
|
||||
db.delete(service)
|
||||
db.commit()
|
||||
|
||||
# delete the test tokens
|
||||
for token in db.query(orm.APIToken):
|
||||
db.delete(token)
|
||||
db.commit()
|
||||
|
||||
# delete the test roles
|
||||
for role in roles_to_load:
|
||||
roles.remove_role(db, role['name'])
|
||||
|
||||
|
||||
@mark.role
|
||||
async def test_load_roles_tokens(tmpdir, request):
|
||||
services = [
|
||||
{'name': 'cull_idle', 'admin': True, 'api_token': 'another-secret-token'}
|
||||
]
|
||||
user_tokens = {
|
||||
'secret-token': 'cyclops',
|
||||
'super-secret-token': 'admin',
|
||||
}
|
||||
service_tokens = {
|
||||
'another-secret-token': 'cull_idle',
|
||||
async def test_load_roles_groups(tmpdir, request):
|
||||
"""Test loading predefined roles for groups in app.py"""
|
||||
groups_to_load = {
|
||||
'group1': ['gandalf'],
|
||||
'group2': ['bilbo', 'gargamel'],
|
||||
'group3': ['cyclops'],
|
||||
}
|
||||
roles_to_load = [
|
||||
{
|
||||
'name': 'culler',
|
||||
'description': 'Cull idle servers',
|
||||
'scopes': ['users:servers', 'admin:servers'],
|
||||
'tokens': ['another-secret-token'],
|
||||
'name': 'assistant',
|
||||
'description': 'Access users information only',
|
||||
'scopes': ['read:users'],
|
||||
'groups': ['group2'],
|
||||
},
|
||||
{
|
||||
'name': 'admin',
|
||||
'description': 'Admin access',
|
||||
'scopes': ['a lot'],
|
||||
'users': ['admin'],
|
||||
'name': 'head',
|
||||
'description': 'Whole user access',
|
||||
'scopes': ['users', 'admin:users'],
|
||||
'groups': ['group3'],
|
||||
},
|
||||
]
|
||||
kwargs = {'load_groups': groups_to_load, 'load_roles': roles_to_load}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
if ssl_enabled:
|
||||
kwargs['internal_certs_location'] = str(tmpdir)
|
||||
hub = MockHub(**kwargs)
|
||||
hub.init_db()
|
||||
db = hub.db
|
||||
await hub.init_groups()
|
||||
await hub.init_roles()
|
||||
|
||||
assist_role = orm.Role.find(db, name='assistant')
|
||||
head_role = orm.Role.find(db, name='head')
|
||||
|
||||
group1 = orm.Group.find(db, name='group1')
|
||||
group2 = orm.Group.find(db, name='group2')
|
||||
group3 = orm.Group.find(db, name='group3')
|
||||
|
||||
# test group roles
|
||||
assert group1.roles == []
|
||||
assert group2 in assist_role.groups
|
||||
assert group3 in head_role.groups
|
||||
|
||||
# delete the test roles
|
||||
for role in roles_to_load:
|
||||
roles.remove_role(db, role['name'])
|
||||
|
||||
|
||||
@mark.role
|
||||
async def test_load_roles_user_tokens(tmpdir, request):
|
||||
user_tokens = {
|
||||
'secret-token': 'cyclops',
|
||||
'secrety-token': 'gandalf',
|
||||
'super-secret-token': 'admin',
|
||||
}
|
||||
roles_to_load = [
|
||||
{
|
||||
'name': 'reader',
|
||||
'description': 'Read all users models',
|
||||
'scopes': ['read:users'],
|
||||
'tokens': ['super-secret-token'],
|
||||
},
|
||||
]
|
||||
kwargs = {
|
||||
'load_roles': roles_to_load,
|
||||
'services': services,
|
||||
'api_tokens': user_tokens,
|
||||
'service_tokens': service_tokens,
|
||||
}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
if ssl_enabled:
|
||||
@@ -365,37 +590,225 @@ async def test_load_roles_tokens(tmpdir, request):
|
||||
await hub.init_api_tokens()
|
||||
await hub.init_roles()
|
||||
|
||||
# test if another-secret-token has culler role
|
||||
service = orm.Service.find(db, 'cull_idle')
|
||||
culler_role = orm.Role.find(db, 'culler')
|
||||
# test if gandalf's token has the 'reader' role
|
||||
reader_role = orm.Role.find(db, 'reader')
|
||||
token = orm.APIToken.find(db, 'super-secret-token')
|
||||
assert reader_role in token.roles
|
||||
|
||||
# test if all other tokens have default 'user' role
|
||||
token_role = orm.Role.find(db, 'token')
|
||||
secret_token = orm.APIToken.find(db, 'secret-token')
|
||||
assert token_role in secret_token.roles
|
||||
secrety_token = orm.APIToken.find(db, 'secrety-token')
|
||||
assert token_role in secrety_token.roles
|
||||
|
||||
# delete the test tokens
|
||||
for token in db.query(orm.APIToken):
|
||||
db.delete(token)
|
||||
db.commit()
|
||||
|
||||
# delete the test roles
|
||||
for role in roles_to_load:
|
||||
roles.remove_role(db, role['name'])
|
||||
|
||||
|
||||
@mark.role
|
||||
async def test_load_roles_user_tokens_not_allowed(tmpdir, request):
|
||||
user_tokens = {
|
||||
'secret-token': 'bilbo',
|
||||
}
|
||||
roles_to_load = [
|
||||
{
|
||||
'name': 'user-creator',
|
||||
'description': 'Creates/deletes any user',
|
||||
'scopes': ['admin:users'],
|
||||
'tokens': ['secret-token'],
|
||||
},
|
||||
]
|
||||
kwargs = {
|
||||
'load_roles': roles_to_load,
|
||||
'api_tokens': user_tokens,
|
||||
}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
if ssl_enabled:
|
||||
kwargs['internal_certs_location'] = str(tmpdir)
|
||||
hub = MockHub(**kwargs)
|
||||
hub.init_db()
|
||||
db = hub.db
|
||||
hub.authenticator.allowed_users = ['bilbo']
|
||||
await hub.init_users()
|
||||
await hub.init_api_tokens()
|
||||
|
||||
response = 'allowed'
|
||||
# bilbo has only default 'user' role
|
||||
# while bilbo's token is requesting role with higher permissions
|
||||
with pytest.raises(ValueError):
|
||||
await hub.init_roles()
|
||||
|
||||
# delete the test tokens
|
||||
for token in db.query(orm.APIToken):
|
||||
db.delete(token)
|
||||
db.commit()
|
||||
|
||||
# delete the test roles
|
||||
for role in roles_to_load:
|
||||
roles.remove_role(db, role['name'])
|
||||
|
||||
|
||||
@mark.role
|
||||
async def test_load_roles_service_tokens(tmpdir, request):
|
||||
services = [{'name': 'idle-culler', 'api_token': 'another-secret-token'}]
|
||||
service_tokens = {
|
||||
'another-secret-token': 'idle-culler',
|
||||
}
|
||||
roles_to_load = [
|
||||
{
|
||||
'name': 'idle-culler',
|
||||
'description': 'Cull idle servers',
|
||||
'scopes': [
|
||||
'read:users:name',
|
||||
'read:users:activity',
|
||||
'read:users:servers',
|
||||
'users:servers',
|
||||
],
|
||||
'services': ['idle-culler'],
|
||||
'tokens': ['another-secret-token'],
|
||||
},
|
||||
{
|
||||
'name': 'admin',
|
||||
'description': 'Admin access',
|
||||
'scopes': ['a lot'],
|
||||
'users': ['admin'],
|
||||
},
|
||||
]
|
||||
kwargs = {
|
||||
'load_roles': roles_to_load,
|
||||
'services': services,
|
||||
'service_tokens': service_tokens,
|
||||
}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
if ssl_enabled:
|
||||
kwargs['internal_certs_location'] = str(tmpdir)
|
||||
hub = MockHub(**kwargs)
|
||||
hub.init_db()
|
||||
db = hub.db
|
||||
await hub.init_api_tokens()
|
||||
await hub.init_roles()
|
||||
|
||||
# test if another-secret-token has idle-culler role
|
||||
service = orm.Service.find(db, 'idle-culler')
|
||||
culler_role = orm.Role.find(db, 'idle-culler')
|
||||
token = orm.APIToken.find(db, 'another-secret-token')
|
||||
assert len(token.roles) == 1
|
||||
assert culler_role in token.roles
|
||||
|
||||
# test if all other tokens have default 'user' role
|
||||
token_role = orm.Role.find(db, 'token')
|
||||
sec_token = orm.APIToken.find(db, 'secret-token')
|
||||
assert token_role in sec_token.roles
|
||||
s_sec_token = orm.APIToken.find(db, 'super-secret-token')
|
||||
assert token_role in s_sec_token.roles
|
||||
# delete the test services
|
||||
for service in db.query(orm.Service):
|
||||
db.delete(service)
|
||||
db.commit()
|
||||
|
||||
# delete the test tokens
|
||||
for token in db.query(orm.APIToken):
|
||||
db.delete(token)
|
||||
db.commit()
|
||||
|
||||
# delete the test roles
|
||||
for role in roles_to_load:
|
||||
roles.remove_role(db, role['name'])
|
||||
|
||||
|
||||
@mark.role
|
||||
async def test_load_roles_service_tokens_not_allowed(tmpdir, request):
|
||||
services = [{'name': 'some-service', 'api_token': 'secret-token'}]
|
||||
service_tokens = {
|
||||
'secret-token': 'some-service',
|
||||
}
|
||||
roles_to_load = [
|
||||
{
|
||||
'name': 'user-reader',
|
||||
'description': 'Read-only user models',
|
||||
'scopes': ['read:users'],
|
||||
'services': ['some-service'],
|
||||
},
|
||||
# 'idle-culler' role has higher permissions that the token's owner 'some-service'
|
||||
{
|
||||
'name': 'idle-culler',
|
||||
'description': 'Cull idle servers',
|
||||
'scopes': [
|
||||
'read:users:name',
|
||||
'read:users:activity',
|
||||
'read:users:servers',
|
||||
'users:servers',
|
||||
],
|
||||
'tokens': ['secret-token'],
|
||||
},
|
||||
]
|
||||
kwargs = {
|
||||
'load_roles': roles_to_load,
|
||||
'services': services,
|
||||
'service_tokens': service_tokens,
|
||||
}
|
||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||
if ssl_enabled:
|
||||
kwargs['internal_certs_location'] = str(tmpdir)
|
||||
hub = MockHub(**kwargs)
|
||||
hub.init_db()
|
||||
db = hub.db
|
||||
await hub.init_api_tokens()
|
||||
with pytest.raises(ValueError):
|
||||
await hub.init_roles()
|
||||
|
||||
# delete the test services
|
||||
for service in db.query(orm.Service):
|
||||
db.delete(service)
|
||||
db.commit()
|
||||
|
||||
# delete the test tokens
|
||||
for token in db.query(orm.APIToken):
|
||||
db.delete(token)
|
||||
db.commit()
|
||||
|
||||
# delete the test roles
|
||||
for role in roles_to_load:
|
||||
roles.remove_role(db, role['name'])
|
||||
|
||||
|
||||
@mark.role
|
||||
@mark.parametrize(
|
||||
"headers, role_list, status",
|
||||
"headers, rolename, scopes, status",
|
||||
[
|
||||
({}, None, 200),
|
||||
({}, ['reader'], 200),
|
||||
({}, ['non-existing'], 404),
|
||||
({}, ['user_creator'], 403),
|
||||
# no role requested - gets default 'token' role
|
||||
({}, None, None, 200),
|
||||
# role scopes within the user's default 'user' role
|
||||
({}, 'self-reader', ['read:users'], 200),
|
||||
# role scopes outside of the user's role but within the group's role scopes of which the user is a member
|
||||
({}, 'groups-reader', ['read:groups'], 200),
|
||||
# non-existing role request
|
||||
({}, 'non-existing', [], 404),
|
||||
# role scopes outside of both user's role and group's role scopes
|
||||
({}, 'users-creator', ['admin:users'], 403),
|
||||
],
|
||||
)
|
||||
async def test_get_new_token_via_api(app, headers, role_list, status):
|
||||
async def test_get_new_token_via_api(app, headers, rolename, scopes, status):
|
||||
"""Test requesting a token via API with and without roles"""
|
||||
|
||||
user = add_user(app.db, app, name='user')
|
||||
roles.create_role(app.db, {'name': 'reader', 'scopes': ['all']})
|
||||
roles.create_role(app.db, {'name': 'user_creator', 'scopes': ['admin:users']})
|
||||
if role_list:
|
||||
body = json.dumps({'roles': role_list})
|
||||
if rolename and rolename != 'non-existing':
|
||||
roles.create_role(app.db, {'name': rolename, 'scopes': scopes})
|
||||
if rolename == 'groups-reader':
|
||||
# add role for a group
|
||||
roles.create_role(app.db, {'name': 'group-role', 'scopes': ['groups']})
|
||||
# create a group and add the user and group_role
|
||||
group = orm.Group.find(app.db, 'test-group')
|
||||
if not group:
|
||||
group = orm.Group(name='test-group')
|
||||
app.db.add(group)
|
||||
group_role = orm.Role.find(app.db, 'group-role')
|
||||
group.roles.append(group_role)
|
||||
user.groups.append(group)
|
||||
app.db.commit()
|
||||
if rolename:
|
||||
body = json.dumps({'roles': [rolename]})
|
||||
else:
|
||||
body = ''
|
||||
# request a new token
|
||||
@@ -408,11 +821,11 @@ async def test_get_new_token_via_api(app, headers, role_list, status):
|
||||
# check the new-token reply for roles
|
||||
reply = r.json()
|
||||
assert 'token' in reply
|
||||
assert reply['user'] == 'user'
|
||||
if not role_list:
|
||||
assert reply['user'] == user.name
|
||||
if not rolename:
|
||||
assert reply['roles'] == ['token']
|
||||
else:
|
||||
assert reply['roles'] == ['reader']
|
||||
assert reply['roles'] == [rolename]
|
||||
token_id = reply['id']
|
||||
|
||||
# delete the token
|
||||
@@ -421,3 +834,32 @@ async def test_get_new_token_via_api(app, headers, role_list, status):
|
||||
# verify deletion
|
||||
r = await api_request(app, 'users/user/tokens', token_id)
|
||||
assert r.status_code == 404
|
||||
|
||||
|
||||
@mark.role
|
||||
@mark.parametrize(
|
||||
"kind, has_user_scopes",
|
||||
[
|
||||
('users', True),
|
||||
('services', False),
|
||||
],
|
||||
)
|
||||
async def test_self_expansion(app, kind, has_user_scopes):
|
||||
Class = orm.get_class(kind)
|
||||
orm_obj = Class(name=f'test_{kind}')
|
||||
app.db.add(orm_obj)
|
||||
app.db.commit()
|
||||
test_role = orm.Role(name='test_role', scopes=['self'])
|
||||
orm_obj.roles.append(test_role)
|
||||
# test expansion of user/service scopes
|
||||
scopes = roles.expand_roles_to_scopes(orm_obj)
|
||||
assert bool(scopes) == has_user_scopes
|
||||
|
||||
# test expansion of token scopes
|
||||
orm_obj.new_api_token()
|
||||
print(orm_obj.api_tokens[0])
|
||||
token_scopes = scopes.get_scopes_for(orm_obj.api_tokens[0])
|
||||
print(token_scopes)
|
||||
assert bool(token_scopes) == has_user_scopes
|
||||
app.db.delete(orm_obj)
|
||||
app.db.delete(test_role)
|
||||
|
@@ -92,6 +92,7 @@ async def test_external_service(app):
|
||||
await maybe_future(app.init_roles())
|
||||
await app.init_api_tokens()
|
||||
await app.proxy.add_all_services(app._service_map)
|
||||
await app.init_roles()
|
||||
|
||||
service = app._service_map[name]
|
||||
api_token = service.orm.api_tokens[0]
|
||||
|
Reference in New Issue
Block a user