add iterate_until utility

allows iterating through an async generator, yielding items until another Future resolves

if/when that deadline Future resolves, ready items will continue to be yielded until there is one that actually needs to wait
at which point the iteration will halt
This commit is contained in:
Min RK
2018-04-05 10:56:00 +02:00
parent c9e12182a2
commit 707b300bd6
4 changed files with 104 additions and 42 deletions

View File

@@ -10,7 +10,7 @@ from tornado import web
from tornado.iostream import StreamClosedError
from .. import orm
from ..utils import admin_only, maybe_future, url_path_join
from ..utils import admin_only, iterate_until, maybe_future, url_path_join
from .base import APIHandler
@@ -339,28 +339,7 @@ class SpawnProgressAPIHandler(APIHandler):
raise web.HTTPError(400, "%s is not starting...", spawner._log_name)
# retrieve progress events from the Spawner
progress_iter = spawner._generate_progress().__aiter__()
while True:
event_future = asyncio.ensure_future(progress_iter.__anext__())
await asyncio.wait(
[event_future, spawn_future],
return_when=asyncio.FIRST_COMPLETED)
if event_future.done():
try:
event = event_future.result()
except StopAsyncIteration:
break
elif spawn_future.done():
# spawn is done *and* event is not ready
# cancel event future to avoid warnings about
# unawaited tasks
if not event_future.cancelled():
event_future.cancel()
break
else:
# neither is done, this shouldn't be possible
continue
async for event in iterate_until(spawn_future, spawner._generate_progress()):
# don't allow events to sneakily set the 'ready' flag
if 'ready' in event:
event.pop('ready', None)

View File

@@ -32,7 +32,7 @@ from traitlets import (
from .objects import Server
from .traitlets import Command, ByteSpecification, Callable
from .utils import maybe_future, random_port, url_path_join, exponential_backoff
from .utils import iterate_until, maybe_future, random_port, url_path_join, exponential_backoff
class Spawner(LoggingConfigurable):
@@ -706,25 +706,10 @@ class Spawner(LoggingConfigurable):
await yield_({
"progress": 0,
"message": "Server requested",
})
})
progress_iter = self.progress().__aiter__()
while True:
f = asyncio.ensure_future(progress_iter.__anext__())
await asyncio.wait(
[f, spawn_future],
return_when=asyncio.FIRST_COMPLETED)
if f.done():
try:
await yield_(f.result())
except StopAsyncIteration:
break
elif spawn_future.done():
# cancel event future to avoid warnings about
# unawaited tasks
if not f.cancelled():
f.cancel()
break
async for event in iterate_until(spawn_future, self.progress()):
await yield_(event)
@async_generator
async def progress(self):

View File

@@ -0,0 +1,57 @@
"""Tests for utilities"""
import asyncio
import pytest
from async_generator import async_generator, yield_
from ..utils import iterate_until
@async_generator
async def yield_n(n, delay=0.01):
"""Yield n items with a delay between each"""
for i in range(n):
if delay:
await asyncio.sleep(delay)
await yield_(i)
def schedule_future(io_loop, *, delay, result=None):
"""Construct a Future that will resolve after a delay"""
f = asyncio.Future()
if delay:
io_loop.call_later(delay, lambda: f.set_result(result))
else:
f.set_result(result)
return f
@pytest.mark.gen_test
@pytest.mark.parametrize("deadline, n, delay, expected", [
(0, 3, 1, []),
(0, 3, 0, [0, 1, 2]),
(5, 3, 0.01, [0, 1, 2]),
(0.5, 10, 0.2, [0, 1]),
])
async def test_iterate_until(io_loop, deadline, n, delay, expected):
f = schedule_future(io_loop, delay=deadline)
yielded = []
async for item in iterate_until(f, yield_n(n, delay=delay)):
yielded.append(item)
assert yielded == expected
@pytest.mark.gen_test
async def test_iterate_until_ready_after_deadline(io_loop):
f = schedule_future(io_loop, delay=0)
@async_generator
async def gen():
for i in range(5):
await yield_(i)
yielded = []
async for item in iterate_until(f, gen()):
yielded.append(item)
assert yielded == list(range(5))

View File

@@ -19,6 +19,7 @@ import threading
import uuid
import warnings
from async_generator import async_generator, yield_
from tornado import gen, ioloop, web
from tornado.platform.asyncio import to_asyncio_future
from tornado.httpclient import AsyncHTTPClient, HTTPError
@@ -445,3 +446,43 @@ def maybe_future(obj):
return asyncio.wrap_future(obj)
else:
return to_asyncio_future(gen.maybe_future(obj))
@async_generator
async def iterate_until(deadline_future, generator):
"""An async generator that yields items from a generator
until a deadline future resolves
This could *almost* be implemented as a context manager
like asyncio_timeout with a Future for the cutoff.
However, we want one distinction: continue yielding items
after the future is complete, as long as the are already finished.
Usage::
async for item in iterate_until(some_future, some_async_generator()):
print(item)
"""
aiter = generator.__aiter__()
while True:
item_future = asyncio.ensure_future(aiter.__anext__())
await asyncio.wait(
[item_future, deadline_future],
return_when=asyncio.FIRST_COMPLETED)
if item_future.done():
try:
await yield_(item_future.result())
except StopAsyncIteration:
break
elif deadline_future.done():
# deadline is done *and* next item is not ready
# cancel item future to avoid warnings about
# unawaited tasks
if not item_future.cancelled():
item_future.cancel()
break
else:
# neither is done, this shouldn't happen
continue