Files
jupyterhub/jupyterhub/tests/utils.py
Min RK f05aecf5f9 test_api passes without threads
just put requests in a thread via `utils.async_requests`

eliminates db threads issue
2017-07-27 11:29:16 +02:00

19 lines
588 B
Python

from concurrent.futures import ThreadPoolExecutor
import requests
class _AsyncRequests:
"""Wrapper around requests to return a Future from request methods
A single thread is allocated to avoid blocking the IOLoop thread.
"""
def __init__(self):
self.executor = ThreadPoolExecutor(1)
def __getattr__(self, name):
requests_method = getattr(requests, name)
return lambda *args, **kwargs: self.executor.submit(requests_method, *args, **kwargs)
# async_requests.get = requests.get returning a Future, etc.
async_requests = _AsyncRequests()