Mirror of https://github.com/jupyterhub/jupyterhub.git (synced 2025-10-18 15:33:02 +00:00)

Merge pull request #2721 from minrk/async-init-spawners

Add JupyterHub.init_spawners_timeout
--- a/jupyterhub/apihandlers/users.py
+++ b/jupyterhub/apihandlers/users.py
@@ -589,11 +589,14 @@ class SpawnProgressAPIHandler(APIHandler):
         async with aclosing(
             iterate_until(spawn_future, spawner._generate_progress())
         ) as events:
-            async for event in events:
-                # don't allow events to sneakily set the 'ready' flag
-                if 'ready' in event:
-                    event.pop('ready', None)
-                await self.send_event(event)
+            try:
+                async for event in events:
+                    # don't allow events to sneakily set the 'ready' flag
+                    if 'ready' in event:
+                        event.pop('ready', None)
+                    await self.send_event(event)
+            except asyncio.CancelledError:
+                pass

         # progress finished, wait for spawn to actually resolve,
         # in case progress finished early
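The try/except added above swallows cancellation: if the streaming coroutine is cancelled while iterating (for example because the EventSource client disconnects mid-spawn), the async-for body is interrupted by asyncio.CancelledError and the handler now exits quietly instead of raising. A standalone sketch of the same pattern, with illustrative names (fake_progress and consume are not JupyterHub APIs):

    import asyncio

    async def fake_progress():
        # stand-in for spawner._generate_progress(): keeps yielding progress events
        n = 0
        while True:
            await asyncio.sleep(0.1)
            n += 10
            yield {"progress": n}

    async def consume(events):
        try:
            async for event in events:
                print("event:", event)
        except asyncio.CancelledError:
            # stream was cancelled mid-iteration; stop quietly
            pass

    async def main():
        task = asyncio.ensure_future(consume(fake_progress()))
        await asyncio.sleep(0.35)  # let a few events through
        task.cancel()              # cancel the consumer, as happens on disconnect
        await asyncio.gather(task, return_exceptions=True)

    asyncio.run(main())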
--- a/jupyterhub/app.py
+++ b/jupyterhub/app.py
@@ -11,8 +11,10 @@ import re
 import signal
 import socket
 import sys
+import time
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
+from datetime import timedelta
 from datetime import timezone
 from functools import partial
 from getpass import getuser
@@ -984,6 +986,28 @@ class JupyterHub(Application):
         """,
     ).tag(config=True)

+    init_spawners_timeout = Integer(
+        10,
+        help="""
+        Timeout (in seconds) to wait for spawners to initialize
+
+        Checking if spawners are healthy can take a long time
+        if many spawners are active at hub start time.
+
+        If it takes longer than this timeout to check,
+        init_spawner will be left to complete in the background
+        and the http server is allowed to start.
+
+        A timeout of -1 means wait forever,
+        which can mean a slow startup of the Hub
+        but ensures that the Hub is fully consistent by the time it starts responding to requests.
+        This matches the behavior of jupyterhub 1.0.
+
+        .. versionadded: 1.1.0
+
+        """,
+    ).tag(config=True)
+
     db_url = Unicode(
         'sqlite:///jupyterhub.sqlite',
         help="url for the database. e.g. `sqlite:///jupyterhub.sqlite`",
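With this trait in place, a deployment can bound how long Hub startup blocks on checking pre-existing spawners. A plausible jupyterhub_config.py snippet (the value 30 is only an example):

    # jupyterhub_config.py
    c = get_config()  # noqa -- provided by JupyterHub when it loads the config file

    # wait at most 30 seconds for spawner checks at startup;
    # any remaining checks finish in the background once the Hub is serving
    c.JupyterHub.init_spawners_timeout = 30

    # or keep the pre-1.1 behavior of waiting indefinitely:
    # c.JupyterHub.init_spawners_timeout = -1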
@@ -1835,6 +1859,7 @@ class JupyterHub(Application):
         )

     async def init_spawners(self):
+        self.log.debug("Initializing spawners")
         db = self.db

         def _user_summary(user):
@@ -1925,6 +1950,8 @@ class JupyterHub(Application):
             else:
                 self.log.debug("%s not running", spawner._log_name)

+            spawner._check_pending = False
+
         # parallelize checks for running Spawners
         check_futures = []
         for orm_user in db.query(orm.User):
@@ -1935,11 +1962,22 @@ class JupyterHub(Application):
                 # spawner should be running
                 # instantiate Spawner wrapper and check if it's still alive
                 spawner = user.spawners[name]
+                # signal that check is pending to avoid race conditions
+                spawner._check_pending = True
                 f = asyncio.ensure_future(check_spawner(user, name, spawner))
                 check_futures.append(f)

+        TOTAL_USERS.set(len(self.users))
+
+        # it's important that we get here before the first await
+        # so that we know all spawners are instantiated and in the check-pending state
+
         # await checks after submitting them all
-        await gen.multi(check_futures)
+        if check_futures:
+            self.log.debug(
+                "Awaiting checks for %i possibly-running spawners", len(check_futures)
+            )
+            await gen.multi(check_futures)
         db.commit()

         # only perform this query if we are going to log it
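The ordering here is deliberate: every spawner is instantiated and flagged _check_pending before the first await, so nothing can observe a half-initialized state, and only then are all the checks awaited together. A minimal asyncio sketch of that submit-everything-then-await pattern (check_one and the user names are illustrative; the Hub uses tornado's gen.multi, which plays the same role as gather here):

    import asyncio

    async def check_one(name):
        # stand-in for check_spawner(): poll one server and report its status
        await asyncio.sleep(0.1)
        return name, True

    async def check_all(names):
        # submit every check first, before any await, so all of them
        # are already "pending" from the caller's point of view ...
        futures = [asyncio.ensure_future(check_one(name)) for name in names]
        # ... then await them all in parallel rather than one at a time
        results = await asyncio.gather(*futures)
        return dict(results)

    print(asyncio.run(check_all(["alice", "bob", "carol"])))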
@@ -1949,7 +1987,7 @@ class JupyterHub(Application):

         active_counts = self.users.count_active_users()
         RUNNING_SERVERS.set(active_counts['active'])
-        TOTAL_USERS.set(len(self.users))
+        return len(check_futures)

     def init_oauth(self):
         base_url = self.hub.base_url
@@ -2112,6 +2150,7 @@ class JupyterHub(Application):
         super().initialize(*args, **kwargs)
         if self.generate_config or self.generate_certs or self.subapp:
             return
+        self._start_future = asyncio.Future()
         self.load_config_file(self.config_file)
         self.init_logging()
         if 'JupyterHubApp' in self.config:
@@ -2162,11 +2201,61 @@ class JupyterHub(Application):
         self.init_services()
         await self.init_api_tokens()
         self.init_tornado_settings()
-        await self.init_spawners()
-        self.cleanup_oauth_clients()
         self.init_handlers()
         self.init_tornado_application()
+
+        # init_spawners can take a while
+        init_spawners_timeout = self.init_spawners_timeout
+        if init_spawners_timeout < 0:
+            # negative timeout means forever (previous, most stable behavior)
+            init_spawners_timeout = 86400
+            print(init_spawners_timeout)
+
+        init_start_time = time.perf_counter()
+        init_spawners_future = asyncio.ensure_future(self.init_spawners())
+
+        def log_init_time(f):
+            n_spawners = f.result()
+            self.log.info(
+                "Initialized %i spawners in %.3f seconds",
+                n_spawners,
+                time.perf_counter() - init_start_time,
+            )
+
+        init_spawners_future.add_done_callback(log_init_time)
+
+        try:
+
+            # don't allow a zero timeout because we still need to be sure
+            # that the Spawner objects are defined and pending
+            await gen.with_timeout(
+                timedelta(seconds=max(init_spawners_timeout, 1)), init_spawners_future
+            )
+        except gen.TimeoutError:
+            self.log.warning(
+                "init_spawners did not complete within %i seconds. "
+                "Allowing to complete in the background.",
+                self.init_spawners_timeout,
+            )
+
+        if init_spawners_future.done():
+            self.cleanup_oauth_clients()
+        else:
+            # schedule async operations after init_spawners finishes
+            async def finish_init_spawners():
+                await init_spawners_future
+                # schedule cleanup after spawners are all set up
+                # because it relies on the state resolved by init_spawners
+                self.cleanup_oauth_clients()
+                # trigger a proxy check as soon as all spawners are ready
+                # because this may be *after* the check made as part of normal startup.
+                # To avoid races with partially-complete start,
+                # ensure that start is complete before running this check.
+                await self._start_future
+                await self.proxy.check_routes(self.users, self._service_map)
+
+            asyncio.ensure_future(finish_init_spawners())

     async def cleanup(self):
         """Shutdown managed services and various subprocesses. Cleanup runtime files."""

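The core of the change is this hunk: init_spawners() is scheduled as a future, given at most init_spawners_timeout seconds to finish, and otherwise left running while the Hub continues starting up; tornado's gen.with_timeout raises TimeoutError without cancelling the underlying future. A simplified asyncio-only sketch of the same "bounded wait, then finish in the background" pattern (slow_init and the 0.5 s timeout are illustrative):

    import asyncio
    import time

    async def slow_init():
        # stand-in for init_spawners(): may take longer than we want to block startup
        await asyncio.sleep(2)
        return 42

    async def startup(timeout=0.5):
        t0 = time.perf_counter()
        init_future = asyncio.ensure_future(slow_init())

        # asyncio.wait with a timeout does NOT cancel the future,
        # so initialization keeps running if it is not done in time
        await asyncio.wait({init_future}, timeout=timeout)

        if init_future.done():
            print("init finished before startup:", init_future.result())
        else:
            print("init still running after %.2fs; continuing startup"
                  % (time.perf_counter() - t0))

            async def finish_later():
                # complete the deferred work once init resolves
                print("background init finished:", await init_future)

            asyncio.ensure_future(finish_later())

        await asyncio.sleep(2)  # simulate serving requests while init completes

    asyncio.run(startup())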
@@ -2452,6 +2541,7 @@ class JupyterHub(Application):
         atexit.register(self.atexit)
         # register cleanup on both TERM and INT
         self.init_signal()
+        self._start_future.set_result(None)

     def init_signal(self):
         loop = asyncio.get_event_loop()
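Together with the _start_future created in initialize(), this line makes startup completion a one-shot latch: the background finish_init_spawners() task awaits _start_future and therefore cannot touch the proxy until start() has fully finished. A small sketch of that latch pattern, with generic names (main and after_start are illustrative):

    import asyncio

    async def main():
        loop = asyncio.get_running_loop()
        start_future = loop.create_future()  # one-shot "startup complete" latch

        async def after_start():
            # background work that must not run until startup has finished
            await start_future
            print("startup finished, safe to proceed")

        asyncio.ensure_future(after_start())

        await asyncio.sleep(0.2)       # ... the rest of startup ...
        start_future.set_result(None)  # release everything awaiting the latch
        await asyncio.sleep(0.1)       # give the background task a chance to run

    asyncio.run(main())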
--- a/jupyterhub/handlers/base.py
+++ b/jupyterhub/handlers/base.py
@@ -879,7 +879,7 @@ class BaseHandler(RequestHandler):
             # clear spawner._spawn_future when it's done
             # keep an exception around, though, to prevent repeated implicit spawns
             # if spawn is failing
-            if f.exception() is None:
+            if f.cancelled() or f.exception() is None:
                 spawner._spawn_future = None
             # Now we're all done. clear _spawn_pending flag
             spawner._spawn_pending = False
@@ -890,7 +890,7 @@ class BaseHandler(RequestHandler):
         # update failure count and abort if consecutive failure limit
         # is reached
         def _track_failure_count(f):
-            if f.exception() is None:
+            if f.cancelled() or f.exception() is None:
                 # spawn succeeded, reset failure count
                 self.settings['failure_count'] = 0
                 return
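The two handler changes above matter because Future.exception() raises CancelledError when called on a cancelled future; checking cancelled() first keeps these done-callbacks from raising if a pending spawn was cancelled, treating it like a cleared spawn instead. A short demonstration of that asyncio behavior:

    import asyncio

    async def main():
        fut = asyncio.get_running_loop().create_future()
        fut.cancel()

        # naive check: exception() on a cancelled future raises
        # CancelledError rather than returning it
        try:
            fut.exception()
        except asyncio.CancelledError:
            print("fut.exception() raised CancelledError")

        # safe check, as in the handlers above: short-circuit on cancelled()
        if fut.cancelled() or fut.exception() is None:
            print("cancelled future handled like a cleared/successful one")

    asyncio.run(main())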
--- a/jupyterhub/spawner.py
+++ b/jupyterhub/spawner.py
@@ -86,6 +86,7 @@ class Spawner(LoggingConfigurable):
     _start_pending = False
     _stop_pending = False
     _proxy_pending = False
+    _check_pending = False
     _waiting_for_response = False
     _jupyterhub_version = None
     _spawn_future = None
@@ -121,6 +122,8 @@ class Spawner(LoggingConfigurable):
             return 'spawn'
         elif self._stop_pending:
             return 'stop'
+        elif self._check_pending:
+            return 'check'
         return None

     @property
--- a/jupyterhub/user.py
+++ b/jupyterhub/user.py
@@ -727,6 +727,7 @@ class User:
         spawner = self.spawners[server_name]
         spawner._spawn_pending = False
         spawner._start_pending = False
+        spawner._check_pending = False
         spawner.stop_polling()
         spawner._stop_pending = True
