mirror of
https://github.com/jupyterhub/jupyterhub.git
synced 2025-10-17 23:13:00 +00:00
added token role check during loading config file and logs for role creation/changes/assignements
This commit is contained in:
@@ -1828,17 +1828,20 @@ class JupyterHub(Application):
|
|||||||
async def init_roles(self):
|
async def init_roles(self):
|
||||||
"""Load default and predefined roles into the database"""
|
"""Load default and predefined roles into the database"""
|
||||||
db = self.db
|
db = self.db
|
||||||
role_bearers = ['users', 'services', 'tokens', 'groups']
|
role_bearers = ['users', 'services', 'groups']
|
||||||
|
|
||||||
# load default roles
|
# load default roles
|
||||||
|
self.log.debug('Loading default roles to database')
|
||||||
default_roles = roles.get_default_roles()
|
default_roles = roles.get_default_roles()
|
||||||
for role in default_roles:
|
for role in default_roles:
|
||||||
roles.add_role(db, role)
|
roles.add_role(db, role)
|
||||||
|
|
||||||
# load predefined roles from config file
|
# load predefined roles from config file
|
||||||
|
self.log.debug('Loading predefined roles from config file to database')
|
||||||
for predef_role in self.load_roles:
|
for predef_role in self.load_roles:
|
||||||
roles.add_role(db, predef_role)
|
roles.add_role(db, predef_role)
|
||||||
# add users, services, tokens and/or groups
|
# add users, services, and/or groups,
|
||||||
|
# tokens need to be checked for permissions
|
||||||
for bearer in role_bearers:
|
for bearer in role_bearers:
|
||||||
if bearer in predef_role.keys():
|
if bearer in predef_role.keys():
|
||||||
for bname in predef_role[bearer]:
|
for bname in predef_role[bearer]:
|
||||||
@@ -1861,6 +1864,12 @@ class JupyterHub(Application):
|
|||||||
for bearer in role_bearers:
|
for bearer in role_bearers:
|
||||||
roles.check_for_default_roles(db, bearer)
|
roles.check_for_default_roles(db, bearer)
|
||||||
|
|
||||||
|
# now add roles to tokens if their owner's permissions allow
|
||||||
|
roles.add_predef_roles_tokens(db, self.load_roles)
|
||||||
|
|
||||||
|
# check tokens for default roles
|
||||||
|
roles.check_for_default_roles(db, bearer='tokens')
|
||||||
|
|
||||||
async def _add_tokens(self, token_dict, kind):
|
async def _add_tokens(self, token_dict, kind):
|
||||||
"""Add tokens for users or services to the database"""
|
"""Add tokens for users or services to the database"""
|
||||||
if kind == 'user':
|
if kind == 'user':
|
||||||
|
@@ -4,6 +4,7 @@
|
|||||||
from itertools import chain
|
from itertools import chain
|
||||||
|
|
||||||
from sqlalchemy import func
|
from sqlalchemy import func
|
||||||
|
from tornado.log import app_log
|
||||||
|
|
||||||
from . import orm
|
from . import orm
|
||||||
|
|
||||||
@@ -122,8 +123,10 @@ def add_role(db, role_dict):
|
|||||||
|
|
||||||
"""Adds a new role to database or modifies an existing one"""
|
"""Adds a new role to database or modifies an existing one"""
|
||||||
|
|
||||||
|
default_roles = get_default_roles()
|
||||||
|
|
||||||
if 'name' not in role_dict.keys():
|
if 'name' not in role_dict.keys():
|
||||||
raise ValueError('Role must have a name')
|
raise ValueError('Role definition in config file must have a name!')
|
||||||
else:
|
else:
|
||||||
name = role_dict['name']
|
name = role_dict['name']
|
||||||
role = orm.Role.find(db, name)
|
role = orm.Role.find(db, name)
|
||||||
@@ -134,11 +137,17 @@ def add_role(db, role_dict):
|
|||||||
if role is None:
|
if role is None:
|
||||||
role = orm.Role(name=name, description=description, scopes=scopes)
|
role = orm.Role(name=name, description=description, scopes=scopes)
|
||||||
db.add(role)
|
db.add(role)
|
||||||
|
if role_dict not in default_roles:
|
||||||
|
app_log.info('Adding role %s to database', name)
|
||||||
else:
|
else:
|
||||||
if description:
|
if description:
|
||||||
role.description = description
|
if role.description != description:
|
||||||
|
app_log.info('Changing role %s description to %s', name, description)
|
||||||
|
role.description = description
|
||||||
if scopes:
|
if scopes:
|
||||||
role.scopes = scopes
|
if role.scopes != scopes:
|
||||||
|
app_log.info('Changing role %s scopes to %s', name, scopes)
|
||||||
|
role.scopes = scopes
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
|
|
||||||
@@ -184,9 +193,15 @@ def add_obj(db, objname, kind, rolename):
|
|||||||
|
|
||||||
"""Adds a role for users, services, tokens or groups"""
|
"""Adds a role for users, services, tokens or groups"""
|
||||||
|
|
||||||
|
if kind == 'tokens':
|
||||||
|
log_objname = objname
|
||||||
|
else:
|
||||||
|
log_objname = objname.name
|
||||||
|
|
||||||
if rolename not in objname.roles:
|
if rolename not in objname.roles:
|
||||||
objname.roles.append(rolename)
|
objname.roles.append(rolename)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
app_log.info('Adding role %s for %s: %s', rolename.name, kind[:-1], log_objname)
|
||||||
|
|
||||||
|
|
||||||
@existing_only
|
@existing_only
|
||||||
@@ -194,9 +209,17 @@ def remove_obj(db, objname, kind, rolename):
|
|||||||
|
|
||||||
"""Removes a role for users, services or tokens"""
|
"""Removes a role for users, services or tokens"""
|
||||||
|
|
||||||
|
if kind == 'tokens':
|
||||||
|
log_objname = objname
|
||||||
|
else:
|
||||||
|
log_objname = objname.name
|
||||||
|
|
||||||
if rolename in objname.roles:
|
if rolename in objname.roles:
|
||||||
objname.roles.remove(rolename)
|
objname.roles.remove(rolename)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
app_log.info(
|
||||||
|
'Removing role %s for %s: %s', rolename.name, kind[:-1], log_objname
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def switch_default_role(db, obj, kind, admin):
|
def switch_default_role(db, obj, kind, admin):
|
||||||
@@ -260,36 +283,67 @@ def update_roles(db, obj, kind, roles=None):
|
|||||||
if Class == orm.APIToken:
|
if Class == orm.APIToken:
|
||||||
role = orm.Role.find(db, rolename)
|
role = orm.Role.find(db, rolename)
|
||||||
if role:
|
if role:
|
||||||
|
app_log.debug(
|
||||||
|
'Checking token permissions against requested role %s', rolename
|
||||||
|
)
|
||||||
token_scopes, owner_scopes = check_token_roles(db, obj, role)
|
token_scopes, owner_scopes = check_token_roles(db, obj, role)
|
||||||
if token_scopes.issubset(owner_scopes):
|
if token_scopes.issubset(owner_scopes):
|
||||||
role.tokens.append(obj)
|
role.tokens.append(obj)
|
||||||
|
app_log.info(
|
||||||
|
'Adding role %s for %s: %s', role.name, kind[:-1], obj
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
'Requested token role %r has higher permissions than the token owner'
|
'Requested token role %r of %r with scopes %r has higher permissions than the owner scopes %r'
|
||||||
% rolename
|
% (rolename, obj, token_scopes, owner_scopes)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise NameError('Role %r does not exist' % rolename)
|
raise NameError('Role %r does not exist' % rolename)
|
||||||
else:
|
else:
|
||||||
add_obj(db, objname=obj.name, kind=kind, rolename=rolename)
|
add_obj(db, objname=obj.name, kind=kind, rolename=rolename)
|
||||||
else:
|
else:
|
||||||
|
# CHECK ME - Does the default role assignment here make sense?
|
||||||
|
|
||||||
# groups can be without a role
|
# groups can be without a role
|
||||||
if Class == orm.Group:
|
if Class == orm.Group:
|
||||||
pass
|
pass
|
||||||
# tokens can have only 'user' role as default
|
# tokens can have only 'user' role as default
|
||||||
# assign the default only for user tokens
|
# assign the default only for user tokens
|
||||||
|
# service tokens with no specified role remain without any role (no default)
|
||||||
elif Class == orm.APIToken:
|
elif Class == orm.APIToken:
|
||||||
|
app_log.debug('Assigning default roles to tokens')
|
||||||
if len(obj.roles) < 1 and obj.user is not None:
|
if len(obj.roles) < 1 and obj.user is not None:
|
||||||
user_role.tokens.append(obj)
|
user_role.tokens.append(obj)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
app_log.info('Adding role %s to token %s', 'user', obj)
|
||||||
# users and services can have 'user' or 'admin' roles as default
|
# users and services can have 'user' or 'admin' roles as default
|
||||||
else:
|
else:
|
||||||
|
app_log.debug('Assigning default roles to %s', kind)
|
||||||
switch_default_role(db, obj, kind, obj.admin)
|
switch_default_role(db, obj, kind, obj.admin)
|
||||||
|
|
||||||
|
|
||||||
|
def add_predef_roles_tokens(db, predef_roles):
|
||||||
|
|
||||||
|
"""Adds tokens to predefined roles in config file
|
||||||
|
if their permissions allow"""
|
||||||
|
|
||||||
|
for predef_role in predef_roles:
|
||||||
|
if 'tokens' in predef_role.keys():
|
||||||
|
token_role = orm.Role.find(db, name=predef_role['name'])
|
||||||
|
for token_name in predef_role['tokens']:
|
||||||
|
token = orm.APIToken.find(db, token_name)
|
||||||
|
if token is None:
|
||||||
|
raise ValueError(
|
||||||
|
"Token %r does not exist and cannot assign it to role %r"
|
||||||
|
% (token_name, token_role.name)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
update_roles(db, obj=token, kind='tokens', roles=[token_role.name])
|
||||||
|
|
||||||
|
|
||||||
def check_for_default_roles(db, bearer):
|
def check_for_default_roles(db, bearer):
|
||||||
|
|
||||||
"""Checks that all role bearers have at least one role (default if none).
|
"""Checks that role bearers have at least one role (default if none).
|
||||||
Groups can be without a role"""
|
Groups can be without a role"""
|
||||||
|
|
||||||
Class = get_orm_class(bearer)
|
Class = get_orm_class(bearer)
|
||||||
@@ -306,14 +360,15 @@ def check_for_default_roles(db, bearer):
|
|||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
|
|
||||||
def mock_roles(app, name, kind):
|
def mock_roles(db, name, kind):
|
||||||
|
|
||||||
"""Loads and assigns default roles for mocked objects"""
|
"""Loads and assigns default roles for mocked objects"""
|
||||||
|
|
||||||
Class = get_orm_class(kind)
|
Class = get_orm_class(kind)
|
||||||
|
|
||||||
obj = Class.find(app.db, name=name)
|
obj = Class.find(db, name=name)
|
||||||
default_roles = get_default_roles()
|
default_roles = get_default_roles()
|
||||||
for role in default_roles:
|
for role in default_roles:
|
||||||
add_role(app.db, role)
|
add_role(db, role)
|
||||||
update_roles(db=app.db, obj=obj, kind=kind)
|
app_log.info('Assigning default roles to mocked %s: %s', kind[:-1], name)
|
||||||
|
update_roles(db=db, obj=obj, kind=kind)
|
||||||
|
@@ -246,7 +246,7 @@ def _mockservice(request, app, url=False):
|
|||||||
):
|
):
|
||||||
app.services = [spec]
|
app.services = [spec]
|
||||||
app.init_services()
|
app.init_services()
|
||||||
mock_roles(app, name, 'services')
|
mock_roles(app.db, name, 'services')
|
||||||
assert name in app._service_map
|
assert name in app._service_map
|
||||||
service = app._service_map[name]
|
service = app._service_map[name]
|
||||||
|
|
||||||
|
@@ -277,8 +277,13 @@ async def test_load_roles_services(tmpdir, request):
|
|||||||
services = [
|
services = [
|
||||||
{'name': 'cull_idle', 'api_token': 'some-token'},
|
{'name': 'cull_idle', 'api_token': 'some-token'},
|
||||||
{'name': 'user_service', 'api_token': 'some-other-token'},
|
{'name': 'user_service', 'api_token': 'some-other-token'},
|
||||||
{'name': 'admin_service', 'api_token': 'secret-token', 'admin': True},
|
{'name': 'admin_service', 'api_token': 'secret-token'},
|
||||||
]
|
]
|
||||||
|
service_tokens = {
|
||||||
|
'some-token': 'cull_idle',
|
||||||
|
'some-other-token': 'user_service',
|
||||||
|
'secret-token': 'admin_service',
|
||||||
|
}
|
||||||
roles_to_load = [
|
roles_to_load = [
|
||||||
{
|
{
|
||||||
'name': 'culler',
|
'name': 'culler',
|
||||||
@@ -287,24 +292,21 @@ async def test_load_roles_services(tmpdir, request):
|
|||||||
'services': ['cull_idle'],
|
'services': ['cull_idle'],
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
kwargs = {'load_roles': roles_to_load}
|
kwargs = {
|
||||||
|
'load_roles': roles_to_load,
|
||||||
|
'services': services,
|
||||||
|
'service_tokens': service_tokens,
|
||||||
|
}
|
||||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||||
if ssl_enabled:
|
if ssl_enabled:
|
||||||
kwargs['internal_certs_location'] = str(tmpdir)
|
kwargs['internal_certs_location'] = str(tmpdir)
|
||||||
hub = MockHub(**kwargs)
|
hub = MockHub(**kwargs)
|
||||||
hub.init_db()
|
hub.init_db()
|
||||||
db = hub.db
|
db = hub.db
|
||||||
# clean db of previous services and add testing ones
|
await hub.init_api_tokens()
|
||||||
for service in db.query(orm.Service):
|
# make 'admin_service' admin
|
||||||
db.delete(service)
|
admin_service = orm.Service.find(db, 'admin_service')
|
||||||
db.commit()
|
admin_service.admin = True
|
||||||
for service in services:
|
|
||||||
orm_service = orm.Service.find(db, name=service['name'])
|
|
||||||
if orm_service is None:
|
|
||||||
# not found, create a new one
|
|
||||||
orm_service = orm.Service(name=service['name'])
|
|
||||||
db.add(orm_service)
|
|
||||||
orm_service.admin = service.get('admin', False)
|
|
||||||
db.commit()
|
db.commit()
|
||||||
await hub.init_roles()
|
await hub.init_roles()
|
||||||
|
|
||||||
@@ -329,6 +331,11 @@ async def test_load_roles_services(tmpdir, request):
|
|||||||
db.delete(service)
|
db.delete(service)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
|
# delete the test tokens
|
||||||
|
for token in db.query(orm.APIToken):
|
||||||
|
db.delete(token)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
|
||||||
@mark.role
|
@mark.role
|
||||||
async def test_load_roles_groups(tmpdir, request):
|
async def test_load_roles_groups(tmpdir, request):
|
||||||
@@ -376,30 +383,23 @@ async def test_load_roles_groups(tmpdir, request):
|
|||||||
|
|
||||||
|
|
||||||
@mark.role
|
@mark.role
|
||||||
async def test_load_roles_tokens(tmpdir, request):
|
async def test_load_roles_user_tokens(tmpdir, request):
|
||||||
services = [
|
|
||||||
{'name': 'cull_idle', 'admin': True, 'api_token': 'another-secret-token'}
|
|
||||||
]
|
|
||||||
user_tokens = {
|
user_tokens = {
|
||||||
'secret-token': 'cyclops',
|
'secret-token': 'cyclops',
|
||||||
|
'secrety-token': 'gandalf',
|
||||||
'super-secret-token': 'admin',
|
'super-secret-token': 'admin',
|
||||||
}
|
}
|
||||||
service_tokens = {
|
|
||||||
'another-secret-token': 'cull_idle',
|
|
||||||
}
|
|
||||||
roles_to_load = [
|
roles_to_load = [
|
||||||
{
|
{
|
||||||
'name': 'culler',
|
'name': 'reader',
|
||||||
'description': 'Cull idle servers',
|
'description': 'Read-only own model',
|
||||||
'scopes': ['users:servers', 'admin:servers'],
|
'scopes': ['read:all'],
|
||||||
'tokens': ['another-secret-token'],
|
'tokens': ['secrety-token'],
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
kwargs = {
|
kwargs = {
|
||||||
'load_roles': roles_to_load,
|
'load_roles': roles_to_load,
|
||||||
'services': services,
|
|
||||||
'api_tokens': user_tokens,
|
'api_tokens': user_tokens,
|
||||||
'service_tokens': service_tokens,
|
|
||||||
}
|
}
|
||||||
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||||
if ssl_enabled:
|
if ssl_enabled:
|
||||||
@@ -413,12 +413,10 @@ async def test_load_roles_tokens(tmpdir, request):
|
|||||||
await hub.init_api_tokens()
|
await hub.init_api_tokens()
|
||||||
await hub.init_roles()
|
await hub.init_roles()
|
||||||
|
|
||||||
# test if another-secret-token has culler role
|
# test if gandalf's token has the 'reader' role
|
||||||
service = orm.Service.find(db, 'cull_idle')
|
reader_role = orm.Role.find(db, 'reader')
|
||||||
culler_role = orm.Role.find(db, 'culler')
|
token = orm.APIToken.find(db, 'secrety-token')
|
||||||
token = orm.APIToken.find(db, 'another-secret-token')
|
assert reader_role in token.roles
|
||||||
assert len(token.roles) == 1
|
|
||||||
assert culler_role in token.roles
|
|
||||||
|
|
||||||
# test if all other tokens have default 'user' role
|
# test if all other tokens have default 'user' role
|
||||||
user_role = orm.Role.find(db, 'user')
|
user_role = orm.Role.find(db, 'user')
|
||||||
@@ -427,6 +425,154 @@ async def test_load_roles_tokens(tmpdir, request):
|
|||||||
s_sec_token = orm.APIToken.find(db, 'super-secret-token')
|
s_sec_token = orm.APIToken.find(db, 'super-secret-token')
|
||||||
assert user_role in s_sec_token.roles
|
assert user_role in s_sec_token.roles
|
||||||
|
|
||||||
|
# delete the test tokens
|
||||||
|
for token in db.query(orm.APIToken):
|
||||||
|
db.delete(token)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
|
||||||
|
@mark.role
|
||||||
|
async def test_load_roles_user_tokens_not_allowed(tmpdir, request):
|
||||||
|
user_tokens = {
|
||||||
|
'secret-token': 'bilbo',
|
||||||
|
}
|
||||||
|
roles_to_load = [
|
||||||
|
{
|
||||||
|
'name': 'user-reader',
|
||||||
|
'description': 'Read-only any user model',
|
||||||
|
'scopes': ['read:users'],
|
||||||
|
'tokens': ['secret-token'],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
kwargs = {
|
||||||
|
'load_roles': roles_to_load,
|
||||||
|
'api_tokens': user_tokens,
|
||||||
|
}
|
||||||
|
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||||
|
if ssl_enabled:
|
||||||
|
kwargs['internal_certs_location'] = str(tmpdir)
|
||||||
|
hub = MockHub(**kwargs)
|
||||||
|
hub.init_db()
|
||||||
|
db = hub.db
|
||||||
|
hub.authenticator.allowed_users = ['bilbo']
|
||||||
|
await hub.init_users()
|
||||||
|
await hub.init_api_tokens()
|
||||||
|
|
||||||
|
response = 'allowed'
|
||||||
|
# bilbo has only default 'user' role
|
||||||
|
# while bilbo's token is requesting role with higher permissions
|
||||||
|
try:
|
||||||
|
await hub.init_roles()
|
||||||
|
except ValueError:
|
||||||
|
response = 'denied'
|
||||||
|
|
||||||
|
assert response == 'denied'
|
||||||
|
|
||||||
|
# delete the test tokens
|
||||||
|
for token in db.query(orm.APIToken):
|
||||||
|
db.delete(token)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
|
||||||
|
@mark.role
|
||||||
|
async def test_load_roles_service_tokens(tmpdir, request):
|
||||||
|
services = [{'name': 'cull_idle', 'api_token': 'another-secret-token'}]
|
||||||
|
service_tokens = {
|
||||||
|
'another-secret-token': 'cull_idle',
|
||||||
|
}
|
||||||
|
roles_to_load = [
|
||||||
|
{
|
||||||
|
'name': 'culler',
|
||||||
|
'description': 'Cull idle servers',
|
||||||
|
'scopes': ['users:servers', 'admin:users:servers'],
|
||||||
|
'tokens': ['another-secret-token'],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
kwargs = {
|
||||||
|
'load_roles': roles_to_load,
|
||||||
|
'services': services,
|
||||||
|
'service_tokens': service_tokens,
|
||||||
|
}
|
||||||
|
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||||
|
if ssl_enabled:
|
||||||
|
kwargs['internal_certs_location'] = str(tmpdir)
|
||||||
|
hub = MockHub(**kwargs)
|
||||||
|
hub.init_db()
|
||||||
|
db = hub.db
|
||||||
|
await hub.init_api_tokens()
|
||||||
|
# make the service admin
|
||||||
|
service = orm.Service.find(db, 'cull_idle')
|
||||||
|
service.admin = True
|
||||||
|
await hub.init_roles()
|
||||||
|
|
||||||
|
# test if another-secret-token has culler role
|
||||||
|
culler_role = orm.Role.find(db, 'culler')
|
||||||
|
token = orm.APIToken.find(db, 'another-secret-token')
|
||||||
|
assert len(token.roles) == 1
|
||||||
|
assert culler_role in token.roles
|
||||||
|
|
||||||
|
# delete the test services
|
||||||
|
for service in db.query(orm.Service):
|
||||||
|
db.delete(service)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
# delete the test tokens
|
||||||
|
for token in db.query(orm.APIToken):
|
||||||
|
db.delete(token)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
|
||||||
|
@mark.role
|
||||||
|
async def test_load_roles_service_tokens_not_allowed(tmpdir, request):
|
||||||
|
services = [{'name': 'some-service', 'api_token': 'secret-token'}]
|
||||||
|
service_tokens = {
|
||||||
|
'secret-token': 'some-service',
|
||||||
|
}
|
||||||
|
roles_to_load = [
|
||||||
|
{
|
||||||
|
'name': 'user-reader',
|
||||||
|
'description': 'Read-only user models',
|
||||||
|
'scopes': ['read:users'],
|
||||||
|
'services': ['some-service'],
|
||||||
|
},
|
||||||
|
# 'culler' role has higher permissions that the token's owner 'some-service'
|
||||||
|
{
|
||||||
|
'name': 'culler',
|
||||||
|
'description': 'Cull idle servers',
|
||||||
|
'scopes': ['users:servers', 'admin:users:servers'],
|
||||||
|
'tokens': ['secret-token'],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
kwargs = {
|
||||||
|
'load_roles': roles_to_load,
|
||||||
|
'services': services,
|
||||||
|
'service_tokens': service_tokens,
|
||||||
|
}
|
||||||
|
ssl_enabled = getattr(request.module, "ssl_enabled", False)
|
||||||
|
if ssl_enabled:
|
||||||
|
kwargs['internal_certs_location'] = str(tmpdir)
|
||||||
|
hub = MockHub(**kwargs)
|
||||||
|
hub.init_db()
|
||||||
|
db = hub.db
|
||||||
|
await hub.init_api_tokens()
|
||||||
|
response = 'allowed'
|
||||||
|
try:
|
||||||
|
await hub.init_roles()
|
||||||
|
except ValueError:
|
||||||
|
response = 'denied'
|
||||||
|
|
||||||
|
assert response == 'denied'
|
||||||
|
|
||||||
|
# delete the test services
|
||||||
|
for service in db.query(orm.Service):
|
||||||
|
db.delete(service)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
# delete the test tokens
|
||||||
|
for token in db.query(orm.APIToken):
|
||||||
|
db.delete(token)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
|
||||||
@mark.role
|
@mark.role
|
||||||
@mark.parametrize(
|
@mark.parametrize(
|
||||||
|
Reference in New Issue
Block a user