From 707b300bd6c0147d5dba7fbe5d0bf7b27c161b5c Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 5 Apr 2018 10:56:00 +0200 Subject: [PATCH] add iterate_until utility allows iterating through an async generator, yielding items until another Future resolves if/when that deadline Future resolves, ready items will continue to be yielded until there is one that actually needs to wait at which point the iteration will halt --- jupyterhub/apihandlers/users.py | 25 ++------------- jupyterhub/spawner.py | 23 +++---------- jupyterhub/tests/test_utils.py | 57 +++++++++++++++++++++++++++++++++ jupyterhub/utils.py | 41 ++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 42 deletions(-) create mode 100644 jupyterhub/tests/test_utils.py diff --git a/jupyterhub/apihandlers/users.py b/jupyterhub/apihandlers/users.py index 1598770a..069f02ad 100644 --- a/jupyterhub/apihandlers/users.py +++ b/jupyterhub/apihandlers/users.py @@ -10,7 +10,7 @@ from tornado import web from tornado.iostream import StreamClosedError from .. import orm -from ..utils import admin_only, maybe_future, url_path_join +from ..utils import admin_only, iterate_until, maybe_future, url_path_join from .base import APIHandler @@ -339,28 +339,7 @@ class SpawnProgressAPIHandler(APIHandler): raise web.HTTPError(400, "%s is not starting...", spawner._log_name) # retrieve progress events from the Spawner - progress_iter = spawner._generate_progress().__aiter__() - while True: - event_future = asyncio.ensure_future(progress_iter.__anext__()) - await asyncio.wait( - [event_future, spawn_future], - return_when=asyncio.FIRST_COMPLETED) - if event_future.done(): - try: - event = event_future.result() - except StopAsyncIteration: - break - elif spawn_future.done(): - # spawn is done *and* event is not ready - # cancel event future to avoid warnings about - # unawaited tasks - if not event_future.cancelled(): - event_future.cancel() - break - else: - # neither is done, this shouldn't be possible - continue - + async for event in iterate_until(spawn_future, spawner._generate_progress()): # don't allow events to sneakily set the 'ready' flag if 'ready' in event: event.pop('ready', None) diff --git a/jupyterhub/spawner.py b/jupyterhub/spawner.py index 8453915a..822617d9 100644 --- a/jupyterhub/spawner.py +++ b/jupyterhub/spawner.py @@ -32,7 +32,7 @@ from traitlets import ( from .objects import Server from .traitlets import Command, ByteSpecification, Callable -from .utils import maybe_future, random_port, url_path_join, exponential_backoff +from .utils import iterate_until, maybe_future, random_port, url_path_join, exponential_backoff class Spawner(LoggingConfigurable): @@ -706,25 +706,10 @@ class Spawner(LoggingConfigurable): await yield_({ "progress": 0, "message": "Server requested", - }) + }) - progress_iter = self.progress().__aiter__() - while True: - f = asyncio.ensure_future(progress_iter.__anext__()) - await asyncio.wait( - [f, spawn_future], - return_when=asyncio.FIRST_COMPLETED) - if f.done(): - try: - await yield_(f.result()) - except StopAsyncIteration: - break - elif spawn_future.done(): - # cancel event future to avoid warnings about - # unawaited tasks - if not f.cancelled(): - f.cancel() - break + async for event in iterate_until(spawn_future, self.progress()): + await yield_(event) @async_generator async def progress(self): diff --git a/jupyterhub/tests/test_utils.py b/jupyterhub/tests/test_utils.py new file mode 100644 index 00000000..d2f1bfa6 --- /dev/null +++ b/jupyterhub/tests/test_utils.py @@ -0,0 +1,57 @@ +"""Tests for utilities""" + +import asyncio +import pytest + +from async_generator import async_generator, yield_ +from ..utils import iterate_until + + +@async_generator +async def yield_n(n, delay=0.01): + """Yield n items with a delay between each""" + for i in range(n): + if delay: + await asyncio.sleep(delay) + await yield_(i) + + +def schedule_future(io_loop, *, delay, result=None): + """Construct a Future that will resolve after a delay""" + f = asyncio.Future() + if delay: + io_loop.call_later(delay, lambda: f.set_result(result)) + else: + f.set_result(result) + return f + + +@pytest.mark.gen_test +@pytest.mark.parametrize("deadline, n, delay, expected", [ + (0, 3, 1, []), + (0, 3, 0, [0, 1, 2]), + (5, 3, 0.01, [0, 1, 2]), + (0.5, 10, 0.2, [0, 1]), +]) +async def test_iterate_until(io_loop, deadline, n, delay, expected): + f = schedule_future(io_loop, delay=deadline) + + yielded = [] + async for item in iterate_until(f, yield_n(n, delay=delay)): + yielded.append(item) + assert yielded == expected + + +@pytest.mark.gen_test +async def test_iterate_until_ready_after_deadline(io_loop): + f = schedule_future(io_loop, delay=0) + + @async_generator + async def gen(): + for i in range(5): + await yield_(i) + + yielded = [] + async for item in iterate_until(f, gen()): + yielded.append(item) + assert yielded == list(range(5)) diff --git a/jupyterhub/utils.py b/jupyterhub/utils.py index 6545ba6d..4745aabb 100644 --- a/jupyterhub/utils.py +++ b/jupyterhub/utils.py @@ -19,6 +19,7 @@ import threading import uuid import warnings +from async_generator import async_generator, yield_ from tornado import gen, ioloop, web from tornado.platform.asyncio import to_asyncio_future from tornado.httpclient import AsyncHTTPClient, HTTPError @@ -445,3 +446,43 @@ def maybe_future(obj): return asyncio.wrap_future(obj) else: return to_asyncio_future(gen.maybe_future(obj)) + + +@async_generator +async def iterate_until(deadline_future, generator): + """An async generator that yields items from a generator + until a deadline future resolves + + This could *almost* be implemented as a context manager + like asyncio_timeout with a Future for the cutoff. + + However, we want one distinction: continue yielding items + after the future is complete, as long as the are already finished. + + Usage:: + + async for item in iterate_until(some_future, some_async_generator()): + print(item) + + """ + aiter = generator.__aiter__() + while True: + item_future = asyncio.ensure_future(aiter.__anext__()) + await asyncio.wait( + [item_future, deadline_future], + return_when=asyncio.FIRST_COMPLETED) + if item_future.done(): + try: + await yield_(item_future.result()) + except StopAsyncIteration: + break + elif deadline_future.done(): + # deadline is done *and* next item is not ready + # cancel item future to avoid warnings about + # unawaited tasks + if not item_future.cancelled(): + item_future.cancel() + break + else: + # neither is done, this shouldn't happen + continue