WIP Implemented scopes

This commit is contained in:
Omar Richardson
2020-11-18 17:12:26 +01:00
parent 2e9ecfff02
commit 99c3f77c58
2 changed files with 68 additions and 26 deletions

View File

@@ -6,6 +6,7 @@ from tornado import web
from ..utils import check_scope
from ..utils import needs_scope
from ..utils import parse_scopes
from ..utils import Scope
def test_scope_constructor():
@@ -26,43 +27,54 @@ def test_scope_constructor():
def test_scope_precendence():
scope_list = ['read:users!user=maeby', 'read:users']
parsed_scopes = parse_scopes(scope_list)
assert parsed_scopes['read:users'] == True
assert parsed_scopes['read:users'] == Scope.ALL
def test_scope_check_present():
handler = None
scope_list = ['read:users']
parsed_scopes = parse_scopes(scope_list)
assert check_scope('read:users', parsed_scopes)
assert check_scope('read:users!user=maeby', parsed_scopes)
assert check_scope(handler, 'read:users', parsed_scopes)
assert check_scope(handler, 'read:users', parsed_scopes, user='maeby')
def test_scope_check_not_present(): # What should this return when the broad scope is asked and a small one satisfied?
def test_scope_check_not_present():
handler = None
scope_list = ['read:users!user=maeby']
parsed_scopes = parse_scopes(scope_list)
assert not check_scope('read:users', parsed_scopes)
assert not check_scope('read:users', parsed_scopes, user='gob')
assert not check_scope('read:users', parsed_scopes, server='gob/server')
assert not check_scope(handler, 'read:users', parsed_scopes)
assert not check_scope(handler, 'read:users', parsed_scopes, user='gob')
assert not check_scope(
handler, 'read:users', parsed_scopes, user='gob', server='server'
)
def test_scope_filters():
handler = None
scope_list = ['read:users', 'read:users!group=bluths', 'read:users!user=maeby']
parsed_scopes = parse_scopes(scope_list)
assert check_scope('read:users!group=bluths', parsed_scopes)
assert check_scope('read:users!user=maeby', parsed_scopes)
assert check_scope(handler, 'read:users', parsed_scopes, group='bluth')
assert check_scope(handler, 'read:users', parsed_scopes, user='maeby')
def test_scope_one_filter_only():
handler = None
with pytest.raises(AttributeError):
check_scope('all', parse_scopes(['all']), user='george_michael', group='bluths')
check_scope(
handler, 'all', parse_scopes(['all']), user='george_michael', group='bluths'
)
def test_scope_parse_server_name():
handler = None
scope_list = ['users:servers!server=maeby/server1', 'read:users!user=maeby']
parsed_scopes = parse_scopes(scope_list)
assert check_scope('users:servers', parsed_scopes, user='maeby', server='server1')
assert check_scope(
handler, 'users:servers', parsed_scopes, user='maeby', server='server1'
)
class Test:
class MockAPI:
def __init__(self):
self.scopes = ['users']
@@ -140,7 +152,7 @@ class Test:
],
)
def test_scope_method_access(scopes, method, arguments, is_allowed):
obj = Test()
obj = MockAPI()
obj.scopes = scopes
api_call = getattr(obj, method)
if is_allowed:

View File

@@ -9,7 +9,6 @@ import hashlib
import inspect
import os
import random
import re
import socket
import ssl
import sys
@@ -35,7 +34,7 @@ from tornado.httpclient import HTTPError
from tornado.log import app_log
from tornado.platform.asyncio import to_asyncio_future
from .. import orm
from . import orm
def random_port():
@@ -306,15 +305,40 @@ class Scope(Enum):
ALL = True
def needs_scope_expansion(filter_, filter_value, sub_scope):
"""
Check if there is a requirements to expand the `group` scope to individual `user` scopes.
Assumptions:
req_scopes in scopes
filter_ != Scope.ALL
This can be made arbitrarily intelligent but that would make it more complex
"""
if not (filter_ == 'user' and 'group' in sub_scope):
return False
if 'user' in sub_scope:
return filter_value not in sub_scope['user']
else:
return True
def expand_groups_to_users(db, filter_scope):
"""Update the group filters to account for the individual users"""
if 'group' in filter_scope:
groups = db.query(orm.Group)
user_set = orm.User.query.filter(orm.User.group.in_(groups))
return user_set.get_names()
return [user.name for user in user_set]
def check_scope(req_scope, scopes, **kwargs):
def check_user_in_expanded_scope(handler, user_name, scope_group_names):
user = handler.find_user(user_name)
if user is None:
raise web.HTTPError(404, 'No such user found')
group_names = {group.name for group in user.groups}
return bool(scope_group_names & group_names)
def check_scope(api_handler, req_scope, scopes, **kwargs):
# Parse user name and server name together
if 'user' in kwargs and 'server' in kwargs:
user_name = kwargs.pop('user')
@@ -329,19 +353,24 @@ def check_scope(req_scope, scopes, **kwargs):
if not kwargs:
return False
filter_, filter_value = list(kwargs.items())[0]
if filter_ not in scopes[req_scope]:
return False
return filter_value in scopes[req_scope][filter_]
sub_scope = scopes[req_scope]
if filter_ not in sub_scope:
if needs_scope_expansion(filter_, filter_value, sub_scope):
group_names = sub_scope['groups']
return check_user_in_expanded_scope(api_handler, filter_value, group_names)
else:
return False
return filter_value in sub_scope[filter_]
def parse_scopes(scope_list):
"""
Parses scopes and filters in something akin to JSON style
For instance, scope list ["users", "groups:group=foo", "users:servers:server=bar", "users:servers:server=baz"]
For instance, scope list ["users", "groups!group=foo", "users:servers!server=bar", "users:servers!server=baz"]
would lead to scope model
{
"users":True,
"users":scope.ALL,
"users:admin":{
"user":[
"alice"
@@ -358,15 +387,15 @@ def parse_scopes(scope_list):
parsed_scopes = {}
for scope in scope_list:
base_scope, _, filter_ = scope.partition('!')
if base_scope not in parsed_scopes:
if not filter_:
parsed_scopes[base_scope] = Scope.ALL
elif base_scope not in parsed_scopes:
parsed_scopes[base_scope] = {}
if parsed_scopes[base_scope] != Scope.ALL:
key, _, val = filter_.partition('=')
if key not in parsed_scopes[base_scope]:
parsed_scopes[base_scope][key] = []
parsed_scopes[base_scope][key].append(val)
else:
parsed_scopes[base_scope] = Scope.ALL
return parsed_scopes
@@ -385,7 +414,8 @@ def needs_scope(scope):
if resource_name in bound_sig.arguments:
resource_value = bound_sig.arguments[resource_name]
s_kwargs[resource] = resource_value
if check_scope(scope, parse_scopes(self.scopes), **s_kwargs):
parsed_scopes = parse_scopes(self.scopes)
if check_scope(self, scope, parsed_scopes, **s_kwargs):
return func(self, *args, **kwargs)
else:
raise web.HTTPError(403, "Action is not authorized with current scopes")