update to roles utils

This commit is contained in:
IvanaH8
2020-10-21 16:36:50 +02:00
parent ced80f9e6b
commit 4142dc1bc0
6 changed files with 112 additions and 140 deletions

View File

@@ -88,7 +88,7 @@ class UserListAPIHandler(APIHandler):
user = self.user_from_username(name)
if admin:
user.admin = True
roles.DefaultRoles.add_default_role(self.db, user)
roles.update_roles(self.db, user)
self.db.commit()
try:
await maybe_future(self.authenticator.add_user(user))
@@ -151,7 +151,7 @@ class UserAPIHandler(APIHandler):
self._check_user_model(data)
if 'admin' in data:
user.admin = data['admin']
roles.DefaultRoles.add_default_role(self.db, user)
roles.update_roles(self.db, user)
self.db.commit()
try:
@@ -208,9 +208,9 @@ class UserAPIHandler(APIHandler):
if key == 'auth_state':
await user.save_auth_state(value)
else:
if key == 'admin' and value != user.admin:
roles.DefaultRoles.change_admin(self.db, user=user, admin=value)
setattr(user, key, value)
if key == 'admin':
roles.update_roles(self.db, user=user)
self.db.commit()
user_ = self.user_model(user)
user_['auth_state'] = await user.get_auth_state()

View File

@@ -1715,7 +1715,6 @@ class JupyterHub(Application):
for name in admin_users:
# ensure anyone specified as admin in config is admin in db
# and gets admin role
user = orm.User.find(db, name)
if user is None:
user = orm.User(name=name, admin=True)
@@ -1825,11 +1824,15 @@ class JupyterHub(Application):
"""Load default and predefined roles into the database"""
db = self.db
# load default roles
roles.DefaultRoles.load_to_database(db)
default_roles = roles.get_default_roles()
for role in default_roles:
roles.add_role(db, role)
# load predefined roles from config file
for predef_role in self.load_roles:
role = roles.add_predef_role(db, predef_role)
roles.add_role(db, predef_role)
role = orm.Role.find(db, predef_role['name'])
# handle users
for username in predef_role['users']:
username = self.authenticator.normalize_username(username)
@@ -1847,9 +1850,11 @@ class JupyterHub(Application):
db.add(user)
roles.add_user(db, user=user, role=role)
# make sure every existing user has a default user or admin role
# make sure all users have at least one role (update with default)
for user in db.query(orm.User):
roles.DefaultRoles.add_default_role(db, user)
if len(user.roles) < 1:
roles.update_roles(db, user)
db.commit()
async def _add_tokens(self, token_dict, kind):

View File

@@ -4,14 +4,20 @@
from .orm import Role
# define default roles
class DefaultRoles:
def get_default_roles():
user = Role(name='user', description='Everything the user can do', scopes=['all'])
admin = Role(
name='admin',
description='Admin privileges (currently can do everything)',
scopes=[
"""Returns a list of default roles dictionaries"""
default_roles = [
{
'name': 'user',
'description': 'Everything the user can do',
'scopes': ['all'],
},
{
'name': 'admin',
'description': 'Admin privileges (currently can do everything)',
'scopes': [
'all',
'users',
'users:tokens',
@@ -23,63 +29,32 @@ class DefaultRoles:
'proxy',
'shutdown',
],
},
{
'name': 'server',
'description': 'Post activity only',
'scopes': ['users:activity!user=username'],
},
]
return default_roles
def add_role(db, role_dict):
"""Adds a new role to database or modifies an existing one"""
role = Role.find(db, role_dict['name'])
if role is None:
role = Role(
name=role_dict['name'],
description=role_dict['description'],
scopes=role_dict['scopes'],
)
server = Role(
name='server',
description='Post activity only',
scopes=['users:activity!user=username'],
)
roles = (user, admin, server)
def __init__(cls, roles=roles):
cls.roles = roles
@classmethod
def get_user_role(cls, db):
return Role.find(db, name=cls.user.name)
@classmethod
def get_admin_role(cls, db):
return Role.find(db, name=cls.admin.name)
@classmethod
def get_server_role(cls, db):
return Role.find(db, name=cls.server.name)
@classmethod
def load_to_database(cls, db):
for role in cls.roles:
db_role = Role.find(db, name=role.name)
if db_role is None:
new_role = Role(
name=role.name, description=role.description, scopes=role.scopes,
)
db.add(new_role)
db.commit()
@classmethod
def add_default_role(cls, db, user):
role = None
if user.admin and cls.admin not in user.roles:
role = cls.get_admin_role(db)
if not user.admin and cls.user not in user.roles:
role = cls.get_user_role(db)
if role is not None:
add_user(db, user, role)
db.commit()
@classmethod
def change_admin(cls, db, user, admin):
user_role = cls.get_user_role(db)
admin_role = cls.get_admin_role(db)
if admin:
if user_role in user.roles:
remove_user(db, user, user_role)
add_user(db, user, admin_role)
db.add(role)
else:
if admin_role in user.roles:
remove_user(db, user, admin_role)
add_user(db, user, user_role)
role.description = role_dict['description']
role.scopes = role_dict['scopes']
db.commit()
@@ -95,33 +70,21 @@ def remove_user(db, user, role):
db.commit()
def add_predef_role(db, predef_role):
"""
Returns either the role to write into db or updated role if already in db
"""
role = Role.find(db, predef_role['name'])
# if a new role, add to db, if existing, rewrite its attributes apart from the name
if role is None:
role = Role(
name=predef_role['name'],
description=predef_role['description'],
scopes=predef_role['scopes'],
)
db.add(role)
def update_roles(db, user):
"""Updates roles if user has no role with default or when user admin status is changed"""
user_role = Role.find(db, 'user')
admin_role = Role.find(db, 'admin')
if user.admin:
if user_role in user.roles:
remove_user(db, user, user_role)
add_user(db, user, admin_role)
else:
if admin_role in user.roles:
remove_user(db, user, admin_role)
# only add user role if the user has no other roles
if len(user.roles) < 1:
add_user(db, user, user_role)
db.commit()
else:
# check if it's not one of the default roles
if not any(d.name == predef_role['name'] for d in DefaultRoles.roles):
# if description and scopes specified, rewrite the old ones
if 'description' in predef_role.keys():
role.description = predef_role['description']
if 'scopes' in predef_role.keys():
role.scopes = predef_role['scopes']
# FIXME - for now deletes old users and writes new ones
role.users = []
else:
raise ValueError(
"The role %r is a default role that cannot be overwritten, use a different role name"
% predef_role['name']
)
return role

View File

@@ -336,7 +336,8 @@ class MockHub(JupyterHub):
user = self.db.query(orm.User).filter(orm.User.name == 'user').first()
if user is None:
user = orm.User(name='user')
roles.DefaultRoles.add_default_role(self.db, user=user)
user_role = orm.Role.find(self.db, 'user')
roles.add_user(self.db, user=user, role=user_role)
self.db.add(user)
self.db.commit()

View File

@@ -233,8 +233,8 @@ async def test_add_user(app):
assert user.name == name
assert not user.admin
# assert newuser has default 'user' role
assert roles.DefaultRoles.get_user_role(db=db) in user.roles
assert roles.DefaultRoles.get_admin_role(db=db) not in user.roles
assert orm.Role.find(db, 'user') in user.roles
assert orm.Role.find(db, 'admin') not in user.roles
@mark.user
@@ -291,8 +291,8 @@ async def test_add_multi_user(app):
assert user.name == name
assert not user.admin
# assert default 'user' role added
assert roles.DefaultRoles.get_user_role(db=db) in user.roles
assert roles.DefaultRoles.get_admin_role(db=db) not in user.roles
assert orm.Role.find(db, 'user') in user.roles
assert orm.Role.find(db, 'admin') not in user.roles
# try to create the same users again
r = await api_request(
@@ -333,8 +333,8 @@ async def test_add_multi_user_admin(app):
assert user is not None
assert user.name == name
assert user.admin
assert roles.DefaultRoles.get_user_role(db=db) not in user.roles
assert roles.DefaultRoles.get_admin_role(db=db) in user.roles
assert orm.Role.find(db, 'user') not in user.roles
assert orm.Role.find(db, 'admin') in user.roles
@mark.user
@@ -369,13 +369,12 @@ async def test_add_admin(app):
)
assert r.status_code == 201
user = find_user(db, name)
user_role = orm.Role.find(db, 'user')
assert user is not None
assert user.name == name
assert user.admin
# assert newadmin has default 'admin' role
assert roles.DefaultRoles.get_user_role(db=db) not in user.roles
assert roles.DefaultRoles.get_admin_role(db=db) in user.roles
assert orm.Role.find(db, 'user') not in user.roles
assert orm.Role.find(db, 'admin') in user.roles
@mark.user
@@ -397,8 +396,8 @@ async def test_make_admin(app):
assert user is not None
assert user.name == name
assert not user.admin
assert roles.DefaultRoles.get_user_role(db=db) in user.roles
assert roles.DefaultRoles.get_admin_role(db=db) not in user.roles
assert orm.Role.find(db, 'user') in user.roles
assert orm.Role.find(db, 'admin') not in user.roles
r = await api_request(
app, 'users', name, method='patch', data=json.dumps({'admin': True})
@@ -409,8 +408,8 @@ async def test_make_admin(app):
assert user is not None
assert user.name == name
assert user.admin
assert roles.DefaultRoles.get_user_role(db=db) not in user.roles
assert roles.DefaultRoles.get_admin_role(db=db) in user.roles
assert orm.Role.find(db, 'user') not in user.roles
assert orm.Role.find(db, 'admin') in user.roles
@mark.user

View File

@@ -3,7 +3,7 @@
from pytest import mark
from .. import orm
from ..roles import DefaultRoles
from .. import roles
from .mocking import MockHub
@@ -95,37 +95,41 @@ def test_role_delete_cascade(db):
@mark.role
async def test_load_roles(tmpdir, request):
"""Test loading default and predefined roles in app.py"""
to_load = [
roles_to_load = [
{
'name': 'teacher',
'description': 'Access users information, servers and groups without create/delete privileges',
'scopes': ['users', 'groups'],
'users': ['cyclops', 'gandalf'],
}
},
{
'name': 'user',
'description': 'Only read access',
'scopes': ['read:all'],
'users': ['test_user'],
},
]
kwargs = {'load_roles': to_load}
kwargs = {'load_roles': roles_to_load}
ssl_enabled = getattr(request.module, "ssl_enabled", False)
if ssl_enabled:
kwargs['internal_certs_location'] = str(tmpdir)
# keep the users and groups from test_load_groups
hub = MockHub(test_clean_db=False, **kwargs)
hub = MockHub(**kwargs)
hub.init_db()
db = hub.db
await hub.init_users()
await hub.init_roles()
db = hub.db
# test default roles loaded to database
assert DefaultRoles.get_user_role(db) is not None
assert DefaultRoles.get_admin_role(db) is not None
assert DefaultRoles.get_server_role(db) is not None
# test if every existing user has a correct default role
# test if the 'user' role has been overwritten
user_role = orm.Role.find(db, 'user')
assert user_role is not None
assert user_role.scopes == ['read:all']
# test other default roles loaded to database
assert orm.Role.find(db, 'user') is not None
assert orm.Role.find(db, 'admin') is not None
assert orm.Role.find(db, 'server') is not None
# test if every existing user has a role (and no duplicates)
for user in db.query(orm.User):
assert len(user.roles) > 0
assert len(user.roles) == len(set(user.roles))
if user.admin:
assert DefaultRoles.get_admin_role(db) in user.roles
assert DefaultRoles.get_user_role(db) not in user.roles
else:
assert DefaultRoles.get_user_role(db) in user.roles
assert DefaultRoles.get_admin_role(db) not in user.roles
# test if predefined roles loaded and assigned
teacher_role = orm.Role.find(db, name='teacher')
assert teacher_role is not None