source: trunk/src/allmydata/storage/expirer.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 18.2 KB
Line 
1import json
2import time
3import os
4import struct
5from allmydata.storage.crawler import (
6    ShareCrawler,
7    _confirm_json_format,
8    _convert_cycle_data,
9    _dump_json_to_file,
10)
11from allmydata.storage.shares import get_share_file
12from allmydata.storage.common import UnknownMutableContainerVersionError, \
13     UnknownImmutableContainerVersionError
14from twisted.python import log as twlog
15from twisted.python.filepath import FilePath
16
17
def _convert_pickle_state_to_json(state):
    """
    Convert a pickle-serialized crawler-history state to the new JSON
    format.

    :param dict state: the pickled state

    :return dict: the state in the JSON form
    """
    converted = {}
    for cycle, cycle_data in state.items():
        # JSON object keys must be strings; each cycle's data gets the
        # same per-cycle conversion used elsewhere in the crawler.
        converted[str(cycle)] = _convert_cycle_data(cycle_data)
    return converted
31
32
class _HistorySerializer(object):
    """
    Serialize the 'history' file of the lease-crawler state. This is
    "storage/lease_checker.history" for the pickle or
    "storage/lease_checker.history.json" for the new JSON format.
    """

    def __init__(self, history_path):
        # Normalize the path to the JSON variant (converting any legacy
        # pickle file along the way).
        self._path = _confirm_json_format(FilePath(history_path))
        if not self._path.exists():
            # Seed an empty history so load() never has to handle a
            # missing file.
            _dump_json_to_file({}, self._path)

    def load(self):
        """
        Deserialize the existing data.

        :return dict: the existing history state
        """
        with self._path.open("rb") as f:
            return json.load(f)

    def save(self, new_history):
        """
        Serialize the existing data as JSON.
        """
        _dump_json_to_file(new_history, self._path)
        return None
62
63
class LeaseCheckingCrawler(ShareCrawler):
    """I examine the leases on all shares, determining which are still valid
    and which have expired. I can remove the expired leases (if so
    configured), and the share will be deleted when the last lease is
    removed.

    I collect statistics on the leases and make these available to a web
    status page, including::

    Space recovered during this cycle-so-far:
     actual (only if expiration_enabled=True):
      num-buckets, num-shares, sum of share sizes, real disk usage
      ('real disk usage' means we use stat(fn).st_blocks*512 and include any
       space used by the directory)
     what it would have been with the original lease expiration time
     what it would have been with our configured expiration time

    Prediction of space that will be recovered during the rest of this cycle
    Prediction of space that will be recovered by the entire current cycle.

    Space recovered during the last 10 cycles  <-- saved in separate history file

    Shares/buckets examined:
     this cycle-so-far
     prediction of rest of cycle
     during last 10 cycles <-- separate history file
    start/finish time of last 10 cycles  <-- separate history file
    expiration time used for last 10 cycles <-- separate history file

    Histogram of leases-per-share:
     this-cycle-to-date
     last 10 cycles <-- separate history file
    Histogram of lease ages, buckets = 1day
     cycle-to-date
     last 10 cycles <-- separate history file

    All cycle-to-date values remain valid until the start of the next cycle.

    """

    slow_start = 360 # wait 6 minutes after startup
    minimum_cycle_time = 12*60*60 # not more than twice per day

    def __init__(self, server, statefile, historyfile,
                 expiration_enabled, mode,
                 override_lease_duration, # used if expiration_mode=="age"
                 cutoff_date, # used if expiration_mode=="cutoff-date"
                 sharetypes):
        """
        :param server: the storage server whose shares we crawl
            (passed through to ShareCrawler)
        :param statefile: path for the crawler's persistent state
            (passed through to ShareCrawler)
        :param historyfile: path for the last-10-cycles history, managed
            by _HistorySerializer (stored as JSON)
        :param bool expiration_enabled: if True, expired leases are
            actually cancelled; if False, we only record what would have
            been recovered
        :param str mode: either "age" or "cutoff-date"
        :param override_lease_duration: lease lifetime in seconds, or
            None to use each lease's own duration; only consulted in
            "age" mode
        :param cutoff_date: seconds-since-epoch; leases last renewed
            before this time are considered expired; only consulted in
            "cutoff-date" mode
        :param sharetypes: collection of share types (e.g. "mutable",
            "immutable") that are eligible for expiration
        :raises ValueError: if ``mode`` is not one of the two known modes
        """
        self._history_serializer = _HistorySerializer(historyfile)
        self.expiration_enabled = expiration_enabled
        self.mode = mode
        # Only one of these two is meaningful, depending on self.mode;
        # the other stays None.
        self.override_lease_duration = None
        self.cutoff_date = None
        if self.mode == "age":
            assert isinstance(override_lease_duration, (int, type(None)))
            self.override_lease_duration = override_lease_duration # seconds
        elif self.mode == "cutoff-date":
            assert isinstance(cutoff_date, int) # seconds-since-epoch
            assert cutoff_date is not None
            self.cutoff_date = cutoff_date
        else:
            raise ValueError("GC mode '%s' must be 'age' or 'cutoff-date'" % mode)
        self.sharetypes_to_expire = sharetypes
        ShareCrawler.__init__(self, server, statefile)

    def add_initial_state(self):
        # we fill ["cycle-to-date"] here (even though they will be reset in
        # self.started_cycle) just in case someone grabs our state before we
        # get started: unit tests do this
        so_far = self.create_empty_cycle_dict()
        self.state.setdefault("cycle-to-date", so_far)
        # in case we upgrade the code while a cycle is in progress, update
        # the keys individually
        for k in so_far:
            self.state["cycle-to-date"].setdefault(k, so_far[k])

    def create_empty_cycle_dict(self):
        """
        Return a fresh per-cycle statistics dict with all counters and
        histograms zeroed/empty.
        """
        recovered = self.create_empty_recovered_dict()
        so_far = {"corrupt-shares": [],
                  "space-recovered": recovered,
                  "lease-age-histogram": {}, # (minage,maxage)->count
                  "leases-per-share-histogram": {}, # leasecount->numshares
                  }
        return so_far

    def create_empty_recovered_dict(self):
        """
        Return a dict of zeroed space-recovered counters: one entry per
        (category, unit, sharetype) combination, keyed like
        "actual-diskbytes-mutable" (the bare "actual-diskbytes" key is
        the mutable+immutable total).
        """
        recovered = {}
        # categories: "examined" = looked at, "actual" = really deleted,
        # "original"/"configured" = what would have been deleted under
        # the original / configured lease-expiration policy.
        for a in ("actual", "original", "configured", "examined"):
            for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                recovered[a+"-"+b] = 0
                recovered[a+"-"+b+"-mutable"] = 0
                recovered[a+"-"+b+"-immutable"] = 0
        return recovered

    def started_cycle(self, cycle):
        # A new cycle starts with fresh statistics.
        self.state["cycle-to-date"] = self.create_empty_cycle_dict()

    def stat(self, fn):
        # Separate method so unit tests can override filesystem access.
        return os.stat(fn)

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        """
        Examine every share file in one bucket directory, updating the
        cycle-to-date statistics; called by the ShareCrawler machinery.
        """
        bucketdir = os.path.join(prefixdir, storage_index_b32)
        s = self.stat(bucketdir)
        would_keep_shares = []
        wks = None

        for fn in os.listdir(bucketdir):
            try:
                shnum = int(fn)
            except ValueError:
                continue # non-numeric means not a sharefile
            sharefile = os.path.join(bucketdir, fn)
            try:
                wks = self.process_share(sharefile)
            except (UnknownMutableContainerVersionError,
                    UnknownImmutableContainerVersionError,
                    struct.error):
                # A share we can't parse: log it, record it as corrupt,
                # and treat it as kept under every policy.
                twlog.msg("lease-checker error processing %s" % sharefile)
                twlog.err()
                which = [storage_index_b32, shnum]
                self.state["cycle-to-date"]["corrupt-shares"].append(which)
                wks = (1, 1, 1, "unknown")
            would_keep_shares.append(wks)

        sharetype = None
        if wks:
            # use the last share's sharetype as the buckettype
            sharetype = wks[3]
        rec = self.state["cycle-to-date"]["space-recovered"]
        self.increment(rec, "examined-buckets", 1)
        if sharetype:
            self.increment(rec, "examined-buckets-"+sharetype, 1)
        # make sure the loop variable isn't accidentally reused below
        del wks

        try:
            bucket_diskbytes = s.st_blocks * 512
        except AttributeError:
            bucket_diskbytes = 0 # no stat().st_blocks on windows
        # The bucket directory itself is only reclaimed when *no* share
        # in it would be kept under the given policy.
        if sum([wks[0] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("original", bucket_diskbytes, sharetype)
        if sum([wks[1] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("configured", bucket_diskbytes, sharetype)
        if sum([wks[2] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("actual", bucket_diskbytes, sharetype)

    def process_share(self, sharefilename):
        """
        Examine one share file's leases, cancel expired ones if
        expiration is enabled, and update cycle-to-date statistics.

        :return list: [keep-per-original-policy, keep-per-configured-policy,
            actually-kept, sharetype] where the first three are 1/0 flags
        """
        # first, find out what kind of a share it is
        sf = get_share_file(sharefilename)
        sharetype = sf.sharetype
        now = time.time()
        s = self.stat(sharefilename)

        num_leases = 0
        num_valid_leases_original = 0
        num_valid_leases_configured = 0
        expired_leases_configured = []

        for li in sf.get_leases():
            num_leases += 1
            original_expiration_time = li.get_expiration_time()
            grant_renew_time = li.get_grant_renew_time_time()
            age = li.get_age()
            self.add_lease_age_to_histogram(age)

            #  expired-or-not according to original expiration time
            if original_expiration_time > now:
                num_valid_leases_original += 1

            #  expired-or-not according to our configured age limit
            expired = False
            if self.mode == "age":
                # default to the lease's own expiration time unless an
                # override duration was configured
                age_limit = original_expiration_time
                if self.override_lease_duration is not None:
                    age_limit = self.override_lease_duration
                if age > age_limit:
                    expired = True
            else:
                assert self.mode == "cutoff-date"
                if grant_renew_time < self.cutoff_date:
                    expired = True
            # share types not selected for expiration are never expired
            if sharetype not in self.sharetypes_to_expire:
                expired = False

            if expired:
                expired_leases_configured.append(li)
            else:
                num_valid_leases_configured += 1

        so_far = self.state["cycle-to-date"]
        self.increment(so_far["leases-per-share-histogram"], str(num_leases), 1)
        self.increment_space("examined", s, sharetype)

        would_keep_share = [1, 1, 1, sharetype]

        if self.expiration_enabled:
            # actually cancel the leases we decided are expired
            for li in expired_leases_configured:
                sf.cancel_lease(li.cancel_secret)

        if num_valid_leases_original == 0:
            would_keep_share[0] = 0
            self.increment_space("original", s, sharetype)

        if num_valid_leases_configured == 0:
            would_keep_share[1] = 0
            self.increment_space("configured", s, sharetype)
            if self.expiration_enabled:
                # all leases cancelled above, so the share is really gone
                would_keep_share[2] = 0
                self.increment_space("actual", s, sharetype)

        return would_keep_share

    def increment_space(self, a, s, sharetype):
        """
        Add one share's sizes to the space-recovered counters for
        category ``a`` ("examined"/"original"/"configured"/"actual"),
        using the stat result ``s``.
        """
        sharebytes = s.st_size
        try:
            # note that stat(2) says that st_blocks is 512 bytes, and that
            # st_blksize is "optimal file sys I/O ops blocksize", which is
            # independent of the block-size that st_blocks uses.
            diskbytes = s.st_blocks * 512
        except AttributeError:
            # the docs say that st_blocks is only on linux. I also see it on
            # MacOS. But it isn't available on windows.
            diskbytes = sharebytes
        so_far_sr = self.state["cycle-to-date"]["space-recovered"]
        self.increment(so_far_sr, a+"-shares", 1)
        self.increment(so_far_sr, a+"-sharebytes", sharebytes)
        self.increment(so_far_sr, a+"-diskbytes", diskbytes)
        if sharetype:
            self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
            self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
            self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)

    def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
        """
        Add one bucket directory's disk usage to the space-recovered
        counters for category ``a``.
        """
        rec = self.state["cycle-to-date"]["space-recovered"]
        self.increment(rec, a+"-diskbytes", bucket_diskbytes)
        self.increment(rec, a+"-buckets", 1)
        if sharetype:
            self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
            self.increment(rec, a+"-buckets-"+sharetype, 1)

    def increment(self, d, k, delta=1):
        # add delta to d[k], treating a missing key as 0
        if k not in d:
            d[k] = 0
        d[k] += delta

    def add_lease_age_to_histogram(self, age):
        """
        Record one lease's age (in seconds) in the cycle-to-date
        lease-age histogram, bucketed into whole days.
        """
        bucket_interval = 24*60*60
        bucket_number = int(age/bucket_interval)
        bucket_start = bucket_number * bucket_interval
        bucket_end = bucket_start + bucket_interval
        k = (bucket_start, bucket_end)
        self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)

    def convert_lease_age_histogram(self, lah):
        # convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
        # since the former is not JSON-safe (JSON dictionaries must have
        # string keys).
        json_safe_lah = []
        for k in sorted(lah):
            (minage,maxage) = k
            json_safe_lah.append( (minage, maxage, lah[k]) )
        return json_safe_lah

    def finished_cycle(self, cycle):
        """
        Snapshot this cycle's statistics into the persisted history,
        keeping only the most recent 10 cycles; called by the
        ShareCrawler machinery when a cycle completes.
        """
        # add to our history state, prune old history
        h = {}

        start = self.state["current-cycle-start-time"]
        now = time.time()
        h["cycle-start-finish-times"] = [start, now]
        h["expiration-enabled"] = self.expiration_enabled
        h["configured-expiration-mode"] = [
            self.mode,
            self.override_lease_duration,
            self.cutoff_date,
            self.sharetypes_to_expire,
        ]

        s = self.state["cycle-to-date"]

        # state["lease-age-histogram"] is a dictionary (mapping
        # (minage,maxage) tuple to a sharecount), but we report
        # self.get_state()["lease-age-histogram"] as a list of
        # (min,max,sharecount) tuples, because JSON can handle that better.
        # We record the list-of-tuples form into the history for the same
        # reason.
        lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
        h["lease-age-histogram"] = lah
        h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
        h["corrupt-shares"] = s["corrupt-shares"][:]
        # note: if ["shares-recovered"] ever acquires an internal dict, this
        # copy() needs to become a deepcopy
        h["space-recovered"] = s["space-recovered"].copy()

        history = self._history_serializer.load()
        history[str(cycle)] = h
        # prune the numerically-oldest cycles until only 10 remain
        while len(history) > 10:
            oldcycles = sorted(int(k) for k in history.keys())
            del history[str(oldcycles[0])]
        self._history_serializer.save(history)

    def get_state(self):
        """In addition to the crawler state described in
        ShareCrawler.get_state(), I return the following keys which are
        specific to the lease-checker/expirer. Note that the non-history keys
        (with 'cycle' in their names) are only present if a cycle is
        currently running. If the crawler is between cycles, it appropriate
        to show the latest item in the 'history' key instead. Also note that
        each history item has all the data in the 'cycle-to-date' value, plus
        cycle-start-finish-times.

         cycle-to-date:
          expiration-enabled
          configured-expiration-mode
          lease-age-histogram (list of (minage,maxage,sharecount) tuples)
          leases-per-share-histogram
          corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
          space-recovered

         estimated-remaining-cycle:
          # Values may be None if not enough data has been gathered to
          # produce an estimate.
          space-recovered

         estimated-current-cycle:
          # cycle-to-date plus estimated-remaining. Values may be None if
          # not enough data has been gathered to produce an estimate.
          space-recovered

         history: maps cyclenum to a dict with the following keys:
          cycle-start-finish-times
          expiration-enabled
          configured-expiration-mode
          lease-age-histogram
          leases-per-share-histogram
          corrupt-shares
          space-recovered

         The 'space-recovered' structure is a dictionary with the following
         keys:
          # 'examined' is what was looked at
          examined-buckets, examined-buckets-mutable, examined-buckets-immutable
          examined-shares, -mutable, -immutable
          examined-sharebytes, -mutable, -immutable
          examined-diskbytes, -mutable, -immutable

          # 'actual' is what was actually deleted
          actual-buckets, -mutable, -immutable
          actual-shares, -mutable, -immutable
          actual-sharebytes, -mutable, -immutable
          actual-diskbytes, -mutable, -immutable

          # would have been deleted, if the original lease timer was used
          original-buckets, -mutable, -immutable
          original-shares, -mutable, -immutable
          original-sharebytes, -mutable, -immutable
          original-diskbytes, -mutable, -immutable

          # would have been deleted, if our configured max_age was used
          configured-buckets, -mutable, -immutable
          configured-shares, -mutable, -immutable
          configured-sharebytes, -mutable, -immutable
          configured-diskbytes, -mutable, -immutable

        """
        progress = self.get_progress()

        state = ShareCrawler.get_state(self) # does a shallow copy
        state["history"] = self._history_serializer.load()

        if not progress["cycle-in-progress"]:
            # between cycles: no cycle-to-date data and no estimates
            del state["cycle-to-date"]
            return state

        # copy before mutating so we don't alter self.state in place
        so_far = state["cycle-to-date"].copy()
        state["cycle-to-date"] = so_far

        lah = so_far["lease-age-histogram"]
        so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
        so_far["expiration-enabled"] = self.expiration_enabled
        so_far["configured-expiration-mode"] = [
            self.mode,
            self.override_lease_duration,
            self.cutoff_date,
            self.sharetypes_to_expire,
        ]

        so_far_sr = so_far["space-recovered"]
        remaining_sr = {}
        remaining = {"space-recovered": remaining_sr}
        cycle_sr = {}
        cycle = {"space-recovered": cycle_sr}

        if progress["cycle-complete-percentage"] > 0.0:
            # linear extrapolation: if pc of the cycle recovered X,
            # the remaining (1-pc) is estimated to recover X*(1-pc)/pc
            pc = progress["cycle-complete-percentage"] / 100.0
            m = (1-pc)/pc
            for a in ("actual", "original", "configured", "examined"):
                for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                    for c in ("", "-mutable", "-immutable"):
                        k = a+"-"+b+c
                        remaining_sr[k] = m * so_far_sr[k]
                        cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
        else:
            # no progress yet: not enough data to estimate anything
            for a in ("actual", "original", "configured", "examined"):
                for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                    for c in ("", "-mutable", "-immutable"):
                        k = a+"-"+b+c
                        remaining_sr[k] = None
                        cycle_sr[k] = None

        state["estimated-remaining-cycle"] = remaining
        state["estimated-current-cycle"] = cycle
        return state
Note: See TracBrowser for help on using the repository browser.