Ticket #1395: not-too-parallel-test-by-low-tech-patching.diff

File not-too-parallel-test-by-low-tech-patching.diff, 5.0 KB (added by zooko, at 2011-06-17T06:15:36Z)
  • src/allmydata/immutable/checker.py

    diff -rN -u old-ticket1395/src/allmydata/immutable/checker.py new-ticket1395/src/allmydata/immutable/checker.py
    old new  
    617617            # to free up the RAM
    618618            return None
    619619        def _get_blocks(vrbp):
    620             ds = []
    621             for blocknum in range(veup.num_segments):
     620            def _get_block(ign, blocknum):
    622621                db = vrbp.get_block(blocknum)
    623622                db.addCallback(_discard_result)
    624                 ds.append(db)
    625             # this gatherResults will fire once every block of this share has
    626             # been downloaded and verified, or else it will errback.
    627             return deferredutil.gatherResults(ds)
     623                return db
     624            dbs = defer.succeed(None)
     625            for blocknum in range(veup.num_segments):
     626                dbs.addCallback(_get_block, blocknum)
     627                # The Deferred we return will fire after every block of this
     628                # share has been downloaded and verified successfully, or else it
     629                # will errback as soon as the first error is observed.
     630                return dbs
     631
    628632        d.addCallback(_get_blocks)
    629633
    630634        # if none of those errbacked, the blocks (and the hashes above them)
  • src/allmydata/test/test_checker.py

    diff -rN -u old-ticket1395/src/allmydata/test/test_checker.py new-ticket1395/src/allmydata/test/test_checker.py
    old new  
    1 
    21import simplejson
    32from twisted.trial import unittest
    43from allmydata import check_results, uri
     
    319318
    320319        d.addCallback(lambda ign: self.failUnless(really_did_break))
    321320        return d
     321
     322class CounterHolder(object):
     323    def __init__(self):
     324        self._num_active_block_fetches = 0
     325        self._max_active_block_fetches = 0
     326
     327from allmydata.immutable.checker import ValidatedReadBucketProxy
     328class MockVRBP(ValidatedReadBucketProxy):
     329    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder):
     330        ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
     331                                          share_hash_tree, num_blocks,
     332                                          block_size, share_size)
     333        self.counterholder = counterholder
     334
     335    def get_block(self, blocknum):
     336        self.counterholder._num_active_block_fetches += 1
     337        if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches:
     338            self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches
     339        d = ValidatedReadBucketProxy.get_block(self, blocknum)
     340        def _mark_no_longer_active(res):
     341            self.counterholder._num_active_block_fetches -= 1
     342            return res
     343        d.addBoth(_mark_no_longer_active)
     344        return d
     345
     346class TooParallel(GridTestMixin, unittest.TestCase):
     347    # bug #1395: immutable verifier was aggressively parallized, checking all
     348    # blocks of all shares at the same time, blowing our memory budget and
     349    # crashing with MemoryErrors on >1GB files.
     350
     351    def test_immutable(self):
     352        import allmydata.immutable.checker
     353        origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy
     354
     355        self.basedir = "checker/TooParallel/immutable"
     356
     357        # If any code asks to instantiate a ValidatedReadBucketProxy,
     358        # we give them a MockVRBP which is configured to use our
     359        # CounterHolder.
     360        counterholder = CounterHolder()
     361        def make_mock_VRBP(*args, **kwargs):
     362            return MockVRBP(counterholder=counterholder, *args, **kwargs)
     363        allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP
     364
     365        self.set_up_grid(num_servers=4)
     366        c0 = self.g.clients[0]
     367        c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
     368                                           "happy": 4,
     369                                           "n": 4,
     370                                           "max_segment_size": 5,
     371                                           }
     372        self.uris = {}
     373        DATA = "data" * 100 # 400/5 = 80 blocks
     374        d = c0.upload(Data(DATA, convergence=""))
     375        def _do_check(ur):
     376            n = c0.create_node_from_uri(ur.uri)
     377            return n.check(Monitor(), verify=True)
     378        d.addCallback(_do_check)
     379        def _check(cr):
     380            # the verifier works on all 4 shares in parallel, but only
     381            # fetches one block from each share at a time, so we expect to
     382            # see 4 parallel fetches
     383            self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
     384        d.addCallback(_check)
     385        def _clean_up(res):
     386            allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP
     387            return res
     388        d.addBoth(_clean_up)
     389        return d
     390    test_immutable.timeout = 10