mirror of
https://github.com/jupyterhub/jupyterhub.git
synced 2025-10-18 07:23:00 +00:00
Move exponential backoff into a function
Also use the 'Full Jitter' jitter algorithm from https://www.awsarchitectureblog.com/2015/03/backoff.html
This commit is contained in:
@@ -20,7 +20,7 @@ from .. import __version__
|
||||
from .. import orm
|
||||
from ..objects import Server
|
||||
from ..spawner import LocalProcessSpawner
|
||||
from ..utils import url_path_join, DT_SCALE
|
||||
from ..utils import url_path_join, exponential_backoff
|
||||
|
||||
# pattern for the authentication token header
|
||||
auth_header_pat = re.compile(r'^(?:token|bearer)\s+([^\s]+)$', flags=re.IGNORECASE)
|
||||
@@ -597,7 +597,7 @@ class UserSpawnHandler(BaseHandler):
|
||||
# record redirect count in query parameter
|
||||
if redirects:
|
||||
self.log.warning("Redirect loop detected on %s", self.request.uri)
|
||||
yield gen.sleep(min(1 * (DT_SCALE ** redirects), 10))
|
||||
yield gen.sleep(min(1 * (2 ** redirects), 10))
|
||||
# rewrite target url with new `redirects` query value
|
||||
url_parts = urlparse(target)
|
||||
query_parts = parse_qs(url_parts.query)
|
||||
|
@@ -26,7 +26,7 @@ from traitlets import (
|
||||
|
||||
from .objects import Server
|
||||
from .traitlets import Command, ByteSpecification
|
||||
from .utils import random_port, url_path_join, DT_MIN, DT_MAX, DT_SCALE
|
||||
from .utils import random_port, url_path_join, exponential_backoff
|
||||
|
||||
|
||||
class Spawner(LoggingConfigurable):
|
||||
@@ -666,21 +666,19 @@ class Spawner(LoggingConfigurable):
|
||||
self.log.exception("Unhandled error in poll callback for %s", self)
|
||||
return status
|
||||
|
||||
death_interval = Float(DT_MIN)
|
||||
|
||||
@gen.coroutine
def wait_for_death(self, timeout=10):
    """Wait for the single-user server to die, up to timeout seconds.

    Polls the spawner repeatedly via ``exponential_backoff`` until
    ``self.poll()`` yields something other than ``None``.

    Parameters
    ----------
    timeout : number
        Maximum number of seconds to keep polling.

    Raises
    ------
    TimeoutError
        Raised by ``exponential_backoff`` when the poll status is still
        ``None`` once the timeout has elapsed.
    """
    @gen.coroutine
    def _wait_for_death():
        # poll() yielding None is treated as "still running"; any other
        # status ends the wait.
        status = yield self.poll()
        return status is not None

    yield exponential_backoff(
        _wait_for_death,
        'Process did not die in {timeout} seconds'.format(timeout=timeout),
        timeout=timeout
    )
|
||||
|
||||
|
||||
def _try_setcwd(path):
|
||||
|
@@ -4,6 +4,7 @@
|
||||
# Distributed under the terms of the Modified BSD License.
|
||||
|
||||
from binascii import b2a_hex
|
||||
import random
|
||||
import errno
|
||||
import hashlib
|
||||
from hmac import compare_digest
|
||||
@@ -48,29 +49,62 @@ def can_connect(ip, port):
|
||||
else:
|
||||
return True
|
||||
|
||||
# exponential falloff factors:
|
||||
# start at 100ms, falloff by 2x
|
||||
# never longer than 5s
|
||||
DT_MIN = 0.1
|
||||
DT_SCALE = 2
|
||||
DT_MAX = 5
|
||||
@gen.coroutine
def exponential_backoff(
        pass_func,
        fail_message,
        start_wait=0.1,
        scale_factor=2,
        max_wait=5,
        timeout=10,
        *args, **kwargs):
    """
    Exponentially backoff until pass_func is true.

    Calls ``pass_func(*args, **kwargs)`` repeatedly, sleeping between
    attempts with exponential backoff plus random jitter, until it returns
    a truthy value or ``timeout`` seconds have elapsed.

    Parameters
    ----------
    pass_func : callable
        Called with ``*args`` and ``**kwargs`` on every attempt. It may
        return a plain value or a future; the (resolved) result is tested
        for truthiness.
    fail_message : str
        Message of the TimeoutError raised when the timeout expires.
    start_wait : number
        Upper bound, in seconds, of the first wait.
    scale_factor : number
        Multiplier applied to the upper bound after every failed attempt.
    max_wait : number
        The upper bound never grows beyond this many seconds.
    timeout : number
        Total time budget in seconds, measured on the IOLoop clock.

    Returns
    -------
    The first truthy value produced by ``pass_func``.

    Raises
    ------
    TimeoutError
        If ``pass_func`` has not returned a truthy value within ``timeout``.

    Notes
    -----
    ``pass_func`` is always tried at least once, and once more after the
    last sleep, so a condition that becomes true right at the deadline is
    not missed. No single sleep extends past the deadline.
    """
    loop = ioloop.IOLoop.current()
    deadline = loop.time() + timeout
    # Un-jittered upper bound for the next sleep. It is tracked separately
    # from the value actually slept so that one small random draw cannot
    # collapse every later wait toward zero.
    limit = start_wait
    while True:
        ret = yield gen.maybe_future(pass_func(*args, **kwargs))
        # Truthy!
        if ret:
            return ret
        remaining = deadline - loop.time()
        if remaining <= 0:
            # We time out!
            break
        # Add some random jitter to improve performance.
        # This makes sure that we don't overload any single iteration
        # of the tornado loop with too many things.
        # See https://www.awsarchitectureblog.com/2015/03/backoff.html
        # for a good example of why and how this helps. This is their
        # "Full Jitter" scheme: sleep = uniform(0, min(cap, base * scale**n)),
        # additionally clamped so we never sleep past the deadline.
        dt = min(remaining, random.uniform(0, limit))
        yield gen.sleep(dt)
        limit = min(max_wait, limit * scale_factor)
    raise TimeoutError(fail_message)
|
||||
|
||||
|
||||
@gen.coroutine
def wait_for_server(ip, port, timeout=10):
    """Wait for any server to show up at ip:port.

    Retries ``can_connect(ip, port)`` through ``exponential_backoff`` until
    it returns a truthy value.

    Parameters
    ----------
    ip : str
        Address to probe. ``''`` and ``'0.0.0.0'`` are replaced by
        ``'127.0.0.1'`` before connecting.
    port : int
        Port to probe.
    timeout : number
        Maximum number of seconds to keep retrying.

    Raises
    ------
    TimeoutError
        Raised by ``exponential_backoff`` if no connection succeeds within
        ``timeout`` seconds.
    """
    if ip in {'', '0.0.0.0'}:
        ip = '127.0.0.1'
    yield exponential_backoff(
        lambda: can_connect(ip, port),
        "Server at {ip}:{port} didn't respond in {timeout} seconds".format(ip=ip, port=port, timeout=timeout),
        # Forward the caller's timeout; otherwise the backoff helper falls
        # back to its own default and the message above would be wrong.
        timeout=timeout,
    )
|
||||
|
||||
|
||||
@@ -80,13 +114,12 @@ def wait_for_http_server(url, timeout=10):
|
||||
|
||||
Any non-5XX response code will do, even 404.
|
||||
"""
|
||||
loop = ioloop.IOLoop.current()
|
||||
tic = loop.time()
|
||||
client = AsyncHTTPClient()
|
||||
dt = DT_MIN
|
||||
while dt > 0:
|
||||
@gen.coroutine
|
||||
def is_reachable():
|
||||
try:
|
||||
r = yield client.fetch(url, follow_redirects=False)
|
||||
return r
|
||||
except HTTPError as e:
|
||||
if e.code >= 500:
|
||||
# failed to respond properly, wait and try again
|
||||
@@ -95,25 +128,21 @@ def wait_for_http_server(url, timeout=10):
|
||||
# but 502 or other proxy error is conceivable
|
||||
app_log.warning(
|
||||
"Server at %s responded with error: %s", url, e.code)
|
||||
yield gen.sleep(dt)
|
||||
else:
|
||||
app_log.debug("Server at %s responded with %s", url, e.code)
|
||||
return e.response
|
||||
except (OSError, socket.error) as e:
|
||||
if e.errno not in {errno.ECONNABORTED, errno.ECONNREFUSED, errno.ECONNRESET}:
|
||||
app_log.warning("Failed to connect to %s (%s)", url, e)
|
||||
yield gen.sleep(dt)
|
||||
else:
|
||||
return r
|
||||
dt = min(dt * DT_SCALE, DT_MAX, timeout - (loop.time() - tic))
|
||||
|
||||
raise TimeoutError(
|
||||
"Server at {url} didn't respond in {timeout} seconds".format(**locals())
|
||||
return False
|
||||
re = yield exponential_backoff(
|
||||
is_reachable,
|
||||
"Server at {url} didn't respond in {timeout} seconds".format(url=url, timeout=timeout)
|
||||
)
|
||||
return re
|
||||
|
||||
|
||||
# Decorators for authenticated Handlers
|
||||
|
||||
def auth_decorator(check_auth):
|
||||
"""Make an authentication decorator.
|
||||
|
||||
|
Reference in New Issue
Block a user