1 | """ |
---|
2 | General functionality useful for the implementation of integration tests. |
---|
3 | """ |
---|
4 | |
---|
5 | from __future__ import annotations |
---|
6 | |
---|
7 | from contextlib import contextmanager |
---|
8 | from typing import Any |
---|
9 | from typing_extensions import Literal |
---|
10 | from tempfile import NamedTemporaryFile |
---|
11 | import sys |
---|
12 | import time |
---|
13 | import json |
---|
14 | from os import mkdir, environ |
---|
15 | from os.path import exists, join, basename |
---|
16 | from io import StringIO, BytesIO |
---|
17 | from subprocess import check_output |
---|
18 | |
---|
19 | from twisted.python.filepath import ( |
---|
20 | FilePath, |
---|
21 | ) |
---|
22 | from twisted.internet.defer import Deferred, succeed |
---|
23 | from twisted.internet.protocol import ProcessProtocol |
---|
24 | from twisted.internet.error import ProcessExitedAlready, ProcessDone |
---|
25 | from twisted.internet.threads import deferToThread |
---|
26 | from twisted.internet.interfaces import IProcessTransport, IReactorProcess |
---|
27 | |
---|
28 | from attrs import frozen, evolve |
---|
29 | import requests |
---|
30 | |
---|
31 | from cryptography.hazmat.primitives.asymmetric import rsa |
---|
32 | from cryptography.hazmat.backends import default_backend |
---|
33 | from cryptography.hazmat.primitives.serialization import ( |
---|
34 | Encoding, |
---|
35 | PrivateFormat, |
---|
36 | NoEncryption, |
---|
37 | ) |
---|
38 | |
---|
39 | from paramiko.rsakey import RSAKey |
---|
40 | from boltons.funcutils import wraps |
---|
41 | |
---|
42 | from allmydata.util import base32 |
---|
43 | from allmydata.util.configutil import ( |
---|
44 | get_config, |
---|
45 | set_config, |
---|
46 | write_config, |
---|
47 | ) |
---|
48 | from allmydata import client |
---|
49 | from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE |
---|
50 | |
---|
51 | import pytest_twisted |
---|
52 | |
---|
53 | |
---|
def block_with_timeout(deferred, reactor, timeout=120):
    """
    Wait synchronously for ``deferred`` to fire, giving up after ``timeout``
    seconds instead of waiting forever.

    :param deferred: the Deferred to wait on; a timeout is attached to it.
    :param reactor: the reactor used to schedule that timeout.
    :param timeout: how many seconds to allow before the Deferred is timed
        out.

    :return: whatever ``pytest_twisted.blockon`` returns for ``deferred``.
    """
    # Attach the timeout first so the blocking wait below cannot hang
    # indefinitely.
    deferred.addTimeout(timeout, reactor)
    outcome = pytest_twisted.blockon(deferred)
    return outcome
---|
58 | |
---|
59 | |
---|
class _ProcessExitedProtocol(ProcessProtocol):
    """
    Internal helper: a process protocol whose only job is to report that
    the process has ended.

    :ivar Deferred done: fired with ``None`` from ``processEnded``, whatever
        the reason for the exit was.
    """

    def __init__(self):
        self.done = Deferred()

    def processEnded(self, reason):
        # ``reason`` is deliberately not inspected: success and failure are
        # reported the same way.
        finished = self.done
        finished.callback(None)
---|
71 | |
---|
72 | |
---|
class ProcessFailed(Exception):
    """
    A subprocess has failed.

    :ivar reason: the original reason handed to ``processExited``.

    :ivar output: all stdout and stderr collected up to the point of failure
        (the collecting protocol in this module passes ``bytes``).
    """

    def __init__(self, reason, output):
        # Forward the arguments to ``Exception`` so that ``args`` is
        # populated; without this ``repr()`` is uninformative and the
        # exception cannot be copied or pickled (re-construction would call
        # ``ProcessFailed()`` with no arguments).
        super().__init__(reason, output)
        self.reason = reason
        self.output = output

    def __str__(self):
        return "<ProcessFailed: {}>:\n{}".format(self.reason, self.output)
---|
88 | |
---|
89 | |
---|
class _CollectOutputProtocol(ProcessProtocol):
    """
    Internal helper that buffers a process's output and reports it when the
    process is finished.

    stdout is always accumulated in ``self.output``; stderr is accumulated
    as well unless ``capture_stderr`` is false.  ``self.done`` fires with the
    collected bytes when the process ends, or errbacks with ``ProcessFailed``
    when the exit reason is anything other than ``ProcessDone``.

    :param capture_stderr: whether stderr data is added to the buffer.
    :param stdin: optional data written to the process's stdin once the
        connection is made, after which stdin is closed.
    """

    def __init__(self, capture_stderr=True, stdin=None):
        self._stdin = stdin
        self.capture_stderr = capture_stderr
        self.output = BytesIO()
        self.done = Deferred()

    def connectionMade(self):
        if self._stdin is None:
            return
        self.transport.write(self._stdin)
        self.transport.closeStdin()

    def outReceived(self, data):
        self.output.write(data)

    def errReceived(self, data):
        if not self.capture_stderr:
            return
        self.output.write(data)

    def processExited(self, reason):
        if isinstance(reason.value, ProcessDone):
            return
        collected = self.output.getvalue()
        self.done.errback(ProcessFailed(reason, collected))

    def processEnded(self, reason):
        # ``processExited`` may already have errbacked ``done``.
        if self.done.called:
            return
        self.done.callback(self.output.getvalue())
---|
122 | |
---|
123 | |
---|
class _DumpOutputProtocol(ProcessProtocol):
    """
    Internal helper that copies a process's stdout and stderr, decoded to
    text, into a file-like object.

    :param f: the text stream to write to; ``None`` selects ``sys.stdout``.

    :ivar Deferred done: errbacks with the exit reason when the process
        exits with anything other than ``ProcessDone``; otherwise fires with
        ``None`` when the process ends.
    """
    def __init__(self, f):
        self.done = Deferred()
        self._out = sys.stdout if f is None else f

    def _dump(self, data):
        # Both streams are decoded with stdout's encoding.
        text = str(data, sys.stdout.encoding)
        self._out.write(text)

    def outReceived(self, data):
        self._dump(data)

    def errReceived(self, data):
        self._dump(data)

    def processExited(self, reason):
        if isinstance(reason.value, ProcessDone):
            return
        self.done.errback(reason)

    def processEnded(self, reason):
        # ``processExited`` may already have errbacked ``done``.
        if self.done.called:
            return
        self.done.callback(None)
---|
147 | |
---|
148 | |
---|
class _MagicTextProtocol(ProcessProtocol):
    """
    Internal helper that watches a process's stdout for a marker string.

    Every line of stdout and stderr is echoed to ``sys.stdout`` prefixed
    with ``"<name>: "``.  Only stdout is searched for the marker.

    :ivar Deferred magic_seen: fired with this protocol the first time the
        accumulated stdout contains the magic text.
    :ivar Deferred exited: fired with ``None`` when the process has ended.
    """

    def __init__(self, magic_text: str, name: str) -> None:
        self._magic_text = magic_text
        self._output = StringIO()
        self.name = f"{name}: "
        self.magic_seen = Deferred()
        self.exited = Deferred()

    def _echo(self, text):
        # Re-emit each line with the node-name prefix.
        for line in text.splitlines():
            sys.stdout.write(self.name + line + "\n")

    def outReceived(self, data):
        text = str(data, sys.stdout.encoding)
        self._echo(text)
        self._output.write(text)
        if self.magic_seen.called:
            return
        # Search everything seen so far: the marker may arrive split across
        # several chunks.
        if self._magic_text in self._output.getvalue():
            print("Saw '{}' in the logs".format(self._magic_text))
            self.magic_seen.callback(self)

    def errReceived(self, data):
        self._echo(str(data, sys.stderr.encoding))

    def processEnded(self, reason):
        self.exited.callback(None)
---|
178 | |
---|
179 | |
---|
180 | def _cleanup_process_async(transport: IProcessTransport) -> None: |
---|
181 | """ |
---|
182 | If the given process transport seems to still be associated with a |
---|
183 | running process, send a SIGTERM to that process. |
---|
184 | |
---|
185 | :param transport: The transport to use. |
---|
186 | |
---|
187 | :raise: ``ValueError`` if ``allow_missing`` is ``False`` and the transport |
---|
188 | has no process. |
---|
189 | """ |
---|
190 | if transport.pid is None: |
---|
191 | # in cases of "restart", we will have registered a finalizer |
---|
192 | # that will kill the process -- but already explicitly killed |
---|
193 | # it (and then ran again) due to the "restart". So, if the |
---|
194 | # process is already killed, our job is done. |
---|
195 | print("Process already cleaned up and that's okay.") |
---|
196 | return |
---|
197 | print("signaling {} with TERM".format(transport.pid)) |
---|
198 | try: |
---|
199 | transport.signalProcess('TERM') |
---|
200 | except ProcessExitedAlready: |
---|
201 | # The transport object thought it still had a process but the real OS |
---|
202 | # process has already exited. That's fine. We accomplished what we |
---|
203 | # wanted to. |
---|
204 | pass |
---|
205 | |
---|
def _cleanup_tahoe_process(tahoe_transport, exited):
    """
    Signal the given process to terminate and block until it has exited.

    The signalling is delegated to ``_cleanup_process_async``; the wait is
    done with ``block_with_timeout`` so it cannot hang forever.

    :param tahoe_transport: The `IProcessTransport` representing the process.
    :param exited: A `Deferred` which fires when the process has exited.

    :return: After the process has exited.
    """
    from twisted.internet import reactor as global_reactor

    _cleanup_process_async(tahoe_transport)
    print(f"signaled, blocking on exit {exited}")
    block_with_timeout(exited, global_reactor)
    print("exited, goodbye")
---|
221 | |
---|
222 | |
---|
def run_tahoe(reactor, request, *args, **kwargs):
    """
    Run a ``tahoe`` command (optionally under coverage) and collect its
    output.

    :param reactor: the reactor used to spawn the process.
    :param request: the pytest request, consulted for the coverage option.
    :param args: command-line arguments passed to the tahoe runner.
    :param kwargs: only ``stdin`` is consulted: optional data to feed to the
        process's stdin.

    :returns: a Deferred that fires with the collected output when the
        command is done (or fails with ``ProcessFailed`` if the process does
        not exit cleanly).
    """
    collector = _CollectOutputProtocol(stdin=kwargs.get("stdin", None))
    finished = collector.done
    transport = _tahoe_runner_optional_coverage(collector, reactor, request, args)
    # Expose the completion Deferred on the transport as well.
    transport.exited = finished
    return finished
---|
235 | |
---|
236 | |
---|
237 | def _tahoe_runner_optional_coverage(proto, reactor, request, other_args): |
---|
238 | """ |
---|
239 | Internal helper. Calls spawnProcess with `-m |
---|
240 | allmydata.scripts.runner` and `other_args`, optionally inserting a |
---|
241 | `--coverage` option if the `request` indicates we should. |
---|
242 | """ |
---|
243 | if request.config.getoption('coverage', False): |
---|
244 | args = [sys.executable, '-b', '-m', 'coverage', 'run', '-m', 'allmydata.scripts.runner', '--coverage'] |
---|
245 | else: |
---|
246 | args = [sys.executable, '-b', '-m', 'allmydata.scripts.runner'] |
---|
247 | args += other_args |
---|
248 | return reactor.spawnProcess( |
---|
249 | proto, |
---|
250 | sys.executable, |
---|
251 | args, |
---|
252 | env=environ, |
---|
253 | ) |
---|
254 | |
---|
255 | |
---|
class TahoeProcess(object):
    """
    A running Tahoe process together with the node directory it runs from.

    :param process_transport: the ``IProcessTransport`` of the process.
    :param node_dir: path of the node's directory.
    """

    def __init__(self, process_transport, node_dir):
        self._node_dir = node_dir
        self._process_transport = process_transport

    def __str__(self):
        return "<TahoeProcess in '{}'>".format(self._node_dir)

    @property
    def node_dir(self):
        return self._node_dir

    @property
    def transport(self):
        return self._process_transport

    def get_config(self):
        """
        Read the node's configuration from its node directory.
        """
        return client.read_config(self._node_dir, u"portnum")

    def kill(self):
        """
        Signal the process to terminate and block until it has exited.
        No signal is sent if the transport no longer has a process.
        """
        transport = self.transport
        print(f"TahoeProcess.kill({transport.pid} / {self.node_dir})")
        _cleanup_tahoe_process(transport, transport.exited)

    def kill_async(self):
        """
        Signal the process to terminate without blocking.
        No signal is sent if the transport no longer has a process.

        :return: the transport's ``exited`` Deferred.
        """
        transport = self.transport
        print(f"TahoeProcess.kill_async({transport.pid} / {self.node_dir})")
        _cleanup_process_async(transport)
        return transport.exited

    def restart_async(self, reactor: IReactorProcess, request: Any) -> Deferred:
        """
        Stop and then re-start the associated process.

        :return: A Deferred that fires (with ``None``) after the new process
            is ready to handle requests.
        """
        def start_again(ignored):
            return _run_node(reactor, self.node_dir, request, None)

        def adopt_transport(proc):
            # The previous transport is no longer valid after the stop/start
            # cycle, so take over the new process's transport.
            self._process_transport = proc.transport

        d = self.kill_async()
        d.addCallback(start_again)
        d.addCallback(adopt_transport)
        return d
---|
314 | |
---|
315 | |
---|
def _run_node(reactor, node_dir, request, magic_text):
    """
    Run a tahoe process from its node_dir and arrange for it to be killed
    when ``request`` is finalized.

    :param magic_text: text to wait for in the process's stdout; ``None``
        means "client running".

    :returns: a Deferred that fires with a TahoeProcess for this node once
        the magic text has been seen.
    """
    marker = "client running" if magic_text is None else magic_text
    protocol = _MagicTextProtocol(marker, basename(node_dir))

    # "tahoe run" is consistent across Linux/macOS/Windows, unlike the old
    # "start" command.
    run_args = [
        '--eliot-destination', 'file:{}/logs/eliot.json'.format(node_dir),
        'run',
        node_dir,
    ]
    transport = _tahoe_runner_optional_coverage(protocol, reactor, request, run_args)
    transport.exited = protocol.exited

    tahoe_process = TahoeProcess(transport, node_dir)
    request.addfinalizer(tahoe_process.kill)

    ready = protocol.magic_seen
    ready.addCallback(lambda ignored: tahoe_process)
    return ready
---|
350 | |
---|
351 | |
---|
def basic_node_configuration(request, flog_gatherer, node_dir: str):
    """
    Apply the configuration shared by all nodes to ``<node_dir>/tahoe.cfg``.

    :param request: a ``pytest`` request fixture; its ``force_foolscap``
        option is written to both the ``storage`` and ``client`` sections.
    :param flog_gatherer: value stored as ``log_gatherer.furl`` in the
        ``node`` section.
    :param node_dir: the directory containing ``tahoe.cfg``.
    """
    config_path = join(node_dir, 'tahoe.cfg')
    config = get_config(config_path)
    set_config(config, u'node', u'log_gatherer.furl', flog_gatherer)

    force_foolscap = request.config.getoption("force_foolscap")
    assert force_foolscap in (True, False)
    for section in ('storage', 'client'):
        set_config(config, section, 'force_foolscap', str(force_foolscap))

    write_config(FilePath(config_path), config)
---|
380 | |
---|
381 | |
---|
def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port,
                 storage=True,
                 magic_text=None,
                 needed=2,
                 happy=3,
                 total=4):
    """
    Helper to create a single node (unless its directory already exists)
    and then run it.

    :return: a Deferred that fires with whatever ``_run_node`` produces for
        the node once it has been created and started.
    """
    node_dir = join(temp_dir, name)
    if web_port is None:
        web_port = ''

    if not exists(node_dir):
        print("creating: {}".format(node_dir))
        mkdir(node_dir)
        done_proto = _ProcessExitedProtocol()
        args = ['create-node']
        args += ['--nickname', name]
        args += ['--introducer', introducer_furl]
        args += ['--hostname', 'localhost']
        args += ['--listen', 'tcp']
        args += ['--webport', web_port]
        args += ['--shares-needed', str(needed)]
        args += ['--shares-happy', str(happy)]
        args += ['--shares-total', str(total)]
        args += ['--helper']
        if not storage:
            args.append('--no-storage')
        args.append(node_dir)

        _tahoe_runner_optional_coverage(done_proto, reactor, request, args)
        created_d = done_proto.done

        def configure_new_node(_):
            basic_node_configuration(request, flog_gatherer.furl, node_dir)
        created_d.addCallback(configure_new_node)
    else:
        # The node directory is already there: nothing to create.
        created_d = succeed(None)

    # Wait for creation (if any) to finish, then start the node.
    d = succeed(None)
    d.addCallback(lambda _: created_d)
    d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text))
    return d
---|
429 | |
---|
430 | |
---|
class UnwantedFilesException(Exception):
    """
    Raised when, while waiting for some files to appear, undesired files
    showed up instead (or in addition).

    :param waiting: what was being waited for.
    :param unwanted: iterable of the names of the unwanted files.
    """
    def __init__(self, waiting, unwanted):
        unwanted_names = u', '.join(unwanted)
        message = u"While waiting for '{}', unwanted files appeared: {}".format(
            waiting,
            unwanted_names,
        )
        super().__init__(message)
---|
443 | |
---|
444 | |
---|
class ExpectedFileMismatchException(Exception):
    """
    Raised when a file exists but still did not have the wanted contents
    when the timeout ran out.

    :param path: the file that was checked.
    :param timeout: the number of seconds that were waited.
    """
    def __init__(self, path, timeout):
        message = u"Contents of '{}' mismatched after {}s".format(path, timeout)
        super().__init__(message)
---|
453 | |
---|
454 | |
---|
class ExpectedFileUnfoundException(Exception):
    """
    Raised when a file or files we expected to find did not appear within
    the timeout.

    :param path: description of the path(s) that were awaited.
    :param timeout: the number of seconds that were waited.
    """
    def __init__(self, path, timeout):
        message = u"Didn't find '{}' after {}s".format(path, timeout)
        super().__init__(message)
---|
464 | |
---|
465 | |
---|
466 | |
---|
class FileShouldVanishException(Exception):
    """
    Raised when a file we expected to disappear was still there when the
    timeout ran out.

    :param path: the file that should have vanished.
    :param timeout: the number of seconds that were waited.
    """
    def __init__(self, path, timeout):
        message = u"'{}' still exists after {}s".format(path, timeout)
        super().__init__(message)
---|
476 | |
---|
477 | |
---|
def run_in_thread(f):
    """Decorator for integration tests that runs the test body in a thread.

    With pytest_twisted, tests that rely on the reactor are expected to
    return a Deferred and use async APIs so that the reactor can run.

    The integration test suite launches nodes in the background using
    Twisted APIs, and their stdout and stderr are read by Twisted code. If
    the reactor doesn't run, those reads don't happen, the buffers
    eventually fill up, and the nodes block when they try to flush logs.

    Switching to Twisted APIs (treq instead of requests etc.) is possible,
    but sometimes a blocking test is easier or more expedient. This
    decorator runs such a test in a thread while the reactor keeps running
    in the main thread.

    See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3597 for tracking bug.
    """
    @wraps(f)
    def test(*args, **kwargs):
        def call_original():
            return f(*args, **kwargs)
        return deferToThread(call_original)
    return test
---|
501 | |
---|
502 | |
---|
def await_file_contents(path, contents, timeout=15, error_if=None):
    """
    wait up to `timeout` seconds for the file at `path` (any path-like
    object) to have the exact content `contents`.

    :param error_if: if specified, a list of additional paths; if any
        of these paths appear an Exception is raised.

    :return: ``True`` once the file's contents match.

    :raise UnwantedFilesException: if any path from `error_if` exists.
    :raise ExpectedFileMismatchException: if the file exists at the timeout
        but never had the wanted contents.
    :raise ExpectedFileUnfoundException: if the file does not exist at the
        timeout.
    """
    # Materialize once so a one-shot iterable can be re-checked on every
    # pass through the loop.
    unwanted_candidates = list(error_if) if error_if else []
    start_time = time.time()
    while time.time() - start_time < timeout:
        print(" waiting for '{}'".format(path))
        # Check existence a single time per pass: the paths that trigger the
        # error are then exactly the paths that get reported.
        unwanted = [p for p in unwanted_candidates if exists(p)]
        if unwanted:
            raise UnwantedFilesException(
                waiting=path,
                unwanted=unwanted,
            )
        if exists(path):
            try:
                with open(path, 'r') as f:
                    current = f.read()
            except IOError:
                # The file may be in the middle of being (re)written.
                print("IOError; trying again")
            else:
                if current == contents:
                    return True
                print(" file contents still mismatched")
                print(" wanted: {}".format(contents.replace('\n', ' ')))
                print(" got: {}".format(current.replace('\n', ' ')))
        time.sleep(1)
    if exists(path):
        raise ExpectedFileMismatchException(path, timeout)
    raise ExpectedFileUnfoundException(path, timeout)
---|
535 | |
---|
536 | |
---|
def await_files_exist(paths, timeout=15, await_all=False):
    """
    wait up to `timeout` seconds for the given paths to exist. By default
    it is enough for any one of them to exist; with `await_all` every one
    of them must exist.

    :return: a list of all the paths found to exist at the moment the
        condition was met.

    :raise ExpectedFileUnfoundException: if the condition is not met in
        time.
    """
    began = time.time()
    while time.time() - began < timeout:
        print(" waiting for: {}".format(' '.join(paths)))
        found = [p for p in paths if exists(p)]
        print("found: {}".format(found))
        if await_all:
            satisfied = len(found) == len(paths)
        else:
            satisfied = len(found) > 0
        if satisfied:
            return found
        time.sleep(1)
    separator = ' and ' if await_all else ' or '
    raise ExpectedFileUnfoundException(separator.join(paths), timeout)
---|
560 | |
---|
561 | |
---|
def await_file_vanishes(path, timeout=10):
    """
    wait up to `timeout` seconds for `path` to stop existing.

    :return: ``None`` as soon as the path is found not to exist.

    :raise FileShouldVanishException: if the path still exists when the
        timeout runs out.
    """
    began = time.time()
    while time.time() - began < timeout:
        print(" waiting for '{}' to vanish".format(path))
        if exists(path):
            time.sleep(1)
            continue
        return
    raise FileShouldVanishException(path, timeout)
---|
570 | |
---|
571 | |
---|
def cli(node, *argv):
    """
    Run a tahoe CLI subcommand for a given node in a blocking manner.

    :param node: an object whose ``node_dir`` is passed to ``tahoe`` as
        ``--node-directory``.
    :param argv: the subcommand and its arguments.

    :return: the output of the command, as returned by ``check_output``.
    """
    command = ["tahoe", '--node-directory', node.node_dir, *argv]
    return check_output(command)
---|
579 | |
---|
580 | |
---|
def node_url(node_dir, uri_fragment):
    """
    Create a fully qualified URL by reading the base URL from the
    `node.url` file in `node_dir` and appending the `uri_fragment`.
    """
    url_file = join(node_dir, "node.url")
    with open(url_file, "r") as f:
        # Surrounding whitespace (such as a trailing newline) is dropped.
        base = f.read().strip()
    return base + uri_fragment
---|
590 | |
---|
591 | |
---|
592 | def _check_status(response): |
---|
593 | """ |
---|
594 | Check the response code is a 2xx (raise an exception otherwise) |
---|
595 | """ |
---|
596 | if response.status_code < 200 or response.status_code >= 300: |
---|
597 | raise ValueError( |
---|
598 | "Expected a 2xx code, got {}".format(response.status_code) |
---|
599 | ) |
---|
600 | |
---|
601 | |
---|
def web_get(tahoe, uri_fragment, **kwargs):
    """
    Make a GET request to the webport of `tahoe` (a `TahoeProcess`,
    usually from a fixture, e.g. `alice`). The URL is the node's base URL
    followed by `uri_fragment`. All `kwargs` are passed on to
    `requests.get`.

    :return: the content of the response.

    :raise ValueError: if the response code is not a 2xx.
    """
    resp = requests.get(node_url(tahoe.node_dir, uri_fragment), **kwargs)
    _check_status(resp)
    return resp.content
---|
613 | |
---|
614 | |
---|
def web_post(tahoe, uri_fragment, **kwargs):
    """
    Make a POST request to the webport of `tahoe` (a `TahoeProcess`,
    usually from a fixture, e.g. `alice`). The URL is the node's base URL
    followed by `uri_fragment`. All `kwargs` are passed on to
    `requests.post`.

    :return: the content of the response.

    :raise ValueError: if the response code is not a 2xx.
    """
    resp = requests.post(node_url(tahoe.node_dir, uri_fragment), **kwargs)
    _check_status(resp)
    return resp.content
---|
626 | |
---|
627 | |
---|
@run_in_thread
def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_servers=1):
    """
    Uses the node's JSON status (its base URL with ``t=json``) to wait for
    a client-type node (in `tahoe`, a `TahoeProcess` instance usually from a
    fixture e.g. `alice`) to be 'ready'. A client is deemed ready if:

    - it answers the status request with JSON
    - at least ``minimum_number_of_servers`` storage-servers are listed
    - at least ``minimum_number_of_servers`` of them have a
      "last_received_data" that is within the last `liveness` seconds

    We will try for up to `timeout` seconds for the above conditions
    to be true. Otherwise, an exception is raised.

    Because of the ``run_in_thread`` decorator the polling happens in a
    thread and callers get a Deferred.
    """
    def wait_a_bit(message):
        # Report why we are not ready yet, then pause before the next poll.
        print(message)
        time.sleep(1)

    began = time.time()
    while (time.time() - began) < float(timeout):
        try:
            data = web_get(tahoe, u"", params={u"t": u"json"})
            js = json.loads(data)
        except Exception as e:
            wait_a_bit("waiting because '{}'".format(e))
            continue

        servers = js['servers']
        if len(servers) < minimum_number_of_servers:
            wait_a_bit(f"waiting because {servers} is fewer than required ({minimum_number_of_servers})")
            continue

        now = time.time()
        # Servers that never received data are left out entirely.
        server_times = [
            entry['last_received_data']
            for entry in servers
            if entry['last_received_data'] is not None
        ]
        print(
            f"Now: {time.ctime(now)}\n"
            f"Liveness required: {liveness}\n"
            f"Server last-received-data: {[time.ctime(s) for s in server_times]}\n"
            f"Server ages: {[now - s for s in server_times]}\n"
        )

        # It's OK if _some_ servers are down; we only need a sufficient
        # number whose last contact is 'recent enough'.
        alive = [stamp for stamp in server_times if now - stamp <= liveness]
        if len(alive) >= minimum_number_of_servers:
            return True
        wait_a_bit(
            f"waiting because we found {len(alive)} servers "
            f"and want {minimum_number_of_servers}"
        )

    # we only fall out of the loop when we've timed out
    raise RuntimeError(
        "Waited {} seconds for {} to be 'ready' but it never was".format(
            timeout,
            tahoe,
        )
    )
---|
695 | |
---|
696 | |
---|
def generate_ssh_key(path):
    """
    Create a new 2048-bit RSA SSH key pair.

    The private key is written to ``path`` and the public key, as
    ``<name> <base64>``, to ``path + ".pub"``.
    """
    key = RSAKey.generate(2048)
    key.write_private_key_file(path)
    public_path = path + ".pub"
    with open(public_path, "wb") as public_file:
        public_line = "%s %s" % (key.get_name(), key.get_base64())
        public_file.write(public_line.encode("ascii"))
---|
704 | |
---|
705 | |
---|
@frozen
class CHK:
    """
    Represent the CHK encoding sufficiently to run a ``tahoe put`` command
    using it.
    """
    kind = "chk"
    max_shares = 256

    @classmethod
    def load(cls, params: None) -> CHK:
        """
        Rebuild a CHK from its serialized form, which must be ``None``.
        """
        assert params is None
        return cls()

    def to_json(self) -> None:
        """
        Serialize this format: CHK has nothing to record.
        """
        return None

    def customize(self) -> CHK:
        """
        Return this same object: there is nothing to customize for CHK.
        """
        return self

    @contextmanager
    def to_argv(self) -> None:
        """
        Context manager yielding the extra ``tahoe put`` arguments for this
        format, of which there are none.
        """
        yield []
---|
730 | |
---|
@frozen
class SSK:
    """
    Represent the SSK encodings (SDMF and MDMF) sufficiently to run a
    ``tahoe put`` command using one of them.
    """
    kind = "ssk"

    # SDMF and MDMF encode share counts (N and k) into the share itself as an
    # unsigned byte.  They could have encoded (share count - 1) to fit the
    # full range supported by ZFEC into the unsigned byte - but they don't.
    # So 256 is inaccessible to those formats and we set the upper bound at
    # 255.
    max_shares = 255

    # Which of the two SSK formats this is.
    name: Literal["sdmf", "mdmf"]
    # The private key as bytes, or None.
    key: None | bytes

    @classmethod
    def load(cls, params: dict) -> SSK:
        """
        Rebuild an SSK from the dict produced by ``to_json``.
        """
        assert params.keys() == {"format", "mutable", "key"}
        fmt_name = params["format"]
        key_bytes = params["key"].encode("ascii")
        return cls(fmt_name, key_bytes)

    def customize(self) -> SSK:
        """
        Return an SSK with a newly generated random RSA key.
        """
        fresh_key = generate_rsa_key()
        return evolve(self, key=fresh_key)

    def to_json(self) -> dict[str, str]:
        """
        Serialize this format to a dict with the keys ``format``,
        ``mutable`` and ``key``.
        """
        serialized = {}
        serialized["format"] = self.name
        serialized["mutable"] = None
        serialized["key"] = self.key.decode("ascii")
        return serialized

    @contextmanager
    def to_argv(self) -> None:
        """
        Context manager yielding the ``tahoe put`` arguments for this format.

        The key is written to a temporary file whose path is part of the
        yielded arguments; that file is only kept for the duration of the
        ``with`` block.
        """
        with NamedTemporaryFile() as key_file:
            key_file.write(self.key)
            # Make sure the bytes are on disk before another process reads
            # the file.
            key_file.flush()
            yield [f"--format={self.name}", "--mutable", f"--private-key-path={key_file.name}"]
---|
772 | |
---|
773 | |
---|
def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str:
    """
    Upload the given data to the given node.

    :param alice: The node to upload to.  Either a ``TahoeProcess`` or an
        object exposing one as its ``process`` attribute.

    :param fmt: The format object (``CHK`` or ``SSK``) describing how to
        encode the upload.

    :param data: The data to upload.

    :return: The capability for the uploaded data.
    """
    # ``TahoeProcess`` itself has no ``process`` attribute, so fall back to
    # the object we were given instead of failing with AttributeError.
    node = getattr(alice, "process", alice)

    with NamedTemporaryFile() as f:
        f.write(data)
        # Make sure the data is on disk before the CLI process reads it.
        f.flush()
        with fmt.to_argv() as fmt_argv:
            argv = [node, "put"] + fmt_argv + [f.name]
            return cli(*argv).decode("utf-8").strip()
---|
793 | |
---|
794 | |
---|
async def reconfigure(reactor, request, node: TahoeProcess,
                      params: tuple[int, int, int],
                      convergence: None | bytes,
                      max_segment_size: None | int = None) -> None:
    """
    Reconfigure a Tahoe-LAFS node with different ZFEC parameters and
    convergence secret.

    TODO This appears to have issues on Windows.

    If the current configuration is different from the specified
    configuration, the node will be restarted so it takes effect.

    :param reactor: A reactor to use to restart the process.
    :param request: The pytest request object to use to arrange process
        cleanup.
    :param node: The Tahoe-LAFS node to reconfigure.
    :param params: The ``happy``, ``needed``, and ``total`` ZFEC encoding
        parameters.
    :param convergence: If given, the convergence secret.  If not given, the
        existing convergence secret will be left alone.
    :param max_segment_size: If given, the value to store as
        ``shares._max_immutable_segment_size_for_testing``.  If not given,
        that setting is left alone.

    :return: ``None`` after the node configuration has been rewritten, the
        node has been restarted, and the node is ready to provide service.
    """
    happy, needed, total = params
    config = node.get_config()

    current_shares = (
        int(config.get_config("client", "shares.happy")),
        int(config.get_config("client", "shares.needed")),
        int(config.get_config("client", "shares.total")),
    )
    changed = (happy, needed, total) != current_shares
    if changed:
        config.set_config("client", "shares.happy", str(happy))
        config.set_config("client", "shares.needed", str(needed))
        config.set_config("client", "shares.total", str(total))

    if convergence is not None:
        # The secret is stored base32-encoded; compare the decoded bytes.
        stored_secret = config.get_private_config("convergence").encode("ascii")
        if base32.a2b(stored_secret) != convergence:
            changed = True
            config.write_private_config("convergence", base32.b2a(convergence))

    if max_segment_size is not None:
        segment_key = "shares._max_immutable_segment_size_for_testing"
        stored_size = int(config.get_config("client", segment_key, DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE))
        if stored_size != max_segment_size:
            changed = True
            config.set_config("client", segment_key, str(max_segment_size))

    if not changed:
        print("Config unchanged, not restarting.")
        return

    # restart the node
    print(f"Restarting {node.node_dir} for ZFEC reconfiguration")
    await node.restart_async(reactor, request)
    print("Restarted. Waiting for ready state.")
    await await_client_ready(node)
    print("Ready.")
---|
859 | |
---|
860 | |
---|
def generate_rsa_key() -> bytes:
    """
    Generate a 2048 bit RSA key suitable for use with SSKs.

    :return: the private key, PEM encoded in the traditional OpenSSL format
        and not encrypted.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    pem = private_key.private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=NoEncryption(),
    )
    return pem
---|