Ticket #1395: not-too-parallel-test-by-patching.diff

File not-too-parallel-test-by-patching.diff, 6.8 KB (added by zooko, at 2011-06-17T06:09:26Z)
  • src/allmydata/immutable/checker.py

    diff -rN -u old-ticket1395/src/allmydata/immutable/checker.py new-ticket1395/src/allmydata/immutable/checker.py
    old new  
    617617            # to free up the RAM
    618618            return None
    619619        def _get_blocks(vrbp):
    620             ds = []
    621             for blocknum in range(veup.num_segments):
     620            def _get_block(ign, blocknum):
    622621                db = vrbp.get_block(blocknum)
    623622                db.addCallback(_discard_result)
    624                 ds.append(db)
    625             # this gatherResults will fire once every block of this share has
    626             # been downloaded and verified, or else it will errback.
    627             return deferredutil.gatherResults(ds)
     623                return db
     624            dbs = defer.succeed(None)
     625            for blocknum in range(veup.num_segments):
     626                dbs.addCallback(_get_block, blocknum)
     627                # The Deferred we return will fire after every block of this
     628                # share has been downloaded and verified successfully, or else it
     629                # will errback as soon as the first error is observed.
     630                return dbs
     631
    628632        d.addCallback(_get_blocks)
    629633
    630634        # if none of those errbacked, the blocks (and the hashes above them)
  • src/allmydata/test/test_checker.py

    diff -rN -u old-ticket1395/src/allmydata/test/test_checker.py new-ticket1395/src/allmydata/test/test_checker.py
    old new  
     1from allmydata.util import mockutil
    12
    23import simplejson
    34from twisted.trial import unittest
     
    319320
    320321        d.addCallback(lambda ign: self.failUnless(really_did_break))
    321322        return d
     323
     324class CounterHolder(object):
     325    def __init__(self):
     326        self._num_active_block_fetches = 0
     327        self._max_active_block_fetches = 0
     328
     329from allmydata.immutable.checker import ValidatedReadBucketProxy
     330class MockVRBP(ValidatedReadBucketProxy):
     331    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder):
     332        ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
     333                                          share_hash_tree, num_blocks,
     334                                          block_size, share_size)
     335        self.counterholder = counterholder
     336
     337    def get_block(self, blocknum):
     338        self.counterholder._num_active_block_fetches += 1
     339        if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches:
     340            self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches
     341        d = ValidatedReadBucketProxy.get_block(self, blocknum)
     342        def _mark_no_longer_active(res):
     343            self.counterholder._num_active_block_fetches -= 1
     344            return res
     345        d.addBoth(_mark_no_longer_active)
     346        return d
     347
     348class TooParallel(GridTestMixin, unittest.TestCase):
     349    # bug #1395: immutable verifier was aggressively parallized, checking all
     350    # blocks of all shares at the same time, blowing our memory budget and
     351    # crashing with MemoryErrors on >1GB files.
     352
     353    @mockutil.patch('allmydata.immutable.checker.ValidatedReadBucketProxy')
     354    def test_immutable(self, mockVRBPC):
     355        self.basedir = "checker/TooParallel/immutable"
     356
     357        # If any code asks to instantiate a ValidatedReadBucketProxy,
     358        # we give them a MockVRBP which is configured to use our
     359        # CounterHolder.
     360        counterholder = CounterHolder()
     361        def make_mock_VRBP(*args, **kwargs):
     362            return MockVRBP(counterholder=counterholder, *args, **kwargs)
     363        mockVRBPC.side_effect = make_mock_VRBP
     364
     365        self.set_up_grid(num_servers=4)
     366        c0 = self.g.clients[0]
     367        c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
     368                                           "happy": 4,
     369                                           "n": 4,
     370                                           "max_segment_size": 5,
     371                                           }
     372        self.uris = {}
     373        DATA = "data" * 100 # 400/5 = 80 blocks
     374        d = c0.upload(Data(DATA, convergence=""))
     375        def _do_check(ur):
     376            n = c0.create_node_from_uri(ur.uri)
     377            return n.check(Monitor(), verify=True)
     378        d.addCallback(_do_check)
     379        def _check(cr):
     380            # the verifier works on all 4 shares in parallel, but only
     381            # fetches one block from each share at a time, so we expect to
     382            # see 4 parallel fetches
     383            self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
     384        d.addCallback(_check)
     385        return d
     386    test_immutable.timeout = 10
  • src/allmydata/util/mockutil.py

    diff -rN -u old-ticket1395/src/allmydata/util/mockutil.py new-ticket1395/src/allmydata/util/mockutil.py
    old new  
     1import mock
     2
     3from mock import wraps, DEFAULT, _importer
     4from mock import _patch as original_under_patch
     5
     6Deferred = None
     7try:
     8    from twisted.internet import defer
     9    Deferred = defer.Deferred
     10except ImportError:
     11    pass
     12
     13# copied from Michael Foord's mock.py and modified
     14
     15class _deferrable_under_patch(original_under_patch):
     16    def decorate_callable(self, func):
     17        if hasattr(func, 'patchings'):
     18            func.patchings.append(self)
     19            return func
     20
     21        @wraps(func)
     22        def patched(*args, **keywargs):
     23            # don't use a with here (backwards compatability with 2.5)
     24            extra_args = []
     25            for patching in patched.patchings:
     26                arg = patching.__enter__()
     27                if patching.new is DEFAULT:
     28                    extra_args.append(arg)
     29            args += tuple(extra_args)
     30            def cleanup(res):
     31                for patching in reversed(getattr(patched, 'patchings', [])):
     32                    patching.__exit__()
     33                return res
     34            singleton = {}
     35            retval = singleton
     36            try:
     37                retval = func(*args, **keywargs)
     38            except:
     39                cleanup(None)
     40                raise
     41            if Deferred is None or not isinstance(retval, Deferred):
     42                return cleanup(retval)
     43            retval.addBoth(cleanup)
     44            return retval
     45
     46        patched.patchings = [self]
     47        if hasattr(func, 'func_code'):
     48            # not in Python 3
     49            patched.compat_co_firstlineno = getattr(
     50                func, "compat_co_firstlineno",
     51                func.func_code.co_firstlineno
     52            )
     53        return patched