1 | """ |
---|
2 | pytest infrastructure for benchmarks. |
---|
3 | |
---|
4 | The number of nodes is parameterized via a --number-of-nodes CLI option added |
---|
5 | to pytest. |
---|
6 | """ |
---|
7 | |
---|
8 | import os |
---|
9 | from shutil import which, rmtree |
---|
10 | from tempfile import mkdtemp |
---|
11 | from contextlib import contextmanager |
---|
12 | from time import time |
---|
13 | |
---|
14 | import pytest |
---|
15 | import pytest_twisted |
---|
16 | |
---|
17 | from twisted.internet import reactor |
---|
18 | from twisted.internet.defer import DeferredList, succeed |
---|
19 | |
---|
20 | from allmydata.util.iputil import allocate_tcp_port |
---|
21 | |
---|
22 | from integration.grid import Client, create_grid, create_flog_gatherer |
---|
23 | |
---|
24 | |
---|
def pytest_addoption(parser):
    """
    Register the benchmark-specific command-line options with pytest.

    ``--number-of-nodes`` may be passed several times; the collected integers
    are read back by ``pytest_generate_tests`` to parametrize the
    ``number_of_nodes`` fixture.
    """
    parser.addoption(
        "--number-of-nodes",
        help="list of number_of_nodes to benchmark against",
        type=int,
        default=[],
        action="append",
    )

    foolscap_help = (
        "If set, force Foolscap only for the storage protocol. "
        "Otherwise HTTP will be used."
    )
    # Required to be compatible with integration.util code that we indirectly
    # depend on, but also might be useful.
    parser.addoption(
        "--force-foolscap",
        dest="force_foolscap",
        default=False,
        action="store_true",
        help=foolscap_help,
    )
---|
45 | |
---|
46 | |
---|
def pytest_generate_tests(metafunc):
    """
    Parametrize every test that requests the ``number_of_nodes`` fixture.

    The values are the ones collected from ``--number-of-nodes`` and the
    parametrization is session-scoped, so session fixtures depending on
    ``number_of_nodes`` are built once per value.
    """
    if "number_of_nodes" not in metafunc.fixturenames:
        return
    node_counts = metafunc.config.getoption("number_of_nodes")
    metafunc.parametrize("number_of_nodes", node_counts, scope="session")
---|
55 | |
---|
56 | |
---|
def port_allocator():
    """
    Pick a free TCP port and hand it back wrapped in an already-fired Deferred.

    Presumably ``create_grid`` expects an allocator returning a Deferred;
    verify against ``integration.grid`` if this changes.
    """
    return succeed(allocate_tcp_port())
---|
60 | |
---|
61 | |
---|
@pytest.fixture(scope="session")
def grid(request):
    """
    Provides a new Grid with a single Introducer and flog-gathering process.

    Notably does _not_ provide storage servers; use the storage_nodes
    fixture if your tests need a Grid that can be used for puts / gets.

    All node state lives in a fresh temporary directory that is removed
    when the session finishes.
    """
    base_dir = mkdtemp(prefix="tahoe-benchmark")
    # pytest runs finalizers last-in-first-out, so registering the removal
    # before anything else means it runs after any cleanup the helpers below
    # may register on ``request``.
    request.addfinalizer(lambda: rmtree(base_dir))

    # NOTE(review): which() returns None when flogtool is not on PATH; confirm
    # create_flog_gatherer reports that clearly.
    gatherer = pytest_twisted.blockon(
        create_flog_gatherer(reactor, request, base_dir, which("flogtool"))
    )
    return pytest_twisted.blockon(
        create_grid(reactor, request, base_dir, gatherer, port_allocator)
    )
---|
80 | |
---|
81 | |
---|
@pytest.fixture(scope="session")
def storage_nodes(grid, number_of_nodes):
    """
    Add ``number_of_nodes`` storage nodes to ``grid`` and wait for all of them.

    :return: ``grid.storage_servers`` once every node has been created.
    :raises AssertionError: if creating any of the nodes failed.
    """
    # Kick off every node creation before waiting on any of them.
    pending = [grid.add_storage_node() for _ in range(number_of_nodes)]
    # consumeErrors=True: failures are reported by the assertion below.
    # Without it, each failed Deferred also keeps an unhandled error that
    # Twisted logs ("Unhandled error in Deferred") when it is collected.
    results = pytest_twisted.blockon(DeferredList(pending, consumeErrors=True))
    for ok, value in results:
        assert ok, "Storage node creation failed: {}".format(value)
    return grid.storage_servers
---|
92 | |
---|
93 | |
---|
@pytest.fixture(scope="session")
def client_node(request, grid, storage_nodes, number_of_nodes) -> Client:
    """
    Create a grid client node with number of shares matching number of nodes.

    Depending on ``storage_nodes`` ensures pytest sets the storage nodes up
    before this client is added.  The client's process id is printed to help
    with attaching profilers.
    """
    encoding = dict(
        needed=number_of_nodes,
        happy=number_of_nodes,
        # More total shares than needed, so that FEC does some real work.
        total=number_of_nodes + 3,
    )
    node = pytest_twisted.blockon(grid.add_client("client_node", **encoding))
    print(f"Client node pid: {node.process.transport.pid}")
    return node
---|
109 | |
---|
def get_cpu_time_for_cgroup(
    *,
    proc_cgroup_path="/proc/self/cgroup",
    cgroup_root="/sys/fs/cgroup",
):
    """
    Get how many CPU seconds have been used in current cgroup so far.

    Assumes we're running in a v2 cgroup.  The current cgroup is taken from
    the ``0::<path>`` entry of ``proc_cgroup_path``, and ``usage_usec`` is
    read from that cgroup's ``cpu.stat`` under ``cgroup_root``.

    :param proc_cgroup_path: file listing this process's cgroup membership;
        overridable mainly for testing.
    :param cgroup_root: where the cgroup v2 filesystem is mounted.

    :return: CPU time used by the cgroup, in seconds (float).

    :raises ValueError: if there is no cgroup v2 entry, the entry's path is
        malformed, or ``cpu.stat`` has no ``usage_usec`` line.
    """
    with open(proc_cgroup_path) as f:
        entries = f.read().splitlines()

    for entry in entries:
        # Lines look like "hierarchy-ID:controller-list:cgroup-path".  The v2
        # entry has ID 0 and no controllers.  Split at most twice because the
        # path itself may contain ':'.  On hybrid hosts v1 lines are present
        # too and must be ignored.
        fields = entry.split(":", 2)
        if len(fields) == 3 and fields[0] == "0" and fields[1] == "":
            cgroup = fields[2]
            break
    else:
        raise ValueError(f"No cgroup v2 entry found in {proc_cgroup_path}")

    if not cgroup.startswith("/"):
        raise ValueError(f"Unexpected cgroup path: {cgroup!r}")
    cpu_stat = os.path.join(cgroup_root, cgroup[1:], "cpu.stat")

    with open(cpu_stat) as f:
        for line in f:
            parts = line.split()
            if parts and parts[0] == "usage_usec":
                return int(parts[1]) / 1_000_000
    raise ValueError("Failed to find usage_usec")
---|
126 | |
---|
127 | |
---|
class Benchmarker:
    """
    Keep track of benchmarking results.

    For now results are only printed to the terminal (bypassing pytest's
    output capture); nothing is stored on the instance.
    """

    @contextmanager
    def record(self, capsys: pytest.CaptureFixture[str], name, **parameters):
        """
        Record the timing of running some code, if it succeeds.

        Measures elapsed time and the CPU seconds reported by
        ``get_cpu_time_for_cgroup`` around the ``with`` body, then prints one
        ``BENCHMARK RESULT`` line.  If the body raises, the exception
        propagates and nothing is printed.

        :param capsys: pytest's ``capsys`` fixture; capturing is disabled while
            printing so the result always reaches the terminal.
        :param name: label for this benchmark.
        :param parameters: extra values rendered as ``key=value`` pairs.
        """
        # perf_counter is monotonic and high-resolution, unlike time.time(),
        # which can jump if the system clock is adjusted during a benchmark.
        from time import perf_counter

        start_cpu = get_cpu_time_for_cgroup()
        start = perf_counter()
        yield
        elapsed = perf_counter() - start
        end_cpu = get_cpu_time_for_cgroup()
        elapsed_cpu = end_cpu - start_cpu
        # For now we just print the outcome:
        rendered_parameters = " ".join(f"{k}={v}" for (k, v) in parameters.items())
        with capsys.disabled():
            print(
                f"\nBENCHMARK RESULT: {name} {rendered_parameters} elapsed={elapsed:.3} (secs) CPU={elapsed_cpu:.3} (secs)\n"
            )
---|
146 | |
---|
147 | |
---|
@pytest.fixture(scope="session")
def tahoe_benchmarker():
    """Provide a single Benchmarker shared by every test in the session."""
    benchmarker = Benchmarker()
    return benchmarker
---|