Implemented default token roles, self scope for users and tokens for mockservices

This commit is contained in:
0mar
2021-03-11 19:33:05 +01:00
parent bf333d8e35
commit 7496fda089
6 changed files with 28 additions and 25 deletions

View File

@@ -617,8 +617,8 @@ class APIToken(Hashed, Base):
db.add(orm_token)
# load default roles if they haven't been initiated
# correct to have this here? otherwise some tests fail
user_role = Role.find(db, 'user')
if not user_role:
token_role = Role.find(db, 'token')
if not token_role:
default_roles = get_default_roles()
for role in default_roles:
add_role(db, role)

View File

@@ -37,9 +37,11 @@ def get_default_roles():
'description': 'Post activity only',
'scopes': ['users:activity!user=username'],
},
# {'name': 'token',
# 'description': 'Token with same rights as token owner',
# 'scopes': ['all']}
{
'name': 'token',
'description': 'Token with same rights as token owner',
'scopes': ['all'],
},
]
return default_roles
@@ -79,7 +81,7 @@ def get_scope_hierarchy():
"""
scopes = {
'all': ['read:all'], # Todo: optional
'all': None, # Optional 'read:all' as subscope, not implemented at this stage
'users': ['read:users', 'users:activity', 'users:servers'],
'read:users': [
'read:users:name',
@@ -134,7 +136,7 @@ def get_scopes_for(orm_object):
scopes = get_subscopes(*orm_object.roles)
if 'self' in scopes:
scopes.remove('self')
scopes += expand_self_scope(orm_object.name)
scopes |= expand_self_scope(orm_object.name)
return scopes
@@ -236,8 +238,8 @@ def update_roles(db, obj, kind, roles=None):
assigns default if no roles specified"""
Class = orm.get_class(kind)
user_role = orm.Role.find(db, 'user')
default_token_role = orm.Role.find(db, 'token')
standard_permissions = {'all', 'read:all'}
if roles:
for rolename in roles:
if Class == orm.APIToken:
@@ -246,6 +248,7 @@ def update_roles(db, obj, kind, roles=None):
if role:
# compare the requested role permissions with the owner's permissions (scopes)
token_scopes = get_subscopes(role)
extra_scopes = token_scopes - standard_permissions
# find the owner and their roles
owner = None
if obj.user_id:
@@ -253,24 +256,24 @@ def update_roles(db, obj, kind, roles=None):
elif obj.service_id:
owner = db.query(orm.Service).get(obj.service_id)
if owner:
owner_scopes = get_subscopes(*owner.roles)
if token_scopes.issubset(owner_scopes):
owner_scopes = get_scopes_for(owner)
if (extra_scopes).issubset(owner_scopes):
role.tokens.append(obj)
else:
raise ValueError(
'Requested token role %r has higher permissions than the token owner'
% rolename
'Requested token role %r has more permissions than the token owner: [%s]'
% (rolename, ",".join(extra_scopes - owner_scopes))
)
else:
raise NameError('Role %r does not exist' % rolename)
else:
add_obj(db, objname=obj.name, kind=kind, rolename=rolename)
else:
# tokens can have only 'user' role as default
# assign the default only for user tokens
# tokens can have only 'token' role as default
# assign the default only for tokens
if Class == orm.APIToken:
if len(obj.roles) < 1 and obj.user is not None:
user_role.tokens.append(obj)
if not obj.roles and obj.user is not None:
default_token_role.tokens.append(obj)
db.commit()
# users and services can have 'user' or 'admin' roles as default
else:

View File

@@ -23,6 +23,7 @@ def get_scopes_for(orm_object):
token_scopes = roles.get_scopes_for(orm_object)
owner_scopes = roles.get_scopes_for(owner)
if 'all' in token_scopes:
token_scopes.remove('all')
token_scopes |= owner_scopes
scopes = token_scopes & owner_scopes
discarded_token_scopes = token_scopes - scopes

View File

@@ -45,6 +45,7 @@ from . import mocking
from .. import crypto
from .. import orm
from ..roles import mock_roles
from ..roles import update_roles
from ..utils import random_port
from .mocking import MockHub
from .test_services import mockservice_cmd
@@ -249,6 +250,8 @@ def _mockservice(request, app, url=False):
mock_roles(app, name, 'services')
assert name in app._service_map
service = app._service_map[name]
token = service.orm.api_tokens[0]
update_roles(app.db, token, 'tokens', roles=['token'])
async def start():
# wait for proxy to be updated before starting the service

View File

@@ -1319,7 +1319,7 @@ async def test_token_for_user(app, as_user, for_user, status):
if for_user != 'missing':
for_user_obj = add_user(app.db, app, name=for_user)
data = {'username': for_user}
headers = {'Authorization': 'token %s' % u.new_api_token(roles=[as_user])}
headers = {'Authorization': 'token %s' % u.new_api_token()}
r = await api_request(
app,
'users',
@@ -1414,7 +1414,7 @@ async def test_token_list(app, as_user, for_user, status):
u = add_user(app.db, app, name=as_user)
if for_user != 'missing':
for_user_obj = add_user(app.db, app, name=for_user)
headers = {'Authorization': 'token %s' % u.new_api_token(roles=[as_user])}
headers = {'Authorization': 'token %s' % u.new_api_token()}
r = await api_request(app, 'users', for_user, 'tokens', headers=headers)
assert r.status_code == status
if status != 200:

View File

@@ -120,18 +120,14 @@ def add_user(db, app=None, **kwargs):
return orm_user
def auth_header(db, name, inherit=True):
def auth_header(db, name):
"""Return header with user's API authorization token.
If inherit is True, copies the roles of the invoking user
"""
user = find_user(db, name)
if user is None:
raise KeyError(f"No such user: {name}")
if inherit:
roles = [role.name for role in user.roles]
else:
roles = None
token = user.new_api_token(roles=roles)
token = user.new_api_token()
return {'Authorization': 'token %s' % token}