add user.count_active_users

produces summary of active/pending/ready spawner counts

Avoids brittle bookkeeping of running counts,
computing the value upon request.

For 10k users this is still only a few milliseconds, which seems worth it
This commit is contained in:
Min RK
2017-08-01 17:00:45 +02:00
parent 7520d4b81e
commit f49606dff6
3 changed files with 68 additions and 45 deletions

View File

@@ -368,27 +368,41 @@ class BaseHandler(RequestHandler):
return self.settings.get('concurrent_spawn_limit', 0) return self.settings.get('concurrent_spawn_limit', 0)
@property @property
def spawn_pending_count(self): def concurrent_user_limit(self):
return self.settings.setdefault('_spawn_pending_count', 0) return self.settings.get('concurrent_user_limit', 0)
@spawn_pending_count.setter
def spawn_pending_count(self, value):
self.settings['_spawn_pending_count'] = value
@gen.coroutine @gen.coroutine
def spawn_single_user(self, user, server_name='', options=None): def spawn_single_user(self, user, server_name='', options=None):
if server_name in user.spawners and user.spawners[server_name]._spawn_pending: if server_name in user.spawners and user.spawners[server_name].pending:
raise RuntimeError("Spawn already pending for: %s" % user.name) raise RuntimeError("Spawn already pending for: %s" % user.name)
# count active users and pending spawns
# we could do careful bookkeeping to avoid
# but for 10k users this takes ~5ms
# and saves us from bookkeeping errors
active_counts = self.users.count_active_users()
spawn_pending_count = active_counts['spawn_pending'] + active_counts['proxy_pending']
active_count = active_counts['active']
concurrent_spawn_limit = self.concurrent_spawn_limit concurrent_spawn_limit = self.concurrent_spawn_limit
if concurrent_spawn_limit and self.spawn_pending_count >= concurrent_spawn_limit: concurrent_user_limit = self.concurrent_user_limit
if concurrent_spawn_limit and spawn_pending_count >= concurrent_spawn_limit:
self.log.info( self.log.info(
'%s pending spawns, throttling', '%s pending spawns, throttling',
concurrent_spawn_limit, spawn_pending_count,
) )
raise web.HTTPError( raise web.HTTPError(
429, 429,
"User startup rate limit exceeded. Try to start again in a few minutes.") "User startup rate limit exceeded. Try again in a few minutes.")
if concurrent_user_limit and active_count >= concurrent_user_limit:
self.log.info(
'%s servers active, no space available',
active_count,
)
raise web.HTTPError(
429,
"Active user limit exceeded. Try again in a few minutes.")
tic = IOLoop.current().time() tic = IOLoop.current().time()
user_server_name = user.name user_server_name = user.name
@@ -401,15 +415,14 @@ class BaseHandler(RequestHandler):
f = user.spawn(server_name, options) f = user.spawn(server_name, options)
# increment spawn_pending only after spawn starts
self.log.debug("%i%s concurrent spawns", self.log.debug("%i%s concurrent spawns",
self.spawn_pending_count, spawn_pending_count,
'/%i' % concurrent_spawn_limit if concurrent_spawn_limit else '') '/%i' % concurrent_spawn_limit if concurrent_spawn_limit else '')
# FIXME: Move this out of settings, since this isn't really a setting self.log.debug("%i%s active users",
self.spawn_pending_count += 1 active_count,
'/%i' % concurrent_user_limit if concurrent_user_limit else '')
spawner = user.spawners[server_name] spawner = user.spawners[server_name]
spawner._proxy_pending = True
@gen.coroutine @gen.coroutine
def finish_user_spawn(f=None): def finish_user_spawn(f=None):
@@ -420,12 +433,12 @@ class BaseHandler(RequestHandler):
""" """
if f and f.exception() is not None: if f and f.exception() is not None:
# failed, don't add to the proxy # failed, don't add to the proxy
self.spawn_pending_count -= 1
return return
toc = IOLoop.current().time() toc = IOLoop.current().time()
self.log.info("User %s took %.3f seconds to start", user_server_name, toc-tic) self.log.info("User %s took %.3f seconds to start", user_server_name, toc-tic)
self.statsd.timing('spawner.success', (toc - tic) * 1000) self.statsd.timing('spawner.success', (toc - tic) * 1000)
try: try:
spawner._proxy_pending = True
yield self.proxy.add_user(user, server_name) yield self.proxy.add_user(user, server_name)
except Exception: except Exception:
self.log.exception("Failed to add %s to proxy!", user_server_name) self.log.exception("Failed to add %s to proxy!", user_server_name)
@@ -435,7 +448,6 @@ class BaseHandler(RequestHandler):
spawner.add_poll_callback(self.user_stopped, user) spawner.add_poll_callback(self.user_stopped, user)
finally: finally:
spawner._proxy_pending = False spawner._proxy_pending = False
self.spawn_pending_count -= 1
try: try:
yield gen.with_timeout(timedelta(seconds=self.slow_spawn_timeout), f) yield gen.with_timeout(timedelta(seconds=self.slow_spawn_timeout), f)
@@ -462,14 +474,9 @@ class BaseHandler(RequestHandler):
# schedule finish for when the user finishes spawning # schedule finish for when the user finishes spawning
IOLoop.current().add_future(f, finish_user_spawn) IOLoop.current().add_future(f, finish_user_spawn)
else: else:
self.spawn_pending_count -= 1
toc = IOLoop.current().time() toc = IOLoop.current().time()
self.statsd.timing('spawner.failure', (toc - tic) * 1000) self.statsd.timing('spawner.failure', (toc - tic) * 1000)
raise web.HTTPError(500, "Spawner failed to start [status=%s]" % status) raise web.HTTPError(500, "Spawner failed to start [status=%s]" % status)
except Exception:
# error in start
self.spawn_pending_count -= 1
raise
else: else:
yield finish_user_spawn() yield finish_user_spawn()

View File

@@ -392,7 +392,6 @@ def test_make_admin(app):
@mark.gen_test @mark.gen_test
def test_spawn(app): def test_spawn(app):
settings = app.tornado_application.settings
db = app.db db = app.db
name = 'wash' name = 'wash'
user = add_user(db, app=app, name=name) user = add_user(db, app=app, name=name)
@@ -444,12 +443,11 @@ def test_spawn(app):
assert before_servers == after_servers assert before_servers == after_servers
tokens = list(db.query(orm.APIToken).filter(orm.APIToken.user_id == user.id)) tokens = list(db.query(orm.APIToken).filter(orm.APIToken.user_id == user.id))
assert tokens == [] assert tokens == []
assert settings['_spawn_pending_count'] == 0 assert app.users.count_active_users()['pending'] == 0
@mark.gen_test @mark.gen_test
def test_slow_spawn(app, no_patience, slow_spawn): def test_slow_spawn(app, no_patience, slow_spawn):
settings = app.tornado_application.settings
db = app.db db = app.db
name = 'zoe' name = 'zoe'
app_user = add_user(db, app=app, name=name) app_user = add_user(db, app=app, name=name)
@@ -459,7 +457,7 @@ def test_slow_spawn(app, no_patience, slow_spawn):
assert app_user.spawner is not None assert app_user.spawner is not None
assert app_user.spawner._spawn_pending assert app_user.spawner._spawn_pending
assert not app_user.spawner._stop_pending assert not app_user.spawner._stop_pending
assert settings['_spawn_pending_count'] == 1 assert app.users.count_active_users()['pending'] == 1
@gen.coroutine @gen.coroutine
def wait_spawn(): def wait_spawn():
@@ -493,31 +491,29 @@ def test_slow_spawn(app, no_patience, slow_spawn):
assert app_user.spawner is not None assert app_user.spawner is not None
r = yield api_request(app, 'users', name, 'server', method='delete') r = yield api_request(app, 'users', name, 'server', method='delete')
assert r.status_code == 400 assert r.status_code == 400
assert settings['_spawn_pending_count'] == 0 assert app.users.count_active_users()['pending'] == 0
assert app.users.count_active_users()['active'] == 0
@mark.gen_test @mark.gen_test
def test_never_spawn(app, no_patience, never_spawn): def test_never_spawn(app, no_patience, never_spawn):
settings = app.tornado_application.settings
db = app.db db = app.db
name = 'badger' name = 'badger'
app_user = add_user(db, app=app, name=name) app_user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post') r = yield api_request(app, 'users', name, 'server', method='post')
assert app_user.spawner is not None assert app_user.spawner is not None
assert app_user.spawner._spawn_pending assert app_user.spawner._spawn_pending
assert settings['_spawn_pending_count'] == 1 assert app.users.count_active_users()['pending'] == 1
@gen.coroutine while app_user.spawner.pending:
def wait_pending(): yield gen.sleep(0.1)
while app_user.spawner._spawn_pending: print(app_user.spawner.pending)
yield gen.sleep(0.1)
yield wait_pending()
assert not app_user.spawner._spawn_pending assert not app_user.spawner._spawn_pending
status = yield app_user.spawner.poll() status = yield app_user.spawner.poll()
assert status is not None assert status is not None
# failed spawn should decrements pending count # failed spawn should decrements pending count
assert settings['_spawn_pending_count'] == 0 assert app.users.count_active_users()['pending'] == 0
@mark.gen_test @mark.gen_test
@@ -528,7 +524,7 @@ def test_bad_spawn(app, no_patience, bad_spawn):
user = add_user(db, app=app, name=name) user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post') r = yield api_request(app, 'users', name, 'server', method='post')
assert r.status_code == 500 assert r.status_code == 500
assert settings['_spawn_pending_count'] == 0 assert app.users.count_active_users()['pending'] == 0
@mark.gen_test @mark.gen_test
@@ -539,11 +535,11 @@ def test_slow_bad_spawn(app, no_patience, slow_bad_spawn):
user = add_user(db, app=app, name=name) user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post') r = yield api_request(app, 'users', name, 'server', method='post')
r.raise_for_status() r.raise_for_status()
while user.spawner._spawn_pending: while user.spawner.pending:
yield gen.sleep(0.1) yield gen.sleep(0.1)
# spawn failed # spawn failed
assert not user.running('') assert not user.running('')
assert settings['_spawn_pending_count'] == 0 assert app.users.count_active_users()['pending'] == 0
@mark.gen_test @mark.gen_test
@@ -561,7 +557,7 @@ def test_spawn_limit(app, no_patience, slow_spawn, request):
for name in names: for name in names:
yield api_request(app, 'users', name, 'server', method='post') yield api_request(app, 'users', name, 'server', method='post')
yield gen.sleep(0.5) yield gen.sleep(0.5)
assert settings['_spawn_pending_count'] == 2 assert app.users.count_active_users()['pending'] == 2
# ykka and hjarka's spawns are pending. Essun should fail with 429 # ykka and hjarka's spawns are pending. Essun should fail with 429
name = 'essun' name = 'essun'
@@ -575,16 +571,16 @@ def test_spawn_limit(app, no_patience, slow_spawn, request):
# race? hjarka could finish in this time # race? hjarka could finish in this time
# come back to this if we see intermittent failures here # come back to this if we see intermittent failures here
assert settings['_spawn_pending_count'] == 1 assert app.users.count_active_users()['pending'] == 1
r = yield api_request(app, 'users', name, 'server', method='post') r = yield api_request(app, 'users', name, 'server', method='post')
r.raise_for_status() r.raise_for_status()
assert settings['_spawn_pending_count'] == 2 assert app.users.count_active_users()['pending'] == 2
users.append(user) users.append(user)
while not all(u.running('') for u in users): while not all(u.running('') for u in users):
yield gen.sleep(0.1) yield gen.sleep(0.1)
# everybody's running, pending count should be back to 0 # everybody's running, pending count should be back to 0
assert settings['_spawn_pending_count'] == 0 assert app.users.count_active_users()['pending'] == 0
for u in users: for u in users:
r = yield api_request(app, 'users', u.name, 'server', method='delete') r = yield api_request(app, 'users', u.name, 'server', method='delete')
yield r.raise_for_status() yield r.raise_for_status()

View File

@@ -1,6 +1,7 @@
# Copyright (c) Jupyter Development Team. # Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License. # Distributed under the terms of the Modified BSD License.
from collections import defaultdict
from datetime import datetime, timedelta from datetime import datetime, timedelta
from urllib.parse import quote, urlparse from urllib.parse import quote, urlparse
@@ -73,6 +74,25 @@ class UserDict(dict):
db.commit() db.commit()
dict.__delitem__(self, user_id) dict.__delitem__(self, user_id)
def count_active_users(self):
"""Count the number of user servers that are active/pending/ready
Returns dict with counts of active/pending/ready servers
"""
counts = defaultdict(lambda : 0)
for user in self.values():
for spawner in user.spawners.values():
pending = spawner.pending
if pending:
counts['pending'] += 1
counts[pending + '_pending'] += 1
if spawner.active:
counts['active'] += 1
if spawner.ready:
counts['ready'] += 1
return counts
class _SpawnerDict(dict): class _SpawnerDict(dict):
def __init__(self, spawner_factory): def __init__(self, spawner_factory):
@@ -294,8 +314,8 @@ class User(HasTraits):
spawner = self.spawners[server_name] spawner = self.spawners[server_name]
spawner.orm_spawner.server = orm_server spawner.server = Server(orm_server=orm_server)
server = spawner.server assert orm_spawner.server is orm_server
# Passing user_options to the spawner # Passing user_options to the spawner
spawner.user_options = options or {} spawner.user_options = options or {}
@@ -434,7 +454,7 @@ class User(HasTraits):
# remove server entry from db # remove server entry from db
if spawner.server is not None: if spawner.server is not None:
self.db.delete(spawner.orm_spawner.server) self.db.delete(spawner.orm_spawner.server)
spawner.orm_spawner.server = None spawner.server = None
if not spawner.will_resume: if not spawner.will_resume:
# find and remove the API token if the spawner isn't # find and remove the API token if the spawner isn't
# going to re-use it next time # going to re-use it next time