Added filtering to decorator and added tests

This commit is contained in:
0mar
2020-11-09 14:25:02 +01:00
parent fad0679ce4
commit 365921d162
4 changed files with 87 additions and 63 deletions

View File

@@ -300,6 +300,10 @@ def metrics_authentication(self):
def check_scope(req_scope, scopes, **kwargs):
# Parse user name and server name together
if 'user' in kwargs and 'server' in kwargs:
user_name = kwargs.pop('user')
kwargs['server'] = "{}/{}".format(user_name, kwargs['server'])
if len(kwargs) > 1:
raise AttributeError("Please specify exactly one filter")
base_scope = req_scope.split('!')[0]
@@ -317,6 +321,26 @@ def check_scope(req_scope, scopes, **kwargs):
def parse_scopes(scope_list):
"""
Parses scopes and filters in something akin to JSON style
For instance, scope list ["users", "groups:group=foo", "users:servers:server=bar", "users:servers:server=baz"]
would lead to scope model
{
"users":True,
"users:admin":{
"user":[
"alice"
]
},
"users:servers":{
"server":[
"bar",
"baz"
]
}
}
"""
parsed_scopes = {}
for scope in scope_list:
scope_ = scope.split('!')
@@ -325,10 +349,11 @@ def parse_scopes(scope_list):
filter_ = scope_[1]
if base_scope not in parsed_scopes:
parsed_scopes[base_scope] = {}
key, val = filter_.split('=')
if key not in parsed_scopes[base_scope]:
parsed_scopes[base_scope][key] = []
parsed_scopes[base_scope][key].append(val)
if parsed_scopes[base_scope] != True:
key, val = filter_.split('=')
if key not in parsed_scopes[base_scope]:
parsed_scopes[base_scope][key] = []
parsed_scopes[base_scope][key].append(val)
else:
parsed_scopes[base_scope] = True
return parsed_scopes
@@ -340,18 +365,19 @@ def needs_scope(scope):
def scope_decorator(func):
@functools.wraps(func)
def _auth_func(self, *args, **kwargs):
allows_subset = 'subset' in inspect.signature(func).parameters
if scope in self.scopes:
sig = inspect.signature(func)
bound_sig = sig.bind(self, *args, **kwargs)
bound_sig.apply_defaults()
s_kwargs = {}
for resource in {'user', 'server', 'group', 'service'}:
resource_name = resource + '_name'
if resource_name in bound_sig.arguments:
resource_value = bound_sig.arguments[resource_name]
s_kwargs[resource] = resource_value
if check_scope(scope, parse_scopes(self.scopes), **s_kwargs):
return func(self, *args, **kwargs)
elif allows_subset:
# Check if access is not restricted to user/server/service
match_string = re.compile("^" + re.escape(scope) + r"!.+=.+$")
subscopes = filter(lambda s: re.search(match_string, s), self.scopes)
subset = [subscope.split('=')[1] for subscope in subscopes]
if subset:
kwargs['subset'] = subset
return func(self, *args, **kwargs)
raise web.HTTPError(403, "Action is not authorized with current scopes")
else:
raise web.HTTPError(403, "Action is not authorized with current scopes")
return _auth_func