push activity to hub via api

servers publish activity on a regular interval (default: 5m)
This commit is contained in:
Min RK
2019-02-11 15:02:09 +01:00
parent 297f6988bd
commit 0063752a7f
4 changed files with 279 additions and 12 deletions

View File

@@ -4,16 +4,17 @@
# Distributed under the terms of the Modified BSD License.
import asyncio
from datetime import datetime
from datetime import datetime, timedelta, timezone
import json
from async_generator import aclosing
from dateutil.parser import parse as parse_date
from tornado import web
from tornado.iostream import StreamClosedError
from .. import orm
from ..user import User
from ..utils import admin_only, iterate_until, maybe_future, url_path_join
from ..utils import admin_only, isoformat, iterate_until, maybe_future, url_path_join
from .base import APIHandler
@@ -587,6 +588,139 @@ class SpawnProgressAPIHandler(APIHandler):
await self.send_event(failed_event)
def _parse_timestamp(timestamp):
"""Parse and return a utc timestamp
- raise HTTPError(400) on parse error
- handle and strip tz info for internal consistency
(we use naïve utc timestamps everywhere)
"""
try:
dt = parse_date(timestamp)
except Exception:
raise web.HTTPError(400, "Not a valid timestamp: %r", timestamp)
if dt.tzinfo:
# strip timezone info to naïve UTC datetime
dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
now = datetime.utcnow()
if (dt - now) > timedelta(minutes=59):
raise web.HTTPError(
400,
"Rejecting activity from more than an hour in the future: {}".format(
isoformat(dt)
)
)
return dt
class ActivityAPIHandler(APIHandler):
def _validate_servers(self, user, servers):
"""Validate servers dict argument
- types are correct
- each server exists
- last_activity fields are parsed into datetime objects
"""
msg = "servers must be a dict of the form {server_name: {last_activity: timestamp}}"
if not isinstance(servers, dict):
raise web.HTTPError(400, msg)
spawners = user.orm_spawners
for server_name, server_info in servers.items():
if server_name not in spawners:
raise web.HTTPError(
400,
"No such server '{}' for user {}".format(
server_name,
user.name,
)
)
# check that each per-server field is a dict
if not isinstance(server_info, dict):
raise web.HTTPError(400, msg)
# check that last_activity is defined for each per-server dict
if 'last_activity' not in server_info:
raise web.HTTPError(400, msg)
# parse last_activity timestamps
# _parse_timestamp above is responsible for raising errors
server_info['last_activity'] = _parse_timestamp(server_info['last_activity'])
return servers
@admin_or_self
def post(self, username):
user = self.find_user(username)
if user is None:
# no such user
raise web.HTTPError(404, "No such user: %r", username)
body = self.get_json_body()
if not isinstance(body, dict):
raise web.HTTPError(400, "body must be a json dict")
last_activity_timestamp = body.get('last_activity')
servers = body.get('servers')
if not last_activity_timestamp and not servers:
raise web.HTTPError(
400,
"body must contain at least one of `last_activity` or `servers`"
)
if servers:
# validate server args
servers = self._validate_servers(user, servers)
# at this point we know that the servers dict
# is valid and contains only servers that exist
# and last_activity is defined and a valid datetime object
# update user.last_activity if specified
if last_activity_timestamp:
last_activity = _parse_timestamp(last_activity_timestamp)
if (
(not user.last_activity)
or last_activity > user.last_activity
):
self.log.debug("Activity for user %s: %s",
user.name,
isoformat(last_activity),
)
user.last_activity = last_activity
else:
self.log.debug(
"Not updating activity for %s: %s < %s",
user,
isoformat(last_activity),
isoformat(user.last_activity),
)
if servers:
for server_name, server_info in servers.items():
last_activity = server_info['last_activity']
spawner = user.orm_spawners[server_name]
if (
(not spawner.last_activity)
or last_activity > spawner.last_activity
):
self.log.debug("Activity on server %s/%s: %s",
user.name,
server_name,
isoformat(last_activity),
)
spawner.last_activity
else:
self.log.debug(
"Not updating server activity on %s/%s: %s < %s",
user.name,
server_name,
isoformat(last_activity),
isoformat(user.last_activity),
)
self.db.commit()
default_handlers = [
(r"/api/user", SelfAPIHandler),
(r"/api/users", UserListAPIHandler),
@@ -597,5 +731,6 @@ default_handlers = [
(r"/api/users/([^/]+)/tokens/([^/]*)", UserTokenAPIHandler),
(r"/api/users/([^/]+)/servers/([^/]*)", UserServerAPIHandler),
(r"/api/users/([^/]+)/servers/([^/]*)/progress", SpawnProgressAPIHandler),
(r"/api/users/([^/]+)/activity", ActivityAPIHandler),
(r"/api/users/([^/]+)/admin-access", UserAdminAccessAPIHandler),
]

View File

@@ -107,6 +107,8 @@ class _ServiceSpawner(LocalProcessSpawner):
def start(self):
"""Start the process"""
env = self.get_env()
# no activity url for services
env.pop('JUPYTERHUB_ACTIVITY_URL', None)
if os.name == 'nt':
env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
cmd = self.cmd

View File

@@ -4,13 +4,17 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
from datetime import datetime, timezone
import json
import os
import random
from textwrap import dedent
from urllib.parse import urlparse
from jinja2 import ChoiceLoader, FunctionLoader
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado import gen
from tornado import ioloop
from tornado.web import HTTPError, RequestHandler
@@ -21,8 +25,10 @@ except ImportError:
raise ImportError("JupyterHub single-user server requires notebook >= 4.0")
from traitlets import (
Any,
Bool,
Bytes,
Integer,
Unicode,
CUnicode,
default,
@@ -43,7 +49,7 @@ from notebook.base.handlers import IPythonHandler
from ._version import __version__, _check_version
from .log import log_request
from .services.auth import HubOAuth, HubOAuthenticated, HubOAuthCallbackHandler
from .utils import url_path_join, make_ssl_context
from .utils import isoformat, url_path_join, make_ssl_context, exponential_backoff
# Authenticate requests with the Hub
@@ -385,20 +391,30 @@ class SingleUserNotebookApp(NotebookApp):
path = list(_exclude_home(path))
return path
hub_http_client = Any()
@default('hub_http_client')
def _default_client(self):
ssl_context = make_ssl_context(
self.keyfile,
self.certfile,
cafile=self.client_ca,
)
AsyncHTTPClient.configure(
None,
defaults={
"ssl_options": ssl_context,
},
)
return AsyncHTTPClient()
async def check_hub_version(self):
"""Test a connection to my Hub
- exit if I can't connect at all
- check version and warn on sufficient mismatch
"""
ssl_context = make_ssl_context(
self.keyfile,
self.certfile,
cafile=self.client_ca,
)
AsyncHTTPClient.configure(None, defaults={"ssl_options" : ssl_context})
client = AsyncHTTPClient()
client = self.hub_http_client
RETRIES = 5
for i in range(1, RETRIES+1):
try:
@@ -415,6 +431,112 @@ class SingleUserNotebookApp(NotebookApp):
hub_version = resp.headers.get('X-JupyterHub-Version')
_check_version(hub_version, __version__, self.log)
server_name = Unicode()
@default('server_name')
def _server_name_default(self):
return os.environ.get('JUPYTERHUB_SERVER_NAME', '')
hub_activity_url = Unicode(
config=True,
help="URL for sending JupyterHub activity updates",
)
@default('hub_activity_url')
def _default_activity_url(self):
return os.environ.get('JUPYTERHUB_ACTIVITY_URL', '')
hub_activity_interval = Integer(
300,
config=True,
help="""
Interval (in seconds) on which to update the Hub
with our latest activity.
"""
)
@default('hub_activity_interval')
def _default_activity_interval(self):
env_value = os.environ.get('JUPYTERHUB_ACTIVITY_INTERVAL')
if env_value:
return int(env_value)
else:
return 300
_last_activity_sent = Any(allow_none=True)
async def notify_activity(self):
"""Notify jupyterhub of activity"""
client = self.hub_http_client
last_activity = self.web_app.last_activity()
if not last_activity:
self.log.debug("No activity to send to the Hub")
return
if last_activity:
# protect against mixed timezone comparisons
if not last_activity.tzinfo:
# assume naive timestamps are utc
self.log.warning("last activity is using naïve timestamps")
last_activity = last_activity.replace(tzinfo=timezone.utc)
if (
self._last_activity_sent
and last_activity < self._last_activity_sent
):
self.log.debug("No activity since %s", self._last_activity_sent)
return
last_activity_timestamp = isoformat(last_activity)
async def notify():
self.log.debug("Notifying Hub of activity %s", last_activity_timestamp)
req = HTTPRequest(
url=self.hub_activity_url,
method='POST',
headers={
"Authorization": "token {}".format(self.hub_auth.api_token),
"Content-Type": "application/json",
},
body=json.dumps({
'servers': {
self.server_name: {
'last_activity': last_activity_timestamp,
},
},
'last_activity': last_activity_timestamp,
})
)
try:
await client.fetch(req)
except Exception:
self.log.exception("Error notifying Hub of activity")
return False
else:
return True
await exponential_backoff(
notify,
fail_message="Failed to notify Hub of activity",
start_wait=1,
max_wait=15,
timeout=60,
)
self._last_activity_sent = last_activity
async def keep_activity_updated(self):
if not self.hub_activity_url or not self.hub_activity_interval:
self.log.warning("Activity events disabled")
return
self.log.info("Updating Hub with activity every %s seconds",
self.hub_activity_interval
)
while True:
try:
await self.notify_activity()
except Exception as e:
self.log.exception("Error notifying Hub of activity")
# add 20% jitter to the interval to avoid alignment
# of lots of requests from user servers
t = self.hub_activity_interval * (1 + 0.2 * (random.random() - 0.5))
await asyncio.sleep(t)
def initialize(self, argv=None):
# disable trash by default
# this can be re-enabled by config
@@ -425,6 +547,7 @@ class SingleUserNotebookApp(NotebookApp):
self.log.info("Starting jupyterhub-singleuser server version %s", __version__)
# start by hitting Hub to check version
ioloop.IOLoop.current().run_sync(self.check_hub_version)
ioloop.IOLoop.current().add_callback(self.keep_activity_updated)
super(SingleUserNotebookApp, self).start()
def init_hub_auth(self):

View File

@@ -653,7 +653,14 @@ class Spawner(LoggingConfigurable):
# Info previously passed on args
env['JUPYTERHUB_USER'] = self.user.name
env['JUPYTERHUB_SERVER_NAME'] = self.name
env['JUPYTERHUB_API_URL'] = self.hub.api_url
env['JUPYTERHUB_ACTIVITY_URL'] = url_path_join(
self.hub.api_url,
'users',
self.user.escaped_name,
'activity',
)
env['JUPYTERHUB_BASE_URL'] = self.hub.base_url[:-4]
if self.server:
env['JUPYTERHUB_SERVICE_PREFIX'] = self.server.base_url