diff --git a/jupyterhub/apihandlers/users.py b/jupyterhub/apihandlers/users.py index 8398de6d..c253e4ea 100644 --- a/jupyterhub/apihandlers/users.py +++ b/jupyterhub/apihandlers/users.py @@ -88,7 +88,7 @@ class UserListAPIHandler(APIHandler): user = self.user_from_username(name) if admin: user.admin = True - roles.DefaultRoles.add_default_role(self.db, user) + roles.update_roles(self.db, user) self.db.commit() try: await maybe_future(self.authenticator.add_user(user)) @@ -151,7 +151,7 @@ class UserAPIHandler(APIHandler): self._check_user_model(data) if 'admin' in data: user.admin = data['admin'] - roles.DefaultRoles.add_default_role(self.db, user) + roles.update_roles(self.db, user) self.db.commit() try: @@ -208,9 +208,9 @@ class UserAPIHandler(APIHandler): if key == 'auth_state': await user.save_auth_state(value) else: - if key == 'admin' and value != user.admin: - roles.DefaultRoles.change_admin(self.db, user=user, admin=value) setattr(user, key, value) + if key == 'admin': + roles.update_roles(self.db, user=user) self.db.commit() user_ = self.user_model(user) user_['auth_state'] = await user.get_auth_state() diff --git a/jupyterhub/app.py b/jupyterhub/app.py index f72f899e..70483993 100644 --- a/jupyterhub/app.py +++ b/jupyterhub/app.py @@ -1715,7 +1715,6 @@ class JupyterHub(Application): for name in admin_users: # ensure anyone specified as admin in config is admin in db - # and gets admin role user = orm.User.find(db, name) if user is None: user = orm.User(name=name, admin=True) @@ -1825,11 +1824,15 @@ class JupyterHub(Application): """Load default and predefined roles into the database""" db = self.db # load default roles - roles.DefaultRoles.load_to_database(db) + default_roles = roles.get_default_roles() + for role in default_roles: + roles.add_role(db, role) # load predefined roles from config file for predef_role in self.load_roles: - role = roles.add_predef_role(db, predef_role) + roles.add_role(db, predef_role) + role = orm.Role.find(db, predef_role['name']) + # handle users for username in predef_role['users']: username = self.authenticator.normalize_username(username) @@ -1847,9 +1850,11 @@ class JupyterHub(Application): db.add(user) roles.add_user(db, user=user, role=role) - # make sure every existing user has a default user or admin role + # make sure all users have at least one role (update with default) for user in db.query(orm.User): - roles.DefaultRoles.add_default_role(db, user) + if len(user.roles) < 1: + roles.update_roles(db, user) + db.commit() async def _add_tokens(self, token_dict, kind): diff --git a/jupyterhub/roles.py b/jupyterhub/roles.py index ac5180e2..53e0c637 100644 --- a/jupyterhub/roles.py +++ b/jupyterhub/roles.py @@ -4,83 +4,58 @@ from .orm import Role -# define default roles -class DefaultRoles: +def get_default_roles(): - user = Role(name='user', description='Everything the user can do', scopes=['all']) - admin = Role( - name='admin', - description='Admin privileges (currently can do everything)', - scopes=[ - 'all', - 'users', - 'users:tokens', - 'admin:users', - 'admin:users:servers', - 'groups', - 'admin:groups', - 'read:services', - 'proxy', - 'shutdown', - ], - ) - server = Role( - name='server', - description='Post activity only', - scopes=['users:activity!user=username'], - ) - roles = (user, admin, server) + """Returns a list of default roles dictionaries""" - def __init__(cls, roles=roles): - cls.roles = roles + default_roles = [ + { + 'name': 'user', + 'description': 'Everything the user can do', + 'scopes': ['all'], + }, + { + 'name': 'admin', + 'description': 'Admin privileges (currently can do everything)', + 'scopes': [ + 'all', + 'users', + 'users:tokens', + 'admin:users', + 'admin:users:servers', + 'groups', + 'admin:groups', + 'read:services', + 'proxy', + 'shutdown', + ], + }, + { + 'name': 'server', + 'description': 'Post activity only', + 'scopes': ['users:activity!user=username'], + }, + ] + return default_roles - @classmethod - def get_user_role(cls, db): - return Role.find(db, name=cls.user.name) - @classmethod - def get_admin_role(cls, db): - return Role.find(db, name=cls.admin.name) +def add_role(db, role_dict): - @classmethod - def get_server_role(cls, db): - return Role.find(db, name=cls.server.name) + """Adds a new role to database or modifies an existing one""" - @classmethod - def load_to_database(cls, db): - for role in cls.roles: - db_role = Role.find(db, name=role.name) - if db_role is None: - new_role = Role( - name=role.name, description=role.description, scopes=role.scopes, - ) - db.add(new_role) - db.commit() + role = Role.find(db, role_dict['name']) - @classmethod - def add_default_role(cls, db, user): - role = None - if user.admin and cls.admin not in user.roles: - role = cls.get_admin_role(db) - if not user.admin and cls.user not in user.roles: - role = cls.get_user_role(db) - if role is not None: - add_user(db, user, role) - db.commit() - - @classmethod - def change_admin(cls, db, user, admin): - user_role = cls.get_user_role(db) - admin_role = cls.get_admin_role(db) - if admin: - if user_role in user.roles: - remove_user(db, user, user_role) - add_user(db, user, admin_role) - else: - if admin_role in user.roles: - remove_user(db, user, admin_role) - add_user(db, user, user_role) - db.commit() + if role is None: + role = Role( + name=role_dict['name'], + description=role_dict['description'], + scopes=role_dict['scopes'], + ) + db.add(role) + else: + role.description = role_dict['description'] + role.scopes = role_dict['scopes'] + db.commit() def add_user(db, user, role): @@ -95,33 +70,21 @@ def remove_user(db, user, role): db.commit() -def add_predef_role(db, predef_role): - """ - Returns either the role to write into db or updated role if already in db - """ - role = Role.find(db, predef_role['name']) - # if a new role, add to db, if existing, rewrite its attributes apart from the name - if role is None: - role = Role( - name=predef_role['name'], - description=predef_role['description'], - scopes=predef_role['scopes'], - ) - db.add(role) - db.commit() +def update_roles(db, user): + + """Updates roles if user has no role with default or when user admin status is changed""" + + user_role = Role.find(db, 'user') + admin_role = Role.find(db, 'admin') + + if user.admin: + if user_role in user.roles: + remove_user(db, user, user_role) + add_user(db, user, admin_role) else: - # check if it's not one of the default roles - if not any(d.name == predef_role['name'] for d in DefaultRoles.roles): - # if description and scopes specified, rewrite the old ones - if 'description' in predef_role.keys(): - role.description = predef_role['description'] - if 'scopes' in predef_role.keys(): - role.scopes = predef_role['scopes'] - # FIXME - for now deletes old users and writes new ones - role.users = [] - else: - raise ValueError( - "The role %r is a default role that cannot be overwritten, use a different role name" - % predef_role['name'] - ) - return role + if admin_role in user.roles: + remove_user(db, user, admin_role) + # only add user role if the user has no other roles + if len(user.roles) < 1: + add_user(db, user, user_role) + db.commit() diff --git a/jupyterhub/tests/mocking.py b/jupyterhub/tests/mocking.py index d7fddf8b..003e193c 100644 --- a/jupyterhub/tests/mocking.py +++ b/jupyterhub/tests/mocking.py @@ -336,7 +336,8 @@ class MockHub(JupyterHub): user = self.db.query(orm.User).filter(orm.User.name == 'user').first() if user is None: user = orm.User(name='user') - roles.DefaultRoles.add_default_role(self.db, user=user) + user_role = orm.Role.find(self.db, 'user') + roles.add_user(self.db, user=user, role=user_role) self.db.add(user) self.db.commit() diff --git a/jupyterhub/tests/test_api.py b/jupyterhub/tests/test_api.py index dab18512..f46c22c6 100644 --- a/jupyterhub/tests/test_api.py +++ b/jupyterhub/tests/test_api.py @@ -233,8 +233,8 @@ async def test_add_user(app): assert user.name == name assert not user.admin # assert newuser has default 'user' role - assert roles.DefaultRoles.get_user_role(db=db) in user.roles - assert roles.DefaultRoles.get_admin_role(db=db) not in user.roles + assert orm.Role.find(db, 'user') in user.roles + assert orm.Role.find(db, 'admin') not in user.roles @mark.user @@ -291,8 +291,8 @@ async def test_add_multi_user(app): assert user.name == name assert not user.admin # assert default 'user' role added - assert roles.DefaultRoles.get_user_role(db=db) in user.roles - assert roles.DefaultRoles.get_admin_role(db=db) not in user.roles + assert orm.Role.find(db, 'user') in user.roles + assert orm.Role.find(db, 'admin') not in user.roles # try to create the same users again r = await api_request( @@ -333,8 +333,8 @@ async def test_add_multi_user_admin(app): assert user is not None assert user.name == name assert user.admin - assert roles.DefaultRoles.get_user_role(db=db) not in user.roles - assert roles.DefaultRoles.get_admin_role(db=db) in user.roles + assert orm.Role.find(db, 'user') not in user.roles + assert orm.Role.find(db, 'admin') in user.roles @mark.user @@ -369,13 +369,12 @@ async def test_add_admin(app): ) assert r.status_code == 201 user = find_user(db, name) - user_role = orm.Role.find(db, 'user') assert user is not None assert user.name == name assert user.admin # assert newadmin has default 'admin' role - assert roles.DefaultRoles.get_user_role(db=db) not in user.roles - assert roles.DefaultRoles.get_admin_role(db=db) in user.roles + assert orm.Role.find(db, 'user') not in user.roles + assert orm.Role.find(db, 'admin') in user.roles @mark.user @@ -397,8 +396,8 @@ async def test_make_admin(app): assert user is not None assert user.name == name assert not user.admin - assert roles.DefaultRoles.get_user_role(db=db) in user.roles - assert roles.DefaultRoles.get_admin_role(db=db) not in user.roles + assert orm.Role.find(db, 'user') in user.roles + assert orm.Role.find(db, 'admin') not in user.roles r = await api_request( app, 'users', name, method='patch', data=json.dumps({'admin': True}) @@ -409,8 +408,8 @@ async def test_make_admin(app): assert user is not None assert user.name == name assert user.admin - assert roles.DefaultRoles.get_user_role(db=db) not in user.roles - assert roles.DefaultRoles.get_admin_role(db=db) in user.roles + assert orm.Role.find(db, 'user') not in user.roles + assert orm.Role.find(db, 'admin') in user.roles @mark.user diff --git a/jupyterhub/tests/test_roles.py b/jupyterhub/tests/test_roles.py index 5c3c0689..8b20fcf3 100644 --- a/jupyterhub/tests/test_roles.py +++ b/jupyterhub/tests/test_roles.py @@ -3,7 +3,7 @@ from pytest import mark from .. import orm -from ..roles import DefaultRoles +from .. import roles from .mocking import MockHub @@ -95,37 +95,41 @@ def test_role_delete_cascade(db): @mark.role async def test_load_roles(tmpdir, request): """Test loading default and predefined roles in app.py""" - to_load = [ + roles_to_load = [ { 'name': 'teacher', 'description': 'Access users information, servers and groups without create/delete privileges', 'scopes': ['users', 'groups'], 'users': ['cyclops', 'gandalf'], - } + }, + { + 'name': 'user', + 'description': 'Only read access', + 'scopes': ['read:all'], + 'users': ['test_user'], + }, ] - kwargs = {'load_roles': to_load} + kwargs = {'load_roles': roles_to_load} ssl_enabled = getattr(request.module, "ssl_enabled", False) if ssl_enabled: kwargs['internal_certs_location'] = str(tmpdir) - # keep the users and groups from test_load_groups - hub = MockHub(test_clean_db=False, **kwargs) + hub = MockHub(**kwargs) hub.init_db() + db = hub.db await hub.init_users() await hub.init_roles() - db = hub.db - # test default roles loaded to database - assert DefaultRoles.get_user_role(db) is not None - assert DefaultRoles.get_admin_role(db) is not None - assert DefaultRoles.get_server_role(db) is not None - # test if every existing user has a correct default role + # test if the 'user' role has been overwritten + user_role = orm.Role.find(db, 'user') + assert user_role is not None + assert user_role.scopes == ['read:all'] + # test other default roles loaded to database + assert orm.Role.find(db, 'user') is not None + assert orm.Role.find(db, 'admin') is not None + assert orm.Role.find(db, 'server') is not None + # test if every existing user has a role (and no duplicates) for user in db.query(orm.User): + assert len(user.roles) > 0 assert len(user.roles) == len(set(user.roles)) - if user.admin: - assert DefaultRoles.get_admin_role(db) in user.roles - assert DefaultRoles.get_user_role(db) not in user.roles - else: - assert DefaultRoles.get_user_role(db) in user.roles - assert DefaultRoles.get_admin_role(db) not in user.roles # test if predefined roles loaded and assigned teacher_role = orm.Role.find(db, name='teacher') assert teacher_role is not None