1 | """ |
---|
2 | Crawl the storage server shares. |
---|
3 | |
---|
4 | Ported to Python 3. |
---|
5 | """ |
---|
6 | |
---|
7 | import os |
---|
8 | import time |
---|
9 | import json |
---|
10 | import struct |
---|
11 | from twisted.internet import reactor |
---|
12 | from twisted.application import service |
---|
13 | from twisted.python.filepath import FilePath |
---|
14 | from allmydata.storage.common import si_b2a |
---|
15 | from allmydata.util import fileutil |
---|
16 | |
---|
17 | class TimeSliceExceeded(Exception): |
---|
18 | pass |
---|
19 | |
---|
20 | |
---|
21 | class MigratePickleFileError(Exception): |
---|
22 | """ |
---|
23 | A pickle-format file exists (the FilePath to the file will be the |
---|
24 | single arg). |
---|
25 | """ |
---|
26 | pass |
---|
27 | |
---|
28 | |
---|
29 | def _convert_cycle_data(state): |
---|
30 | """ |
---|
31 | :param dict state: cycle-to-date or history-item state |
---|
32 | |
---|
33 | :return dict: the state in the JSON form |
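
    For illustration (the numbers here are made up): a cycle-to-date
    "lease-age-histogram" key such as (0, 86400) becomes the string
    "0,86400", while the same histogram in a history item is a list of
    3-tuples, so (0, 86400, 17) becomes the list [0, 86400, 17]. Every
    key and value in the result is JSON-serializable.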
    """

    def _convert_expiration_mode(value):
        # original is a 4-tuple, with the last element being a 2-tuple
        # .. convert both to lists
        return [
            value[0],
            value[1],
            value[2],
            list(value[3]),
        ]

    def _convert_lease_age(value):
        # if we're in cycle-to-date, this is a dict
        if isinstance(value, dict):
            return {
                "{},{}".format(k[0], k[1]): v
                for k, v in value.items()
            }
        # otherwise, it's a history-item and they're 3-tuples
        return [
            list(v)
            for v in value
        ]

    converters = {
        "configured-expiration-mode": _convert_expiration_mode,
        "cycle-start-finish-times": list,
        "lease-age-histogram": _convert_lease_age,
        "corrupt-shares": lambda value: [
            list(x)
            for x in value
        ],
        "leases-per-share-histogram": lambda value: {
            str(k): v
            for k, v in value.items()
        },
    }
    return {
        k: converters.get(k, lambda z: z)(v)
        for k, v in state.items()
    }


def _convert_pickle_state_to_json(state):
    """
    :param dict state: the pickled state

    :return dict: the state in the JSON form
    """
    assert state["version"] == 1, "Only known version is 1"

    converters = {
        "cycle-to-date": _convert_cycle_data,
    }
    return {
        k: converters.get(k, lambda x: x)(v)
        for k, v in state.items()
    }


def _upgrade_pickle_to_json(state_path, convert_pickle):
    """
    :param FilePath state_path: the filepath to ensure is json

    :param Callable[dict] convert_pickle: function to change
        pickle-style state into JSON-style state

    :returns FilePath: the local path where the state is stored

    If this state is pickle, convert to the JSON format and return the
    JSON path.
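
    A hypothetical call (the filename is illustrative, not required by
    this module), assuming a legacy pickle file named
    "lease_checker.state"::

        _upgrade_pickle_to_json(
            FilePath("lease_checker.state"),
            _convert_pickle_state_to_json,
        )

    would write the converted state to "lease_checker.state.json" and
    then delete the pickle file.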
    """
    json_state_path = state_path.siblingExtension(".json")

    # if there's no file there at all, we're done because there's
    # nothing to upgrade
    if not state_path.exists():
        return json_state_path

    # upgrade the pickle data to JSON
    import pickle
    with state_path.open("rb") as f:
        state = pickle.load(f)
    new_state = convert_pickle(state)
    _dump_json_to_file(new_state, json_state_path)

    # we've written the JSON, delete the pickle
    state_path.remove()
    return json_state_path


def _confirm_json_format(fp):
    """
    :param FilePath fp: the original (pickle) name of a state file

    This confirms that we do _not_ have the pickle-version of a
    state-file and _do_ either have nothing, or the JSON version. If
    the pickle-version exists, an exception is raised.

    :returns FilePath: the JSON name of a state file
    """
    if fp.path.endswith(".json"):
        return fp
    jsonfp = fp.siblingExtension(".json")
    if fp.exists():
        raise MigratePickleFileError(fp)
    return jsonfp


def _dump_json_to_file(js, afile):
    """
    Dump the JSON object `js` to the FilePath `afile`
    """
    with afile.open("wb") as f:
        data = json.dumps(js)
        f.write(data.encode("utf8"))


class _LeaseStateSerializer(object):
    """
    Read and write state for LeaseCheckingCrawler. This understands
    how to read the legacy pickle format files and upgrade them to the
    new JSON format (which will occur automatically).
    """

    def __init__(self, state_path):
        self._path = _confirm_json_format(FilePath(state_path))

    def load(self):
        """
        :returns: deserialized JSON state
        """
        with self._path.open("rb") as f:
            return json.load(f)

    def save(self, data):
        """
        Serialize the given data as JSON into the state-path
        :returns: None
        """
        tmpfile = self._path.siblingExtension(".tmp")
        _dump_json_to_file(data, tmpfile)
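        # the state was just written to a sibling ".tmp" file; moving it
        # over the real path means a reader never sees a partially
        # written JSON file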
        fileutil.move_into_place(tmpfile.path, self._path.path)
        return None


class ShareCrawler(service.MultiService):
    """A ShareCrawler subclass is attached to a StorageServer, and
    periodically walks all of its shares, processing each one in some
    fashion. This crawl is rate-limited, to reduce the IO burden on the host,
    since large servers can easily have a terabyte of shares, in several
    million files, which can take hours or days to read.

    Once the crawler starts a cycle, it will proceed at a rate limited by the
    allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
    after it has worked for 'cpu_slice' seconds, and not resuming right away,
    always trying to use less than 'allowed_cpu_percentage'.

    Once the crawler finishes a cycle, it will put off starting the next one
    long enough to ensure that 'minimum_cycle_time' elapses between the start
    of two consecutive cycles.

    We assume that the normal upload/download/get_buckets traffic of a tahoe
    grid will cause the prefixdir contents to be mostly cached in the kernel,
    or that the number of buckets in each prefixdir will be small enough to
    load quickly. A 1TB allmydata.com server was measured to have 2.56M
    buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
    prefix. On this server, each prefixdir took 130ms-200ms to list the first
    time, and 17ms to list the second time.

    To use a crawler, create a subclass which implements the process_bucket()
    method. It will be called with a prefixdir and a base32 storage index
    string. process_bucket() must run synchronously. Any keys added to
    self.state will be preserved. Override add_initial_state() to set up
    initial state keys. Override finished_cycle() to perform additional
    processing when the cycle is complete. Any status that the crawler
    produces should be put in the self.state dictionary. Status renderers
    (like a web page which describes the accomplishments of your crawler)
    will use crawler.get_state() to retrieve this dictionary; they can
    present the contents as they see fit.
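
    A minimal subclass sketch (the class name and state key here are
    illustrative, not part of this module)::

        class BucketListingCrawler(ShareCrawler):
            def add_initial_state(self):
                self.state.setdefault("bucket-names", [])

            def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
                self.state["bucket-names"].append(storage_index_b32)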

    Then create an instance, with a reference to a StorageServer and a
    filename where it can store persistent state. The statefile is used to
    keep track of how far around the ring the process has travelled, as well
    as timing history to allow the pace to be predicted and controlled. The
    statefile will be updated and written to disk after each time slice (just
    before the crawler yields to the reactor), and also after each cycle is
    finished, and also when stopService() is called. Note that this means
    that a crawler which is interrupted with SIGKILL while it is in the
    middle of a time slice will lose progress: the next time the node is
    started, the crawler will repeat some unknown amount of work.

    The crawler instance must be started with startService() before it will
    do any work. To make it stop doing work, call stopService().
    """

    slow_start = 300 # don't start crawling for 5 minutes after startup
    # all three of these can be changed at any time
    allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
    cpu_slice = 1.0 # use up to 1.0 seconds before yielding
    minimum_cycle_time = 300 # don't run a cycle faster than this

    def __init__(self, server, statefile, allowed_cpu_percentage=None):
        service.MultiService.__init__(self)
        if allowed_cpu_percentage is not None:
            self.allowed_cpu_percentage = allowed_cpu_percentage
        self.server = server
        self.sharedir = server.sharedir
        self._state_serializer = _LeaseStateSerializer(statefile)
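        # all 1024 possible two-character (10-bit) base32 prefixes of a
        # storage index, in sorted order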
        self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
                         for i in range(2**10)]
        self.prefixes = [p.decode("ascii") for p in self.prefixes]
        self.prefixes.sort()
        self.timer = None
        self.bucket_cache = (None, [])
        self.current_sleep_time = None
        self.next_wake_time = None
        self.last_prefix_finished_time = None
        self.last_prefix_elapsed_time = None
        self.last_cycle_started_time = None
        self.last_cycle_elapsed_time = None
        self.load_state()

    def minus_or_none(self, a, b):
        if a is None:
            return None
        return a-b

    def get_progress(self):
        """I return information about how much progress the crawler is
        making. My return value is a dictionary. The primary key is
        'cycle-in-progress': True if the crawler is currently traversing the
        shares, False if it is idle between cycles.

        Note that any of these 'time' keys could be None if I am called at
        certain moments, so application code must be prepared to tolerate
        this case. The estimates will also be None if insufficient data has
        been gathered to form an estimate.

        If cycle-in-progress is True, the following keys will be present::

         cycle-complete-percentage: float, from 0.0 to 100.0, indicating how
                                    far the crawler has progressed through
                                    the current cycle
         remaining-sleep-time: float, seconds from now when we do more work
         estimated-cycle-complete-time-left:
                float, seconds remaining until the current cycle is finished.
                TODO: this does not yet include the remaining time left in
                the current prefixdir, and it will be very inaccurate on fast
                crawlers (which can process a whole prefix in a single tick)
         estimated-time-per-cycle: float, seconds required to do a complete
                                   cycle

        If cycle-in-progress is False, the following keys are available::

         next-crawl-time: float, seconds-since-epoch when next crawl starts
         remaining-wait-time: float, seconds from now when next crawl starts
         estimated-time-per-cycle: float, seconds required to do a complete
                                   cycle
        """

        d = {}

        if self.state["current-cycle"] is None:
            d["cycle-in-progress"] = False
            d["next-crawl-time"] = self.next_wake_time
            d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
                                                          time.time())
        else:
            d["cycle-in-progress"] = True
            pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes)
            d["cycle-complete-percentage"] = pct
            remaining = None
            if self.last_prefix_elapsed_time is not None:
                left = len(self.prefixes) - self.last_complete_prefix_index
                remaining = left * self.last_prefix_elapsed_time
                # TODO: remainder of this prefix: we need to estimate the
                # per-bucket time, probably by measuring the time spent on
                # this prefix so far, divided by the number of buckets we've
                # processed.
            d["estimated-cycle-complete-time-left"] = remaining
            # it's possible to call get_progress() from inside a crawler's
            # finished_prefix() function
            d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
                                                           time.time())
        per_cycle = None
        if self.last_cycle_elapsed_time is not None:
            per_cycle = self.last_cycle_elapsed_time
        elif self.last_prefix_elapsed_time is not None:
            per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
        d["estimated-time-per-cycle"] = per_cycle
        return d

    def get_state(self):
        """I return the current state of the crawler. This is a copy of my
        state dictionary.

        If we are not currently sleeping (i.e. get_state() was called from
        inside the process_prefixdir, process_bucket, or finished_cycle()
        methods, or if startService has not yet been called on this crawler),
        these two keys will be None.

        Subclasses can override this to add computed keys to the return value,
        but don't forget to start with the upcall.
        """
        state = self.state.copy() # it isn't a deepcopy, so don't go crazy
        return state

    def load_state(self):
        # we use this to store state for both the crawler's internals and
        # anything the subclass-specific code needs. The state is stored
        # after each bucket is processed, after each prefixdir is processed,
        # and after a cycle is complete. The internal keys we use are:
        # ["version"]: int, always 1
        # ["last-cycle-finished"]: int, or None if we have not yet finished
        #                          any cycle
        # ["current-cycle"]: int, or None if we are sleeping between cycles
        # ["current-cycle-start-time"]: int, seconds-since-epoch of when this
        #                               cycle was started, possibly by an earlier
        #                               process
        # ["last-complete-prefix"]: str, two-letter name of the last prefixdir
        #                           that was fully processed, or None if we
        #                           are sleeping between cycles, or if we
        #                           have not yet finished any prefixdir since
        #                           a cycle was started
        # ["last-complete-bucket"]: str, base32 storage index bucket name
        #                           of the last bucket to be processed, or
        #                           None if we are sleeping between cycles
        try:
            state = self._state_serializer.load()
        except Exception:
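            # a missing, unreadable, or corrupt state file just means we
            # start over with fresh state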
            state = {"version": 1,
                     "last-cycle-finished": None,
                     "current-cycle": None,
                     "last-complete-prefix": None,
                     "last-complete-bucket": None,
                     }
        state.setdefault("current-cycle-start-time", time.time()) # approximate
        self.state = state
        lcp = state["last-complete-prefix"]
        if lcp is None:
            self.last_complete_prefix_index = -1
        else:
            self.last_complete_prefix_index = self.prefixes.index(lcp)
        self.add_initial_state()

    def add_initial_state(self):
        """Hook method to add extra keys to self.state when first loaded.

        The first time this Crawler is used, or when the code has been
        upgraded, the saved state file may not contain all the keys you
        expect. Use this method to add any missing keys. Simply modify
        self.state as needed.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def save_state(self):
        lcpi = self.last_complete_prefix_index
        if lcpi == -1:
            last_complete_prefix = None
        else:
            last_complete_prefix = self.prefixes[lcpi]
        self.state["last-complete-prefix"] = last_complete_prefix
        self._state_serializer.save(self.get_state())

    def startService(self):
        # arrange things to look like we were just sleeping, so
        # status/progress values work correctly
        self.sleeping_between_cycles = True
        self.current_sleep_time = self.slow_start
        self.next_wake_time = time.time() + self.slow_start
        self.timer = reactor.callLater(self.slow_start, self.start_slice)
        service.MultiService.startService(self)

    def stopService(self):
        if self.timer:
            self.timer.cancel()
            self.timer = None
        self.save_state()
        return service.MultiService.stopService(self)

    def start_slice(self):
        start_slice = time.time()
        self.timer = None
        self.sleeping_between_cycles = False
        self.current_sleep_time = None
        self.next_wake_time = None
        try:
            self.start_current_prefix(start_slice)
            finished_cycle = True
        except TimeSliceExceeded:
            finished_cycle = False
        self.save_state()
        if not self.running:
            # someone might have used stopService() to shut us down
            return
        # either we finished a whole cycle, or we ran out of time
        now = time.time()
        this_slice = now - start_slice
        # this_slice/(this_slice+sleep_time) = percentage
        # this_slice/percentage = this_slice+sleep_time
        # sleep_time = (this_slice/percentage) - this_slice
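        # e.g. a 1.0s work slice at the default 10% allowance gives
        # 1.0/0.10 - 1.0 = 9.0 seconds of sleep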
        sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
        # if the math gets weird, or a timequake happens, don't sleep
        # forever. Note that this means that, while a cycle is running, we
        # will process at least one bucket every 5 minutes, no matter how
        # long that bucket takes.
        sleep_time = max(0.0, min(sleep_time, 299))
        if finished_cycle:
            # how long should we sleep between cycles? Don't run faster than
            # allowed_cpu_percentage says, but also no faster than
            # minimum_cycle_time allows
            self.sleeping_between_cycles = True
            sleep_time = max(sleep_time, self.minimum_cycle_time)
        else:
            self.sleeping_between_cycles = False
        self.current_sleep_time = sleep_time # for status page
        self.next_wake_time = now + sleep_time
        self.yielding(sleep_time)
        self.timer = reactor.callLater(sleep_time, self.start_slice)

    def start_current_prefix(self, start_slice):
        state = self.state
        if state["current-cycle"] is None:
            self.last_cycle_started_time = time.time()
            state["current-cycle-start-time"] = self.last_cycle_started_time
            if state["last-cycle-finished"] is None:
                state["current-cycle"] = 0
            else:
                state["current-cycle"] = state["last-cycle-finished"] + 1
            self.started_cycle(state["current-cycle"])
        cycle = state["current-cycle"]

        for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
            # if we want to yield earlier, just raise TimeSliceExceeded()
            prefix = self.prefixes[i]
            prefixdir = os.path.join(self.sharedir, prefix)
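            # if a TimeSliceExceeded interrupted us partway through this same
            # prefix, reuse the directory listing cached at that point instead
            # of calling os.listdir() again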
            if i == self.bucket_cache[0]:
                buckets = self.bucket_cache[1]
            else:
                try:
                    buckets = os.listdir(prefixdir)
                    buckets.sort()
                except EnvironmentError:
                    buckets = []
                self.bucket_cache = (i, buckets)
            self.process_prefixdir(cycle, prefix, prefixdir,
                                   buckets, start_slice)
            self.last_complete_prefix_index = i

            now = time.time()
            if self.last_prefix_finished_time is not None:
                elapsed = now - self.last_prefix_finished_time
                self.last_prefix_elapsed_time = elapsed
            self.last_prefix_finished_time = now

            self.finished_prefix(cycle, prefix)
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

        # yay! we finished the whole cycle
        self.last_complete_prefix_index = -1
        self.last_prefix_finished_time = None # don't include the sleep
        now = time.time()
        if self.last_cycle_started_time is not None:
            self.last_cycle_elapsed_time = now - self.last_cycle_started_time
        state["last-complete-bucket"] = None
        state["last-cycle-finished"] = cycle
        state["current-cycle"] = None
        self.finished_cycle(cycle)
        self.save_state()

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        """This gets a list of bucket names (i.e. storage index strings,
        base32-encoded) in sorted order.

        You can override this if your crawler doesn't care about the actual
        shares, for example a crawler which merely keeps track of how many
        buckets are being managed by this server.

        Subclasses which *do* care about actual buckets should leave this
        method alone, and implement process_bucket() instead.
        """
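
        # resume where the last slice left off: skip any bucket at or before
        # the most recently completed one in this prefixdir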
        for bucket in buckets:
            last_complete = self.state["last-complete-bucket"]
            if last_complete is not None and bucket <= last_complete:
                continue
            self.process_bucket(cycle, prefix, prefixdir, bucket)
            self.state["last-complete-bucket"] = bucket
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

    # the remaining methods are explicitly for subclasses to implement.

    def started_cycle(self, cycle):
        """Notify a subclass that the crawler is about to start a cycle.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        """Examine a single bucket. Subclasses should do whatever they want
        to do to the shares therein, then update self.state as necessary.

        If the crawler is never interrupted by SIGKILL, this method will be
        called exactly once per share (per cycle). If it *is* interrupted,
        then the next time the node is started, some amount of work will be
        duplicated, according to when self.save_state() was last called. By
        default, save_state() is called at the end of each timeslice, and
        after finished_cycle() returns, and when stopService() is called.

        To reduce the chance of duplicate work (i.e. to avoid adding multiple
        records to a database), you can call save_state() at the end of your
        process_bucket() method. This will reduce the maximum duplicated work
        to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
        per bucket (and some disk writes), which will count against your
        allowed_cpu_percentage, and which may be considerable if
        process_bucket() runs quickly.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_prefix(self, cycle, prefix):
        """Notify a subclass that the crawler has just finished processing a
        prefix directory (all buckets with the same two-character/10-bit
        prefix). To impose a limit on how much work might be duplicated by a
        SIGKILL that occurs during a timeslice, you can call
        self.save_state() here, but be aware that it may represent a
        significant performance hit.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_cycle(self, cycle):
        """Notify subclass that a cycle (one complete traversal of all
        prefixdirs) has just finished. 'cycle' is the number of the cycle
        that just finished. This method should perform summary work and
        update self.state to publish information to status displays.

        One-shot crawlers, such as those used to upgrade shares to a new
        format or populate a database for the first time, can call
        self.stopService() (or more likely self.disownServiceParent()) to
        prevent it from running a second time. Don't forget to set some
        persistent state so that the upgrader won't be run again the next
        time the node is started.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def yielding(self, sleep_time):
        """The crawler is about to sleep for 'sleep_time' seconds. This
        method is mostly for the convenience of unit tests.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass


class BucketCountingCrawler(ShareCrawler):
    """I keep track of how many buckets are being managed by this server.
    This is equivalent to the number of distributed files and directories for
    which I am providing storage. The actual number of files+directories in
    the full grid is probably higher (especially when there are more servers
    than 'N', the number of generated shares), because some files+directories
    will have shares on other servers instead of me. Also note that the
    number of buckets will differ from the number of shares in small grids,
    when more than one share is placed on a single server.
    """

    minimum_cycle_time = 60*60 # we don't need this more than once an hour

    def __init__(self, server, statefile, num_sample_prefixes=1):
        ShareCrawler.__init__(self, server, statefile)
        self.num_sample_prefixes = num_sample_prefixes

    def add_initial_state(self):
        # ["bucket-counts"][cyclenum][prefix] = number
        # ["last-complete-cycle"] = cyclenum # maintained by base class
        # ["last-complete-bucket-count"] = number
        # ["storage-index-samples"][prefix] = (cyclenum,
        #                                      list of SI strings (base32))
        self.state.setdefault("bucket-counts", {})
        self.state.setdefault("last-complete-bucket-count", None)
        self.state.setdefault("storage-index-samples", {})

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        # we override process_prefixdir() because we don't want to look at
        # the individual buckets. We'll save state after each one. On my
        # laptop, a mostly-empty storage server can process about 70
        # prefixdirs in a 1.0s slice.
        if cycle not in self.state["bucket-counts"]:
            self.state["bucket-counts"][cycle] = {}
        self.state["bucket-counts"][cycle][prefix] = len(buckets)
        if prefix in self.prefixes[:self.num_sample_prefixes]:
            self.state["storage-index-samples"][prefix] = (cycle, buckets)

    def finished_cycle(self, cycle):
        last_counts = self.state["bucket-counts"].get(cycle, [])
        if len(last_counts) == len(self.prefixes):
            # great, we have a whole cycle.
            num_buckets = sum(last_counts.values())
            self.state["last-complete-bucket-count"] = num_buckets
            # get rid of old counts
            for old_cycle in list(self.state["bucket-counts"].keys()):
                if old_cycle != cycle:
                    del self.state["bucket-counts"][old_cycle]
        # get rid of old samples too
        for prefix in list(self.state["storage-index-samples"].keys()):
            old_cycle, buckets = self.state["storage-index-samples"][prefix]
            if old_cycle != cycle:
                del self.state["storage-index-samples"][prefix]
---|