periodically poll single-user servers

If they aren't running, unregister them
and remove them from the proxy so that future
logins are handled by the hub, and spawn new servers.
This commit is contained in:
MinRK
2014-09-22 16:04:17 -07:00
parent 1f3fe9c84d
commit 7af715864c
3 changed files with 88 additions and 7 deletions

View File

@@ -111,8 +111,8 @@ class JupyterHubApp(Application):
Useful for daemonizing jupyterhub. Useful for daemonizing jupyterhub.
""" """
) )
proxy_check_interval = Integer(int(1e4), config=True, proxy_check_interval = Integer(10, config=True,
help="Interval (in ms) at which to check if the proxy is running." help="Interval (in seconds) at which to check if the proxy is running."
) )
data_files_path = Unicode(DATA_FILES_PATH, config=True, data_files_path = Unicode(DATA_FILES_PATH, config=True,
@@ -414,7 +414,16 @@ class JupyterHubApp(Application):
if user.server: if user.server:
parts.append('running at %s' % user.server) parts.append('running at %s' % user.server)
return ' '.join(parts) return ' '.join(parts)
@gen.coroutine
def user_stopped(user):
status = yield user.spawner.poll()
self.log.warn("User %s server stopped with exit code: %s",
user.name, status,
)
yield self.proxy.delete_user(user)
yield user.stop()
for user in db.query(orm.User): for user in db.query(orm.User):
if not user.state: if not user.state:
user_summaries.append(_user_summary(user)) user_summaries.append(_user_summary(user))
@@ -425,6 +434,8 @@ class JupyterHubApp(Application):
if status is None: if status is None:
self.log.info("User %s still running", user.name) self.log.info("User %s still running", user.name)
user.spawner = spawner user.spawner = spawner
spawner.add_poll_callback(user_stopped, user)
spawner.start_polling()
else: else:
self.log.warn("Failed to load state for %s, assuming server is not running.", user.name) self.log.warn("Failed to load state for %s, assuming server is not running.", user.name)
# not running, state is invalid # not running, state is invalid
@@ -650,7 +661,7 @@ class JupyterHubApp(Application):
# only check / restart the proxy if we started it in the first place. # only check / restart the proxy if we started it in the first place.
# this means a restarted Hub cannot restart a Proxy that its # this means a restarted Hub cannot restart a Proxy that its
# predecessor started. # predecessor started.
pc = PeriodicCallback(self.check_proxy, self.proxy_check_interval) pc = PeriodicCallback(self.check_proxy, 1e3 * self.proxy_check_interval)
pc.start() pc.start()
# start the webserver # start the webserver

View File

@@ -169,8 +169,18 @@ class BaseHandler(RequestHandler):
config=self.config, config=self.config,
) )
yield self.proxy.add_user(user) yield self.proxy.add_user(user)
user.spawner.add_poll_callback(self.user_stopped, user)
raise gen.Return(user) raise gen.Return(user)
@gen.coroutine
def user_stopped(self, user):
status = yield user.spawner.poll()
self.log.warn("User %s server stopped, with exit code: %s",
user.name, status,
)
yield self.proxy.delete_user(user)
yield user.stop()
@gen.coroutine @gen.coroutine
def stop_single_user(self, user): def stop_single_user(self, user):
yield self.proxy.delete_user(user) yield self.proxy.delete_user(user)

View File

@@ -7,18 +7,16 @@ import errno
import os import os
import pwd import pwd
import signal import signal
import time
from subprocess import Popen from subprocess import Popen
from tornado import gen from tornado import gen
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop, PeriodicCallback
from IPython.config import LoggingConfigurable from IPython.config import LoggingConfigurable
from IPython.utils.traitlets import ( from IPython.utils.traitlets import (
Any, Bool, Dict, Enum, Instance, Integer, List, Unicode, Any, Bool, Dict, Enum, Instance, Integer, List, Unicode,
) )
from .utils import random_port from .utils import random_port
@@ -38,6 +36,12 @@ class Spawner(LoggingConfigurable):
hub = Any() hub = Any()
api_token = Unicode() api_token = Unicode()
poll_interval = Integer(30, config=True,
help="""Interval (in seconds) on which to poll the spawner."""
)
_callbacks = List()
_poll_callback = Any()
debug = Bool(False, config=True, debug = Bool(False, config=True,
help="Enable debug-logging of the single-user server" help="Enable debug-logging of the single-user server"
) )
@@ -134,6 +138,60 @@ class Spawner(LoggingConfigurable):
return None if it is, an exit status (0 if unknown) if it is not. return None if it is, an exit status (0 if unknown) if it is not.
""" """
raise NotImplementedError("Override in subclass. Must be a Tornado gen.coroutine.") raise NotImplementedError("Override in subclass. Must be a Tornado gen.coroutine.")
def add_poll_callback(self, callback, *args, **kwargs):
"""add a callback to fire when the subprocess stops
as noticed by periodic poll_and_notify()
"""
if args or kwargs:
cb = callback
callback = lambda : cb(*args, **kwargs)
self._callbacks.append(callback)
def stop_polling(self):
"""stop the periodic poll"""
if self._poll_callback:
self._poll_callback.stop()
self._poll_callback = None
def start_polling(self):
"""Start polling periodically
callbacks registered via `add_poll_callback` will fire
if/when the process stops.
Explicit termination via the stop method will not trigger the callbacks.
"""
if self.poll_interval <= 0:
self.log.debug("Not polling subprocess")
return
else:
self.log.debug("Polling subprocess every %is", self.poll_interval)
self.stop_polling()
self._poll_callback = PeriodicCallback(
self.poll_and_notify,
1e3 * self.poll_interval
)
self._poll_callback.start()
@gen.coroutine
def poll_and_notify(self):
"""Used as a callback to periodically poll the process,
and notify any watchers
"""
status = yield self.poll()
if status is None:
# still running, nothing to do here
return
self.stop_polling()
add_callback = IOLoop.current().add_callback
for callback in self._callbacks:
add_callback(callback)
def set_user_setuid(username): def set_user_setuid(username):
@@ -245,6 +303,7 @@ class LocalProcessSpawner(Spawner):
preexec_fn=self.make_preexec_fn(self.user.name), preexec_fn=self.make_preexec_fn(self.user.name),
) )
self.pid = self.proc.pid self.pid = self.proc.pid
self.start_polling()
@gen.coroutine @gen.coroutine
def poll(self): def poll(self):
@@ -284,6 +343,7 @@ class LocalProcessSpawner(Spawner):
if `now`, skip waiting for clean shutdown if `now`, skip waiting for clean shutdown
""" """
self.stop_polling()
if not now: if not now:
# SIGINT to request clean shutdown # SIGINT to request clean shutdown
self.log.debug("Interrupting %i", self.pid) self.log.debug("Interrupting %i", self.pid)