1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | import time |
---|
6 | now = time.time |
---|
7 | from zope.interface import Interface |
---|
8 | from twisted.python.failure import Failure |
---|
9 | from twisted.internet import defer |
---|
10 | from foolscap.api import eventually |
---|
11 | from allmydata import uri |
---|
12 | from allmydata.codec import CRSDecoder |
---|
13 | from allmydata.util import base32, log, hashutil, mathutil, observer |
---|
14 | from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE |
---|
15 | from allmydata.hashtree import IncompleteHashTree, BadHashError, \ |
---|
16 | NotEnoughHashesError |
---|
17 | |
---|
18 | # local imports |
---|
19 | from .finder import ShareFinder |
---|
20 | from .fetcher import SegmentFetcher |
---|
21 | from .segmentation import Segmentation |
---|
22 | from .common import BadCiphertextHashError |
---|
23 | |
---|
class IDownloadStatusHandlingConsumer(Interface):
    """Marker interface for consumers that want the download's read event.

    DownloadNode.read() checks whether its consumer provides this
    interface before handing it the read event.
    """

    def set_download_status_read_event(read_ev):
        """Record the DownloadStatus 'read event', which is to be updated
        with the time it takes to decrypt each chunk of data."""
---|
28 | |
---|
class Cancel:
    """One-shot cancellation handle.

    Wraps a callable `f`. The first cancel() on a still-active handle marks
    it inactive and then calls f(handle); later calls do nothing.
    """

    def __init__(self, f):
        self.active = True
        self._f = f

    def cancel(self):
        """Deactivate this handle and notify the callback, at most once."""
        if not self.active:
            return
        # flip the flag before notifying, so the callback sees us inactive
        self.active = False
        self._f(self)
---|
38 | |
---|
39 | |
---|
class DownloadNode:
    """Internal class that manages downloads and holds their state.

    External callers use CiphertextFileNode instead.
    """

    # upper bound used when guessing the segment size before the UEB is known
    default_max_segment_size = DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
---|
45 | |
---|
46 | # Share._node points to me |
---|
def __init__(self, verifycap, storage_broker, secret_holder,
             terminator, history, download_status):
    """Initialise download state for the file identified by `verifycap`.

    `verifycap` must be a uri.CHKFileVerifierURI. When `terminator` is
    truthy this node registers itself with it. The other arguments are
    stored on the instance; `storage_broker` and `download_status` are
    additionally handed to the ShareFinder created at the end.
    """
    assert isinstance(verifycap, uri.CHKFileVerifierURI)
    self._verifycap = verifycap
    self._storage_broker = storage_broker
    self._si_prefix = base32.b2a(verifycap.storage_index[:8])[:12]
    self.running = True
    if terminator:
        terminator.register(self) # calls self.stop() at stopService()
    # the rules are:
    # 1: Only send network requests if you're active (self.running is True)
    # 2: Use TimerService, not reactor.callLater
    # 3: You can do eventual-sends any time.
    # These rules should mean that once
    # stopService()+flushEventualQueue() fires, everything will be done.
    self._secret_holder = secret_holder
    self._history = history
    self._download_status = download_status

    # state for tracking callers that want data. Note that
    # _segment_requests can have duplicates.
    self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
    self._active_segment = None # a SegmentFetcher, with .segnum
    self._segsize_observers = observer.OneShotObserverList()

    # real geometry: filled in when we parse a valid UEB
    self.have_UEB = False
    self.segment_size = self.num_segments = None
    self.tail_segment_size = self.tail_segment_padded = None
    self.block_size = self.tail_block_size = None

    self.share_hash_tree = IncompleteHashTree(self._verifycap.total_shares)

    # we guess the segment size, so Segmentation can pull non-initial
    # segments in a single roundtrip. This populates
    # .guessed_segment_size, .guessed_num_segments, and
    # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
    # we'll need)
    self._build_guessed_tables(self.default_max_segment_size)

    # we create one top-level logparent for this _Node, and another one
    # for each read() call. Segmentation and get_segment() messages are
    # associated with the read() call, everything else is tied to the
    # _Node's log entry.
    self._lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                       " size=%(size)d,"
                       " guessed_segsize=%(guessed_segsize)d,"
                       " guessed_numsegs=%(guessed_numsegs)d",
                       si=self._si_prefix, size=verifycap.size,
                       guessed_segsize=self.guessed_segment_size,
                       guessed_numsegs=self.guessed_num_segments,
                       level=log.OPERATIONAL, umid="uJ0zAQ")

    self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                    self._download_status, self._lp)
    self._shares = set()
---|
109 | |
---|
def _build_guessed_tables(self, max_segment_size):
    """Guess the segment geometry and install a stub ciphertext hash tree.

    The guessed segment size is the smaller of the file size and
    `max_segment_size`, rounded up to a multiple of needed_shares. Sets
    guessed_segment_size, guessed_num_segments, ciphertext_hash_tree and
    ciphertext_hash_tree_leaves.
    """
    k = self._verifycap.needed_shares
    capped = min(self._verifycap.size, max_segment_size)
    self.guessed_segment_size = mathutil.next_multiple(capped, k)

    sizes = self._calculate_sizes(self.guessed_segment_size)
    numsegs = sizes["num_segments"]
    self.guessed_num_segments = numsegs

    # as with CommonShare, our ciphertext_hash_tree is a stub until we
    # get the real num_segments
    self.ciphertext_hash_tree = IncompleteHashTree(numsegs)
    self.ciphertext_hash_tree_leaves = numsegs
---|
120 | |
---|
121 | def __repr__(self): |
---|
122 | return "ImmutableDownloadNode(%r)" % (self._si_prefix,) |
---|
123 | |
---|
def stop(self):
    """Stop the active segment fetch (if any), then stop the ShareFinder.

    Called by the Terminator at shutdown, mostly for tests.
    """
    fetcher = self._active_segment
    if fetcher:
        # drop our reference first, then tell the fetcher to stop
        self._active_segment = None
        fetcher.stop()
    self._sharefinder.stop()
---|
130 | |
---|
131 | # things called by outside callers, via CiphertextFileNode. get_segment() |
---|
132 | # may also be called by Segmentation. |
---|
133 | |
---|
def read(self, consumer, offset, size):
    """Feed `consumer` the requested range of ciphertext.

    This is the main entry point from which FileNode.read() gets data.
    Returns a Deferred that fires (with the consumer) once the read has
    finished. `size=None` means "through the end of the file"; a range
    that runs past EOF is silently clipped.

    There is no notion of a 'file pointer': every call to read() uses its
    own independent offset= value.
    """
    filesize = self._verifycap.size
    if size is None:
        size = filesize
    # ignore overruns: clip size so offset+size does not go past EOF, and
    # so size is not negative (which indicates that offset >= EOF)
    size = max(0, min(size, filesize - offset))

    read_ev = self._download_status.add_read_event(offset, size, now())
    if IDownloadStatusHandlingConsumer.providedBy(consumer):
        consumer.set_download_status_read_event(read_ev)
        consumer.set_download_status(self._download_status)

    lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                 si=base32.b2a(self._verifycap.storage_index)[:8],
                 offset=offset, size=size,
                 level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")

    history = self._history
    if history:
        stats = history.stats_provider
        stats.count("downloader.files_downloaded", 1) # really read() calls
        stats.count("downloader.bytes_downloaded", size)

    if size == 0:
        read_ev.finished(now())
        # no data, so no producer, so no register/unregisterProducer
        return defer.succeed(consumer)

    # for concurrent operations, each read() gets its own Segmentation
    # manager
    segmentation = Segmentation(self, offset, size, consumer, read_ev, lp)

    # this raises an interesting question: what segments to fetch? if
    # offset=0, always fetch the first segment, and then allow
    # Segmentation to be responsible for pulling the subsequent ones if
    # the first wasn't large enough. If offset>0, we're going to need an
    # extra roundtrip to get the UEB (and therefore the segment size)
    # before we can figure out which segment to get. TODO: allow the
    # offset-table-guessing code (which starts by guessing the segsize)
    # to assist the offset>0 process.
    def _mark_finished(res):
        # runs on success and failure alike; passes the result through
        read_ev.finished(now())
        return res

    d = segmentation.start()
    d.addBoth(_mark_finished)
    return d
---|
186 | |
---|
def get_segment(self, segnum, logparent=None):
    """Begin downloading segment `segnum`; return a (d, c) tuple.

    'd' is a Deferred that fires with (offset,data) when the segment is
    available. 'c' is an object whose c.cancel() disavows interest in the
    segment (after which 'd' will never fire).

    You probably need to know the segment size before calling this,
    unless you want the first few bytes of the file. Asking for a segment
    number which turns out to be too large makes the Deferred errback
    with BadSegmentNumberError.

    Because the Deferred fires with the offset of the first byte of the
    segment, you can call get_segment() before knowing the segment size
    and still know which data you received.

    The Deferred can also errback with other fatal problems, such as
    NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
    """
    request_lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                         si=base32.b2a(self._verifycap.storage_index)[:8],
                         segnum=segnum,
                         level=log.OPERATIONAL, parent=logparent,
                         umid="UKFjDQ")
    seg_ev = self._download_status.add_segment_request(segnum, now())
    done_d = defer.Deferred()
    handle = Cancel(self._cancel_request)
    request = (segnum, done_d, handle, seg_ev, request_lp)
    self._segment_requests.append(request)
    # kick the queue: starts a fetch only if nothing is active right now
    self._start_new_segment()
    return (done_d, handle)
---|
215 | |
---|
def get_segsize(self):
    """Return a Deferred that fires when we know the real segment size."""
    known = self.segment_size
    if known:
        return defer.succeed(known)

    # TODO: this downloads (and discards) the first segment of the file.
    # We could make this more efficient by writing
    # fetcher.SegmentSizeFetcher, with the job of finding a single valid
    # share and extracting the UEB. We'd add Share.get_UEB() to request
    # just the UEB.
    d, _cancel_handle = self.get_segment(0)

    def _wait_for_segsize(_ignored):
        return self._segsize_observers.when_fired()

    # chaining off get_segment() ensures that an error during
    # get_segment() will errback the caller, so Repair won't wait forever
    # on completely missing files
    d.addCallback(_wait_for_segsize)
    return d
---|
230 | |
---|
231 | # things called by the Segmentation object used to transform |
---|
232 | # arbitrary-sized read() calls into quantized segment fetches |
---|
233 | |
---|
234 | def _start_new_segment(self): |
---|
235 | if self._active_segment is None and self._segment_requests: |
---|
236 | (segnum, d, c, seg_ev, lp) = self._segment_requests[0] |
---|
237 | k = self._verifycap.needed_shares |
---|
238 | log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d", |
---|
239 | node=repr(self), segnum=segnum, |
---|
240 | level=log.NOISY, parent=lp, umid="wAlnHQ") |
---|
241 | self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp) |
---|
242 | seg_ev.activate(now()) |
---|
243 | active_shares = [s for s in self._shares if s.is_alive()] |
---|
244 | fetcher.add_shares(active_shares) # this triggers the loop |
---|
245 | |
---|
246 | |
---|
247 | # called by our child ShareFinder |
---|
def got_shares(self, shares):
    """Remember newly found shares and pass them to the active fetcher."""
    self._shares.update(shares)
    fetcher = self._active_segment
    if fetcher:
        fetcher.add_shares(shares)
---|
def no_more_shares(self):
    """Note that no further shares will arrive; tell the active fetcher."""
    self._no_more_shares = True
    fetcher = self._active_segment
    if fetcher:
        fetcher.no_more_shares()
---|
256 | |
---|
257 | # things called by our Share instances |
---|
258 | |
---|
def validate_and_store_UEB(self, UEB_s):
    """Check `UEB_s` against the verifycap's hash, then parse and store it.

    Raises BadHashError when the hash of `UEB_s` does not match
    self._verifycap.uri_extension_hash. On success have_UEB becomes True
    and the ShareFinder is told to refresh its segment count.
    """
    log.msg("validate_and_store_UEB",
            level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
    actual = hashutil.uri_extension_hash(UEB_s)
    expected = self._verifycap.uri_extension_hash
    if actual != expected:
        raise BadHashError
    # TODO: a malformed (but authentic) UEB could throw an assertion in
    # _parse_and_store_UEB, and we should abandon the download.
    self._parse_and_store_UEB(UEB_s) # sets self._stuff
    self.have_UEB = True

    # inform the ShareFinder about our correct number of segments. This
    # will update the block-hash-trees in all existing CommonShare
    # instances, and will populate new ones with the correct value.
    self._sharefinder.update_num_segments()
---|
274 | |
---|
def _parse_and_store_UEB(self, UEB_s):
    """Unpack `UEB_s` and record the real file geometry on this node.

    Called by validate_and_store_UEB() after the hash check. Sets
    segment_size (and fires the segment-size observers with it), the
    sizes derived by _calculate_sizes(), the shared decoder in
    self._codec, the real ciphertext hash tree (seeded with
    'crypttext_root_hash') and the share hash tree root
    ('share_root_hash').
    """
    # Note: the UEB contains needed_shares and total_shares. These are
    # redundant and inferior (the filecap contains the authoritative
    # values). However, because it is possible to encode the same file in
    # multiple ways, and the encoders might choose (poorly) to use the
    # same key for both (therefore getting the same SI), we might
    # encounter shares for both types. The UEB hashes will be different,
    # however, and we'll disregard the "other" encoding's shares as
    # corrupted.

    # therefore, we ignore ueb['total_shares'] and ueb['needed_shares'].

    ueb = uri.unpack_extension(UEB_s)

    log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
            ueb=repr(uri.unpack_extension_readable(UEB_s)),
            vcap=self._verifycap.to_string(),
            level=log.NOISY, parent=self._lp, umid="cVqZnA")

    self.segment_size = ueb['segment_size']
    self._segsize_observers.fire(self.segment_size)

    sizes = self._calculate_sizes(self.segment_size)
    self.tail_segment_size = sizes["tail_segment_size"]
    self.tail_segment_padded = sizes["tail_segment_padded"]
    self.num_segments = sizes["num_segments"]
    self.block_size = sizes["block_size"]
    self.tail_block_size = sizes["tail_block_size"]
    log.msg("actual sizes: %s" % (sizes,),
            level=log.NOISY, parent=self._lp, umid="PY6P5Q")

    guess_was_right = (self.segment_size == self.guessed_segment_size
                       and self.num_segments == self.guessed_num_segments)
    if guess_was_right:
        log.msg("my guess was right!",
                level=log.NOISY, parent=self._lp, umid="x340Ow")
    else:
        log.msg("my guess was wrong! Extra round trips for me.",
                level=log.NOISY, parent=self._lp, umid="tb7RJw")

    # zfec.Decode() instantiation is fast, but still, let's use the same
    # codec instance for all but the last segment. 3-of-10 takes 15us on
    # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
    # 2.5ms, worst-case 254-of-255 is 9.3ms
    needed = self._verifycap.needed_shares
    total = self._verifycap.total_shares
    self._codec = CRSDecoder()
    self._codec.set_params(self.segment_size, needed, total)

    # Ciphertext hash tree root is mandatory, so that there is at most
    # one ciphertext that matches this read-cap or verify-cap. The
    # integrity check on the shares is not sufficient to prevent the
    # original encoder from creating some shares of file A and other
    # shares of file B. self.ciphertext_hash_tree was a guess before:
    # this is where we create it for real.
    self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
    self.ciphertext_hash_tree_leaves = self.num_segments
    self.ciphertext_hash_tree.set_hashes({0: ueb['crypttext_root_hash']})

    self.share_hash_tree.set_hashes({0: ueb['share_root_hash']})

    # Our job is a fast download, not verification, so we ignore any
    # redundant fields. The Verifier uses a different code path which
    # does not ignore them.
---|
338 | |
---|
def _calculate_sizes(self, segment_size):
    """Derive segment and block geometry for a given `segment_size`.

    Uses the file size and needed_shares (k) from the verifycap. Returns
    a dict with the keys "tail_segment_size", "tail_segment_padded",
    "num_segments", "block_size" and "tail_block_size".
    `segment_size` must be a multiple of k.
    """
    k = self._verifycap.needed_shares
    filesize = self._verifycap.size

    # this assert matches the one in encode.py:127 inside
    # Encoded._got_all_encoding_parameters, where the UEB is constructed
    assert segment_size % k == 0

    # the last segment is usually short. We don't store a whole segsize,
    # but we do pad the segment up to a multiple of k, because the
    # encoder requires that. A remainder of zero means the tail segment
    # is a full-sized one.
    tail_segment_size = (filesize % segment_size) or segment_size
    tail_segment_padded = mathutil.next_multiple(tail_segment_size, k)

    # each segment is turned into N blocks. All but the last are of size
    # block_size, and the last is of size tail_block_size
    sizes = {}
    sizes["tail_segment_size"] = tail_segment_size
    sizes["tail_segment_padded"] = tail_segment_padded
    sizes["num_segments"] = mathutil.div_ceil(filesize, segment_size)
    sizes["block_size"] = segment_size // k
    sizes["tail_block_size"] = tail_segment_padded // k
    return sizes
---|
370 | |
---|
371 | |
---|
def process_share_hashes(self, share_hashes):
    """Add `share_hashes` to the share hash tree after a range check.

    Raises BadHashError when any hash number is not smaller than the
    length of the share hash tree.
    """
    tree_len = len(self.share_hash_tree)
    for hashnum in share_hashes:
        if hashnum < tree_len:
            continue
        # "BadHashError" is normally for e.g. a corrupt block. We
        # sort of abuse it here to mean a badly numbered hash (which
        # indicates corruption in the number bytes, rather than in
        # the data bytes).
        raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                           % (hashnum, tree_len))
    self.share_hash_tree.set_hashes(share_hashes)
---|
382 | |
---|
def get_desired_ciphertext_hashes(self, segnum):
    """Return the hashes the current ciphertext tree needs for `segnum`.

    Returns an empty list when `segnum` is not below the number of leaves
    in the current ciphertext hash tree.
    """
    if segnum >= self.ciphertext_hash_tree_leaves:
        return []
    tree = self.ciphertext_hash_tree
    return tree.needed_hashes(segnum, include_leaf=True)
---|
def get_needed_ciphertext_hashes(self, segnum):
    """Return the hashes (leaf included) the ciphertext tree needs for
    `segnum`, without any range check on `segnum`."""
    return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                   include_leaf=True)
---|
391 | |
---|
def process_ciphertext_hashes(self, hashes):
    """Add `hashes` to the ciphertext hash tree.

    Requires that num_segments is already known. May raise BadHashError
    or NotEnoughHashesError.
    """
    assert self.num_segments is not None
    tree = self.ciphertext_hash_tree
    # this may raise BadHashError or NotEnoughHashesError
    tree.set_hashes(hashes)
---|
396 | |
---|
397 | |
---|
398 | # called by our child SegmentFetcher |
---|
399 | |
---|
def want_more_shares(self):
    """Tell the ShareFinder that more shares are wanted."""
    finder = self._sharefinder
    finder.hungry()
---|
402 | |
---|
def fetch_failed(self, sf, f):
    """Handle failure `f` reported by the active SegmentFetcher `sf`.

    Every queued request for sf.segnum is retired and receives `f` via an
    eventual-send; afterwards the next queued segment (if any) is started.
    """
    assert sf is self._active_segment
    self._active_segment = None
    failed_segnum = sf.segnum
    # deliver error upwards
    for (req_d, req_c, seg_ev) in self._extract_requests(failed_segnum):
        seg_ev.error(now())
        eventually(self._deliver, req_d, req_c, f)
    self._start_new_segment()
---|
411 | |
---|
def process_blocks(self, segnum, blocks):
    """Decode and validate the blocks of `segnum`, then deliver the result.

    `blocks` is passed to _decode_blocks(); the decoded segment is then
    checked by _check_ciphertext_hash(). Every queued request for
    `segnum` is retired and receives, via an eventual-send, either the
    (offset, segment, decodetime) tuple or the Failure. Afterwards the
    next queued segment (if any) is started.
    """
    start = now()
    d = self._decode_blocks(segnum, blocks)
    d.addCallback(self._check_ciphertext_hash, segnum)
    def _deliver(result):
        log.msg(format="delivering segment(%(segnum)d)",
                segnum=segnum,
                level=log.OPERATIONAL, parent=self._lp,
                umid="j60Ojg")
        when = now()
        # The fetcher that produced these blocks is finished whether or
        # not decoding/validation succeeded. Clear it on BOTH paths:
        # otherwise a failure leaves it registered as active and the
        # _start_new_segment() call below would never start the next
        # queued request.
        self._active_segment = None
        if isinstance(result, Failure):
            # this catches failures in decode or ciphertext hash
            for (req_d, req_c, seg_ev) in self._extract_requests(segnum):
                seg_ev.error(when)
                eventually(self._deliver, req_d, req_c, result)
        else:
            (offset, segment, decodetime) = result
            for (req_d, req_c, seg_ev) in self._extract_requests(segnum):
                # when we have two requests for the same segment, the
                # second one will not be "activated" before the data is
                # delivered, so to allow the status-reporting code to see
                # consistent behavior, we activate them all now. The
                # SegmentEvent will ignore duplicate activate() calls.
                # Note that this will result in an inaccurate "receive
                # speed" for the second request.
                seg_ev.activate(when)
                seg_ev.deliver(when, offset, len(segment), decodetime)
                eventually(self._deliver, req_d, req_c, result)
        self._download_status.add_misc_event("process_block", start, now())
        self._start_new_segment()
    d.addBoth(_deliver)
    # anything raised inside _deliver itself ends up here
    d.addErrback(log.err, "unhandled error during process_blocks",
                 level=log.WEIRD, parent=self._lp, umid="MkEsCg")
---|
446 | |
---|
def _decode_blocks(self, segnum, blocks):
    """Decode the blocks of one segment.

    `blocks` maps shareid to block data. Returns the Deferred from the
    codec's decode(), with a callback added that yields
    (segment, decodetime). The last segment is decoded with its own
    CRSDecoder sized for the padded tail, then trimmed to
    tail_segment_size.
    """
    start = now()
    tail = (segnum == self.num_segments-1)
    codec = self._codec
    block_size = self.block_size
    decoded_size = self.segment_size
    if tail:
        # account for the padding in the last segment
        needed = self._verifycap.needed_shares
        total = self._verifycap.total_shares
        codec = CRSDecoder()
        codec.set_params(self.tail_segment_padded, needed, total)
        block_size = self.tail_block_size
        decoded_size = self.tail_segment_padded

    # split the mapping into parallel lists, in the dict's own order
    shareids = list(blocks.keys())
    shares = [blocks[shareid] for shareid in shareids]
    assert all(len(share) == block_size for share in shares)
    del blocks

    d = codec.decode(shares, shareids) # segment
    del shares

    def _assemble(buffers):
        decodetime = now() - start
        segment = b"".join(buffers)
        assert len(segment) == decoded_size
        del buffers
        if tail:
            # strip the padding that was added for the encoder
            segment = segment[:self.tail_segment_size]
        self._download_status.add_misc_event("decode", start, now())
        return (segment, decodetime)

    d.addCallback(_assemble)
    return d
---|
482 | |
---|
def _check_ciphertext_hash(self, segment_and_decodetime, segnum):
    """Validate a decoded segment against the ciphertext hash tree.

    Takes the (segment, decodetime) tuple produced by _decode_blocks().
    Returns (offset, segment, decodetime), where offset is
    segnum * segment_size. Raises BadCiphertextHashError when the hash
    tree rejects the segment's hash.
    """
    segment, decodetime = segment_and_decodetime
    start = now()
    assert self._active_segment.segnum == segnum
    assert self.segment_size is not None
    offset = segnum * self.segment_size

    leaf_hash = hashutil.crypttext_segment_hash(segment)
    try:
        self.ciphertext_hash_tree.set_hashes(leaves={segnum: leaf_hash})
        self._download_status.add_misc_event("CThash", start, now())
        return (offset, segment, decodetime)
    except (BadHashError, NotEnoughHashesError):
        fmt = ("hash failure in ciphertext_hash_tree:"
               " segnum=%(segnum)d, SI=%(si)r")
        # Failure() with no arguments is created inside the except block,
        # while the hash error is still being handled
        log.msg(format=fmt, segnum=segnum, si=self._si_prefix,
                failure=Failure(),
                level=log.WEIRD, parent=self._lp, umid="MTwNnw")
        # this is especially weird, because we made it past the share
        # hash tree. It implies that we're using the wrong encoding, or
        # that the uploader deliberately constructed a bad UEB.
        msg = fmt % {"segnum": segnum, "si": self._si_prefix}
        raise BadCiphertextHashError(msg)
---|
506 | |
---|
507 | def _deliver(self, d, c, result): |
---|
508 | # this method exists to handle cancel() that occurs between |
---|
509 | # _got_segment and _deliver |
---|
510 | if c.active: |
---|
511 | c.active = False # it is now too late to cancel |
---|
512 | d.callback(result) # might actually be an errback |
---|
513 | |
---|
514 | def _extract_requests(self, segnum): |
---|
515 | """Remove matching requests and return their (d,c) tuples so that the |
---|
516 | caller can retire them.""" |
---|
517 | retire = [(d,c,seg_ev) |
---|
518 | for (segnum0,d,c,seg_ev,lp) in self._segment_requests |
---|
519 | if segnum0 == segnum] |
---|
520 | self._segment_requests = [t for t in self._segment_requests |
---|
521 | if t[0] != segnum] |
---|
522 | return retire |
---|
523 | |
---|
524 | def _cancel_request(self, cancel): |
---|
525 | self._segment_requests = [t for t in self._segment_requests |
---|
526 | if t[2] != cancel] |
---|
527 | segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests] |
---|
528 | |
---|
529 | # self._active_segment might be None in rare circumstances, so make |
---|
530 | # sure we tolerate it |
---|
531 | if self._active_segment and self._active_segment.segnum not in segnums: |
---|
532 | seg, self._active_segment = self._active_segment, None |
---|
533 | seg.stop() |
---|
534 | self._start_new_segment() |
---|
535 | |
---|
536 | # called by ShareFinder to choose hashtree sizes in CommonShares, and by |
---|
537 | # SegmentFetcher to tell if it is still fetching a valid segnum. |
---|
def get_num_segments(self):
    """Return (best_num_segments, authoritative).

    Before the real segment count is known this is the guessed count
    paired with False; afterwards it is the real count paired with True.
    """
    if self.num_segments is not None:
        return (self.num_segments, True)
    return (self.guessed_num_segments, False)
---|