Ticket #1395: 1395-overparallel.diff

File 1395-overparallel.diff, 4.6 KB (added by warner, at 2011-06-11T19:37:12Z)

serialize block-fetches, add test

  • src/allmydata/immutable/checker.py

    diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py
    index 0268d8d..ce2c15f 100644
    a b class Checker(log.PrefixingLogMixin): 
    480480        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
    481481                                      self._verifycap.get_storage_index())
    482482        self.file_cancel_secret = fcs
     483        self._num_active_block_fetches = 0
     484        self._max_active_block_fetches = 0
    483485
    484486    def _get_renewal_secret(self, seed):
    485487        return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
    class Checker(log.PrefixingLogMixin): 
    554556                f_value=str(f.value),
    555557                level=log.WEIRD, umid="hEGuQg")
    556558
     559    def _debug_start_block_fetch(self):
     560        self._num_active_block_fetches += 1
     561        if self._num_active_block_fetches > self._max_active_block_fetches:
     562            self._max_active_block_fetches = self._num_active_block_fetches
     563    def _debug_finish_block_fetch(self):
     564        self._num_active_block_fetches -= 1
    557565
    558566    def _download_and_verify(self, serverid, sharenum, bucket):
    559567        """Start an attempt to download and verify every block in this bucket
    class Checker(log.PrefixingLogMixin): 
    612620            return d
    613621        d.addCallback(_got_ueb)
    614622
    615         def _discard_result(r):
    616             assert isinstance(r, str), r
    617             # to free up the RAM
    618             return None
    619623        def _get_blocks(vrbp):
    620             ds = []
    621             for blocknum in range(veup.num_segments):
     624            def _discard_result(r):
     625                assert isinstance(r, str), r
     626                self._debug_finish_block_fetch()
     627                # to free up the RAM
     628                return None
     629            def _get_block(ign, blocknum):
     630                self._debug_start_block_fetch()
    622631                db = vrbp.get_block(blocknum)
    623632                db.addCallback(_discard_result)
    624                 ds.append(db)
    625             # this gatherResults will fire once every block of this share has
    626             # been downloaded and verified, or else it will errback.
    627             return deferredutil.gatherResults(ds)
     633                return db
     634            dbs = defer.succeed(None)
     635            for blocknum in range(veup.num_segments):
     636                dbs.addCallback(_get_block, blocknum)
     637            # The Deferred we return will fire after every block of this
     638            # share has been downloaded and verified successfully, or else it
     639            # will errback as soon as the first error is observed.
     640            return dbs
    628641        d.addCallback(_get_blocks)
    629642
    630643        # if none of those errbacked, the blocks (and the hashes above them)
    class Checker(log.PrefixingLogMixin): 
    788801        cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])
    789802
    790803        cr.set_data(d)
     804        cr._debug_max_active_block_fetches = self._max_active_block_fetches
    791805
    792806        return cr
    793807
  • src/allmydata/test/test_checker.py

    diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py
    index b302c10..4670018 100644
    a b class AddLease(GridTestMixin, unittest.TestCase): 
    319319
    320320        d.addCallback(lambda ign: self.failUnless(really_did_break))
    321321        return d
     322
     323class TooParallel(GridTestMixin, unittest.TestCase):
     324    # bug #1395: immutable verifier was aggressively parallized, checking all
     325    # blocks of all shares at the same time, blowing our memory budget and
     326    # crashing with MemoryErrors on >1GB files.
     327
     328    def test_immutable(self):
     329        self.basedir = "checker/TooParallel/immutable"
     330        self.set_up_grid(num_servers=4)
     331        c0 = self.g.clients[0]
     332        c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
     333                                           "happy": 4,
     334                                           "n": 4,
     335                                           "max_segment_size": 5,
     336                                           }
     337        self.uris = {}
     338        DATA = "data" * 100 # 400/5 = 80 blocks
     339        d = c0.upload(Data(DATA, convergence=""))
     340        def _do_check(ur):
     341            n = c0.create_node_from_uri(ur.uri)
     342            return n.check(Monitor(), verify=True)
     343        d.addCallback(_do_check)
     344        def _check(cr):
     345            # the verifier works on all 4 shares in parallel, but only
     346            # fetches one block from each share at a time, so we expect to
     347            # see at most 4 parallel fetches
     348            self.failUnlessEqual(cr._debug_max_active_block_fetches, 4)
     349        d.addCallback(_check)
     350        return d
     351