Merge pull request #1301 from minrk/total-running-limit

add active_server_limit
This commit is contained in:
Min RK
2017-08-03 11:08:23 +02:00
committed by GitHub
8 changed files with 234 additions and 56 deletions

View File

@@ -553,10 +553,39 @@ class JupyterHub(Application):
help="""
Maximum number of concurrent users that can be spawning at a time.
If more than this many users attempt to spawn at a time, their
request is rejected with a 429 error asking them to try again.
Spawning lots of servers at the same time can cause performance
problems for the Hub or the underlying spawning system.
Set this limit to prevent bursts of logins from attempting
to spawn too many servers at the same time.
If set to 0, no concurrent_spawn_limit is enforced.
This does not limit the number of total running servers.
See active_server_limit for that.
If more than this many users attempt to spawn at a time, their
requests will be rejected with a 429 error asking them to try again.
Users will have to wait for some of the spawning services
to finish starting before they can start their own.
If set to 0, no limit is enforced.
"""
).tag(config=True)
# Configurable cap on the number of active single-user servers.
# The default of 0 disables the limit. The value is forwarded to the tornado
# settings under the `active_server_limit` key.
active_server_limit = Integer(
0,
help="""
Maximum number of concurrent servers that can be active at a time.
Setting this can limit the total resources your users can consume.
An active server is any server that's not fully stopped.
It is considered active from the time it has been requested
until the time that it has completely stopped.
If this many user servers are active, users will not be able to
launch new servers until a server is shutdown.
Spawn requests will be rejected with a 429 error asking them to try again.
If set to 0, no limit is enforced.
"""
).tag(config=True)
@@ -1273,6 +1302,7 @@ class JupyterHub(Application):
allow_named_servers=self.allow_named_servers,
oauth_provider=self.oauth_provider,
concurrent_spawn_limit=self.concurrent_spawn_limit,
active_server_limit=self.active_server_limit,
)
# allow configured settings to have priority
settings.update(self.tornado_settings)

View File

@@ -371,27 +371,41 @@ class BaseHandler(RequestHandler):
return self.settings.get('concurrent_spawn_limit', 0)
@property
def spawn_pending_count(self):
return self.settings.setdefault('_spawn_pending_count', 0)
@spawn_pending_count.setter
def spawn_pending_count(self, value):
self.settings['_spawn_pending_count'] = value
def active_server_limit(self):
    """Return the ``active_server_limit`` entry of the handler settings.

    Falls back to 0 when the key is absent; callers treat a falsy value
    as "no limit".
    """
    limit = self.settings.get('active_server_limit', 0)
    return limit
@gen.coroutine
def spawn_single_user(self, user, server_name='', options=None):
if server_name in user.spawners and user.spawners[server_name]._spawn_pending:
if server_name in user.spawners and user.spawners[server_name].pending == 'spawn':
raise RuntimeError("Spawn already pending for: %s" % user.name)
# count active servers and pending spawns
# we could do careful bookkeeping to avoid recounting on every spawn,
# but for 10k users this takes ~5ms
# and saves us from bookkeeping errors
active_counts = self.users.count_active_users()
spawn_pending_count = active_counts['spawn_pending'] + active_counts['proxy_pending']
active_count = active_counts['active']
concurrent_spawn_limit = self.concurrent_spawn_limit
if concurrent_spawn_limit and self.spawn_pending_count >= concurrent_spawn_limit:
active_server_limit = self.active_server_limit
if concurrent_spawn_limit and spawn_pending_count >= concurrent_spawn_limit:
self.log.info(
'%s pending spawns, throttling',
concurrent_spawn_limit,
spawn_pending_count,
)
raise web.HTTPError(
429,
"User startup rate limit exceeded. Try to start again in a few minutes.")
"User startup rate limit exceeded. Try again in a few minutes.")
if active_server_limit and active_count >= active_server_limit:
self.log.info(
'%s servers active, no space available',
active_count,
)
raise web.HTTPError(
429,
"Active user limit exceeded. Try again in a few minutes.")
tic = IOLoop.current().time()
user_server_name = user.name
@@ -404,15 +418,14 @@ class BaseHandler(RequestHandler):
f = user.spawn(server_name, options)
# increment spawn_pending only after spawn starts
self.log.debug("%i%s concurrent spawns",
self.spawn_pending_count,
spawn_pending_count,
'/%i' % concurrent_spawn_limit if concurrent_spawn_limit else '')
# FIXME: Move this out of settings, since this isn't really a setting
self.spawn_pending_count += 1
self.log.debug("%i%s active servers",
active_count,
'/%i' % active_server_limit if active_server_limit else '')
spawner = user.spawners[server_name]
spawner._proxy_pending = True
@gen.coroutine
def finish_user_spawn(f=None):
@@ -423,12 +436,12 @@ class BaseHandler(RequestHandler):
"""
if f and f.exception() is not None:
# failed, don't add to the proxy
self.spawn_pending_count -= 1
return
toc = IOLoop.current().time()
self.log.info("User %s took %.3f seconds to start", user_server_name, toc-tic)
self.statsd.timing('spawner.success', (toc - tic) * 1000)
try:
spawner._proxy_pending = True
yield self.proxy.add_user(user, server_name)
except Exception:
self.log.exception("Failed to add %s to proxy!", user_server_name)
@@ -438,7 +451,6 @@ class BaseHandler(RequestHandler):
spawner.add_poll_callback(self.user_stopped, user)
finally:
spawner._proxy_pending = False
self.spawn_pending_count -= 1
try:
yield gen.with_timeout(timedelta(seconds=self.slow_spawn_timeout), f)
@@ -465,14 +477,9 @@ class BaseHandler(RequestHandler):
# schedule finish for when the user finishes spawning
IOLoop.current().add_future(f, finish_user_spawn)
else:
self.spawn_pending_count -= 1
toc = IOLoop.current().time()
self.statsd.timing('spawner.failure', (toc - tic) * 1000)
raise web.HTTPError(500, "Spawner failed to start [status=%s]" % status)
except Exception:
# error in start
self.spawn_pending_count -= 1
raise
else:
yield finish_user_spawn()

View File

@@ -62,6 +62,11 @@ class Server(HasTraits):
return self.connect_port
return self.port
@classmethod
def from_orm(cls, orm_server):
    """Alternate constructor: wrap an ``orm.Server`` row.

    The row is passed through unchanged as the ``orm_server`` keyword
    argument of the class constructor.
    """
    wrapped = cls(orm_server=orm_server)
    return wrapped
@classmethod
def from_url(cls, url):
"""Create a Server from a given URL"""

View File

@@ -235,7 +235,7 @@ class Service(LoggingConfigurable):
@property
def server(self):
if self.orm.server:
return Server(orm_server=self.orm.server)
return Server.from_orm(self.orm.server)
else:
return None

View File

@@ -15,13 +15,15 @@ import warnings
from subprocess import Popen
from tempfile import mkdtemp
from sqlalchemy import inspect
from tornado import gen
from tornado.ioloop import PeriodicCallback, IOLoop
from traitlets.config import LoggingConfigurable
from traitlets import (
Any, Bool, Dict, Instance, Integer, Float, List, Unicode,
validate,
observe, validate,
)
from .objects import Server
@@ -51,9 +53,50 @@ class Spawner(LoggingConfigurable):
_proxy_pending = False
_waiting_for_response = False
@property
def pending(self):
    """Name the event this spawner is currently waiting on.

    Returns ``'spawn'`` while a spawn or the follow-up proxy registration
    is pending, ``'stop'`` while a stop is pending, and ``False`` when
    nothing is pending. A pending spawn takes precedence over a pending stop.
    """
    starting = self._spawn_pending or self._proxy_pending
    if starting:
        return 'spawn'
    return 'stop' if self._stop_pending else False
@property
def ready(self):
    """Whether the server can be used right now.

    True only when no event is pending and a server object is attached;
    a pending spawn or stop always makes the server not ready.
    """
    nothing_pending = not self.pending
    return nothing_pending and self.server is not None
@property
def active(self):
    """Whether the server counts as active.

    A server is active when any start/stop event is pending or when it
    is ready to use.
    """
    if self.pending:
        return True
    return bool(self.ready)
authenticator = Any()
hub = Any()
orm_spawner = Any()
@observe('orm_spawner')
def _orm_spawner_changed(self, change):
    """Refresh the cached ``_server`` whenever ``orm_spawner`` is assigned.

    When the new ORM spawner carries a server row, wrap it in a ``Server``;
    otherwise clear the cache.
    """
    orm_spawner = change.new
    orm_server = orm_spawner.server if orm_spawner else None
    self._server = Server(orm_server=orm_server) if orm_server else None
user = Any()
def __init_subclass__(cls, **kwargs):
@@ -70,8 +113,24 @@ class Spawner(LoggingConfigurable):
@property
def server(self):
    """The ``Server`` attached to this spawner, or None.

    Returns the cached ``_server`` when one has been set (even if it is
    None); otherwise wraps the ORM spawner's server row, if any.
    """
    if hasattr(self, '_server'):
        return self._server
    if self.orm_spawner and self.orm_spawner.server:
        return Server(orm_server=self.orm_spawner.server)
    return None

@server.setter
def server(self, server):
    """Attach ``server`` (or None) and mirror the change onto the ORM spawner.

    The previously attached ORM server row is deleted from its database
    session when it is replaced or cleared.
    """
    self._server = server
    if not self.orm_spawner:
        return
    new_orm_server = None if server is None else server.orm_server
    old_orm_server = self.orm_spawner.server
    if old_orm_server is new_orm_server:
        # Re-assigning the row that is already attached must not delete it.
        return
    if old_orm_server is not None:
        # delete the old value
        db = inspect(old_orm_server).session
        # session is None for a row that is not attached to any session;
        # there is nothing to delete in that case.
        if db is not None:
            db.delete(old_orm_server)
    self.orm_spawner.server = new_orm_server
@property
def name(self):
if self.orm_spawner:

View File

@@ -392,7 +392,6 @@ def test_make_admin(app):
@mark.gen_test
def test_spawn(app):
settings = app.tornado_application.settings
db = app.db
name = 'wash'
user = add_user(db, app=app, name=name)
@@ -444,12 +443,11 @@ def test_spawn(app):
assert before_servers == after_servers
tokens = list(db.query(orm.APIToken).filter(orm.APIToken.user_id == user.id))
assert tokens == []
assert settings['_spawn_pending_count'] == 0
assert app.users.count_active_users()['pending'] == 0
@mark.gen_test
def test_slow_spawn(app, no_patience, slow_spawn):
settings = app.tornado_application.settings
db = app.db
name = 'zoe'
app_user = add_user(db, app=app, name=name)
@@ -459,7 +457,7 @@ def test_slow_spawn(app, no_patience, slow_spawn):
assert app_user.spawner is not None
assert app_user.spawner._spawn_pending
assert not app_user.spawner._stop_pending
assert settings['_spawn_pending_count'] == 1
assert app.users.count_active_users()['pending'] == 1
@gen.coroutine
def wait_spawn():
@@ -493,31 +491,29 @@ def test_slow_spawn(app, no_patience, slow_spawn):
assert app_user.spawner is not None
r = yield api_request(app, 'users', name, 'server', method='delete')
assert r.status_code == 400
assert settings['_spawn_pending_count'] == 0
assert app.users.count_active_users()['pending'] == 0
assert app.users.count_active_users()['active'] == 0
@mark.gen_test
def test_never_spawn(app, no_patience, never_spawn):
settings = app.tornado_application.settings
db = app.db
name = 'badger'
app_user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post')
assert app_user.spawner is not None
assert app_user.spawner._spawn_pending
assert settings['_spawn_pending_count'] == 1
assert app.users.count_active_users()['pending'] == 1
@gen.coroutine
def wait_pending():
while app_user.spawner._spawn_pending:
yield gen.sleep(0.1)
while app_user.spawner.pending:
yield gen.sleep(0.1)
print(app_user.spawner.pending)
yield wait_pending()
assert not app_user.spawner._spawn_pending
status = yield app_user.spawner.poll()
assert status is not None
# failed spawn should decrement pending count
assert settings['_spawn_pending_count'] == 0
assert app.users.count_active_users()['pending'] == 0
@mark.gen_test
@@ -528,7 +524,7 @@ def test_bad_spawn(app, no_patience, bad_spawn):
user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post')
assert r.status_code == 500
assert settings['_spawn_pending_count'] == 0
assert app.users.count_active_users()['pending'] == 0
@mark.gen_test
@@ -539,11 +535,11 @@ def test_slow_bad_spawn(app, no_patience, slow_bad_spawn):
user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post')
r.raise_for_status()
while user.spawner._spawn_pending:
while user.spawner.pending:
yield gen.sleep(0.1)
# spawn failed
assert not user.running('')
assert settings['_spawn_pending_count'] == 0
assert app.users.count_active_users()['pending'] == 0
@mark.gen_test
@@ -561,7 +557,7 @@ def test_spawn_limit(app, no_patience, slow_spawn, request):
for name in names:
yield api_request(app, 'users', name, 'server', method='post')
yield gen.sleep(0.5)
assert settings['_spawn_pending_count'] == 2
assert app.users.count_active_users()['pending'] == 2
# ykka and hjarka's spawns are pending. Essun should fail with 429
name = 'essun'
@@ -575,23 +571,82 @@ def test_spawn_limit(app, no_patience, slow_spawn, request):
# race? hjarka could finish in this time
# come back to this if we see intermittent failures here
assert settings['_spawn_pending_count'] == 1
assert app.users.count_active_users()['pending'] == 1
r = yield api_request(app, 'users', name, 'server', method='post')
r.raise_for_status()
assert settings['_spawn_pending_count'] == 2
assert app.users.count_active_users()['pending'] == 2
users.append(user)
while not all(u.running('') for u in users):
yield gen.sleep(0.1)
# everybody's running, pending count should be back to 0
assert settings['_spawn_pending_count'] == 0
assert app.users.count_active_users()['pending'] == 0
for u in users:
r = yield api_request(app, 'users', u.name, 'server', method='delete')
yield r.raise_for_status()
while any(u.running('') for u in users):
while any(u.spawner.active for u in users):
yield gen.sleep(0.1)
@mark.gen_test
def test_active_server_limit(app, request):
    """Spawns beyond ``active_server_limit`` get a 429 until a server stops."""
    db = app.db
    settings = app.tornado_application.settings
    settings['active_server_limit'] = 2

    def _restore_limit():
        settings['active_server_limit'] = 0
    request.addfinalizer(_restore_limit)

    def assert_counts(active, ready, pending):
        # Compare the hub-wide server counts with the expected values.
        counts = app.users.count_active_users()
        assert counts['active'] == active
        assert counts['ready'] == ready
        assert counts['pending'] == pending

    # start two servers, which fills the limit
    names = ['ykka', 'hjarka']
    users = [add_user(db, app=app, name=name) for name in names]
    for name in names:
        r = yield api_request(app, 'users', name, 'server', method='post')
        r.raise_for_status()
    assert_counts(active=2, ready=2, pending=0)

    # ykka and hjarka's servers are running. Essun should fail with 429
    name = 'essun'
    user = add_user(db, app=app, name=name)
    r = yield api_request(app, 'users', name, 'server', method='post')
    assert r.status_code == 429
    assert_counts(active=2, ready=2, pending=0)

    # stop one server to free a slot
    r = yield api_request(app, 'users', names[0], 'server', method='delete')
    # a failed stop would otherwise only show up as a confusing count mismatch
    r.raise_for_status()
    assert_counts(active=1, ready=1, pending=0)

    # there is room for essun now
    r = yield api_request(app, 'users', name, 'server', method='post')
    r.raise_for_status()
    assert_counts(active=2, ready=2, pending=0)
    users.append(user)

    # clean up: stop every server that is still active
    for u in users:
        if not u.spawner.active:
            continue
        r = yield api_request(app, 'users', u.name, 'server', method='delete')
        r.raise_for_status()
    assert_counts(active=0, ready=0, pending=0)
@mark.gen_test
def test_get_proxy(app):
r = yield api_request(app, 'proxy')

View File

@@ -16,7 +16,7 @@ import pytest
from tornado import gen
from ..user import User
from ..objects import Hub
from ..objects import Hub, Server
from .. import spawner as spawnermod
from ..spawner import LocalProcessSpawner, Spawner
from .. import orm
@@ -234,7 +234,10 @@ def test_shell_cmd(db, tmpdir, request):
cmd=[sys.executable, '-m', 'jupyterhub.tests.mocksu'],
shell_cmd=['bash', '--rcfile', str(f), '-i', '-c'],
)
s.orm_spawner.server = orm.Server()
server = orm.Server()
db.add(server)
db.commit()
s.server = Server.from_orm(server)
db.commit()
(ip, port) = yield s.start()
request.addfinalizer(s.stop)

View File

@@ -1,6 +1,7 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from collections import defaultdict
from datetime import datetime, timedelta
from urllib.parse import quote, urlparse
@@ -8,12 +9,13 @@ from oauth2.error import ClientNotFoundError
from sqlalchemy import inspect
from tornado import gen
from tornado.log import app_log
from traitlets import HasTraits, Any, Dict, default
from .utils import url_path_join, default_server_name
from . import orm
from ._version import _check_version, __version__
from traitlets import HasTraits, Any, Dict, observe, default
from .objects import Server
from .spawner import LocalProcessSpawner
from .crypto import encrypt, decrypt, CryptKeeper, EncryptionUnavailable, InvalidToken
@@ -73,6 +75,25 @@ class UserDict(dict):
db.commit()
dict.__delitem__(self, user_id)
def count_active_users(self):
    """Tally the state of every spawner of every user in this dict.

    Returns a dict-like mapping with these keys:

    - ``'pending'``: spawners with any pending event
    - ``'<event>_pending'``: the same, per pending event name
    - ``'active'``: spawners that are active
    - ``'ready'``: spawners that are ready

    Keys that were never incremented read as 0.
    """
    tally = defaultdict(int)
    every_spawner = (
        spawner
        for user in self.values()
        for spawner in user.spawners.values()
    )
    for spawner in every_spawner:
        event = spawner.pending
        if event:
            tally['pending'] += 1
            tally['%s_pending' % event] += 1
        if spawner.active:
            tally['active'] += 1
        if spawner.ready:
            tally['ready'] += 1
    return tally
class _SpawnerDict(dict):
def __init__(self, spawner_factory):
@@ -294,8 +315,8 @@ class User(HasTraits):
spawner = self.spawners[server_name]
spawner.orm_spawner.server = orm_server
server = spawner.server
spawner.server = server = Server(orm_server=orm_server)
assert spawner.orm_spawner.server is orm_server
# Passing user_options to the spawner
spawner.user_options = options or {}
@@ -432,9 +453,7 @@ class User(HasTraits):
spawner.orm_spawner.state = spawner.get_state()
self.last_activity = datetime.utcnow()
# remove server entry from db
if spawner.server is not None:
self.db.delete(spawner.orm_spawner.server)
spawner.orm_spawner.server = None
spawner.server = None
if not spawner.will_resume:
# find and remove the API token if the spawner isn't
# going to re-use it next time