| 321 | |
| 322 | class CounterHolder(object): |
| 323 | def __init__(self): |
| 324 | self._num_active_block_fetches = 0 |
| 325 | self._max_active_block_fetches = 0 |
| 326 | |
| 327 | from allmydata.immutable.checker import ValidatedReadBucketProxy |
| 328 | class MockVRBP(ValidatedReadBucketProxy): |
| 329 | def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder): |
| 330 | ValidatedReadBucketProxy.__init__(self, sharenum, bucket, |
| 331 | share_hash_tree, num_blocks, |
| 332 | block_size, share_size) |
| 333 | self.counterholder = counterholder |
| 334 | |
| 335 | def get_block(self, blocknum): |
| 336 | self.counterholder._num_active_block_fetches += 1 |
| 337 | if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches: |
| 338 | self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches |
| 339 | d = ValidatedReadBucketProxy.get_block(self, blocknum) |
| 340 | def _mark_no_longer_active(res): |
| 341 | self.counterholder._num_active_block_fetches -= 1 |
| 342 | return res |
| 343 | d.addBoth(_mark_no_longer_active) |
| 344 | return d |
| 345 | |
| 346 | class TooParallel(GridTestMixin, unittest.TestCase): |
| 347 | # bug #1395: immutable verifier was aggressively parallized, checking all |
| 348 | # blocks of all shares at the same time, blowing our memory budget and |
| 349 | # crashing with MemoryErrors on >1GB files. |
| 350 | |
| 351 | def test_immutable(self): |
| 352 | import allmydata.immutable.checker |
| 353 | origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy |
| 354 | |
| 355 | self.basedir = "checker/TooParallel/immutable" |
| 356 | |
| 357 | # If any code asks to instantiate a ValidatedReadBucketProxy, |
| 358 | # we give them a MockVRBP which is configured to use our |
| 359 | # CounterHolder. |
| 360 | counterholder = CounterHolder() |
| 361 | def make_mock_VRBP(*args, **kwargs): |
| 362 | return MockVRBP(counterholder=counterholder, *args, **kwargs) |
| 363 | allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP |
| 364 | |
| 365 | self.set_up_grid(num_servers=4) |
| 366 | c0 = self.g.clients[0] |
| 367 | c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1, |
| 368 | "happy": 4, |
| 369 | "n": 4, |
| 370 | "max_segment_size": 5, |
| 371 | } |
| 372 | self.uris = {} |
| 373 | DATA = "data" * 100 # 400/5 = 80 blocks |
| 374 | d = c0.upload(Data(DATA, convergence="")) |
| 375 | def _do_check(ur): |
| 376 | n = c0.create_node_from_uri(ur.uri) |
| 377 | return n.check(Monitor(), verify=True) |
| 378 | d.addCallback(_do_check) |
| 379 | def _check(cr): |
| 380 | # the verifier works on all 4 shares in parallel, but only |
| 381 | # fetches one block from each share at a time, so we expect to |
| 382 | # see 4 parallel fetches |
| 383 | self.failUnlessEqual(counterholder._max_active_block_fetches, 4) |
| 384 | d.addCallback(_check) |
| 385 | def _clean_up(res): |
| 386 | allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP |
| 387 | return res |
| 388 | d.addBoth(_clean_up) |
| 389 | return d |
| 390 | test_immutable.timeout = 10 |