Refactored scope module. Implemented filter in *ListApiHandlers

This commit is contained in:
Omar Richardson
2021-01-05 11:42:53 +01:00
parent 82bebfaff2
commit 662017f260
10 changed files with 260 additions and 252 deletions

View File

@@ -4,7 +4,6 @@
import asyncio
import concurrent.futures
import errno
import functools
import hashlib
import inspect
import os
@@ -18,7 +17,6 @@ import warnings
from binascii import b2a_hex
from datetime import datetime
from datetime import timezone
from enum import Enum
from hmac import compare_digest
from operator import itemgetter
@@ -30,8 +28,6 @@ from tornado.httpclient import HTTPError
from tornado.log import app_log
from tornado.platform.asyncio import to_asyncio_future
from . import orm # todo: only necessary for scopes, move later
def random_port():
"""Get a single random port."""
@@ -283,8 +279,13 @@ def authenticated_403(self):
@auth_decorator
def admin_only(self):
"""Decorator for restricting access to admin users"""
"""Decorator for restricting access to admin users
Deprecated in favor of scopes.need_scope()
"""
user = self.current_user
app_log.warning(
"Admin decorator is deprecated and will be removed soon. Use scope-based decorator instead"
)
if user is None or not user.admin:
raise web.HTTPError(403)
@@ -297,214 +298,6 @@ def metrics_authentication(self):
raise web.HTTPError(403)
# Todo: Move all scope-related methods to scope module
class Scope(Enum):
ALL = True
def get_user_scopes(name):
"""
Scopes have a metascope 'all' that should be expanded to everything a user can do.
At the moment that is a user-filtered version (optional read) access to
users
users:name
users:groups
users:activity
users:servers
users:tokens
"""
scope_list = [
'users',
'users:name',
'users:groups',
'users:activity',
'users:servers',
'users:tokens',
]
scope_list.extend(['read:' + scope for scope in scope_list])
return {"{}!user={}".format(scope, name) for scope in scope_list}
def needs_scope_expansion(filter_, filter_value, sub_scope):
"""
Check if there is a requirements to expand the `group` scope to individual `user` scopes.
Assumptions:
filter_ != Scope.ALL
"""
if not (filter_ == 'user' and 'group' in sub_scope):
return False
if 'user' in sub_scope:
return filter_value not in sub_scope['user']
else:
return True
def check_user_in_expanded_scope(handler, user_name, scope_group_names):
user = handler.find_user(user_name)
if user is None:
raise web.HTTPError(404, 'No such user found')
group_names = {group.name for group in user.groups}
return bool(set(scope_group_names) & group_names)
def _flatten_groups(groups):
user_set = set()
for group in groups:
user_set |= {
user.name for user in group.users
} # todo: I think this could be one query, no for loop
return user_set
def get_orm_class(kind):
class_dict = {
'users': orm.User,
'services': orm.Service,
'tokens': orm.APIToken,
'groups': orm.Group,
}
if kind not in class_dict:
raise ValueError(
"Kind must be one of %s, not %s" % (", ".join(class_dict), kind)
)
return class_dict[kind]
def _get_scope_filter(db, req_scope, sub_scope):
# Rough draft
scope_translator = {
'read:users': 'users',
'read:services': 'services',
'read:groups': 'groups',
}
if req_scope not in scope_translator:
raise AttributeError("Scope not found; scope filter not constructed")
kind = scope_translator[req_scope]
Class = get_orm_class(kind)
sub_scope_values = next(iter(sub_scope.values()))
query = db.query(Class).filter(Class.name.in_(sub_scope_values))
scope_filter = [entry.name for entry in query.all()]
if 'group' in sub_scope and kind == 'users':
groups = db.query(orm.Group).filter(orm.Group.name.in_(sub_scope['group']))
scope_filter += _flatten_groups(groups)
return set(scope_filter)
def check_scope(api_handler, req_scope, scopes, **kwargs):
# Parse user name and server name together
if 'user' in kwargs and 'server' in kwargs:
kwargs['server'] = "{}/{}".format(kwargs['user'], kwargs['server'])
if req_scope not in scopes:
return False
if scopes[req_scope] == Scope.ALL:
return True
# Apply filters
sub_scope = scopes[req_scope]
if 'scope_filter' in kwargs:
scope_filter = _get_scope_filter(api_handler.db, req_scope, sub_scope)
return scope_filter
else:
# Interface change: Now can have multiple filters
for (filter_, filter_value) in kwargs.items():
if filter_ in sub_scope and filter_value in sub_scope[filter_]:
return True
if needs_scope_expansion(filter_, filter_value, sub_scope):
group_names = sub_scope['group']
if check_user_in_expanded_scope(api_handler, filter_value, group_names):
return True
return False
# Todo: make some methods private
def parse_scopes(scope_list):
"""
Parses scopes and filters in something akin to JSON style
For instance, scope list ["users", "groups!group=foo", "users:servers!server=bar", "users:servers!server=baz"]
would lead to scope model
{
"users":scope.ALL,
"users:admin":{
"user":[
"alice"
]
},
"users:servers":{
"server":[
"bar",
"baz"
]
}
}
"""
parsed_scopes = {}
for scope in scope_list:
base_scope, _, filter_ = scope.partition('!')
if not filter_:
parsed_scopes[base_scope] = Scope.ALL
elif base_scope not in parsed_scopes:
parsed_scopes[base_scope] = {}
if parsed_scopes[base_scope] != Scope.ALL:
key, _, val = filter_.partition('=')
if key not in parsed_scopes[base_scope]:
parsed_scopes[base_scope][key] = []
parsed_scopes[base_scope][key].append(val)
return parsed_scopes
def needs_scope(scope):
"""Decorator to restrict access to users or services with the required scope"""
def scope_decorator(func):
@functools.wraps(func)
def _auth_func(self, *args, **kwargs):
sig = inspect.signature(func)
bound_sig = sig.bind(self, *args, **kwargs)
bound_sig.apply_defaults()
s_kwargs = {}
for resource in {'user', 'server', 'group', 'service'}:
resource_name = resource + '_name'
if resource_name in bound_sig.arguments:
resource_value = bound_sig.arguments[resource_name]
s_kwargs[resource] = resource_value
if 'scope_filter' in bound_sig.arguments:
s_kwargs['scope_filter'] = None
if 'all' in self.scopes and self.current_user:
# todo: What if no user is found? See test_api/test_referer_check
self.scopes |= get_user_scopes(self.current_user.name)
parsed_scopes = parse_scopes(self.scopes)
scope_filter = check_scope(self, scope, parsed_scopes, **s_kwargs)
if scope_filter:
if isinstance(scope_filter, set):
kwargs['scope_filter'] = scope_filter
return func(self, *args, **kwargs)
else:
# catching attr error occurring for older_requirements test
# could be done more ellegantly?
try:
request_path = self.request.path
except AttributeError:
request_path = 'the requested API'
app_log.warning(
"Not authorizing access to {}. Requires scope {}, not derived from scopes {}".format(
request_path, scope, ", ".join(self.scopes)
)
)
raise web.HTTPError(
403,
"Action is not authorized with current scopes; requires {}".format(
scope
),
)
return _auth_func
return scope_decorator
# Token utilities