def _parse_timestamp(timestamp):
    """Parse a client-supplied timestamp and return a naïve UTC datetime.

    - timezone-aware values are converted to UTC and their tzinfo is
      stripped for internal consistency
      (we use naïve utc timestamps everywhere)
    - values without tzinfo are returned as parsed

    Raises:
        web.HTTPError(400): if the value cannot be parsed, or if it lies
            more than 59 minutes ahead of the current UTC time.
    """
    try:
        dt = parse_date(timestamp)
    except Exception:
        # any parse failure (bad string, wrong type, ...) is a client error
        raise web.HTTPError(400, "Not a valid timestamp: %r", timestamp)
    if dt.tzinfo:
        # strip timezone info to naïve UTC datetime
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)

    now = datetime.utcnow()
    if (dt - now) > timedelta(minutes=59):
        raise web.HTTPError(
            400,
            "Rejecting activity from more than an hour in the future: {}".format(
                isoformat(dt)
            ),
        )
    return dt


class ActivityAPIHandler(APIHandler):
    """API handler that records activity reported for a user and their servers."""

    def _validate_servers(self, user, servers):
        """Validate the ``servers`` argument of an activity request.

        Checks that:

        - ``servers`` is a dict
        - each key names an existing entry in ``user.orm_spawners``
        - each value is a dict containing ``last_activity``

        Each ``last_activity`` value is replaced *in place* by the datetime
        returned from ``_parse_timestamp``.

        Returns:
            The same ``servers`` dict, with parsed ``last_activity`` values.

        Raises:
            web.HTTPError(400): on any validation or timestamp-parse failure.
        """
        msg = "servers must be a dict of the form {server_name: {last_activity: timestamp}}"
        if not isinstance(servers, dict):
            raise web.HTTPError(400, msg)

        spawners = user.orm_spawners
        for server_name, server_info in servers.items():
            if server_name not in spawners:
                raise web.HTTPError(
                    400,
                    "No such server '{}' for user {}".format(
                        server_name,
                        user.name,
                    ),
                )
            # check that each per-server field is a dict
            if not isinstance(server_info, dict):
                raise web.HTTPError(400, msg)
            # check that last_activity is defined for each per-server dict
            if 'last_activity' not in server_info:
                raise web.HTTPError(400, msg)
            # parse last_activity timestamps
            # _parse_timestamp is responsible for raising errors
            server_info['last_activity'] = _parse_timestamp(
                server_info['last_activity']
            )
        return servers

    @admin_or_self
    def post(self, username):
        """Record activity for ``username`` and/or their servers.

        The JSON body must be a dict containing at least one of:

        - ``last_activity``: timestamp of the user's latest activity
        - ``servers``: ``{server_name: {"last_activity": timestamp}}``

        A stored ``last_activity`` is only overwritten when the reported
        timestamp is newer (or nothing was stored yet). All validation
        happens before any attribute is modified.

        Raises:
            web.HTTPError(404): if the user does not exist.
            web.HTTPError(400): if the body is malformed.
        """
        user = self.find_user(username)
        if user is None:
            # no such user
            raise web.HTTPError(404, "No such user: %r", username)

        body = self.get_json_body()
        if not isinstance(body, dict):
            raise web.HTTPError(400, "body must be a json dict")

        last_activity_timestamp = body.get('last_activity')
        servers = body.get('servers')
        if not last_activity_timestamp and not servers:
            raise web.HTTPError(
                400,
                "body must contain at least one of `last_activity` or `servers`",
            )

        if servers:
            # validate server args
            servers = self._validate_servers(user, servers)
            # at this point we know that the servers dict
            # is valid and contains only servers that exist
            # and last_activity is defined and a valid datetime object

        # update user.last_activity if specified
        if last_activity_timestamp:
            last_activity = _parse_timestamp(last_activity_timestamp)
            if (not user.last_activity) or last_activity > user.last_activity:
                self.log.debug(
                    "Activity for user %s: %s",
                    user.name,
                    isoformat(last_activity),
                )
                user.last_activity = last_activity
            else:
                self.log.debug(
                    "Not updating activity for %s: %s < %s",
                    user.name,
                    isoformat(last_activity),
                    isoformat(user.last_activity),
                )

        if servers:
            for server_name, server_info in servers.items():
                last_activity = server_info['last_activity']
                spawner = user.orm_spawners[server_name]

                if (
                    (not spawner.last_activity)
                    or last_activity > spawner.last_activity
                ):
                    self.log.debug(
                        "Activity on server %s/%s: %s",
                        user.name,
                        server_name,
                        isoformat(last_activity),
                    )
                    # actually store the new timestamp on the spawner
                    spawner.last_activity = last_activity
                else:
                    # report the value we compared against (the spawner's)
                    self.log.debug(
                        "Not updating server activity on %s/%s: %s < %s",
                        user.name,
                        server_name,
                        isoformat(last_activity),
                        isoformat(spawner.last_activity),
                    )

        self.db.commit()
hub_http_client = Any()


@default('hub_http_client')
def _default_client(self):
    """Build the HTTP client used for requests to the Hub.

    Creates an SSL context from this app's key file, certificate file and
    client CA, installs it as the default ``ssl_options`` for
    ``AsyncHTTPClient``, and returns a client instance.
    """
    context = make_ssl_context(self.keyfile, self.certfile, cafile=self.client_ca)
    client_defaults = {"ssl_options": context}
    # first argument None: keep the current AsyncHTTPClient implementation
    AsyncHTTPClient.configure(None, defaults=client_defaults)
    return AsyncHTTPClient()
async def notify_activity(self):
    """Notify JupyterHub of this server's latest activity.

    Reads the latest activity timestamp from ``self.web_app.last_activity()``
    and POSTs it to ``self.hub_activity_url`` as both the user's and this
    server's ``last_activity``. Nothing is sent when there is no activity
    yet, or when the latest activity is older than the last timestamp that
    was successfully sent.

    The request is retried through ``exponential_backoff``; whatever that
    helper raises when it gives up propagates to the caller. On success,
    ``self._last_activity_sent`` is updated.
    """
    client = self.hub_http_client
    last_activity = self.web_app.last_activity()
    if not last_activity:
        self.log.debug("No activity to send to the Hub")
        return

    # protect against mixed timezone comparisons
    if not last_activity.tzinfo:
        # assume naive timestamps are utc
        self.log.warning("last activity is using naïve timestamps")
        last_activity = last_activity.replace(tzinfo=timezone.utc)

    if self._last_activity_sent and last_activity < self._last_activity_sent:
        self.log.debug("No activity since %s", self._last_activity_sent)
        return

    last_activity_timestamp = isoformat(last_activity)

    async def notify():
        # one attempt; returns True on success so the backoff loop stops
        self.log.debug("Notifying Hub of activity %s", last_activity_timestamp)
        req = HTTPRequest(
            url=self.hub_activity_url,
            method='POST',
            headers={
                "Authorization": "token {}".format(self.hub_auth.api_token),
                "Content-Type": "application/json",
            },
            body=json.dumps(
                {
                    'servers': {
                        self.server_name: {
                            'last_activity': last_activity_timestamp,
                        },
                    },
                    'last_activity': last_activity_timestamp,
                }
            ),
        )
        try:
            await client.fetch(req)
        except Exception:
            self.log.exception("Error notifying Hub of activity")
            return False
        else:
            return True

    await exponential_backoff(
        notify,
        fail_message="Failed to notify Hub of activity",
        start_wait=1,
        max_wait=15,
        timeout=60,
    )
    # only remember the timestamp once the Hub has accepted it
    self._last_activity_sent = last_activity


async def keep_activity_updated(self):
    """Periodically call ``notify_activity`` for as long as the task runs.

    Returns immediately (after logging a warning) when either
    ``hub_activity_url`` or ``hub_activity_interval`` is unset/zero.
    Errors from a single notification are logged and do not stop the loop.
    """
    if not self.hub_activity_url or not self.hub_activity_interval:
        self.log.warning("Activity events disabled")
        return
    self.log.info(
        "Updating Hub with activity every %s seconds",
        self.hub_activity_interval,
    )
    while True:
        try:
            await self.notify_activity()
        except Exception:
            self.log.exception("Error notifying Hub of activity")
        # add 20% jitter to the interval to avoid alignment
        # of lots of requests from user servers
        t = self.hub_activity_interval * (1 + 0.2 * (random.random() - 0.5))
        await asyncio.sleep(t)