source: trunk/src/allmydata/util/cputhreadpool.py

Last change on this file was 774b4f2, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-11-21T13:31:34Z

Fix mypy errors

  • Property mode set to 100644
File size: 3.1 KB
Line 
"""
A global thread pool for CPU-intensive tasks.

Why a dedicated pool:

* Some tasks block on the CPU, and so should be run in a thread.
* The Twisted thread pool is also used for operations that don't necessarily
  block on CPU, such as DNS lookups.  CPU processing should not hold up DNS
  lookups!
* The number of threads should be fixed, and tied to the number of available
  CPUs.

As a first pass, the maximum number of threads is taken from
``os.cpu_count()`` (falling back to 1 when that returns ``None``).  This may
create too many threads, since it ignores things like scheduler affinity or
cgroups, but that's not the end of the world.
"""
16
17import os
18from typing import TypeVar, Callable, cast
19from functools import partial
20import threading
21from typing_extensions import ParamSpec
22from unittest import TestCase
23
24from twisted.python.threadpool import ThreadPool
25from twisted.internet.threads import deferToThreadPool
26from twisted.internet import reactor
27from twisted.internet.interfaces import IReactorFromThreads
28
# The shared CPU pool.  ``os.cpu_count()`` returns ``None`` when the number of
# CPUs can't be determined, so fall back to a single worker thread then.
_CPU_THREAD_POOL = ThreadPool(
    minthreads=0, maxthreads=os.cpu_count() or 1, name="TahoeCPU"
)
if hasattr(threading, "_register_atexit"):
    # This is a private API present in Python 3.9 or later (it was added so
    # that ``concurrent.futures`` could stop using daemon threads), designed
    # specifically for thread pool shutdown. Since it's private, it might go
    # away at any point, so if it doesn't exist we still have a solution.
    threading._register_atexit(_CPU_THREAD_POOL.stop)  # type: ignore
else:
    # Daemon threads allow shutdown to happen without any explicit stopping of
    # threads. There are some bugs in old Python versions related to daemon
    # threads (fixed in subsequent CPython patch releases), but Python's own
    # thread pools use daemon threads in those versions so we're no worse off.
    _CPU_THREAD_POOL.threadFactory = partial(  # type: ignore
        _CPU_THREAD_POOL.threadFactory, daemon=True
    )
_CPU_THREAD_POOL.start()
44
45
46P = ParamSpec("P")
47R = TypeVar("R")
48
49# Is running in a thread pool disabled? Should only be true in synchronous unit
50# tests.
51_DISABLED = False
52
53
async def defer_to_thread(f: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
    """
    Run ``f(*args, **kwargs)`` in the CPU thread pool and return its result.

    If ``disable_thread_pool_for_test()`` is in effect, ``f`` is instead
    called synchronously in the current thread.

    This is deliberately an ``async`` function: the presumption is that
    callers will then ``await`` it immediately, which makes synchronous-mode
    tests less likely to be misleading.
    """
    if not _DISABLED:
        # deferToThreadPool has no type annotations, hence the cast.
        threaded_reactor = cast(IReactorFromThreads, reactor)
        return await deferToThreadPool(
            threaded_reactor, _CPU_THREAD_POOL, f, *args, **kwargs
        )
    return f(*args, **kwargs)
70
71
def disable_thread_pool_for_test(test: TestCase) -> None:
    """
    For the duration of the test, calls to ``defer_to_thread()`` will actually
    run synchronously, which is useful for synchronous unit tests.

    The setting that was in effect before this call is restored when ``test``
    runs its cleanups.  Nested or repeated calls therefore unwind back to the
    original state, rather than always forcing the thread pool back on.

    :param test: the test case whose cleanup restores the previous setting.
    """
    global _DISABLED

    # Remember the current value so the cleanup restores it exactly, instead
    # of assuming it was ``False``.
    previous = _DISABLED

    def restore() -> None:
        global _DISABLED
        _DISABLED = previous

    test.addCleanup(restore)

    _DISABLED = True
86
87
# Public API of this module.
__all__ = [
    "defer_to_thread",
    "disable_thread_pool_for_test",
]
Note: See TracBrowser for help on using the repository browser.