Merge pull request #220 from minrk/coverage

add some test coverage
This commit is contained in:
Min RK
2015-04-07 16:21:45 -07:00
10 changed files with 230 additions and 8 deletions

4
.coveragerc Normal file
View File

@@ -0,0 +1,4 @@
[run]
omit =
jupyterhub/tests/*
jupyterhub/singleuser.py

3
.gitignore vendored
View File

@@ -14,3 +14,6 @@ share/jupyter/hub/static/css/style.min.css
share/jupyter/hub/static/css/style.min.css.map
*.egg-info
MANIFEST
.coverage
htmlcov

View File

@@ -12,4 +12,6 @@ install:
- pip install -f travis-wheels/wheelhouse -r dev-requirements.txt .
- pip install -f travis-wheels/wheelhouse ipython[notebook]
script:
- py.test jupyterhub
- py.test --cov jupyterhub jupyterhub/tests
after_success:
- coveralls

View File

@@ -1,2 +1,4 @@
-r requirements.txt
coveralls
pytest-cov
pytest

View File

@@ -1003,6 +1003,9 @@ class JupyterHub(Application):
# register cleanup on both TERM and INT
atexit.register(self.atexit)
self.init_signal()
def init_signal(self):
signal.signal(signal.SIGTERM, self.sigterm)
def sigterm(self, signum, frame):
@@ -1027,7 +1030,10 @@ class JupyterHub(Application):
if not self.io_loop:
return
if self.http_server:
if self.io_loop._running:
self.io_loop.add_callback(self.http_server.stop)
else:
self.http_server.stop()
self.io_loop.add_callback(self.io_loop.stop)
@gen.coroutine

View File

@@ -126,11 +126,11 @@ class LocalAuthenticator(Authenticator):
def check_group_whitelist(self, username):
if not self.group_whitelist:
return False
for group in self.group_whitelist:
for grnam in self.group_whitelist:
try:
group = getgrnam(self.group_whitelist)
group = getgrnam(grnam)
except KeyError:
self.log.error('No such group: [%s]' % self.group_whitelist)
self.log.error('No such group: [%s]' % grnam)
continue
if username in group.gr_mem:
return True

View File

@@ -7,6 +7,8 @@ import threading
from unittest import mock
import requests
from tornado import gen
from tornado.concurrent import Future
from tornado.ioloop import IOLoop
@@ -95,12 +97,18 @@ class MockHub(JupyterHub):
def _admin_users_default(self):
return {'admin'}
def init_signal(self):
pass
def start(self, argv=None):
self.db_file = NamedTemporaryFile()
self.db_url = 'sqlite:///' + self.db_file.name
evt = threading.Event()
@gen.coroutine
def _start_co():
assert self.io_loop._running
# put initialize in start for SQLAlchemy threading reasons
yield super(MockHub, self).initialize(argv=argv)
# add an initial user
@@ -108,16 +116,19 @@ class MockHub(JupyterHub):
self.db.add(user)
self.db.commit()
yield super(MockHub, self).start()
yield self.hub.server.wait_up(http=True)
self.io_loop.add_callback(evt.set)
def _start():
self.io_loop = IOLoop.current()
self.io_loop = IOLoop()
self.io_loop.make_current()
self.io_loop.add_callback(_start_co)
self.io_loop.start()
self._thread = threading.Thread(target=_start)
self._thread.start()
evt.wait(timeout=5)
ready = evt.wait(timeout=10)
assert ready
def stop(self):
super().stop()
@@ -126,3 +137,15 @@ class MockHub(JupyterHub):
# ignore the call that will fire in atexit
self.cleanup = lambda : None
self.db_file.close()
def login_user(self, name):
r = requests.post(self.proxy.public_server.url + 'hub/login',
data={
'username': name,
'password': name,
},
allow_redirects=False,
)
assert r.cookies
return r.cookies

View File

@@ -1,6 +1,7 @@
"""Tests for the REST API"""
import json
import time
from datetime import timedelta
import requests
@@ -284,3 +285,18 @@ def test_get_proxy(app, io_loop):
r.raise_for_status()
reply = r.json()
assert list(reply.keys()) == ['/']
def test_shutdown(app):
r = api_request(app, 'shutdown', method='post', data=json.dumps({
'servers': True,
'proxy': True,
}))
r.raise_for_status()
reply = r.json()
for i in range(100):
if app.io_loop._running:
time.sleep(0.1)
else:
break
assert not app.io_loop._running

View File

@@ -3,8 +3,13 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from subprocess import CalledProcessError
from unittest import mock
import pytest
from .mocking import MockPAMAuthenticator
from jupyterhub import auth, orm
def test_pam_auth(io_loop):
authenticator = MockPAMAuthenticator()
@@ -39,3 +44,106 @@ def test_pam_auth_whitelist(io_loop):
'password': 'mal',
}))
assert authorized is None
class MockGroup:
def __init__(self, *names):
self.gr_mem = names
def test_pam_auth_group_whitelist(io_loop):
g = MockGroup('kaylee')
def getgrnam(name):
return g
authenticator = MockPAMAuthenticator(group_whitelist={'group'})
with mock.patch.object(auth, 'getgrnam', getgrnam):
authorized = io_loop.run_sync(lambda : authenticator.authenticate(None, {
'username': 'kaylee',
'password': 'kaylee',
}))
assert authorized == 'kaylee'
with mock.patch.object(auth, 'getgrnam', getgrnam):
authorized = io_loop.run_sync(lambda : authenticator.authenticate(None, {
'username': 'mal',
'password': 'mal',
}))
assert authorized is None
def test_pam_auth_no_such_group(io_loop):
authenticator = MockPAMAuthenticator(group_whitelist={'nosuchcrazygroup'})
authorized = io_loop.run_sync(lambda : authenticator.authenticate(None, {
'username': 'kaylee',
'password': 'kaylee',
}))
assert authorized is None
def test_wont_add_system_user(io_loop):
user = orm.User(name='lioness4321')
authenticator = auth.PAMAuthenticator(whitelist={'mal'})
authenticator.create_system_users = False
with pytest.raises(KeyError):
io_loop.run_sync(lambda : authenticator.add_user(user))
def test_cant_add_system_user(io_loop):
user = orm.User(name='lioness4321')
authenticator = auth.PAMAuthenticator(whitelist={'mal'})
authenticator.create_system_users = True
def check_output(cmd, *a, **kw):
raise CalledProcessError(1, cmd)
with mock.patch.object(auth, 'check_output', check_output):
with pytest.raises(RuntimeError):
io_loop.run_sync(lambda : authenticator.add_user(user))
def test_add_system_user(io_loop):
user = orm.User(name='lioness4321')
authenticator = auth.PAMAuthenticator(whitelist={'mal'})
authenticator.create_system_users = True
def check_output(*a, **kw):
return
record = {}
def check_call(cmd, *a, **kw):
record['cmd'] = cmd
with mock.patch.object(auth, 'check_output', check_output), \
mock.patch.object(auth, 'check_call', check_call):
io_loop.run_sync(lambda : authenticator.add_user(user))
assert user.name in record['cmd']
def test_delete_user(io_loop):
user = orm.User(name='zoe')
a = MockPAMAuthenticator(whitelist={'mal'})
assert 'zoe' not in a.whitelist
a.add_user(user)
assert 'zoe' in a.whitelist
a.delete_user(user)
assert 'zoe' not in a.whitelist
def test_urls():
a = auth.PAMAuthenticator()
logout = a.logout_url('/base/url/')
login = a.login_url('/base/url')
assert logout == '/base/url/logout'
assert login == '/base/url/login'
def test_handlers(app):
a = auth.PAMAuthenticator()
handlers = a.get_handlers(app)
assert handlers[0][0] == '/login'

View File

@@ -0,0 +1,58 @@
"""Tests for HTML pages"""
import requests
from ..utils import url_path_join as ujoin
from .. import orm
def get_page(path, app, **kw):
base_url = ujoin(app.proxy.public_server.host, app.hub.server.base_url)
print(base_url)
return requests.get(ujoin(base_url, path), **kw)
def test_root_no_auth(app, io_loop):
print(app.hub.server.is_up())
routes = io_loop.run_sync(app.proxy.get_routes)
print(routes)
print(app.hub.server)
r = requests.get(app.proxy.public_server.host)
r.raise_for_status()
assert r.url == ujoin(app.proxy.public_server.host, app.hub.server.base_url)
def test_root_auth(app):
cookies = app.login_user('river')
r = requests.get(app.proxy.public_server.host, cookies=cookies)
r.raise_for_status()
assert r.url == ujoin(app.proxy.public_server.host, '/user/river')
def test_home_no_auth(app):
r = get_page('home', app, allow_redirects=False)
r.raise_for_status()
assert r.status_code == 302
assert '/hub/login' in r.headers['Location']
def test_home_auth(app):
cookies = app.login_user('river')
r = get_page('home', app, cookies=cookies)
r.raise_for_status()
assert r.url.endswith('home')
def test_admin_no_auth(app):
r = get_page('admin', app)
assert r.status_code == 403
def test_admin_not_admin(app):
cookies = app.login_user('wash')
r = get_page('admin', app, cookies=cookies)
assert r.status_code == 403
def test_admin(app):
cookies = app.login_user('river')
u = orm.User.find(app.db, 'river')
u.admin = True
app.db.commit()
r = get_page('admin', app, cookies=cookies)
r.raise_for_status()
assert r.url.endswith('/admin')