1 | """ |
---|
2 | Tests for allmydata.storage.crawler. |
---|
3 | |
---|
4 | Ported to Python 3. |
---|
5 | """ |
---|
6 | |
---|
7 | |
---|
8 | import time |
---|
9 | import os.path |
---|
10 | from twisted.trial import unittest |
---|
11 | from twisted.application import service |
---|
12 | from twisted.internet import defer |
---|
13 | from foolscap.api import eventually, fireEventually |
---|
14 | |
---|
15 | from allmydata.util import fileutil, hashutil, pollmixin |
---|
16 | from allmydata.storage.server import StorageServer, si_b2a |
---|
17 | from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded |
---|
18 | |
---|
19 | from allmydata.test.common_util import StallMixin |
---|
20 | |
---|
21 | class BucketEnumeratingCrawler(ShareCrawler): |
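    """
    A crawler that simply records the storage index of every bucket it
    visits, and fires ``finished_d`` once a full cycle has completed.
    """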
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.all_buckets = []
        self.finished_d = defer.Deferred()
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        # Bucket _inputs_ are bytes, and that's what we will compare this
        # to:
        storage_index_b32 = storage_index_b32.encode("ascii")
        self.all_buckets.append(storage_index_b32)
    def finished_cycle(self, cycle):
        eventually(self.finished_d.callback, None)

class PacedCrawler(ShareCrawler):
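    """
    A crawler that records every bucket it sees, and after ``countdown``
    buckets forces its cpu_slice negative so the crawler must stop partway
    through a cycle; ``yield_cb``, if set, is invoked whenever the crawler
    yields.
    """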
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.countdown = 6
        self.all_buckets = []
        self.finished_d = defer.Deferred()
        self.yield_cb = None
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        # Bucket _inputs_ are bytes, and that's what we will compare this
        # to:
        storage_index_b32 = storage_index_b32.encode("ascii")
        self.all_buckets.append(storage_index_b32)
        self.countdown -= 1
        if self.countdown == 0:
            # force a timeout. We restore it in yielding()
            self.cpu_slice = -1.0
    def yielding(self, sleep_time):
        self.cpu_slice = 500
        if self.yield_cb:
            self.yield_cb()
    def finished_cycle(self, cycle):
        eventually(self.finished_d.callback, None)

class ConsumingCrawler(ShareCrawler):
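    """
    A crawler that burns roughly 50ms of wallclock time per bucket and
    tracks how much processing time it accumulates and how many full
    cycles it completes, for measuring CPU-usage throttling.
    """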
    cpu_slice = 0.5
    allowed_cpu_percentage = 0.5
    minimum_cycle_time = 0
    slow_start = 0

    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.accumulated = 0.0
        self.cycles = 0
        self.last_yield = 0.0
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        start = time.time()
        time.sleep(0.05)
        elapsed = time.time() - start
        self.accumulated += elapsed
        self.last_yield += elapsed
    def finished_cycle(self, cycle):
        self.cycles += 1
    def yielding(self, sleep_time):
        self.last_yield = 0.0

class OneShotCrawler(ShareCrawler):
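    """
    A crawler that counts the buckets it processes and detaches itself
    from its parent service as soon as its first cycle finishes.
    """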
    cpu_slice = 500 # make sure it can complete in a single slice
    slow_start = 0
    def __init__(self, *args, **kwargs):
        ShareCrawler.__init__(self, *args, **kwargs)
        self.counter = 0
        self.finished_d = defer.Deferred()
    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        self.counter += 1
    def finished_cycle(self, cycle):
        self.finished_d.callback(None)
        self.disownServiceParent()

class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
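    """
    Tests for ShareCrawler: running it synchronously and as a service,
    pacing and time-slicing, resuming from the state file, and one-shot
    crawls.
    """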
    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def si(self, i):
        return hashutil.storage_index_hash(b"%d" % (i,))
    def rs(self, i, serverid):
        return hashutil.bucket_renewal_secret_hash(b"%d" % (i,), serverid)
    def cs(self, i, serverid):
        return hashutil.bucket_cancel_secret_hash(b"%d" % (i,), serverid)

    def write(self, i, ss, serverid, tail=0):
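        """
        Allocate and close a single share (share number 0) on ``ss``, using
        a storage index derived from ``i`` whose last byte is forced to
        ``tail``, and return the base32-encoded storage index.
        """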
        si = self.si(i)
        si = si[:-1] + bytes(bytearray((tail,)))
        had,made = ss.allocate_buckets(si,
                                       self.rs(i, serverid),
                                       self.cs(i, serverid),
                                       set([0]), 99)
        made[0].write(0, b"data")
        made[0].close()
        return si_b2a(si)

    def test_immediate(self):
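        """
        Driving the crawler synchronously with start_current_prefix()
        visits every bucket; re-running it restarts from the beginning,
        and a second crawler sharing the state file sees the same set.
        """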
        self.basedir = "crawler/Basic/immediate"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]
        statefile = os.path.join(self.basedir, "statefile")

        c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1)
        c.load_state()

        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # check that a new crawler picks up on the state file properly
        c2 = BucketEnumeratingCrawler(ss, statefile)
        c2.load_state()

        c2.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))

    def test_service(self):
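        """
        When the crawler runs as a service under the reactor, get_state()
        and get_progress() work before the first tick, and a full cycle
        eventually visits every bucket.
        """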
        self.basedir = "crawler/Basic/service"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = BucketEnumeratingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # it should be legal to call get_state() and get_progress() right
        # away, even before the first tick is performed. No work should have
        # been done yet.
        s = c.get_state()
        p = c.get_progress()
        self.failUnlessEqual(s["last-complete-prefix"], None)
        self.failUnlessEqual(s["current-cycle"], None)
        self.failUnlessEqual(p["cycle-in-progress"], False)

        d = c.finished_d
        def _check(ignored):
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        d.addCallback(_check)
        return d

    def test_paced(self):
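        """
        A crawler interrupted by TimeSliceExceeded (both mid-bucket and at
        a bucket boundary) can save its state, resume where it left off,
        and hand the remaining work to a fresh crawler via the state file.
        """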
        self.basedir = "crawler/Basic/paced"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        # put four buckets in each prefixdir
        sis = []
        for i in range(10):
            for tail in range(4):
                sis.append(self.write(i, ss, serverid, tail))

        statefile = os.path.join(self.basedir, "statefile")

        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets. Since we
        # aren't using its normal scheduler, we have to save its state
        # manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 6)

        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))

        # make sure the statefile has been returned to the starting point
        c.finished_d = defer.Deferred()
        c.all_buckets = []
        c.start_current_prefix(time.time())
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # start a new crawler, it should start from the beginning
        c = PacedCrawler(ss, statefile)
        c.load_state()
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop in the middle of one of the buckets. Since we
        # aren't using its normal scheduler, we have to save its state
        # manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice

        # a third crawler should pick up from where it left off
        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

        # now stop it at the end of a bucket (countdown=4), to exercise a
        # different place that checks the time
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets. Again we must
        # save state manually.
        c.save_state()
        c.cpu_slice = PacedCrawler.cpu_slice
        self.failUnlessEqual(len(c.all_buckets), 4)
        c.start_current_prefix(time.time()) # finish it
        self.failUnlessEqual(len(sis), len(c.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
        del c

        # stop it again at the end of the bucket, check that a new crawler
        # picks up correctly
        c = PacedCrawler(ss, statefile)
        c.load_state()
        c.countdown = 4
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        # that should stop at the end of one of the buckets.
        c.save_state()

        c2 = PacedCrawler(ss, statefile)
        c2.all_buckets = c.all_buckets[:]
        c2.load_state()
        c2.countdown = -1
        c2.start_current_prefix(time.time())
        self.failUnlessEqual(len(sis), len(c2.all_buckets))
        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
        del c, c2

    def test_paced_service(self):
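        """
        A PacedCrawler running as a service reports sensible progress while
        it is suspended mid-cycle, and after finishing the cycle it sits in
        the inter-cycle timer with the expected sleep and next-crawl times.
        """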
        self.basedir = "crawler/Basic/paced_service"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        sis = [self.write(i, ss, serverid) for i in range(10)]

        statefile = os.path.join(self.basedir, "statefile")
        c = PacedCrawler(ss, statefile)

        did_check_progress = [False]
        def check_progress():
            c.yield_cb = None
            try:
                p = c.get_progress()
                self.failUnlessEqual(p["cycle-in-progress"], True)
                pct = p["cycle-complete-percentage"]
                # after 6 buckets, we happen to be at 76.17% complete. As
                # long as we create shares in deterministic order, this will
                # continue to be true.
                self.failUnlessEqual(int(pct), 76)
                left = p["remaining-sleep-time"]
                self.failUnless(isinstance(left, float), left)
                self.failUnless(left > 0.0, left)
            except Exception as e:
                did_check_progress[0] = e
            else:
                did_check_progress[0] = True
        c.yield_cb = check_progress

        c.setServiceParent(self.s)
        # that should get through 6 buckets, pause for a little while (and
        # run check_progress()), then resume

        d = c.finished_d
        def _check(ignored):
            if did_check_progress[0] is not True:
                raise did_check_progress[0]
            self.failUnless(did_check_progress[0])
            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
            # at this point, the crawler should be sitting in the inter-cycle
            # timer, which should be pegged at the minimum cycle time
            self.failUnless(c.timer)
            self.failUnless(c.sleeping_between_cycles)
            self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time)

            p = c.get_progress()
            self.failUnlessEqual(p["cycle-in-progress"], False)
            naptime = p["remaining-wait-time"]
            self.failUnless(isinstance(naptime, float), naptime)
            # min-cycle-time is 300, so this is basically testing that it took
            # less than 290s to crawl
            self.failUnless(naptime > 10.0, naptime)
            soon = p["next-crawl-time"] - time.time()
            self.failUnless(soon > 10.0, soon)

        d.addCallback(_check)
        return d

    def OFF_test_cpu_usage(self):
        # this test can't actually assert anything, because too many
        # buildslave machines are slow. But on a fast developer machine, it
        # can produce interesting results. So if you care about how well the
        # Crawler is accomplishing its run-slowly goals, re-enable this test
        # and read the stdout when it runs.

        self.basedir = "crawler/Basic/cpu_usage"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = ConsumingCrawler(ss, statefile)
        c.setServiceParent(self.s)

        # this will run as fast as it can, consuming about 50ms per call to
        # process_bucket(), limited by the Crawler to about 50% cpu. We let
        # it run for a few seconds, then compare how much time
        # process_bucket() got vs wallclock time. It should get between 10%
        # and 70% CPU. This is dicey, there's about 100ms of overhead per
        # 300ms slice (saving the state file takes about 150-200us, but we do
        # it 1024 times per cycle, one for each [empty] prefixdir), leaving
        # 200ms for actual processing, which is enough to get through 4
        # buckets each slice, then the crawler sleeps for 300ms/0.5 = 600ms,
        # giving us 900ms wallclock per slice. In 4.0 seconds we can do 4.4
        # slices, giving us about 17 shares, so we merely assert that we've
        # finished at least one cycle in that time.

        # with a short cpu_slice (so we can keep this test down to 4
        # seconds), the overhead is enough to make a nominal 50% usage more
        # like 30%. Forcing sleep_time to 0 only gets us 67% usage.

        start = time.time()
        d = self.stall(delay=4.0)
        def _done(res):
            elapsed = time.time() - start
            percent = 100.0 * c.accumulated / elapsed
            # our buildslaves vary too much in their speeds and load levels,
            # and many of them only manage to hit 7% usage when our target is
            # 50%. So don't assert anything about the results, just log them.
            print()
            print("crawler: got %d%% percent when trying for 50%%" % percent)
            print("crawler: got %d full cycles" % c.cycles)
        d.addCallback(_done)
        return d

    def test_empty_subclass(self):
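        """
        The ShareCrawler base class itself (with no overridden hooks) can
        run as a service and complete a cycle, exercising the default
        no-op methods.
        """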
        self.basedir = "crawler/Basic/empty_subclass"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(10):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = ShareCrawler(ss, statefile)
        c.slow_start = 0
        c.setServiceParent(self.s)

        # we just let it run for a while, to get figleaf coverage of the
        # empty methods in the base class

        def _check():
            return bool(c.state["last-cycle-finished"] is not None)
        d = self.poll(_check)
        def _done(ignored):
            state = c.get_state()
            self.failUnless(state["last-cycle-finished"] is not None)
        d.addCallback(_done)
        return d


    def test_oneshot(self):
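        """
        A crawler that disowns its parent service after its first cycle
        stops doing work: its bucket counter stays fixed and it is left in
        a non-running state with the first cycle recorded as finished.
        """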
        self.basedir = "crawler/Basic/oneshot"
        fileutil.make_dirs(self.basedir)
        serverid = b"\x00" * 20
        ss = StorageServer(self.basedir, serverid)
        ss.setServiceParent(self.s)

        for i in range(30):
            self.write(i, ss, serverid)

        statefile = os.path.join(self.basedir, "statefile")
        c = OneShotCrawler(ss, statefile)
        c.setServiceParent(self.s)

        d = c.finished_d
        def _finished_first_cycle(ignored):
            return fireEventually(c.counter)
        d.addCallback(_finished_first_cycle)
        def _check(old_counter):
            # the crawler should not do any work after it's been stopped
            self.failUnlessEqual(old_counter, c.counter)
            self.failIf(c.running)
            self.failIf(c.timer)
            self.failIf(c.current_sleep_time)
            s = c.get_state()
            self.failUnlessEqual(s["last-cycle-finished"], 0)
            self.failUnlessEqual(s["current-cycle"], None)
        d.addCallback(_check)
        return d