add new token management to REST API

- list tokens
- create new tokens
- delete tokens
This commit is contained in:
Min RK
2018-04-13 12:10:50 +02:00
parent 6437093a67
commit 251289fc05
2 changed files with 221 additions and 4 deletions

View File

@@ -11,6 +11,7 @@ from tornado import web
from tornado.iostream import StreamClosedError
from .. import orm
from ..user import User
from ..utils import admin_only, iterate_until, maybe_future, url_path_join
from .base import APIHandler
@@ -189,6 +190,89 @@ class UserAPIHandler(APIHandler):
self.write(json.dumps(user_))
class UserTokenListAPIHandler(APIHandler):
"""API endpoint for listing/creating tokens"""
@admin_or_self
def get(self, name):
"""Get tokens for a given user"""
user = self.find_user(name)
if not user:
raise web.HTTPError(404, "No such user: %s" % name)
api_tokens = []
def sort_key(token):
return token.last_activity or token.created
for token in sorted(user.api_tokens, key=sort_key):
api_tokens.append(self.token_model(token))
oauth_tokens = []
for token in sorted(user.oauth_tokens, key=sort_key):
oauth_tokens.append(self.token_model(token))
self.write(json.dumps({
'api_tokens': api_tokens,
'oauth_tokens': oauth_tokens,
}))
@admin_or_self
def post(self, name):
requester = self.get_current_user()
user = self.find_user(name)
if requester is not user and not requester.admin:
raise web.HTTPError(403, "Only admins can request tokens for other users")
if not user:
raise web.HTTPError(404, "No such user: %s" % name)
body = self.get_json_body()
if requester is not user:
kind = 'user' if isinstance(requester, User) else 'service'
note = (body or {}).get('note')
if not note:
note = "via api"
if requester is not user:
note += " by %s %s" % (kind, requester.name)
api_token = user.new_api_token(note=note)
if requester is not user:
self.log.info("%s %s requested API token for %s", kind.title(), requester.name, user.name)
else:
user_kind = 'user' if isinstance(user, User) else 'service'
self.log.info("%s %s requested new API token", user_kind.title(), user.name)
# retrieve the model
token_model = self.token_model(orm.APIToken.find(self.db, api_token))
token_model['token'] = api_token
self.write(json.dumps(token_model))
class UserTokenAPIHandler(APIHandler):
"""API endpoint for listing/creating tokens"""
@admin_or_self
def get(self, name, token):
""""""
user = self.find_user(name)
if not user:
raise web.HTTPError(404, "No such user: %s" % name)
orm_token = orm.APIToken.find(self.db, token)
if orm_token is None:
orm_token = orm.OAuthAccessToken.find(self.db, token)
if orm_token is None or orm_token.user is not user.orm_user:
raise web.HTTPError(404, "Token not found %s", orm_token)
self.write(json.dumps(self.token_model(orm_token)))
@admin_or_self
def delete(self, name, token):
"""Delete a token"""
user = self.find_user(name)
if not user:
raise web.HTTPError(404, "No such user: %s" % name)
orm_token = orm.APIToken.find(self.db, token)
if orm_token is None:
orm_token = orm.OAuthAccessToken.find(self.db, token)
print(user)
if orm_token is None or orm_token.user is not user.orm_user:
raise web.HTTPError(404, "Token not found")
self.db.delete(orm_token)
self.db.commit()
self.set_header('Content-Type', 'text/plain')
self.set_status(204)
class UserServerAPIHandler(APIHandler):
"""Start and stop single-user servers"""
@@ -373,6 +457,8 @@ default_handlers = [
(r"/api/users/([^/]+)", UserAPIHandler),
(r"/api/users/([^/]+)/server", UserServerAPIHandler),
(r"/api/users/([^/]+)/server/progress", SpawnProgressAPIHandler),
(r"/api/users/([^/]+)/tokens", UserTokenListAPIHandler),
(r"/api/users/([^/]+)/tokens/([^/]*)", UserTokenAPIHandler),
(r"/api/users/([^/]+)/servers/([^/]*)", UserServerAPIHandler),
(r"/api/users/([^/]+)/servers/([^/]*)/progress", SpawnProgressAPIHandler),
(r"/api/users/([^/]+)/admin-access", UserAdminAccessAPIHandler),

View File

@@ -1096,7 +1096,7 @@ def test_cookie(app):
@mark.gen_test
def test_token(app):
def test_check_token(app):
name = 'book'
user = add_user(app.db, app=app, name=name)
token = user.new_api_token()
@@ -1113,7 +1113,7 @@ def test_token(app):
({}, 200),
({'Authorization': 'token bad'}, 403),
])
def test_get_new_token(app, headers, status):
def test_get_new_token_deprecated(app, headers, status):
# request a new token
r = yield api_request(app, 'authorizations', 'token',
method='post',
@@ -1131,7 +1131,7 @@ def test_get_new_token(app, headers, status):
@mark.gen_test
def test_token_formdata(app):
def test_token_formdata_deprecated(app):
"""Create a token for a user with formdata and no auth header"""
data = {
'username': 'fake',
@@ -1158,7 +1158,7 @@ def test_token_formdata(app):
('user', 'other', 403),
('user', 'user', 200),
])
def test_token_as_user(app, as_user, for_user, status):
def test_token_as_user_deprecated(app, as_user, for_user, status):
# ensure both users exist
u = add_user(app.db, app, name=as_user)
if for_user != 'missing':
@@ -1183,6 +1183,137 @@ def test_token_as_user(app, as_user, for_user, status):
assert reply['name'] == data['username']
@mark.gen_test
@mark.parametrize("headers, status, note", [
({}, 200, 'test note'),
({}, 200, ''),
({'Authorization': 'token bad'}, 403, ''),
])
def test_get_new_token(app, headers, status, note):
if note:
body = json.dumps({'note': note})
else:
body = ''
# request a new token
r = yield api_request(app, 'users/admin/tokens',
method='post',
headers=headers,
data=body,
)
assert r.status_code == status
if status != 200:
return
# check the new-token reply
reply = r.json()
assert 'token' in reply
assert reply['user'] == 'admin'
assert reply['created']
assert 'last_activity' in reply
if note:
assert reply['note'] == note
else:
assert reply['note'] == 'via api'
token = reply['token']
# check the validity of the new token
r = yield api_request(app, 'users/admin/tokens', token)
r.raise_for_status()
reply = r.json()
assert reply['user'] == 'admin'
assert reply['created']
assert 'last_activity' in reply
if note:
assert reply['note'] == note
else:
assert reply['note'] == 'via api'
# delete the token
r = yield api_request(app, 'users/admin/tokens', token,
method='delete')
assert r.status_code == 204
# verify deletion
r = yield api_request(app, 'users/admin/tokens', token)
assert r.status_code == 404
@mark.gen_test
@mark.parametrize("as_user, for_user, status", [
('admin', 'other', 200),
('admin', 'missing', 404),
('user', 'other', 403),
('user', 'user', 200),
])
def test_token_for_user(app, as_user, for_user, status):
# ensure both users exist
u = add_user(app.db, app, name=as_user)
if for_user != 'missing':
add_user(app.db, app, name=for_user)
data = {'username': for_user}
headers = {
'Authorization': 'token %s' % u.new_api_token(),
}
r = yield api_request(app, 'users', for_user, 'tokens',
method='post',
data=json.dumps(data),
headers=headers,
)
assert r.status_code == status
reply = r.json()
if status != 200:
return
assert 'token' in reply
token = reply['token']
r = yield api_request(app, 'users', for_user, 'tokens', token,
headers=headers,
)
r.raise_for_status()
reply = r.json()
assert reply['user'] == for_user
if for_user == as_user:
note = 'via api'
else:
note = 'via api by user %s' % as_user
assert reply['note'] == note
# delete the token
r = yield api_request(app, 'users', for_user, 'tokens', token,
method='delete',
headers=headers,
)
assert r.status_code == 204
r = yield api_request(app, 'users', for_user, 'tokens', token,
headers=headers,
)
assert r.status_code == 404
@mark.gen_test
@mark.parametrize("as_user, for_user, status", [
('admin', 'other', 200),
('admin', 'missing', 404),
('user', 'other', 403),
('user', 'user', 200),
])
def test_token_list(app, as_user, for_user, status):
u = add_user(app.db, app, name=as_user)
if for_user != 'missing':
for_user_obj = add_user(app.db, app, name=for_user)
headers = {
'Authorization': 'token %s' % u.new_api_token(),
}
r = yield api_request(app, 'users', for_user, 'tokens',
headers=headers,
)
assert r.status_code == status
if status != 200:
return
reply = r.json()
assert sorted(reply) == ['api_tokens', 'oauth_tokens']
assert len(reply['api_tokens']) == len(for_user_obj.api_tokens)
assert all(token['user'] == for_user for token in reply['api_tokens'])
assert all(token['user'] == for_user for token in reply['oauth_tokens'])
# ---------------
# Group API tests
# ---------------