Merge pull request #1771 from minrk/progress

Progress on spawn-pending page
This commit is contained in:
Carol Willing
2018-04-12 16:16:37 -07:00
committed by GitHub
14 changed files with 644 additions and 62 deletions

View File

@@ -18,10 +18,6 @@ class APIHandler(BaseHandler):
def content_security_policy(self):
return '; '.join([super().content_security_policy, "default-src 'none'"])
def set_default_headers(self):
super().set_default_headers()
self.set_header('Content-Type', 'application/json')
def check_referer(self):
"""Check Origin for cross-site API requests.
@@ -94,50 +90,45 @@ class APIHandler(BaseHandler):
'message': message or status_message,
}))
def server_model(self, spawner):
"""Get the JSON model for a Spawner"""
return {
'name': spawner.name,
'last_activity': isoformat(spawner.orm_spawner.last_activity),
'started': isoformat(spawner.orm_spawner.started),
'pending': spawner.pending or None,
'url': url_path_join(spawner.user.url, spawner.name, '/'),
'progress_url': spawner._progress_url,
}
def user_model(self, user):
"""Get the JSON model for a User object"""
if isinstance(user, orm.User):
user = self.users[user.id]
last_activity = user.last_activity
# don't call isoformat if last_activity is None
if last_activity:
last_activity = isoformat(last_activity)
model = {
'kind': 'user',
'name': user.name,
'admin': user.admin,
'groups': [ g.name for g in user.groups ],
'server': user.url if user.running else None,
'progress_url': user.progress_url(''),
'pending': None,
'created': isoformat(user.created),
'last_activity': last_activity,
'started': None,
'last_activity': isoformat(user.last_activity),
}
if '' in user.spawners:
spawner = user.spawners['']
model['pending'] = spawner.pending or None
if spawner.active and spawner.orm_spawner.started:
model['started'] = isoformat(spawner.orm_spawner.started)
server_model = self.server_model(user.spawners[''])
# copy some values from the default server to the user model
for key in ('started', 'pending'):
model[key] = server_model[key]
if self.allow_named_servers:
servers = model['servers'] = {}
for name, spawner in user.spawners.items():
if spawner.ready:
last_activity = spawner.orm_spawner.last_activity
if last_activity:
last_activity = isoformat(last_activity)
servers[name] = s = {
'name': name,
'last_activity': last_activity,
'started': isoformat(spawner.orm_spawner.started),
}
if spawner.pending:
s['pending'] = spawner.pending
if spawner.server:
s['url'] = url_path_join(user.url, name, '/')
servers[name] = self.server_model(spawner)
return model
def group_model(self, group):

View File

@@ -3,12 +3,15 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
import json
from tornado import gen, web
from async_generator import aclosing
from tornado import web
from tornado.iostream import StreamClosedError
from .. import orm
from ..utils import admin_only, maybe_future
from ..utils import admin_only, iterate_until, maybe_future, url_path_join
from .base import APIHandler
@@ -17,6 +20,7 @@ class SelfAPIHandler(APIHandler):
Based on the authentication info. Acts as a 'whoami' for auth tokens.
"""
async def get(self):
user = self.get_current_user()
if user is None:
@@ -102,6 +106,7 @@ def admin_or_self(method):
return method(self, name, *args, **kwargs)
return m
class UserAPIHandler(APIHandler):
@admin_or_self
@@ -269,11 +274,106 @@ class UserAdminAccessAPIHandler(APIHandler):
raise web.HTTPError(404)
class SpawnProgressAPIHandler(APIHandler):
"""EventStream handler for pending spawns"""
def get_content_type(self):
return 'text/event-stream'
async def send_event(self, event):
try:
self.write('data: {}\n\n'.format(json.dumps(event)))
await self.flush()
except StreamClosedError:
self.log.warning("Stream closed while handling %s", self.request.uri)
# raise Finish to halt the handler
raise web.Finish()
@admin_or_self
async def get(self, username, server_name=''):
self.set_header('Cache-Control', 'no-cache')
if server_name is None:
server_name = ''
user = self.find_user(username)
if user is None:
# no such user
raise web.HTTPError(404)
if server_name not in user.spawners:
# user has no such server
raise web.HTTPError(404)
spawner = user.spawners[server_name]
# cases:
# - spawner already started and ready
# - spawner not running at all
# - spawner failed
# - spawner pending start (what we expect)
url = url_path_join(user.url, server_name, '/')
ready_event = {
'progress': 100,
'ready': True,
'message': "Server ready at {}".format(url),
'html_message': 'Server ready at <a href="{0}">{0}</a>'.format(url),
'url': url,
}
failed_event = {
'progress': 100,
'failed': True,
'message': "Spawn failed",
}
if spawner.ready:
# spawner already ready. Trigger progress-completion immediately
self.log.info("Server %s is already started", spawner._log_name)
await self.send_event(ready_event)
return
spawn_future = spawner._spawn_future
if not spawner._spawn_pending:
# not pending, no progress to fetch
# check if spawner has just failed
f = spawn_future
if f and f.done() and f.exception():
failed_event['message'] = "Spawn failed: %s" % f.exception()
await self.send_event(failed_event)
return
else:
raise web.HTTPError(400, "%s is not starting...", spawner._log_name)
# retrieve progress events from the Spawner
async with aclosing(iterate_until(spawn_future, spawner._generate_progress())) as events:
async for event in events:
# don't allow events to sneakily set the 'ready' flag
if 'ready' in event:
event.pop('ready', None)
await self.send_event(event)
# progress finished, wait for spawn to actually resolve,
# in case progress finished early
# (ignore errors, which will be logged elsewhere)
await asyncio.wait([spawn_future])
# progress and spawn finished, check if spawn succeeded
if spawner.ready:
# spawner is ready, signal completion and redirect
self.log.info("Server %s is ready", spawner._log_name)
await self.send_event(ready_event)
else:
# what happened? Maybe spawn failed?
f = spawn_future
if f and f.done() and f.exception():
failed_event['message'] = "Spawn failed: %s" % f.exception()
else:
self.log.warning("Server %s didn't start for unknown reason", spawner._log_name)
await self.send_event(failed_event)
default_handlers = [
(r"/api/user", SelfAPIHandler),
(r"/api/users", UserListAPIHandler),
(r"/api/users/([^/]+)", UserAPIHandler),
(r"/api/users/([^/]+)/server", UserServerAPIHandler),
(r"/api/users/([^/]+)/server/progress", SpawnProgressAPIHandler),
(r"/api/users/([^/]+)/servers/([^/]*)", UserServerAPIHandler),
(r"/api/users/([^/]+)/servers/([^/]*)/progress", SpawnProgressAPIHandler),
(r"/api/users/([^/]+)/admin-access", UserAdminAccessAPIHandler),
]

View File

@@ -135,11 +135,15 @@ class BaseHandler(RequestHandler):
"report-uri " + self.csp_report_uri,
])
def get_content_type(self):
return 'text/html'
def set_default_headers(self):
"""
Set any headers passed as tornado_settings['headers'].
By default sets Content-Security-Policy of frame-ancestors 'self'.
Also responsible for setting content-type header
"""
# wrap in HTTPHeaders for case-insensitivity
headers = HTTPHeaders(self.settings.get('headers', {}))
@@ -152,6 +156,7 @@ class BaseHandler(RequestHandler):
self.set_header('Access-Control-Allow-Headers', 'accept, content-type, authorization')
if 'Content-Security-Policy' not in headers:
self.set_header('Content-Security-Policy', self.content_security_policy)
self.set_header('Content-Type', self.get_content_type())
#---------------------------------------------------------------
# Login and cookie-related
@@ -692,11 +697,8 @@ class BaseHandler(RequestHandler):
try:
await gen.with_timeout(timedelta(seconds=self.slow_stop_timeout), stop())
except gen.TimeoutError:
if spawner._stop_pending:
# hit timeout, but stop is still pending
self.log.warning("User %s:%s server is slow to stop", user.name, name)
else:
raise
#---------------------------------------------------------------
# template rendering
@@ -891,7 +893,12 @@ class UserSpawnHandler(BaseHandler):
self.log.info("%s is pending %s", spawner._log_name, spawner.pending)
# spawn has started, but not finished
self.statsd.incr('redirects.user_spawn_pending', 1)
html = self.render_template("spawn_pending.html", user=user)
url_parts = []
html = self.render_template(
"spawn_pending.html",
user=user,
progress_url=spawner._progress_url,
)
self.finish(html)
return
@@ -919,7 +926,11 @@ class UserSpawnHandler(BaseHandler):
self.log.info("%s is pending %s", spawner._log_name, spawner.pending)
# spawn has started, but not finished
self.statsd.incr('redirects.user_spawn_pending', 1)
html = self.render_template("spawn_pending.html", user=user)
html = self.render_template(
"spawn_pending.html",
user=user,
progress_url=spawner._progress_url,
)
self.finish(html)
return

View File

@@ -5,6 +5,7 @@ Contains base Spawner class & default implementation
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
import errno
import json
import os
@@ -16,9 +17,11 @@ import warnings
from subprocess import Popen
from tempfile import mkdtemp
# FIXME: remove when we drop Python 3.5 support
from async_generator import async_generator, yield_
from sqlalchemy import inspect
from tornado import gen, concurrent
from tornado.ioloop import PeriodicCallback
from traitlets.config import LoggingConfigurable
@@ -29,7 +32,7 @@ from traitlets import (
from .objects import Server
from .traitlets import Command, ByteSpecification, Callable
from .utils import maybe_future, random_port, url_path_join, exponential_backoff
from .utils import iterate_until, maybe_future, random_port, url_path_join, exponential_backoff
class Spawner(LoggingConfigurable):
@@ -264,11 +267,10 @@ class Spawner(LoggingConfigurable):
"""Get the options form
Returns:
(Future(str)): the content of the options form presented to the user
Future (str): the content of the options form presented to the user
prior to starting a Spawner.
.. versionadded:: 0.9.0
Introduced.
.. versionadded:: 0.9
"""
if callable(self.options_form):
options_form = await maybe_future(self.options_form(self))
@@ -690,6 +692,58 @@ class Spawner(LoggingConfigurable):
if self.pre_spawn_hook:
return self.pre_spawn_hook(self)
@property
def _progress_url(self):
return self.user.progress_url(self.name)
@async_generator
async def _generate_progress(self):
"""Private wrapper of progress generator
This method is always an async generator and will always yield at least one event.
"""
if not self._spawn_pending:
raise RuntimeError("Spawn not pending, can't generate progress")
await yield_({
"progress": 0,
"message": "Server requested",
})
from async_generator import aclosing
async with aclosing(self.progress()) as progress:
async for event in progress:
await yield_(event)
@async_generator
async def progress(self):
"""Async generator for progress events
Must be an async generator
For Python 3.5-compatibility, use the async_generator package
Should yield messages of the form:
::
{
"progress": 80, # integer, out of 100
"message": text, # text message (will be escaped for HTML)
"html_message": html_text, # optional html-formatted message (may have links)
}
In HTML contexts, html_message will be displayed instead of message if present.
Progress will be updated if defined.
To update messages without progress omit the progress field.
.. versionadded:: 0.9
"""
await yield_({
"progress": 50,
"message": "Spawning server...",
})
async def start(self):
"""Start the single-user server
@@ -1042,6 +1096,7 @@ class LocalProcessSpawner(Spawner):
if self.ip:
self.server.ip = self.ip
self.server.port = self.port
self.db.commit()
return (self.ip or '127.0.0.1', self.port)
async def poll(self):

View File

@@ -7,10 +7,11 @@ import os
import logging
from getpass import getuser
from subprocess import TimeoutExpired
import time
from unittest import mock
from pytest import fixture, raises
from tornado import ioloop, gen
from tornado.httpclient import HTTPError
from .. import orm
from .. import crypto
@@ -25,6 +26,7 @@ import jupyterhub.services.service
# global db session object
_db = None
@fixture
def db():
"""Get a db session"""
@@ -53,6 +55,30 @@ def io_loop(request):
return io_loop
@fixture(autouse=True)
def cleanup_after(request, io_loop):
"""function-scoped fixture to shutdown user servers
allows cleanup of servers between tests
without having to launch a whole new app
"""
try:
yield
finally:
if not MockHub.initialized():
return
app = MockHub.instance()
for uid, user in app.users.items():
for name, spawner in list(user.spawners.items()):
if spawner.active:
try:
io_loop.run_sync(lambda: app.proxy.delete_user(user, name))
except HTTPError:
pass
io_loop.run_sync(lambda: user.stop(name))
app.db.commit()
@fixture(scope='module')
def app(request, io_loop):
"""Mock a jupyterhub app for testing"""

View File

@@ -8,9 +8,9 @@ import sys
from unittest import mock
from urllib.parse import urlparse, quote
import uuid
from async_generator import async_generator, yield_
from pytest import mark
from tornado import gen
import jupyterhub
@@ -201,10 +201,17 @@ def normalize_user(user):
"""
for key in ('created', 'last_activity', 'started'):
user[key] = normalize_timestamp(user[key])
if user['progress_url']:
user['progress_url'] = re.sub(
r'.*/hub/api',
'PREFIX/hub/api',
user['progress_url'],
)
if 'servers' in user:
for server in user['servers'].values():
for key in ('started', 'last_activity'):
server[key] = normalize_timestamp(server[key])
server['progress_url'] = re.sub(r'.*/hub/api', 'PREFIX/hub/api', server['progress_url'])
return user
def fill_user(model):
@@ -221,6 +228,7 @@ def fill_user(model):
model.setdefault('created', TIMESTAMP)
model.setdefault('last_activity', TIMESTAMP)
model.setdefault('started', None)
model.setdefault('progress_url', 'PREFIX/hub/api/users/{name}/server/progress'.format(**model))
return model
@@ -676,6 +684,204 @@ def test_slow_bad_spawn(app, no_patience, slow_bad_spawn):
assert app.users.count_active_users()['pending'] == 0
def next_event(it):
"""read an event from an eventstream"""
while True:
try:
line = next(it)
except StopIteration:
return
if line.startswith('data:'):
return json.loads(line.split(':', 1)[1])
@mark.gen_test
def test_progress(request, app, no_patience, slow_spawn):
db = app.db
name = 'martin'
app_user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post')
r.raise_for_status()
r = yield api_request(app, 'users', name, 'server/progress', stream=True)
r.raise_for_status()
request.addfinalizer(r.close)
ex = async_requests.executor
line_iter = iter(r.iter_lines(decode_unicode=True))
evt = yield ex.submit(next_event, line_iter)
assert evt == {
'progress': 0,
'message': 'Server requested',
}
evt = yield ex.submit(next_event, line_iter)
assert evt == {
'progress': 50,
'message': 'Spawning server...',
}
evt = yield ex.submit(next_event, line_iter)
url = app_user.url
assert evt == {
'progress': 100,
'message': 'Server ready at {}'.format(url),
'html_message': 'Server ready at <a href="{0}">{0}</a>'.format(url),
'url': url,
'ready': True,
}
@mark.gen_test
def test_progress_not_started(request, app):
db = app.db
name = 'nope'
app_user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post')
r.raise_for_status()
r = yield api_request(app, 'users', name, 'server', method='delete')
r.raise_for_status()
r = yield api_request(app, 'users', name, 'server/progress')
assert r.status_code == 404
@mark.gen_test
def test_progress_not_found(request, app):
db = app.db
name = 'noserver'
r = yield api_request(app, 'users', 'nosuchuser', 'server/progress')
assert r.status_code == 404
app_user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server/progress')
assert r.status_code == 404
@mark.gen_test
def test_progress_ready(request, app):
"""Test progress API when spawner is already started
e.g. a race between requesting progress and progress already being complete
"""
db = app.db
name = 'saga'
app_user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post')
r.raise_for_status()
r = yield api_request(app, 'users', name, 'server/progress', stream=True)
r.raise_for_status()
request.addfinalizer(r.close)
ex = async_requests.executor
line_iter = iter(r.iter_lines(decode_unicode=True))
evt = yield ex.submit(next_event, line_iter)
assert evt['progress'] == 100
assert evt['ready']
assert evt['url'] == app_user.url
@mark.gen_test
def test_progress_bad(request, app, no_patience, bad_spawn):
"""Test progress API when spawner has already failed"""
db = app.db
name = 'simon'
app_user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post')
assert r.status_code == 500
r = yield api_request(app, 'users', name, 'server/progress', stream=True)
r.raise_for_status()
request.addfinalizer(r.close)
ex = async_requests.executor
line_iter = iter(r.iter_lines(decode_unicode=True))
evt = yield ex.submit(next_event, line_iter)
assert evt == {
'progress': 100,
'failed': True,
'message': "Spawn failed: I don't work!",
}
@mark.gen_test
def test_progress_bad_slow(request, app, no_patience, slow_bad_spawn):
"""Test progress API when spawner fails while watching"""
db = app.db
name = 'eugene'
app_user = add_user(db, app=app, name=name)
r = yield api_request(app, 'users', name, 'server', method='post')
assert r.status_code == 202
r = yield api_request(app, 'users', name, 'server/progress', stream=True)
r.raise_for_status()
request.addfinalizer(r.close)
ex = async_requests.executor
line_iter = iter(r.iter_lines(decode_unicode=True))
evt = yield ex.submit(next_event, line_iter)
assert evt['progress'] == 0
evt = yield ex.submit(next_event, line_iter)
assert evt['progress'] == 50
evt = yield ex.submit(next_event, line_iter)
assert evt == {
'progress': 100,
'failed': True,
'message': "Spawn failed: I don't work!",
}
@async_generator
async def progress_forever():
"""progress function that yields messages forever"""
for i in range(1, 10):
await yield_({
'progress': i,
'message': 'Stage %s' % i,
})
# wait a long time before the next event
await gen.sleep(10)
if sys.version_info >= (3, 6):
# additional progress_forever defined as native
# async generator
# to test for issues with async_generator wrappers
exec("""
async def progress_forever_native():
for i in range(1, 10):
yield {
'progress': i,
'message': 'Stage %s' % i,
}
# wait a long time before the next event
await gen.sleep(10)
""", globals())
@mark.gen_test
def test_spawn_progress_cutoff(request, app, no_patience, slow_spawn):
"""Progress events stop when Spawner finishes
even if progress iterator is still going.
"""
db = app.db
name = 'geddy'
app_user = add_user(db, app=app, name=name)
if sys.version_info >= (3, 6):
# Python >= 3.6, try native async generator
app_user.spawner.progress = globals()['progress_forever_native']
else:
app_user.spawner.progress = progress_forever
app_user.spawner.delay = 1
r = yield api_request(app, 'users', name, 'server', method='post')
r.raise_for_status()
r = yield api_request(app, 'users', name, 'server/progress', stream=True)
r.raise_for_status()
request.addfinalizer(r.close)
ex = async_requests.executor
line_iter = iter(r.iter_lines(decode_unicode=True))
evt = yield ex.submit(next_event, line_iter)
assert evt['progress'] == 0
evt = yield ex.submit(next_event, line_iter)
assert evt == {
'progress': 1,
'message': 'Stage 1',
}
evt = yield ex.submit(next_event, line_iter)
assert evt['progress'] == 100
@mark.gen_test
def test_spawn_limit(app, no_patience, slow_spawn, request):
db = app.db

View File

@@ -41,6 +41,8 @@ def test_default_server(app, named_servers):
'started': TIMESTAMP,
'last_activity': TIMESTAMP,
'url': user.url,
'pending': None,
'progress_url': 'PREFIX/hub/api/users/{}/server/progress'.format(username),
},
},
})
@@ -96,6 +98,9 @@ def test_create_named_server(app, named_servers):
'started': TIMESTAMP,
'last_activity': TIMESTAMP,
'url': url_path_join(user.url, name, '/'),
'pending': None,
'progress_url': 'PREFIX/hub/api/users/{}/servers/{}/progress'.format(
username, servername),
}
for name in [servername]
},
@@ -124,13 +129,7 @@ def test_delete_named_server(app, named_servers):
assert user_model == fill_user({
'name': username,
'auth_state': None,
'servers': {
name: {
'name': name,
'url': url_path_join(user.url, name, '/'),
}
for name in []
},
'servers': {},
})
@pytest.mark.gen_test

View File

@@ -110,9 +110,6 @@ def test_spawn_redirect(app):
cookies = yield app.login_user(name)
u = app.users[orm.User.find(app.db, name)]
# ensure wash's server isn't running:
r = yield api_request(app, 'users', name, 'server', method='delete', cookies=cookies)
r.raise_for_status()
status = yield u.spawner.poll()
assert status is not None

View File

@@ -0,0 +1,59 @@
"""Tests for utilities"""
import asyncio
import pytest
from async_generator import aclosing, async_generator, yield_
from ..utils import iterate_until
@async_generator
async def yield_n(n, delay=0.01):
"""Yield n items with a delay between each"""
for i in range(n):
if delay:
await asyncio.sleep(delay)
await yield_(i)
def schedule_future(io_loop, *, delay, result=None):
"""Construct a Future that will resolve after a delay"""
f = asyncio.Future()
if delay:
io_loop.call_later(delay, lambda: f.set_result(result))
else:
f.set_result(result)
return f
@pytest.mark.gen_test
@pytest.mark.parametrize("deadline, n, delay, expected", [
(0, 3, 1, []),
(0, 3, 0, [0, 1, 2]),
(5, 3, 0.01, [0, 1, 2]),
(0.5, 10, 0.2, [0, 1]),
])
async def test_iterate_until(io_loop, deadline, n, delay, expected):
f = schedule_future(io_loop, delay=deadline)
yielded = []
async with aclosing(iterate_until(f, yield_n(n, delay=delay))) as items:
async for item in items:
yielded.append(item)
assert yielded == expected
@pytest.mark.gen_test
async def test_iterate_until_ready_after_deadline(io_loop):
f = schedule_future(io_loop, delay=0)
@async_generator
async def gen():
for i in range(5):
await yield_(i)
yielded = []
async with aclosing(iterate_until(f, gen())) as items:
async for item in items:
yielded.append(item)
assert yielded == list(range(5))

View File

@@ -320,6 +320,15 @@ class User:
else:
return self.base_url
def progress_url(self, server_name=''):
"""API URL for progress endpoint for a server with a given name"""
url_parts = [self.settings['hub'].base_url, 'api/users', self.escaped_name]
if server_name:
url_parts.extend(['servers', server_name, 'progress'])
else:
url_parts.extend(['server/progress'])
return url_path_join(*url_parts)
async def spawn(self, server_name='', options=None):
"""Start the user's spawner
@@ -397,6 +406,7 @@ class User:
if ip_port:
# get ip, port info from return value of start()
server.ip, server.port = ip_port
db.commit()
else:
# prior to 0.7, spawners had to store this info in user.server themselves.
# Handle < 0.7 behavior with a warning, assuming info was stored in db by the Spawner.
@@ -545,5 +555,10 @@ class User:
except Exception:
self.log.exception("Error in Authenticator.post_spawn_stop for %s", self)
spawner._stop_pending = False
# pop the Spawner object
if not (
spawner._spawn_future and
(not spawner._spawn_future.done() or spawner._spawn_future.exception())
):
# pop Spawner *unless* it's stopping due to an error
# because some pages serve latest-spawn error messages
self.spawners.pop(server_name)

View File

@@ -19,6 +19,7 @@ import threading
import uuid
import warnings
from async_generator import aclosing, async_generator, yield_
from tornado import gen, ioloop, web
from tornado.platform.asyncio import to_asyncio_future
from tornado.httpclient import AsyncHTTPClient, HTTPError
@@ -44,6 +45,10 @@ def isoformat(dt):
Naïve datetime objects are assumed to be UTC
"""
# allow null timestamps to remain None without
# having to check if isoformat should be called
if dt is None:
return None
if dt.tzinfo:
dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
return dt.isoformat() + 'Z'
@@ -445,3 +450,48 @@ def maybe_future(obj):
return asyncio.wrap_future(obj)
else:
return to_asyncio_future(gen.maybe_future(obj))
@async_generator
async def iterate_until(deadline_future, generator):
"""An async generator that yields items from a generator
until a deadline future resolves
This could *almost* be implemented as a context manager
like asyncio_timeout with a Future for the cutoff.
However, we want one distinction: continue yielding items
after the future is complete, as long as the are already finished.
Usage::
async for item in iterate_until(some_future, some_async_generator()):
print(item)
"""
async with aclosing(generator.__aiter__()) as aiter:
while True:
item_future = asyncio.ensure_future(aiter.__anext__())
await asyncio.wait(
[item_future, deadline_future],
return_when=asyncio.FIRST_COMPLETED)
if item_future.done():
try:
await yield_(item_future.result())
except (StopAsyncIteration, asyncio.CancelledError):
break
elif deadline_future.done():
# deadline is done *and* next item is not ready
# cancel item future to avoid warnings about
# unawaited tasks
if not item_future.cancelled():
item_future.cancel()
# resolve cancellation to avoid garbage collection issues
try:
await item_future
except asyncio.CancelledError:
pass
break
else:
# neither is done, this shouldn't happen
continue

View File

@@ -1,4 +1,5 @@
alembic
async_generator>=1.8
traitlets>=4.3.2
tornado>=4.1
jinja2

View File

@@ -12,3 +12,17 @@
.hidden {
display: none;
}
#progress-log {
margin-top: 8px;
}
.progress-log-event {
border-top: 1px solid #e7e7e7;
padding: 8px;
}
// hover-highlight on log events?
// .progress-log-event:hover {
// background: rgba(66, 165, 245, 0.2);
// }

View File

@@ -9,10 +9,20 @@
<p>Your server is starting up.</p>
<p>You will be redirected automatically when it's ready for you.</p>
{% endblock %}
<p><i class="fa fa-spinner fa-pulse fa-fw fa-3x" aria-hidden="true"></i></p>
<a role="button" id="refresh" class="btn btn-lg btn-primary" href="#">refresh</a>
<div class="progress">
<div id="progress-bar" class="progress-bar" role="progressbar" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%;">
<span class="sr-only"><span id="sr-progress">0%</span> Complete</span>
</div>
</div>
<p id="progress-message"></p>
</div>
<div class="row">
<div class="col-md-8 col-md-offset-2">
<details id="progress-details">
<summary>Event log</summary>
<div id="progress-log"></div>
</details>
</div>
</div>
{% endblock %}
@@ -24,9 +34,57 @@ require(["jquery"], function ($) {
$("#refresh").click(function () {
window.location.reload();
})
setTimeout(function () {
// hook up event-stream for progress
var evtSource = new EventSource("{{ progress_url }}");
var progressMessage = $("#progress-message");
var progressBar = $("#progress-bar");
var srProgress = $("#sr-progress");
var progressLog = $("#progress-log");
evtSource.onmessage = function(e) {
var evt = JSON.parse(e.data);
console.log(evt);
if (evt.progress !== undefined) {
// update progress
var progText = evt.progress.toString();
progressBar.attr('aria-valuenow', progText);
srProgress.text(progText + '%');
progressBar.css('width', progText + '%');
}
// update message
var html_message;
if (evt.html_message !== undefined) {
progressMessage.html(evt.html_message);
html_message = evt.html_message;
} else if (evt.message !== undefined) {
progressMessage.text(evt.message);
html_message = progressMessage.html();
}
if (html_message) {
progressLog.append(
$("<div>")
.addClass('progress-log-event')
.html(html_message)
);
}
if (evt.ready) {
evtSource.close();
// reload the current page
// which should result in a redirect to the running server
window.location.reload();
}, 5000);
}
if (evt.failed) {
evtSource.close();
// turn progress bar red
progressBar.addClass('progress-bar-danger');
// open event log for debugging
$('#progress-details').prop('open', true);
}
}
});
</script>
{% endblock %}