Changed scopes from list to set and made filters additive

This commit is contained in:
0mar
2020-12-14 17:39:06 +01:00
parent 62c56ec2c8
commit 3eccf7abdd
5 changed files with 63 additions and 46 deletions

View File

@@ -80,7 +80,7 @@ class BaseHandler(RequestHandler):
The current user (None if not logged in) may be accessed
via the `self.current_user` property during the handling of any request.
"""
self.scopes = []
self.scopes = set()
try:
await self.get_current_user()
except Exception:

View File

@@ -7,7 +7,6 @@ from . import orm
def get_default_roles():
"""Returns a list of default role dictionaries"""
default_roles = [
@@ -43,7 +42,6 @@ def get_default_roles():
def get_scopes():
"""
Returns a dictionary of scopes:
scopes.keys() = scopes of highest level and scopes that have their own subscopes
@@ -74,7 +72,6 @@ def get_scopes():
def expand_scope(scopename):
"""Returns a set of all subscopes"""
scopes = get_scopes()
@@ -103,7 +100,6 @@ def expand_scope(scopename):
def get_subscopes(*args):
"""Returns a set of all available subscopes for a specified role or list of roles"""
scope_list = []
@@ -117,7 +113,6 @@ def get_subscopes(*args):
def add_role(db, role_dict):
"""Adds a new role to database or modifies an existing one"""
if 'name' not in role_dict.keys():
@@ -154,7 +149,6 @@ def get_orm_class(kind):
def existing_only(func):
"""Decorator for checking if objects and roles exist"""
def check_existence(db, objname, kind, rolename):
@@ -175,7 +169,6 @@ def existing_only(func):
@existing_only
def add_obj(db, objname, kind, rolename):
"""Adds a role for users, services or tokens"""
if rolename not in objname.roles:
@@ -185,7 +178,6 @@ def add_obj(db, objname, kind, rolename):
@existing_only
def remove_obj(db, objname, kind, rolename):
"""Removes a role for users, services or tokens"""
if rolename in objname.roles:
@@ -194,7 +186,6 @@ def remove_obj(db, objname, kind, rolename):
def switch_default_role(db, obj, kind, admin):
"""Switch between default user and admin roles for users/services"""
user_role = orm.Role.find(db, 'user')
@@ -215,7 +206,6 @@ def switch_default_role(db, obj, kind, admin):
def update_roles(db, obj, kind, roles=None):
"""Updates object's roles if specified,
assigns default if no roles specified"""

View File

@@ -1628,14 +1628,12 @@ async def test_get_service(app, mockservice_url):
'info': {},
'display': True,
}
with mock_role(app, 'service'):
r = await api_request(
app,
'services/%s' % mockservice.name,
headers={'Authorization': 'token %s' % mockservice.api_token},
)
r.raise_for_status()
with mock_role(app, 'user'):
r = await api_request(
app, 'services/%s' % mockservice.name, headers=auth_header(db, 'user')
)

View File

@@ -66,11 +66,14 @@ def test_scope_filters():
assert check_scope(handler, 'read:users', parsed_scopes, user='maeby')
def test_scope_one_filter_only():
def test_scope_multiple_filters():
handler = None
with pytest.raises(AttributeError):
check_scope(
handler, 'all', parse_scopes(['all']), user='george_michael', group='bluths'
assert check_scope(
handler,
'read:users',
parse_scopes(['read:users!user=george_michael']),
user='george_michael',
group='bluths',
)
@@ -85,7 +88,7 @@ def test_scope_parse_server_name():
class MockAPIHandler:
def __init__(self):
self.scopes = ['users']
self.scopes = {'users'}
@needs_scope('users')
def user_thing(self, user_name):
@@ -162,8 +165,9 @@ class MockAPIHandler:
)
def test_scope_method_access(scopes, method, arguments, is_allowed):
obj = MockAPIHandler()
obj.current_user = mock.Mock(name=arguments[0])
obj.request = mock.Mock(spec=HTTPServerRequest)
obj.scopes = scopes
obj.scopes = set(scopes)
api_call = getattr(obj, method)
if is_allowed:
assert api_call(*arguments)

View File

@@ -295,10 +295,37 @@ def metrics_authentication(self):
raise web.HTTPError(403)
# Todo: Move all scope-related methods to scope module
class Scope(Enum):
ALL = True
def get_user_scopes(name):
"""
Scopes have a metascope 'all' that should be expanded to everything a user can do.
At the moment that is a user-filtered version (optional read) access to
users
users:name
users:groups
users:activity
users:servers
users:tokens
"""
scope_list = [
'users',
'users:name',
'users:groups',
'users:activity',
'users:servers',
'users:tokens',
]
scope_list.extend(
['read:' + scope for scope in scope_list]
) # Todo: Put this in closure
return {"{}!user={}".format(scope, name) for scope in scope_list}
def needs_scope_expansion(filter_, filter_value, sub_scope):
"""
Check if there is a requirements to expand the `group` scope to individual `user` scopes.
@@ -324,29 +351,24 @@ def check_user_in_expanded_scope(handler, user_name, scope_group_names):
def check_scope(api_handler, req_scope, scopes, **kwargs):
# Parse user name and server name together
if 'user' in kwargs and 'server' in kwargs:
user_name = kwargs.pop('user')
kwargs['server'] = "{}/{}".format(user_name, kwargs['server'])
if len(kwargs) > 1:
raise AttributeError("Please specify exactly one filter")
kwargs['server'] = "{}/{}".format(kwargs['user'], kwargs['server'])
if req_scope not in scopes:
return False
if scopes[req_scope] == Scope.ALL:
return True
# Apply filters
if not kwargs:
return False
filter_, filter_value = list(kwargs.items())[0]
sub_scope = scopes[req_scope]
if filter_ not in sub_scope:
valid_scope = False
else:
valid_scope = filter_value in sub_scope[filter_]
if not valid_scope and needs_scope_expansion(filter_, filter_value, sub_scope):
for (
filter_,
filter_value,
) in kwargs.items(): # Interface change: Now can have multiple filters
if filter_ in sub_scope and filter_value in sub_scope[filter_]:
return True
if needs_scope_expansion(filter_, filter_value, sub_scope):
group_names = sub_scope['group']
valid_scope |= check_user_in_expanded_scope(
api_handler, filter_value, group_names
)
return valid_scope
if check_user_in_expanded_scope(api_handler, filter_value, group_names):
return True
return False
def parse_scopes(scope_list):
@@ -400,6 +422,9 @@ def needs_scope(scope):
if resource_name in bound_sig.arguments:
resource_value = bound_sig.arguments[resource_name]
s_kwargs[resource] = resource_value
if 'all' in self.scopes and self.current_user:
# todo: What if no user is found? See test_api/test_referer_check
self.scopes |= get_user_scopes(self.current_user.name)
parsed_scopes = parse_scopes(self.scopes)
if check_scope(self, scope, parsed_scopes, **s_kwargs):
return func(self, *args, **kwargs)