take pytest-tornado for a spin

it's nice!
This commit is contained in:
Min RK
2017-02-13 17:55:50 +01:00
parent 00aa92f7b6
commit 7afbe952e6
2 changed files with 76 additions and 26 deletions

View File

@@ -2,6 +2,7 @@
mock
codecov
pytest-cov
pytest-tornado
pytest>=2.8
notebook
requests-mock

View File

@@ -6,11 +6,14 @@
import logging
import os
import signal
from subprocess import Popen
import sys
import tempfile
import time
from unittest import mock
import pytest
import requests
from tornado import gen
from .. import spawner as spawnermod
@@ -50,54 +53,64 @@ def new_spawner(db, **kwargs):
return LocalProcessSpawner(db=db, **kwargs)
def test_spawner(db, io_loop):
@pytest.mark.gen_test
def test_spawner(db, request):
spawner = new_spawner(db)
ip, port = io_loop.run_sync(spawner.start)
ip, port = yield spawner.start()
assert ip == '127.0.0.1'
assert isinstance(port, int)
assert port > 0
spawner.user.server.ip = ip
spawner.user.server.port = port
db.commit()
# wait for the process to get to the while True: loop
time.sleep(1)
status = io_loop.run_sync(spawner.poll)
status = yield spawner.poll()
assert status is None
io_loop.run_sync(spawner.stop)
status = io_loop.run_sync(spawner.poll)
yield spawner.stop()
status = yield spawner.poll()
assert status == 1
def test_single_user_spawner(db, io_loop):
@gen.coroutine
def wait_for_spawner(spawner, timeout=10):
"""Wait for an http server to show up
polling at shorter intervals for early termination
"""
deadline = time.monotonic() + timeout
def wait():
return spawner.user.server.wait_up(timeout=1, http=True)
while time.monotonic() < deadline:
status = yield spawner.poll()
assert status is None
try:
yield wait()
except TimeoutError:
continue
else:
break
yield wait()
@pytest.mark.gen_test(run_sync=False)
def test_single_user_spawner(db, request):
spawner = new_spawner(db, cmd=['jupyterhub-singleuser'])
spawner.api_token = 'secret'
ip, port = io_loop.run_sync(spawner.start)
ip, port = yield spawner.start()
assert ip == '127.0.0.1'
assert isinstance(port, int)
assert port > 0
spawner.user.server.ip = ip
spawner.user.server.port = port
db.commit()
# wait for http server to come up,
# checking for early termination every 1s
def wait():
return spawner.user.server.wait_up(timeout=1, http=True)
for i in range(30):
status = io_loop.run_sync(spawner.poll)
assert status is None
try:
io_loop.run_sync(wait)
except TimeoutError:
continue
else:
break
io_loop.run_sync(wait)
status = io_loop.run_sync(spawner.poll)
assert status == None
io_loop.run_sync(spawner.stop)
status = io_loop.run_sync(spawner.poll)
wait_for_spawner(spawner)
status = yield spawner.poll()
assert status is None
yield spawner.stop()
status = yield spawner.poll()
assert status == 0
@@ -192,3 +205,39 @@ def test_string_formatting(db):
assert s.format_string(s.notebook_dir) == 'user/%s/' % name
assert s.format_string(s.default_url) == '/base/%s' % name
@pytest.mark.gen_test
def test_popen_kwargs(db):
mock_proc = mock.Mock(spec=Popen)
def mock_popen(*args, **kwargs):
mock_proc.args = args
mock_proc.kwargs = kwargs
mock_proc.pid = 5
return mock_proc
s = new_spawner(db, popen_kwargs={'shell': True}, cmd='jupyterhub-singleuser')
with mock.patch.object(spawnermod, 'Popen', mock_popen):
yield s.start()
assert mock_proc.kwargs['shell'] == True
assert mock_proc.args[0][:1] == (['jupyterhub-singleuser'])
@pytest.mark.gen_test
def test_shell_cmd(db, tmpdir, request):
f = tmpdir.join('bashrc')
f.write('export TESTVAR=foo\n')
s = new_spawner(db,
cmd=[sys.executable, '-m', 'jupyterhub.tests.mocksu'],
shell_cmd=['bash', '--rcfile', str(f), '-i', '-c'],
)
(ip, port) = yield s.start()
request.addfinalizer(s.stop)
s.user.server.ip = ip
s.user.server.port = port
db.commit()
yield wait_for_spawner(s)
r = requests.get('http://%s:%i/env' % (ip, port))
r.raise_for_status()
env = r.json()
assert env['TESTVAR'] == 'foo'