source: trunk/src/allmydata/immutable/downloader/share.py

Last change on this file was fec97256, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2025-01-06T21:51:37Z

trim Python2 syntax

  • Property mode set to 100644
File size: 42.9 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import struct
6import time
now = time.time  # short alias; used to timestamp download-status events
8
9from twisted.python.failure import Failure
10from foolscap.api import eventually
11from allmydata.util import base32, log, hashutil, mathutil
12from allmydata.util.spans import Spans, DataSpans
13from allmydata.interfaces import HASH_SIZE
14from allmydata.hashtree import IncompleteHashTree, BadHashError, \
15     NotEnoughHashesError
16
17from allmydata.immutable.layout import make_write_bucket_proxy
18from allmydata.util.observer import EventStreamObserver
19from .common import COMPLETE, CORRUPT, DEAD, BADSEGNUM
20
21
class LayoutInvalid(Exception):
    """The share's layout data cannot be trusted.

    Raised when the version number is unknown or when the offset table
    implies impossible sizes for the share-hash or block-hash regions.
    Also caught around UEB validation, where the node may raise it.
    """
24
25
class DataUnavailable(Exception):
    """Bytes this share needs are known never to arrive.

    Raised when a needed byte range overlaps ranges the server has already
    refused (NAKed); the share is then abandoned.
    """
28
29
30class Share:
31    """I represent a single instance of a single share (e.g. I reference the
32    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
33    I am associated with a CommonShare that remembers data that is held in
34    common among e.g. SI=abcde/shnum2 across all servers. I am also
35    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
36    servers).
37    """
38    # this is a specific implementation of IShare for tahoe's native storage
39    # servers. A different backend would use a different class.
40
    def __init__(self, rref, server, verifycap, commonshare, node,
                 download_status, shnum, dyhb_rtt, logparent):
        """Prepare to read share number ``shnum`` from one storage server.

        :param rref: remote reference used for calls to the server about
            this share (``_signal_corruption`` calls
            ``advise_corrupt_share`` on it; presumably reads use it too --
            not visible here).
        :param server: the storage server; ``get_version()`` tells us
            whether it tolerates read overrun, ``get_name()`` feeds repr().
        :param verifycap: supplies storage_index, size, needed_shares and
            total_shares for the file.
        :param commonshare: state for this shnum shared across servers
            (holds the block hash tree).
        :param node: the file node (holds share_hash_tree, the UEB and the
            actual/guessed segment size).
        :param download_status: receives timing events (add_misc_event).
        :param shnum: the share number this object reads.
        :param dyhb_rtt: kept as ``self._dyhb_rtt``; presumably the
            round-trip time of the earlier "do you have block" query --
            TODO confirm.
        :param logparent: parent log id for this share's log messages.
        """
        self._rref = rref
        self._server = server
        self._node = node # holds share_hash_tree and UEB
        self.actual_segment_size = node.segment_size # might still be None
        # XXX change node.guessed_segment_size to
        # node.best_guess_segment_size(), which should give us the real ones
        # if known, else its guess.
        # (_guess_offsets only needs self._node, which is already set)
        self._guess_offsets(verifycap, node.guessed_segment_size)
        self.actual_offsets = None
        self._UEB_length = None
        self._commonshare = commonshare # holds block_hash_tree
        self._download_status = download_status
        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
        self._shnum = shnum
        self._dyhb_rtt = dyhb_rtt
        # self._alive becomes False upon fatal corruption or server error
        self._alive = True
        self._loop_scheduled = False
        # repr(self) needs _shnum and _server, both assigned above
        self._lp = log.msg(format="%(share)s created", share=repr(self),
                           level=log.NOISY, parent=logparent, umid="P7hv2w")

        self._pending = Spans() # request sent but no response received yet
        self._received = DataSpans() # ACK response received, with data
        self._unavailable = Spans() # NAK response received, no data

        # NOTE(review): the table below predates the current bookkeeping.
        # It names _wanted/_requested columns that are not attributes of
        # this class (the three Spans above are what is actually tracked),
        # and it lists six rows for "four states". Read it as design
        # history rather than as a description of the current attributes.
        #
        # any given byte of the share can be in one of four states:
        #  in: _wanted, _requested, _received
        #      FALSE    FALSE       FALSE : don't care about it at all
        #      TRUE     FALSE       FALSE : want it, haven't yet asked for it
        #      TRUE     TRUE        FALSE : request is in-flight
        #                                   or didn't get it
        #      FALSE    TRUE        TRUE  : got it, haven't used it yet
        #      FALSE    TRUE        FALSE : got it and used it
        #      FALSE    FALSE       FALSE : block consumed, ready to ask again
        #
        # when we request data and get a NAK, we leave it in _requested
        # to remind ourself to not ask for it again. We don't explicitly
        # remove it from anything (maybe this should change).
        #
        # We retain the hashtrees in the Node, so we leave those spans in
        # _requested (and never ask for them again, as long as the Node is
        # alive). But we don't retain data blocks (too big), so when we
        # consume a data block, we remove it from _requested, so a later
        # download can re-fetch it.

        # list of (segnum, set of EventStreamObservers), oldest request first
        self._requested_blocks = [] # (segnum, set(observer2..))
        v = server.get_version()
        ver = v[b"http://allmydata.org/tahoe/protocols/storage/v1"]
        self._overrun_ok = ver[b"tolerates-immutable-read-overrun"]
        # If _overrun_ok and we guess the offsets correctly, we can get
        # everything in one RTT. If _overrun_ok and we guess wrong, we might
        # need two RTT (but we could get lucky and do it in one). If overrun
        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
        # 2=offset table, 3=UEB_length and everything else (hashes, block),
        # 4=UEB.

        self.had_corruption = False # for unit tests
101
102    def __repr__(self):
103        return "Share(sh%d-on-%s)" % (self._shnum, str(self._server.get_name(), "utf-8"))
104
105    def is_alive(self):
106        # XXX: reconsider. If the share sees a single error, should it remain
107        # dead for all time? Or should the next segment try again? This DEAD
108        # state is stored elsewhere too (SegmentFetcher per-share states?)
109        # and needs to be consistent. We clear _alive in self._fail(), which
110        # is called upon a network error, or layout failure, or hash failure
111        # in the UEB or a hash tree. We do not _fail() for a hash failure in
112        # a block, but of course we still tell our callers about
113        # state=CORRUPT so they'll find a different share.
114        return self._alive
115
116    def _guess_offsets(self, verifycap, guessed_segment_size):
117        self.guessed_segment_size = guessed_segment_size
118        size = verifycap.size
119        k = verifycap.needed_shares
120        N = verifycap.total_shares
121        r = self._node._calculate_sizes(guessed_segment_size)
122        # num_segments, block_size/tail_block_size
123        # guessed_segment_size/tail_segment_size/tail_segment_padded
124        share_size = mathutil.div_ceil(size, k)
125        # share_size is the amount of block data that will be put into each
126        # share, summed over all segments. It does not include hashes, the
127        # UEB, or other overhead.
128
129        # use the upload-side code to get this as accurate as possible
130        ht = IncompleteHashTree(N)
131        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
132        wbp = make_write_bucket_proxy(None, None, share_size, r["block_size"],
133                                      r["num_segments"], num_share_hashes, 0)
134        self._fieldsize = wbp.fieldsize
135        self._fieldstruct = wbp.fieldstruct
136        self.guessed_offsets = wbp._offsets
137
138    # called by our client, the SegmentFetcher
139    def get_block(self, segnum):
140        """Add a block number to the list of requests. This will eventually
141        result in a fetch of the data necessary to validate the block, then
142        the block itself. The fetch order is generally
143        first-come-first-served, but requests may be answered out-of-order if
144        data becomes available sooner.
145
146        I return an EventStreamObserver, which has two uses. The first is to
147        call o.subscribe(), which gives me a place to send state changes and
148        eventually the data block. The second is o.cancel(), which removes
149        the request (if it is still active).
150
151        I will distribute the following events through my EventStreamObserver:
152         - state=OVERDUE: ?? I believe I should have had an answer by now.
153                          You may want to ask another share instead.
154         - state=BADSEGNUM: the segnum you asked for is too large. I must
155                            fetch a valid UEB before I can determine this,
156                            so the notification is asynchronous
157         - state=COMPLETE, block=data: here is a valid block
158         - state=CORRUPT: this share contains corrupted data
159         - state=DEAD, f=Failure: the server reported an error, this share
160                                  is unusable
161        """
162        log.msg("%s.get_block(%d)" % (repr(self), segnum),
163                level=log.NOISY, parent=self._lp, umid="RTo9MQ")
164        assert segnum >= 0
165        o = EventStreamObserver()
166        o.set_canceler(self, "_cancel_block_request")
167        for i,(segnum0,observers) in enumerate(self._requested_blocks):
168            if segnum0 == segnum:
169                observers.add(o)
170                break
171        else:
172            self._requested_blocks.append( (segnum, set([o])) )
173        self.schedule_loop()
174        return o
175
176    def _cancel_block_request(self, o):
177        new_requests = []
178        for e in self._requested_blocks:
179            (segnum0, observers) = e
180            observers.discard(o)
181            if observers:
182                new_requests.append(e)
183        self._requested_blocks = new_requests
184
185    # internal methods
186    def _active_segnum_and_observers(self):
187        if self._requested_blocks:
188            # we only retrieve information for one segment at a time, to
189            # minimize alacrity (first come, first served)
190            return self._requested_blocks[0]
191        return None, []
192
193    def schedule_loop(self):
194        if self._loop_scheduled:
195            return
196        self._loop_scheduled = True
197        eventually(self.loop)
198
    def loop(self):
        """Run one pass of this share's processing loop.

        This is what schedule_loop() queues. It does nothing once the share
        is dead. Corruption errors (BadHashError, NotEnoughHashesError,
        LayoutInvalid) and DataUnavailable abandon the share via
        self._fail() and are not propagated. Any other exception also calls
        self._fail() and is then re-raised.
        """
        self._loop_scheduled = False # allow schedule_loop() to queue again
        if not self._alive:
            return
        try:
            # if any exceptions occur here, kill the download
            log.msg("%s.loop, reqs=[%s], pending=%s, received=%s,"
                    " unavailable=%s" %
                    (repr(self),
                     ",".join([str(req[0]) for req in self._requested_blocks]),
                     self._pending.dump(), self._received.dump(),
                     self._unavailable.dump() ),
                    level=log.NOISY, parent=self._lp, umid="BaL1zw")
            self._do_loop()
            # all exception cases call self._fail(), which clears self._alive
        except (BadHashError, NotEnoughHashesError, LayoutInvalid) as e:
            # Abandon this share. We do this if we see corruption in the
            # offset table, the UEB, or a hash tree. We don't abandon the
            # whole share if we see corruption in a data block (we abandon
            # just the one block, and still try to get data from other blocks
            # on the same server). In theory, we could get good data from a
            # share with a corrupt UEB (by first getting the UEB from some
            # other share), or corrupt hash trees, but the logic to decide
            # when this is safe is non-trivial. So for now, give up at the
            # first sign of corruption.
            #
            # _satisfy_*() code which detects corruption should first call
            # self._signal_corruption(), and then raise the exception.
            log.msg(format="corruption detected in %(share)s",
                    share=repr(self),
                    level=log.UNUSUAL, parent=self._lp, umid="gWspVw")
            self._fail(Failure(e), log.UNUSUAL)
        except DataUnavailable as e:
            # Abandon this share.
            # NOTE(review): this string is %-interpolated *before* being
            # passed as format=, and other calls show format= is itself
            # %-formatted against the keyword arguments. A literal '%' in
            # any interpolated value would then break that second pass.
            # Passing the values as keyword arguments would avoid this.
            log.msg(format="need data that will never be available"
                    " from %s: pending=%s, received=%s, unavailable=%s" %
                    (repr(self),
                     self._pending.dump(), self._received.dump(),
                     self._unavailable.dump() ),
                    level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ")
            self._fail(Failure(e), log.UNUSUAL)
        except BaseException:
            # unexpected error: still kill the share, but let it propagate
            self._fail(Failure())
            raise
        # reached after success and after either swallowed failure above
        log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s,"
                " unavailable=%s" %
                (repr(self),
                 ",".join([str(req[0]) for req in self._requested_blocks]),
                 self._pending.dump(), self._received.dump(),
                 self._unavailable.dump() ),
                level=log.NOISY, parent=self._lp, umid="9lRaRA")
250
251    def _do_loop(self):
252        # we are (eventually) called after all state transitions:
253        #  new segments added to self._requested_blocks
254        #  new data received from servers (responses to our read() calls)
255        #  impatience timer fires (server appears slow)
256
257        # First, consume all of the information that we currently have, for
258        # all the segments people currently want.
259        start = now()
260        while self._get_satisfaction():
261            pass
262        self._download_status.add_misc_event("satisfy", start, now())
263
264        # When we get no satisfaction (from the data we've received so far),
265        # we determine what data we desire (to satisfy more requests). The
266        # number of segments is finite, so I can't get no satisfaction
267        # forever.
268        start = now()
269        wanted, needed = self._desire()
270        self._download_status.add_misc_event("desire", start, now())
271
272        # Finally, send out requests for whatever we need (desire minus
273        # have). You can't always get what you want, but if you try
274        # sometimes, you just might find, you get what you need.
275        self._send_requests(wanted + needed)
276
277        # and sometimes you can't even get what you need
278        start = now()
279        disappointment = needed & self._unavailable
280        if disappointment.len():
281            self.had_corruption = True
282            raise DataUnavailable("need %s but will never get it" %
283                                  disappointment.dump())
284        self._download_status.add_misc_event("checkdis", start, now())
285
    def _get_satisfaction(self):
        """Make as much progress as possible using already-received data.

        Works through the offset table, the UEB, the share hash chain, the
        block hash root, then the block and ciphertext hashes for the
        active segment, and finally the data block itself. Stops at the
        first stage whose data has not arrived yet.

        :return: True if a block request was retired (delivered, reported
            CORRUPT, or reported BADSEGNUM), so the caller should call again.
            False otherwise, even if other data such as hash chains was
            consumed along the way.
        :raises BadHashError, NotEnoughHashesError, LayoutInvalid: from the
            _satisfy_*() helpers when corruption is detected.
        """

        if self.actual_offsets is None:
            if not self._satisfy_offsets():
                # can't even look at anything without the offset table
                return False

        if not self._node.have_UEB:
            if not self._satisfy_UEB():
                # can't check any hashes without the UEB
                return False
            # the call to _satisfy_UEB() will immediately set the
            # authoritative num_segments in all our CommonShares. If we
            # guessed wrong, we might still be working on a bogus segnum
            # (beyond the real range). We catch this and signal BADSEGNUM
            # before invoking any further code that touches hashtrees.
        self.actual_segment_size = self._node.segment_size # might be updated
        assert self.actual_segment_size is not None

        # knowing the UEB means knowing num_segments
        assert self._node.num_segments is not None

        segnum, observers = self._active_segnum_and_observers()
        # if segnum is None, we don't really need to do anything (we have no
        # outstanding readers right now), but we'll fill in the bits that
        # aren't tied to any particular segment.

        if segnum is not None and segnum >= self._node.num_segments:
            # requested segment lies past the end of the file: retire it
            for o in observers:
                o.notify(state=BADSEGNUM)
            self._requested_blocks.pop(0)
            return True

        if self._node.share_hash_tree.needed_hashes(self._shnum):
            if not self._satisfy_share_hash_tree():
                # can't check block_hash_tree without a root
                return False

        if self._commonshare.need_block_hash_root():
            # our leaf of the (now validated) share hash tree is the root
            # of this share's block hash tree
            block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
            self._commonshare.set_block_hash_root(block_hash_root)

        if segnum is None:
            return False # we don't want any particular segment right now

        # block_hash_tree
        needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_block_hash_tree(needed_hashes):
                # can't check block without block_hash_tree
                return False

        # ciphertext_hash_tree
        needed_hashes = self._node.get_needed_ciphertext_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_ciphertext_hash_tree(needed_hashes):
                # can't check decoded blocks without ciphertext_hash_tree
                return False

        # data blocks
        return self._satisfy_data_block(segnum, observers)
350
351    def _satisfy_offsets(self):
352        version_s = self._received.get(0, 4)
353        if version_s is None:
354            return False
355        (version,) = struct.unpack(">L", version_s)
356        if version == 1:
357            table_start = 0x0c
358            self._fieldsize = 0x4
359            self._fieldstruct = "L"
360        elif version == 2:
361            table_start = 0x14
362            self._fieldsize = 0x8
363            self._fieldstruct = "Q"
364        else:
365            self.had_corruption = True
366            raise LayoutInvalid("unknown version %d (I understand 1 and 2)"
367                                % version)
368        offset_table_size = 6 * self._fieldsize
369        table_s = self._received.pop(table_start, offset_table_size)
370        if table_s is None:
371            return False
372        fields = struct.unpack(">"+6*self._fieldstruct, table_s)
373        offsets = {}
374        for i,field in enumerate(['data',
375                                  'plaintext_hash_tree', # UNUSED
376                                  'crypttext_hash_tree',
377                                  'block_hashes',
378                                  'share_hashes',
379                                  'uri_extension',
380                                  ] ):
381            offsets[field] = fields[i]
382        self.actual_offsets = offsets
383        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields),
384                level=log.NOISY, parent=self._lp, umid="jedQcw")
385        self._received.remove(0, 4) # don't need this anymore
386
387        # validate the offsets a bit
388        share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"]
389        if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0:
390            # the share hash chain is stored as (hashnum,hash) pairs
391            self.had_corruption = True
392            raise LayoutInvalid("share hashes malformed -- should be a"
393                                " multiple of %d bytes -- not %d" %
394                                (2+HASH_SIZE, share_hashes_size))
395        block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"]
396        if block_hashes_size < 0 or block_hashes_size % (HASH_SIZE) != 0:
397            # the block hash tree is stored as a list of hashes
398            self.had_corruption = True
399            raise LayoutInvalid("block hashes malformed -- should be a"
400                                " multiple of %d bytes -- not %d" %
401                                (HASH_SIZE, block_hashes_size))
402        # we only look at 'crypttext_hash_tree' if the UEB says we're
403        # actually using it. Same with 'plaintext_hash_tree'. This gives us
404        # some wiggle room: a place to stash data for later extensions.
405
406        return True
407
408    def _satisfy_UEB(self):
409        o = self.actual_offsets
410        fsize = self._fieldsize
411        UEB_length_s = self._received.get(o["uri_extension"], fsize)
412        if not UEB_length_s:
413            return False
414        (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
415        UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length)
416        if not UEB_s:
417            return False
418        self._received.remove(o["uri_extension"], fsize)
419        try:
420            self._node.validate_and_store_UEB(UEB_s)
421            return True
422        except (LayoutInvalid, BadHashError) as e:
423            # TODO: if this UEB was bad, we'll keep trying to validate it
424            # over and over again. Only log.err on the first one, or better
425            # yet skip all but the first
426            f = Failure(e)
427            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
428            self.had_corruption = True
429            raise
430
431    def _satisfy_share_hash_tree(self):
432        # the share hash chain is stored as (hashnum,hash) tuples, so you
433        # can't fetch just the pieces you need, because you don't know
434        # exactly where they are. So fetch everything, and parse the results
435        # later.
436        o = self.actual_offsets
437        hashlen = o["uri_extension"] - o["share_hashes"]
438        assert hashlen % (2+HASH_SIZE) == 0
439        hashdata = self._received.get(o["share_hashes"], hashlen)
440        if not hashdata:
441            return False
442        share_hashes = {}
443        for i in range(0, hashlen, 2+HASH_SIZE):
444            (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
445            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
446            share_hashes[hashnum] = hashvalue
447        # TODO: if they give us an empty set of hashes,
448        # process_share_hashes() won't fail. We must ensure that this
449        # situation doesn't allow unverified shares through. Manual testing
450        # shows that set_block_hash_root() throws an assert because an
451        # internal node is None instead of an actual hash, but we want
452        # something better. It's probably best to add a method to
453        # IncompleteHashTree which takes a leaf number and raises an
454        # exception unless that leaf is present and fully validated.
455        try:
456            self._node.process_share_hashes(share_hashes)
457            # adds to self._node.share_hash_tree
458        except (BadHashError, NotEnoughHashesError) as e:
459            f = Failure(e)
460            self._signal_corruption(f, o["share_hashes"], hashlen)
461            self.had_corruption = True
462            raise
463        self._received.remove(o["share_hashes"], hashlen)
464        return True
465
466    def _signal_corruption(self, f, start, offset):
467        # there was corruption somewhere in the given range
468        reason = "corruption in share[%d-%d): %s" % (start, start+offset,
469                                                     str(f.value))
470        return self._rref.callRemote(
471            "advise_corrupt_share", reason.encode("utf-8")
472        ).addErrback(log.err, "Error from remote call to advise_corrupt_share")
473
474    def _satisfy_block_hash_tree(self, needed_hashes):
475        o_bh = self.actual_offsets["block_hashes"]
476        block_hashes = {}
477        for hashnum in needed_hashes:
478            hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
479            if hashdata:
480                block_hashes[hashnum] = hashdata
481            else:
482                return False # missing some hashes
483        # note that we don't submit any hashes to the block_hash_tree until
484        # we've gotten them all, because the hash tree will throw an
485        # exception if we only give it a partial set (which it therefore
486        # cannot validate)
487        try:
488            self._commonshare.process_block_hashes(block_hashes)
489        except (BadHashError, NotEnoughHashesError) as e:
490            f = Failure(e)
491            hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
492            log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
493                    " from %(share)s",
494                    hashnums=hashnums, shnum=self._shnum, share=repr(self),
495                    failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA")
496            hsize = max(0, max(needed_hashes)) * HASH_SIZE
497            self._signal_corruption(f, o_bh, hsize)
498            self.had_corruption = True
499            raise
500        for hashnum in needed_hashes:
501            self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
502        return True
503
504    def _satisfy_ciphertext_hash_tree(self, needed_hashes):
505        start = self.actual_offsets["crypttext_hash_tree"]
506        hashes = {}
507        for hashnum in needed_hashes:
508            hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE)
509            if hashdata:
510                hashes[hashnum] = hashdata
511            else:
512                return False # missing some hashes
513        # we don't submit any hashes to the ciphertext_hash_tree until we've
514        # gotten them all
515        try:
516            self._node.process_ciphertext_hashes(hashes)
517        except (BadHashError, NotEnoughHashesError) as e:
518            f = Failure(e)
519            hashnums = ",".join([str(n) for n in sorted(hashes.keys())])
520            log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s),"
521                    " from %(share)s",
522                    hashnums=hashnums, share=repr(self), failure=f,
523                    level=log.WEIRD, parent=self._lp, umid="iZI0TA")
524            hsize = max(0, max(needed_hashes))*HASH_SIZE
525            self._signal_corruption(f, start, hsize)
526            self.had_corruption = True
527            raise
528        for hashnum in needed_hashes:
529            self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE)
530        return True
531
    def _satisfy_data_block(self, segnum, observers):
        """Try to retire the active block request for ``segnum``.

        :param segnum: the segment at the head of self._requested_blocks
            (asserted below once the block's bytes are available).
        :param observers: the observers waiting on that request.
        :return: False if the block's bytes have not all arrived. True once
            the request has been retired, either by delivering the block
            (state=COMPLETE, block=...) or by reporting state=CORRUPT.

        A corrupt block is reported to the observers and advised to the
        server. It does not abandon the whole share: no exception escapes.
        """
        tail = (segnum == self._node.num_segments-1)
        datastart = self.actual_offsets["data"]
        blockstart = datastart + segnum * self._node.block_size
        blocklen = self._node.block_size
        if tail:
            # the last segment's block may be shorter
            blocklen = self._node.tail_block_size

        block = self._received.pop(blockstart, blocklen)
        if not block:
            log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
                                                              blockstart, blocklen),
                    level=log.NOISY, parent=self._lp, umid="aK0RFw")
            return False
        log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
                share=repr(self), start=blockstart, length=blocklen,
                level=log.NOISY, parent=self._lp, umid="uTDNZg")
        # this block is being retired, either as COMPLETE or CORRUPT, since
        # no further data reads will help
        assert self._requested_blocks[0][0] == segnum
        try:
            self._commonshare.check_block(segnum, block)
            # hurrah, we have a valid block. Deliver it.
            # NOTE(review): the observer notifications are also inside this
            # try, so a hash error raised by an observer would be reported
            # as a corrupt block.
            for o in observers:
                # goes to SegmentFetcher._block_request_activity
                o.notify(state=COMPLETE, block=block)
            # now clear our received data, to dodge the #1170 spans.py
            # complexity bug
            # (this discards *all* buffered data, not only this block)
            self._received = DataSpans()
        except (BadHashError, NotEnoughHashesError) as e:
            # rats, we have a corrupt block. Notify our clients that they
            # need to look elsewhere, and advise the server. Unlike
            # corruption in other parts of the share, this doesn't cause us
            # to abandon the whole share.
            f = Failure(e)
            log.msg(format="hash failure in block %(segnum)d, from %(share)s",
                    segnum=segnum, share=repr(self), failure=f,
                    level=log.WEIRD, parent=self._lp, umid="mZjkqA")
            for o in observers:
                o.notify(state=CORRUPT)
            self._signal_corruption(f, blockstart, blocklen)
            self.had_corruption = True
        # in either case, we've retired this block
        self._requested_blocks.pop(0)
        # popping the request keeps us from turning around and wanting the
        # block again right away
        return True # got satisfaction
579
    def _desire(self):
        """Compute which byte ranges of the share to read next.

        As a side effect, refreshes self.actual_segment_size from the node.
        It uses the real offsets and segment size when known, and falls
        back to the guesses made by _guess_offsets() otherwise.

        :return: ``(wanted, needed)`` Spans. Once the real offset table is
            known, need_it and gotta_gotta_have_it are both "needed". While
            still guessing, need_it is demoted to "wanted", and only
            gotta_gotta_have_it (version number and offset table) is
            "needed".
        """
        # NOTE(review): `observers` is unpacked but not used in this method
        segnum, observers = self._active_segnum_and_observers() # maybe None

        # 'want_it' is for data we merely want: we know that we don't really
        # need it. This includes speculative reads, like the first 1KB of the
        # share (for the offset table) and the first 2KB of the UEB.
        #
        # 'need_it' is for data that, if we have the real offset table, we'll
        # need. If we are only guessing at the offset table, it's merely
        # wanted. (The share is abandoned if we can't get data that we really
        # need).
        #
        # 'gotta_gotta_have_it' is for data that we absolutely need,
        # independent of whether we're still guessing about the offset table:
        # the version number and the offset table itself.
        #
        # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww..

        desire = Spans(), Spans(), Spans()
        (want_it, need_it, gotta_gotta_have_it) = desire

        self.actual_segment_size = self._node.segment_size # might be updated
        # prefer real values; fall back to the guesses
        o = self.actual_offsets or self.guessed_offsets
        segsize = self.actual_segment_size or self.guessed_segment_size
        r = self._node._calculate_sizes(segsize)

        if not self.actual_offsets:
            # all _desire functions add bits to the three desire[] spans
            self._desire_offsets(desire)

        # we can use guessed offsets as long as this server tolerates
        # overrun. Otherwise, we must wait for the offsets to arrive before
        # we try to read anything else.
        if self.actual_offsets or self._overrun_ok:
            if not self._node.have_UEB:
                self._desire_UEB(desire, o)
            self._desire_share_hashes(desire, o)
            if segnum is not None:
                # They might be asking for a segment number that is beyond
                # what we guess the file contains, but _desire_block_hashes
                # and _desire_data will tolerate that.
                self._desire_block_hashes(desire, o, segnum)
                self._desire_data(desire, o, r, segnum, segsize)

        log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
                level=log.NOISY, parent=self._lp, umid="IG7CgA")
        if self.actual_offsets:
            return (want_it, need_it+gotta_gotta_have_it)
        else:
            return (want_it+need_it, gotta_gotta_have_it)
631
632    def _desire_offsets(self, desire):
633        (want_it, need_it, gotta_gotta_have_it) = desire
634        if self._overrun_ok:
635            # easy! this includes version number, sizes, and offsets
636            want_it.add(0, 1024)
637            return
638
639        # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
640        # To be conservative, only request the data that we know lives there,
641        # even if that means more roundtrips.
642
643        gotta_gotta_have_it.add(0, 4)  # version number, always safe
644        version_s = self._received.get(0, 4)
645        if not version_s:
646            return
647        (version,) = struct.unpack(">L", version_s)
648        # The code in _satisfy_offsets will have checked this version
649        # already. There is no code path to get this far with version>2.
650        assert 1 <= version <= 2, "can't get here, version=%d" % version
651        if version == 1:
652            table_start = 0x0c
653            fieldsize = 0x4
654        elif version == 2:
655            table_start = 0x14
656            fieldsize = 0x8
657        offset_table_size = 6 * fieldsize
658        gotta_gotta_have_it.add(table_start, offset_table_size)
659
660    def _desire_UEB(self, desire, o):
661        (want_it, need_it, gotta_gotta_have_it) = desire
662
663        # UEB data is stored as (length,data).
664        if self._overrun_ok:
665            # We can pre-fetch 2kb, which should probably cover it. If it
666            # turns out to be larger, we'll come back here later with a known
667            # length and fetch the rest.
668            want_it.add(o["uri_extension"], 2048)
669            # now, while that is probably enough to fetch the whole UEB, it
670            # might not be, so we need to do the next few steps as well. In
671            # most cases, the following steps will not actually add anything
672            # to need_it
673
674        need_it.add(o["uri_extension"], self._fieldsize)
675        # only use a length if we're sure it's correct, otherwise we'll
676        # probably fetch a huge number
677        if not self.actual_offsets:
678            return
679        UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize)
680        if UEB_length_s:
681            (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
682            # we know the length, so make sure we grab everything
683            need_it.add(o["uri_extension"]+self._fieldsize, UEB_length)
684
685    def _desire_share_hashes(self, desire, o):
686        (want_it, need_it, gotta_gotta_have_it) = desire
687
688        if self._node.share_hash_tree.needed_hashes(self._shnum):
689            hashlen = o["uri_extension"] - o["share_hashes"]
690            need_it.add(o["share_hashes"], hashlen)
691
692    def _desire_block_hashes(self, desire, o, segnum):
693        (want_it, need_it, gotta_gotta_have_it) = desire
694
695        # block hash chain
696        for hashnum in self._commonshare.get_desired_block_hashes(segnum):
697            need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
698
699        # ciphertext hash chain
700        for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
701            need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
702
703    def _desire_data(self, desire, o, r, segnum, segsize):
704        if segnum > r["num_segments"]:
705            # they're asking for a segment that's beyond what we think is the
706            # end of the file. We won't get here if we've already learned the
707            # real UEB: _get_satisfaction() will notice the out-of-bounds and
708            # terminate the loop. So we must still be guessing, which means
709            # that they might be correct in asking for such a large segnum.
710            # But if they're right, then our segsize/segnum guess is
711            # certainly wrong, which means we don't know what data blocks to
712            # ask for yet. So don't bother adding anything. When the UEB
713            # comes back and we learn the correct segsize/segnums, we'll
714            # either reject the request or have enough information to proceed
715            # normally. This costs one roundtrip.
716            log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
717                    % (segnum, r["num_segments"]),
718                    level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
719            return
720        (want_it, need_it, gotta_gotta_have_it) = desire
721        tail = (segnum == r["num_segments"]-1)
722        datastart = o["data"]
723        blockstart = datastart + segnum * r["block_size"]
724        blocklen = r["block_size"]
725        if tail:
726            blocklen = r["tail_block_size"]
727        need_it.add(blockstart, blocklen)
728
729    def _send_requests(self, desired):
730        ask = desired - self._pending - self._received.get_spans()
731        log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" %
732                (repr(self), desired.dump(), self._pending.dump(), ask.dump()),
733                level=log.NOISY, parent=self._lp, umid="E94CVA")
734        # XXX At one time, this code distinguished between data blocks and
735        # hashes, and made sure to send (small) requests for hashes before
736        # sending (big) requests for blocks. The idea was to make sure that
737        # all hashes arrive before the blocks, so the blocks can be consumed
738        # and released in a single turn. I removed this for simplicity.
739        # Reconsider the removal: maybe bring it back.
740        ds = self._download_status
741
742        for (start, length) in ask:
743            # TODO: quantize to reasonably-large blocks
744            self._pending.add(start, length)
745            lp = log.msg(format="%(share)s._send_request"
746                         " [%(start)d:+%(length)d]",
747                         share=repr(self),
748                         start=start, length=length,
749                         level=log.NOISY, parent=self._lp, umid="sgVAyA")
750            block_ev = ds.add_block_request(self._server, self._shnum,
751                                            start, length, now())
752            d = self._send_request(start, length)
753            d.addCallback(self._got_data, start, length, block_ev, lp)
754            d.addErrback(self._got_error, start, length, block_ev, lp)
755            d.addCallback(self._trigger_loop)
756            d.addErrback(lambda f:
757                         log.err(format="unhandled error during send_request",
758                                 failure=f, parent=self._lp,
759                                 level=log.WEIRD, umid="qZu0wg"))
760
761    def _send_request(self, start, length):
762        return self._rref.callRemote("read", start, length)
763
764    def _got_data(self, data, start, length, block_ev, lp):
765        block_ev.finished(len(data), now())
766        if not self._alive:
767            return
768        log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
769                share=repr(self), start=start, length=length, datalen=len(data),
770                level=log.NOISY, parent=lp, umid="5Qn6VQ")
771        self._pending.remove(start, length)
772        self._received.add(start, data)
773
774        # if we ask for [a:c], and we get back [a:b] (b<c), that means we're
775        # never going to get [b:c]. If we really need that data, this block
776        # will never complete. The easiest way to get into this situation is
777        # to hit a share with a corrupted offset table, or one that's somehow
778        # been truncated. On the other hand, when overrun_ok is true, we ask
779        # for data beyond the end of the share all the time (it saves some
780        # RTT when we don't know the length of the share ahead of time). So
781        # not every asked-for-but-not-received byte is fatal.
782        if len(data) < length:
783            self._unavailable.add(start+len(data), length-len(data))
784
785        # XXX if table corruption causes our sections to overlap, then one
786        # consumer (i.e. block hash tree) will pop/remove the data that
787        # another consumer (i.e. block data) mistakenly thinks it needs. It
788        # won't ask for that data again, because the span is in
789        # self._requested. But that span won't be in self._unavailable
790        # because we got it back from the server. TODO: handle this properly
791        # (raise DataUnavailable). Then add sanity-checking
792        # no-overlaps-allowed tests to the offset-table unpacking code to
793        # catch this earlier. XXX
794
795        # accumulate a wanted/needed span (not as self._x, but passed into
796        # desire* functions). manage a pending/in-flight list. when the
797        # requests are sent out, empty/discard the wanted/needed span and
798        # populate/augment the pending list. when the responses come back,
799        # augment either received+data or unavailable.
800
801        # if a corrupt offset table results in double-usage, we'll send
802        # double requests.
803
804        # the wanted/needed span is only "wanted" for the first pass. Once
805        # the offset table arrives, it's all "needed".
806
807    def _got_error(self, f, start, length, block_ev, lp):
808        block_ev.error(now())
809        log.msg(format="error requesting %(start)d+%(length)d"
810                " from %(server)s for si %(si)s",
811                start=start, length=length,
812                server=self._server.get_name(), si=self._si_prefix,
813                failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
814        # retire our observers, assuming we won't be able to make any
815        # further progress
816        self._fail(f, log.UNUSUAL)
817
818    def _trigger_loop(self, res):
819        if self._alive:
820            self.schedule_loop()
821        return res
822
823    def _fail(self, f, level=log.WEIRD):
824        log.msg(format="abandoning %(share)s",
825                share=repr(self), failure=f,
826                level=level, parent=self._lp, umid="JKM2Og")
827        self._alive = False
828        for (segnum, observers) in self._requested_blocks:
829            for o in observers:
830                o.notify(state=DEAD, f=f)
831
832
class CommonShare:
    """I hold data that is common across all instances of a single share,
    like sh2 on both servers A and B. This is just the block hash tree.

    Until the real UEB arrives, the number of segments is only a guess. The
    block hash tree therefore starts out sized by that guess and marked
    non-authoritative; set_authoritative_num_segments() corrects it.
    """
    # TODO: defer creation of the hashtree until somebody uses us. There will
    # be a lot of unused shares, and we shouldn't spend the memory on a large
    # hashtree unless necessary.

    def __init__(self, best_numsegs, si_prefix, shnum, logparent):
        self.si_prefix = si_prefix
        self.shnum = shnum
        self._logparent = logparent
        # We want to ask for block hashes early, before numsegs is known for
        # sure, so the initial tree is built from the best guess and
        # flagged as non-authoritative.
        self._block_hash_tree_leaves = best_numsegs
        self._block_hash_tree = IncompleteHashTree(best_numsegs)
        self._block_hash_tree_is_authoritative = False

    def __repr__(self):
        return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)

    def set_authoritative_num_segments(self, numsegs):
        """Record the true segment count. The block hash tree is rebuilt
        only if the earlier guess was wrong."""
        guessed_right = (self._block_hash_tree_leaves == numsegs)
        if not guessed_right:
            self._block_hash_tree = IncompleteHashTree(numsegs)
            self._block_hash_tree_leaves = numsegs
        self._block_hash_tree_is_authoritative = True

    def need_block_hash_root(self):
        """Return True while the root of the block hash tree is unset."""
        return not self._block_hash_tree[0]

    def set_block_hash_root(self, roothash):
        """Install the root hash. Requires an authoritative tree."""
        assert self._block_hash_tree_is_authoritative
        self._block_hash_tree.set_hashes({0: roothash})

    def get_desired_block_hashes(self, segnum):
        """Return the block-hash node numbers (leaf included) still needed
        to validate *segnum*, or [] when *segnum* is outside the tree."""
        if segnum >= self._block_hash_tree_leaves:
            # The segnum might be out of bounds. This originally came from a
            # race: the UEB arriving on one share (which resizes every hash
            # tree and queues a BADSEGNUM to the SegmentFetcher) versus a new
            # Share being handed to the SegmentFetcher while that BADSEGNUM
            # was still queued, so that requests went out for a stale segnum
            # now larger than the tree. SegmentFetcher.loop now checks for a
            # bad segnum at the start of each pass, but this guard is kept
            # just in case. If it does get hit there is a potential
            # lost-progress problem, which should clear up on the next turn.
            return []
        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)

    def get_needed_block_hashes(self, segnum):
        """Return the block-hash node numbers (leaf included) needed to
        validate *segnum*. Requires an authoritative tree."""
        assert self._block_hash_tree_is_authoritative
        # XXX: include_leaf=True needs thought. The old downloader seems to
        # have grabbed *all* block hashes and set them at once. Since we want
        # to fetch less data, we must either fetch the leaf too, or wait
        # until the block itself arrives so it can be hashed and the
        # chain+leaf set together.
        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)

    def process_block_hashes(self, block_hashes):
        """Feed received block-hash nodes into the tree.

        May raise BadHashError or NotEnoughHashesError.
        """
        assert self._block_hash_tree_is_authoritative
        self._block_hash_tree.set_hashes(block_hashes)

    def check_block(self, segnum, block):
        """Hash *block* and validate it as leaf *segnum* of the tree.

        May raise BadHashError or NotEnoughHashesError.
        """
        assert self._block_hash_tree_is_authoritative
        leaf_hash = hashutil.block_hash(block)
        self._block_hash_tree.set_hashes(leaves={segnum: leaf_hash})
908
# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
# auxiliary OVERDUE callback. Just make sure to get all the messages in the
# right order and on the right turns.
912
913# TODO: we're asking for too much data. We probably don't need
914# include_leaf=True in the block hash tree or ciphertext hash tree.
915
916# TODO: we ask for ciphertext hash tree nodes from all shares (whenever
917# _desire is called while we're missing those nodes), but we only consume it
918# from the first response, leaving the rest of the data sitting in _received.
919# This was ameliorated by clearing self._received after each block is
920# complete.
Note: See TracBrowser for help on using the repository browser.