Merge pull request #2346 from minrk/push-activity

push last_activity to the hub
This commit is contained in:
Min RK
2019-02-15 11:16:12 +01:00
committed by GitHub
10 changed files with 444 additions and 20 deletions

View File

@@ -4,3 +4,4 @@
sphinx>=1.7
recommonmark==0.4.0
sphinx-copybutton
alabaster_jupyterhub

View File

@@ -89,7 +89,7 @@ paths:
post:
summary: Create multiple users
parameters:
- name: data
- name: body
in: body
required: true
schema:
@@ -147,7 +147,7 @@ paths:
in: path
required: true
type: string
- name: data
- name: body
in: body
required: true
description: Updated user info. At least one key to be updated (name or admin) is required.
@@ -176,6 +176,60 @@ paths:
responses:
'204':
description: The user has been deleted
/users/{name}/activity:
post:
summary:
Notify Hub of activity for a given user.
description:
Notify the Hub of activity by the user,
e.g. accessing a service or (more likely)
actively using a server.
parameters:
- name: name
description: username
in: path
required: true
type: string
- body:
in: body
schema:
type: object
properties:
last_activity:
type: string
format: date-time
description: |
Timestamp of last-seen activity for this user.
Only needed if this is not activity associated
with using a given server.
required: false
servers:
description: |
Register activity for specific servers by name.
The keys of this dict are the names of servers.
The default server has an empty name ('').
required: false
type: object
properties:
'<server name>':
description: |
Activity for a single server.
type: object
properties:
last_activity:
required: true
type: string
format: date-time
description: |
Timestamp of last-seen activity on this server.
example:
last_activity: '2019-02-06T12:54:14Z'
servers:
'':
last_activity: '2019-02-06T12:54:14Z'
gpu:
last_activity: '2019-02-06T12:54:14Z'
/users/{name}/server:
post:
summary: Start a user's single-user notebook server
@@ -370,7 +424,7 @@ paths:
in: path
required: true
type: string
- name: data
- name: body
in: body
required: true
description: The users to add to the group
@@ -395,7 +449,7 @@ paths:
in: path
required: true
type: string
- name: data
- name: body
in: body
required: true
description: The users to remove from the group
@@ -453,7 +507,7 @@ paths:
summary: Notify the Hub about a new proxy
description: Notifies the Hub of a new proxy to use.
parameters:
- name: data
- name: body
in: body
required: true
description: Any values that have changed for the new proxy. All keys are optional.

View File

@@ -4,16 +4,17 @@
# Distributed under the terms of the Modified BSD License.
import asyncio
from datetime import datetime
from datetime import datetime, timedelta, timezone
import json
from async_generator import aclosing
from dateutil.parser import parse as parse_date
from tornado import web
from tornado.iostream import StreamClosedError
from .. import orm
from ..user import User
from ..utils import admin_only, iterate_until, maybe_future, url_path_join
from ..utils import admin_only, isoformat, iterate_until, maybe_future, url_path_join
from .base import APIHandler
@@ -587,6 +588,139 @@ class SpawnProgressAPIHandler(APIHandler):
await self.send_event(failed_event)
def _parse_timestamp(timestamp):
"""Parse and return a utc timestamp
- raise HTTPError(400) on parse error
- handle and strip tz info for internal consistency
(we use naïve utc timestamps everywhere)
"""
try:
dt = parse_date(timestamp)
except Exception:
raise web.HTTPError(400, "Not a valid timestamp: %r", timestamp)
if dt.tzinfo:
# strip timezone info to naïve UTC datetime
dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
now = datetime.utcnow()
if (dt - now) > timedelta(minutes=59):
raise web.HTTPError(
400,
"Rejecting activity from more than an hour in the future: {}".format(
isoformat(dt)
)
)
return dt
class ActivityAPIHandler(APIHandler):
def _validate_servers(self, user, servers):
"""Validate servers dict argument
- types are correct
- each server exists
- last_activity fields are parsed into datetime objects
"""
msg = "servers must be a dict of the form {server_name: {last_activity: timestamp}}"
if not isinstance(servers, dict):
raise web.HTTPError(400, msg)
spawners = user.orm_spawners
for server_name, server_info in servers.items():
if server_name not in spawners:
raise web.HTTPError(
400,
"No such server '{}' for user {}".format(
server_name,
user.name,
)
)
# check that each per-server field is a dict
if not isinstance(server_info, dict):
raise web.HTTPError(400, msg)
# check that last_activity is defined for each per-server dict
if 'last_activity' not in server_info:
raise web.HTTPError(400, msg)
# parse last_activity timestamps
# _parse_timestamp above is responsible for raising errors
server_info['last_activity'] = _parse_timestamp(server_info['last_activity'])
return servers
@admin_or_self
def post(self, username):
user = self.find_user(username)
if user is None:
# no such user
raise web.HTTPError(404, "No such user: %r", username)
body = self.get_json_body()
if not isinstance(body, dict):
raise web.HTTPError(400, "body must be a json dict")
last_activity_timestamp = body.get('last_activity')
servers = body.get('servers')
if not last_activity_timestamp and not servers:
raise web.HTTPError(
400,
"body must contain at least one of `last_activity` or `servers`"
)
if servers:
# validate server args
servers = self._validate_servers(user, servers)
# at this point we know that the servers dict
# is valid and contains only servers that exist
# and last_activity is defined and a valid datetime object
# update user.last_activity if specified
if last_activity_timestamp:
last_activity = _parse_timestamp(last_activity_timestamp)
if (
(not user.last_activity)
or last_activity > user.last_activity
):
self.log.debug("Activity for user %s: %s",
user.name,
isoformat(last_activity),
)
user.last_activity = last_activity
else:
self.log.debug(
"Not updating activity for %s: %s < %s",
user,
isoformat(last_activity),
isoformat(user.last_activity),
)
if servers:
for server_name, server_info in servers.items():
last_activity = server_info['last_activity']
spawner = user.orm_spawners[server_name]
if (
(not spawner.last_activity)
or last_activity > spawner.last_activity
):
self.log.debug("Activity on server %s/%s: %s",
user.name,
server_name,
isoformat(last_activity),
)
spawner.last_activity = last_activity
else:
self.log.debug(
"Not updating server activity on %s/%s: %s < %s",
user.name,
server_name,
isoformat(last_activity),
isoformat(user.last_activity),
)
self.db.commit()
default_handlers = [
(r"/api/user", SelfAPIHandler),
(r"/api/users", UserListAPIHandler),
@@ -597,5 +731,6 @@ default_handlers = [
(r"/api/users/([^/]+)/tokens/([^/]*)", UserTokenAPIHandler),
(r"/api/users/([^/]+)/servers/([^/]*)", UserServerAPIHandler),
(r"/api/users/([^/]+)/servers/([^/]*)/progress", SpawnProgressAPIHandler),
(r"/api/users/([^/]+)/activity", ActivityAPIHandler),
(r"/api/users/([^/]+)/admin-access", UserAdminAccessAPIHandler),
]

View File

@@ -310,7 +310,11 @@ class BaseHandler(RequestHandler):
now = datetime.utcnow()
orm_token.last_activity = now
if orm_token.user:
orm_token.user.last_activity = now
# FIXME: scopes should give us better control than this
# don't consider API requests originating from a server
# to be activity from the user
if not orm_token.note.startswith("Server at "):
orm_token.user.last_activity = now
self.db.commit()
if orm_token.service:

View File

@@ -107,6 +107,8 @@ class _ServiceSpawner(LocalProcessSpawner):
def start(self):
"""Start the process"""
env = self.get_env()
# no activity url for services
env.pop('JUPYTERHUB_ACTIVITY_URL', None)
if os.name == 'nt':
env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
cmd = self.cmd

View File

@@ -4,13 +4,17 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
from datetime import datetime, timezone
import json
import os
import random
from textwrap import dedent
from urllib.parse import urlparse
from jinja2 import ChoiceLoader, FunctionLoader
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado import gen
from tornado import ioloop
from tornado.web import HTTPError, RequestHandler
@@ -21,8 +25,10 @@ except ImportError:
raise ImportError("JupyterHub single-user server requires notebook >= 4.0")
from traitlets import (
Any,
Bool,
Bytes,
Integer,
Unicode,
CUnicode,
default,
@@ -43,7 +49,7 @@ from notebook.base.handlers import IPythonHandler
from ._version import __version__, _check_version
from .log import log_request
from .services.auth import HubOAuth, HubOAuthenticated, HubOAuthCallbackHandler
from .utils import url_path_join, make_ssl_context
from .utils import isoformat, url_path_join, make_ssl_context, exponential_backoff
# Authenticate requests with the Hub
@@ -385,20 +391,32 @@ class SingleUserNotebookApp(NotebookApp):
path = list(_exclude_home(path))
return path
# create dynamic default http client,
# configured with any relevant ssl config
hub_http_client = Any()
@default('hub_http_client')
def _default_client(self):
ssl_context = make_ssl_context(
self.keyfile,
self.certfile,
cafile=self.client_ca,
)
AsyncHTTPClient.configure(
None,
defaults={
"ssl_options": ssl_context,
},
)
return AsyncHTTPClient()
async def check_hub_version(self):
"""Test a connection to my Hub
- exit if I can't connect at all
- check version and warn on sufficient mismatch
"""
ssl_context = make_ssl_context(
self.keyfile,
self.certfile,
cafile=self.client_ca,
)
AsyncHTTPClient.configure(None, defaults={"ssl_options" : ssl_context})
client = AsyncHTTPClient()
client = self.hub_http_client
RETRIES = 5
for i in range(1, RETRIES+1):
try:
@@ -415,6 +433,112 @@ class SingleUserNotebookApp(NotebookApp):
hub_version = resp.headers.get('X-JupyterHub-Version')
_check_version(hub_version, __version__, self.log)
server_name = Unicode()
@default('server_name')
def _server_name_default(self):
return os.environ.get('JUPYTERHUB_SERVER_NAME', '')
hub_activity_url = Unicode(
config=True,
help="URL for sending JupyterHub activity updates",
)
@default('hub_activity_url')
def _default_activity_url(self):
return os.environ.get('JUPYTERHUB_ACTIVITY_URL', '')
hub_activity_interval = Integer(
300,
config=True,
help="""
Interval (in seconds) on which to update the Hub
with our latest activity.
"""
)
@default('hub_activity_interval')
def _default_activity_interval(self):
env_value = os.environ.get('JUPYTERHUB_ACTIVITY_INTERVAL')
if env_value:
return int(env_value)
else:
return 300
_last_activity_sent = Any(allow_none=True)
async def notify_activity(self):
"""Notify jupyterhub of activity"""
client = self.hub_http_client
last_activity = self.web_app.last_activity()
if not last_activity:
self.log.debug("No activity to send to the Hub")
return
if last_activity:
# protect against mixed timezone comparisons
if not last_activity.tzinfo:
# assume naive timestamps are utc
self.log.warning("last activity is using naïve timestamps")
last_activity = last_activity.replace(tzinfo=timezone.utc)
if (
self._last_activity_sent
and last_activity < self._last_activity_sent
):
self.log.debug("No activity since %s", self._last_activity_sent)
return
last_activity_timestamp = isoformat(last_activity)
async def notify():
self.log.debug("Notifying Hub of activity %s", last_activity_timestamp)
req = HTTPRequest(
url=self.hub_activity_url,
method='POST',
headers={
"Authorization": "token {}".format(self.hub_auth.api_token),
"Content-Type": "application/json",
},
body=json.dumps({
'servers': {
self.server_name: {
'last_activity': last_activity_timestamp,
},
},
'last_activity': last_activity_timestamp,
})
)
try:
await client.fetch(req)
except Exception:
self.log.exception("Error notifying Hub of activity")
return False
else:
return True
await exponential_backoff(
notify,
fail_message="Failed to notify Hub of activity",
start_wait=1,
max_wait=15,
timeout=60,
)
self._last_activity_sent = last_activity
async def keep_activity_updated(self):
if not self.hub_activity_url or not self.hub_activity_interval:
self.log.warning("Activity events disabled")
return
self.log.info("Updating Hub with activity every %s seconds",
self.hub_activity_interval
)
while True:
try:
await self.notify_activity()
except Exception as e:
self.log.exception("Error notifying Hub of activity")
# add 20% jitter to the interval to avoid alignment
# of lots of requests from user servers
t = self.hub_activity_interval * (1 + 0.2 * (random.random() - 0.5))
await asyncio.sleep(t)
def initialize(self, argv=None):
# disable trash by default
# this can be re-enabled by config
@@ -425,6 +549,7 @@ class SingleUserNotebookApp(NotebookApp):
self.log.info("Starting jupyterhub-singleuser server version %s", __version__)
# start by hitting Hub to check version
ioloop.IOLoop.current().run_sync(self.check_hub_version)
ioloop.IOLoop.current().add_callback(self.keep_activity_updated)
super(SingleUserNotebookApp, self).start()
def init_hub_auth(self):

View File

@@ -653,7 +653,15 @@ class Spawner(LoggingConfigurable):
# Info previously passed on args
env['JUPYTERHUB_USER'] = self.user.name
env['JUPYTERHUB_SERVER_NAME'] = self.name
env['JUPYTERHUB_API_URL'] = self.hub.api_url
env['JUPYTERHUB_ACTIVITY_URL'] = url_path_join(
self.hub.api_url,
'users',
# tolerate mocks defining only user.name
getattr(self.user, 'escaped_name', self.user.name),
'activity',
)
env['JUPYTERHUB_BASE_URL'] = self.hub.base_url[:-4]
if self.server:
env['JUPYTERHUB_SERVICE_PREFIX'] = self.server.base_url

View File

@@ -1,7 +1,7 @@
"""Tests for the REST API."""
import asyncio
from datetime import datetime
from datetime import datetime, timedelta
from concurrent.futures import Future
import json
import re
@@ -16,7 +16,7 @@ from tornado import gen
import jupyterhub
from .. import orm
from ..utils import url_path_join as ujoin
from ..utils import url_path_join as ujoin, utcnow
from .mocking import public_host, public_url
from .utils import (
add_user,
@@ -1549,6 +1549,91 @@ async def test_info(app):
}
# ------------------
# Activity API tests
# ------------------
async def test_update_activity_403(app, user, admin_user):
token = user.new_api_token()
r = await api_request(
app,
"users/{}/activity".format(admin_user.name),
headers={"Authorization": "token {}".format(token)},
data="{}",
method="post",
)
assert r.status_code == 403
async def test_update_activity_admin(app, user, admin_user):
token = admin_user.new_api_token()
r = await api_request(
app,
"users/{}/activity".format(user.name),
headers={"Authorization": "token {}".format(token)},
data=json.dumps({"last_activity": utcnow().isoformat()}),
method="post",
)
r.raise_for_status()
@mark.parametrize(
"server_name, fresh",
[
("", True),
("", False),
("exists", True),
("exists", False),
("nope", True),
("nope", False),
],
)
async def test_update_server_activity(app, user, server_name, fresh):
token = user.new_api_token()
now = utcnow()
internal_now = now.replace(tzinfo=None)
# we use naive utc internally
# initialize last_activity for one named and the default server
for name in ("", "exists"):
user.spawners[name].orm_spawner.last_activity = now.replace(tzinfo=None)
app.db.commit()
td = timedelta(minutes=1)
if fresh:
activity = now + td
else:
activity = now - td
r = await api_request(
app,
"users/{}/activity".format(user.name),
headers={"Authorization": "token {}".format(token)},
data=json.dumps(
{"servers": {server_name: {"last_activity": activity.isoformat()}}}
),
method="post",
)
if server_name == "nope":
assert r.status_code == 400
reply = r.json()
assert server_name in reply["message"]
assert "No such server" in reply["message"]
assert user.name in reply["message"]
return
r.raise_for_status()
# check that last activity was updated
if fresh:
expected = activity.replace(tzinfo=None)
else:
expected = now.replace(tzinfo=None)
assert user.spawners[server_name].orm_spawner.last_activity == expected
# -----------------
# General API tests
# -----------------

View File

@@ -674,6 +674,9 @@ class User:
spawner._start_pending = False
spawner.stop_polling()
spawner._stop_pending = True
self.log.debug("Stopping %s", spawner._log_name)
try:
api_token = spawner.api_token
status = await spawner.poll()
@@ -705,6 +708,7 @@ class User:
self.log.debug("Deleting oauth client %s", oauth_client.identifier)
self.db.delete(oauth_client)
self.db.commit()
self.log.debug("Finished stopping %s", spawner._log_name)
finally:
spawner.orm_spawner.started = None
self.db.commit()

View File

@@ -541,3 +541,9 @@ async def iterate_until(deadline_future, generator):
else:
# neither is done, this shouldn't happen
continue
def utcnow():
"""Return timezone-aware utcnow"""
return datetime.now(timezone.utc)