source: trunk/src/allmydata/immutable/checker.py

Last change on this file was fec97256, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2025-01-06T21:51:37Z

trim Python2 syntax

  • Property mode set to 100644
File size: 37.9 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from zope.interface import implementer
6from twisted.internet import defer
7from foolscap.api import DeadReferenceError, RemoteException
8from allmydata import hashtree, codec, uri
9from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
10from allmydata.hashtree import IncompleteHashTree
11from allmydata.check_results import CheckResults
12from allmydata.uri import CHKFileVerifierURI
13from allmydata.util.assertutil import precondition
14from allmydata.util import base32, deferredutil, dictutil, log, mathutil
15from allmydata.util.hashutil import file_renewal_secret_hash, \
16     file_cancel_secret_hash, bucket_renewal_secret_hash, \
17     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
18     block_hash
19from allmydata.util.happinessutil import servers_of_happiness
20
21from allmydata.immutable import layout
22
class IntegrityCheckReject(Exception):
    """Common base class for the integrity-check failures defined in this
    module (bad or inconsistent URI extension blocks, bad or missing
    hashes)."""
class BadURIExtension(IntegrityCheckReject):
    """The URI extension block (UEB) contained a malformed field, or a field
    that is inconsistent with the verify cap or with the other UEB fields."""
class BadURIExtensionHashValue(IntegrityCheckReject):
    """The hash of the URI extension block we received does not match the
    uri_extension_hash recorded in the verify cap."""
class BadOrMissingHash(IntegrityCheckReject):
    """A hash needed to validate a share was wrong, out of range, or not
    supplied at all."""
class UnsupportedErasureCodec(BadURIExtension):
    """The URI extension block names an erasure codec other than b"crs"."""
33
@implementer(IValidatedThingProxy)
class ValidatedExtendedURIProxy:
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB.

    Call start() to fetch the UEB. Once the Deferred it returns has fired
    successfully, my attributes hold the values taken from the UEB
    (segment_size, crypttext_root_hash, share_root_hash and, if present,
    crypttext_hash) as well as the values computed from them and from the
    verifycap (block_size, share_size, num_segments, tail_data_size,
    tail_segment_size)."""

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        """readbucketproxy is the object whose get_uri_extension() is used
        to fetch the UEB. verifycap is required to provide IVerifierURI.
        fetch_failures, if not None, is a mapping whose "uri_extension"
        entry is incremented each time a fetched UEB fails its hash check;
        it is for debugging -- see test_encode.py"""
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %r>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        """Return data unchanged if its uri_extension_hash matches the one
        in the verifycap. Otherwise count the failure in fetch_failures (if
        one was given to the constructor) and raise
        BadURIExtensionHashValue."""
        h = uri_extension_hash(data)
        if h == self._verifycap.uri_extension_hash:
            return data

        msg = ("The copy of uri_extension we received from %s was bad: wanted %r, got %r" %
               (self._readbucketproxy,
                base32.b2a(self._verifycap.uri_extension_hash),
                base32.b2a(h)))
        if self._fetch_failures is not None:
            self._fetch_failures["uri_extension"] += 1
        raise BadURIExtensionHashValue(msg)

    def _parse_and_validate(self, data):
        """Unpack the serialized UEB in data, store the required, computed
        and optional values on self, and check every redundant field for
        consistency with the verifycap. Raises BadURIExtension (or its
        subclass UnsupportedErasureCodec) when a field is malformed or
        inconsistent. Returns self."""
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        # The last segment holds whatever is left over, or a full segment
        # when the file size is an exact multiple of segment_size.
        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']

        # Next: things that are optional and not redundant: crypttext_hash
        if 'crypttext_hash' in d:
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))

        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if 'codec_name' in d and d['codec_name'] != b"crs":
            raise UnsupportedErasureCodec(d['codec_name'])

        if 'codec_params' in d:
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if 'tail_codec_params' in d:
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (utcpns, self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (utcpts, self._verifycap.total_shares))

        if 'num_segments' in d and d['num_segments'] != self.num_segments:
            raise BadURIExtension("inconsistent num_segments: size: %s, "
                                  "segment_size: %s, computed_num_segments: %s, "
                                  "ueb_num_segments: %s" % (self._verifycap.size,
                                                            self.segment_size,
                                                            self.num_segments, d['num_segments']))

        if 'size' in d and d['size'] != self._verifycap.size:
            raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                  (self._verifycap.size, d['size']))

        if 'needed_shares' in d and d['needed_shares'] != self._verifycap.needed_shares:
            # Report the verifycap's needed_shares here (the value that was
            # actually compared), not its total_shares.
            raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                  "needed shares: %s" % (self._verifycap.needed_shares,
                                                         d['needed_shares']))

        if 'total_shares' in d and d['total_shares'] != self._verifycap.total_shares:
            raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                  "total shares: %s" % (self._verifycap.total_shares,
                                                        d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used.  Ignoring.  %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used.  Ignoring.  %s" % (self,))

        return self

    def start(self):
        """Fetch the UEB from bucket, compare its hash to the hash from
        verifycap, then parse it. Returns a deferred which is called back
        with self once the fetch is successful, or is erred back if it
        fails."""
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d
214
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size):
        """ share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB"""
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket,
                               str(base32.b2a(share_hash_tree[0][:8])[:12], "ascii"))
        log.PrefixingLogMixin.__init__(self,
                                       facility="tahoe.immutable.download",
                                       prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

    @staticmethod
    def _set_hashes_or_reject(hash_tree, hashes):
        """Feed hashes into hash_tree, turning any of the errors that mean
        'these hashes are wrong, out of range, or insufficient' into
        BadOrMissingHash."""
        try:
            hash_tree.set_hashes(hashes)
        except (IndexError, hashtree.BadHashError,
                hashtree.NotEnoughHashesError) as le:
            raise BadOrMissingHash(le)

    def get_all_sharehashes(self):
        """Retrieve and validate all the share-hash-tree nodes that are
        included in this share, regardless of whether we need them to
        validate the share or not. Each share contains a minimal Merkle tree
        chain, but there is lots of overlap, so usually we'll be using hashes
        from other shares and not reading every single hash from this share.
        The Verifier uses this function to read and validate every single
        hash from this share.

        Call this (and wait for the Deferred it returns to fire) before
        calling get_block() for the first time: this lets us check that the
        share contains enough hashes to validate its own data, and
        avoids downloading any share hash twice.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""

        d = self.bucket.get_share_hashes()
        def _got_share_hashes(sh):
            self._set_hashes_or_reject(self.share_hash_tree, dict(sh))
        d.addCallback(_got_share_hashes)
        return d

    def get_all_blockhashes(self):
        """Retrieve and validate all the block-hash-tree nodes that are
        included in this share. Each share contains a full Merkle tree, but
        we usually only fetch the minimal subset necessary for any particular
        block. This function fetches everything at once. The Verifier uses
        this function to validate the block hash tree.

        Call this (and wait for the Deferred it returns to fire) after
        calling get_all_sharehashes() and before calling get_block() for the
        first time: this lets us check that the share contains all block
        hashes and avoids downloading them multiple times.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash.
        """

        # get_block_hashes(anything) currently always returns everything
        needed = list(range(len(self.block_hash_tree)))
        d = self.bucket.get_block_hashes(needed)
        def _got_block_hashes(blockhashes):
            if len(blockhashes) < len(self.block_hash_tree):
                raise BadOrMissingHash()
            self._set_hashes_or_reject(self.block_hash_tree,
                                       dict(enumerate(blockhashes)))
        d.addCallback(_got_block_hashes)
        return d

    def get_all_crypttext_hashes(self, crypttext_hash_tree):
        """Retrieve and validate all the crypttext-hash-tree nodes that are
        in this share. Normally we don't look at these at all: the download
        process fetches them incrementally as needed to validate each segment
        of ciphertext. But this is a convenient place to give the Verifier a
        function to validate all of these at once.

        Call this with a new hashtree object for each share, initialized with
        the crypttext hash tree root. I return a Deferred which errbacks upon
        failure, probably with BadOrMissingHash.
        """

        # get_crypttext_hashes() always returns everything
        d = self.bucket.get_crypttext_hashes()
        def _got_crypttext_hashes(hashes):
            if len(hashes) < len(crypttext_hash_tree):
                raise BadOrMissingHash()
            self._set_hashes_or_reject(crypttext_hash_tree,
                                       dict(enumerate(hashes)))
        d.addCallback(_got_crypttext_hashes)
        return d

    def get_block(self, blocknum):
        """Fetch block number blocknum, together with whichever share hashes
        and block hashes are still needed to validate it. I return a Deferred
        that fires with the block data once it has been validated, or
        errbacks (with BadOrMissingHash when validation fails)."""
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        # Every block is block_size long except possibly the last one, which
        # holds whatever remains of the share.
        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum,
                                        self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        """Validate the block fetched by get_block(). results is the
        (sharehashes, blockhashes, blockdata) triple gathered there. I feed
        the hashes into the share hash tree and the block hash tree, check
        the hash of blockdata against the block hash tree, and return
        blockdata if everything matches. On a hash failure I log the details
        and raise BadOrMissingHash."""
        precondition(blocknum < self.num_blocks,
                     self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError as le:
            # attach the offending value to the exception to aid debugging
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        # NOTE(review): candidate_share_hash is never assigned anything else
        # in this method, so the log line below always reports False.
        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise exception if the values being passed do not
                # match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError as le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leafs of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree and also the share hash
                    # wasn't sent by the server.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError) as le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                    (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %r" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                        (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = ["%3d: %s" % (i, base32.b2a_or_none(h))
                     for i, h in sorted(sharehashes.items())]
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = ["%3d: %s" % (i, base32.b2a_or_none(h))
                     for i, h in sorted(blockhashes.items())]
            # Use self.log (not the module-level log.msg) so this line carries
            # the same prefix and facility as the rest of the failure report.
            self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata
445
446
447class Checker(log.PrefixingLogMixin):
448    """I query all servers to see if M uniquely-numbered shares are
449    available.
450
451    If the verify flag was passed to my constructor, then for each share I
452    download every data block and all metadata from each server and perform a
453    cryptographic integrity check on all of it. If not, I just ask each
454    server 'Which shares do you have?' and believe its answer.
455
456    In either case, I wait until I have gotten responses from all servers.
457    This fact -- that I wait -- means that an ill-behaved server which fails
458    to answer my questions will make me wait indefinitely. If it is
459    ill-behaved in a way that triggers the underlying foolscap timeouts, then
460    I will wait only as long as those foolscap timeouts, but if it is
461    ill-behaved in a way which placates the foolscap timeouts but still
462    doesn't answer my question then I will wait indefinitely.
463
464    Before I send any new request to a server, I always ask the 'monitor'
465    object that was passed into my constructor whether this task has been
466    cancelled (by invoking its raise_if_cancelled() method).
467    """
468    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
469                 monitor):
470        assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
471
472        prefix = str(base32.b2a(verifycap.get_storage_index()[:8])[:12], "utf-8")
473        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
474
475        self._verifycap = verifycap
476
477        self._monitor = monitor
478        self._servers = servers
479        self._verify = verify # bool: verify what the servers claim, or not?
480        self._add_lease = add_lease
481
482        frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
483                                       self._verifycap.get_storage_index())
484        self.file_renewal_secret = frs
485        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
486                                      self._verifycap.get_storage_index())
487        self.file_cancel_secret = fcs
488
489    def _get_renewal_secret(self, seed):
490        return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
491    def _get_cancel_secret(self, seed):
492        return bucket_cancel_secret_hash(self.file_cancel_secret, seed)
493
494    def _get_buckets(self, s, storageindex):
495        """Return a deferred that eventually fires with ({sharenum: bucket},
496        serverid, success). In case the server is disconnected or returns a
497        Failure then it fires with ({}, serverid, False) (A server
498        disconnecting or returning a Failure when we ask it for buckets is
499        the same, for our purposes, as a server that says it has none, except
500        that we want to track and report whether or not each server
501        responded.)"""
502
503        storage_server = s.get_storage_server()
504        lease_seed = s.get_lease_seed()
505        if self._add_lease:
506            renew_secret = self._get_renewal_secret(lease_seed)
507            cancel_secret = self._get_cancel_secret(lease_seed)
508            d2 = storage_server.add_lease(
509                storageindex,
510                renew_secret,
511                cancel_secret,
512            )
513            d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)
514
515        d = storage_server.get_buckets(storageindex)
516        def _wrap_results(res):
517            return (res, True)
518
519        def _trap_errs(f):
520            level = log.WEIRD
521            if f.check(DeadReferenceError):
522                level = log.UNUSUAL
523            self.log("failure from server on 'get_buckets' the REMOTE failure was:",
524                     facility="tahoe.immutable.checker",
525                     failure=f, level=level, umid="AX7wZQ")
526            return ({}, False)
527
528        d.addCallbacks(_wrap_results, _trap_errs)
529        return d
530
531    def _add_lease_failed(self, f, server_name, storage_index):
532        # Older versions of Tahoe didn't handle the add-lease message very
533        # well: <=1.1.0 throws a NameError because it doesn't implement
534        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
535        # (which is most of them, since we send add-lease to everybody,
536        # before we know whether or not they have any shares for us), and
537        # 1.2.0 throws KeyError even on known buckets due to an internal bug
538        # in the latency-measuring code.
539
540        # we want to ignore the known-harmless errors and log the others. In
541        # particular we want to log any local errors caused by coding
542        # problems.
543
544        if f.check(DeadReferenceError):
545            return
546        if f.check(RemoteException):
547            if f.value.failure.check(KeyError, IndexError, NameError):
548                # this may ignore a bit too much, but that only hurts us
549                # during debugging
550                return
551            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
552                     name=server_name,
553                     f_value=str(f.value),
554                     failure=f,
555                     level=log.WEIRD, umid="atbAxw")
556            return
557        # local errors are cause for alarm
558        log.err(f,
559                format="local error in add_lease to [%(name)s]: %(f_value)s",
560                name=server_name,
561                f_value=str(f.value),
562                level=log.WEIRD, umid="hEGuQg")
563
564
565    def _download_and_verify(self, server, sharenum, bucket):
566        """Start an attempt to download and verify every block in this bucket
567        and return a deferred that will eventually fire once the attempt
568        completes.
569
570        If you download and verify every block then fire with (True,
571        sharenum, None), else if the share data couldn't be parsed because it
572        was of an unknown version number fire with (False, sharenum,
573        'incompatible'), else if any of the blocks were invalid, fire with
574        (False, sharenum, 'corrupt'), else if the server disconnected (False,
575        sharenum, 'disconnect'), else if the server returned a Failure during
576        the process fire with (False, sharenum, 'failure').
577
578        If there is an internal error such as an uncaught exception in this
579        code, then the deferred will errback, but if there is a remote error
580        such as the server failing or the returned data being incorrect then
581        it will not errback -- it will fire normally with the indicated
582        results."""
583
584        vcap = self._verifycap
585        b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
586        veup = ValidatedExtendedURIProxy(b, vcap)
587        d = veup.start()
588
589        def _got_ueb(vup):
590            share_hash_tree = IncompleteHashTree(vcap.total_shares)
591            share_hash_tree.set_hashes({0: vup.share_root_hash})
592
593            vrbp = ValidatedReadBucketProxy(sharenum, b,
594                                            share_hash_tree,
595                                            vup.num_segments,
596                                            vup.block_size,
597                                            vup.share_size)
598
599            # note: normal download doesn't use get_all_sharehashes(),
600            # because it gets more data than necessary. We've discussed the
601            # security properties of having verification and download look
602            # identical (so the server couldn't, say, provide good responses
603            # for one and not the other), but I think that full verification
604            # is more important than defending against inconsistent server
605            # behavior. Besides, they can't pass the verifier without storing
606            # all the data, so there's not so much to be gained by behaving
607            # inconsistently.
608            d = vrbp.get_all_sharehashes()
609            # we fill share_hash_tree before fetching any blocks, so the
610            # block fetches won't send redundant share-hash-tree requests, to
611            # speed things up. Then we fetch+validate all the blockhashes.
612            d.addCallback(lambda ign: vrbp.get_all_blockhashes())
613
614            cht = IncompleteHashTree(vup.num_segments)
615            cht.set_hashes({0: vup.crypttext_root_hash})
616            d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))
617
618            d.addCallback(lambda ign: vrbp)
619            return d
620        d.addCallback(_got_ueb)
621
622        def _discard_result(r):
623            assert isinstance(r, bytes), r
624            # to free up the RAM
625            return None
626
627        def _get_blocks(vrbp):
628            def _get_block(ign, blocknum):
629                db = vrbp.get_block(blocknum)
630                db.addCallback(_discard_result)
631                return db
632
633            dbs = defer.succeed(None)
634            for blocknum in range(veup.num_segments):
635                dbs.addCallback(_get_block, blocknum)
636
637            # The Deferred we return will fire after every block of this
638            # share has been downloaded and verified successfully, or else it
639            # will errback as soon as the first error is observed.
640            return dbs
641
642        d.addCallback(_get_blocks)
643
644        # if none of those errbacked, the blocks (and the hashes above them)
645        # are good
646        def _all_good(ign):
647            return (True, sharenum, None)
648        d.addCallback(_all_good)
649
650        # but if anything fails, we'll land here
651        def _errb(f):
652            # We didn't succeed at fetching and verifying all the blocks of
653            # this share. Handle each reason for failure differently.
654
655            if f.check(DeadReferenceError):
656                return (False, sharenum, 'disconnect')
657            elif f.check(RemoteException):
658                return (False, sharenum, 'failure')
659            elif f.check(layout.ShareVersionIncompatible):
660                return (False, sharenum, 'incompatible')
661            elif f.check(layout.LayoutInvalid,
662                         layout.RidiculouslyLargeURIExtensionBlock,
663                         BadOrMissingHash,
664                         BadURIExtensionHashValue):
665                return (False, sharenum, 'corrupt')
666
667            # if it wasn't one of those reasons, re-raise the error
668            return f
669        d.addErrback(_errb)
670
671        return d
672
673    def _verify_server_shares(self, s):
674        """ Return a deferred which eventually fires with a tuple of
675        (set(sharenum), server, set(corruptsharenum),
676        set(incompatiblesharenum), success) showing all the shares verified
677        to be served by this server, and all the corrupt shares served by the
678        server, and all the incompatible shares served by the server. In case
679        the server is disconnected or returns a Failure then it fires with
680        the last element False.
681
682        A server disconnecting or returning a failure when we ask it for
683        shares is the same, for our purposes, as a server that says it has
684        none or offers invalid ones, except that we want to track and report
685        the server's behavior. Similarly, the presence of corrupt shares is
686        mainly of use for diagnostics -- you can typically treat it as just
687        like being no share at all by just observing its absence from the
688        verified shares dict and ignoring its presence in the corrupt shares
689        dict.
690
691        The 'success' argument means whether the server responded to *any*
692        queries during this process, so if it responded to some queries and
693        then disconnected and ceased responding, or returned a failure, it is
694        still marked with the True flag for 'success'.
695        """
696        d = self._get_buckets(s, self._verifycap.get_storage_index())
697
698        def _got_buckets(result):
699            bucketdict, success = result
700
701            shareverds = []
702            for (sharenum, bucket) in list(bucketdict.items()):
703                d = self._download_and_verify(s, sharenum, bucket)
704                shareverds.append(d)
705
706            dl = deferredutil.gatherResults(shareverds)
707
708            def collect(results):
709                verified = set()
710                corrupt = set()
711                incompatible = set()
712                for succ, sharenum, whynot in results:
713                    if succ:
714                        verified.add(sharenum)
715                    else:
716                        if whynot == 'corrupt':
717                            corrupt.add(sharenum)
718                        elif whynot == 'incompatible':
719                            incompatible.add(sharenum)
720                return (verified, s, corrupt, incompatible, success)
721
722            dl.addCallback(collect)
723            return dl
724
725        def _err(f):
726            f.trap(RemoteException, DeadReferenceError)
727            return (set(), s, set(), set(), False)
728
729        d.addCallbacks(_got_buckets, _err)
730        return d
731
732    def _check_server_shares(self, s):
733        """Return a deferred which eventually fires with a tuple of
734        (set(sharenum), server, set(corrupt), set(incompatible),
735        responded) showing all the shares claimed to be served by this
736        server. In case the server is disconnected then it fires with
737        (set(), server, set(), set(), False) (a server disconnecting
738        when we ask it for buckets is the same, for our purposes, as a
739        server that says it has none, except that we want to track and
740        report whether or not each server responded.)
741
742        see also _verify_server_shares()
743        """
744        def _curry_empty_corrupted(res):
745            buckets, responded = res
746            return (set(buckets), s, set(), set(), responded)
747        d = self._get_buckets(s, self._verifycap.get_storage_index())
748        d.addCallback(_curry_empty_corrupted)
749        return d
750
751    def _format_results(self, results):
752        SI = self._verifycap.get_storage_index()
753
754        verifiedshares = dictutil.DictOfSets() # {sharenum: set(server)}
755        servers = {} # {server: set(sharenums)}
756        corruptshare_locators = [] # (server, storageindex, sharenum)
757        incompatibleshare_locators = [] # (server, storageindex, sharenum)
758        servers_responding = set() # server
759
760        for verified, server, corrupt, incompatible, responded in results:
761            servers.setdefault(server, set()).update(verified)
762            for sharenum in verified:
763                verifiedshares.setdefault(sharenum, set()).add(server)
764            for sharenum in corrupt:
765                corruptshare_locators.append((server, SI, sharenum))
766            for sharenum in incompatible:
767                incompatibleshare_locators.append((server, SI, sharenum))
768            if responded:
769                servers_responding.add(server)
770
771        good_share_hosts = len([s for s in servers.keys() if servers[s]])
772
773        assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
774        if len(verifiedshares) == self._verifycap.total_shares:
775            healthy = True
776            summary = "Healthy"
777        else:
778            healthy = False
779            summary = ("Not Healthy: %d shares (enc %d-of-%d)" %
780                       (len(verifiedshares),
781                        self._verifycap.needed_shares,
782                        self._verifycap.total_shares))
783        if len(verifiedshares) >= self._verifycap.needed_shares:
784            recoverable = 1
785            unrecoverable = 0
786        else:
787            recoverable = 0
788            unrecoverable = 1
789
790        count_happiness = servers_of_happiness(verifiedshares)
791
792        cr = CheckResults(self._verifycap, SI,
793                          healthy=healthy, recoverable=bool(recoverable),
794                          count_happiness=count_happiness,
795                          count_shares_needed=self._verifycap.needed_shares,
796                          count_shares_expected=self._verifycap.total_shares,
797                          count_shares_good=len(verifiedshares),
798                          count_good_share_hosts=good_share_hosts,
799                          count_recoverable_versions=recoverable,
800                          count_unrecoverable_versions=unrecoverable,
801                          servers_responding=list(servers_responding),
802                          sharemap=verifiedshares,
803                          count_wrong_shares=0, # no such thing, for immutable
804                          list_corrupt_shares=corruptshare_locators,
805                          count_corrupt_shares=len(corruptshare_locators),
806                          list_incompatible_shares=incompatibleshare_locators,
807                          count_incompatible_shares=len(incompatibleshare_locators),
808                          summary=summary,
809                          report=[],
810                          share_problems=[],
811                          servermap=None)
812
813        return cr
814
815    def start(self):
816        ds = []
817        if self._verify:
818            for s in self._servers:
819                ds.append(self._verify_server_shares(s))
820        else:
821            for s in self._servers:
822                ds.append(self._check_server_shares(s))
823
824        return deferredutil.gatherResults(ds).addCallback(self._format_results)
Note: See TracBrowser for help on using the repository browser.