1 | """ |
---|
2 | Utilities for working with Twisted Deferreds. |
---|
3 | """ |
---|
4 | |
---|
5 | from __future__ import annotations |
---|
6 | |
---|
7 | import time |
---|
8 | from functools import wraps |
---|
9 | |
---|
10 | from typing import ( |
---|
11 | Callable, |
---|
12 | Any, |
---|
13 | Sequence, |
---|
14 | TypeVar, |
---|
15 | Optional, |
---|
16 | Coroutine, |
---|
17 | Generator |
---|
18 | ) |
---|
19 | from typing_extensions import ParamSpec |
---|
20 | |
---|
21 | from foolscap.api import eventually |
---|
22 | from eliot.twisted import ( |
---|
23 | inline_callbacks, |
---|
24 | ) |
---|
25 | from twisted.internet import defer, reactor, error |
---|
26 | from twisted.internet.defer import Deferred |
---|
27 | from twisted.python.failure import Failure |
---|
28 | |
---|
29 | from allmydata.util import log |
---|
30 | from allmydata.util.assertutil import _assert |
---|
31 | from allmydata.util.pollmixin import PollMixin |
---|
32 | |
---|
33 | |
---|
class TimeoutError(Exception):
    """
    Signals that a timeout expired before the awaited Deferred produced a
    result.

    NOTE: inside this module the name shadows the builtin ``TimeoutError``;
    this class derives directly from ``Exception``.
    """
---|
36 | |
---|
37 | |
---|
def timeout_call(reactor, d, timeout):
    """
    Race the Deferred 'd' against a timer.

    :param reactor: object whose ``callLater`` is used to schedule the timer.
    :param d: the Deferred whose outcome is wanted.
    :param timeout: the delay handed to ``reactor.callLater``.

    :return: a new Deferred. It receives the outcome of 'd' if that arrives
        before the timer fires; otherwise it fails with TimeoutError.
    """
    outcome_d = defer.Deferred()

    def _on_timeout():
        outcome_d.errback(Failure(TimeoutError()))

    delayed_call = reactor.callLater(timeout, _on_timeout)

    def _on_outcome(outcome):
        # Attached with addBoth, so 'outcome' may be a plain result or a
        # Failure; either way it is handed to callback().
        try:
            delayed_call.cancel()
            outcome_d.callback(outcome)
        except (error.AlreadyCalled, defer.AlreadyCalledError):
            # The timer already fired (or outcome_d already has a result),
            # so the late outcome is deliberately dropped.
            pass
        # Falls through returning None, which replaces the outcome on 'd'.

    d.addBoth(_on_outcome)
    return outcome_d
---|
59 | |
---|
60 | |
---|
61 | |
---|
# utility wrapper for DeferredList
def _check_deferred_list(results):
    """
    Collapse a DeferredList-style result into a single outcome.

    'results' is a list of (success, value) pairs. The value of the first
    pair whose success flag is false is returned as-is, so that a following
    addErrback() would fire. If every pair succeeded, a list of just the
    values (without the success booleans) is returned.
    """
    values = []
    for ok, value in results:
        if not ok:
            return value
        values.append(value)
    return values
---|
71 | |
---|
def DeferredListShouldSucceed(dl):
    """
    Wrap the Deferreds in 'dl' in a DeferredList whose result is post-processed
    by _check_deferred_list: the first component failure if there is one,
    otherwise the list of plain results.
    """
    return defer.DeferredList(dl).addCallback(_check_deferred_list)
---|
76 | |
---|
def _parseDListResult(l):
    """
    Return the second element of every entry of 'l', in order (i.e. drop the
    success flag from each DeferredList result pair).
    """
    values = []
    for entry in l:
        values.append(entry[1])
    return values
---|
79 | |
---|
def _unwrapFirstError(f):
    """
    Errback that replaces a C{FirstError} wrapper with the failure it wraps.

    @param f: the L{Failure} delivered to the errback.

    @return: the C{subFailure} carried by the trapped C{FirstError}, so the
        next errback in the chain sees the original failure.

    @raise: whatever C{f.trap} raises when C{f} does not wrap a
        C{FirstError}.
    """
    f.trap(defer.FirstError)
    # Returning a Failure from an errback keeps the chain on the error path.
    # This avoids raising the Failure object itself, which depends on Failure
    # being raisable and makes the Deferred wrap it a second time.
    return f.value.subFailure
---|
83 | |
---|
def gatherResults(deferredList):
    """Collect the results of the given Deferreds into one list.

    A thin layer over C{DeferredList}: on success the caller gets just the
    list of results, with no success/failure flags to pick apart. The
    C{DeferredList} is created with C{fireOnOneErrback} and C{consumeErrors}
    enabled, and the resulting C{FirstError} is unwrapped before it reaches
    the caller.

    @type deferredList: C{list} of L{Deferred}s
    """
    combined = defer.DeferredList(
        deferredList,
        fireOnOneErrback=True,
        consumeErrors=True,
    )
    return combined.addCallbacks(_parseDListResult, _unwrapFirstError)
---|
95 | |
---|
96 | |
---|
def _with_log(op, res):
    """
    Call op(res), logging an AlreadyCalledError instead of propagating it.

    Firing a Deferred that has already fired raises AlreadyCalledError, which
    is easily lost or turned into a different error by the surrounding
    context. For the abstractions defined in this module it is therefore
    logged explicitly at level WEIRD. Per the original author's note, log.err
    makes a running test fail.

    Nothing is returned, and any other exception raised by 'op' propagates.
    """
    try:
        op(res)
    except defer.AlreadyCalledError as already_fired:
        log.err(already_fired, op=repr(op), level=log.WEIRD)
---|
109 | |
---|
def eventually_callback(d):
    """
    Build a callback that schedules d.callback(res) through eventually()
    (wrapped in _with_log) and passes 'res' on unchanged.
    """
    def _schedule_callback(res):
        eventually(_with_log, d.callback, res)
        return res

    return _schedule_callback
---|
115 | |
---|
def eventually_errback(d):
    """
    Build an errback that schedules d.errback(res) through eventually()
    (wrapped in _with_log) and passes 'res' on unchanged.
    """
    def _schedule_errback(res):
        eventually(_with_log, d.errback, res)
        return res

    return _schedule_errback
---|
121 | |
---|
def eventual_chain(source, target):
    """
    Forward the outcome of 'source' to 'target' via eventually(): a result is
    delivered to target.callback, a failure to target.errback.
    """
    on_success = eventually_callback(target)
    on_failure = eventually_errback(target)
    source.addCallbacks(on_success, on_failure)
---|
124 | |
---|
125 | |
---|
class HookMixin(object):
    """
    Mixin that keeps a table of named hooks, mostly for the benefit of tests.

    An observer arms a hook with 'set_hook', which associates a Deferred with
    the hook name. The instrumented code reports in with '_call_hook'; after
    'ignore_count' such reports have been skipped, the next one disarms the
    hook and fires its Deferred, so an armed hook fires exactly once.

    The class using this mixin must provide a '_hooks' attribute, which should
    be set by its constructor to a dict mapping each valid hook name to None.
    """
    def set_hook(self, name, d=None, ignore_count=0):
        """
        Arm the hook called 'name'. Called by the hook observer (e.g. a test).

        If 'd' is omitted a fresh, unfired Deferred is created. The Deferred
        associated with the hook is returned. 'name' must be a known hook that
        is not currently set, and 'ignore_count' must not be negative.
        """
        self._log("set_hook %r, ignore_count=%r" % (name, ignore_count))
        d = defer.Deferred() if d is None else d

        _assert(ignore_count >= 0, ignore_count=ignore_count)
        _assert(name in self._hooks, name=name)
        _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
        _assert(isinstance(d, defer.Deferred), d=d)

        self._hooks[name] = (d, ignore_count)
        return d

    def _call_hook(self, res, name, **kwargs):
        """
        Trigger the hook called 'name' with argument 'res'.

        Nothing happens if the hook is not set. If its ignore_count is still
        positive, the count is decremented. Otherwise the hook is unset and
        its Deferred is fired with 'res': synchronously by default, or through
        eventually_callback() when the 'async' keyword argument is true.

        Intended usage is "deferred.addBoth(self._call_hook, 'hookname')", so
        that a failure in 'res' errbacks the hook, which typically makes the
        observing test fail too. 'res' is always returned, so the current
        result or failure is passed through.

        Accepts a single keyword argument, async, defaulting to False.
        """
        fire_later = kwargs.get("async", False)
        entry = self._hooks[name]
        if entry is not None:
            d, remaining = entry
            self._log("call_hook %r, ignore_count=%r" % (name, remaining))
            if remaining > 0:
                self._hooks[name] = (d, remaining - 1)
            else:
                # Unset the hook before firing it.
                self._hooks[name] = None
                fire = eventually_callback(d) if fire_later else d.callback
                _with_log(fire, res)
        return res  # pass on error/result

    def _log(self, msg):
        """Emit 'msg' at the NOISY log level."""
        log.msg(msg, level=log.NOISY)
---|
188 | |
---|
189 | |
---|
class WaitForDelayedCallsMixin(PollMixin):
    """
    Test helper that polls until the reactor has no DelayedCalls left that
    are due within the next 10 seconds.
    """
    def _delayed_calls_done(self):
        """
        Return True when every pending DelayedCall fires at or after the
        threshold (now + 10 seconds), False otherwise.
        """
        # Calls beyond the threshold are expected to belong to the test
        # timeout; anything else *should* surface as an unclean reactor error
        # because the test should have waited for it.
        threshold = time.time() + 10
        return not any(
            delayed.getTime() < threshold
            for delayed in reactor.getDelayedCalls()
        )

    def wait_for_delayed_calls(self, res=None):
        """
        Use like this at the end of a test:
          d.addBoth(self.wait_for_delayed_calls)

        Returns a Deferred that fires with 'res' once polling has finished.
        An error raised while polling is logged rather than propagated.
        """
        def _restore_result(ign):
            return res

        d = self.poll(self._delayed_calls_done)
        d.addErrback(log.err, "error while waiting for delayed calls")
        d.addBoth(_restore_result)
        return d
---|
211 | |
---|
@inline_callbacks
def until(
    action: Callable[[], defer.Deferred[Any]],
    condition: Callable[[], bool],
) -> Generator[Any, None, None]:
    """
    Repeat a Deferred-returning action until a predicate becomes true.

    The action always runs at least once; the predicate is evaluated after
    each run has been waited for.

    :param action: The action to run.
    :param condition: The predicate signaling stop.

    :return: A Deferred that fires after the condition signals stop.
    """
    finished = False
    while not finished:
        yield action()
        finished = condition()
---|
229 | |
---|
230 | |
---|
231 | P = ParamSpec("P") |
---|
232 | R = TypeVar("R") |
---|
233 | |
---|
234 | |
---|
def async_to_deferred(f: Callable[P, Coroutine[defer.Deferred[R], None, R]]) -> Callable[P, Deferred[R]]:
    """
    Decorator turning an ``async def`` function into one returning a Deferred.

    Each call of the wrapper invokes ``f`` with the same arguments and hands
    the resulting coroutine to ``Deferred.fromCoroutine``.

    Maybe solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886
    """

    @wraps(f)
    def deferred_returning(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]:
        coroutine = f(*args, **kwargs)
        return defer.Deferred.fromCoroutine(coroutine)

    return deferred_returning
---|
247 | |
---|
248 | |
---|
class MultiFailure(Exception):
    """
    More than one failure occurred.

    The individual failures are kept, as given, on the ``failures``
    attribute; the exception itself carries no message arguments.
    """

    def __init__(self, failures: Sequence[Failure]) -> None:
        super().__init__()
        self.failures = failures
---|
257 | |
---|
258 | |
---|
# Type of the result produced by the Deferreds that take part in a race.
_T = TypeVar("_T")
---|
260 | |
---|
# Eventually this should be in Twisted upstream:
# https://github.com/twisted/twisted/pull/11818
def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]:
    """
    Select the first available result from the sequence of Deferreds and
    cancel the rest.

    @param ds: the Deferreds taking part in the race. The sequence is copied,
        so later mutation by the caller has no effect.

    @return: A cancellable L{Deferred} that fires with the index and output of
        the element of C{ds} to have a success result first, or that fires
        with L{MultiFailure} holding a list of their failures (ordered by
        index) if they all fail. If C{ds} is empty there is nothing to wait
        for, so the returned L{Deferred} has already failed with a
        L{MultiFailure} holding no failures.
    """
    # Copy the sequence of Deferreds so we know it doesn't get mutated out
    # from under us. Everything below works on this copy only.
    to_cancel = list(ds)

    # Keep track of the Deferred for the action which completed first. When
    # it completes, all of the other Deferreds will get cancelled but this one
    # shouldn't be. Even though it "completed" it isn't really done - the
    # caller will still be using it for something. If we cancelled it,
    # cancellation could propagate down to them.
    winner: Optional[Deferred] = None

    # The cancellation function for the Deferred this function returns.
    def cancel(result: Deferred) -> None:
        # If it is cancelled then we cancel all of the Deferreds for the
        # individual actions because there is no longer the possibility of
        # delivering any of their results anywhere. We don't have to fire
        # `result` because the Deferred will do that for us.
        for d in to_cancel:
            d.cancel()

    # The Deferred that this function will return. It will fire with the
    # index and output of the action that completes first, or fail with
    # MultiFailure if all of the actions fail. If it is cancelled, all of the
    # actions will be cancelled.
    final_result: Deferred[tuple[int, _T]] = Deferred(canceller=cancel)

    if not to_cancel:
        # With no participants neither callback below would ever run and the
        # caller would wait forever. "All of them failed" is vacuously true,
        # so report that straight away.
        final_result.errback(MultiFailure([]))
        return final_result

    # A callback for an individual action.
    def succeeded(this_output: _T, this_index: int) -> None:
        # If it is the first action to succeed then it becomes the "winner",
        # its index/output become the externally visible result, and the rest
        # of the action Deferreds get cancelled. If it is not the first
        # action to succeed (because some action did not support
        # cancellation), just ignore the result. It is uncommon for this
        # callback to be entered twice. The only way it can happen is if one
        # of the input Deferreds has a cancellation function that fires the
        # Deferred with a success result.
        nonlocal winner
        if winner is None:
            # This is the first success. Act on it.
            winner = to_cancel[this_index]

            # Cancel the rest.
            for d in to_cancel:
                if d is not winner:
                    d.cancel()

            # Fire our Deferred
            final_result.callback((this_index, this_output))

    # Keep track of which actions have failed. If they all fail we need to
    # deliver failure notification on our externally visible result.
    failure_state: list[tuple[int, Failure]] = []

    def failed(failure: Failure, this_index: int) -> None:
        failure_state.append((this_index, failure))
        if len(failure_state) == len(to_cancel):
            # Every operation failed. Report the failures in input order,
            # comparing on the index only.
            ordered = sorted(failure_state, key=lambda entry: entry[0])
            failures = [f for (ignored, f) in ordered]
            final_result.errback(MultiFailure(failures))

    for index, d in enumerate(to_cancel):
        # Propagate the position of this action to the callbacks so we can
        # cancel the right Deferreds and propagate the result outwards.
        d.addCallbacks(succeeded, failed, callbackArgs=(index,), errbackArgs=(index,))

    return final_result
---|