1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | import os |
---|
6 | from twisted.trial import unittest |
---|
7 | from twisted.internet import defer, error |
---|
8 | from twisted.python.usage import UsageError |
---|
9 | from io import StringIO |
---|
10 | from unittest import mock |
---|
11 | from ..util import i2p_provider |
---|
12 | from ..scripts import create_node, runner |
---|
13 | |
---|
def mock_txi2p(txi2p):
    """Build a patcher for ``i2p_provider._import_txi2p``.

    The returned ``mock.patch`` object, once started (for example with a
    ``with`` statement), replaces ``_import_txi2p`` with a mock whose return
    value is ``txi2p``.
    """
    target = "allmydata.util.i2p_provider._import_txi2p"
    return mock.patch(target, return_value=txi2p)
17 | |
---|
def mock_i2p(i2p):
    """Build a patcher for ``i2p_provider._import_i2p``.

    The returned ``mock.patch`` object, once started (for example with a
    ``with`` statement), replaces ``_import_i2p`` with a mock whose return
    value is ``i2p``.
    """
    target = "allmydata.util.i2p_provider._import_i2p"
    return mock.patch(target, return_value=i2p)
21 | |
---|
def make_cli_config(basedir, *argv):
    """Build parsed ``CreateNodeOptions`` for use in tests.

    ``argv`` is parsed by a ``create_node.CreateNodeOptions`` instance whose
    ``parent`` is a fresh ``runner.Options``.  The ``"basedir"`` entry is then
    set to ``basedir`` and ``stdout`` is replaced by an in-memory ``StringIO``
    so that tests can inspect whatever gets written to it.
    """
    top_level_options = runner.Options()
    node_options = create_node.CreateNodeOptions()
    node_options.parent = top_level_options
    node_options.parseOptions(argv)
    # Assigned after parsing, so this is the value the returned options carry.
    node_options["basedir"] = basedir
    node_options.stdout = StringIO()
    return node_options
30 | |
---|
class TryToConnect(unittest.TestCase):
    """Tests for ``i2p_provider._try_to_connect`` driven by a mocked txi2p."""

    # Patch target used by every test; the patch returns a placeholder endpoint.
    _CLIENT_FROM_STRING = "allmydata.util.i2p_provider.clientFromString"

    def test_try(self):
        """A ``testAPI`` Deferred that fires with True gives a true result."""
        fake_reactor = object()
        fake_endpoint = object()
        output = StringIO()
        txi2p = mock.Mock()
        txi2p.testAPI = mock.Mock(return_value=defer.succeed(True))

        with mock.patch(self._CLIENT_FROM_STRING,
                        return_value=fake_endpoint) as cfs:
            attempt = i2p_provider._try_to_connect(
                fake_reactor, "desc", output, txi2p)

        self.assertTrue(self.successResultOf(attempt))
        cfs.assert_called_with(fake_reactor, "desc")
        txi2p.testAPI.assert_called_with(fake_reactor, 'SAM', fake_endpoint)

    def test_try_handled_error(self):
        """A ``ConnectError`` becomes a ``None`` result plus a message on stdout."""
        fake_reactor = object()
        fake_endpoint = object()
        output = StringIO()
        txi2p = mock.Mock()
        txi2p.testAPI = mock.Mock(
            return_value=defer.fail(error.ConnectError("oops")))

        with mock.patch(self._CLIENT_FROM_STRING,
                        return_value=fake_endpoint) as cfs:
            attempt = i2p_provider._try_to_connect(
                fake_reactor, "desc", output, txi2p)

        self.assertIs(self.successResultOf(attempt), None)
        cfs.assert_called_with(fake_reactor, "desc")
        txi2p.testAPI.assert_called_with(fake_reactor, 'SAM', fake_endpoint)
        self.assertEqual(output.getvalue(),
                         "Unable to reach I2P SAM API at 'desc': "
                         "An error occurred while connecting: oops.\n")

    def test_try_unhandled_error(self):
        """A ``ValueError`` propagates as a failure and nothing is written."""
        fake_reactor = object()
        fake_endpoint = object()
        output = StringIO()
        txi2p = mock.Mock()
        txi2p.testAPI = mock.Mock(return_value=defer.fail(ValueError("oops")))

        with mock.patch(self._CLIENT_FROM_STRING,
                        return_value=fake_endpoint) as cfs:
            attempt = i2p_provider._try_to_connect(
                fake_reactor, "desc", output, txi2p)

        failure = self.failureResultOf(attempt)
        self.assertIsInstance(failure.value, ValueError)
        self.assertEqual(str(failure.value), "oops")
        cfs.assert_called_with(fake_reactor, "desc")
        txi2p.testAPI.assert_called_with(fake_reactor, 'SAM', fake_endpoint)
        self.assertEqual(output.getvalue(), "")
81 | |
---|
class ConnectToI2P(unittest.TestCase):
    """Tests for ``i2p_provider._connect_to_i2p`` with a faked connection probe."""

    def _do_test_connect(self, endpoint, reachable):
        """Run ``_connect_to_i2p`` with ``_try_to_connect`` replaced by a fake.

        ``endpoint``, when truthy, is passed as ``--i2p-sam-port`` and is the
        port expected to be tried; otherwise ``tcp:127.0.0.1:7656`` is
        expected.  ``reachable`` controls whether the fake probe ever reports
        success.
        """
        reactor = object()
        txi2p = object()
        extra_args = ["--i2p-sam-port=%s" % endpoint] if endpoint else []
        cli_config = make_cli_config("basedir", "--listen=i2p", *extra_args)
        stdout = cli_config.stdout
        expected_port = endpoint if endpoint else "tcp:127.0.0.1:7656"
        attempts = []

        def fake_try_to_connect(reactor, port, stdout, txi2p):
            # Record every probe; only the expected port can ever succeed.
            attempts.append((reactor, port, stdout, txi2p))
            if reachable and port == expected_port:
                return defer.succeed(True)
            return defer.succeed(None)

        with mock.patch("allmydata.util.i2p_provider._try_to_connect",
                        fake_try_to_connect):
            outcome = i2p_provider._connect_to_i2p(reactor, cli_config, txi2p)

        if not reachable:
            failure = self.failureResultOf(outcome)
            self.assertIsInstance(failure.value, ValueError)
            self.assertEqual(str(failure.value),
                             "unable to reach any default I2P SAM port")
            return

        self.assertEqual(self.successResultOf(outcome), expected_port)
        self.assertEqual(attempts, [(reactor, expected_port, stdout, txi2p)])

    def test_connect(self):
        """The default SAM port is tried when no endpoint is given."""
        return self._do_test_connect(None, True)

    def test_connect_endpoint(self):
        """An explicit ``--i2p-sam-port`` is the only port tried."""
        return self._do_test_connect("tcp:other:port", True)

    def test_connect_unreachable(self):
        """When no probe succeeds the Deferred fails with ``ValueError``."""
        return self._do_test_connect(None, False)
125 | |
---|
126 | |
---|
class CreateDest(unittest.TestCase):
    """Tests for ``i2p_provider.create_config`` and the I2P CLI options."""

    def test_no_txi2p(self):
        """``create_config`` fails with ``ValueError`` when txi2p is missing."""
        with mock_txi2p(None):
            outcome = i2p_provider.create_config("reactor", "cli_config")

        failure = self.failureResultOf(outcome)
        self.assertIsInstance(failure.value, ValueError)
        self.assertEqual(str(failure.value),
                         "Cannot create I2P Destination without txi2p. "
                         "Please 'pip install tahoe-lafs[i2p]' to fix this.")

    def _do_test_launch(self, executable):
        """Assert that ``--i2p-launch`` is rejected with ``UsageError``.

        When ``executable`` is truthy, ``--i2p-executable`` is passed as well.
        """
        basedir = self.mktemp()
        os.mkdir(basedir)
        argv = ["--listen=i2p", "--i2p-launch"]
        if executable:
            argv += ["--i2p-executable=%s" % executable]
        with self.assertRaises(UsageError):
            make_cli_config(basedir, *argv)

    def test_launch(self):
        return self._do_test_launch(None)

    def test_launch_executable(self):
        return self._do_test_launch("myi2p")

    def test_sam_endpoint(self):
        """``create_config`` uses the reachable SAM port to generate a destination."""
        basedir = self.mktemp()
        private_dir = os.path.join(basedir, "private")
        os.makedirs(private_dir)
        privkeyfile = os.path.abspath(
            os.path.join(private_dir, "i2p_dest.privkey"))

        reactor = object()
        cli_config = make_cli_config(basedir, "--listen=i2p")
        connect_to_i2p = mock.Mock(return_value=defer.succeed("goodport"))
        endpoint = object()
        destination = mock.Mock()
        destination.host = "FOOBAR.b32.i2p"
        txi2p = mock.Mock()
        txi2p.generateDestination = mock.Mock(
            return_value=defer.succeed(destination))

        with mock_txi2p(txi2p), \
                mock.patch("allmydata.util.i2p_provider._connect_to_i2p",
                           connect_to_i2p), \
                mock.patch("allmydata.util.i2p_provider.clientFromString",
                           return_value=endpoint) as cfs:
            outcome = i2p_provider.create_config(reactor, cli_config)

        i2p_config = self.successResultOf(outcome)

        connect_to_i2p.assert_called_with(reactor, cli_config, txi2p)
        cfs.assert_called_with(reactor, "goodport")
        txi2p.generateDestination.assert_called_with(
            reactor, privkeyfile, 'SAM', endpoint)

        expected_section = {
            "sam.port": "goodport",
            "dest": "true",
            "dest.port": "3457",
            "dest.private_key_file": os.path.join("private",
                                                  "i2p_dest.privkey"),
        }
        self.assertEqual(dict(i2p_config.node_config["i2p"]), expected_section)
        self.assertEqual(i2p_config.tub_ports, ["listen:i2p"])
        self.assertEqual(i2p_config.tub_locations, ["i2p:FOOBAR.b32.i2p:3457"])
187 | |
---|
# Sentinel meaning "no default supplied"; distinct from None so that callers
# can pass None as a real default value.
_None = object()


class FakeConfig(dict):
    """Dict-backed stand-in for a node config that only knows the ``i2p`` section.

    Options are stored as plain dict items, e.g.
    ``FakeConfig(dest=True, **{"sam.port": "x"})``.
    """

    def get_config(self, section, option, default=_None, boolean=False):
        """Return the stored value for ``option``, or ``default`` if absent.

        :param section: must be ``"i2p"``; anything else raises ``ValueError``.
        :param option: key to look up in this dict.
        :param default: value returned when ``option`` is not stored.
        :param boolean: accepted for signature compatibility and ignored;
            values are returned exactly as stored.
        :raises ValueError: if ``section`` is not ``"i2p"``.
        :raises KeyError: if ``option`` is missing and no ``default`` was
            given; the missing option name is the exception argument.
        """
        if section != "i2p":
            raise ValueError(section)
        value = self.get(option, default)
        if value is _None:
            # Name the missing option so a failing test says what was absent.
            raise KeyError(option)
        return value
197 | |
---|
class Provider(unittest.TestCase):
    """Tests for ``get_i2p_handler()`` on the object ``i2p_provider.create`` returns."""

    def _fake_i2p(self, factory_name):
        """Return ``(i2p, handler)``; ``i2p.<factory_name>(...)`` returns ``handler``."""
        handler = object()
        i2p = mock.Mock()
        setattr(i2p, factory_name, mock.Mock(return_value=handler))
        return i2p, handler

    def test_build(self):
        """Creating a provider from an empty config does not raise."""
        i2p_provider.create("reactor", FakeConfig())

    def test_handler_disabled(self):
        """``enabled=False`` yields no handler."""
        provider = i2p_provider.create("reactor", FakeConfig(enabled=False))
        self.assertEqual(provider.get_i2p_handler(), None)

    def test_handler_no_i2p(self):
        """No handler is produced when ``_import_i2p`` returns None."""
        with mock_i2p(None):
            provider = i2p_provider.create("reactor", FakeConfig())
            found = provider.get_i2p_handler()
        self.assertEqual(found, None)

    def test_handler_sam_endpoint(self):
        """``sam.port`` is turned into an endpoint and given to ``sam_endpoint``."""
        i2p, handler = self._fake_i2p("sam_endpoint")
        endpoint = object()
        reactor = object()
        config = FakeConfig({"sam.port": "ep_desc"})

        with mock_i2p(i2p):
            provider = i2p_provider.create(reactor, config)
            with mock.patch("allmydata.util.i2p_provider.clientFromString",
                            return_value=endpoint) as cfs:
                found = provider.get_i2p_handler()

        cfs.assert_called_with(reactor, "ep_desc")
        self.assertIs(found, handler)
        i2p.sam_endpoint.assert_called_with(endpoint, keyfile=None)

    def test_handler_launch(self):
        """``launch=True`` alone calls ``launch`` with no configdir or binary."""
        i2p, handler = self._fake_i2p("launch")
        reactor = object()
        config = FakeConfig({"launch": True})

        with mock_i2p(i2p):
            provider = i2p_provider.create(reactor, config)
            found = provider.get_i2p_handler()

        self.assertIs(found, handler)
        i2p.launch.assert_called_with(i2p_configdir=None, i2p_binary=None)

    def test_handler_launch_configdir(self):
        """``i2p.configdir`` is forwarded to ``launch``."""
        i2p, handler = self._fake_i2p("launch")
        reactor = object()
        config = FakeConfig({"launch": True, "i2p.configdir": "configdir"})

        with mock_i2p(i2p):
            provider = i2p_provider.create(reactor, config)
            found = provider.get_i2p_handler()

        self.assertIs(found, handler)
        i2p.launch.assert_called_with(i2p_configdir="configdir",
                                      i2p_binary=None)

    def test_handler_launch_configdir_executable(self):
        """Both ``i2p.configdir`` and ``i2p.executable`` are forwarded to ``launch``."""
        i2p, handler = self._fake_i2p("launch")
        reactor = object()
        config = FakeConfig({
            "launch": True,
            "i2p.configdir": "configdir",
            "i2p.executable": "myi2p",
        })

        with mock_i2p(i2p):
            provider = i2p_provider.create(reactor, config)
            found = provider.get_i2p_handler()

        self.assertIs(found, handler)
        i2p.launch.assert_called_with(i2p_configdir="configdir",
                                      i2p_binary="myi2p")

    def test_handler_configdir(self):
        """``i2p.configdir`` without launch uses ``local_i2p``."""
        i2p, handler = self._fake_i2p("local_i2p")
        reactor = object()
        config = FakeConfig({"i2p.configdir": "configdir"})

        with mock_i2p(i2p):
            provider = i2p_provider.create(reactor, config)
            found = provider.get_i2p_handler()

        i2p.local_i2p.assert_called_with("configdir")
        self.assertIs(found, handler)

    def test_handler_launch_executable(self):
        """``i2p.executable`` is forwarded to ``launch`` as ``i2p_binary``."""
        i2p, handler = self._fake_i2p("launch")
        reactor = object()
        config = FakeConfig({"launch": True, "i2p.executable": "myi2p"})

        with mock_i2p(i2p):
            provider = i2p_provider.create(reactor, config)
            found = provider.get_i2p_handler()

        self.assertIs(found, handler)
        i2p.launch.assert_called_with(i2p_configdir=None, i2p_binary="myi2p")

    def test_handler_default(self):
        """An empty config falls back to ``i2p.default``."""
        i2p, handler = self._fake_i2p("default")
        reactor = object()

        with mock_i2p(i2p):
            provider = i2p_provider.create(reactor, FakeConfig())
            found = provider.get_i2p_handler()

        self.assertIs(found, handler)
        i2p.default.assert_called_with(reactor, keyfile=None)
309 | |
---|
class ProviderListener(unittest.TestCase):
    """Tests for ``get_listener()`` on the object ``i2p_provider.create`` returns."""

    def test_listener(self):
        """Check that get_listener() turns the [i2p] config options into the
        expected endpoint/descriptor string.
        """
        handler = object()
        i2p = mock.Mock()
        i2p.local_i2p = mock.Mock(return_value=handler)
        reactor = object()

        privkeyfile = os.path.join("private", "i2p_dest.privkey")
        config = FakeConfig({
            "i2p.configdir": "configdir",
            "sam.port": "good:port",
            "dest": "true",
            "dest.port": "3457",
            "dest.private_key_file": privkeyfile,
        })

        with mock_i2p(i2p):
            provider = i2p_provider.create(reactor, config)
            listener = provider.get_listener()

        # The expected string has the colon inside the SAM port backslash-escaped.
        self.assertEqual(
            listener,
            "i2p:%s:3457:api=SAM:apiEndpoint=good\\:port" % privkeyfile)
335 | |
---|
class Provider_CheckI2PConfig(unittest.TestCase):
    """Tests for the config validation performed by ``i2p_provider.create``."""

    def _assert_create_raises(self, exc_type, config, message):
        """Assert ``create("reactor", config)`` raises ``exc_type`` with ``message``."""
        with self.assertRaises(exc_type) as ctx:
            i2p_provider.create("reactor", config)
        self.assertEqual(str(ctx.exception), message)

    def test_default(self):
        # default config doesn't start an I2P service, so it should be
        # happy both with and without txi2p
        provider = i2p_provider.create("reactor", FakeConfig())
        provider.check_dest_config()

        with mock_txi2p(None):
            provider = i2p_provider.create("reactor", FakeConfig())
            provider.check_dest_config()

    def test_no_txi2p(self):
        """``dest=True`` without txi2p is rejected."""
        with mock_txi2p(None):
            self._assert_create_raises(
                ValueError,
                FakeConfig(dest=True),
                "Cannot create I2P Destination without txi2p. "
                "Please 'pip install tahoe-lafs[i2p]' to fix.",
            )

    def test_no_launch_no_control(self):
        """``dest=True`` needs one of sam.port, launch or configdir."""
        self._assert_create_raises(
            ValueError,
            FakeConfig(dest=True),
            "[i2p] dest = true, but we have neither "
            "sam.port= nor launch=true nor configdir=",
        )

    def test_missing_keys(self):
        """Missing ``dest.port`` and ``dest.private_key_file`` are each reported."""
        self._assert_create_raises(
            ValueError,
            FakeConfig({"dest": True, "sam.port": "x"}),
            "[i2p] dest = true, "
            "but dest.port= is missing",
        )

        self._assert_create_raises(
            ValueError,
            FakeConfig({"dest": True, "sam.port": "x", "dest.port": "y"}),
            "[i2p] dest = true, "
            "but dest.private_key_file= is missing",
        )

    def test_launch_not_implemented(self):
        """``dest=True`` together with ``launch=True`` raises ``NotImplementedError``."""
        self._assert_create_raises(
            NotImplementedError,
            FakeConfig({
                "dest": True,
                "launch": True,
                "dest.port": "x",
                "dest.private_key_file": "y",
            }),
            "[i2p] launch is under development.",
        )

    def test_ok(self):
        """A config with sam.port, dest.port and dest.private_key_file is accepted."""
        config = FakeConfig({
            "dest": True,
            "sam.port": "x",
            "dest.port": "y",
            "dest.private_key_file": "z",
        })
        i2p_provider.create("reactor", config)