Ticket #1223: 1223.diff

File 1223.diff, 9.5 KB (added by warner at 2010-10-18T19:09:22Z)

potential fix

  • src/allmydata/immutable/downloader/node.py

    diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
    index 33c16cf..04482e6 100644
    --- a/src/allmydata/immutable/downloader/node.py
    +++ b/src/allmydata/immutable/downloader/node.py
    @@ -130,8 +130,9 @@ class DownloadNode:
             # for concurrent operations: each gets its own Segmentation manager
             if size is None:
                 size = self._verifycap.size
    -        # clip size so offset+size does not go past EOF
    -        size = min(size, self._verifycap.size-offset)
    +        # ignore overruns: clip size so offset+size does not go past EOF, and
    +        # so size is not negative (which indicates that offset >= EOF)
    +        size = max(0, min(size, self._verifycap.size-offset))
             if read_ev is None:
                 read_ev = self._download_status.add_read_event(offset, size, now())
     
    @@ -143,6 +144,10 @@ class DownloadNode:
                 sp = self._history.stats_provider
                 sp.count("downloader.files_downloaded", 1) # really read() calls
                 sp.count("downloader.bytes_downloaded", size)
    +        if size == 0:
    +            read_ev.finished(now())
    +            # no data, so no producer, so no register/unregisterProducer
    +            return defer.succeed(consumer)
             s = Segmentation(self, offset, size, consumer, read_ev, lp)
             # this raises an interesting question: what segments to fetch? if
             # offset=0, always fetch the first segment, and then allow
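
    The hunk above clips the requested read so it never runs past EOF and never
    goes negative, and returns immediately (with no producer registration) when
    the clipped size is zero. A minimal standalone sketch of the clipping rule,
    assuming only non-negative integer offset/size and using eof in place of
    self._verifycap.size (clip_read_size is a hypothetical name, not part of the
    patch):

        # hypothetical illustration of the clipping rule introduced above
        def clip_read_size(offset, size, eof):
            # never read past EOF, and never return a negative size
            # (offset >= eof means the read starts at or past the end)
            return max(0, min(size, eof - offset))

        assert clip_read_size(0, 100, 135) == 100   # ordinary read
        assert clip_read_size(130, 100, 135) == 5   # overrun clipped to EOF
        assert clip_read_size(200, 100, 135) == 0   # start past EOF -> zero bytes
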
  • src/allmydata/immutable/encode.py

    diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py
    index c447448..df7a3a1 100644
    --- a/src/allmydata/immutable/encode.py
    +++ b/src/allmydata/immutable/encode.py
    @@ -316,6 +316,9 @@ class Encoder(object):
             # of additional shares which can be substituted if the primary ones
             # are unavailable
     
    +        # we read data from the source one segment at a time, and then chop
    +        # it into 'input_piece_size' pieces before handing it to the codec
    +
             crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
     
             # memory footprint: we only hold a tiny piece of the plaintext at any
    @@ -350,8 +353,7 @@ class Encoder(object):
             crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
     
             d = self._gather_data(self.required_shares, input_piece_size,
    -                              crypttext_segment_hasher,
    -                              allow_short=True)
    +                              crypttext_segment_hasher, allow_short=True)
             def _done_gathering(chunks):
                 for c in chunks:
                     # a short trailing chunk will have been padded by
    @@ -369,58 +371,50 @@ class Encoder(object):
     
         def _gather_data(self, num_chunks, input_chunk_size,
                          crypttext_segment_hasher,
    -                     allow_short=False,
    -                     previous_chunks=[]):
    +                     allow_short=False):
             """Return a Deferred that will fire when the required number of
             chunks have been read (and hashed and encrypted). The Deferred fires
    -        with the combination of any 'previous_chunks' and the new chunks
    -        which were gathered."""
    +        with a list of chunks, each of size input_chunk_size."""
    +
    +        # I originally built this to allow read_encrypted() to behave badly:
    +        # to let it return more or less data than you asked for. It would
    +        # stash the leftovers until later, and then recurse until it got
    +        # enough. I don't think that was actually useful.
    +        #
    +        # who defines read_encrypted?
    +        #  offloaded.LocalCiphertextReader: real disk file: exact
    +        #  upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
    +        #    it exact. The return value is a list of 50KiB chunks, to reduce
    +        #    the memory footprint of the encryption process.
    +        #  repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
    +        #
    +        # This has been redefined to require read_encrypted() to behave like
    +        # a local file: return exactly the amount requested unless it hits
    +        # EOF.
    +        #  -warner
     
             if self._aborted:
                 raise UploadAborted()
     
    -        if not num_chunks:
    -            return defer.succeed(previous_chunks)
    -
    -        d = self._uploadable.read_encrypted(input_chunk_size, False)
    +        read_size = num_chunks * input_chunk_size
    +        d = self._uploadable.read_encrypted(read_size, hash_only=False)
             def _got(data):
    +            assert isinstance(data, (list,tuple))
                 if self._aborted:
                     raise UploadAborted()
    -            encrypted_pieces = []
    -            length = 0
    -            while data:
    -                encrypted_piece = data.pop(0)
    -                length += len(encrypted_piece)
    -                crypttext_segment_hasher.update(encrypted_piece)
    -                self._crypttext_hasher.update(encrypted_piece)
    -                encrypted_pieces.append(encrypted_piece)
    -
    -            precondition(length <= input_chunk_size,
    -                         "length=%d > input_chunk_size=%d" %
    -                         (length, input_chunk_size))
    -            if allow_short:
    -                if length < input_chunk_size:
    -                    # padding
    -                    pad_size = input_chunk_size - length
    -                    encrypted_pieces.append('\x00' * pad_size)
    -            else:
    -                # non-tail segments should be the full segment size
    -                if length != input_chunk_size:
    -                    log.msg("non-tail segment should be full segment size: %d!=%d"
    -                            % (length, input_chunk_size),
    -                            level=log.BAD, umid="jNk5Yw")
    -                precondition(length == input_chunk_size,
    -                             "length=%d != input_chunk_size=%d" %
    -                             (length, input_chunk_size))
    -
    -            encrypted_piece = "".join(encrypted_pieces)
    -            return previous_chunks + [encrypted_piece]
    -
    +            data = "".join(data)
    +            precondition(len(data) <= read_size, len(data), read_size)
    +            if not allow_short:
    +                precondition(len(data) == read_size, len(data), read_size)
    +            crypttext_segment_hasher.update(data)
    +            self._crypttext_hasher.update(data)
    +            if allow_short and len(data) < read_size:
    +                # padding
    +                data += "\x00" * (read_size - len(data))
    +            encrypted_pieces = [data[i:i+input_chunk_size]
    +                                for i in range(0, len(data), input_chunk_size)]
    +            return encrypted_pieces
             d.addCallback(_got)
    -        d.addCallback(lambda chunks:
    -                      self._gather_data(num_chunks-1, input_chunk_size,
    -                                        crypttext_segment_hasher,
    -                                        allow_short, chunks))
             return d
     
         def _send_segment(self, (shares, shareids), segnum):
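
    The rewritten _gather_data above replaces the chunk-at-a-time recursion with
    a single read_encrypted() call for the whole segment, then splits the result
    into input_chunk_size pieces for the codec, zero-padding only a short tail
    segment. A standalone sketch of that read-then-chop step, with the hashing
    and abort handling omitted (chop_segment and the plain-string input are
    illustrative assumptions, not code from the patch):

        # hypothetical illustration of the "one big read, then chop" pattern
        def chop_segment(data, num_chunks, input_chunk_size, allow_short):
            read_size = num_chunks * input_chunk_size
            assert len(data) <= read_size             # long reads are forbidden
            if not allow_short:
                assert len(data) == read_size         # non-tail segments must be full
            elif len(data) < read_size:
                data += "\x00" * (read_size - len(data))  # pad the tail segment
            return [data[i:i+input_chunk_size]
                    for i in range(0, len(data), input_chunk_size)]

        # e.g. k=3, 4-byte pieces, 7 bytes of tail data -> padded out to 12 bytes
        assert chop_segment("a"*7, 3, 4, True) == ["aaaa", "aaa\x00", "\x00"*4]
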
  • src/allmydata/interfaces.py

    diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
    index c5a47e1..48094a9 100644
    --- a/src/allmydata/interfaces.py
    +++ b/src/allmydata/interfaces.py
    @@ -1622,7 +1622,11 @@ class IUploadable(Interface):
     
             If the data must be acquired through multiple internal read
             operations, returning a list instead of a single string may help to
    -        reduce string copies.
    +        reduce string copies. However, the length of the concatenated strings
    +        must equal the amount of data requested, unless EOF is encountered.
    +        Long reads, or short reads without EOF, are not allowed. read()
    +        should return the same amount of data as a local disk file read, just
    +        in a different shape and asynchronously.
     
             'length' will typically be equal to (min(get_size(),1MB)/req_shares),
             so a 10kB file means length=3kB, 100kB file means length=30kB,
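
    The docstring change above tightens the contract: the concatenated strings
    returned by a read must add up to exactly the requested length unless EOF is
    reached, just like a local file read. A sketch of the kind of check a caller
    or test double could apply to an implementation (check_read_result and the
    at_eof flag are hypothetical names, not part of the interface):

        # hypothetical contract check for the tightened read semantics
        def check_read_result(chunks, requested, at_eof):
            got = sum(len(c) for c in chunks)
            assert got <= requested, "long reads are never allowed"
            if not at_eof:
                assert got == requested, "short reads are only allowed at EOF"
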
  • src/allmydata/test/test_repairer.py

    diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py
    index 49c4cff..942d327 100644
    --- a/src/allmydata/test/test_repairer.py
    +++ b/src/allmydata/test/test_repairer.py
    @@ -672,6 +672,35 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
             return d
         #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
     
    +    def test_tiny_reads(self):
    +        # ticket #1223 points out three problems:
    +        #   repairer reads beyond end of input file
    +        #   new-downloader does not tolerate overreads
    +        #   uploader does lots of tiny reads, inefficient
    +        self.basedir = "repairer/Repairer/test_tiny_reads"
    +        self.set_up_grid()
    +        c0 = self.g.clients[0]
    +        DATA = "a"*135
    +        c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
    +        c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
    +        d = c0.upload(upload.Data(DATA, convergence=""))
    +        def _then(ur):
    +            self.uri = ur.uri
    +            self.delete_shares_numbered(self.uri, [0])
    +            self.c0_filenode = c0.create_node_from_uri(ur.uri)
    +            self._stash_counts()
    +            return self.c0_filenode.check_and_repair(Monitor())
    +        d.addCallback(_then)
    +        def _check(ign):
    +            (r,a,w) = self._get_delta_counts()
    +            # when the uploader (driven by the repairer) does full-segment
    +            # reads, this makes 44 server read calls (2*k). Before, when it
    +            # was doing input_chunk_size reads (7 bytes), it was doing over
    +            # 400.
    +            self.failIf(r > 100, "too many reads: %d>100" % r)
    +        d.addCallback(_check)
    +        return d
    +
     
     # XXX extend these tests to show that the checker detects which specific
     # share on which specific server is broken -- this is necessary so that the
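
    The new test should be runnable on its own with Twisted Trial, e.g.
    "trial allmydata.test.test_repairer.Repairer.test_tiny_reads" from a
    Tahoe-LAFS checkout (the exact invocation may vary with the build setup).
    Per its own comments, the r > 100 bound is deliberately loose: the point is
    the drop from the hundreds of tiny 7-byte reads the old code issued down to
    roughly 2*k full-segment reads.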