diff --git a/jupyterhub/apihandlers/base.py b/jupyterhub/apihandlers/base.py index 8bd05ecc..a416e6c8 100644 --- a/jupyterhub/apihandlers/base.py +++ b/jupyterhub/apihandlers/base.py @@ -150,10 +150,13 @@ class APIHandler(BaseHandler): expires_at = None if isinstance(token, orm.APIToken): kind = 'api_token' + roles = [r.name for r in token.roles] extra = {'note': token.note} expires_at = token.expires_at elif isinstance(token, orm.OAuthAccessToken): kind = 'oauth' + # oauth tokens do not bear roles + roles = [] extra = {'oauth_client': token.client.description or token.client.client_id} if token.expires_at: expires_at = datetime.fromtimestamp(token.expires_at) @@ -174,6 +177,7 @@ class APIHandler(BaseHandler): owner_key: owner, 'id': token.api_id, 'kind': kind, + 'roles': [role for role in roles], 'created': isoformat(token.created), 'last_activity': isoformat(token.last_activity), 'expires_at': isoformat(expires_at), diff --git a/jupyterhub/apihandlers/users.py b/jupyterhub/apihandlers/users.py index c253e4ea..2a072190 100644 --- a/jupyterhub/apihandlers/users.py +++ b/jupyterhub/apihandlers/users.py @@ -14,6 +14,7 @@ from tornado.iostream import StreamClosedError from .. import orm from .. import roles +from ..roles import update_roles from ..user import User from ..utils import admin_only from ..utils import isoformat @@ -88,7 +89,7 @@ class UserListAPIHandler(APIHandler): user = self.user_from_username(name) if admin: user.admin = True - roles.update_roles(self.db, user) + update_roles(self.db, obj=user, kind='users') self.db.commit() try: await maybe_future(self.authenticator.add_user(user)) @@ -151,7 +152,7 @@ class UserAPIHandler(APIHandler): self._check_user_model(data) if 'admin' in data: user.admin = data['admin'] - roles.update_roles(self.db, user) + update_roles(self.db, obj=user, kind='users') self.db.commit() try: @@ -210,7 +211,7 @@ class UserAPIHandler(APIHandler): else: setattr(user, key, value) if key == 'admin': - roles.update_roles(self.db, user=user) + update_roles(self.db, obj=user, kind='users') self.db.commit() user_ = self.user_model(user) user_['auth_state'] = await user.get_auth_state() @@ -296,9 +297,13 @@ class UserTokenListAPIHandler(APIHandler): if requester is not user: note += " by %s %s" % (kind, requester.name) - api_token = user.new_api_token( - note=note, expires_in=body.get('expires_in', None) - ) + token_roles = body.get('roles') + try: + api_token = user.new_api_token( + note=note, expires_in=body.get('expires_in', None), roles=token_roles + ) + except ValueError: + raise web.HTTPError(404, "Requested roles %r not found" % token_roles) if requester is not user: self.log.info( "%s %s requested API token for %s", diff --git a/jupyterhub/app.py b/jupyterhub/app.py index 6618c6f0..4ea92bc7 100644 --- a/jupyterhub/app.py +++ b/jupyterhub/app.py @@ -318,18 +318,21 @@ class JupyterHub(Application): For instance:: - roles = [ - { - 'name': 'teacher', - 'description': 'Access users information, servers and groups without create/delete privileges', - 'scopes': ['users', 'groups'], - 'users': ['cyclops', 'wolverine'] - } - ] + load_roles = [ + { + 'name': 'teacher', + 'description': 'Access users information, servers and groups without create/delete privileges', + 'scopes': ['users', 'groups'], + 'users': ['cyclops', 'gandalf'], + 'services': [], + 'tokens': [] + } + ] + All keys apart from 'name' are optional. See all the available scopes in the JupyterHub REST API documentation. - The default roles are in roles.py. + Default roles are defined in roles.py. """, ).tag(config=True) @@ -1823,6 +1826,8 @@ class JupyterHub(Application): async def init_roles(self): """Load default and predefined roles into the database""" db = self.db + role_bearers = ['users', 'services', 'tokens'] + # load default roles default_roles = roles.get_default_roles() for role in default_roles: @@ -1831,43 +1836,31 @@ class JupyterHub(Application): # load predefined roles from config file for predef_role in self.load_roles: roles.add_role(db, predef_role) - role = orm.Role.find(db, predef_role['name']) - - # handle users - if 'users' in predef_role.keys(): - for username in predef_role['users']: - username = self.authenticator.normalize_username(username) - if not ( - await maybe_future( - self.authenticator.check_allowed(username, None) + # add users, services and/or tokens + for bearer in role_bearers: + if bearer in predef_role.keys(): + for bname in predef_role[bearer]: + if bearer == 'users': + bname = self.authenticator.normalize_username(bname) + if not ( + await maybe_future( + self.authenticator.check_allowed(bname, None) + ) + ): + raise ValueError( + "Username %r is not in Authenticator.allowed_users" + % bname + ) + roles.add_obj( + db, objname=bname, kind=bearer, rolename=predef_role['name'] ) - ): - raise ValueError( - "Username %r is not in Authenticator.allowed_users" - % username - ) - user = orm.User.find(db, name=username) - if user is None: - raise ValueError("%r does not exist" % username) - else: - roles.add_user(db, user=user, role=role) - # handle services - if 'services' in predef_role.keys(): - for servicename in predef_role['services']: - service = orm.Service.find(db, name=servicename) - if service is None: - raise ValueError("%r does not exist" % servicename) - else: - roles.add_user(db, user=service, role=role) - - # make sure all users and services have at least one role (update with default) - Classes = [orm.User, orm.Service] - for ormClass in Classes: - for obj in db.query(ormClass): + # make sure all users, services and tokens have at least one role (update with default) + for bearer in role_bearers: + Class = roles.get_orm_class(bearer) + for obj in db.query(Class): if len(obj.roles) < 1: - roles.update_roles(db, obj) - + roles.update_roles(db, obj=obj, kind=bearer) db.commit() async def _add_tokens(self, token_dict, kind): diff --git a/jupyterhub/handlers/base.py b/jupyterhub/handlers/base.py index c8b2dc9e..89722008 100644 --- a/jupyterhub/handlers/base.py +++ b/jupyterhub/handlers/base.py @@ -30,6 +30,7 @@ from tornado.web import RequestHandler from .. import __version__ from .. import orm +from .. import roles from ..metrics import PROXY_ADD_DURATION_SECONDS from ..metrics import PROXY_DELETE_DURATION_SECONDS from ..metrics import ProxyDeleteStatus @@ -453,6 +454,7 @@ class BaseHandler(RequestHandler): # not found, create and register user u = orm.User(name=username) self.db.add(u) + roles.update_roles(self.db, obj=u, kind='users') self.db.commit() user = self._user_from_orm(u) return user @@ -722,6 +724,7 @@ class BaseHandler(RequestHandler): # Only set `admin` if the authenticator returned an explicit value. if admin is not None and admin != user.admin: user.admin = admin + roles.update_roles(self.db, obj=user, kind='users') self.db.commit() # always set auth_state and commit, # because there could be key-rotation or clearing of previous values diff --git a/jupyterhub/orm.py b/jupyterhub/orm.py index 66d9f31d..90ca63f7 100644 --- a/jupyterhub/orm.py +++ b/jupyterhub/orm.py @@ -39,6 +39,9 @@ from sqlalchemy.types import Text from sqlalchemy.types import TypeDecorator from tornado.log import app_log +from .roles import add_role +from .roles import get_default_roles +from .roles import update_roles from .utils import compare_token from .utils import hash_token from .utils import new_token @@ -151,6 +154,18 @@ service_role_map = Table( Column('role_id', ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True), ) +# token:role many:many mapping table +api_token_role_map = Table( + 'api_token_role_map', + Base.metadata, + Column( + 'api_token_id', + ForeignKey('api_tokens.id', ondelete='CASCADE'), + primary_key=True, + ), + Column('role_id', ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True), +) + class Role(Base): """User Roles""" @@ -162,6 +177,7 @@ class Role(Base): scopes = Column(JSONList) users = relationship('User', secondary='user_role_map', backref='roles') services = relationship('Service', secondary='service_role_map', backref='roles') + tokens = relationship('APIToken', secondary='api_token_role_map', backref='roles') def __repr__(self): return "<%s %s (%s) - scopes: %s>" % ( @@ -570,6 +586,7 @@ class APIToken(Hashed, Base): token=None, user=None, service=None, + roles=None, note='', generated=True, expires_in=None, @@ -598,6 +615,14 @@ class APIToken(Hashed, Base): if expires_in is not None: orm_token.expires_at = cls.now() + timedelta(seconds=expires_in) db.add(orm_token) + # load default roles if they haven't been initiated + # correct to have this here? otherwise some tests fail + user_role = Role.find(db, 'user') + if not user_role: + default_roles = get_default_roles() + for role in default_roles: + add_role(db, role) + update_roles(db, obj=orm_token, kind='tokens', roles=roles) db.commit() return token diff --git a/jupyterhub/roles.py b/jupyterhub/roles.py index a56b3344..7f78067e 100644 --- a/jupyterhub/roles.py +++ b/jupyterhub/roles.py @@ -1,12 +1,12 @@ """Roles utils""" # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. -from .orm import Role +from . import orm def get_default_roles(): - """Returns a list of default roles dictionaries""" + """Returns a list of default role dictionaries""" default_roles = [ { @@ -43,50 +43,127 @@ def add_role(db, role_dict): """Adds a new role to database or modifies an existing one""" - role = Role.find(db, role_dict['name']) + if 'name' not in role_dict.keys(): + raise ValueError('Role must have a name') + else: + name = role_dict['name'] + role = orm.Role.find(db, name) + + description = role_dict.get('description') + scopes = role_dict.get('scopes') if role is None: - role = Role( - name=role_dict['name'], - description=role_dict['description'], - scopes=role_dict['scopes'], - ) + role = orm.Role(name=name, description=description, scopes=scopes,) db.add(role) else: - role.description = role_dict['description'] - role.scopes = role_dict['scopes'] - role.users = [] - role.services = [] + if description: + role.description = description + if scopes: + role.scopes = scopes db.commit() -def add_user(db, user, role): - if role is not None and role not in user.roles: - user.roles.append(role) - db.commit() - - -def remove_user(db, user, role): - if role is not None and role in user.roles: - user.roles.remove(role) - db.commit() - - -def update_roles(db, user): - - """Updates roles if user has no role with default or when user admin status is changed""" - - user_role = Role.find(db, 'user') - admin_role = Role.find(db, 'admin') - - if user.admin: - if user_role in user.roles: - remove_user(db, user, user_role) - add_user(db, user, admin_role) +def get_orm_class(kind): + if kind == 'users': + Class = orm.User + elif kind == 'services': + Class = orm.Service + elif kind == 'tokens': + Class = orm.APIToken else: - if admin_role in user.roles: - remove_user(db, user, admin_role) - # only add user role if the user has no other roles - if len(user.roles) < 1: - add_user(db, user, user_role) - db.commit() + raise ValueError("kind must be users, services or tokens, not %r" % kind) + + return Class + + +def existing_only(func): + + """Decorator for checking if objects and roles exist""" + + def check_existence(db, objname, kind, rolename): + + Class = get_orm_class(kind) + obj = Class.find(db, objname) + role = orm.Role.find(db, rolename) + + if obj is None: + raise ValueError("%r of kind %r does not exist" % (objname, kind)) + elif role is None: + raise ValueError("Role %r does not exist" % rolename) + else: + func(db, obj, kind, role) + + return check_existence + + +@existing_only +def add_obj(db, objname, kind, rolename): + + """Adds a role for users, services or tokens""" + + if rolename not in objname.roles: + objname.roles.append(rolename) + db.commit() + + +@existing_only +def remove_obj(db, objname, kind, rolename): + + """Removes a role for users, services or tokens""" + + if rolename in objname.roles: + objname.roles.remove(rolename) + db.commit() + + +def switch_default_role(db, obj, kind, admin): + + """Switch between default user and admin roles for users/services""" + + user_role = orm.Role.find(db, 'user') + admin_role = orm.Role.find(db, 'admin') + + def add_and_remove(db, obj, kind, current_role, new_role): + + if current_role in obj.roles: + remove_obj(db, objname=obj.name, kind=kind, rolename=current_role.name) + # only add new default role if the user has no other roles + if len(obj.roles) < 1: + add_obj(db, objname=obj.name, kind=kind, rolename=new_role.name) + + if admin: + add_and_remove(db, obj, kind, user_role, admin_role) + else: + add_and_remove(db, obj, kind, admin_role, user_role) + + +def update_roles(db, obj, kind, roles=None): + + """Updates object's roles if specified, + assigns default if no roles specified""" + + Class = get_orm_class(kind) + user_role = orm.Role.find(db, 'user') + + if roles: + for rolename in roles: + if Class == orm.APIToken: + # FIXME - check if specified roles do not add permissions + # on top of the token owner's scopes + role = orm.Role.find(db, rolename) + if role: + role.tokens.append(obj) + else: + raise ValueError('Role %r does not exist' % rolename) + else: + add_obj(db, objname=obj.name, kind=kind, rolename=rolename) + else: + # tokens can have only 'user' role as default + # assign the default only for user tokens + if Class == orm.APIToken: + if len(obj.roles) < 1 and obj.user is not None: + user_role.tokens.append(obj) + db.commit() + # users and services can have 'user' or 'admin' roles as default + else: + switch_default_role(db, obj, kind, obj.admin) diff --git a/jupyterhub/tests/mocking.py b/jupyterhub/tests/mocking.py index 003e193c..154a9922 100644 --- a/jupyterhub/tests/mocking.py +++ b/jupyterhub/tests/mocking.py @@ -312,7 +312,7 @@ class MockHub(JupyterHub): test_clean_db = Bool(True) def init_db(self): - """Ensure we start with a clean user list""" + """Ensure we start with a clean user & role list""" super().init_db() if self.test_clean_db: for user in self.db.query(orm.User): @@ -336,10 +336,10 @@ class MockHub(JupyterHub): user = self.db.query(orm.User).filter(orm.User.name == 'user').first() if user is None: user = orm.User(name='user') - user_role = orm.Role.find(self.db, 'user') - roles.add_user(self.db, user=user, role=user_role) self.db.add(user) self.db.commit() + roles.update_roles(self.db, obj=user, kind='users') + self.db.commit() def stop(self): super().stop() diff --git a/jupyterhub/tests/populate_db.py b/jupyterhub/tests/populate_db.py index fec3d7e9..2b5c6007 100644 --- a/jupyterhub/tests/populate_db.py +++ b/jupyterhub/tests/populate_db.py @@ -10,8 +10,6 @@ from datetime import datetime import jupyterhub from jupyterhub import orm -# FIXME - for later versions of jupyterhub add code to test roles - def populate_db(url): """Populate a jupyterhub database""" diff --git a/jupyterhub/tests/test_api.py b/jupyterhub/tests/test_api.py index 966dadcf..9d6a5dca 100644 --- a/jupyterhub/tests/test_api.py +++ b/jupyterhub/tests/test_api.py @@ -70,7 +70,7 @@ async def test_referer_check(app): # add admin user user = find_user(app.db, 'admin') if user is None: - user = add_user(app.db, name='admin', admin=True, roles=['admin']) + user = add_user(app.db, name='admin', admin=True) cookies = await app.login_user('admin') r = await api_request( @@ -1159,7 +1159,7 @@ async def test_token_as_user_deprecated(app, as_user, for_user, status): # ensure both users exist u = add_user(app.db, app, name=as_user) if for_user != 'missing': - add_user(app.db, app, name=for_user) + for_user_obj = add_user(app.db, app, name=for_user) data = {'username': for_user} headers = {'Authorization': 'token %s' % u.new_api_token()} r = await api_request( @@ -1252,7 +1252,7 @@ async def test_token_for_user(app, as_user, for_user, status): # ensure both users exist u = add_user(app.db, app, name=as_user) if for_user != 'missing': - add_user(app.db, app, name=for_user) + for_user_obj = add_user(app.db, app, name=for_user) data = {'username': for_user} headers = {'Authorization': 'token %s' % u.new_api_token()} r = await api_request( @@ -1269,6 +1269,7 @@ async def test_token_for_user(app, as_user, for_user, status): if status != 200: return assert 'token' in reply + token_id = reply['id'] r = await api_request(app, 'users', for_user, 'tokens', token_id, headers=headers) r.raise_for_status() diff --git a/jupyterhub/tests/test_named_servers.py b/jupyterhub/tests/test_named_servers.py index 20e515a2..884921c0 100644 --- a/jupyterhub/tests/test_named_servers.py +++ b/jupyterhub/tests/test_named_servers.py @@ -56,6 +56,7 @@ async def test_default_server(app, named_servers): assert user_model == fill_user( { 'name': username, + 'roles': ['user'], 'auth_state': None, 'server': user.url, 'servers': { @@ -86,7 +87,7 @@ async def test_default_server(app, named_servers): user_model = normalize_user(r.json()) assert user_model == fill_user( - {'name': username, 'servers': {}, 'auth_state': None} + {'name': username, 'roles': ['user'], 'servers': {}, 'auth_state': None} ) @@ -117,6 +118,7 @@ async def test_create_named_server(app, named_servers): assert user_model == fill_user( { 'name': username, + 'roles': ['user'], 'auth_state': None, 'servers': { servername: { @@ -159,7 +161,7 @@ async def test_delete_named_server(app, named_servers): user_model = normalize_user(r.json()) assert user_model == fill_user( - {'name': username, 'auth_state': None, 'servers': {}} + {'name': username, 'roles': ['user'], 'auth_state': None, 'servers': {}} ) # wrapper Spawner is gone assert servername not in user.spawners diff --git a/jupyterhub/tests/test_orm.py b/jupyterhub/tests/test_orm.py index 4b6c12ae..e9a7975c 100644 --- a/jupyterhub/tests/test_orm.py +++ b/jupyterhub/tests/test_orm.py @@ -560,4 +560,3 @@ def test_expiring_oauth_code(app, user): assert orm_code in db.query(orm.OAuthCode) orm.OAuthCode.purge_expired(db) assert orm_code not in db.query(orm.OAuthCode) - diff --git a/jupyterhub/tests/test_roles.py b/jupyterhub/tests/test_roles.py index 1e4ebe1d..da398dc9 100644 --- a/jupyterhub/tests/test_roles.py +++ b/jupyterhub/tests/test_roles.py @@ -1,48 +1,87 @@ """Test roles""" -# import pytest +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. +import json + from pytest import mark from .. import orm from .. import roles from ..utils import maybe_future from .mocking import MockHub +from .utils import add_user +from .utils import api_request @mark.role def test_orm_roles(db): """Test orm roles setup""" + user_role = orm.Role.find(db, name='user') + if not user_role: + user_role = orm.Role(name='user') + db.add(user_role) + db.commit() + + service_role = orm.Role(name='service') + db.add(service_role) + db.commit() + user = orm.User(name='falafel') db.add(user) + db.commit() + service = orm.Service(name='kebab') db.add(service) - role = orm.Role(name='default') - db.add(role) db.commit() - assert role.users == [] - assert role.services == [] + + assert user_role.users == [] + assert user_role.services == [] + assert service_role.users == [] + assert service_role.services == [] assert user.roles == [] assert service.roles == [] - role.users.append(user) - role.services.append(service) + user_role.users.append(user) + service_role.services.append(service) db.commit() - assert role.users == [user] - assert user.roles == [role] - assert role.services == [service] - assert service.roles == [role] + assert user_role.users == [user] + assert user.roles == [user_role] + assert service_role.services == [service] + assert service.roles == [service_role] + # check token creation without specifying its role + # assigns it the default 'user' role + token = user.new_api_token() + user_token = orm.APIToken.find(db, token=token) + assert user_token in user_role.tokens + assert user_role in user_token.roles + + # check creating token with a specific role + token = service.new_api_token(roles=['service']) + service_token = orm.APIToken.find(db, token=token) + assert service_token in service_role.tokens + assert service_role in service_token.roles + + # check deleting user removes the user and the token from roles db.delete(user) db.commit() - assert role.users == [] - db.delete(role) + assert user_role.users == [] + assert user_token not in user_role.tokens + # check deleting the service token removes it from 'service' role + db.delete(service_token) + db.commit() + assert service_token not in service_role.tokens + # check deleting the 'service' role removes it from service roles + db.delete(service_role) db.commit() assert service.roles == [] + db.delete(service) db.commit() @mark.role -def test_orm_role_delete_cascade(db): +def test_orm_roles_delete_cascade(db): """Orm roles cascade""" user1 = orm.User(name='user1') user2 = orm.User(name='user2') @@ -148,8 +187,6 @@ async def test_load_roles_users(tmpdir, request): hub.authenticator.admin_users = ['admin'] hub.authenticator.allowed_users = ['cyclops', 'gandalf', 'bilbo', 'gargamel'] await hub.init_users() - for user in db.query(orm.User): - print(user.name) await hub.init_roles() # test if the 'user' role has been overwritten and assigned @@ -178,6 +215,11 @@ async def test_load_roles_users(tmpdir, request): @mark.role async def test_load_roles_services(tmpdir, request): + services = [ + {'name': 'cull_idle', 'api_token': 'some-token'}, + {'name': 'user_service', 'api_token': 'some-other-token'}, + {'name': 'admin_service', 'api_token': 'secret-token', 'admin': True}, + ] roles_to_load = [ { 'name': 'culler', @@ -190,22 +232,20 @@ async def test_load_roles_services(tmpdir, request): ssl_enabled = getattr(request.module, "ssl_enabled", False) if ssl_enabled: kwargs['internal_certs_location'] = str(tmpdir) - hub = MockHub(test_clean_db=False, **kwargs) + hub = MockHub(**kwargs) hub.init_db() db = hub.db - # add test services to db - services = [ - {'name': 'cull_idle', 'admin': False}, - {'name': 'user_service', 'admin': False}, - {'name': 'admin_service', 'admin': True}, - ] - for service_specs in services: - service = orm.Service.find(db, service_specs['name']) - if service is None: - service = orm.Service( - name=service_specs['name'], admin=service_specs['admin'] - ) - db.add(service) + # clean db of previous services and add testing ones + for service in db.query(orm.Service): + db.delete(service) + db.commit() + for service in services: + orm_service = orm.Service.find(db, name=service['name']) + if orm_service is None: + # not found, create a new one + orm_service = orm.Service(name=service['name']) + db.add(orm_service) + orm_service.admin = service.get('admin', False) db.commit() await hub.init_roles() @@ -224,3 +264,104 @@ async def test_load_roles_services(tmpdir, request): cull_idle = orm.Service.find(db, name='cull_idle') assert culler_role in cull_idle.roles assert user_role not in cull_idle.roles + + # delete the test services + for service in db.query(orm.Service): + db.delete(service) + db.commit() + + +@mark.role +async def test_load_roles_tokens(tmpdir, request): + services = [ + {'name': 'cull_idle', 'admin': True, 'api_token': 'another-secret-token'} + ] + user_tokens = { + 'secret-token': 'cyclops', + 'super-secret-token': 'admin', + } + service_tokens = { + 'another-secret-token': 'cull_idle', + } + roles_to_load = [ + { + 'name': 'culler', + 'description': 'Cull idle servers', + 'scopes': ['users:servers', 'admin:servers'], + 'tokens': ['another-secret-token'], + }, + ] + kwargs = { + 'load_roles': roles_to_load, + 'services': services, + 'api_tokens': user_tokens, + 'service_tokens': service_tokens, + } + ssl_enabled = getattr(request.module, "ssl_enabled", False) + if ssl_enabled: + kwargs['internal_certs_location'] = str(tmpdir) + hub = MockHub(**kwargs) + hub.init_db() + db = hub.db + hub.authenticator.admin_users = ['admin'] + hub.authenticator.allowed_users = ['cyclops', 'gandalf'] + await hub.init_users() + await hub.init_api_tokens() + await hub.init_roles() + + # test if another-secret-token has culler role + service = orm.Service.find(db, 'cull_idle') + culler_role = orm.Role.find(db, 'culler') + token = orm.APIToken.find(db, 'another-secret-token') + assert len(token.roles) == 1 + assert culler_role in token.roles + + # test if all other tokens have default 'user' role + user_role = orm.Role.find(db, 'user') + sec_token = orm.APIToken.find(db, 'secret-token') + assert user_role in sec_token.roles + s_sec_token = orm.APIToken.find(db, 'super-secret-token') + assert user_role in s_sec_token.roles + + +@mark.role +@mark.parametrize( + "headers, role_list, status", + [ + ({}, None, 200), + ({}, ['reader'], 200), + ({}, ['non-existing'], 404), + # FIXME - add requesting token with 'not allowed' role + # granting more permission than the token owner has + ], +) +async def test_get_new_token_via_api(app, headers, role_list, status): + user = add_user(app.db, app, name='user') + roles.add_role(app.db, {'name': 'reader', 'scopes': ['read:all']}) + if role_list: + body = json.dumps({'roles': role_list}) + else: + body = '' + # request a new token + r = await api_request( + app, 'users/user/tokens', method='post', headers=headers, data=body + ) + assert r.status_code == status + if status != 200: + return + # check the new-token reply for roles + reply = r.json() + assert 'token' in reply + assert reply['user'] == 'user' + if not role_list: + assert reply['roles'] == ['user'] + else: + assert reply['roles'] == ['reader'] + token_id = reply['id'] + + # delete the token + r = await api_request(app, 'users/user/tokens', token_id, method='delete') + assert r.status_code == 204 + # verify deletion + r = await api_request(app, 'users/user/tokens', token_id) + assert r.status_code == 404 diff --git a/jupyterhub/tests/utils.py b/jupyterhub/tests/utils.py index 09aeb196..4c542871 100644 --- a/jupyterhub/tests/utils.py +++ b/jupyterhub/tests/utils.py @@ -6,6 +6,7 @@ from certipy import Certipy from jupyterhub import orm from jupyterhub.objects import Server +from jupyterhub.roles import update_roles from jupyterhub.utils import url_path_join as ujoin @@ -101,6 +102,8 @@ def add_user(db, app=None, **kwargs): for attr, value in kwargs.items(): setattr(orm_user, attr, value) db.commit() + requested_roles = kwargs.get('roles') + update_roles(db, obj=orm_user, kind='users', roles=requested_roles) if app: return app.users[orm_user.id] else: