mirror of
https://github.com/jupyterhub/jupyterhub.git
synced 2025-10-16 14:33:00 +00:00
Add event logging infrastructure
- Introduce the EventLog class from BinderHub for emitting structured event data
- Instrument server starts and stops to emit events
- Defaults to not saving any events anywhere
This commit is contained in:
@@ -11,6 +11,8 @@ import re
|
|||||||
import signal
|
import signal
|
||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
|
import json
|
||||||
|
from glob import glob
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from datetime import timezone
|
from datetime import timezone
|
||||||
@@ -87,6 +89,7 @@ from .auth import Authenticator, PAMAuthenticator
|
|||||||
from .crypto import CryptKeeper
|
from .crypto import CryptKeeper
|
||||||
from .spawner import Spawner, LocalProcessSpawner
|
from .spawner import Spawner, LocalProcessSpawner
|
||||||
from .objects import Hub, Server
|
from .objects import Hub, Server
|
||||||
|
from .events import EventLog
|
||||||
|
|
||||||
# For faking stats
|
# For faking stats
|
||||||
from .emptyclass import EmptyClass
|
from .emptyclass import EmptyClass
|
||||||
@@ -2069,6 +2072,7 @@ class JupyterHub(Application):
|
|||||||
internal_ssl_ca=self.internal_ssl_ca,
|
internal_ssl_ca=self.internal_ssl_ca,
|
||||||
trusted_alt_names=self.trusted_alt_names,
|
trusted_alt_names=self.trusted_alt_names,
|
||||||
shutdown_on_logout=self.shutdown_on_logout,
|
shutdown_on_logout=self.shutdown_on_logout,
|
||||||
|
event_log=self.event_log
|
||||||
)
|
)
|
||||||
# allow configured settings to have priority
|
# allow configured settings to have priority
|
||||||
settings.update(self.tornado_settings)
|
settings.update(self.tornado_settings)
|
||||||
@@ -2144,6 +2148,12 @@ class JupyterHub(Application):
|
|||||||
_log_cls("Authenticator", self.authenticator_class)
|
_log_cls("Authenticator", self.authenticator_class)
|
||||||
_log_cls("Spawner", self.spawner_class)
|
_log_cls("Spawner", self.spawner_class)
|
||||||
|
|
||||||
|
self.event_log = EventLog(parent=self)
|
||||||
|
|
||||||
|
for schema_file in glob(os.path.join(here, 'event-schemas','*.json')):
|
||||||
|
with open(schema_file) as f:
|
||||||
|
self.event_log.register_schema(json.load(f))
|
||||||
|
|
||||||
self.init_pycurl()
|
self.init_pycurl()
|
||||||
self.init_secrets()
|
self.init_secrets()
|
||||||
self.init_internal_ssl()
|
self.init_internal_ssl()
|
||||||
|
24
jupyterhub/event-schemas/server-actions.json
Normal file
24
jupyterhub/event-schemas/server-actions.json
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
{
|
||||||
|
"$id": "hub.jupyter.org/server-action",
|
||||||
|
"version": 1,
|
||||||
|
"title": "JupyterHub server events",
|
||||||
|
"description": "JupyterHub emits this event when a user's server starts or stops",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"action": {
|
||||||
|
"enum": [
|
||||||
|
"start",
|
||||||
|
"stop"
|
||||||
|
],
|
||||||
|
"description": "Action taken on this user's server"
|
||||||
|
},
|
||||||
|
"username": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Name of the user whose server this action applies to"
|
||||||
|
},
|
||||||
|
"servername": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Name of the server this action applies to"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
119
jupyterhub/events.py
Normal file
119
jupyterhub/events.py
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
"""
|
||||||
|
Emit structured, discrete events when various actions happen.
|
||||||
|
"""
|
||||||
|
from traitlets.config import Configurable
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
import jsonschema
|
||||||
|
from pythonjsonlogger import jsonlogger
|
||||||
|
from traitlets import TraitType
|
||||||
|
import json
|
||||||
|
import six
|
||||||
|
|
||||||
|
|
||||||
|
class Callable(TraitType):
    """
    A trait which is callable.

    Classes are callable, as are instances
    with a __call__() method.
    """

    info_text = 'a callable'

    def validate(self, obj, value):
        """
        Return ``value`` unchanged when it is callable.

        Any non-callable value is reported through ``self.error(obj, value)``.
        """
        # The builtin ``callable`` is available on every supported Python 3,
        # so the ``six`` compatibility shim is not needed here.
        if callable(value):
            return value
        self.error(obj, value)
|
||||||
|
|
||||||
|
def _skip_message(record, **kwargs):
|
||||||
|
"""
|
||||||
|
Remove 'message' from log record.
|
||||||
|
|
||||||
|
It is always emitted with 'null', and we do not want it,
|
||||||
|
since we are always emitting events only
|
||||||
|
"""
|
||||||
|
del record['message']
|
||||||
|
return json.dumps(record, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class EventLog(Configurable):
    """
    Send structured events to a logging sink.

    Schemas are registered with ``register_schema``. ``emit`` validates an
    event against a registered schema, wraps it in a capsule carrying the
    timestamp, schema name and version, and logs it through the handlers
    produced by ``handlers_maker``.
    """

    handlers_maker = Callable(
        None,
        config=True,
        allow_none=True,
        help="""
        Callable that returns a list of logging.Handler instances to send events to.

        When set to None (the default), events are discarded.
        """,
    )

    def __init__(self, *args, **kwargs):
        """Set up the event logger and attach the configured handlers."""
        super().__init__(*args, **kwargs)

        self.log = logging.getLogger(__name__)
        # We don't want events to show up in the default logs
        self.log.propagate = False
        self.log.setLevel(logging.INFO)

        # Registered schemas, keyed by ($id, version)
        self.schemas = {}

        # Always define ``handlers`` so the attribute exists even when
        # no handlers_maker is configured and events are discarded.
        self.handlers = []
        if self.handlers_maker:
            self.handlers = self.handlers_maker(self)
            formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message)
            for handler in self.handlers:
                handler.setFormatter(formatter)
                self.log.addHandler(handler)

    def register_schema(self, schema):
        """
        Register a given JSON Schema with this event emitter.

        'version' and '$id' are required fields.

        Raises ``ValueError`` when a required field is missing or when the
        schema declares a property whose name is reserved for the capsule.
        An invalid JSON Schema is rejected by ``check_schema``.
        """
        # Check if our schema itself is valid
        # This throws an exception if it isn't valid
        jsonschema.validators.validator_for(schema).check_schema(schema)

        # Check that the properties we require are present.
        # Tuples (not sets) keep the order of the checks deterministic.
        required_schema_fields = ('$id', 'version')
        for rsf in required_schema_fields:
            if rsf not in schema:
                raise ValueError(
                    f'{rsf} is required in schema specification'
                )

        # Make sure reserved, auto-added fields are not in schema.
        # A schema without 'properties' is still valid JSON Schema, so
        # treat a missing key as "no declared properties".
        declared_properties = schema.get('properties', {})
        reserved_fields = ('timestamp', 'schema', 'version')
        for rf in reserved_fields:
            if rf in declared_properties:
                raise ValueError(
                    f'{rf} field is reserved by event emitter & can not be explicitly set in schema'
                )

        self.schemas[(schema['$id'], schema['version'])] = schema

    def emit(self, schema_name, version, event):
        """
        Emit event with given schema / version in a capsule.

        Does nothing when no ``handlers_maker`` is configured. Otherwise the
        event is validated against the registered schema before being logged.

        Raises ``ValueError`` when the (schema_name, version) pair has not
        been registered.
        """
        # Function-scope import: the module only imports ``datetime`` itself.
        from datetime import timezone

        if not self.handlers_maker:
            # If we don't have a handler setup, ignore everything
            return

        schema_key = (schema_name, version)
        if schema_key not in self.schemas:
            raise ValueError(f'Schema {schema_name} version {version} not registered')
        jsonschema.validate(event, self.schemas[schema_key])

        # Take an aware UTC "now" and drop tzinfo so isoformat() yields the
        # same naive string as the deprecated datetime.utcnow(), then add 'Z'.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        capsule = {
            'timestamp': utc_now.isoformat() + 'Z',
            'schema': schema_name,
            'version': version,
        }
        # Event fields are merged last, so a colliding key in the event
        # replaces the auto-added value.
        capsule.update(event)
        self.log.info(capsule)
|
@@ -156,6 +156,10 @@ class BaseHandler(RequestHandler):
|
|||||||
def oauth_provider(self):
|
def oauth_provider(self):
|
||||||
return self.settings['oauth_provider']
|
return self.settings['oauth_provider']
|
||||||
|
|
||||||
|
@property
|
||||||
|
def event_log(self):
|
||||||
|
return self.settings['event_log']
|
||||||
|
|
||||||
def finish(self, *args, **kwargs):
|
def finish(self, *args, **kwargs):
|
||||||
"""Roll back any uncommitted transactions from the handler."""
|
"""Roll back any uncommitted transactions from the handler."""
|
||||||
if self.db.dirty:
|
if self.db.dirty:
|
||||||
@@ -846,6 +850,11 @@ class BaseHandler(RequestHandler):
|
|||||||
SERVER_SPAWN_DURATION_SECONDS.labels(
|
SERVER_SPAWN_DURATION_SECONDS.labels(
|
||||||
status=ServerSpawnStatus.success
|
status=ServerSpawnStatus.success
|
||||||
).observe(time.perf_counter() - spawn_start_time)
|
).observe(time.perf_counter() - spawn_start_time)
|
||||||
|
self.event_log.emit('hub.jupyter.org/server-action', 1, {
|
||||||
|
'action': 'start',
|
||||||
|
'username': user.name,
|
||||||
|
'servername': server_name
|
||||||
|
})
|
||||||
proxy_add_start_time = time.perf_counter()
|
proxy_add_start_time = time.perf_counter()
|
||||||
spawner._proxy_pending = True
|
spawner._proxy_pending = True
|
||||||
try:
|
try:
|
||||||
@@ -1026,6 +1035,11 @@ class BaseHandler(RequestHandler):
|
|||||||
SERVER_STOP_DURATION_SECONDS.labels(
|
SERVER_STOP_DURATION_SECONDS.labels(
|
||||||
status=ServerStopStatus.success
|
status=ServerStopStatus.success
|
||||||
).observe(toc - tic)
|
).observe(toc - tic)
|
||||||
|
self.event_log.emit('hub.jupyter.org/server-action', 1, {
|
||||||
|
'action': 'stop',
|
||||||
|
'username': user.name,
|
||||||
|
'servername': server_name
|
||||||
|
})
|
||||||
except:
|
except:
|
||||||
SERVER_STOP_DURATION_SECONDS.labels(
|
SERVER_STOP_DURATION_SECONDS.labels(
|
||||||
status=ServerStopStatus.failure
|
status=ServerStopStatus.failure
|
||||||
|
Reference in New Issue
Block a user