resolve self in _get_subscopes

avoids inconsistent behavior in different uses of _get_subscopes where 'self' is left unmodified,
leading to errors
This commit is contained in:
Min RK
2021-04-20 14:58:34 +02:00
parent be76b5ebba
commit d8ded9aed8

View File

@@ -163,22 +163,24 @@ def _expand_scope(scopename):
def expand_roles_to_scopes(orm_object): def expand_roles_to_scopes(orm_object):
"""Get the scopes listed in the roles of the User/Service/Group/Token """Get the scopes listed in the roles of the User/Service/Group/Token
If User, take into account the user's groups roles as well""" If User, take into account the user's groups roles as well"""
from .user import User
if isinstance(orm_object, User):
orm_object = User.orm_user
pass_roles = orm_object.roles pass_roles = orm_object.roles
if isinstance(orm_object, orm.User): if isinstance(orm_object, orm.User):
groups_roles = [] groups_roles = []
for group in orm_object.groups: for group in orm_object.groups:
groups_roles.extend(group.roles) groups_roles.extend(group.roles)
pass_roles.extend(groups_roles) pass_roles.extend(groups_roles)
scopes = _get_subscopes(*pass_roles)
if 'self' in scopes: scopes = _get_subscopes(*pass_roles, owner=orm_object)
scopes.remove('self')
if isinstance(orm_object, orm.User) or hasattr(orm_object, 'orm_user'):
scopes |= expand_self_scope(orm_object.name)
return scopes return scopes
def _get_subscopes(*args): def _get_subscopes(*args, owner=None):
"""Returns a set of all available subscopes for a specified role or list of roles""" """Returns a set of all available subscopes for a specified role or list of roles"""
scope_list = [] scope_list = []
@@ -188,6 +190,11 @@ def _get_subscopes(*args):
scopes = set(chain.from_iterable(list(map(_expand_scope, scope_list)))) scopes = set(chain.from_iterable(list(map(_expand_scope, scope_list))))
if 'self' in scopes:
scopes.remove('self')
if owner and isinstance(owner, orm.User):
scopes |= expand_self_scope(owner.name)
return scopes return scopes
@@ -380,32 +387,36 @@ def _token_allowed_role(db, token, role):
"""Returns True if token allowed to have requested role through """Returns True if token allowed to have requested role through
comparing the requested scopes with the set of token's owner scopes""" comparing the requested scopes with the set of token's owner scopes"""
standard_permissions = {'all', 'read:all'} owner = token.user
if owner is None:
owner = token.service
token_scopes = _get_subscopes(role) if owner is None:
extra_scopes = token_scopes - standard_permissions raise ValueError(f"Owner not found for {token}")
token_scopes = _get_subscopes(role, owner=owner)
implicit_permissions = {'all', 'read:all'}
explicit_scopes = token_scopes - implicit_permissions
# ignore horizontal filters # ignore horizontal filters
raw_extra_scopes = { raw_scopes = {
scope.split('!', 1)[0] if '!' in scope else scope for scope in extra_scopes scope.split('!', 1)[0] if '!' in scope else scope for scope in explicit_scopes
} }
# find the owner and their roles # find the owner's scopes
owner = None owner_scopes = expand_roles_to_scopes(owner)
if token.user_id: # ignore horizontal filters
owner = db.query(orm.User).get(token.user_id) raw_owner_scopes = {
elif token.service_id: scope.split('!', 1)[0] if '!' in scope else scope for scope in owner_scopes
owner = db.query(orm.Service).get(token.service_id) }
if owner: disallowed_scopes = raw_scopes.difference(raw_owner_scopes)
owner_scopes = expand_roles_to_scopes(owner) if not disallowed_scopes:
# ignore horizontal filters # no scopes requested outside owner's own scopes
raw_owner_scopes = { return True
scope.split('!', 1)[0] if '!' in scope else scope for scope in owner_scopes
}
if raw_extra_scopes.issubset(raw_owner_scopes):
return True
else:
return False
else: else:
raise ValueError('Owner the token %r not found', token) app_log.warning(
f"Token requesting scopes exceeding owner {owner.name}: {disallowed_scopes}"
)
return False
def assign_default_roles(db, entity): def assign_default_roles(db, entity):
@@ -440,12 +451,10 @@ def update_roles(db, entity, roles):
) )
if _token_allowed_role(db, entity, role): if _token_allowed_role(db, entity, role):
role.tokens.append(entity) role.tokens.append(entity)
app_log.info('Adding role %s for token: %s', role.name, entity) app_log.info('Adding role %s to token: %s', role.name, entity)
else: else:
raise ValueError( raise ValueError(
'Requested token role %r of %r has more permissions than the token owner', f'Requested token role {rolename} of {entity} has more permissions than the token owner'
rolename,
entity,
) )
else: else:
raise NameError('Role %r does not exist' % rolename) raise NameError('Role %r does not exist' % rolename)