mirror of
https://github.com/jupyterhub/jupyterhub.git
synced 2025-10-17 23:13:00 +00:00
increase some test coverage
This commit is contained in:
@@ -4,9 +4,14 @@
|
||||
# Distributed under the terms of the Modified BSD License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
from tornado import gen
|
||||
|
||||
from .. import spawner as spawnermod
|
||||
from ..spawner import LocalProcessSpawner
|
||||
@@ -39,6 +44,7 @@ def new_spawner(db, **kwargs):
|
||||
kwargs.setdefault('INTERRUPT_TIMEOUT', 1)
|
||||
kwargs.setdefault('TERM_TIMEOUT', 1)
|
||||
kwargs.setdefault('KILL_TIMEOUT', 1)
|
||||
kwargs.setdefault('poll_interval', 1)
|
||||
return LocalProcessSpawner(db=db, **kwargs)
|
||||
|
||||
|
||||
@@ -110,3 +116,53 @@ def test_stop_spawner_stop_now(db, io_loop):
|
||||
status = io_loop.run_sync(spawner.poll)
|
||||
assert status == -signal.SIGTERM
|
||||
|
||||
def test_spawner_poll(db, io_loop):
|
||||
first_spawner = new_spawner(db)
|
||||
user = first_spawner.user
|
||||
io_loop.run_sync(first_spawner.start)
|
||||
proc = first_spawner.proc
|
||||
status = io_loop.run_sync(first_spawner.poll)
|
||||
assert status is None
|
||||
user.state = first_spawner.get_state()
|
||||
assert 'pid' in user.state
|
||||
|
||||
# create a new Spawner, loading from state of previous
|
||||
spawner = new_spawner(db, user=first_spawner.user)
|
||||
spawner.start_polling()
|
||||
|
||||
# wait for the process to get to the while True: loop
|
||||
io_loop.run_sync(lambda : gen.sleep(1))
|
||||
status = io_loop.run_sync(spawner.poll)
|
||||
assert status is None
|
||||
|
||||
# kill the process
|
||||
proc.terminate()
|
||||
for i in range(10):
|
||||
if proc.poll() is None:
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
assert proc.poll() is not None
|
||||
|
||||
io_loop.run_sync(lambda : gen.sleep(2))
|
||||
status = io_loop.run_sync(spawner.poll)
|
||||
assert status is not None
|
||||
|
||||
def test_setcwd():
|
||||
cwd = os.getcwd()
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
td = os.path.realpath(os.path.abspath(td))
|
||||
spawnermod._try_setcwd(td)
|
||||
assert os.path.samefile(os.getcwd(), td)
|
||||
os.chdir(cwd)
|
||||
chdir = os.chdir
|
||||
temp_root = os.path.realpath(os.path.abspath(tempfile.gettempdir()))
|
||||
def raiser(path):
|
||||
path = os.path.realpath(os.path.abspath(path))
|
||||
if not path.startswith(temp_root):
|
||||
raise OSError(path)
|
||||
chdir(path)
|
||||
with mock.patch('os.chdir', raiser):
|
||||
spawnermod._try_setcwd(cwd)
|
||||
assert os.getcwd().startswith(temp_root)
|
||||
os.chdir(cwd)
|
||||
|
Reference in New Issue
Block a user