diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py
index 0268d8d..ce2c15f 100644
a
|
b
|
class Checker(log.PrefixingLogMixin): |
480 | 480 | fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(), |
481 | 481 | self._verifycap.get_storage_index()) |
482 | 482 | self.file_cancel_secret = fcs |
| 483 | self._num_active_block_fetches = 0 |
| 484 | self._max_active_block_fetches = 0 |
483 | 485 | |
484 | 486 | def _get_renewal_secret(self, seed): |
485 | 487 | return bucket_renewal_secret_hash(self.file_renewal_secret, seed) |
… |
… |
class Checker(log.PrefixingLogMixin): |
554 | 556 | f_value=str(f.value), |
555 | 557 | level=log.WEIRD, umid="hEGuQg") |
556 | 558 | |
| 559 | def _debug_start_block_fetch(self): |
| 560 | self._num_active_block_fetches += 1 |
| 561 | if self._num_active_block_fetches > self._max_active_block_fetches: |
| 562 | self._max_active_block_fetches = self._num_active_block_fetches |
| 563 | def _debug_finish_block_fetch(self): |
| 564 | self._num_active_block_fetches -= 1 |
557 | 565 | |
558 | 566 | def _download_and_verify(self, serverid, sharenum, bucket): |
559 | 567 | """Start an attempt to download and verify every block in this bucket |
… |
… |
class Checker(log.PrefixingLogMixin): |
612 | 620 | return d |
613 | 621 | d.addCallback(_got_ueb) |
614 | 622 | |
615 | | def _discard_result(r): |
616 | | assert isinstance(r, str), r |
617 | | # to free up the RAM |
618 | | return None |
619 | 623 | def _get_blocks(vrbp): |
620 | | ds = [] |
621 | | for blocknum in range(veup.num_segments): |
| 624 | def _discard_result(r): |
| 625 | assert isinstance(r, str), r |
| 626 | self._debug_finish_block_fetch() |
| 627 | # to free up the RAM |
| 628 | return None |
| 629 | def _get_block(ign, blocknum): |
| 630 | self._debug_start_block_fetch() |
622 | 631 | db = vrbp.get_block(blocknum) |
623 | 632 | db.addCallback(_discard_result) |
624 | | ds.append(db) |
625 | | # this gatherResults will fire once every block of this share has |
626 | | # been downloaded and verified, or else it will errback. |
627 | | return deferredutil.gatherResults(ds) |
| 633 | return db |
| 634 | dbs = defer.succeed(None) |
| 635 | for blocknum in range(veup.num_segments): |
| 636 | dbs.addCallback(_get_block, blocknum) |
| 637 | # The Deferred we return will fire after every block of this |
| 638 | # share has been downloaded and verified successfully, or else it |
| 639 | # will errback as soon as the first error is observed. |
| 640 | return dbs |
628 | 641 | d.addCallback(_get_blocks) |
629 | 642 | |
630 | 643 | # if none of those errbacked, the blocks (and the hashes above them) |
… |
… |
class Checker(log.PrefixingLogMixin): |
788 | 801 | cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good']) |
789 | 802 | |
790 | 803 | cr.set_data(d) |
| 804 | cr._debug_max_active_block_fetches = self._max_active_block_fetches |
791 | 805 | |
792 | 806 | return cr |
793 | 807 | |
diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py
index b302c10..4670018 100644
a
|
b
|
class AddLease(GridTestMixin, unittest.TestCase): |
319 | 319 | |
320 | 320 | d.addCallback(lambda ign: self.failUnless(really_did_break)) |
321 | 321 | return d |
| 322 | |
| 323 | class TooParallel(GridTestMixin, unittest.TestCase): |
| 324 | # bug #1395: immutable verifier was aggressively parallized, checking all |
| 325 | # blocks of all shares at the same time, blowing our memory budget and |
| 326 | # crashing with MemoryErrors on >1GB files. |
| 327 | |
| 328 | def test_immutable(self): |
| 329 | self.basedir = "checker/TooParallel/immutable" |
| 330 | self.set_up_grid(num_servers=4) |
| 331 | c0 = self.g.clients[0] |
| 332 | c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1, |
| 333 | "happy": 4, |
| 334 | "n": 4, |
| 335 | "max_segment_size": 5, |
| 336 | } |
| 337 | self.uris = {} |
| 338 | DATA = "data" * 100 # 400/5 = 80 blocks |
| 339 | d = c0.upload(Data(DATA, convergence="")) |
| 340 | def _do_check(ur): |
| 341 | n = c0.create_node_from_uri(ur.uri) |
| 342 | return n.check(Monitor(), verify=True) |
| 343 | d.addCallback(_do_check) |
| 344 | def _check(cr): |
| 345 | # the verifier works on all 4 shares in parallel, but only |
| 346 | # fetches one block from each share at a time, so we expect to |
| 347 | # see at most 4 parallel fetches |
| 348 | self.failUnlessEqual(cr._debug_max_active_block_fetches, 4) |
| 349 | d.addCallback(_check) |
| 350 | return d |
| 351 | |