Ticket #1223: 1223.diff
File 1223.diff, 9.5 KB (added by warner, at 2010-10-18T19:09:22Z)
src/allmydata/immutable/downloader/node.py
diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
index 33c16cf..04482e6 100644
--- a/src/allmydata/immutable/downloader/node.py
+++ b/src/allmydata/immutable/downloader/node.py
@@ -130,8 +130,9 @@ class DownloadNode:
         # for concurrent operations: each gets its own Segmentation manager
         if size is None:
             size = self._verifycap.size
-        # clip size so offset+size does not go past EOF
-        size = min(size, self._verifycap.size-offset)
+        # ignore overruns: clip size so offset+size does not go past EOF, and
+        # so size is not negative (which indicates that offset >= EOF)
+        size = max(0, min(size, self._verifycap.size-offset))
         if read_ev is None:
             read_ev = self._download_status.add_read_event(offset, size, now())
 
@@ -143,6 +144,10 @@ class DownloadNode:
         sp = self._history.stats_provider
         sp.count("downloader.files_downloaded", 1) # really read() calls
         sp.count("downloader.bytes_downloaded", size)
+        if size == 0:
+            read_ev.finished(now())
+            # no data, so no producer, so no register/unregisterProducer
+            return defer.succeed(consumer)
         s = Segmentation(self, offset, size, consumer, read_ev, lp)
         # this raises an interesting question: what segments to fetch? if
         # offset=0, always fetch the first segment, and then allow
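Reviewer note: the clipping rule above is compact enough to check in isolation. A minimal standalone sketch (hypothetical function name, not part of the patch) showing how the max(0, min(...)) form handles both partial overruns and reads that start at or past EOF:

def clip_read_size(size, offset, filesize):
    # Same rule as the patched DownloadNode.read(): clamp the requested
    # size so offset+size never passes EOF, and floor it at zero so an
    # offset at or beyond EOF yields an empty (zero-byte) read instead
    # of a negative size.
    return max(0, min(size, filesize - offset))

assert clip_read_size(100, 0, 135) == 100   # fully inside the file
assert clip_read_size(100, 100, 135) == 35  # partial overrun is clipped
assert clip_read_size(100, 135, 135) == 0   # offset == EOF: empty read
assert clip_read_size(100, 200, 135) == 0   # offset past EOF: not negative

Flooring at zero is what lets the new size == 0 early-return path fire, instead of constructing a Segmentation for a negative-length read.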
src/allmydata/immutable/encode.py
diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py
index c447448..df7a3a1 100644
--- a/src/allmydata/immutable/encode.py
+++ b/src/allmydata/immutable/encode.py
@@ -316,6 +316,9 @@ class Encoder(object):
         # of additional shares which can be substituted if the primary ones
         # are unavailable
 
+        # we read data from the source one segment at a time, and then chop
+        # it into 'input_piece_size' pieces before handing it to the codec
+
         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
 
         # memory footprint: we only hold a tiny piece of the plaintext at any
@@ -350,8 +353,7 @@ class Encoder(object):
         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
 
         d = self._gather_data(self.required_shares, input_piece_size,
-                              crypttext_segment_hasher,
-                              allow_short=True)
+                              crypttext_segment_hasher, allow_short=True)
         def _done_gathering(chunks):
             for c in chunks:
                 # a short trailing chunk will have been padded by
@@ -369,58 +371,50 @@ class Encoder(object):
 
     def _gather_data(self, num_chunks, input_chunk_size,
                      crypttext_segment_hasher,
-                     allow_short=False,
-                     previous_chunks=[]):
+                     allow_short=False):
         """Return a Deferred that will fire when the required number of
         chunks have been read (and hashed and encrypted). The Deferred fires
-        with the combination of any 'previous_chunks' and the new chunks
-        which were gathered."""
+        with a list of chunks, each of size input_chunk_size."""
+
+        # I originally built this to allow read_encrypted() to behave badly:
+        # to let it return more or less data than you asked for. It would
+        # stash the leftovers until later, and then recurse until it got
+        # enough. I don't think that was actually useful.
+        #
+        # who defines read_encrypted?
+        #  offloaded.LocalCiphertextReader: real disk file: exact
+        #  upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
+        #   it exact. The return value is a list of 50KiB chunks, to reduce
+        #   the memory footprint of the encryption process.
+        #  repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
+        #
+        # This has been redefined to require read_encrypted() to behave like
+        # a local file: return exactly the amount requested unless it hits
+        # EOF.
+        # -warner
 
         if self._aborted:
             raise UploadAborted()
 
-        if not num_chunks:
-            return defer.succeed(previous_chunks)
-
-        d = self._uploadable.read_encrypted(input_chunk_size, False)
+        read_size = num_chunks * input_chunk_size
+        d = self._uploadable.read_encrypted(read_size, hash_only=False)
         def _got(data):
+            assert isinstance(data, (list,tuple))
             if self._aborted:
                 raise UploadAborted()
-            encrypted_pieces = []
-            length = 0
-            while data:
-                encrypted_piece = data.pop(0)
-                length += len(encrypted_piece)
-                crypttext_segment_hasher.update(encrypted_piece)
-                self._crypttext_hasher.update(encrypted_piece)
-                encrypted_pieces.append(encrypted_piece)
-
-            precondition(length <= input_chunk_size,
-                         "length=%d > input_chunk_size=%d" %
-                         (length, input_chunk_size))
-            if allow_short:
-                if length < input_chunk_size:
-                    # padding
-                    pad_size = input_chunk_size - length
-                    encrypted_pieces.append('\x00' * pad_size)
-            else:
-                # non-tail segments should be the full segment size
-                if length != input_chunk_size:
-                    log.msg("non-tail segment should be full segment size: %d!=%d"
-                            % (length, input_chunk_size),
-                            level=log.BAD, umid="jNk5Yw")
-                precondition(length == input_chunk_size,
-                             "length=%d != input_chunk_size=%d" %
-                             (length, input_chunk_size))
-
-            encrypted_piece = "".join(encrypted_pieces)
-            return previous_chunks + [encrypted_piece]
-
+            data = "".join(data)
+            precondition(len(data) <= read_size, len(data), read_size)
+            if not allow_short:
+                precondition(len(data) == read_size, len(data), read_size)
+            crypttext_segment_hasher.update(data)
+            self._crypttext_hasher.update(data)
+            if allow_short and len(data) < read_size:
+                # padding
+                data += "\x00" * (read_size - len(data))
+            encrypted_pieces = [data[i:i+input_chunk_size]
+                                for i in range(0, len(data), input_chunk_size)]
+            return encrypted_pieces
         d.addCallback(_got)
-        d.addCallback(lambda chunks:
-                      self._gather_data(num_chunks-1, input_chunk_size,
-                                        crypttext_segment_hasher,
-                                        allow_short, chunks))
         return d
 
     def _send_segment(self, (shares, shareids), segnum):
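Reviewer note: the control-flow change in _gather_data is easier to see without the Deferred plumbing. A simplified synchronous sketch of the new one-read-per-segment pattern (the names and the plain `read` callable are illustrative, not the patch's API):

def gather_chunks(read, num_chunks, input_chunk_size, allow_short=False):
    # One read for the whole segment, instead of one read per chunk.
    read_size = num_chunks * input_chunk_size
    data = "".join(read(read_size))   # read() returns a list of strings
    assert len(data) <= read_size     # long reads are forbidden
    if not allow_short:
        assert len(data) == read_size # short reads only allowed at EOF
    if allow_short and len(data) < read_size:
        data += "\x00" * (read_size - len(data))  # pad the tail segment
    # chop the segment into fixed-size pieces for the codec
    return [data[i:i+input_chunk_size]
            for i in range(0, len(data), input_chunk_size)]

# e.g. a 154-byte segment with k=22 becomes 22 seven-byte pieces:
pieces = gather_chunks(lambda n: ["a" * n], 22, 7)
assert len(pieces) == 22 and all(len(p) == 7 for p in pieces)

Removing the recursion is the efficiency win the ticket asks for: one read_encrypted() call per segment instead of one per input chunk.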
src/allmydata/interfaces.py
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index c5a47e1..48094a9 100644
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -1622,7 +1622,11 @@ class IUploadable(Interface):
 
         If the data must be acquired through multiple internal read
         operations, returning a list instead of a single string may help to
-        reduce string copies.
+        reduce string copies. However, the length of the concatenated strings
+        must equal the amount of data requested, unless EOF is encountered.
+        Long reads, or short reads without EOF, are not allowed. read()
+        should return the same amount of data as a local disk file read, just
+        in a different shape and asynchronously.
 
         'length' will typically be equal to (min(get_size(),1MB)/req_shares),
         so a 10kB file means length=3kB, 100kB file means length=30kB,
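Reviewer note: a minimal in-memory reader (an illustrative class, not one from the codebase, and synchronous for brevity where the real interface fires a Deferred) that satisfies the tightened contract: return exactly 'length' bytes, split across list elements if convenient, and come up short only at EOF:

class ExactReader:
    # Hypothetical example of the tightened read_encrypted() contract:
    # behave like a local disk file, returning exactly 'length' bytes
    # (as a list of strings) unless EOF makes that impossible.
    def __init__(self, data):
        self.data = data
        self.pos = 0
    def read_encrypted(self, length, hash_only=False):
        piece = self.data[self.pos:self.pos+length]
        self.pos += len(piece)
        return [piece]  # a real implementation may return several pieces

r = ExactReader("a" * 10)
assert "".join(r.read_encrypted(7)) == "a" * 7  # exact read
assert "".join(r.read_encrypted(7)) == "a" * 3  # short read only at EOF
assert "".join(r.read_encrypted(7)) == ""       # at EOF: nothing left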
src/allmydata/test/test_repairer.py
diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py
index 49c4cff..942d327 100644
--- a/src/allmydata/test/test_repairer.py
+++ b/src/allmydata/test/test_repairer.py
@@ -672,6 +672,35 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
         return d
     #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
 
+    def test_tiny_reads(self):
+        # ticket #1223 points out three problems:
+        #  repairer reads beyond end of input file
+        #  new-downloader does not tolerate overreads
+        #  uploader does lots of tiny reads, inefficient
+        self.basedir = "repairer/Repairer/test_tiny_reads"
+        self.set_up_grid()
+        c0 = self.g.clients[0]
+        DATA = "a"*135
+        c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
+        c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
+        d = c0.upload(upload.Data(DATA, convergence=""))
+        def _then(ur):
+            self.uri = ur.uri
+            self.delete_shares_numbered(self.uri, [0])
+            self.c0_filenode = c0.create_node_from_uri(ur.uri)
+            self._stash_counts()
+            return self.c0_filenode.check_and_repair(Monitor())
+        d.addCallback(_then)
+        def _check(ign):
+            (r,a,w) = self._get_delta_counts()
+            # when the uploader (driven by the repairer) does full-segment
+            # reads, this makes 44 server read calls (2*k). Before, when it
+            # was doing input_chunk_size reads (7 bytes), it was doing over
+            # 400.
+            self.failIf(r > 100, "too many reads: %d>100" % r)
+        d.addCallback(_check)
+        return d
+
 
 # XXX extend these tests to show that the checker detects which specific
 # share on which specific server is broken -- this is necessary so that the
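Reviewer note: for the parameters in this test, the sizes work out as follows (a back-of-the-envelope sketch; the exact server-read count also depends on repairer internals, hence the loose r > 100 bound in the test):

# 135-byte file, k=22: a single segment, padded up to a multiple of k
# (the segment-size rounding is assumed, per the 7-byte figure in the
# test comment above).
k = 22
data_len = 135
segment_size = ((data_len + k - 1) // k) * k  # 154 bytes
input_chunk_size = segment_size // k          # 7 bytes per input piece
assert input_chunk_size == 7
# Old _gather_data: one read_encrypted() call per 7-byte piece, i.e.
# k tiny reads per segment, which translated into over 400 server reads.
# New _gather_data: one read_encrypted() call per 154-byte segment,
# which the test comment reports as 44 server reads (2*k).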