1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from six import ensure_binary, ensure_text |
---|
6 | |
---|
7 | import os, re, itertools |
---|
8 | from base64 import b32decode |
---|
9 | import json |
---|
10 | from operator import ( |
---|
11 | setitem, |
---|
12 | ) |
---|
13 | from functools import ( |
---|
14 | partial, |
---|
15 | ) |
---|
16 | |
---|
17 | from testtools.matchers import ( |
---|
18 | Is, |
---|
19 | ) |
---|
20 | |
---|
21 | from twisted.internet import defer, address |
---|
22 | from twisted.python import log |
---|
23 | from twisted.python.filepath import FilePath |
---|
24 | from twisted.web.template import flattenString |
---|
25 | |
---|
26 | from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue |
---|
27 | from twisted.application import service |
---|
28 | from allmydata.crypto import ed25519 |
---|
29 | from allmydata.crypto.util import remove_prefix |
---|
30 | from allmydata.crypto.error import BadSignature |
---|
31 | from allmydata.interfaces import InsufficientVersionError |
---|
32 | from allmydata.introducer.client import IntroducerClient |
---|
33 | from allmydata.introducer.server import IntroducerService, FurlFileConflictError |
---|
34 | from allmydata.introducer.common import get_tubid_string_from_ann, \ |
---|
35 | get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \ |
---|
36 | UnknownKeyError |
---|
37 | from allmydata.node import ( |
---|
38 | create_node_dir, |
---|
39 | read_config, |
---|
40 | ) |
---|
41 | # the "new way" to create introducer node instance |
---|
42 | from allmydata.introducer.server import create_introducer |
---|
43 | from allmydata.web import introweb |
---|
44 | from allmydata.client import ( |
---|
45 | create_client, |
---|
46 | create_introducer_clients, |
---|
47 | ) |
---|
48 | from allmydata.util import pollmixin, idlib, fileutil, yamlutil |
---|
49 | from allmydata.util.iputil import ( |
---|
50 | listenOnUnused, |
---|
51 | ) |
---|
52 | from allmydata.scripts.common import ( |
---|
53 | write_introducer, |
---|
54 | ) |
---|
55 | import allmydata.test.common_util as testutil |
---|
56 | from .common import ( |
---|
57 | SyncTestCase, |
---|
58 | AsyncTestCase, |
---|
59 | AsyncBrokenTestCase, |
---|
60 | ) |
---|
61 | |
---|
62 | class LoggingMultiService(service.MultiService): |
---|
63 | def log(self, msg, **kw): |
---|
64 | log.msg(msg, **kw) |
---|
65 | |
---|
66 | class Node(testutil.SignalMixin, testutil.ReallyEqualMixin, AsyncTestCase): |
---|
67 | |
---|
68 | def test_backwards_compat_import(self): |
---|
69 | # for old introducer .tac files |
---|
70 | from allmydata.introducer import IntroducerNode |
---|
71 | IntroducerNode # pyflakes |
---|
72 | |
---|
73 | @defer.inlineCallbacks |
---|
74 | def test_create(self): |
---|
75 | """ |
---|
76 | A brand new introducer creates its config dir |
---|
77 | """ |
---|
78 | basedir = "introducer.IntroducerNode.test_create" |
---|
79 | yield create_introducer(basedir) |
---|
80 | self.assertTrue(os.path.exists(basedir)) |
---|
81 | |
---|
82 | def test_introducer_clients_unloadable(self): |
---|
83 | """ |
---|
84 | ``create_introducer_clients`` raises ``EnvironmentError`` if |
---|
85 | ``introducers.yaml`` exists but we can't read it. |
---|
86 | """ |
---|
87 | basedir = u"introducer.IntroducerNode.test_introducer_clients_unloadable" |
---|
88 | os.mkdir(basedir) |
---|
89 | os.mkdir(os.path.join(basedir, u"private")) |
---|
90 | yaml_fname = os.path.join(basedir, u"private", u"introducers.yaml") |
---|
91 | with open(yaml_fname, 'w') as f: |
---|
92 | f.write(u'---\n') |
---|
93 | os.chmod(yaml_fname, 0o000) |
---|
94 | self.addCleanup(lambda: os.chmod(yaml_fname, 0o700)) |
---|
95 | |
---|
96 | config = read_config(basedir, "portnum") |
---|
97 | with self.assertRaises(EnvironmentError): |
---|
98 | create_introducer_clients(config, Tub()) |
---|
99 | |
---|
100 | @defer.inlineCallbacks |
---|
101 | def test_furl(self): |
---|
102 | basedir = "introducer.IntroducerNode.test_furl" |
---|
103 | create_node_dir(basedir, "testing") |
---|
104 | public_fn = os.path.join(basedir, "introducer.furl") |
---|
105 | private_fn = os.path.join(basedir, "private", "introducer.furl") |
---|
106 | |
---|
107 | q1 = yield create_introducer(basedir) |
---|
108 | del q1 |
---|
109 | # new nodes create unguessable furls in private/introducer.furl |
---|
110 | ifurl = fileutil.read(private_fn, mode="r") |
---|
111 | self.failUnless(ifurl) |
---|
112 | ifurl = ifurl.strip() |
---|
113 | self.failIf(ifurl.endswith("/introducer"), ifurl) |
---|
114 | |
---|
115 | # old nodes created guessable furls in BASEDIR/introducer.furl |
---|
116 | guessable = ifurl[:ifurl.rfind("/")] + "/introducer" |
---|
117 | fileutil.write(public_fn, guessable+"\n", mode="w") # text |
---|
118 | |
---|
119 | # if we see both files, throw an error |
---|
120 | with self.assertRaises(FurlFileConflictError): |
---|
121 | yield create_introducer(basedir) |
---|
122 | |
---|
123 | # when we see only the public one, move it to private/ and use |
---|
124 | # the existing furl instead of creating a new one |
---|
125 | os.unlink(private_fn) |
---|
126 | |
---|
127 | q2 = yield create_introducer(basedir) |
---|
128 | del q2 |
---|
129 | self.failIf(os.path.exists(public_fn)) |
---|
130 | ifurl2 = fileutil.read(private_fn, mode="r") |
---|
131 | self.failUnless(ifurl2) |
---|
132 | self.failUnlessEqual(ifurl2.strip(), guessable) |
---|
133 | |
---|
134 | @defer.inlineCallbacks |
---|
135 | def test_web_static(self): |
---|
136 | basedir = u"introducer.Node.test_web_static" |
---|
137 | create_node_dir(basedir, "testing") |
---|
138 | fileutil.write(os.path.join(basedir, "tahoe.cfg"), |
---|
139 | "[node]\n" + |
---|
140 | "web.port = tcp:0:interface=127.0.0.1\n" + |
---|
141 | "web.static = relative\n") |
---|
142 | c = yield create_introducer(basedir) |
---|
143 | w = c.getServiceNamed("webish") |
---|
144 | abs_basedir = fileutil.abspath_expanduser_unicode(basedir) |
---|
145 | expected = fileutil.abspath_expanduser_unicode(u"relative", abs_basedir) |
---|
146 | self.failUnlessReallyEqual(w.staticdir, expected) |
---|
147 | |
---|
148 | |
---|
149 | class ServiceMixin(object): |
---|
150 | def setUp(self): |
---|
151 | self.parent = LoggingMultiService() |
---|
152 | self.parent.startService() |
---|
153 | return super(ServiceMixin, self).setUp() |
---|
154 | |
---|
155 | def tearDown(self): |
---|
156 | log.msg("TestIntroducer.tearDown") |
---|
157 | d = defer.maybeDeferred(super(ServiceMixin, self).tearDown) |
---|
158 | d.addCallback(lambda res: self.parent.stopService()) |
---|
159 | d.addCallback(flushEventualQueue) |
---|
160 | return d |
---|
161 | |
---|
162 | class Introducer(ServiceMixin, AsyncTestCase): |
---|
163 | def test_create(self): |
---|
164 | ic = IntroducerClient(None, "introducer.furl", u"my_nickname", |
---|
165 | "my_version", "oldest_version", fakeseq, |
---|
166 | FilePath(self.mktemp())) |
---|
167 | self.failUnless(isinstance(ic, IntroducerClient)) |
---|
168 | |
---|
169 | def test_listen(self): |
---|
170 | i = IntroducerService() |
---|
171 | i.setServiceParent(self.parent) |
---|
172 | |
---|
173 | |
---|
174 | def fakeseq(): |
---|
175 | return 1, "nonce" |
---|
176 | |
---|
177 | seqnum_counter = itertools.count(1) |
---|
178 | def realseq(): |
---|
179 | return next(seqnum_counter), str(os.randint(1,100000)) |
---|
180 | |
---|
181 | def make_ann(furl): |
---|
182 | ann = { "anonymous-storage-FURL": furl, |
---|
183 | "permutation-seed-base32": get_tubid_string(furl) } |
---|
184 | return ann |
---|
185 | |
---|
186 | def make_ann_t(ic, furl, privkey, seqnum): |
---|
187 | assert privkey |
---|
188 | ann_d = ic.create_announcement_dict("storage", make_ann(furl)) |
---|
189 | ann_d["seqnum"] = seqnum |
---|
190 | ann_d["nonce"] = "nonce" |
---|
191 | ann_t = sign_to_foolscap(ann_d, privkey) |
---|
192 | return ann_t |
---|
193 | |
---|
194 | class Client(AsyncTestCase): |
---|
195 | def test_duplicate_receive_v2(self): |
---|
196 | ic1 = IntroducerClient(None, |
---|
197 | "introducer.furl", u"my_nickname", |
---|
198 | "ver23", "oldest_version", fakeseq, |
---|
199 | FilePath(self.mktemp())) |
---|
200 | # we use a second client just to create a different-looking |
---|
201 | # announcement |
---|
202 | ic2 = IntroducerClient(None, |
---|
203 | "introducer.furl", u"my_nickname", |
---|
204 | "ver24","oldest_version",fakeseq, |
---|
205 | FilePath(self.mktemp())) |
---|
206 | announcements = [] |
---|
207 | def _received(key_s, ann): |
---|
208 | announcements.append( (key_s, ann) ) |
---|
209 | ic1.subscribe_to("storage", _received) |
---|
210 | furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp" |
---|
211 | furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp" |
---|
212 | furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo" |
---|
213 | |
---|
214 | private_key, public_key = ed25519.create_signing_keypair() |
---|
215 | public_key_str = ed25519.string_from_verifying_key(public_key) |
---|
216 | pubkey_s = remove_prefix(public_key_str, b"pub-") |
---|
217 | |
---|
218 | # ann1: ic1, furl1 |
---|
219 | # ann1a: ic1, furl1a (same SturdyRef, different connection hints) |
---|
220 | # ann1b: ic2, furl1 |
---|
221 | # ann2: ic2, furl2 |
---|
222 | |
---|
223 | self.ann1 = make_ann_t(ic1, furl1, private_key, seqnum=10) |
---|
224 | self.ann1old = make_ann_t(ic1, furl1, private_key, seqnum=9) |
---|
225 | self.ann1noseqnum = make_ann_t(ic1, furl1, private_key, seqnum=None) |
---|
226 | self.ann1b = make_ann_t(ic2, furl1, private_key, seqnum=11) |
---|
227 | self.ann1a = make_ann_t(ic1, furl1a, private_key, seqnum=12) |
---|
228 | self.ann2 = make_ann_t(ic2, furl2, private_key, seqnum=13) |
---|
229 | |
---|
230 | ic1.remote_announce_v2([self.ann1]) # queues eventual-send |
---|
231 | d = fireEventually() |
---|
232 | def _then1(ign): |
---|
233 | self.failUnlessEqual(len(announcements), 1) |
---|
234 | key_s,ann = announcements[0] |
---|
235 | self.failUnlessEqual(key_s, pubkey_s) |
---|
236 | self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1) |
---|
237 | self.failUnlessEqual(ann["my-version"], "ver23") |
---|
238 | d.addCallback(_then1) |
---|
239 | |
---|
240 | # now send a duplicate announcement. This should not fire the |
---|
241 | # subscriber |
---|
242 | d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1])) |
---|
243 | d.addCallback(fireEventually) |
---|
244 | def _then2(ign): |
---|
245 | self.failUnlessEqual(len(announcements), 1) |
---|
246 | d.addCallback(_then2) |
---|
247 | |
---|
248 | # an older announcement shouldn't fire the subscriber either |
---|
249 | d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1old])) |
---|
250 | d.addCallback(fireEventually) |
---|
251 | def _then2a(ign): |
---|
252 | self.failUnlessEqual(len(announcements), 1) |
---|
253 | d.addCallback(_then2a) |
---|
254 | |
---|
255 | # announcement with no seqnum cannot replace one with-seqnum |
---|
256 | d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1noseqnum])) |
---|
257 | d.addCallback(fireEventually) |
---|
258 | def _then2b(ign): |
---|
259 | self.failUnlessEqual(len(announcements), 1) |
---|
260 | d.addCallback(_then2b) |
---|
261 | |
---|
262 | # and a replacement announcement: same FURL, new other stuff. The |
---|
263 | # subscriber *should* be fired. |
---|
264 | d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b])) |
---|
265 | d.addCallback(fireEventually) |
---|
266 | def _then3(ign): |
---|
267 | self.failUnlessEqual(len(announcements), 2) |
---|
268 | key_s,ann = announcements[-1] |
---|
269 | self.failUnlessEqual(key_s, pubkey_s) |
---|
270 | self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1) |
---|
271 | self.failUnlessEqual(ann["my-version"], "ver24") |
---|
272 | d.addCallback(_then3) |
---|
273 | |
---|
274 | # and a replacement announcement with a different FURL (it uses |
---|
275 | # different connection hints) |
---|
276 | d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a])) |
---|
277 | d.addCallback(fireEventually) |
---|
278 | def _then4(ign): |
---|
279 | self.failUnlessEqual(len(announcements), 3) |
---|
280 | key_s,ann = announcements[-1] |
---|
281 | self.failUnlessEqual(key_s, pubkey_s) |
---|
282 | self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a) |
---|
283 | self.failUnlessEqual(ann["my-version"], "ver23") |
---|
284 | d.addCallback(_then4) |
---|
285 | |
---|
286 | # now add a new subscription, which should be called with the |
---|
287 | # backlog. The introducer only records one announcement per index, so |
---|
288 | # the backlog will only have the latest message. |
---|
289 | announcements2 = [] |
---|
290 | def _received2(key_s, ann): |
---|
291 | announcements2.append( (key_s, ann) ) |
---|
292 | d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2)) |
---|
293 | d.addCallback(fireEventually) |
---|
294 | def _then5(ign): |
---|
295 | self.failUnlessEqual(len(announcements2), 1) |
---|
296 | key_s,ann = announcements2[-1] |
---|
297 | self.failUnlessEqual(key_s, pubkey_s) |
---|
298 | self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a) |
---|
299 | self.failUnlessEqual(ann["my-version"], "ver23") |
---|
300 | d.addCallback(_then5) |
---|
301 | return d |
---|
302 | |
---|
303 | class Server(AsyncTestCase): |
---|
304 | def test_duplicate(self): |
---|
305 | i = IntroducerService() |
---|
306 | ic1 = IntroducerClient(None, |
---|
307 | "introducer.furl", u"my_nickname", |
---|
308 | "ver23", "oldest_version", realseq, |
---|
309 | FilePath(self.mktemp())) |
---|
310 | furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp" |
---|
311 | |
---|
312 | private_key, _ = ed25519.create_signing_keypair() |
---|
313 | |
---|
314 | ann1 = make_ann_t(ic1, furl1, private_key, seqnum=10) |
---|
315 | ann1_old = make_ann_t(ic1, furl1, private_key, seqnum=9) |
---|
316 | ann1_new = make_ann_t(ic1, furl1, private_key, seqnum=11) |
---|
317 | ann1_noseqnum = make_ann_t(ic1, furl1, private_key, seqnum=None) |
---|
318 | ann1_badseqnum = make_ann_t(ic1, furl1, private_key, seqnum="not an int") |
---|
319 | |
---|
320 | i.remote_publish_v2(ann1, None) |
---|
321 | all = i.get_announcements() |
---|
322 | self.failUnlessEqual(len(all), 1) |
---|
323 | self.failUnlessEqual(all[0].announcement["seqnum"], 10) |
---|
324 | self.failUnlessEqual(i._debug_counts["inbound_message"], 1) |
---|
325 | self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 0) |
---|
326 | self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0) |
---|
327 | self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 0) |
---|
328 | self.failUnlessEqual(i._debug_counts["inbound_update"], 0) |
---|
329 | |
---|
330 | i.remote_publish_v2(ann1, None) |
---|
331 | all = i.get_announcements() |
---|
332 | self.failUnlessEqual(len(all), 1) |
---|
333 | self.failUnlessEqual(all[0].announcement["seqnum"], 10) |
---|
334 | self.failUnlessEqual(i._debug_counts["inbound_message"], 2) |
---|
335 | self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1) |
---|
336 | self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0) |
---|
337 | self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 0) |
---|
338 | self.failUnlessEqual(i._debug_counts["inbound_update"], 0) |
---|
339 | |
---|
340 | i.remote_publish_v2(ann1_old, None) |
---|
341 | all = i.get_announcements() |
---|
342 | self.failUnlessEqual(len(all), 1) |
---|
343 | self.failUnlessEqual(all[0].announcement["seqnum"], 10) |
---|
344 | self.failUnlessEqual(i._debug_counts["inbound_message"], 3) |
---|
345 | self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1) |
---|
346 | self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0) |
---|
347 | self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1) |
---|
348 | self.failUnlessEqual(i._debug_counts["inbound_update"], 0) |
---|
349 | |
---|
350 | i.remote_publish_v2(ann1_new, None) |
---|
351 | all = i.get_announcements() |
---|
352 | self.failUnlessEqual(len(all), 1) |
---|
353 | self.failUnlessEqual(all[0].announcement["seqnum"], 11) |
---|
354 | self.failUnlessEqual(i._debug_counts["inbound_message"], 4) |
---|
355 | self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1) |
---|
356 | self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0) |
---|
357 | self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1) |
---|
358 | self.failUnlessEqual(i._debug_counts["inbound_update"], 1) |
---|
359 | |
---|
360 | i.remote_publish_v2(ann1_noseqnum, None) |
---|
361 | all = i.get_announcements() |
---|
362 | self.failUnlessEqual(len(all), 1) |
---|
363 | self.failUnlessEqual(all[0].announcement["seqnum"], 11) |
---|
364 | self.failUnlessEqual(i._debug_counts["inbound_message"], 5) |
---|
365 | self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1) |
---|
366 | self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 1) |
---|
367 | self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1) |
---|
368 | self.failUnlessEqual(i._debug_counts["inbound_update"], 1) |
---|
369 | |
---|
370 | i.remote_publish_v2(ann1_badseqnum, None) |
---|
371 | all = i.get_announcements() |
---|
372 | self.failUnlessEqual(len(all), 1) |
---|
373 | self.failUnlessEqual(all[0].announcement["seqnum"], 11) |
---|
374 | self.failUnlessEqual(i._debug_counts["inbound_message"], 6) |
---|
375 | self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1) |
---|
376 | self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 2) |
---|
377 | self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1) |
---|
378 | self.failUnlessEqual(i._debug_counts["inbound_update"], 1) |
---|
379 | |
---|
380 | |
---|
381 | NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE |
---|
382 | |
---|
383 | class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): |
---|
384 | |
---|
385 | def create_tub(self, portnum=None): |
---|
386 | tubfile = os.path.join(self.basedir, "tub.pem") |
---|
387 | self.central_tub = tub = Tub(certFile=tubfile) |
---|
388 | #tub.setOption("logLocalFailures", True) |
---|
389 | #tub.setOption("logRemoteFailures", True) |
---|
390 | tub.setOption("expose-remote-exception-types", False) |
---|
391 | tub.setServiceParent(self.parent) |
---|
392 | self.central_portnum = listenOnUnused(tub, portnum) |
---|
393 | |
---|
394 | class Queue(SystemTestMixin, AsyncTestCase): |
---|
395 | def test_queue_until_connected(self): |
---|
396 | self.basedir = "introducer/QueueUntilConnected/queued" |
---|
397 | os.makedirs(self.basedir) |
---|
398 | self.create_tub() |
---|
399 | introducer = IntroducerService() |
---|
400 | introducer.setServiceParent(self.parent) |
---|
401 | iff = os.path.join(self.basedir, "introducer.furl") |
---|
402 | ifurl = self.central_tub.registerReference(introducer, furlFile=iff) |
---|
403 | tub2 = Tub() |
---|
404 | tub2.setServiceParent(self.parent) |
---|
405 | c = IntroducerClient(tub2, ifurl, |
---|
406 | u"nickname", "version", "oldest", fakeseq, |
---|
407 | FilePath(self.mktemp())) |
---|
408 | furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") |
---|
409 | private_key, _ = ed25519.create_signing_keypair() |
---|
410 | |
---|
411 | d = introducer.disownServiceParent() |
---|
412 | |
---|
413 | def _offline(ign): |
---|
414 | # now that the introducer server is offline, create a client and |
---|
415 | # publish some messages |
---|
416 | c.setServiceParent(self.parent) # this starts the reconnector |
---|
417 | c.publish("storage", make_ann(furl1), private_key) |
---|
418 | |
---|
419 | introducer.setServiceParent(self.parent) # restart the server |
---|
420 | # now wait for the messages to be delivered |
---|
421 | def _got_announcement(): |
---|
422 | return bool(introducer.get_announcements()) |
---|
423 | return self.poll(_got_announcement) |
---|
424 | |
---|
425 | d.addCallback(_offline) |
---|
426 | |
---|
427 | def _done(ign): |
---|
428 | v = introducer.get_announcements()[0] |
---|
429 | furl = v.announcement["anonymous-storage-FURL"] |
---|
430 | self.failUnlessEqual(furl, furl1) |
---|
431 | d.addCallback(_done) |
---|
432 | |
---|
433 | # now let the ack get back |
---|
434 | def _wait_until_idle(ign): |
---|
435 | def _idle(): |
---|
436 | if c._debug_outstanding: |
---|
437 | return False |
---|
438 | if introducer._debug_outstanding: |
---|
439 | return False |
---|
440 | return True |
---|
441 | return self.poll(_idle) |
---|
442 | |
---|
443 | d.addCallback(_wait_until_idle) |
---|
444 | return d |
---|
445 | |
---|
446 | |
---|
447 | class SystemTest(SystemTestMixin, AsyncTestCase): |
---|
448 | |
---|
449 | def do_system_test(self): |
---|
450 | self.create_tub() |
---|
451 | introducer = IntroducerService() |
---|
452 | introducer.setServiceParent(self.parent) |
---|
453 | iff = os.path.join(self.basedir, "introducer.furl") |
---|
454 | tub = self.central_tub |
---|
455 | ifurl = self.central_tub.registerReference(introducer, furlFile=iff) |
---|
456 | self.introducer_furl = ifurl |
---|
457 | |
---|
458 | # we have 5 clients who publish themselves as storage servers, and a |
---|
459 | # sixth which does which not. All 6 clients subscriber to hear about |
---|
460 | # storage. When the connections are fully established, all six nodes |
---|
461 | # should have 5 connections each. |
---|
462 | NUM_STORAGE = 5 |
---|
463 | NUM_CLIENTS = 6 |
---|
464 | |
---|
465 | clients = [] |
---|
466 | tubs = {} |
---|
467 | received_announcements = {} |
---|
468 | subscribing_clients = [] |
---|
469 | publishing_clients = [] |
---|
470 | printable_serverids = {} |
---|
471 | self.the_introducer = introducer |
---|
472 | privkeys = {} |
---|
473 | pubkeys = {} |
---|
474 | expected_announcements = [0 for c in range(NUM_CLIENTS)] |
---|
475 | |
---|
476 | for i in range(NUM_CLIENTS): |
---|
477 | tub = Tub() |
---|
478 | #tub.setOption("logLocalFailures", True) |
---|
479 | #tub.setOption("logRemoteFailures", True) |
---|
480 | tub.setOption("expose-remote-exception-types", False) |
---|
481 | tub.setServiceParent(self.parent) |
---|
482 | listenOnUnused(tub) |
---|
483 | log.msg("creating client %d: %s" % (i, tub.getShortTubID())) |
---|
484 | c = IntroducerClient(tub, self.introducer_furl, |
---|
485 | NICKNAME % str(i), |
---|
486 | "version", "oldest", |
---|
487 | fakeseq, |
---|
488 | FilePath(self.mktemp())) |
---|
489 | received_announcements[c] = {} |
---|
490 | def got(key_s_or_tubid, ann, announcements): |
---|
491 | index = key_s_or_tubid or get_tubid_string_from_ann(ann) |
---|
492 | announcements[index] = ann |
---|
493 | c.subscribe_to("storage", got, received_announcements[c]) |
---|
494 | subscribing_clients.append(c) |
---|
495 | expected_announcements[i] += 1 # all expect a 'storage' announcement |
---|
496 | |
---|
497 | node_furl = tub.registerReference(Referenceable()) |
---|
498 | private_key, public_key = ed25519.create_signing_keypair() |
---|
499 | public_key_str = ed25519.string_from_verifying_key(public_key) |
---|
500 | privkeys[i] = private_key |
---|
501 | pubkeys[i] = public_key_str |
---|
502 | |
---|
503 | if i < NUM_STORAGE: |
---|
504 | # sign all announcements |
---|
505 | c.publish("storage", make_ann(node_furl), private_key) |
---|
506 | printable_serverids[i] = remove_prefix(public_key_str, b"pub-") |
---|
507 | publishing_clients.append(c) |
---|
508 | else: |
---|
509 | # the last one does not publish anything |
---|
510 | pass |
---|
511 | |
---|
512 | if i == 2: |
---|
513 | # also publish something that nobody cares about |
---|
514 | boring_furl = tub.registerReference(Referenceable()) |
---|
515 | c.publish("boring", make_ann(boring_furl), private_key) |
---|
516 | |
---|
517 | c.setServiceParent(self.parent) |
---|
518 | clients.append(c) |
---|
519 | tubs[c] = tub |
---|
520 | |
---|
521 | def _wait_for_connected(ign): |
---|
522 | def _connected(): |
---|
523 | for c in clients: |
---|
524 | if not c.connected_to_introducer(): |
---|
525 | return False |
---|
526 | return True |
---|
527 | return self.poll(_connected) |
---|
528 | |
---|
529 | # we watch the clients to determine when the system has settled down. |
---|
530 | # Then we can look inside the server to assert things about its |
---|
531 | # state. |
---|
532 | |
---|
533 | def _wait_for_expected_announcements(ign): |
---|
534 | def _got_expected_announcements(): |
---|
535 | for i,c in enumerate(subscribing_clients): |
---|
536 | if len(received_announcements[c]) < expected_announcements[i]: |
---|
537 | return False |
---|
538 | return True |
---|
539 | return self.poll(_got_expected_announcements) |
---|
540 | |
---|
541 | # before shutting down any Tub, we'd like to know that there are no |
---|
542 | # messages outstanding |
---|
543 | |
---|
544 | def _wait_until_idle(ign): |
---|
545 | def _idle(): |
---|
546 | for c in subscribing_clients + publishing_clients: |
---|
547 | if c._debug_outstanding: |
---|
548 | return False |
---|
549 | if self.the_introducer._debug_outstanding: |
---|
550 | return False |
---|
551 | return True |
---|
552 | return self.poll(_idle) |
---|
553 | |
---|
554 | d = defer.succeed(None) |
---|
555 | d.addCallback(_wait_for_connected) |
---|
556 | d.addCallback(_wait_for_expected_announcements) |
---|
557 | d.addCallback(_wait_until_idle) |
---|
558 | |
---|
559 | def _check1(res): |
---|
560 | log.msg("doing _check1") |
---|
561 | dc = self.the_introducer._debug_counts |
---|
562 | # each storage server publishes a record. There is also one |
---|
563 | # "boring" |
---|
564 | self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+1) |
---|
565 | self.failUnlessEqual(dc["inbound_duplicate"], 0) |
---|
566 | self.failUnlessEqual(dc["inbound_update"], 0) |
---|
567 | self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) |
---|
568 | # the number of outbound messages is tricky.. I think it depends |
---|
569 | # upon a race between the publish and the subscribe messages. |
---|
570 | self.failUnless(dc["outbound_message"] > 0) |
---|
571 | # each client subscribes to "storage", and each server publishes |
---|
572 | self.failUnlessEqual(dc["outbound_announcements"], |
---|
573 | NUM_STORAGE*NUM_CLIENTS) |
---|
574 | |
---|
575 | for c in subscribing_clients: |
---|
576 | cdc = c._debug_counts |
---|
577 | self.failUnless(cdc["inbound_message"]) |
---|
578 | self.failUnlessEqual(cdc["inbound_announcement"], |
---|
579 | NUM_STORAGE) |
---|
580 | self.failUnlessEqual(cdc["wrong_service"], 0) |
---|
581 | self.failUnlessEqual(cdc["duplicate_announcement"], 0) |
---|
582 | self.failUnlessEqual(cdc["update"], 0) |
---|
583 | self.failUnlessEqual(cdc["new_announcement"], |
---|
584 | NUM_STORAGE) |
---|
585 | anns = received_announcements[c] |
---|
586 | self.failUnlessEqual(len(anns), NUM_STORAGE) |
---|
587 | |
---|
588 | serverid0 = printable_serverids[0] |
---|
589 | ann = anns[serverid0] |
---|
590 | nick = ann["nickname"] |
---|
591 | self.assertIsInstance(nick, str) |
---|
592 | self.failUnlessEqual(nick, NICKNAME % "0") |
---|
593 | for c in publishing_clients: |
---|
594 | cdc = c._debug_counts |
---|
595 | expected = 1 |
---|
596 | if c in [clients[2], # boring |
---|
597 | ]: |
---|
598 | expected = 2 |
---|
599 | self.failUnlessEqual(cdc["outbound_message"], expected) |
---|
600 | # now check the web status, make sure it renders without error |
---|
601 | ir = introweb.IntroducerRoot(self.parent) |
---|
602 | self.parent.nodeid = b"NODEID" |
---|
603 | log.msg("_check1 done") |
---|
604 | return flattenString(None, ir._create_element()) |
---|
605 | d.addCallback(_check1) |
---|
606 | |
---|
607 | def _check2(flattened_bytes): |
---|
608 | text = flattened_bytes.decode("utf-8") |
---|
609 | self.assertIn(NICKNAME % "0", text) # a v2 client |
---|
610 | self.assertIn(NICKNAME % "1", text) # another v2 client |
---|
611 | for i in range(NUM_STORAGE): |
---|
612 | self.assertIn(ensure_text(printable_serverids[i]), text, |
---|
613 | (i,printable_serverids[i],text)) |
---|
614 | # make sure there isn't a double-base32ed string too |
---|
615 | self.assertNotIn(idlib.nodeid_b2a(printable_serverids[i]), text, |
---|
616 | (i,printable_serverids[i],text)) |
---|
617 | log.msg("_check2 done") |
---|
618 | d.addCallback(_check2) |
---|
619 | |
---|
620 | # force an introducer reconnect, by shutting down the Tub it's using |
---|
621 | # and starting a new Tub (with the old introducer). Everybody should |
---|
622 | # reconnect and republish, but the introducer should ignore the |
---|
623 | # republishes as duplicates. However, because the server doesn't know |
---|
624 | # what each client does and does not know, it will send them a copy |
---|
625 | # of the current announcement table anyway. |
---|
626 | |
---|
627 | d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub")) |
---|
628 | d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) |
---|
629 | |
---|
630 | def _wait_for_introducer_loss(ign): |
---|
631 | def _introducer_lost(): |
---|
632 | for c in clients: |
---|
633 | if c.connected_to_introducer(): |
---|
634 | return False |
---|
635 | return True |
---|
636 | return self.poll(_introducer_lost) |
---|
637 | d.addCallback(_wait_for_introducer_loss) |
---|
638 | |
---|
639 | def _restart_introducer_tub(_ign): |
---|
640 | log.msg("restarting introducer's Tub") |
---|
641 | # reset counters |
---|
642 | for i in range(NUM_CLIENTS): |
---|
643 | c = subscribing_clients[i] |
---|
644 | for k in c._debug_counts: |
---|
645 | c._debug_counts[k] = 0 |
---|
646 | for k in self.the_introducer._debug_counts: |
---|
647 | self.the_introducer._debug_counts[k] = 0 |
---|
648 | expected_announcements[i] += 1 # new 'storage' for everyone |
---|
649 | self.create_tub(self.central_portnum) |
---|
650 | newfurl = self.central_tub.registerReference(self.the_introducer, |
---|
651 | furlFile=iff) |
---|
652 | assert newfurl == self.introducer_furl |
---|
653 | d.addCallback(_restart_introducer_tub) |
---|
654 | |
---|
655 | d.addCallback(_wait_for_connected) |
---|
656 | d.addCallback(_wait_for_expected_announcements) |
---|
657 | d.addCallback(_wait_until_idle) |
---|
658 | d.addCallback(lambda _ign: log.msg(" reconnected")) |
---|
659 | |
---|
660 | # TODO: publish something while the introducer is offline, then |
---|
661 | # confirm it gets delivered when the connection is reestablished |
---|
662 | def _check2(res): |
---|
663 | log.msg("doing _check2") |
---|
664 | # assert that the introducer sent out new messages, one per |
---|
665 | # subscriber |
---|
666 | dc = self.the_introducer._debug_counts |
---|
667 | self.failUnlessEqual(dc["outbound_announcements"], |
---|
668 | NUM_STORAGE*NUM_CLIENTS) |
---|
669 | self.failUnless(dc["outbound_message"] > 0) |
---|
670 | self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) |
---|
671 | for c in subscribing_clients: |
---|
672 | cdc = c._debug_counts |
---|
673 | self.failUnlessEqual(cdc["inbound_message"], 1) |
---|
674 | self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE) |
---|
675 | self.failUnlessEqual(cdc["new_announcement"], 0) |
---|
676 | self.failUnlessEqual(cdc["wrong_service"], 0) |
---|
677 | self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE) |
---|
678 | d.addCallback(_check2) |
---|
679 | |
---|
680 | # Then force an introducer restart, by shutting down the Tub, |
---|
681 | # destroying the old introducer, and starting a new Tub+Introducer. |
---|
682 | # Everybody should reconnect and republish, and the (new) introducer |
---|
683 | # will distribute the new announcements, but the clients should |
---|
684 | # ignore the republishes as duplicates. |
---|
685 | |
---|
686 | d.addCallback(lambda _ign: log.msg("shutting down introducer")) |
---|
687 | d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) |
---|
688 | d.addCallback(_wait_for_introducer_loss) |
---|
689 | d.addCallback(lambda _ign: log.msg("introducer lost")) |
---|
690 | |
---|
691 | def _restart_introducer(_ign): |
---|
692 | log.msg("restarting introducer") |
---|
693 | self.create_tub(self.central_portnum) |
---|
694 | # reset counters |
---|
695 | for i in range(NUM_CLIENTS): |
---|
696 | c = subscribing_clients[i] |
---|
697 | for k in c._debug_counts: |
---|
698 | c._debug_counts[k] = 0 |
---|
699 | expected_announcements[i] += 1 # new 'storage' for everyone |
---|
700 | introducer = IntroducerService() |
---|
701 | self.the_introducer = introducer |
---|
702 | newfurl = self.central_tub.registerReference(self.the_introducer, |
---|
703 | furlFile=iff) |
---|
704 | assert newfurl == self.introducer_furl |
---|
705 | d.addCallback(_restart_introducer) |
---|
706 | |
---|
707 | d.addCallback(_wait_for_connected) |
---|
708 | d.addCallback(_wait_for_expected_announcements) |
---|
709 | d.addCallback(_wait_until_idle) |
---|
710 | |
---|
711 | def _check3(res): |
---|
712 | log.msg("doing _check3") |
---|
713 | dc = self.the_introducer._debug_counts |
---|
714 | self.failUnlessEqual(dc["outbound_announcements"], |
---|
715 | NUM_STORAGE*NUM_CLIENTS) |
---|
716 | self.failUnless(dc["outbound_message"] > 0) |
---|
717 | self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) |
---|
718 | for c in subscribing_clients: |
---|
719 | cdc = c._debug_counts |
---|
720 | self.failUnless(cdc["inbound_message"] > 0) |
---|
721 | self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE) |
---|
722 | self.failUnlessEqual(cdc["new_announcement"], 0) |
---|
723 | self.failUnlessEqual(cdc["wrong_service"], 0) |
---|
724 | self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE) |
---|
725 | |
---|
726 | d.addCallback(_check3) |
---|
727 | return d |
---|
728 | |
---|
729 | |
---|
730 | def test_system_v2_server(self): |
---|
731 | self.basedir = "introducer/SystemTest/system_v2_server" |
---|
732 | os.makedirs(self.basedir) |
---|
733 | return self.do_system_test() |
---|
734 | |
---|
735 | class FakeRemoteReference(object): |
---|
736 | def notifyOnDisconnect(self, *args, **kwargs): pass |
---|
737 | def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y" |
---|
738 | def getPeer(self): return address.IPv4Address("TCP", "remote.example.com", |
---|
739 | 3456) |
---|
740 | |
---|
741 | class ClientInfo(AsyncTestCase): |
---|
742 | def test_client_v2(self): |
---|
743 | introducer = IntroducerService() |
---|
744 | tub = introducer_furl = None |
---|
745 | client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2", |
---|
746 | "my_version", "oldest", |
---|
747 | fakeseq, FilePath(self.mktemp())) |
---|
748 | #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" |
---|
749 | #ann_s = make_ann_t(client_v2, furl1, None, 10) |
---|
750 | #introducer.remote_publish_v2(ann_s, Referenceable()) |
---|
751 | subscriber = FakeRemoteReference() |
---|
752 | introducer.remote_subscribe_v2(subscriber, "storage", |
---|
753 | client_v2._my_subscriber_info) |
---|
754 | subs = introducer.get_subscribers() |
---|
755 | self.failUnlessEqual(len(subs), 1) |
---|
756 | s0 = subs[0] |
---|
757 | self.failUnlessEqual(s0.service_name, "storage") |
---|
758 | self.failUnlessEqual(s0.nickname, NICKNAME % u"v2") |
---|
759 | self.failUnlessEqual(s0.version, "my_version") |
---|
760 | |
---|
761 | |
---|
762 | class Announcements(AsyncTestCase): |
---|
763 | def test_client_v2_signed(self): |
---|
764 | introducer = IntroducerService() |
---|
765 | tub = introducer_furl = None |
---|
766 | client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", |
---|
767 | "my_version", "oldest", |
---|
768 | fakeseq, FilePath(self.mktemp())) |
---|
769 | furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" |
---|
770 | |
---|
771 | private_key, public_key = ed25519.create_signing_keypair() |
---|
772 | public_key_str = remove_prefix(ed25519.string_from_verifying_key(public_key), b"pub-") |
---|
773 | |
---|
774 | ann_t0 = make_ann_t(client_v2, furl1, private_key, 10) |
---|
775 | canary0 = Referenceable() |
---|
776 | introducer.remote_publish_v2(ann_t0, canary0) |
---|
777 | a = introducer.get_announcements() |
---|
778 | self.failUnlessEqual(len(a), 1) |
---|
779 | self.assertThat(a[0].canary, Is(canary0)) |
---|
780 | self.failUnlessEqual(a[0].index, ("storage", public_key_str)) |
---|
781 | self.failUnlessEqual(a[0].nickname, u"nick-v2") |
---|
782 | self.failUnlessEqual(a[0].service_name, "storage") |
---|
783 | self.failUnlessEqual(a[0].version, "my_version") |
---|
784 | self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1) |
---|
785 | |
---|
786 | def _load_cache(self, cache_filepath): |
---|
787 | with cache_filepath.open() as f: |
---|
788 | return yamlutil.safe_load(f) |
---|
789 | |
---|
790 | @defer.inlineCallbacks |
---|
791 | def test_client_cache(self): |
---|
792 | """ |
---|
793 | Announcements received by an introducer client are written to that |
---|
794 | introducer client's cache file. |
---|
795 | """ |
---|
796 | basedir = FilePath("introducer/ClientSeqnums/test_client_cache_1") |
---|
797 | private = basedir.child("private") |
---|
798 | private.makedirs() |
---|
799 | write_introducer(basedir, "default", "nope") |
---|
800 | cache_filepath = basedir.descendant([ |
---|
801 | "private", |
---|
802 | "introducer_default_cache.yaml", |
---|
803 | ]) |
---|
804 | |
---|
805 | # if storage is enabled, the Client will publish its storage server |
---|
806 | # during startup (although the announcement will wait in a queue |
---|
807 | # until the introducer connection is established). To avoid getting |
---|
808 | # confused by this, disable storage. |
---|
809 | with basedir.child("tahoe.cfg").open("w") as f: |
---|
810 | f.write(b"[storage]\n") |
---|
811 | f.write(b"enabled = false\n") |
---|
812 | |
---|
813 | c = yield create_client(basedir.path) |
---|
814 | ic = c.introducer_clients[0] |
---|
815 | private_key, public_key = ed25519.create_signing_keypair() |
---|
816 | public_key_str = remove_prefix(ed25519.string_from_verifying_key(public_key), b"pub-") |
---|
817 | furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") |
---|
818 | ann_t = make_ann_t(ic, furl1, private_key, 1) |
---|
819 | |
---|
820 | ic.got_announcements([ann_t]) |
---|
821 | yield flushEventualQueue() |
---|
822 | |
---|
823 | # check the cache for the announcement |
---|
824 | announcements = self._load_cache(cache_filepath) |
---|
825 | self.failUnlessEqual(len(announcements), 1) |
---|
826 | self.failUnlessEqual(ensure_binary(announcements[0]['key_s']), public_key_str) |
---|
827 | ann = announcements[0]["ann"] |
---|
828 | self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1) |
---|
829 | self.failUnlessEqual(ann["seqnum"], 1) |
---|
830 | |
---|
831 | # a new announcement that replaces the first should replace the |
---|
832 | # cached entry, not duplicate it |
---|
833 | furl2 = furl1 + "er" |
---|
834 | ann_t2 = make_ann_t(ic, furl2, private_key, 2) |
---|
835 | ic.got_announcements([ann_t2]) |
---|
836 | yield flushEventualQueue() |
---|
837 | announcements = self._load_cache(cache_filepath) |
---|
838 | self.failUnlessEqual(len(announcements), 1) |
---|
839 | self.failUnlessEqual(ensure_binary(announcements[0]['key_s']), public_key_str) |
---|
840 | ann = announcements[0]["ann"] |
---|
841 | self.failUnlessEqual(ann["anonymous-storage-FURL"], furl2) |
---|
842 | self.failUnlessEqual(ann["seqnum"], 2) |
---|
843 | |
---|
844 | # but a third announcement with a different key should add to the |
---|
845 | # cache |
---|
846 | private_key2, public_key2 = ed25519.create_signing_keypair() |
---|
847 | public_key_str2 = remove_prefix(ed25519.string_from_verifying_key(public_key2), b"pub-") |
---|
848 | furl3 = "pb://onug64tu@127.0.0.1:456/short" |
---|
849 | ann_t3 = make_ann_t(ic, furl3, private_key2, 1) |
---|
850 | ic.got_announcements([ann_t3]) |
---|
851 | yield flushEventualQueue() |
---|
852 | |
---|
853 | announcements = self._load_cache(cache_filepath) |
---|
854 | self.failUnlessEqual(len(announcements), 2) |
---|
855 | self.failUnlessEqual(set([public_key_str, public_key_str2]), |
---|
856 | set([ensure_binary(a["key_s"]) for a in announcements])) |
---|
857 | self.failUnlessEqual(set([furl2, furl3]), |
---|
858 | set([a["ann"]["anonymous-storage-FURL"] |
---|
859 | for a in announcements])) |
---|
860 | |
---|
861 | # test loading |
---|
862 | yield flushEventualQueue() |
---|
863 | ic2 = IntroducerClient(None, "introducer.furl", u"my_nickname", |
---|
864 | "my_version", "oldest_version", fakeseq, |
---|
865 | ic._cache_filepath) |
---|
866 | announcements = {} |
---|
867 | def got(key_s, ann): |
---|
868 | announcements[key_s] = ann |
---|
869 | ic2.subscribe_to("storage", got) |
---|
870 | ic2._load_announcements() # normally happens when connection fails |
---|
871 | yield flushEventualQueue() |
---|
872 | |
---|
873 | self.failUnless(public_key_str in announcements) |
---|
874 | self.failUnlessEqual(announcements[public_key_str]["anonymous-storage-FURL"], |
---|
875 | furl2) |
---|
876 | self.failUnlessEqual(announcements[public_key_str2]["anonymous-storage-FURL"], |
---|
877 | furl3) |
---|
878 | |
---|
879 | c2 = yield create_client(basedir.path) |
---|
880 | c2.introducer_clients[0]._load_announcements() |
---|
881 | yield flushEventualQueue() |
---|
882 | self.assertEqual(c2.storage_broker.get_all_serverids(), |
---|
883 | frozenset([public_key_str, public_key_str2])) |
---|
884 | |
---|
885 | class ClientSeqnums(AsyncBrokenTestCase): |
---|
886 | |
---|
887 | @defer.inlineCallbacks |
---|
888 | def test_client(self): |
---|
889 | basedir = FilePath("introducer/ClientSeqnums/test_client") |
---|
890 | private = basedir.child("private") |
---|
891 | private.makedirs() |
---|
892 | write_introducer(basedir, "default", "nope") |
---|
893 | # if storage is enabled, the Client will publish its storage server |
---|
894 | # during startup (although the announcement will wait in a queue |
---|
895 | # until the introducer connection is established). To avoid getting |
---|
896 | # confused by this, disable storage. |
---|
897 | with basedir.child("tahoe.cfg").open("w") as f: |
---|
898 | f.write(b"[storage]\n") |
---|
899 | f.write(b"enabled = false\n") |
---|
900 | |
---|
901 | c = yield create_client(basedir.path) |
---|
902 | ic = c.introducer_clients[0] |
---|
903 | outbound = ic._outbound_announcements |
---|
904 | published = ic._published_announcements |
---|
905 | def read_seqnum(): |
---|
906 | seqnum = basedir.child("announcement-seqnum").getContent() |
---|
907 | return int(seqnum) |
---|
908 | |
---|
909 | ic.publish("sA", {"key": "value1"}, c._node_private_key) |
---|
910 | self.failUnlessEqual(read_seqnum(), 1) |
---|
911 | self.failUnless("sA" in outbound) |
---|
912 | self.failUnlessEqual(outbound["sA"]["seqnum"], 1) |
---|
913 | nonce1 = outbound["sA"]["nonce"] |
---|
914 | self.failUnless(isinstance(nonce1, bytes)) |
---|
915 | # Make nonce unicode, to match JSON: |
---|
916 | outbound["sA"]["nonce"] = str(nonce1, "utf-8") |
---|
917 | self.failUnlessEqual(json.loads(published["sA"][0]), |
---|
918 | outbound["sA"]) |
---|
919 | # [1] is the signature, [2] is the pubkey |
---|
920 | |
---|
921 | # publishing a second service causes both services to be |
---|
922 | # re-published, with the next higher sequence number |
---|
923 | ic.publish("sB", {"key": "value2"}, c._node_private_key) |
---|
924 | self.failUnlessEqual(read_seqnum(), 2) |
---|
925 | self.failUnless("sB" in outbound) |
---|
926 | self.failUnlessEqual(outbound["sB"]["seqnum"], 2) |
---|
927 | self.failUnless("sA" in outbound) |
---|
928 | self.failUnlessEqual(outbound["sA"]["seqnum"], 2) |
---|
929 | nonce2 = outbound["sA"]["nonce"] |
---|
930 | self.failUnless(isinstance(nonce2, bytes)) |
---|
931 | self.failIfEqual(nonce1, nonce2) |
---|
932 | # Make nonce unicode, to match JSON: |
---|
933 | outbound["sA"]["nonce"] = str(nonce2, "utf-8") |
---|
934 | outbound["sB"]["nonce"] = str(outbound["sB"]["nonce"], "utf-8") |
---|
935 | self.failUnlessEqual(json.loads(published["sA"][0]), |
---|
936 | outbound["sA"]) |
---|
937 | self.failUnlessEqual(json.loads(published["sB"][0]), |
---|
938 | outbound["sB"]) |
---|
939 | |
---|
940 | |
---|
941 | |
---|
942 | class TooNewServer(IntroducerService): |
---|
943 | VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999": |
---|
944 | { }, |
---|
945 | "application-version": "greetings from the crazy future", |
---|
946 | } |
---|
947 | |
---|
948 | class NonV1Server(SystemTestMixin, AsyncTestCase): |
---|
949 | # if the client connects to a server that doesn't provide the 'v2' |
---|
950 | # protocol, it is supposed to provide a useful error instead of a weird |
---|
951 | # exception. |
---|
952 | |
---|
953 | def test_failure(self): |
---|
954 | self.basedir = "introducer/NonV1Server/failure" |
---|
955 | os.makedirs(self.basedir) |
---|
956 | self.create_tub() |
---|
957 | i = TooNewServer() |
---|
958 | i.setServiceParent(self.parent) |
---|
959 | self.introducer_furl = self.central_tub.registerReference(i) |
---|
960 | |
---|
961 | tub = Tub() |
---|
962 | tub.setOption("expose-remote-exception-types", False) |
---|
963 | tub.setServiceParent(self.parent) |
---|
964 | listenOnUnused(tub) |
---|
965 | c = IntroducerClient(tub, self.introducer_furl, |
---|
966 | u"nickname-client", "version", "oldest", |
---|
967 | fakeseq, FilePath(self.mktemp())) |
---|
968 | announcements = {} |
---|
969 | def got(key_s, ann): |
---|
970 | announcements[key_s] = ann |
---|
971 | c.subscribe_to("storage", got) |
---|
972 | |
---|
973 | c.setServiceParent(self.parent) |
---|
974 | |
---|
975 | # now we wait for it to connect and notice the bad version |
---|
976 | |
---|
977 | def _got_bad(): |
---|
978 | return bool(c._introducer_error) or bool(c._publisher) |
---|
979 | d = self.poll(_got_bad) |
---|
980 | def _done(res): |
---|
981 | self.failUnless(c._introducer_error) |
---|
982 | self.failUnless(c._introducer_error.check(InsufficientVersionError), |
---|
983 | c._introducer_error) |
---|
984 | d.addCallback(_done) |
---|
985 | return d |
---|
986 | |
---|
987 | class DecodeFurl(SyncTestCase): |
---|
988 | def test_decode(self): |
---|
989 | # make sure we have a working base64.b32decode. The one in |
---|
990 | # python2.4.[01] was broken. |
---|
991 | furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i' |
---|
992 | m = re.match(r'pb://(\w+)@', furl) |
---|
993 | assert m |
---|
994 | nodeid = b32decode(m.group(1).upper().encode("ascii")) |
---|
995 | self.failUnlessEqual(nodeid, b"\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d") |
---|
996 | |
---|
997 | class Signatures(SyncTestCase): |
---|
998 | |
---|
999 | def test_sign(self): |
---|
1000 | ann = {"key1": "value1"} |
---|
1001 | private_key, public_key = ed25519.create_signing_keypair() |
---|
1002 | public_key_str = ed25519.string_from_verifying_key(public_key) |
---|
1003 | ann_t = sign_to_foolscap(ann, private_key) |
---|
1004 | (msg, sig, key) = ann_t |
---|
1005 | self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes |
---|
1006 | self.failUnlessEqual(json.loads(msg.decode("utf-8")), ann) |
---|
1007 | self.failUnless(sig.startswith(b"v0-")) |
---|
1008 | self.failUnless(key.startswith(b"v0-")) |
---|
1009 | (ann2,key2) = unsign_from_foolscap(ann_t) |
---|
1010 | self.failUnlessEqual(ann2, ann) |
---|
1011 | self.failUnlessEqual(b"pub-" + key2, public_key_str) |
---|
1012 | |
---|
1013 | # not signed |
---|
1014 | self.failUnlessRaises(UnknownKeyError, |
---|
1015 | unsign_from_foolscap, (msg, None, key)) |
---|
1016 | self.failUnlessRaises(UnknownKeyError, |
---|
1017 | unsign_from_foolscap, (msg, sig, None)) |
---|
1018 | # bad signature |
---|
1019 | bad_ann = {"key1": "value2"} |
---|
1020 | bad_msg = json.dumps(bad_ann).encode("utf-8") |
---|
1021 | self.failUnlessRaises(BadSignature, |
---|
1022 | unsign_from_foolscap, (bad_msg, sig, key)) |
---|
1023 | |
---|
1024 | # unrecognized signatures |
---|
1025 | self.failUnlessRaises(UnknownKeyError, |
---|
1026 | unsign_from_foolscap, (bad_msg, b"v999-sig", key)) |
---|
1027 | self.failUnlessRaises(UnknownKeyError, |
---|
1028 | unsign_from_foolscap, (bad_msg, sig, b"v999-key")) |
---|
1029 | |
---|
1030 | def test_unsigned_announcement(self): |
---|
1031 | """ |
---|
1032 | An incorrectly signed announcement is not delivered to subscribers. |
---|
1033 | """ |
---|
1034 | private_key, public_key = ed25519.create_signing_keypair() |
---|
1035 | public_key_str = ed25519.string_from_verifying_key(public_key) |
---|
1036 | |
---|
1037 | ic = IntroducerClient( |
---|
1038 | Tub(), |
---|
1039 | "pb://", |
---|
1040 | u"fake_nick", |
---|
1041 | "0.0.0", |
---|
1042 | "1.2.3", |
---|
1043 | (0, u"i am a nonce"), |
---|
1044 | FilePath(self.mktemp()), |
---|
1045 | ) |
---|
1046 | received = {} |
---|
1047 | ic.subscribe_to("good-stuff", partial(setitem, received)) |
---|
1048 | |
---|
1049 | # Deliver a good message to prove our test code is valid. |
---|
1050 | ann = {"service-name": "good-stuff", "payload": "hello"} |
---|
1051 | ann_t = sign_to_foolscap(ann, private_key) |
---|
1052 | ic.got_announcements([ann_t]) |
---|
1053 | |
---|
1054 | self.assertEqual( |
---|
1055 | {public_key_str[len("pub-"):]: ann}, |
---|
1056 | received, |
---|
1057 | ) |
---|
1058 | received.clear() |
---|
1059 | |
---|
1060 | # Now deliver one without a valid signature and observe that it isn't |
---|
1061 | # delivered to the subscriber. |
---|
1062 | ann = {"service-name": "good-stuff", "payload": "bad stuff"} |
---|
1063 | (msg, sig, key) = sign_to_foolscap(ann, private_key) |
---|
1064 | # Drop a base32 word from the middle of the key to invalidate the |
---|
1065 | # signature. |
---|
1066 | sig_a = bytearray(sig) |
---|
1067 | sig_a[20:22] = [] |
---|
1068 | sig = bytes(sig_a) |
---|
1069 | ann_t = (msg, sig, key) |
---|
1070 | ic.got_announcements([ann_t]) |
---|
1071 | |
---|
1072 | # The received announcements dict should remain empty because we |
---|
1073 | # should not receive the announcement with the invalid signature. |
---|
1074 | self.assertEqual( |
---|
1075 | {}, |
---|
1076 | received, |
---|
1077 | ) |
---|
1078 | |
---|
1079 | |
---|
1080 | # add tests of StorageFarmBroker: if it receives duplicate announcements, it |
---|
1081 | # should leave the Reconnector in place, also if it receives |
---|
1082 | # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it |
---|
1083 | # should tear down the Reconnector and make a new one. This behavior used to |
---|
1084 | # live in the IntroducerClient, and thus used to be tested by test_introducer |
---|
1085 | |
---|
1086 | # copying more tests from old branch: |
---|
1087 | |
---|
1088 | # then also add Upgrade test |
---|