mirror of
https://github.com/jupyterhub/jupyterhub.git
synced 2025-10-13 13:03:01 +00:00
Merge pull request #1356 from minrk/proxy-race
rework spawn futures to fix races
This commit is contained in:
@@ -104,22 +104,17 @@ class APIHandler(BaseHandler):
|
||||
'pending': None,
|
||||
'last_activity': user.last_activity.isoformat(),
|
||||
}
|
||||
if user.spawners['']._spawn_pending:
|
||||
model['pending'] = 'spawn'
|
||||
elif user.spawners['']._stop_pending:
|
||||
model['pending'] = 'stop'
|
||||
model['pending'] = user.spawners[''].pending or None
|
||||
|
||||
if self.allow_named_servers:
|
||||
servers = model['servers'] = {}
|
||||
for name, spawner in user.spawners.items():
|
||||
if spawner.ready:
|
||||
servers[name] = s = {'name': name}
|
||||
if spawner._spawn_pending:
|
||||
s['pending'] = 'spawn'
|
||||
elif spawner._stop_pending:
|
||||
s['pending'] = 'stop'
|
||||
if spawner.pending:
|
||||
s['pending'] = spawner.pending
|
||||
if spawner.server:
|
||||
s['url'] = user.url + name
|
||||
s['url'] = user.url + name + '/'
|
||||
return model
|
||||
|
||||
def group_model(self, group):
|
||||
|
@@ -178,19 +178,34 @@ class UserAPIHandler(APIHandler):
|
||||
|
||||
class UserServerAPIHandler(APIHandler):
|
||||
"""Start and stop single-user servers"""
|
||||
|
||||
@gen.coroutine
|
||||
@admin_or_self
|
||||
def post(self, name, server_name=''):
|
||||
user = self.find_user(name)
|
||||
if server_name:
|
||||
if not self.allow_named_servers:
|
||||
raise web.HTTPError(400, "Named servers are not enabled.")
|
||||
if server_name and not self.allow_named_servers:
|
||||
raise web.HTTPError(400, "Named servers are not enabled.")
|
||||
if self.allow_named_servers and not server_name:
|
||||
server_name = user.default_server_name()
|
||||
spawner = user.spawners[server_name]
|
||||
pending = spawner.pending
|
||||
if pending == 'spawn':
|
||||
self.set_header('Content-Type', 'text/plain')
|
||||
self.set_status(202)
|
||||
return
|
||||
elif pending:
|
||||
raise web.HTTPError(400, "%s is pending %s" % (spawner._log_name, pending))
|
||||
|
||||
if spawner.ready:
|
||||
# include notify, so that a server that died is noticed immediately
|
||||
state = yield spawner.poll_and_notify()
|
||||
# set _spawn_pending flag to prevent races while we wait
|
||||
spawner._spawn_pending = True
|
||||
try:
|
||||
state = yield spawner.poll_and_notify()
|
||||
finally:
|
||||
spawner._spawn_pending = False
|
||||
if state is None:
|
||||
raise web.HTTPError(400, "%s's server %s is already running" % (name, server_name))
|
||||
raise web.HTTPError(400, "%s is already running" % spawner._log_name)
|
||||
|
||||
options = self.get_json_body()
|
||||
yield self.spawn_single_user(user, server_name, options=options)
|
||||
@@ -209,17 +224,18 @@ class UserServerAPIHandler(APIHandler):
|
||||
raise web.HTTPError(404, "%s has no server named '%s'" % (name, server_name))
|
||||
|
||||
spawner = user.spawners[server_name]
|
||||
|
||||
if spawner._stop_pending:
|
||||
if spawner.pending == 'stop':
|
||||
self.log.debug("%s already stopping", spawner._log_name)
|
||||
self.set_header('Content-Type', 'text/plain')
|
||||
self.set_status(202)
|
||||
return
|
||||
|
||||
if not spawner.ready:
|
||||
raise web.HTTPError(400, "%s's server %s is not running" % (name, server_name))
|
||||
raise web.HTTPError(400, "%s is not running" % spawner._log_name)
|
||||
# include notify, so that a server that died is noticed immediately
|
||||
status = yield spawner.poll_and_notify()
|
||||
if status is not None:
|
||||
raise web.HTTPError(400, "%s's server %s is not running" % (name, server_name))
|
||||
raise web.HTTPError(400, "%s is not running" % spawner._log_name)
|
||||
yield self.stop_single_user(user, server_name)
|
||||
status = 202 if spawner._stop_pending else 204
|
||||
self.set_header('Content-Type', 'text/plain')
|
||||
|
@@ -20,7 +20,7 @@ from .. import __version__
|
||||
from .. import orm
|
||||
from ..objects import Server
|
||||
from ..spawner import LocalProcessSpawner
|
||||
from ..utils import url_path_join, exponential_backoff
|
||||
from ..utils import default_server_name, url_path_join, exponential_backoff
|
||||
|
||||
# pattern for the authentication token header
|
||||
auth_header_pat = re.compile(r'^(?:token|bearer)\s+([^\s]+)$', flags=re.IGNORECASE)
|
||||
@@ -376,8 +376,16 @@ class BaseHandler(RequestHandler):
|
||||
|
||||
@gen.coroutine
|
||||
def spawn_single_user(self, user, server_name='', options=None):
|
||||
if server_name in user.spawners and user.spawners[server_name].pending == 'spawn':
|
||||
raise RuntimeError("Spawn already pending for: %s" % user.name)
|
||||
user_server_name = user.name
|
||||
if self.allow_named_servers and not server_name:
|
||||
server_name = default_server_name(user)
|
||||
|
||||
if server_name:
|
||||
user_server_name = '%s:%s' % (user.name, server_name)
|
||||
|
||||
if server_name in user.spawners and user.spawners[server_name].pending:
|
||||
pending = user.spawners[server_name].pending
|
||||
raise RuntimeError("%s pending %s" % (user_server_name, pending))
|
||||
|
||||
# count active servers and pending spawns
|
||||
# we could do careful bookkeeping to avoid
|
||||
@@ -391,32 +399,26 @@ class BaseHandler(RequestHandler):
|
||||
active_server_limit = self.active_server_limit
|
||||
|
||||
if concurrent_spawn_limit and spawn_pending_count >= concurrent_spawn_limit:
|
||||
self.log.info(
|
||||
'%s pending spawns, throttling',
|
||||
spawn_pending_count,
|
||||
)
|
||||
raise web.HTTPError(
|
||||
429,
|
||||
"User startup rate limit exceeded. Try again in a few minutes.")
|
||||
self.log.info(
|
||||
'%s pending spawns, throttling',
|
||||
spawn_pending_count,
|
||||
)
|
||||
raise web.HTTPError(
|
||||
429,
|
||||
"User startup rate limit exceeded. Try again in a few minutes.",
|
||||
)
|
||||
if active_server_limit and active_count >= active_server_limit:
|
||||
self.log.info(
|
||||
'%s servers active, no space available',
|
||||
active_count,
|
||||
)
|
||||
raise web.HTTPError(
|
||||
429,
|
||||
"Active user limit exceeded. Try again in a few minutes.")
|
||||
self.log.info(
|
||||
'%s servers active, no space available',
|
||||
active_count,
|
||||
)
|
||||
raise web.HTTPError(429, "Active user limit exceeded. Try again in a few minutes.")
|
||||
|
||||
tic = IOLoop.current().time()
|
||||
user_server_name = user.name
|
||||
if server_name:
|
||||
user_server_name = '%s:%s' % (user.name, server_name)
|
||||
else:
|
||||
user_server_name = user.name
|
||||
|
||||
self.log.debug("Initiating spawn for %s", user_server_name)
|
||||
|
||||
f = user.spawn(server_name, options)
|
||||
spawn_future = user.spawn(server_name, options)
|
||||
|
||||
self.log.debug("%i%s concurrent spawns",
|
||||
spawn_pending_count,
|
||||
@@ -426,22 +428,28 @@ class BaseHandler(RequestHandler):
|
||||
'/%i' % active_server_limit if active_server_limit else '')
|
||||
|
||||
spawner = user.spawners[server_name]
|
||||
# set spawn_pending now, so there's no gap where _spawn_pending is False
|
||||
# while we are waiting for _proxy_pending to be set
|
||||
spawner._spawn_pending = True
|
||||
|
||||
@gen.coroutine
|
||||
def finish_user_spawn(f=None):
|
||||
def finish_user_spawn():
|
||||
"""Finish the user spawn by registering listeners and notifying the proxy.
|
||||
|
||||
If the spawner is slow to start, this is passed as an async callback,
|
||||
otherwise it is called immediately.
|
||||
"""
|
||||
if f and f.exception() is not None:
|
||||
# failed, don't add to the proxy
|
||||
return
|
||||
# wait for spawn Future
|
||||
try:
|
||||
yield spawn_future
|
||||
except Exception:
|
||||
spawner._spawn_pending = False
|
||||
raise
|
||||
toc = IOLoop.current().time()
|
||||
self.log.info("User %s took %.3f seconds to start", user_server_name, toc-tic)
|
||||
self.statsd.timing('spawner.success', (toc - tic) * 1000)
|
||||
spawner._proxy_pending = True
|
||||
try:
|
||||
spawner._proxy_pending = True
|
||||
yield self.proxy.add_user(user, server_name)
|
||||
except Exception:
|
||||
self.log.exception("Failed to add %s to proxy!", user_server_name)
|
||||
@@ -451,37 +459,41 @@ class BaseHandler(RequestHandler):
|
||||
spawner.add_poll_callback(self.user_stopped, user, server_name)
|
||||
finally:
|
||||
spawner._proxy_pending = False
|
||||
spawner._spawn_pending = False
|
||||
|
||||
try:
|
||||
yield gen.with_timeout(timedelta(seconds=self.slow_spawn_timeout), f)
|
||||
yield gen.with_timeout(timedelta(seconds=self.slow_spawn_timeout), finish_user_spawn())
|
||||
except gen.TimeoutError:
|
||||
# waiting_for_response indicates server process has started,
|
||||
# but is yet to become responsive.
|
||||
if not spawner._waiting_for_response:
|
||||
if spawner._spawn_pending and not spawner._waiting_for_response:
|
||||
# still in Spawner.start, which is taking a long time
|
||||
# we shouldn't poll while spawn is incomplete.
|
||||
self.log.warning("User %s is slow to start (timeout=%s)",
|
||||
user_server_name, self.slow_spawn_timeout)
|
||||
# schedule finish for when the user finishes spawning
|
||||
IOLoop.current().add_future(f, finish_user_spawn)
|
||||
else:
|
||||
# start has finished, but the server hasn't come up
|
||||
# check if the server died while we were waiting
|
||||
status = yield user.spawner.poll()
|
||||
if status is None:
|
||||
# hit timeout, but server's running. Hope that it'll show up soon enough,
|
||||
# though it's possible that it started at the wrong URL
|
||||
self.log.warning("User %s is slow to become responsive (timeout=%s)",
|
||||
user_server_name, self.slow_spawn_timeout)
|
||||
self.log.debug("Expecting server for %s at: %s", user_server_name, spawner.server.url)
|
||||
# schedule finish for when the user finishes spawning
|
||||
IOLoop.current().add_future(f, finish_user_spawn)
|
||||
else:
|
||||
toc = IOLoop.current().time()
|
||||
self.statsd.timing('spawner.failure', (toc - tic) * 1000)
|
||||
raise web.HTTPError(500, "Spawner failed to start [status=%s]" % status)
|
||||
else:
|
||||
yield finish_user_spawn()
|
||||
user_server_name, self.slow_spawn_timeout)
|
||||
return
|
||||
|
||||
# start has finished, but the server hasn't come up
|
||||
# check if the server died while we were waiting
|
||||
status = yield spawner.poll()
|
||||
if status is not None:
|
||||
toc = IOLoop.current().time()
|
||||
self.statsd.timing('spawner.failure', (toc - tic) * 1000)
|
||||
raise web.HTTPError(500, "Spawner failed to start [status=%s]" % status)
|
||||
|
||||
if spawner._waiting_for_response:
|
||||
# hit timeout waiting for response, but server's running.
|
||||
# Hope that it'll show up soon enough,
|
||||
# though it's possible that it started at the wrong URL
|
||||
self.log.warning("User %s is slow to become responsive (timeout=%s)",
|
||||
user_server_name, self.slow_spawn_timeout)
|
||||
self.log.debug("Expecting server for %s at: %s",
|
||||
user_server_name, spawner.server.url)
|
||||
if spawner._proxy_pending:
|
||||
# User.spawn finished, but it hasn't been added to the proxy
|
||||
# Could be due to load or a slow proxy
|
||||
self.log.warning("User %s is slow to be added to the proxy (timeout=%s)",
|
||||
user_server_name, self.slow_spawn_timeout)
|
||||
|
||||
@gen.coroutine
|
||||
def user_stopped(self, user, server_name):
|
||||
@@ -501,36 +513,37 @@ class BaseHandler(RequestHandler):
|
||||
if name not in user.spawners:
|
||||
raise KeyError("User %s has no such spawner %r", user.name, name)
|
||||
spawner = user.spawners[name]
|
||||
if spawner._stop_pending:
|
||||
raise RuntimeError("Stop already pending for: %s:%s" % (user.name, name))
|
||||
tic = IOLoop.current().time()
|
||||
yield self.proxy.delete_user(user, name)
|
||||
f = user.stop()
|
||||
@gen.coroutine
|
||||
def finish_stop(f=None):
|
||||
"""Finish the stop action by noticing that the user is stopped.
|
||||
if spawner.pending:
|
||||
raise RuntimeError("%s pending %s" % (user_server_name, spawner.pending))
|
||||
# set user._stop_pending before doing anything async
|
||||
# to avoid races
|
||||
spawner._stop_pending = True
|
||||
|
||||
If the spawner is slow to stop, this is passed as an async callback,
|
||||
otherwise it is called immediately.
|
||||
@gen.coroutine
|
||||
def stop():
|
||||
"""Stop the server
|
||||
|
||||
1. remove it from the proxy
|
||||
2. stop the server
|
||||
3. notice that it stopped
|
||||
"""
|
||||
if f and f.exception() is not None:
|
||||
# failed, don't do anything
|
||||
return
|
||||
tic = IOLoop.current().time()
|
||||
try:
|
||||
yield self.proxy.delete_user(user, name)
|
||||
yield user.stop(name)
|
||||
finally:
|
||||
spawner._stop_pending = False
|
||||
toc = IOLoop.current().time()
|
||||
self.log.info("User %s server took %.3f seconds to stop", user.name, toc-tic)
|
||||
self.log.info("User %s server took %.3f seconds to stop", user.name, toc - tic)
|
||||
|
||||
try:
|
||||
yield gen.with_timeout(timedelta(seconds=self.slow_stop_timeout), f)
|
||||
yield gen.with_timeout(timedelta(seconds=self.slow_stop_timeout), stop())
|
||||
except gen.TimeoutError:
|
||||
if spawner._stop_pending:
|
||||
# hit timeout, but stop is still pending
|
||||
self.log.warning("User %s:%s server is slow to stop", user.name, name)
|
||||
# schedule finish for when the server finishes stopping
|
||||
IOLoop.current().add_future(f, finish_stop)
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
yield finish_stop()
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# template rendering
|
||||
@@ -653,7 +666,8 @@ class UserSpawnHandler(BaseHandler):
|
||||
|
||||
# logged in as correct user, spawn the server
|
||||
spawner = current_user.spawner
|
||||
if spawner._spawn_pending or spawner._proxy_pending:
|
||||
if spawner.pending:
|
||||
self.log.info("%s is pending %s", spawner._log_name, spawner.pending)
|
||||
# spawn has started, but not finished
|
||||
self.statsd.incr('redirects.user_spawn_pending', 1)
|
||||
html = self.render_template("spawn_pending.html", user=current_user)
|
||||
|
@@ -84,10 +84,11 @@ class LoginHandler(BaseHandler):
|
||||
|
||||
if user:
|
||||
already_running = False
|
||||
if user.spawner:
|
||||
if user.spawner.ready:
|
||||
status = yield user.spawner.poll()
|
||||
already_running = (status is None)
|
||||
if not already_running and not user.spawner.options_form:
|
||||
if not already_running and not user.spawner.options_form \
|
||||
and not user.spawner.pending:
|
||||
# logging in triggers spawn
|
||||
yield self.spawn_single_user(user)
|
||||
self.redirect(self.get_next_url())
|
||||
|
@@ -115,6 +115,10 @@ class SpawnHandler(BaseHandler):
|
||||
self.log.warning("User is already running: %s", url)
|
||||
self.redirect(url)
|
||||
return
|
||||
if user.spawner.pending:
|
||||
raise web.HTTPError(
|
||||
400, "%s is pending %s" % (user.spawner._log_name, user.spawner.pending)
|
||||
)
|
||||
form_options = {}
|
||||
for key, byte_list in self.request.body_arguments.items():
|
||||
form_options[key] = [ bs.decode('utf8') for bs in byte_list ]
|
||||
|
@@ -231,9 +231,10 @@ class Proxy(LoggingConfigurable):
|
||||
user.name, spawner.proxy_spec, spawner.server.host,
|
||||
)
|
||||
|
||||
if spawner._spawn_pending:
|
||||
if spawner.pending and spawner.pending != 'spawn':
|
||||
raise RuntimeError(
|
||||
"User %s's spawn is pending, shouldn't be added to the proxy yet!", user.name)
|
||||
"%s is pending %s, shouldn't be added to the proxy yet!" % (spawner._log_name, spawner.pending)
|
||||
)
|
||||
|
||||
yield self.add_route(
|
||||
spawner.proxy_spec,
|
||||
@@ -326,7 +327,7 @@ class Proxy(LoggingConfigurable):
|
||||
spec, route['target'], spawner.server,
|
||||
)
|
||||
futures.append(self.add_user(user, name))
|
||||
elif spawner._proxy_pending:
|
||||
elif spawner._spawn_pending:
|
||||
good_routes.add(spawner.proxy_spec)
|
||||
|
||||
# check service routes
|
||||
|
@@ -49,17 +49,29 @@ class Spawner(LoggingConfigurable):
|
||||
|
||||
# private attributes for tracking status
|
||||
_spawn_pending = False
|
||||
_start_pending = False
|
||||
_stop_pending = False
|
||||
_proxy_pending = False
|
||||
_waiting_for_response = False
|
||||
|
||||
@property
|
||||
def _log_name(self):
|
||||
"""Return username:servername or username
|
||||
|
||||
Used in logging for consistency with named servers.
|
||||
"""
|
||||
if self.name:
|
||||
return '%s:%s' % (self.user.name, self.name)
|
||||
else:
|
||||
return self.user.name
|
||||
|
||||
@property
|
||||
def pending(self):
|
||||
"""Return the current pending event, if any
|
||||
|
||||
Return False if nothing is pending.
|
||||
"""
|
||||
if self._spawn_pending or self._proxy_pending:
|
||||
if self._spawn_pending:
|
||||
return 'spawn'
|
||||
elif self._stop_pending:
|
||||
return 'stop'
|
||||
|
@@ -38,6 +38,27 @@ def test_create_named_server(app, named_servers):
|
||||
assert prefix == user.spawners[servername].server.base_url
|
||||
assert prefix.endswith('/user/%s/%s/' % (username, servername))
|
||||
|
||||
r = yield api_request(app, 'users', username)
|
||||
r.raise_for_status()
|
||||
|
||||
user_model = r.json()
|
||||
user_model.pop('last_activity')
|
||||
assert user_model == {
|
||||
'name': username,
|
||||
'groups': [],
|
||||
'kind': 'user',
|
||||
'admin': False,
|
||||
'pending': None,
|
||||
'server': None,
|
||||
'servers': {
|
||||
name: {
|
||||
'name': name,
|
||||
'url': url_path_join(user.url, name, '/'),
|
||||
}
|
||||
for name in ['1', servername]
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.gen_test
|
||||
def test_delete_named_server(app, named_servers):
|
||||
@@ -69,9 +90,9 @@ def test_delete_named_server(app, named_servers):
|
||||
'servers': {
|
||||
name: {
|
||||
'name': name,
|
||||
'url': url_path_join(user.url, name),
|
||||
'url': url_path_join(user.url, name, '/'),
|
||||
}
|
||||
for name in ['1', servername]
|
||||
for name in ['1']
|
||||
},
|
||||
}
|
||||
|
||||
|
@@ -317,8 +317,6 @@ class User(HasTraits):
|
||||
url of the server will be /user/:name/:server_name
|
||||
"""
|
||||
db = self.db
|
||||
if self.allow_named_servers and not server_name:
|
||||
server_name = default_server_name(self)
|
||||
|
||||
base_url = url_path_join(self.base_url, server_name) + '/'
|
||||
|
||||
@@ -368,7 +366,7 @@ class User(HasTraits):
|
||||
if (authenticator):
|
||||
yield gen.maybe_future(authenticator.pre_spawn_start(self, spawner))
|
||||
|
||||
spawner._spawn_pending = True
|
||||
spawner._start_pending = True
|
||||
# wait for spawner.start to return
|
||||
try:
|
||||
# run optional preparation work to bootstrap the notebook
|
||||
@@ -434,6 +432,7 @@ class User(HasTraits):
|
||||
user=self.name,
|
||||
), exc_info=True)
|
||||
# raise original exception
|
||||
spawner._start_pending = False
|
||||
raise e
|
||||
spawner.start_polling()
|
||||
|
||||
@@ -475,7 +474,7 @@ class User(HasTraits):
|
||||
_check_version(__version__, server_version, self.log)
|
||||
finally:
|
||||
spawner._waiting_for_response = False
|
||||
spawner._spawn_pending = False
|
||||
spawner._start_pending = False
|
||||
return self
|
||||
|
||||
@gen.coroutine
|
||||
@@ -486,6 +485,7 @@ class User(HasTraits):
|
||||
"""
|
||||
spawner = self.spawners[server_name]
|
||||
spawner._spawn_pending = False
|
||||
spawner._start_pending = False
|
||||
spawner.stop_polling()
|
||||
spawner._stop_pending = True
|
||||
try:
|
||||
|
Reference in New Issue
Block a user