source: trunk/src/allmydata/storage/crawler.py

Last change on this file was 53084f7, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-27T23:49:07Z

remove more Python2 compatibility

1"""
2Crawl the storage server shares.
3
4Ported to Python 3.
5"""
6
7import os
8import time
9import json
10import struct
11from twisted.internet import reactor
12from twisted.application import service
13from twisted.python.filepath import FilePath
14from allmydata.storage.common import si_b2a
15from allmydata.util import fileutil
16
17class TimeSliceExceeded(Exception):
18    pass
19
20
21class MigratePickleFileError(Exception):
22    """
23    A pickle-format file exists (the FilePath to the file will be the
24    single arg).
25    """
26    pass
27
28
29def _convert_cycle_data(state):
30    """
31    :param dict state: cycle-to-date or history-item state
32
33    :return dict: the state in the JSON form
34    """
35
36    def _convert_expiration_mode(value):
37        # original is a 4-tuple, with the last element being a 2-tuple
38        # .. convert both to lists
39        return [
40            value[0],
41            value[1],
42            value[2],
43            list(value[3]),
44        ]
45
46    def _convert_lease_age(value):
47        # if we're in cycle-to-date, this is a dict
48        if isinstance(value, dict):
49            return {
50                "{},{}".format(k[0], k[1]): v
51                for k, v in value.items()
52            }
53        # otherwise, it's a history-item and they're 3-tuples
54        return [
55            list(v)
56            for v in value
57        ]
58
59    converters = {
60        "configured-expiration-mode": _convert_expiration_mode,
61        "cycle-start-finish-times": list,
62        "lease-age-histogram": _convert_lease_age,
63        "corrupt-shares": lambda value: [
64            list(x)
65            for x in value
66        ],
67        "leases-per-share-histogram": lambda value: {
68            str(k): v
69            for k, v in value.items()
70        },
71    }
72    return {
73            k: converters.get(k, lambda z: z)(v)
74            for k, v in state.items()
75    }
76
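# For example, _convert_lease_age turns a cycle-to-date histogram keyed by
# 2-tuples, such as {(0, 86400): 3}, into {"0,86400": 3}, because JSON object
# keys must be strings; a history-item's list of 3-tuples simply becomes a
# list of lists.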

def _convert_pickle_state_to_json(state):
    """
    :param dict state: the pickled state

    :return dict: the state in the JSON form
    """
    assert state["version"] == 1, "Only known version is 1"

    converters = {
        "cycle-to-date": _convert_cycle_data,
    }
    return {
        k: converters.get(k, lambda x: x)(v)
        for k, v in state.items()
    }


def _upgrade_pickle_to_json(state_path, convert_pickle):
    """
    :param FilePath state_path: the filepath to ensure is json

    :param Callable[dict] convert_pickle: function to change
        pickle-style state into JSON-style state

    :returns FilePath: the local path where the state is stored

    If this state is pickle, convert to the JSON format and return the
    JSON path.
    """
    json_state_path = state_path.siblingExtension(".json")

    # if there's no file there at all, we're done because there's
    # nothing to upgrade
    if not state_path.exists():
        return json_state_path

    # upgrade the pickle data to JSON
    import pickle
    with state_path.open("rb") as f:
        state = pickle.load(f)
    new_state = convert_pickle(state)
    _dump_json_to_file(new_state, json_state_path)

    # we've written the JSON, delete the pickle
    state_path.remove()
    return json_state_path

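# A minimal sketch of how the helpers above fit together (the state filename
# here is hypothetical): given a legacy pickle at lease_checker.state, this
# would write lease_checker.state.json and then remove the pickle.
def _example_upgrade(statefile="lease_checker.state"):
    return _upgrade_pickle_to_json(
        FilePath(statefile),
        _convert_pickle_state_to_json,
    )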

def _confirm_json_format(fp):
    """
    :param FilePath fp: the original (pickle) name of a state file

    This confirms that we do _not_ have the pickle-version of a
    state-file and _do_ either have nothing, or the JSON version. If
    the pickle-version exists, an exception is raised.

    :returns FilePath: the JSON name of a state file
    """
    if fp.path.endswith(".json"):
        return fp
    jsonfp = fp.siblingExtension(".json")
    if fp.exists():
        raise MigratePickleFileError(fp)
    return jsonfp


def _dump_json_to_file(js, afile):
    """
    Dump the JSON object `js` to the FilePath `afile`
    """
    with afile.open("wb") as f:
        data = json.dumps(js)
        f.write(data.encode("utf8"))


class _LeaseStateSerializer(object):
    """
    Read and write state for LeaseCheckingCrawler. This understands
    how to read the legacy pickle format files and upgrade them to the
    new JSON format (which will occur automatically).
    """

    def __init__(self, state_path):
        self._path = _confirm_json_format(FilePath(state_path))

    def load(self):
        """
        :returns: deserialized JSON state
        """
        with self._path.open("rb") as f:
            return json.load(f)

    def save(self, data):
        """
        Serialize the given data as JSON into the state-path
        :returns: None
        """
        tmpfile = self._path.siblingExtension(".tmp")
        _dump_json_to_file(data, tmpfile)
        fileutil.move_into_place(tmpfile.path, self._path.path)
        return None

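# A minimal usage sketch (the filename below is hypothetical): state is
# written to a ".tmp" sibling first and then moved into place with
# fileutil.move_into_place, so a crash mid-write does not corrupt an existing
# state file.
def _example_serializer_roundtrip(statefile="bucket_counter.state"):
    serializer = _LeaseStateSerializer(statefile)
    serializer.save({"version": 1, "current-cycle": None})
    return serializer.load()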

class ShareCrawler(service.MultiService):
    """A ShareCrawler subclass is attached to a StorageServer, and
    periodically walks all of its shares, processing each one in some
    fashion. This crawl is rate-limited, to reduce the IO burden on the host,
    since large servers can easily have a terabyte of shares, in several
    million files, which can take hours or days to read.

    Once the crawler starts a cycle, it will proceed at a rate limited by the
    allowed_cpu_percentage= and cpu_slice= parameters: yielding to the
    reactor after it has worked for 'cpu_slice' seconds, and not resuming
    right away, always trying to use less than 'allowed_cpu_percentage'.

    Once the crawler finishes a cycle, it will put off starting the next one
    long enough to ensure that 'minimum_cycle_time' elapses between the start
    of two consecutive cycles.

    We assume that the normal upload/download/get_buckets traffic of a tahoe
    grid will cause the prefixdir contents to be mostly cached in the kernel,
    or that the number of buckets in each prefixdir will be small enough to
    load quickly. A 1TB allmydata.com server was measured to have 2.56M
    buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
    prefix. On this server, each prefixdir took 130ms-200ms to list the first
    time, and 17ms to list the second time.

    To use a crawler, create a subclass which implements the process_bucket()
    method. It will be called with a prefixdir and a base32 storage index
    string. process_bucket() must run synchronously. Any keys added to
    self.state will be preserved. Override add_initial_state() to set up
    initial state keys. Override finished_cycle() to perform additional
    processing when the cycle is complete. Any status that the crawler
    produces should be put in the self.state dictionary. Status renderers
    (like a web page which describes the accomplishments of your crawler)
    will use crawler.get_state() to retrieve this dictionary; they can
    present the contents as they see fit. A minimal subclass sketch appears
    after this class definition.

    Then create an instance, with a reference to a StorageServer and a
    filename where it can store persistent state. The statefile is used to
    keep track of how far around the ring the process has travelled, as well
    as timing history to allow the pace to be predicted and controlled. The
    statefile will be updated and written to disk after each time slice (just
    before the crawler yields to the reactor), and also after each cycle is
    finished, and also when stopService() is called. Note that this means
    that a crawler which is interrupted with SIGKILL while it is in the
    middle of a time slice will lose progress: the next time the node is
    started, the crawler will repeat some unknown amount of work.

    The crawler instance must be started with startService() before it will
    do any work. To make it stop doing work, call stopService().
    """

    slow_start = 300 # don't start crawling for 5 minutes after startup
    # all three of these can be changed at any time
    allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
    cpu_slice = 1.0 # use up to 1.0 seconds before yielding
    minimum_cycle_time = 300 # don't run a cycle faster than this

    def __init__(self, server, statefile, allowed_cpu_percentage=None):
        service.MultiService.__init__(self)
        if allowed_cpu_percentage is not None:
            self.allowed_cpu_percentage = allowed_cpu_percentage
        self.server = server
        self.sharedir = server.sharedir
        self._state_serializer = _LeaseStateSerializer(statefile)
        self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
                         for i in range(2**10)]
        self.prefixes = [p.decode("ascii") for p in self.prefixes]
        self.prefixes.sort()
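        # self.prefixes now holds the 1024 possible two-character prefixes of
        # base32-encoded storage indexes (two base32 characters encode 10
        # bits), in sorted order; each one names a prefixdir under sharedir.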
        self.timer = None
        self.bucket_cache = (None, [])
        self.current_sleep_time = None
        self.next_wake_time = None
        self.last_prefix_finished_time = None
        self.last_prefix_elapsed_time = None
        self.last_cycle_started_time = None
        self.last_cycle_elapsed_time = None
        self.load_state()

    def minus_or_none(self, a, b):
        if a is None:
            return None
        return a-b

    def get_progress(self):
        """I return information about how much progress the crawler is
        making. My return value is a dictionary. The primary key is
        'cycle-in-progress': True if the crawler is currently traversing the
        shares, False if it is idle between cycles.

        Note that any of these 'time' keys could be None if I am called at
        certain moments, so application code must be prepared to tolerate
        this case. The estimates will also be None if insufficient data has
        been gathered to form an estimate.

        If cycle-in-progress is True, the following keys will be present::

         cycle-complete-percentage: float, from 0.0 to 100.0, indicating how
                                    far the crawler has progressed through
                                    the current cycle
         remaining-sleep-time: float, seconds from now when we do more work
         estimated-cycle-complete-time-left:
                float, seconds remaining until the current cycle is finished.
                TODO: this does not yet include the remaining time left in
                the current prefixdir, and it will be very inaccurate on fast
                crawlers (which can process a whole prefix in a single tick)
         estimated-time-per-cycle: float, seconds required to do a complete
                                   cycle

        If cycle-in-progress is False, the following keys are available::

         next-crawl-time: float, seconds-since-epoch when next crawl starts
         remaining-wait-time: float, seconds from now when next crawl starts
         estimated-time-per-cycle: float, seconds required to do a complete
                                   cycle
        """

        d = {}

        if self.state["current-cycle"] is None:
            d["cycle-in-progress"] = False
            d["next-crawl-time"] = self.next_wake_time
            d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
                                                          time.time())
        else:
            d["cycle-in-progress"] = True
            pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes)
            d["cycle-complete-percentage"] = pct
            remaining = None
            if self.last_prefix_elapsed_time is not None:
                left = len(self.prefixes) - self.last_complete_prefix_index
                remaining = left * self.last_prefix_elapsed_time
                # TODO: remainder of this prefix: we need to estimate the
                # per-bucket time, probably by measuring the time spent on
                # this prefix so far, divided by the number of buckets we've
                # processed.
            d["estimated-cycle-complete-time-left"] = remaining
            # it's possible to call get_progress() from inside a crawler's
            # finished_prefix() function
            d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
                                                           time.time())
        per_cycle = None
        if self.last_cycle_elapsed_time is not None:
            per_cycle = self.last_cycle_elapsed_time
        elif self.last_prefix_elapsed_time is not None:
            per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
        d["estimated-time-per-cycle"] = per_cycle
        return d

    def get_state(self):
        """I return the current state of the crawler. This is a copy of my
        state dictionary.

        If we are not currently sleeping (i.e. get_state() was called from
        inside the process_prefixdir, process_bucket, or finished_cycle()
        methods, or if startService has not yet been called on this crawler),
        these two keys will be None.

        Subclasses can override this to add computed keys to the return value,
        but don't forget to start with the upcall.
        """
        state = self.state.copy() # it isn't a deepcopy, so don't go crazy
        return state

    def load_state(self):
        # we use this to store state for both the crawler's internals and
        # anything the subclass-specific code needs. The state is stored
        # after each bucket is processed, after each prefixdir is processed,
        # and after a cycle is complete. The internal keys we use are:
        #  ["version"]: int, always 1
        #  ["last-cycle-finished"]: int, or None if we have not yet finished
        #                           any cycle
        #  ["current-cycle"]: int, or None if we are sleeping between cycles
        #  ["current-cycle-start-time"]: int, seconds-since-epoch of when this
        #                                cycle was started, possibly by an earlier
        #                                process
        #  ["last-complete-prefix"]: str, two-letter name of the last prefixdir
        #                            that was fully processed, or None if we
        #                            are sleeping between cycles, or if we
        #                            have not yet finished any prefixdir since
        #                            a cycle was started
        #  ["last-complete-bucket"]: str, base32 storage index bucket name
        #                            of the last bucket to be processed, or
        #                            None if we are sleeping between cycles
        try:
            state = self._state_serializer.load()
        except Exception:
            state = {"version": 1,
                     "last-cycle-finished": None,
                     "current-cycle": None,
                     "last-complete-prefix": None,
                     "last-complete-bucket": None,
                     }
        state.setdefault("current-cycle-start-time", time.time()) # approximate
        self.state = state
        lcp = state["last-complete-prefix"]
        if lcp is None:
            self.last_complete_prefix_index = -1
        else:
            self.last_complete_prefix_index = self.prefixes.index(lcp)
        self.add_initial_state()

    def add_initial_state(self):
        """Hook method to add extra keys to self.state when first loaded.

        The first time this Crawler is used, or when the code has been
        upgraded, the saved state file may not contain all the keys you
        expect. Use this method to add any missing keys. Simply modify
        self.state as needed.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def save_state(self):
        lcpi = self.last_complete_prefix_index
        if lcpi == -1:
            last_complete_prefix = None
        else:
            last_complete_prefix = self.prefixes[lcpi]
        self.state["last-complete-prefix"] = last_complete_prefix
        self._state_serializer.save(self.get_state())

    def startService(self):
        # arrange things to look like we were just sleeping, so
        # status/progress values work correctly
        self.sleeping_between_cycles = True
        self.current_sleep_time = self.slow_start
        self.next_wake_time = time.time() + self.slow_start
        self.timer = reactor.callLater(self.slow_start, self.start_slice)
        service.MultiService.startService(self)

    def stopService(self):
        if self.timer:
            self.timer.cancel()
            self.timer = None
        self.save_state()
        return service.MultiService.stopService(self)

    def start_slice(self):
        start_slice = time.time()
        self.timer = None
        self.sleeping_between_cycles = False
        self.current_sleep_time = None
        self.next_wake_time = None
        try:
            self.start_current_prefix(start_slice)
            finished_cycle = True
        except TimeSliceExceeded:
            finished_cycle = False
        self.save_state()
        if not self.running:
            # someone might have used stopService() to shut us down
            return
        # either we finished a whole cycle, or we ran out of time
        now = time.time()
        this_slice = now - start_slice
        # this_slice/(this_slice+sleep_time) = percentage
        # this_slice/percentage = this_slice+sleep_time
        # sleep_time = (this_slice/percentage) - this_slice
        sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
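        # e.g. with the default allowed_cpu_percentage of 0.10, a slice that
        # took 1.0 second is followed by a sleep of 1.0/0.10 - 1.0 = 9.0
        # seconds.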
        # if the math gets weird, or a timequake happens, don't sleep
        # forever. Note that this means that, while a cycle is running, we
        # will process at least one bucket every 5 minutes, no matter how
        # long that bucket takes.
        sleep_time = max(0.0, min(sleep_time, 299))
        if finished_cycle:
            # how long should we sleep between cycles? Don't run faster than
            # allowed_cpu_percentage says, and no more often than
            # minimum_cycle_time allows.
            self.sleeping_between_cycles = True
            sleep_time = max(sleep_time, self.minimum_cycle_time)
        else:
            self.sleeping_between_cycles = False
        self.current_sleep_time = sleep_time # for status page
        self.next_wake_time = now + sleep_time
        self.yielding(sleep_time)
        self.timer = reactor.callLater(sleep_time, self.start_slice)

    def start_current_prefix(self, start_slice):
        state = self.state
        if state["current-cycle"] is None:
            self.last_cycle_started_time = time.time()
            state["current-cycle-start-time"] = self.last_cycle_started_time
            if state["last-cycle-finished"] is None:
                state["current-cycle"] = 0
            else:
                state["current-cycle"] = state["last-cycle-finished"] + 1
            self.started_cycle(state["current-cycle"])
        cycle = state["current-cycle"]

        for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
            # if we want to yield earlier, just raise TimeSliceExceeded()
            prefix = self.prefixes[i]
            prefixdir = os.path.join(self.sharedir, prefix)
            if i == self.bucket_cache[0]:
                buckets = self.bucket_cache[1]
            else:
                try:
                    buckets = os.listdir(prefixdir)
                    buckets.sort()
                except EnvironmentError:
                    buckets = []
                self.bucket_cache = (i, buckets)
            self.process_prefixdir(cycle, prefix, prefixdir,
                                   buckets, start_slice)
            self.last_complete_prefix_index = i

            now = time.time()
            if self.last_prefix_finished_time is not None:
                elapsed = now - self.last_prefix_finished_time
                self.last_prefix_elapsed_time = elapsed
            self.last_prefix_finished_time = now

            self.finished_prefix(cycle, prefix)
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

        # yay! we finished the whole cycle
        self.last_complete_prefix_index = -1
        self.last_prefix_finished_time = None # don't include the sleep
        now = time.time()
        if self.last_cycle_started_time is not None:
            self.last_cycle_elapsed_time = now - self.last_cycle_started_time
        state["last-complete-bucket"] = None
        state["last-cycle-finished"] = cycle
        state["current-cycle"] = None
        self.finished_cycle(cycle)
        self.save_state()

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        """This gets a list of bucket names (i.e. storage index strings,
        base32-encoded) in sorted order.

        You can override this if your crawler doesn't care about the actual
        shares, for example a crawler which merely keeps track of how many
        buckets are being managed by this server.

        Subclasses which *do* care about actual buckets should leave this
        method alone, and implement process_bucket() instead.
        """

        for bucket in buckets:
            last_complete = self.state["last-complete-bucket"]
            if last_complete is not None and bucket <= last_complete:
                continue
            self.process_bucket(cycle, prefix, prefixdir, bucket)
            self.state["last-complete-bucket"] = bucket
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

    # the remaining methods are explicitly for subclasses to implement.

    def started_cycle(self, cycle):
        """Notify a subclass that the crawler is about to start a cycle.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        """Examine a single bucket. Subclasses should do whatever they want
        to do to the shares therein, then update self.state as necessary.

        If the crawler is never interrupted by SIGKILL, this method will be
        called exactly once per share (per cycle). If it *is* interrupted,
        then the next time the node is started, some amount of work will be
        duplicated, according to when self.save_state() was last called. By
        default, save_state() is called at the end of each timeslice, and
        after finished_cycle() returns, and when stopService() is called.

        To reduce the chance of duplicate work (i.e. to avoid adding multiple
        records to a database), you can call save_state() at the end of your
        process_bucket() method. This will reduce the maximum duplicated work
        to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
        per bucket (and some disk writes), which will count against your
        allowed_cpu_percentage, and which may be considerable if
        process_bucket() runs quickly.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_prefix(self, cycle, prefix):
        """Notify a subclass that the crawler has just finished processing a
        prefix directory (all buckets with the same two-character/10-bit
        prefix). To impose a limit on how much work might be duplicated by a
        SIGKILL that occurs during a timeslice, you can call
        self.save_state() here, but be aware that it may represent a
        significant performance hit.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_cycle(self, cycle):
        """Notify subclass that a cycle (one complete traversal of all
        prefixdirs) has just finished. 'cycle' is the number of the cycle
        that just finished. This method should perform summary work and
        update self.state to publish information to status displays.

        One-shot crawlers, such as those used to upgrade shares to a new
        format or populate a database for the first time, can call
        self.stopService() (or more likely self.disownServiceParent()) to
        prevent it from running a second time. Don't forget to set some
        persistent state so that the upgrader won't be run again the next
        time the node is started.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def yielding(self, sleep_time):
        """The crawler is about to sleep for 'sleep_time' seconds. This
        method is mostly for the convenience of unit tests.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass


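# A minimal sketch of the subclassing pattern described in the ShareCrawler
# docstring. The class name and the "share-counts" state key are illustrative
# only: implement process_bucket(), seed extra keys in add_initial_state(),
# and summarize in finished_cycle().
class _ExampleShareCountingCrawler(ShareCrawler):
    def add_initial_state(self):
        # ["share-counts"][cyclenum] = number of share files seen so far
        self.state.setdefault("share-counts", {})

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        bucketdir = os.path.join(prefixdir, storage_index_b32)
        try:
            num_shares = len(os.listdir(bucketdir))
        except EnvironmentError:
            num_shares = 0
        counts = self.state["share-counts"]
        counts[cycle] = counts.get(cycle, 0) + num_shares
        # calling self.save_state() here would limit the work duplicated
        # after a SIGKILL to one bucket, at the cost of extra disk writes

    def finished_cycle(self, cycle):
        # keep only the cycle that just finished
        for old_cycle in list(self.state["share-counts"].keys()):
            if old_cycle != cycle:
                del self.state["share-counts"][old_cycle]

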
class BucketCountingCrawler(ShareCrawler):
    """I keep track of how many buckets are being managed by this server.
    This is equivalent to the number of distributed files and directories for
    which I am providing storage. The actual number of files+directories in
    the full grid is probably higher (especially when there are more servers
    than 'N', the number of generated shares), because some files+directories
    will have shares on other servers instead of me. Also note that the
    number of buckets will differ from the number of shares in small grids,
    when more than one share is placed on a single server.
    """

    minimum_cycle_time = 60*60 # we don't need this more than once an hour

    def __init__(self, server, statefile, num_sample_prefixes=1):
        ShareCrawler.__init__(self, server, statefile)
        self.num_sample_prefixes = num_sample_prefixes

    def add_initial_state(self):
        # ["bucket-counts"][cyclenum][prefix] = number
        # ["last-complete-cycle"] = cyclenum # maintained by base class
        # ["last-complete-bucket-count"] = number
        # ["storage-index-samples"][prefix] = (cyclenum,
        #                                      list of SI strings (base32))
        self.state.setdefault("bucket-counts", {})
        self.state.setdefault("last-complete-bucket-count", None)
        self.state.setdefault("storage-index-samples", {})

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        # we override process_prefixdir() because we don't want to look at
        # the individual buckets. We'll save state after each one. On my
        # laptop, a mostly-empty storage server can process about 70
        # prefixdirs in a 1.0s slice.
        if cycle not in self.state["bucket-counts"]:
            self.state["bucket-counts"][cycle] = {}
        self.state["bucket-counts"][cycle][prefix] = len(buckets)
        if prefix in self.prefixes[:self.num_sample_prefixes]:
            self.state["storage-index-samples"][prefix] = (cycle, buckets)

    def finished_cycle(self, cycle):
        last_counts = self.state["bucket-counts"].get(cycle, [])
        if len(last_counts) == len(self.prefixes):
            # great, we have a whole cycle.
            num_buckets = sum(last_counts.values())
            self.state["last-complete-bucket-count"] = num_buckets
            # get rid of old counts
            for old_cycle in list(self.state["bucket-counts"].keys()):
                if old_cycle != cycle:
                    del self.state["bucket-counts"][old_cycle]
        # get rid of old samples too
        for prefix in list(self.state["storage-index-samples"].keys()):
            old_cycle, buckets = self.state["storage-index-samples"][prefix]
            if old_cycle != cycle:
                del self.state["storage-index-samples"][prefix]

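# A minimal sketch of how a status renderer might consume this crawler,
# following the get_progress() and get_state() docstrings above (the function
# name is hypothetical). Any of the time-related values may be None, so
# callers must tolerate that.
def _example_render_status(bucket_counter):
    state = bucket_counter.get_state()
    progress = bucket_counter.get_progress()
    if progress["cycle-in-progress"]:
        summary = "%s%% of the current cycle is complete" % (
            progress["cycle-complete-percentage"],)
    else:
        summary = "next crawl in %s seconds" % (
            progress["remaining-wait-time"],)
    return (state["last-complete-bucket-count"], summary)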