Added scope utilities and tests for them

This commit is contained in:
0mar
2020-11-05 15:40:00 +01:00
parent 422fbf8dcc
commit 154edebbf4
3 changed files with 120 additions and 4 deletions

View File

@@ -78,12 +78,12 @@ class BaseHandler(RequestHandler):
The current user (None if not logged in) may be accessed
via the `self.current_user` property during the handling of any request.
"""
self.scopes = []
try:
await self.get_current_user()
except Exception:
self.log.exception("Failed to get current user")
self._jupyterhub_user = None
self.scopes = []
return await maybe_future(super().prepare())
@@ -429,8 +429,6 @@ class BaseHandler(RequestHandler):
self.log.exception("Error getting current user")
if self._jupyterhub_user is not None or self.get_current_user_oauth_token():
self.scopes = self.settings.get("mock_scopes", [])
else:
self.scopes = []
return self._jupyterhub_user
@property

View File

@@ -0,0 +1,83 @@
"""
test scopes as used in rbac
"""
from unittest import mock
import pytest
from tornado import web
from ..utils import check_scope
from ..utils import needs_scope
from ..utils import parse_scopes
from .utils import get_scopes
def test_scope_constructor():
user1 = 'george'
user2 = 'michael'
scope_list = [
'users',
'read:users!user={}'.format(user1),
'read:users!user={}'.format(user2),
]
parsed_scopes = parse_scopes(scope_list)
assert 'read:users' in parsed_scopes
assert parsed_scopes['users']
assert set(parsed_scopes['read:users']['user']) == {user1, user2}
def test_scope_precendence():
scope_list = ['read:users!user=maeby', 'read:users']
parsed_scopes = parse_scopes(scope_list)
assert parsed_scopes['read:users'] == True
def test_scope_check_present():
scope_list = ['read:users']
parsed_scopes = parse_scopes(scope_list)
assert check_scope('read:users', parsed_scopes)
assert check_scope('read:users!user=maeby', parsed_scopes)
def test_scope_check_not_present(): # What should this return when the broad scope is asked and a small one satisfied?
scope_list = ['read:users!user=maeby']
parsed_scopes = parse_scopes(scope_list)
assert not check_scope('read:users', parsed_scopes)
assert not check_scope('read:users', parsed_scopes, user='gob')
assert not check_scope('read:users', parsed_scopes, server='gob/server')
def test_scope_filters():
scope_list = ['read:users', 'read:users!group=bluths', 'read:users!user=maeby']
parsed_scopes = parse_scopes(scope_list)
assert check_scope('read:users!group=bluths', parsed_scopes)
assert check_scope('read:users!user=maeby', parsed_scopes)
def test_scope_one_filter_only():
with pytest.raises(AttributeError):
check_scope('all', parse_scopes(['all']), user='george_michael', group='bluths')
class Test:
def __init__(self):
self.scopes = ['read:users']
@needs_scope('read:users')
def foo(self, user):
return True
def test_scope_def():
obj = Test()
obj.scopes = ['read:users']
assert obj.foo('user')
assert obj.foo('user2')
def test_wrong_scope():
obj = Test()
obj.scopes = []
with pytest.raises(web.HTTPError):
obj.foo('user1')

View File

@@ -299,13 +299,48 @@ def metrics_authentication(self):
raise web.HTTPError(403)
def check_scope(req_scope, scopes, **kwargs):
if len(kwargs) > 1:
raise AttributeError("Please specify exactly one filter")
base_scope = req_scope.split('!')[0]
if base_scope not in scopes:
return False
if scopes[base_scope] == True: # is this pretty?
return True
# Apply filters
if not kwargs:
return False
filter_ = list(kwargs)[0]
if filter_ not in scopes[base_scope]:
return False
return kwargs[filter_] in scopes[req_scope][filter_]
def parse_scopes(scope_list):
parsed_scopes = {}
for scope in scope_list:
scope_ = scope.split('!')
base_scope = scope_[0]
if len(scope_) > 1:
filter_ = scope_[1]
if base_scope not in parsed_scopes:
parsed_scopes[base_scope] = {}
key, val = filter_.split('=')
if key not in parsed_scopes[base_scope]:
parsed_scopes[base_scope][key] = []
parsed_scopes[base_scope][key].append(val)
else:
parsed_scopes[base_scope] = True
return parsed_scopes
def needs_scope(scope):
"""Decorator to restrict access to users or services with the required scope"""
def scope_decorator(func):
@functools.wraps(func)
def _auth_func(self, *args, **kwargs):
allows_subset = 'subset' in func.__code__.co_varnames
allows_subset = 'subset' in inspect.signature(func).parameters
if scope in self.scopes:
return func(self, *args, **kwargs)
elif allows_subset: