1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | import struct |
---|
6 | import time |
---|
7 | now = time.time |
---|
8 | |
---|
9 | from twisted.python.failure import Failure |
---|
10 | from foolscap.api import eventually |
---|
11 | from allmydata.util import base32, log, hashutil, mathutil |
---|
12 | from allmydata.util.spans import Spans, DataSpans |
---|
13 | from allmydata.interfaces import HASH_SIZE |
---|
14 | from allmydata.hashtree import IncompleteHashTree, BadHashError, \ |
---|
15 | NotEnoughHashesError |
---|
16 | |
---|
17 | from allmydata.immutable.layout import make_write_bucket_proxy |
---|
18 | from allmydata.util.observer import EventStreamObserver |
---|
19 | from .common import COMPLETE, CORRUPT, DEAD, BADSEGNUM |
---|
20 | |
---|
21 | |
---|
class LayoutInvalid(Exception):
    """Raised when a share's layout (version or offset table) is unusable."""
---|
24 | |
---|
25 | |
---|
class DataUnavailable(Exception):
    """Raised when data that a share needs has been reported unavailable."""
---|
28 | |
---|
29 | |
---|
30 | class Share: |
---|
31 | """I represent a single instance of a single share (e.g. I reference the |
---|
32 | shnum2 for share SI=abcde on server xy12t, not the one on server ab45q). |
---|
33 | I am associated with a CommonShare that remembers data that is held in |
---|
34 | common among e.g. SI=abcde/shnum2 across all servers. I am also |
---|
35 | associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all |
---|
36 | servers). |
---|
37 | """ |
---|
38 | # this is a specific implementation of IShare for tahoe's native storage |
---|
39 | # servers. A different backend would use a different class. |
---|
40 | |
---|
def __init__(self, rref, server, verifycap, commonshare, node,
             download_status, shnum, dyhb_rtt, logparent):
    """Record collaborators, guess the share layout, and reset fetch state.

    The offset table is guessed immediately (from the verifycap and the
    node's guessed segment size); the real offsets, stored in
    ``actual_offsets``, start out as None.
    """
    # -- collaborators and layout guesses --
    self._rref = rref
    self._server = server
    self._node = node  # holds share_hash_tree and UEB
    self.actual_segment_size = node.segment_size  # might still be None
    # XXX change node.guessed_segment_size to
    # node.best_guess_segment_size(), which should give us the real ones
    # if known, else its guess.
    self._guess_offsets(verifycap, node.guessed_segment_size)
    self.actual_offsets = None
    self._UEB_length = None
    self._commonshare = commonshare  # holds block_hash_tree
    self._download_status = download_status

    # -- identity --
    storage_index = verifycap.storage_index
    self._storage_index = storage_index
    self._si_prefix = base32.b2a(storage_index)[:8]
    self._shnum = shnum
    self._dyhb_rtt = dyhb_rtt

    # -- liveness / scheduling --
    # self._alive becomes False upon fatal corruption or server error
    self._alive = True
    self._loop_scheduled = False
    # repr(self) needs _shnum and _server, both assigned above
    self._lp = log.msg(format="%(share)s created", share=repr(self),
                       level=log.NOISY, parent=logparent, umid="P7hv2w")

    # -- byte-range bookkeeping --
    self._pending = Spans()  # request sent but no response received yet
    self._received = DataSpans()  # ACK response received, with data
    self._unavailable = Spans()  # NAK response received, no data

    # Design notes carried over from the original author (they are
    # phrased in terms of _wanted/_requested/_received, which do not map
    # one-to-one onto the three attributes above):
    #
    #  in: _wanted, _requested, _received
    #      FALSE    FALSE       FALSE : don't care about it at all
    #      TRUE     FALSE       FALSE : want it, haven't yet asked for it
    #      TRUE     TRUE        FALSE : request is in-flight
    #                                   or didn't get it
    #      FALSE    TRUE        TRUE  : got it, haven't used it yet
    #      FALSE    TRUE        FALSE : got it and used it
    #      FALSE    FALSE       FALSE : block consumed, ready to ask again
    #
    # After a NAK the range stays "requested" so that we do not ask for
    # it again; nothing removes it explicitly (maybe this should change).
    #
    # Hashtrees are retained in the Node, so their spans stay requested
    # and are never fetched again while the Node is alive. Data blocks
    # are too big to retain, so consuming one releases its span and a
    # later download can re-fetch it.

    self._requested_blocks = []  # (segnum, set(observer2..))

    version_info = server.get_version()
    storage_v1 = version_info[b"http://allmydata.org/tahoe/protocols/storage/v1"]
    self._overrun_ok = storage_v1[b"tolerates-immutable-read-overrun"]
    # If _overrun_ok and we guess the offsets correctly, we can get
    # everything in one RTT. If _overrun_ok and we guess wrong, we might
    # need two RTT (but we could get lucky and do it in one). If overrun
    # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
    # 2=offset table, 3=UEB_length and everything else (hashes, block),
    # 4=UEB.

    self.had_corruption = False  # for unit tests
---|
101 | |
---|
102 | def __repr__(self): |
---|
103 | return "Share(sh%d-on-%s)" % (self._shnum, str(self._server.get_name(), "utf-8")) |
---|
104 | |
---|
105 | def is_alive(self): |
---|
106 | # XXX: reconsider. If the share sees a single error, should it remain |
---|
107 | # dead for all time? Or should the next segment try again? This DEAD |
---|
108 | # state is stored elsewhere too (SegmentFetcher per-share states?) |
---|
109 | # and needs to be consistent. We clear _alive in self._fail(), which |
---|
110 | # is called upon a network error, or layout failure, or hash failure |
---|
111 | # in the UEB or a hash tree. We do not _fail() for a hash failure in |
---|
112 | # a block, but of course we still tell our callers about |
---|
113 | # state=CORRUPT so they'll find a different share. |
---|
114 | return self._alive |
---|
115 | |
---|
def _guess_offsets(self, verifycap, guessed_segment_size):
    """Predict the share's offset table before the real one is fetched.

    Stores ``guessed_segment_size``, ``_fieldsize``, ``_fieldstruct`` and
    ``guessed_offsets`` on self, using a write-bucket proxy (the
    upload-side layout code) built from the guessed sizes.
    """
    self.guessed_segment_size = guessed_segment_size
    file_size = verifycap.size
    needed = verifycap.needed_shares
    total = verifycap.total_shares

    # sizes holds: num_segments, block_size/tail_block_size,
    # guessed_segment_size/tail_segment_size/tail_segment_padded
    sizes = self._node._calculate_sizes(guessed_segment_size)

    # Amount of block data placed into each share, summed over all
    # segments. Hashes, the UEB and other overhead are not included.
    share_size = mathutil.div_ceil(file_size, needed)

    # use the upload-side code to get this as accurate as possible
    share_tree = IncompleteHashTree(total)
    num_share_hashes = len(share_tree.needed_hashes(0, include_leaf=True))
    proxy = make_write_bucket_proxy(None, None, share_size,
                                    sizes["block_size"],
                                    sizes["num_segments"],
                                    num_share_hashes, 0)
    self._fieldsize = proxy.fieldsize
    self._fieldstruct = proxy.fieldstruct
    self.guessed_offsets = proxy._offsets
---|
137 | |
---|
138 | # called by our client, the SegmentFetcher |
---|
def get_block(self, segnum):
    """Queue a request for one block and return its observer.

    The request eventually triggers a fetch of whatever is needed to
    validate the block, and then of the block itself. Requests are
    generally handled first-come-first-served, though one may be answered
    out of order if its data shows up sooner.

    The returned EventStreamObserver is used in two ways: o.subscribe()
    registers a callback for state changes (and finally the block), and
    o.cancel() withdraws the request if it is still active.

    Events delivered through the observer:
     - state=OVERDUE: ?? I believe I should have had an answer by now.
                      You may want to ask another share instead.
     - state=BADSEGNUM: the requested segnum is too large. This can only
                        be decided once a valid UEB has been fetched, so
                        the notification is asynchronous.
     - state=COMPLETE, block=data: here is a valid block
     - state=CORRUPT: this share contains corrupted data
     - state=DEAD, f=Failure: the server reported an error, this share
                              is unusable
    """
    log.msg("%s.get_block(%d)" % (repr(self), segnum),
            level=log.NOISY, parent=self._lp, umid="RTo9MQ")
    assert segnum >= 0
    observer = EventStreamObserver()
    observer.set_canceler(self, "_cancel_block_request")

    # Join an existing request for the same segment if there is one,
    # otherwise append a new entry at the back of the queue.
    watchers = next((obs for (queued, obs) in self._requested_blocks
                     if queued == segnum), None)
    if watchers is None:
        self._requested_blocks.append((segnum, {observer}))
    else:
        watchers.add(observer)

    self.schedule_loop()
    return observer
---|
175 | |
---|
176 | def _cancel_block_request(self, o): |
---|
177 | new_requests = [] |
---|
178 | for e in self._requested_blocks: |
---|
179 | (segnum0, observers) = e |
---|
180 | observers.discard(o) |
---|
181 | if observers: |
---|
182 | new_requests.append(e) |
---|
183 | self._requested_blocks = new_requests |
---|
184 | |
---|
185 | # internal methods |
---|
186 | def _active_segnum_and_observers(self): |
---|
187 | if self._requested_blocks: |
---|
188 | # we only retrieve information for one segment at a time, to |
---|
189 | # minimize alacrity (first come, first served) |
---|
190 | return self._requested_blocks[0] |
---|
191 | return None, [] |
---|
192 | |
---|
193 | def schedule_loop(self): |
---|
194 | if self._loop_scheduled: |
---|
195 | return |
---|
196 | self._loop_scheduled = True |
---|
197 | eventually(self.loop) |
---|
198 | |
---|
199 | def loop(self): |
---|
200 | self._loop_scheduled = False |
---|
201 | if not self._alive: |
---|
202 | return |
---|
203 | try: |
---|
204 | # if any exceptions occur here, kill the download |
---|
205 | log.msg("%s.loop, reqs=[%s], pending=%s, received=%s," |
---|
206 | " unavailable=%s" % |
---|
207 | (repr(self), |
---|
208 | ",".join([str(req[0]) for req in self._requested_blocks]), |
---|
209 | self._pending.dump(), self._received.dump(), |
---|
210 | self._unavailable.dump() ), |
---|
211 | level=log.NOISY, parent=self._lp, umid="BaL1zw") |
---|
212 | self._do_loop() |
---|
213 | # all exception cases call self._fail(), which clears self._alive |
---|
214 | except (BadHashError, NotEnoughHashesError, LayoutInvalid) as e: |
---|
215 | # Abandon this share. We do this if we see corruption in the |
---|
216 | # offset table, the UEB, or a hash tree. We don't abandon the |
---|
217 | # whole share if we see corruption in a data block (we abandon |
---|
218 | # just the one block, and still try to get data from other blocks |
---|
219 | # on the same server). In theory, we could get good data from a |
---|
220 | # share with a corrupt UEB (by first getting the UEB from some |
---|
221 | # other share), or corrupt hash trees, but the logic to decide |
---|
222 | # when this is safe is non-trivial. So for now, give up at the |
---|
223 | # first sign of corruption. |
---|
224 | # |
---|
225 | # _satisfy_*() code which detects corruption should first call |
---|
226 | # self._signal_corruption(), and then raise the exception. |
---|
227 | log.msg(format="corruption detected in %(share)s", |
---|
228 | share=repr(self), |
---|
229 | level=log.UNUSUAL, parent=self._lp, umid="gWspVw") |
---|
230 | self._fail(Failure(e), log.UNUSUAL) |
---|
231 | except DataUnavailable as e: |
---|
232 | # Abandon this share. |
---|
233 | log.msg(format="need data that will never be available" |
---|
234 | " from %s: pending=%s, received=%s, unavailable=%s" % |
---|
235 | (repr(self), |
---|
236 | self._pending.dump(), self._received.dump(), |
---|
237 | self._unavailable.dump() ), |
---|
238 | level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ") |
---|
239 | self._fail(Failure(e), log.UNUSUAL) |
---|
240 | except BaseException: |
---|
241 | self._fail(Failure()) |
---|
242 | raise |
---|
243 | log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s," |
---|
244 | " unavailable=%s" % |
---|
245 | (repr(self), |
---|
246 | ",".join([str(req[0]) for req in self._requested_blocks]), |
---|
247 | self._pending.dump(), self._received.dump(), |
---|
248 | self._unavailable.dump() ), |
---|
249 | level=log.NOISY, parent=self._lp, umid="9lRaRA") |
---|
250 | |
---|
def _do_loop(self):
    """Consume received data, work out what is still missing, request it.

    Raises DataUnavailable when any range that is needed overlaps a range
    already recorded in ``self._unavailable``.
    """
    # we are (eventually) called after all state transitions:
    #  new segments added to self._requested_blocks
    #  new data received from servers (responses to our read() calls)
    #  impatience timer fires (server appears slow)

    # Step 1: use up everything we already hold, for every segment that
    # is currently requested. _get_satisfaction() returns a true value
    # each time it retires a block, so keep going until it does not.
    began = now()
    made_progress = True
    while made_progress:
        made_progress = self._get_satisfaction()
    self._download_status.add_misc_event("satisfy", began, now())

    # Step 2: with nothing more to consume, decide what data we desire in
    # order to satisfy more requests. The number of segments is finite,
    # so I can't get no satisfaction forever.
    began = now()
    wanted, needed = self._desire()
    self._download_status.add_misc_event("desire", began, now())

    # Step 3: send out requests for whatever we need (desire minus have).
    # You can't always get what you want, but if you try sometimes, you
    # just might find, you get what you need.
    self._send_requests(wanted + needed)

    # and sometimes you can't even get what you need
    began = now()
    never_coming = needed & self._unavailable
    if never_coming.len():
        self.had_corruption = True
        raise DataUnavailable("need %s but will never get it" %
                              never_coming.dump())
    self._download_status.add_misc_event("checkdis", began, now())
---|
285 | |
---|
286 | def _get_satisfaction(self): |
---|
287 | # return True if we retired a data block, and should therefore be |
---|
288 | # called again. Return False if we don't retire a data block (even if |
---|
289 | # we do retire some other data, like hash chains). |
---|
290 | |
---|
291 | if self.actual_offsets is None: |
---|
292 | if not self._satisfy_offsets(): |
---|
293 | # can't even look at anything without the offset table |
---|
294 | return False |
---|
295 | |
---|
296 | if not self._node.have_UEB: |
---|
297 | if not self._satisfy_UEB(): |
---|
298 | # can't check any hashes without the UEB |
---|
299 | return False |
---|
300 | # the call to _satisfy_UEB() will immediately set the |
---|
301 | # authoritative num_segments in all our CommonShares. If we |
---|
302 | # guessed wrong, we might stil be working on a bogus segnum |
---|
303 | # (beyond the real range). We catch this and signal BADSEGNUM |
---|
304 | # before invoking any further code that touches hashtrees. |
---|
305 | self.actual_segment_size = self._node.segment_size # might be updated |
---|
306 | assert self.actual_segment_size is not None |
---|
307 | |
---|
308 | # knowing the UEB means knowing num_segments |
---|
309 | assert self._node.num_segments is not None |
---|
310 | |
---|
311 | segnum, observers = self._active_segnum_and_observers() |
---|
312 | # if segnum is None, we don't really need to do anything (we have no |
---|
313 | # outstanding readers right now), but we'll fill in the bits that |
---|
314 | # aren't tied to any particular segment. |
---|
315 | |
---|
316 | if segnum is not None and segnum >= self._node.num_segments: |
---|
317 | for o in observers: |
---|
318 | o.notify(state=BADSEGNUM) |
---|
319 | self._requested_blocks.pop(0) |
---|
320 | return True |
---|
321 | |
---|
322 | if self._node.share_hash_tree.needed_hashes(self._shnum): |
---|
323 | if not self._satisfy_share_hash_tree(): |
---|
324 | # can't check block_hash_tree without a root |
---|
325 | return False |
---|
326 | |
---|
327 | if self._commonshare.need_block_hash_root(): |
---|
328 | block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum) |
---|
329 | self._commonshare.set_block_hash_root(block_hash_root) |
---|
330 | |
---|
331 | if segnum is None: |
---|
332 | return False # we don't want any particular segment right now |
---|
333 | |
---|
334 | # block_hash_tree |
---|
335 | needed_hashes = self._commonshare.get_needed_block_hashes(segnum) |
---|
336 | if needed_hashes: |
---|
337 | if not self._satisfy_block_hash_tree(needed_hashes): |
---|
338 | # can't check block without block_hash_tree |
---|
339 | return False |
---|
340 | |
---|
341 | # ciphertext_hash_tree |
---|
342 | needed_hashes = self._node.get_needed_ciphertext_hashes(segnum) |
---|
343 | if needed_hashes: |
---|
344 | if not self._satisfy_ciphertext_hash_tree(needed_hashes): |
---|
345 | # can't check decoded blocks without ciphertext_hash_tree |
---|
346 | return False |
---|
347 | |
---|
348 | # data blocks |
---|
349 | return self._satisfy_data_block(segnum, observers) |
---|
350 | |
---|
351 | def _satisfy_offsets(self): |
---|
352 | version_s = self._received.get(0, 4) |
---|
353 | if version_s is None: |
---|
354 | return False |
---|
355 | (version,) = struct.unpack(">L", version_s) |
---|
356 | if version == 1: |
---|
357 | table_start = 0x0c |
---|
358 | self._fieldsize = 0x4 |
---|
359 | self._fieldstruct = "L" |
---|
360 | elif version == 2: |
---|
361 | table_start = 0x14 |
---|
362 | self._fieldsize = 0x8 |
---|
363 | self._fieldstruct = "Q" |
---|
364 | else: |
---|
365 | self.had_corruption = True |
---|
366 | raise LayoutInvalid("unknown version %d (I understand 1 and 2)" |
---|
367 | % version) |
---|
368 | offset_table_size = 6 * self._fieldsize |
---|
369 | table_s = self._received.pop(table_start, offset_table_size) |
---|
370 | if table_s is None: |
---|
371 | return False |
---|
372 | fields = struct.unpack(">"+6*self._fieldstruct, table_s) |
---|
373 | offsets = {} |
---|
374 | for i,field in enumerate(['data', |
---|
375 | 'plaintext_hash_tree', # UNUSED |
---|
376 | 'crypttext_hash_tree', |
---|
377 | 'block_hashes', |
---|
378 | 'share_hashes', |
---|
379 | 'uri_extension', |
---|
380 | ] ): |
---|
381 | offsets[field] = fields[i] |
---|
382 | self.actual_offsets = offsets |
---|
383 | log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields), |
---|
384 | level=log.NOISY, parent=self._lp, umid="jedQcw") |
---|
385 | self._received.remove(0, 4) # don't need this anymore |
---|
386 | |
---|
387 | # validate the offsets a bit |
---|
388 | share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"] |
---|
389 | if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0: |
---|
390 | # the share hash chain is stored as (hashnum,hash) pairs |
---|
391 | self.had_corruption = True |
---|
392 | raise LayoutInvalid("share hashes malformed -- should be a" |
---|
393 | " multiple of %d bytes -- not %d" % |
---|
394 | (2+HASH_SIZE, share_hashes_size)) |
---|
395 | block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"] |
---|
396 | if block_hashes_size < 0 or block_hashes_size % (HASH_SIZE) != 0: |
---|
397 | # the block hash tree is stored as a list of hashes |
---|
398 | self.had_corruption = True |
---|
399 | raise LayoutInvalid("block hashes malformed -- should be a" |
---|
400 | " multiple of %d bytes -- not %d" % |
---|
401 | (HASH_SIZE, block_hashes_size)) |
---|
402 | # we only look at 'crypttext_hash_tree' if the UEB says we're |
---|
403 | # actually using it. Same with 'plaintext_hash_tree'. This gives us |
---|
404 | # some wiggle room: a place to stash data for later extensions. |
---|
405 | |
---|
406 | return True |
---|
407 | |
---|
408 | def _satisfy_UEB(self): |
---|
409 | o = self.actual_offsets |
---|
410 | fsize = self._fieldsize |
---|
411 | UEB_length_s = self._received.get(o["uri_extension"], fsize) |
---|
412 | if not UEB_length_s: |
---|
413 | return False |
---|
414 | (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s) |
---|
415 | UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length) |
---|
416 | if not UEB_s: |
---|
417 | return False |
---|
418 | self._received.remove(o["uri_extension"], fsize) |
---|
419 | try: |
---|
420 | self._node.validate_and_store_UEB(UEB_s) |
---|
421 | return True |
---|
422 | except (LayoutInvalid, BadHashError) as e: |
---|
423 | # TODO: if this UEB was bad, we'll keep trying to validate it |
---|
424 | # over and over again. Only log.err on the first one, or better |
---|
425 | # yet skip all but the first |
---|
426 | f = Failure(e) |
---|
427 | self._signal_corruption(f, o["uri_extension"], fsize+UEB_length) |
---|
428 | self.had_corruption = True |
---|
429 | raise |
---|
430 | |
---|
def _satisfy_share_hash_tree(self):
    """Parse the whole share hash chain and give it to the node.

    Returns False if the chain has not been received yet, True after the
    node has processed it. BadHashError / NotEnoughHashesError from the
    node are reported via _signal_corruption() and re-raised.
    """
    # The chain is stored as (hashnum,hash) tuples, so the position of
    # any one hash is not known in advance. Fetch the whole section and
    # parse it afterwards.
    offsets = self.actual_offsets
    chain_start = offsets["share_hashes"]
    hashlen = offsets["uri_extension"] - chain_start
    entry_size = 2 + HASH_SIZE
    assert hashlen % entry_size == 0

    hashdata = self._received.get(chain_start, hashlen)
    if not hashdata:
        return False

    share_hashes = {}
    for pos in range(0, hashlen, entry_size):
        entry = hashdata[pos:pos + entry_size]
        (hashnum,) = struct.unpack(">H", entry[:2])
        share_hashes[hashnum] = entry[2:]

    # TODO: if they give us an empty set of hashes,
    # process_share_hashes() won't fail. We must ensure that this
    # situation doesn't allow unverified shares through. Manual testing
    # shows that set_block_hash_root() throws an assert because an
    # internal node is None instead of an actual hash, but we want
    # something better. It's probably best to add a method to
    # IncompleteHashTree which takes a leaf number and raises an
    # exception unless that leaf is present and fully validated.
    try:
        # adds to self._node.share_hash_tree
        self._node.process_share_hashes(share_hashes)
    except (BadHashError, NotEnoughHashesError) as e:
        failure = Failure(e)
        self._signal_corruption(failure, chain_start, hashlen)
        self.had_corruption = True
        raise
    self._received.remove(chain_start, hashlen)
    return True
---|
465 | |
---|
def _signal_corruption(self, f, start, offset):
    """Advise the server of corruption in share bytes [start, start+offset).

    ``f`` is the Failure describing the problem; ``offset`` is used as the
    length of the affected range. Returns the result of adding a logging
    errback to the remote call.
    """
    end = start + offset
    reason = "corruption in share[%d-%d): %s" % (start, end, str(f.value))
    d = self._rref.callRemote("advise_corrupt_share", reason.encode("utf-8"))
    return d.addErrback(log.err,
                        "Error from remote call to advise_corrupt_share")
---|
473 | |
---|
474 | def _satisfy_block_hash_tree(self, needed_hashes): |
---|
475 | o_bh = self.actual_offsets["block_hashes"] |
---|
476 | block_hashes = {} |
---|
477 | for hashnum in needed_hashes: |
---|
478 | hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE) |
---|
479 | if hashdata: |
---|
480 | block_hashes[hashnum] = hashdata |
---|
481 | else: |
---|
482 | return False # missing some hashes |
---|
483 | # note that we don't submit any hashes to the block_hash_tree until |
---|
484 | # we've gotten them all, because the hash tree will throw an |
---|
485 | # exception if we only give it a partial set (which it therefore |
---|
486 | # cannot validate) |
---|
487 | try: |
---|
488 | self._commonshare.process_block_hashes(block_hashes) |
---|
489 | except (BadHashError, NotEnoughHashesError) as e: |
---|
490 | f = Failure(e) |
---|
491 | hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())]) |
---|
492 | log.msg(format="hash failure in block_hashes=(%(hashnums)s)," |
---|
493 | " from %(share)s", |
---|
494 | hashnums=hashnums, shnum=self._shnum, share=repr(self), |
---|
495 | failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA") |
---|
496 | hsize = max(0, max(needed_hashes)) * HASH_SIZE |
---|
497 | self._signal_corruption(f, o_bh, hsize) |
---|
498 | self.had_corruption = True |
---|
499 | raise |
---|
500 | for hashnum in needed_hashes: |
---|
501 | self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE) |
---|
502 | return True |
---|
503 | |
---|
504 | def _satisfy_ciphertext_hash_tree(self, needed_hashes): |
---|
505 | start = self.actual_offsets["crypttext_hash_tree"] |
---|
506 | hashes = {} |
---|
507 | for hashnum in needed_hashes: |
---|
508 | hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE) |
---|
509 | if hashdata: |
---|
510 | hashes[hashnum] = hashdata |
---|
511 | else: |
---|
512 | return False # missing some hashes |
---|
513 | # we don't submit any hashes to the ciphertext_hash_tree until we've |
---|
514 | # gotten them all |
---|
515 | try: |
---|
516 | self._node.process_ciphertext_hashes(hashes) |
---|
517 | except (BadHashError, NotEnoughHashesError) as e: |
---|
518 | f = Failure(e) |
---|
519 | hashnums = ",".join([str(n) for n in sorted(hashes.keys())]) |
---|
520 | log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s)," |
---|
521 | " from %(share)s", |
---|
522 | hashnums=hashnums, share=repr(self), failure=f, |
---|
523 | level=log.WEIRD, parent=self._lp, umid="iZI0TA") |
---|
524 | hsize = max(0, max(needed_hashes))*HASH_SIZE |
---|
525 | self._signal_corruption(f, start, hsize) |
---|
526 | self.had_corruption = True |
---|
527 | raise |
---|
528 | for hashnum in needed_hashes: |
---|
529 | self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE) |
---|
530 | return True |
---|
531 | |
---|
def _satisfy_data_block(self, segnum, observers):
    """Validate a received block and notify the observers of the outcome.

    Returns False if the block's bytes have not been received yet.
    Otherwise the head request is retired and True is returned: observers
    get state=COMPLETE with the block when check_block() accepts it, or
    state=CORRUPT (and the server is advised) when it raises BadHashError
    or NotEnoughHashesError.
    """
    node = self._node
    is_tail = (segnum == node.num_segments - 1)
    blockstart = self.actual_offsets["data"] + segnum * node.block_size
    blocklen = node.tail_block_size if is_tail else node.block_size

    block = self._received.pop(blockstart, blocklen)
    if not block:
        log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
                                                          blockstart,
                                                          blocklen),
                level=log.NOISY, parent=self._lp, umid="aK0RFw")
        return False
    log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
            share=repr(self), start=blockstart, length=blocklen,
            level=log.NOISY, parent=self._lp, umid="uTDNZg")

    # From here on the block is retired, as COMPLETE or as CORRUPT: no
    # further data reads will help.
    assert self._requested_blocks[0][0] == segnum
    try:
        self._commonshare.check_block(segnum, block)
        # hurrah, we have a valid block. Deliver it.
        for watcher in observers:
            # goes to SegmentFetcher._block_request_activity
            watcher.notify(state=COMPLETE, block=block)
        # now clear our received data, to dodge the #1170 spans.py
        # complexity bug
        self._received = DataSpans()
    except (BadHashError, NotEnoughHashesError) as e:
        # Rats, a corrupt block. Tell our clients to look elsewhere and
        # advise the server. Unlike corruption in other parts of the
        # share, this does not make us abandon the whole share.
        failure = Failure(e)
        log.msg(format="hash failure in block %(segnum)d, from %(share)s",
                segnum=segnum, share=repr(self), failure=failure,
                level=log.WEIRD, parent=self._lp, umid="mZjkqA")
        for watcher in observers:
            watcher.notify(state=CORRUPT)
        self._signal_corruption(failure, blockstart, blocklen)
        self.had_corruption = True

    # Either way the request is finished. Popping it keeps us from
    # turning around and wanting the block again right away.
    self._requested_blocks.pop(0)
    return True  # got satisfaction
---|
579 | |
---|
def _desire(self):
    """Work out which byte ranges to ask for; return (wanted, needed).

    Three spans are filled in by the _desire_*() helpers:

    * want_it: data we merely want and know we do not strictly need, e.g.
      speculative reads such as the first 1KB of the share (for the
      offset table) and the first 2KB of the UEB.
    * need_it: data that is required if the real offset table is known.
      While the offsets are only guessed it counts as merely wanted. (The
      share is abandoned if data we really need cannot be had.)
    * gotta_gotta_have_it: data needed regardless of whether the offsets
      are guessed: the version number and the offset table itself.
    """
    # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww..
    segnum, _observers = self._active_segnum_and_observers()  # maybe None

    want_it, need_it, gotta_gotta_have_it = Spans(), Spans(), Spans()
    desire = (want_it, need_it, gotta_gotta_have_it)

    self.actual_segment_size = self._node.segment_size  # might be updated
    offsets = self.actual_offsets or self.guessed_offsets
    segsize = self.actual_segment_size or self.guessed_segment_size
    sizes = self._node._calculate_sizes(segsize)

    if not self.actual_offsets:
        # all _desire functions add bits to the three desire[] spans
        self._desire_offsets(desire)

    # Guessed offsets are usable only when the server tolerates overrun.
    # Otherwise nothing else is read until the real offsets arrive.
    if self.actual_offsets or self._overrun_ok:
        if not self._node.have_UEB:
            self._desire_UEB(desire, offsets)
        self._desire_share_hashes(desire, offsets)
        if segnum is not None:
            # The segment number may be beyond what we guess the file
            # contains, but _desire_block_hashes and _desire_data will
            # tolerate that.
            self._desire_block_hashes(desire, offsets, segnum)
            self._desire_data(desire, offsets, sizes, segnum, segsize)

    log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
            % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
            level=log.NOISY, parent=self._lp, umid="IG7CgA")

    if not self.actual_offsets:
        # still guessing: need_it is only wanted
        return (want_it + need_it, gotta_gotta_have_it)
    return (want_it, need_it + gotta_gotta_have_it)
---|
631 | |
---|
632 | def _desire_offsets(self, desire): |
---|
633 | (want_it, need_it, gotta_gotta_have_it) = desire |
---|
634 | if self._overrun_ok: |
---|
635 | # easy! this includes version number, sizes, and offsets |
---|
636 | want_it.add(0, 1024) |
---|
637 | return |
---|
638 | |
---|
639 | # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44). |
---|
640 | # To be conservative, only request the data that we know lives there, |
---|
641 | # even if that means more roundtrips. |
---|
642 | |
---|
643 | gotta_gotta_have_it.add(0, 4) # version number, always safe |
---|
644 | version_s = self._received.get(0, 4) |
---|
645 | if not version_s: |
---|
646 | return |
---|
647 | (version,) = struct.unpack(">L", version_s) |
---|
648 | # The code in _satisfy_offsets will have checked this version |
---|
649 | # already. There is no code path to get this far with version>2. |
---|
650 | assert 1 <= version <= 2, "can't get here, version=%d" % version |
---|
651 | if version == 1: |
---|
652 | table_start = 0x0c |
---|
653 | fieldsize = 0x4 |
---|
654 | elif version == 2: |
---|
655 | table_start = 0x14 |
---|
656 | fieldsize = 0x8 |
---|
657 | offset_table_size = 6 * fieldsize |
---|
658 | gotta_gotta_have_it.add(table_start, offset_table_size) |
---|
659 | |
---|
660 | def _desire_UEB(self, desire, o): |
---|
661 | (want_it, need_it, gotta_gotta_have_it) = desire |
---|
662 | |
---|
663 | # UEB data is stored as (length,data). |
---|
664 | if self._overrun_ok: |
---|
665 | # We can pre-fetch 2kb, which should probably cover it. If it |
---|
666 | # turns out to be larger, we'll come back here later with a known |
---|
667 | # length and fetch the rest. |
---|
668 | want_it.add(o["uri_extension"], 2048) |
---|
669 | # now, while that is probably enough to fetch the whole UEB, it |
---|
670 | # might not be, so we need to do the next few steps as well. In |
---|
671 | # most cases, the following steps will not actually add anything |
---|
672 | # to need_it |
---|
673 | |
---|
674 | need_it.add(o["uri_extension"], self._fieldsize) |
---|
675 | # only use a length if we're sure it's correct, otherwise we'll |
---|
676 | # probably fetch a huge number |
---|
677 | if not self.actual_offsets: |
---|
678 | return |
---|
679 | UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize) |
---|
680 | if UEB_length_s: |
---|
681 | (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s) |
---|
682 | # we know the length, so make sure we grab everything |
---|
683 | need_it.add(o["uri_extension"]+self._fieldsize, UEB_length) |
---|
684 | |
---|
685 | def _desire_share_hashes(self, desire, o): |
---|
686 | (want_it, need_it, gotta_gotta_have_it) = desire |
---|
687 | |
---|
688 | if self._node.share_hash_tree.needed_hashes(self._shnum): |
---|
689 | hashlen = o["uri_extension"] - o["share_hashes"] |
---|
690 | need_it.add(o["share_hashes"], hashlen) |
---|
691 | |
---|
def _desire_block_hashes(self, desire, o, segnum):
    """Add one HASH_SIZE-byte range per hash-tree node wanted for `segnum`.

    Nodes come from two places: the block hash chain (as reported by the
    CommonShare) and the ciphertext hash chain (as reported by the node).
    `desire` is the (want_it, need_it, gotta_gotta_have_it) tuple; only
    need_it is modified. `o` is the offset table in use. Returns None.
    """
    _, required, _ = desire

    def request_nodes(section_key, hashnums):
        # each node occupies HASH_SIZE bytes at index hashnum of its section
        for hashnum in hashnums:
            required.add(o[section_key] + hashnum * HASH_SIZE, HASH_SIZE)

    # block hash chain
    request_nodes("block_hashes",
                  self._commonshare.get_desired_block_hashes(segnum))

    # ciphertext hash chain
    request_nodes("crypttext_hash_tree",
                  self._node.get_desired_ciphertext_hashes(segnum))
---|
702 | |
---|
703 | def _desire_data(self, desire, o, r, segnum, segsize): |
---|
704 | if segnum > r["num_segments"]: |
---|
705 | # they're asking for a segment that's beyond what we think is the |
---|
706 | # end of the file. We won't get here if we've already learned the |
---|
707 | # real UEB: _get_satisfaction() will notice the out-of-bounds and |
---|
708 | # terminate the loop. So we must still be guessing, which means |
---|
709 | # that they might be correct in asking for such a large segnum. |
---|
710 | # But if they're right, then our segsize/segnum guess is |
---|
711 | # certainly wrong, which means we don't know what data blocks to |
---|
712 | # ask for yet. So don't bother adding anything. When the UEB |
---|
713 | # comes back and we learn the correct segsize/segnums, we'll |
---|
714 | # either reject the request or have enough information to proceed |
---|
715 | # normally. This costs one roundtrip. |
---|
716 | log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)" |
---|
717 | % (segnum, r["num_segments"]), |
---|
718 | level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") |
---|
719 | return |
---|
720 | (want_it, need_it, gotta_gotta_have_it) = desire |
---|
721 | tail = (segnum == r["num_segments"]-1) |
---|
722 | datastart = o["data"] |
---|
723 | blockstart = datastart + segnum * r["block_size"] |
---|
724 | blocklen = r["block_size"] |
---|
725 | if tail: |
---|
726 | blocklen = r["tail_block_size"] |
---|
727 | need_it.add(blockstart, blocklen) |
---|
728 | |
---|
def _send_requests(self, desired):
    """Issue a read request for every desired span that is neither
    already in flight nor already received.

    Each span is added to self._pending, recorded with the download
    status object, and sent through self._send_request(). The resulting
    Deferred is routed to _got_data / _got_error and then _trigger_loop;
    anything that still fails after that is logged. Returns None.
    """
    to_fetch = desired - self._pending - self._received.get_spans()
    log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" %
            (repr(self), desired.dump(), self._pending.dump(), to_fetch.dump()),
            level=log.NOISY, parent=self._lp, umid="E94CVA")
    # XXX At one time, this code distinguished between data blocks and
    # hashes, and made sure to send (small) requests for hashes before
    # sending (big) requests for blocks. The idea was to make sure that
    # all hashes arrive before the blocks, so the blocks can be consumed
    # and released in a single turn. I removed this for simplicity.
    # Reconsider the removal: maybe bring it back.
    status = self._download_status

    def log_unhandled(failure):
        # last-resort errback: record whatever the earlier handlers let through
        return log.err(format="unhandled error during send_request",
                       failure=failure, parent=self._lp,
                       level=log.WEIRD, umid="qZu0wg")

    for (start, length) in to_fetch:
        # TODO: quantize to reasonably-large blocks
        self._pending.add(start, length)
        request_lp = log.msg(format="%(share)s._send_request"
                             " [%(start)d:+%(length)d]",
                             share=repr(self),
                             start=start, length=length,
                             level=log.NOISY, parent=self._lp, umid="sgVAyA")
        block_ev = status.add_block_request(self._server, self._shnum,
                                            start, length, now())
        response = self._send_request(start, length)
        response.addCallback(self._got_data, start, length, block_ev, request_lp)
        response.addErrback(self._got_error, start, length, block_ev, request_lp)
        response.addCallback(self._trigger_loop)
        response.addErrback(log_unhandled)
---|
760 | |
---|
761 | def _send_request(self, start, length): |
---|
762 | return self._rref.callRemote("read", start, length) |
---|
763 | |
---|
def _got_data(self, data, start, length, block_ev, lp):
    """Handle the response to a read of [start:+length].

    The block event is always marked finished. If the share is no longer
    alive nothing else happens; otherwise the span moves from pending to
    received, and any bytes that were asked for but not returned are
    recorded as unavailable. Returns None.
    """
    received_len = len(data)
    block_ev.finished(received_len, now())
    if not self._alive:
        return
    log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
            share=repr(self), start=start, length=length, datalen=received_len,
            level=log.NOISY, parent=lp, umid="5Qn6VQ")
    self._pending.remove(start, length)
    self._received.add(start, data)

    # if we ask for [a:c], and we get back [a:b] (b<c), that means we're
    # never going to get [b:c]. If we really need that data, this block
    # will never complete. The easiest way to get into this situation is
    # to hit a share with a corrupted offset table, or one that's somehow
    # been truncated. On the other hand, when overrun_ok is true, we ask
    # for data beyond the end of the share all the time (it saves some
    # RTT when we don't know the length of the share ahead of time). So
    # not every asked-for-but-not-received byte is fatal.
    shortfall = length - received_len
    if shortfall > 0:
        self._unavailable.add(start + received_len, shortfall)
---|
784 | |
---|
785 | # XXX if table corruption causes our sections to overlap, then one |
---|
786 | # consumer (i.e. block hash tree) will pop/remove the data that |
---|
787 | # another consumer (i.e. block data) mistakenly thinks it needs. It |
---|
788 | # won't ask for that data again, because the span is in |
---|
789 | # self._requested. But that span won't be in self._unavailable |
---|
790 | # because we got it back from the server. TODO: handle this properly |
---|
791 | # (raise DataUnavailable). Then add sanity-checking |
---|
792 | # no-overlaps-allowed tests to the offset-table unpacking code to |
---|
793 | # catch this earlier. XXX |
---|
794 | |
---|
795 | # accumulate a wanted/needed span (not as self._x, but passed into |
---|
796 | # desire* functions). manage a pending/in-flight list. when the |
---|
797 | # requests are sent out, empty/discard the wanted/needed span and |
---|
798 | # populate/augment the pending list. when the responses come back, |
---|
799 | # augment either received+data or unavailable. |
---|
800 | |
---|
801 | # if a corrupt offset table results in double-usage, we'll send |
---|
802 | # double requests. |
---|
803 | |
---|
804 | # the wanted/needed span is only "wanted" for the first pass. Once |
---|
805 | # the offset table arrives, it's all "needed". |
---|
806 | |
---|
def _got_error(self, f, start, length, block_ev, lp):
    """Handle a failed read of [start:+length].

    Marks the block event as errored, logs the failure `f`, and then
    abandons this share through self._fail(). Returns None.
    """
    block_ev.error(now())
    details = dict(start=start, length=length,
                   server=self._server.get_name(), si=self._si_prefix)
    log.msg(format="error requesting %(start)d+%(length)d from %(server)s for si %(si)s",
            failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw",
            **details)
    # retire our observers, assuming we won't be able to make any
    # further progress
    self._fail(f, log.UNUSUAL)
---|
817 | |
---|
818 | def _trigger_loop(self, res): |
---|
819 | if self._alive: |
---|
820 | self.schedule_loop() |
---|
821 | return res |
---|
822 | |
---|
def _fail(self, f, level=log.WEIRD):
    """Abandon this share: log the failure `f` at `level`, mark the share
    as not alive, and notify every observer of every requested block with
    state=DEAD.
    """
    log.msg(format="abandoning %(share)s",
            share=repr(self), failure=f,
            level=level, parent=self._lp, umid="JKM2Og")
    self._alive = False
    # each entry pairs a segment number with the observers waiting on it
    for (_segnum, watchers) in self._requested_blocks:
        for watcher in watchers:
            watcher.notify(state=DEAD, f=f)
---|
831 | |
---|
832 | |
---|
class CommonShare:
    # TODO: defer creation of the hashtree until somebody uses us. There will
    # be a lot of unused shares, and we shouldn't spend the memory on a large
    # hashtree unless necessary.
    """I hold the data that every instance of one share has in common, e.g.
    sh2 as found on both server A and server B. That data is just the block
    hash tree.
    """
    def __init__(self, best_numsegs, si_prefix, shnum, logparent):
        self.si_prefix = si_prefix
        self.shnum = shnum
        self._logparent = logparent

        # in the beginning, before we have the real UEB, we can only guess at
        # the number of segments. But we want to ask for block hashes early.
        # So if we're asked for which block hashes are needed before we know
        # numsegs for sure, we return a guess.
        self._block_hash_tree_leaves = best_numsegs
        self._block_hash_tree_is_authoritative = False
        self._block_hash_tree = IncompleteHashTree(best_numsegs)

    def __repr__(self):
        ident = (self.si_prefix, self.shnum)
        return "CommonShare(%s-sh%d)" % ident

    def set_authoritative_num_segments(self, numsegs):
        """Record the real segment count, replacing the hash tree with a
        fresh one only when the previous leaf count differs."""
        guess_was_wrong = (self._block_hash_tree_leaves != numsegs)
        if guess_was_wrong:
            self._block_hash_tree = IncompleteHashTree(numsegs)
            self._block_hash_tree_leaves = numsegs
        self._block_hash_tree_is_authoritative = True

    def need_block_hash_root(self):
        """Return True while node 0 of the block hash tree is still unset."""
        return not self._block_hash_tree[0]

    def set_block_hash_root(self, roothash):
        """Install `roothash` as node 0 of the (authoritative) tree."""
        assert self._block_hash_tree_is_authoritative
        root_only = {0: roothash}
        self._block_hash_tree.set_hashes(root_only)

    def get_desired_block_hashes(self, segnum):
        """Return the hash-tree nodes (leaf included) wanted for `segnum`,
        or an empty list when `segnum` is outside the current tree."""
        if segnum >= self._block_hash_tree_leaves:
            # the segnum might be out-of-bounds. Originally it was due to a
            # race between the receipt of the UEB on one share (from which
            # we learn the correct number of segments, update all hash trees
            # to the right size, and queue a BADSEGNUM to the SegmentFetcher)
            # and the delivery of a new Share to the SegmentFetcher while
            # that BADSEGNUM was queued (which sends out requests to the
            # stale segnum, now larger than the hash tree). I fixed that (by
            # making SegmentFetcher.loop check for a bad segnum at the start
            # of each pass, instead of using the queued BADSEGNUM or a flag
            # it sets), but just in case this still happens, I'm leaving the
            # bounds check in place. If it gets hit, there's a potential
            # lost-progress problem, but I'm pretty sure that it will get
            # cleared up on the following turn.
            return []
        return self._block_hash_tree.needed_hashes(segnum,
                                                   include_leaf=True)

    def get_needed_block_hashes(self, segnum):
        """Return the hash-tree nodes (leaf included) needed for `segnum`;
        only valid once the tree is authoritative."""
        assert self._block_hash_tree_is_authoritative
        # XXX: include_leaf=True needs thought: how did the old downloader do
        # it? I think it grabbed *all* block hashes and set them all at once.
        # Since we want to fetch less data, we either need to fetch the leaf
        # too, or wait to set the block hashes until we've also received the
        # block itself, so we can hash it too, and set the chain+leaf all at
        # the same time.
        tree = self._block_hash_tree
        return tree.needed_hashes(segnum, include_leaf=True)

    def process_block_hashes(self, block_hashes):
        """Feed a dict of hash-tree nodes into the (authoritative) tree."""
        assert self._block_hash_tree_is_authoritative
        # this may raise BadHashError or NotEnoughHashesError
        self._block_hash_tree.set_hashes(block_hashes)

    def check_block(self, segnum, block):
        """Hash `block` and install it as leaf `segnum` of the tree."""
        assert self._block_hash_tree_is_authoritative
        leaf_hash = hashutil.block_hash(block)
        # this may raise BadHashError or NotEnoughHashesError
        self._block_hash_tree.set_hashes(leaves={segnum: leaf_hash})
---|
908 | |
---|
909 | # TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an |
---|
910 | # auxiliary OVERDUE callback. Just make sure to get all the messages in the |
---|
911 | # right order and on the right turns. |
---|
912 | |
---|
913 | # TODO: we're asking for too much data. We probably don't need |
---|
914 | # include_leaf=True in the block hash tree or ciphertext hash tree. |
---|
915 | |
---|
916 | # TODO: we ask for ciphertext hash tree nodes from all shares (whenever |
---|
917 | # _desire is called while we're missing those nodes), but we only consume it |
---|
918 | # from the first response, leaving the rest of the data sitting in _received. |
---|
919 | # This was ameliorated by clearing self._received after each block is |
---|
920 | # complete. |
---|