source: trunk/src/allmydata/util/deferredutil.py

Last change on this file was 054c8935, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-06-14T18:16:10Z

Pacify mypy

  • Property mode set to 100644
File size: 11.2 KB
Line 
1"""
2Utilities for working with Twisted Deferreds.
3"""
4
5from __future__ import annotations
6
7import time
8from functools import wraps
9
10from typing import (
11    Callable,
12    Any,
13    Sequence,
14    TypeVar,
15    Optional,
16    Coroutine,
17    Generator
18)
19from typing_extensions import ParamSpec
20
21from foolscap.api import eventually
22from eliot.twisted import (
23    inline_callbacks,
24)
25from twisted.internet import defer, reactor, error
26from twisted.internet.defer import Deferred
27from twisted.python.failure import Failure
28
29from allmydata.util import log
30from allmydata.util.assertutil import _assert
31from allmydata.util.pollmixin import PollMixin
32
33
34class TimeoutError(Exception):
35    pass
36
37
38def timeout_call(reactor, d, timeout):
39    """
40    This returns the result of 'd', unless 'timeout' expires before
41    'd' is completed in which case a TimeoutError is raised.
42    """
43    timer_d = defer.Deferred()
44
45    def _timed_out():
46        timer_d.errback(Failure(TimeoutError()))
47
48    def _got_result(x):
49        try:
50            timer.cancel()
51            timer_d.callback(x)
52        except (error.AlreadyCalled, defer.AlreadyCalledError):
53            pass
54        return None
55
56    timer = reactor.callLater(timeout, _timed_out)
57    d.addBoth(_got_result)
58    return timer_d
59
60
61
62# utility wrapper for DeferredList
63def _check_deferred_list(results):
64    # if any of the component Deferreds failed, return the first failure such
65    # that an addErrback() would fire. If all were ok, return a list of the
66    # results (without the success/failure booleans)
67    for success,f in results:
68        if not success:
69            return f
70    return [r[1] for r in results]
71
72def DeferredListShouldSucceed(dl):
73    d = defer.DeferredList(dl)
74    d.addCallback(_check_deferred_list)
75    return d
76
77def _parseDListResult(l):
78    return [x[1] for x in l]
79
80def _unwrapFirstError(f):
81    f.trap(defer.FirstError)
82    raise f.value.subFailure
83
84def gatherResults(deferredList):
85    """Returns list with result of given Deferreds.
86
87    This builds on C{DeferredList} but is useful since you don't
88    need to parse the result for success/failure.
89
90    @type deferredList:  C{list} of L{Deferred}s
91    """
92    d = defer.DeferredList(deferredList, fireOnOneErrback=True, consumeErrors=True)
93    d.addCallbacks(_parseDListResult, _unwrapFirstError)
94    return d
95
96
97def _with_log(op, res):
98    """
99    The default behaviour on firing an already-fired Deferred is unhelpful for
100    debugging, because the AlreadyCalledError can easily get lost or be raised
101    in a context that results in a different error. So make sure it is logged
102    (for the abstractions defined here). If we are in a test, log.err will cause
103    the test to fail.
104    """
105    try:
106        op(res)
107    except defer.AlreadyCalledError as e:
108        log.err(e, op=repr(op), level=log.WEIRD)
109
110def eventually_callback(d):
111    def _callback(res):
112        eventually(_with_log, d.callback, res)
113        return res
114    return _callback
115
116def eventually_errback(d):
117    def _errback(res):
118        eventually(_with_log, d.errback, res)
119        return res
120    return _errback
121
122def eventual_chain(source, target):
123    source.addCallbacks(eventually_callback(target), eventually_errback(target))
124
125
126class HookMixin(object):
127    """
128    I am a helper mixin that maintains a collection of named hooks, primarily
129    for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
130    and can then be fired exactly once at the appropriate time by '_call_hook'.
131    If 'ignore_count' is given, that number of calls to '_call_hook' will be
132    ignored before firing the hook.
133
134    I assume a '_hooks' attribute that should set by the class constructor to
135    a dict mapping each valid hook name to None.
136    """
137    def set_hook(self, name, d=None, ignore_count=0):
138        """
139        Called by the hook observer (e.g. by a test).
140        If d is not given, an unfired Deferred is created and returned.
141        The hook must not already be set.
142        """
143        self._log("set_hook %r, ignore_count=%r" % (name, ignore_count))
144        if d is None:
145            d = defer.Deferred()
146        _assert(ignore_count >= 0, ignore_count=ignore_count)
147        _assert(name in self._hooks, name=name)
148        _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
149        _assert(isinstance(d, defer.Deferred), d=d)
150
151        self._hooks[name] = (d, ignore_count)
152        return d
153
154    def _call_hook(self, res, name, **kwargs):
155        """
156        Called to trigger the hook, with argument 'res'. This is a no-op if
157        the hook is unset. If the hook's ignore_count is positive, it will be
158        decremented; if it was already zero, the hook will be unset, and then
159        its Deferred will be fired synchronously.
160
161        The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
162        This ensures that if 'res' is a failure, the hook will be errbacked,
163        which will typically cause the test to also fail.
164        'res' is returned so that the current result or failure will be passed
165        through.
166
167        Accepts a single keyword argument, async, defaulting to False.
168        """
169        async_ = kwargs.get("async", False)
170        hook = self._hooks[name]
171        if hook is None:
172            return res  # pass on error/result
173
174        (d, ignore_count) = hook
175        self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
176        if ignore_count > 0:
177            self._hooks[name] = (d, ignore_count - 1)
178        else:
179            self._hooks[name] = None
180            if async_:
181                _with_log(eventually_callback(d), res)
182            else:
183                _with_log(d.callback, res)
184        return res
185
186    def _log(self, msg):
187        log.msg(msg, level=log.NOISY)
188
189
190class WaitForDelayedCallsMixin(PollMixin):
191    def _delayed_calls_done(self):
192        # We're done when the only remaining DelayedCalls fire after threshold.
193        # (These will be associated with the test timeout, or else they *should*
194        # cause an unclean reactor error because the test should have waited for
195        # them.)
196        threshold = time.time() + 10
197        for delayed in reactor.getDelayedCalls():
198            if delayed.getTime() < threshold:
199                return False
200        return True
201
202    def wait_for_delayed_calls(self, res=None):
203        """
204        Use like this at the end of a test:
205          d.addBoth(self.wait_for_delayed_calls)
206        """
207        d = self.poll(self._delayed_calls_done)
208        d.addErrback(log.err, "error while waiting for delayed calls")
209        d.addBoth(lambda ign: res)
210        return d
211
212@inline_callbacks
213def until(
214        action: Callable[[], defer.Deferred[Any]],
215        condition: Callable[[], bool],
216) -> Generator[Any, None, None]:
217    """
218    Run a Deferred-returning function until a condition is true.
219
220    :param action: The action to run.
221    :param condition: The predicate signaling stop.
222
223    :return: A Deferred that fires after the condition signals stop.
224    """
225    while True:
226        yield action()
227        if condition():
228            break
229
230
231P = ParamSpec("P")
232R = TypeVar("R")
233
234
235def async_to_deferred(f: Callable[P, Coroutine[defer.Deferred[R], None, R]]) -> Callable[P, Deferred[R]]:
236    """
237    Wrap an async function to return a Deferred instead.
238
239    Maybe solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886
240    """
241
242    @wraps(f)
243    def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]:
244        return defer.Deferred.fromCoroutine(f(*args, **kwargs))
245
246    return not_async
247
248
249class MultiFailure(Exception):
250    """
251    More than one failure occurred.
252    """
253
254    def __init__(self, failures: Sequence[Failure]) -> None:
255        super(MultiFailure, self).__init__()
256        self.failures = failures
257
258
259_T = TypeVar("_T")
260
261# Eventually this should be in Twisted upstream:
262# https://github.com/twisted/twisted/pull/11818
263def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]:
264    """
265    Select the first available result from the sequence of Deferreds and
266    cancel the rest.
267    @return: A cancellable L{Deferred} that fires with the index and output of
268        the element of C{ds} to have a success result first, or that fires
269        with L{MultiFailure} holding a list of their failures if they all
270        fail.
271    """
272    # Keep track of the Deferred for the action which completed first.  When
273    # it completes, all of the other Deferreds will get cancelled but this one
274    # shouldn't be.  Even though it "completed" it isn't really done - the
275    # caller will still be using it for something.  If we cancelled it,
276    # cancellation could propagate down to them.
277    winner: Optional[Deferred] = None
278
279    # The cancellation function for the Deferred this function returns.
280    def cancel(result: Deferred) -> None:
281        # If it is cancelled then we cancel all of the Deferreds for the
282        # individual actions because there is no longer the possibility of
283        # delivering any of their results anywhere.  We don't have to fire
284        # `result` because the Deferred will do that for us.
285        for d in to_cancel:
286            d.cancel()
287
288    # The Deferred that this function will return.  It will fire with the
289    # index and output of the action that completes first, or None if all of
290    # the actions fail.  If it is cancelled, all of the actions will be
291    # cancelled.
292    final_result: Deferred[tuple[int, _T]] = Deferred(canceller=cancel)
293
294    # A callback for an individual action.
295    def succeeded(this_output: _T, this_index: int) -> None:
296        # If it is the first action to succeed then it becomes the "winner",
297        # its index/output become the externally visible result, and the rest
298        # of the action Deferreds get cancelled.  If it is not the first
299        # action to succeed (because some action did not support
300        # cancellation), just ignore the result.  It is uncommon for this
301        # callback to be entered twice.  The only way it can happen is if one
302        # of the input Deferreds has a cancellation function that fires the
303        # Deferred with a success result.
304        nonlocal winner
305        if winner is None:
306            # This is the first success.  Act on it.
307            winner = to_cancel[this_index]
308
309            # Cancel the rest.
310            for d in to_cancel:
311                if d is not winner:
312                    d.cancel()
313
314            # Fire our Deferred
315            final_result.callback((this_index, this_output))
316
317    # Keep track of how many actions have failed.  If they all fail we need to
318    # deliver failure notification on our externally visible result.
319    failure_state = []
320
321    def failed(failure: Failure, this_index: int) -> None:
322        failure_state.append((this_index, failure))
323        if len(failure_state) == len(to_cancel):
324            # Every operation failed.
325            failure_state.sort()
326            failures = [f for (ignored, f) in failure_state]
327            final_result.errback(MultiFailure(failures))
328
329    # Copy the sequence of Deferreds so we know it doesn't get mutated out
330    # from under us.
331    to_cancel = list(ds)
332    for index, d in enumerate(ds):
333        # Propagate the position of this action as well as the argument to f
334        # to the success callback so we can cancel the right Deferreds and
335        # propagate the result outwards.
336        d.addCallbacks(succeeded, failed, callbackArgs=(index,), errbackArgs=(index,))
337
338    return final_result
Note: See TracBrowser for help on using the repository browser.