consolidate some test utilities in utils

instead of in test_api, test_pages

since they are used in a few places

also add user, username fixtures for generating test users
This commit is contained in:
Min RK
2019-01-04 11:03:20 +01:00
parent c009b39795
commit 8c63f669a9
6 changed files with 194 additions and 132 deletions

View File

@@ -47,7 +47,7 @@ from ..utils import random_port
from . import mocking
from .mocking import MockHub
from .utils import ssl_setup
from .utils import ssl_setup, add_user
from .test_services import mockservice_cmd
import jupyterhub.services.service
@@ -185,6 +185,43 @@ def cleanup_after(request, io_loop):
app.db.commit()
_username_counter = 0
def new_username(prefix='testuser'):
"""Return a new unique username"""
global _username_counter
_username_counter += 1
return '{}-{}'.format(prefix, _username_counter)
@fixture
def username():
"""allocate a temporary username
unique each time the fixture is used
"""
yield new_username()
@fixture
def user(app):
"""Fixture for creating a temporary user
Each time the fixture is used, a new user is created
"""
user = add_user(app.db, app, name=new_username())
yield user
@fixture
def admin_user(app, username):
"""Fixture for creating a temporary admin user"""
user = add_user(app.db, app, name=new_username('testadmin'), admin=True)
yield user
class MockServiceSpawner(jupyterhub.services.service._ServiceSpawner):
"""mock services for testing.

View File

@@ -49,7 +49,7 @@ from ..objects import Server
from ..spawner import LocalProcessSpawner, SimpleLocalProcessSpawner
from ..singleuser import SingleUserNotebookApp
from ..utils import random_port, url_path_join
from .utils import async_requests, ssl_setup
from .utils import async_requests, ssl_setup, public_host, public_url
from pamela import PAMError
@@ -223,7 +223,10 @@ class MockHub(JupyterHub):
last_activity_interval = 2
log_datefmt = '%M:%S'
external_certs = None
log_level = 10
@default('log_level')
def _default_log_level(self):
return 10
def __init__(self, *args, **kwargs):
if 'internal_certs_location' in kwargs:
@@ -351,31 +354,6 @@ class MockHub(JupyterHub):
return r.cookies
def public_host(app):
"""Return the public *host* (no URL prefix) of the given JupyterHub instance."""
if app.subdomain_host:
return app.subdomain_host
else:
return Server.from_url(app.proxy.public_url).host
def public_url(app, user_or_service=None, path=''):
"""Return the full, public base URL (including prefix) of the given JupyterHub instance."""
if user_or_service:
if app.subdomain_host:
host = user_or_service.host
else:
host = public_host(app)
prefix = user_or_service.prefix
else:
host = public_host(app)
prefix = Server.from_url(app.proxy.public_url).base_url
if path:
return host + url_path_join(prefix, path)
else:
return host + prefix
# single-user-server mocking:
class MockSingleUserServer(SingleUserNotebookApp):

View File

@@ -18,97 +18,13 @@ import jupyterhub
from .. import orm
from ..utils import url_path_join as ujoin
from .mocking import public_host, public_url
from .utils import async_requests
def check_db_locks(func):
"""Decorator that verifies no locks are held on database upon exit.
This decorator for test functions verifies no locks are held on the
application's database upon exit by creating and dropping a dummy table.
The decorator relies on an instance of JupyterHubApp being the first
argument to the decorated function.
Example
-------
@check_db_locks
def api_request(app, *api_path, **kwargs):
"""
def new_func(app, *args, **kwargs):
retval = func(app, *args, **kwargs)
temp_session = app.session_factory()
temp_session.execute('CREATE TABLE dummy (foo INT)')
temp_session.execute('DROP TABLE dummy')
temp_session.close()
return retval
return new_func
def find_user(db, name, app=None):
"""Find user in database."""
orm_user = db.query(orm.User).filter(orm.User.name == name).first()
if app is None:
return orm_user
else:
return app.users[orm_user.id]
def add_user(db, app=None, **kwargs):
"""Add a user to the database."""
orm_user = find_user(db, name=kwargs.get('name'))
if orm_user is None:
orm_user = orm.User(**kwargs)
db.add(orm_user)
else:
for attr, value in kwargs.items():
setattr(orm_user, attr, value)
db.commit()
if app:
return app.users[orm_user.id]
else:
return orm_user
def auth_header(db, name):
"""Return header with user's API authorization token."""
user = find_user(db, name)
if user is None:
user = add_user(db, name=name)
token = user.new_api_token()
return {'Authorization': 'token %s' % token}
@check_db_locks
async def api_request(app, *api_path, **kwargs):
"""Make an API request"""
base_url = app.hub.url
headers = kwargs.setdefault('headers', {})
if 'Authorization' not in headers and not kwargs.pop('noauth', False):
# make a copy to avoid modifying arg in-place
kwargs['headers'] = h = {}
h.update(headers)
h.update(auth_header(app.db, 'admin'))
url = ujoin(base_url, 'api', *api_path)
method = kwargs.pop('method', 'get')
f = getattr(async_requests, method)
if app.internal_ssl:
kwargs['cert'] = (app.internal_ssl_cert, app.internal_ssl_key)
kwargs["verify"] = app.internal_ssl_ca
resp = await f(url, **kwargs)
assert "frame-ancestors 'self'" in resp.headers['Content-Security-Policy']
assert ujoin(app.hub.base_url, "security/csp-report") in resp.headers['Content-Security-Policy']
assert 'http' not in resp.headers['Content-Security-Policy']
if not kwargs.get('stream', False) and resp.content:
assert resp.headers.get('content-type') == 'application/json'
return resp
from .utils import (
add_user,
api_request,
async_requests,
auth_header,
find_user,
)
# --------------------

View File

@@ -12,7 +12,8 @@ from requests import HTTPError
from jupyterhub import auth, crypto, orm
from .mocking import MockPAMAuthenticator, MockStructGroup, MockStructPasswd
from .test_api import add_user
from .utils import add_user
async def test_pam_auth():
authenticator = MockPAMAuthenticator()

View File

@@ -16,18 +16,15 @@ from ..auth import Authenticator
import mock
import pytest
from .mocking import FormSpawner, public_url, public_host
from .test_api import api_request, add_user
from .utils import async_requests
def get_page(path, app, hub=True, **kw):
if hub:
prefix = app.hub.base_url
else:
prefix = app.base_url
base_url = ujoin(public_host(app), prefix)
return async_requests.get(ujoin(base_url, path), **kw)
from .mocking import FormSpawner
from .utils import (
async_requests,
api_request,
add_user,
get_page,
public_url,
public_host,
)
async def test_root_no_auth(app):

View File

@@ -4,6 +4,10 @@ from concurrent.futures import ThreadPoolExecutor
from certipy import Certipy
import requests
from jupyterhub import orm
from jupyterhub.objects import Server
from jupyterhub.utils import url_path_join as ujoin
class _AsyncRequests:
"""Wrapper around requests to return a Future from request methods
@@ -46,3 +50,132 @@ def ssl_setup(cert_dir, authority_name):
"external", authority_name, overwrite=True, alt_names=alt_names
)
return external_certs
def check_db_locks(func):
"""Decorator that verifies no locks are held on database upon exit.
This decorator for test functions verifies no locks are held on the
application's database upon exit by creating and dropping a dummy table.
The decorator relies on an instance of JupyterHubApp being the first
argument to the decorated function.
Example
-------
@check_db_locks
def api_request(app, *api_path, **kwargs):
"""
def new_func(app, *args, **kwargs):
retval = func(app, *args, **kwargs)
temp_session = app.session_factory()
temp_session.execute('CREATE TABLE dummy (foo INT)')
temp_session.execute('DROP TABLE dummy')
temp_session.close()
return retval
return new_func
def find_user(db, name, app=None):
"""Find user in database."""
orm_user = db.query(orm.User).filter(orm.User.name == name).first()
if app is None:
return orm_user
else:
return app.users[orm_user.id]
def add_user(db, app=None, **kwargs):
"""Add a user to the database."""
orm_user = find_user(db, name=kwargs.get('name'))
if orm_user is None:
orm_user = orm.User(**kwargs)
db.add(orm_user)
else:
for attr, value in kwargs.items():
setattr(orm_user, attr, value)
db.commit()
if app:
return app.users[orm_user.id]
else:
return orm_user
def auth_header(db, name):
"""Return header with user's API authorization token."""
user = find_user(db, name)
if user is None:
user = add_user(db, name=name)
token = user.new_api_token()
return {'Authorization': 'token %s' % token}
@check_db_locks
async def api_request(app, *api_path, **kwargs):
"""Make an API request"""
base_url = app.hub.url
headers = kwargs.setdefault('headers', {})
if (
'Authorization' not in headers
and not kwargs.pop('noauth', False)
and 'cookies' not in kwargs
):
# make a copy to avoid modifying arg in-place
kwargs['headers'] = h = {}
h.update(headers)
h.update(auth_header(app.db, 'admin'))
url = ujoin(base_url, 'api', *api_path)
method = kwargs.pop('method', 'get')
f = getattr(async_requests, method)
if app.internal_ssl:
kwargs['cert'] = (app.internal_ssl_cert, app.internal_ssl_key)
kwargs["verify"] = app.internal_ssl_ca
resp = await f(url, **kwargs)
assert "frame-ancestors 'self'" in resp.headers['Content-Security-Policy']
assert ujoin(app.hub.base_url, "security/csp-report") in resp.headers['Content-Security-Policy']
assert 'http' not in resp.headers['Content-Security-Policy']
if not kwargs.get('stream', False) and resp.content:
assert resp.headers.get('content-type') == 'application/json'
return resp
def get_page(path, app, hub=True, **kw):
if hub:
prefix = app.hub.base_url
else:
prefix = app.base_url
base_url = ujoin(public_host(app), prefix)
return async_requests.get(ujoin(base_url, path), **kw)
def public_host(app):
"""Return the public *host* (no URL prefix) of the given JupyterHub instance."""
if app.subdomain_host:
return app.subdomain_host
else:
return Server.from_url(app.proxy.public_url).host
def public_url(app, user_or_service=None, path=''):
"""Return the full, public base URL (including prefix) of the given JupyterHub instance."""
if user_or_service:
if app.subdomain_host:
host = user_or_service.host
else:
host = public_host(app)
prefix = user_or_service.prefix
else:
host = public_host(app)
prefix = Server.from_url(app.proxy.public_url).base_url
if path:
return host + ujoin(prefix, path)
else:
return host + prefix