1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from zope.interface import implementer |
---|
6 | from twisted.internet import defer |
---|
7 | from foolscap.api import DeadReferenceError, RemoteException |
---|
8 | from allmydata import hashtree, codec, uri |
---|
9 | from allmydata.interfaces import IValidatedThingProxy, IVerifierURI |
---|
10 | from allmydata.hashtree import IncompleteHashTree |
---|
11 | from allmydata.check_results import CheckResults |
---|
12 | from allmydata.uri import CHKFileVerifierURI |
---|
13 | from allmydata.util.assertutil import precondition |
---|
14 | from allmydata.util import base32, deferredutil, dictutil, log, mathutil |
---|
15 | from allmydata.util.hashutil import file_renewal_secret_hash, \ |
---|
16 | file_cancel_secret_hash, bucket_renewal_secret_hash, \ |
---|
17 | bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \ |
---|
18 | block_hash |
---|
19 | from allmydata.util.happinessutil import servers_of_happiness |
---|
20 | |
---|
21 | from allmydata.immutable import layout |
---|
22 | |
---|
class IntegrityCheckReject(Exception):
    """Base class for the integrity-check failures raised in this module."""

class BadURIExtension(IntegrityCheckReject):
    """The URI extension block (UEB) has a malformed or inconsistent field."""

class BadURIExtensionHashValue(IntegrityCheckReject):
    """The fetched UEB does not hash to the value held in the verify-cap."""

class BadOrMissingHash(IntegrityCheckReject):
    """A hash-tree node needed for validation was missing or did not match."""

class UnsupportedErasureCodec(BadURIExtension):
    """The UEB names an erasure codec other than the supported ``b"crs"``."""
---|
33 | |
---|
@implementer(IValidatedThingProxy)
class ValidatedExtendedURIProxy:
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB.

    Call start(); once its Deferred fires, the required, computed and
    optional attributes set up in __init__ are populated from the UEB.
    """

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        """
        :param readbucketproxy: object whose ``get_uri_extension()`` returns
            a Deferred firing with the raw UEB bytes.
        :param verifycap: an ``IVerifierURI`` provider; its
            ``uri_extension_hash``, ``size``, ``needed_shares`` and
            ``total_shares`` are what the UEB is checked against.
        :param fetch_failures: optional dict, for debugging only; its
            ``"uri_extension"`` counter is incremented on a hash mismatch.
        """
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %r>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        """Return ``data`` unchanged if its UEB hash matches the verify-cap.

        :raises BadURIExtensionHashValue: on a mismatch (after bumping the
            ``fetch_failures`` counter, when one was supplied).
        """
        h = uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %r, got %r" %
                   (self._readbucketproxy,
                    base32.b2a(self._verifycap.uri_extension_hash),
                    base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data

    def _parse_and_validate(self, data):
        """Unpack the (already hash-checked) UEB bytes, fill in my attributes
        and return self.

        :raises BadURIExtension: if an optional field disagrees with the
            verify-cap or with values derived from the mandatory fields.
        :raises UnsupportedErasureCodec: if ``codec_name`` is not ``b"crs"``.
        A missing mandatory field propagates the lookup error from the
        unpacked mapping (KeyError, assuming ``uri.unpack_extension``
        returns a dict -- TODO confirm).
        """
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        # A file whose size is an exact multiple of segment_size has a full
        # final segment rather than an empty one.
        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']

        # Next: things that are optional and not redundant: crypttext_hash
        if 'crypttext_hash' in d:
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))

        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if 'codec_name' in d:
            if d['codec_name'] != b"crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if 'codec_params' in d:
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if 'tail_codec_params' in d:
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                            self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                           self._verifycap.total_shares))

        if 'num_segments' in d:
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if 'size' in d:
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if 'needed_shares' in d:
            if d['needed_shares'] != self._verifycap.needed_shares:
                # Report the URI's needed_shares (previously total_shares was
                # reported here by mistake, making the message misleading).
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if 'total_shares' in d:
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used. Ignoring. %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used. Ignoring. %s" % (self,))

        return self

    def start(self):
        """Fetch the UEB from bucket, compare its hash to the hash from
        verifycap, then parse it. Returns a deferred which is called back
        with self once the fetch is successful, or is erred back if it
        fails."""
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d
---|
214 | |
---|
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size):
        """ share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB.

        :param sharenum: the share number served by ``bucket``.
        :param bucket: the (layout-aware) bucket reader to fetch from.
        :param share_hash_tree: shared IncompleteHashTree over all shares.
        :param num_blocks: number of blocks in this share.
        :param block_size: size of every block except possibly the last.
        :param share_size: total share size, used to size the last block.
        """
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket,
                               str(base32.b2a(share_hash_tree[0][:8])[:12], "ascii"))
        log.PrefixingLogMixin.__init__(self,
                                       facility="tahoe.immutable.download",
                                       prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

    @staticmethod
    def _set_hashes_or_reject(tree, hashes):
        """Add ``hashes`` ({index: hash}) to ``tree``, turning any hash-tree
        rejection (out-of-range index, bad or insufficient hashes) into
        BadOrMissingHash."""
        try:
            tree.set_hashes(hashes)
        except (IndexError, hashtree.BadHashError,
                hashtree.NotEnoughHashesError) as le:
            raise BadOrMissingHash(le) from le

    def get_all_sharehashes(self):
        """Retrieve and validate all the share-hash-tree nodes that are
        included in this share, regardless of whether we need them to
        validate the share or not. Each share contains a minimal Merkle tree
        chain, but there is lots of overlap, so usually we'll be using hashes
        from other shares and not reading every single hash from this share.
        The Verifier uses this function to read and validate every single
        hash from this share.

        Call this (and wait for the Deferred it returns to fire) before
        calling get_block() for the first time: this lets us check that the
        share contains enough hashes to validate its own data, and
        avoids downloading any share hash twice.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""

        d = self.bucket.get_share_hashes()
        def _got_share_hashes(sh):
            # sh is a sequence of (index, hash) pairs
            self._set_hashes_or_reject(self.share_hash_tree, dict(sh))
        d.addCallback(_got_share_hashes)
        return d

    def get_all_blockhashes(self):
        """Retrieve and validate all the block-hash-tree nodes that are
        included in this share. Each share contains a full Merkle tree, but
        we usually only fetch the minimal subset necessary for any particular
        block. This function fetches everything at once. The Verifier uses
        this function to validate the block hash tree.

        Call this (and wait for the Deferred it returns to fire) after
        calling get_all_sharehashes() and before calling get_block() for the
        first time: this lets us check that the share contains all block
        hashes and avoids downloading them multiple times.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash.
        """

        # get_block_hashes(anything) currently always returns everything
        needed = list(range(len(self.block_hash_tree)))
        d = self.bucket.get_block_hashes(needed)
        def _got_block_hashes(blockhashes):
            if len(blockhashes) < len(self.block_hash_tree):
                raise BadOrMissingHash(
                    "share has %d block hashes, expected at least %d"
                    % (len(blockhashes), len(self.block_hash_tree)))
            self._set_hashes_or_reject(self.block_hash_tree,
                                       dict(enumerate(blockhashes)))
        d.addCallback(_got_block_hashes)
        return d

    def get_all_crypttext_hashes(self, crypttext_hash_tree):
        """Retrieve and validate all the crypttext-hash-tree nodes that are
        in this share. Normally we don't look at these at all: the download
        process fetches them incrementally as needed to validate each segment
        of ciphertext. But this is a convenient place to give the Verifier a
        function to validate all of these at once.

        Call this with a new hashtree object for each share, initialized with
        the crypttext hash tree root. I return a Deferred which errbacks upon
        failure, probably with BadOrMissingHash.
        """

        # get_crypttext_hashes() always returns everything
        d = self.bucket.get_crypttext_hashes()
        def _got_crypttext_hashes(hashes):
            if len(hashes) < len(crypttext_hash_tree):
                raise BadOrMissingHash(
                    "share has %d crypttext hashes, expected at least %d"
                    % (len(hashes), len(crypttext_hash_tree)))
            self._set_hashes_or_reject(crypttext_hash_tree,
                                       dict(enumerate(hashes)))
        d.addCallback(_got_crypttext_hashes)
        return d

    def get_block(self, blocknum):
        """Fetch block ``blocknum`` plus whatever hashes are still needed to
        validate it, and return a Deferred firing with the validated block
        data (or errbacking, typically with BadOrMissingHash)."""
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        # Every block is block_size long except the last, which holds the
        # remainder of the share (or a full block if there is no remainder).
        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum,
                                        self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        """Validate the fetched (sharehashes, blockhashes, blockdata) for
        ``blocknum`` against the hash trees and return the block data.

        :raises BadOrMissingHash: if any hash is missing, out of range, or
            does not validate.
        """
        precondition(blocknum < self.num_blocks,
                     self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError as le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise exception if the values being passed do not
                # match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError as le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le) from le

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leafs of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree and also the share hash
                    # wasn't sent by the server.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                try:
                    self.block_hash_tree.set_hashes(blockhashes)
                except IndexError as le:
                    # Same as above: the server sent more block hashes than
                    # this tree has room for.
                    raise BadOrMissingHash(le) from le

            blockhash = block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError) as le:
            self._log_hash_failure(blocknum, blockhash, blockdata,
                                   sharehashes, blockhashes)
            raise BadOrMissingHash(le) from le

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata

    def _log_hash_failure(self, blocknum, blockhash, blockdata,
                          sharehashes, blockhashes):
        """Dump diagnostic state (trees, hashes, data excerpt) after a
        hash-tree check failed while validating ``blocknum``."""
        # A hash failure indicates an undetected disk/network error, or more
        # likely a programming error.
        self.log("hash failure in block=%d, shnum=%d on %s" %
                 (blocknum, self.sharenum, self.bucket))
        if self.block_hash_tree.needed_hashes(blocknum):
            self.log(""" failure occurred when checking the block_hash_tree.
            This suggests that either the block data was bad, or that the
            block hashes we received along with it were bad.""")
        else:
            self.log(""" the failure probably occurred when checking the
            share_hash_tree, which suggests that the share hashes we
            received from the remote peer were bad.""")
        self.log(" block length: %d" % len(blockdata))
        self.log(" block hash: %r" % base32.b2a_or_none(blockhash))
        if len(blockdata) < 100:
            self.log(" block data: %r" % (blockdata,))
        else:
            self.log(" block data start/end: %r .. %r" %
                     (blockdata[:50], blockdata[-50:]))
        self.log(" share hash tree:\n" + self.share_hash_tree.dump())
        self.log(" block hash tree:\n" + self.block_hash_tree.dump())
        lines = ["%3d: %s" % (i, base32.b2a_or_none(h))
                 for i, h in sorted(sharehashes.items())]
        self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
        lines = ["%3d: %s" % (i, base32.b2a_or_none(h))
                 for i, h in sorted(blockhashes.items())]
        self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
---|
445 | |
---|
446 | |
---|
447 | class Checker(log.PrefixingLogMixin): |
---|
448 | """I query all servers to see if M uniquely-numbered shares are |
---|
449 | available. |
---|
450 | |
---|
451 | If the verify flag was passed to my constructor, then for each share I |
---|
452 | download every data block and all metadata from each server and perform a |
---|
453 | cryptographic integrity check on all of it. If not, I just ask each |
---|
454 | server 'Which shares do you have?' and believe its answer. |
---|
455 | |
---|
456 | In either case, I wait until I have gotten responses from all servers. |
---|
457 | This fact -- that I wait -- means that an ill-behaved server which fails |
---|
458 | to answer my questions will make me wait indefinitely. If it is |
---|
459 | ill-behaved in a way that triggers the underlying foolscap timeouts, then |
---|
460 | I will wait only as long as those foolscap timeouts, but if it is |
---|
461 | ill-behaved in a way which placates the foolscap timeouts but still |
---|
462 | doesn't answer my question then I will wait indefinitely. |
---|
463 | |
---|
464 | Before I send any new request to a server, I always ask the 'monitor' |
---|
465 | object that was passed into my constructor whether this task has been |
---|
466 | cancelled (by invoking its raise_if_cancelled() method). |
---|
467 | """ |
---|
468 | def __init__(self, verifycap, servers, verify, add_lease, secret_holder, |
---|
469 | monitor): |
---|
470 | assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap)) |
---|
471 | |
---|
472 | prefix = str(base32.b2a(verifycap.get_storage_index()[:8])[:12], "utf-8") |
---|
473 | log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix) |
---|
474 | |
---|
475 | self._verifycap = verifycap |
---|
476 | |
---|
477 | self._monitor = monitor |
---|
478 | self._servers = servers |
---|
479 | self._verify = verify # bool: verify what the servers claim, or not? |
---|
480 | self._add_lease = add_lease |
---|
481 | |
---|
482 | frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(), |
---|
483 | self._verifycap.get_storage_index()) |
---|
484 | self.file_renewal_secret = frs |
---|
485 | fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(), |
---|
486 | self._verifycap.get_storage_index()) |
---|
487 | self.file_cancel_secret = fcs |
---|
488 | |
---|
489 | def _get_renewal_secret(self, seed): |
---|
490 | return bucket_renewal_secret_hash(self.file_renewal_secret, seed) |
---|
491 | def _get_cancel_secret(self, seed): |
---|
492 | return bucket_cancel_secret_hash(self.file_cancel_secret, seed) |
---|
493 | |
---|
494 | def _get_buckets(self, s, storageindex): |
---|
495 | """Return a deferred that eventually fires with ({sharenum: bucket}, |
---|
496 | serverid, success). In case the server is disconnected or returns a |
---|
497 | Failure then it fires with ({}, serverid, False) (A server |
---|
498 | disconnecting or returning a Failure when we ask it for buckets is |
---|
499 | the same, for our purposes, as a server that says it has none, except |
---|
500 | that we want to track and report whether or not each server |
---|
501 | responded.)""" |
---|
502 | |
---|
503 | storage_server = s.get_storage_server() |
---|
504 | lease_seed = s.get_lease_seed() |
---|
505 | if self._add_lease: |
---|
506 | renew_secret = self._get_renewal_secret(lease_seed) |
---|
507 | cancel_secret = self._get_cancel_secret(lease_seed) |
---|
508 | d2 = storage_server.add_lease( |
---|
509 | storageindex, |
---|
510 | renew_secret, |
---|
511 | cancel_secret, |
---|
512 | ) |
---|
513 | d2.addErrback(self._add_lease_failed, s.get_name(), storageindex) |
---|
514 | |
---|
515 | d = storage_server.get_buckets(storageindex) |
---|
516 | def _wrap_results(res): |
---|
517 | return (res, True) |
---|
518 | |
---|
519 | def _trap_errs(f): |
---|
520 | level = log.WEIRD |
---|
521 | if f.check(DeadReferenceError): |
---|
522 | level = log.UNUSUAL |
---|
523 | self.log("failure from server on 'get_buckets' the REMOTE failure was:", |
---|
524 | facility="tahoe.immutable.checker", |
---|
525 | failure=f, level=level, umid="AX7wZQ") |
---|
526 | return ({}, False) |
---|
527 | |
---|
528 | d.addCallbacks(_wrap_results, _trap_errs) |
---|
529 | return d |
---|
530 | |
---|
531 | def _add_lease_failed(self, f, server_name, storage_index): |
---|
532 | # Older versions of Tahoe didn't handle the add-lease message very |
---|
533 | # well: <=1.1.0 throws a NameError because it doesn't implement |
---|
534 | # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets |
---|
535 | # (which is most of them, since we send add-lease to everybody, |
---|
536 | # before we know whether or not they have any shares for us), and |
---|
537 | # 1.2.0 throws KeyError even on known buckets due to an internal bug |
---|
538 | # in the latency-measuring code. |
---|
539 | |
---|
540 | # we want to ignore the known-harmless errors and log the others. In |
---|
541 | # particular we want to log any local errors caused by coding |
---|
542 | # problems. |
---|
543 | |
---|
544 | if f.check(DeadReferenceError): |
---|
545 | return |
---|
546 | if f.check(RemoteException): |
---|
547 | if f.value.failure.check(KeyError, IndexError, NameError): |
---|
548 | # this may ignore a bit too much, but that only hurts us |
---|
549 | # during debugging |
---|
550 | return |
---|
551 | self.log(format="error in add_lease from [%(name)s]: %(f_value)s", |
---|
552 | name=server_name, |
---|
553 | f_value=str(f.value), |
---|
554 | failure=f, |
---|
555 | level=log.WEIRD, umid="atbAxw") |
---|
556 | return |
---|
557 | # local errors are cause for alarm |
---|
558 | log.err(f, |
---|
559 | format="local error in add_lease to [%(name)s]: %(f_value)s", |
---|
560 | name=server_name, |
---|
561 | f_value=str(f.value), |
---|
562 | level=log.WEIRD, umid="hEGuQg") |
---|
563 | |
---|
564 | |
---|
    def _download_and_verify(self, server, sharenum, bucket):
        """Start an attempt to download and verify every block in this bucket
        and return a deferred that will eventually fire once the attempt
        completes.

        If you download and verify every block then fire with (True,
        sharenum, None), else if the share data couldn't be parsed because it
        was of an unknown version number fire with (False, sharenum,
        'incompatible'), else if any of the blocks were invalid, fire with
        (False, sharenum, 'corrupt'), else if the server disconnected (False,
        sharenum, 'disconnect'), else if the server returned a Failure during
        the process fire with (False, sharenum, 'failure').

        If there is an internal error such as an uncaught exception in this
        code, then the deferred will errback, but if there is a remote error
        such as the server failing or the returned data being incorrect then
        it will not errback -- it will fire normally with the indicated
        results."""

        vcap = self._verifycap
        # Layout-aware reader over the raw remote bucket.
        b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
        # Step 1: fetch the UEB, check its hash against the verify-cap, parse it.
        veup = ValidatedExtendedURIProxy(b, vcap)
        d = veup.start()

        def _got_ueb(vup):
            # vup is veup itself: its start() Deferred fires with self.
            # Step 2: seed the share hash tree with the root from the UEB.
            share_hash_tree = IncompleteHashTree(vcap.total_shares)
            share_hash_tree.set_hashes({0: vup.share_root_hash})

            vrbp = ValidatedReadBucketProxy(sharenum, b,
                                            share_hash_tree,
                                            vup.num_segments,
                                            vup.block_size,
                                            vup.share_size)

            # note: normal download doesn't use get_all_sharehashes(),
            # because it gets more data than necessary. We've discussed the
            # security properties of having verification and download look
            # identical (so the server couldn't, say, provide good responses
            # for one and not the other), but I think that full verification
            # is more important than defending against inconsistent server
            # behavior. Besides, they can't pass the verifier without storing
            # all the data, so there's not so much to be gained by behaving
            # inconsistently.
            # (This local ``d`` is a new chain that shadows the outer one.)
            d = vrbp.get_all_sharehashes()
            # we fill share_hash_tree before fetching any blocks, so the
            # block fetches won't send redundant share-hash-tree requests, to
            # speed things up. Then we fetch+validate all the blockhashes.
            d.addCallback(lambda ign: vrbp.get_all_blockhashes())

            # Step 3: validate the whole ciphertext hash tree against the
            # crypttext_root_hash from the UEB.
            cht = IncompleteHashTree(vup.num_segments)
            cht.set_hashes({0: vup.crypttext_root_hash})
            d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))

            # Hand the validated bucket proxy to the next outer callback.
            d.addCallback(lambda ign: vrbp)
            return d
        d.addCallback(_got_ueb)

        def _discard_result(r):
            assert isinstance(r, bytes), r
            # to free up the RAM
            return None

        def _get_blocks(vrbp):
            # Step 4: fetch and validate every block, strictly one after
            # another (each fetch is chained onto the previous one).
            def _get_block(ign, blocknum):
                db = vrbp.get_block(blocknum)
                db.addCallback(_discard_result)
                return db

            dbs = defer.succeed(None)
            for blocknum in range(veup.num_segments):
                dbs.addCallback(_get_block, blocknum)

            # The Deferred we return will fire after every block of this
            # share has been downloaded and verified successfully, or else it
            # will errback as soon as the first error is observed.
            return dbs

        d.addCallback(_get_blocks)

        # if none of those errbacked, the blocks (and the hashes above them)
        # are good
        def _all_good(ign):
            return (True, sharenum, None)
        d.addCallback(_all_good)

        # but if anything fails, we'll land here
        def _errb(f):
            # We didn't succeed at fetching and verifying all the blocks of
            # this share. Handle each reason for failure differently.

            if f.check(DeadReferenceError):
                return (False, sharenum, 'disconnect')
            elif f.check(RemoteException):
                return (False, sharenum, 'failure')
            elif f.check(layout.ShareVersionIncompatible):
                return (False, sharenum, 'incompatible')
            elif f.check(layout.LayoutInvalid,
                         layout.RidiculouslyLargeURIExtensionBlock,
                         BadOrMissingHash,
                         BadURIExtensionHashValue):
                return (False, sharenum, 'corrupt')

            # NOTE(review): BadURIExtension / UnsupportedErasureCodec (raised
            # by ValidatedExtendedURIProxy._parse_and_validate) are not listed
            # above, so they fall through and errback as if internal errors
            # -- confirm whether that is intended.
            # if it wasn't one of those reasons, re-raise the error
            return f
        d.addErrback(_errb)

        return d
---|
672 | |
---|
def _verify_server_shares(self, s):
    """Download and fully verify every share that server ``s`` offers.

    Returns a Deferred that fires with the 5-tuple
    (verified, server, corrupt, incompatible, responded):

    * verified     -- set of share numbers that downloaded and validated;
    * server       -- ``s`` itself;
    * corrupt      -- set of share numbers whose per-share verdict was
                      'corrupt';
    * incompatible -- set of share numbers whose per-share verdict was
                      'incompatible';
    * responded    -- the flag reported alongside the bucket dict by
                      _get_buckets().

    Shares that failed for any other reason (for example 'disconnect' or
    'failure') are left out of all three sets. Callers can therefore treat
    a missing share and a bad share the same way, and use the
    corrupt/incompatible sets only for diagnostics.

    If the bucket query itself errbacks with RemoteException or
    DeadReferenceError, the Deferred instead fires with
    (set(), server, set(), set(), False). Any other failure propagates.

    Note that ``responded`` reflects the bucket query only. A server that
    answered that query and later dropped out while blocks were being
    fetched is still reported with ``responded`` True.
    """
    storage_index = self._verifycap.get_storage_index()
    d = self._get_buckets(s, storage_index)

    def _tally(verdicts, responded):
        # Sort each (ok, sharenum, reason) verdict into one of the buckets.
        good, bad, unusable = set(), set(), set()
        for ok, shnum, reason in verdicts:
            if ok:
                good.add(shnum)
            elif reason == 'corrupt':
                bad.add(shnum)
            elif reason == 'incompatible':
                unusable.add(shnum)
        return (good, s, bad, unusable, responded)

    def _got_buckets(result):
        bucketdict, responded = result
        # Start verifying every offered share; wait for all of them.
        pending = [self._download_and_verify(s, shnum, bucket)
                   for shnum, bucket in bucketdict.items()]
        gathered = deferredutil.gatherResults(pending)
        gathered.addCallback(_tally, responded)
        return gathered

    def _bucket_query_failed(f):
        # Only connection-level problems are turned into an empty answer.
        f.trap(RemoteException, DeadReferenceError)
        return (set(), s, set(), set(), False)

    d.addCallbacks(_got_buckets, _bucket_query_failed)
    return d
---|
731 | |
---|
def _check_server_shares(self, s):
    """Ask server ``s`` which shares it holds, without downloading them.

    Returns a Deferred that fires with the 5-tuple
    (claimed, server, set(), set(), responded). Here ``claimed`` is the set
    of keys of the bucket dict reported by _get_buckets(), and ``responded``
    is the flag returned with it.

    The corrupt and incompatible sets are always empty because nothing is
    verified here. This keeps the result shape identical to
    _verify_server_shares().

    A disconnected server is expected to appear as
    (set(), server, set(), set(), False). That conversion is assumed to
    happen inside _get_buckets(), not in this method.
    """
    storage_index = self._verifycap.get_storage_index()
    pending = self._get_buckets(s, storage_index)

    def _claimed_only(bucket_result):
        claimed, responded = bucket_result
        return (set(claimed), s, set(), set(), responded)

    pending.addCallback(_claimed_only)
    return pending
---|
750 | |
---|
def _format_results(self, results):
    """Combine the per-server result tuples into one CheckResults object.

    ``results`` is an iterable of (verified, server, corrupt, incompatible,
    responded) tuples, one per server, in the shape produced by
    _verify_server_shares() / _check_server_shares(). The first, third and
    fourth items are sets of share numbers; ``responded`` says whether the
    server answered.

    Health and recoverability are judged purely on how many distinct share
    numbers appear in the resulting sharemap:

    * healthy     -- every expected share (total_shares) is present;
    * recoverable -- at least needed_shares are present.
    """
    cap = self._verifycap
    storage_index = cap.get_storage_index()

    sharemap = dictutil.DictOfSets()   # sharenum -> set(server)
    shares_by_server = {}              # server -> set(sharenum)
    corrupt_locators = []              # [(server, storage_index, sharenum)]
    incompatible_locators = []         # [(server, storage_index, sharenum)]
    responders = set()                 # servers that answered at all

    for good, server, bad, unusable, answered in results:
        shares_by_server.setdefault(server, set()).update(good)
        for shnum in good:
            sharemap.setdefault(shnum, set()).add(server)
        corrupt_locators.extend(
            (server, storage_index, shnum) for shnum in bad)
        incompatible_locators.extend(
            (server, storage_index, shnum) for shnum in unusable)
        if answered:
            responders.add(server)

    # A "good share host" is a server contributing at least one good share.
    good_share_hosts = sum(1 for held in shares_by_server.values() if held)

    total = cap.total_shares
    needed = cap.needed_shares
    good_count = len(sharemap)

    assert good_count <= total, (sharemap.keys(), total)
    healthy = good_count == total
    if healthy:
        summary = "Healthy"
    else:
        summary = ("Not Healthy: %d shares (enc %d-of-%d)" %
                   (good_count, needed, total))

    # Immutable files have a single version, so it is either the one
    # recoverable version or the one unrecoverable version.
    recoverable = 1 if good_count >= needed else 0
    unrecoverable = 1 - recoverable

    happiness = servers_of_happiness(sharemap)

    return CheckResults(cap, storage_index,
                        healthy=healthy, recoverable=bool(recoverable),
                        count_happiness=happiness,
                        count_shares_needed=needed,
                        count_shares_expected=total,
                        count_shares_good=good_count,
                        count_good_share_hosts=good_share_hosts,
                        count_recoverable_versions=recoverable,
                        count_unrecoverable_versions=unrecoverable,
                        servers_responding=list(responders),
                        sharemap=sharemap,
                        count_wrong_shares=0,  # no such thing, for immutable
                        list_corrupt_shares=corrupt_locators,
                        count_corrupt_shares=len(corrupt_locators),
                        list_incompatible_shares=incompatible_locators,
                        count_incompatible_shares=len(incompatible_locators),
                        summary=summary,
                        report=[],
                        share_problems=[],
                        servermap=None)
---|
814 | |
---|
def start(self):
    """Query every server in ``self._servers`` and summarize the outcome.

    When ``self._verify`` is true, each server's shares are fully
    downloaded and verified (_verify_server_shares). Otherwise the servers
    are only asked which shares they hold (_check_server_shares).

    Returns a Deferred that fires with the CheckResults produced by
    _format_results() once all per-server Deferreds have fired.
    """
    if self._verify:
        probe = self._verify_server_shares
    else:
        probe = self._check_server_shares

    per_server = [probe(server) for server in self._servers]
    return deferredutil.gatherResults(per_server).addCallback(self._format_results)
---|