import json
import time
import os
import struct
from allmydata.storage.crawler import (
    ShareCrawler,
    _confirm_json_format,
    _convert_cycle_data,
    _dump_json_to_file,
)
from allmydata.storage.shares import get_share_file
from allmydata.storage.common import UnknownMutableContainerVersionError, \
     UnknownImmutableContainerVersionError
from twisted.python import log as twlog
from twisted.python.filepath import FilePath


def _convert_pickle_state_to_json(state):
    """
    Convert a pickle-serialized crawler-history state to the new JSON
    format.

    :param dict state: the pickled state

    :return dict: the state in the JSON form
    """
    return {
        str(k): _convert_cycle_data(v)
        for k, v in state.items()
    }


class _HistorySerializer(object):
    """
    Serialize the 'history' file of the lease-crawler state. This is
    "storage/lease_checker.history" for the pickle or
    "storage/lease_checker.history.json" for the new JSON format.
    """
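    # A minimal usage sketch (path taken from the docstring above): the whole
    # history is kept as a single JSON object keyed by cycle number.
    #
    #     serializer = _HistorySerializer("storage/lease_checker.history.json")
    #     history = serializer.load()          # {} the first time around
    #     history[str(cycle_number)] = {...}   # one finished cycle's summary
    #     serializer.save(history)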

    def __init__(self, history_path):
        self._path = _confirm_json_format(FilePath(history_path))

        if not self._path.exists():
            _dump_json_to_file({}, self._path)

    def load(self):
        """
        Deserialize the existing data.

        :return dict: the existing history state
        """
        with self._path.open("rb") as f:
            history = json.load(f)
        return history

    def save(self, new_history):
        """
        Serialize the existing data as JSON.
        """
        _dump_json_to_file(new_history, self._path)
        return None


class LeaseCheckingCrawler(ShareCrawler):
    """I examine the leases on all shares, determining which are still valid
    and which have expired. I can remove the expired leases (if so
    configured), and the share will be deleted when the last lease is
    removed.

    I collect statistics on the leases and make these available to a web
    status page, including::

    Space recovered during this cycle-so-far:
     actual (only if expiration_enabled=True):
      num-buckets, num-shares, sum of share sizes, real disk usage
      ('real disk usage' means we use stat(fn).st_blocks*512 and include any
      space used by the directory)
     what it would have been with the original lease expiration time
     what it would have been with our configured expiration time

    Prediction of space that will be recovered during the rest of this cycle
    Prediction of space that will be recovered by the entire current cycle.

    Space recovered during the last 10 cycles  <-- saved in separate pickle

    Shares/buckets examined:
     this cycle-so-far
     prediction of rest of cycle
     during last 10 cycles  <-- separate pickle
     start/finish time of last 10 cycles  <-- separate pickle
     expiration time used for last 10 cycles  <-- separate pickle

    Histogram of leases-per-share:
     this-cycle-to-date
     last 10 cycles  <-- separate pickle
    Histogram of lease ages, buckets = 1day
     cycle-to-date
     last 10 cycles  <-- separate pickle

    All cycle-to-date values remain valid until the start of the next cycle.

    """

    slow_start = 360 # wait 6 minutes after startup
    minimum_cycle_time = 12*60*60 # not more than twice per day

    def __init__(self, server, statefile, historyfile,
                 expiration_enabled, mode,
                 override_lease_duration, # used if expiration_mode=="age"
                 cutoff_date, # used if expiration_mode=="cutoff-date"
                 sharetypes):
        self._history_serializer = _HistorySerializer(historyfile)
        self.expiration_enabled = expiration_enabled
        self.mode = mode
        self.override_lease_duration = None
        self.cutoff_date = None
        if self.mode == "age":
            assert isinstance(override_lease_duration, (int, type(None)))
            self.override_lease_duration = override_lease_duration # seconds
        elif self.mode == "cutoff-date":
            assert isinstance(cutoff_date, int) # seconds-since-epoch
            assert cutoff_date is not None
            self.cutoff_date = cutoff_date
        else:
            raise ValueError("GC mode '%s' must be 'age' or 'cutoff-date'" % mode)
        self.sharetypes_to_expire = sharetypes
        ShareCrawler.__init__(self, server, statefile)

    def add_initial_state(self):
        # we fill ["cycle-to-date"] here (even though it will be reset in
        # self.started_cycle) just in case someone grabs our state before we
        # get started: unit tests do this
        so_far = self.create_empty_cycle_dict()
        self.state.setdefault("cycle-to-date", so_far)
        # in case we upgrade the code while a cycle is in progress, update
        # the keys individually
        for k in so_far:
            self.state["cycle-to-date"].setdefault(k, so_far[k])

    def create_empty_cycle_dict(self):
        recovered = self.create_empty_recovered_dict()
        so_far = {"corrupt-shares": [],
                  "space-recovered": recovered,
                  "lease-age-histogram": {}, # (minage,maxage)->count
                  "leases-per-share-histogram": {}, # leasecount->numshares
                  }
        return so_far

    def create_empty_recovered_dict(self):
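        # Zeroed counters keyed "<category>-<unit>", plus "-mutable" /
        # "-immutable" variants (e.g. "examined-buckets",
        # "actual-diskbytes-immutable"), for the four categories
        # ("actual", "original", "configured", "examined") and the four
        # units ("buckets", "shares", "sharebytes", "diskbytes").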
        recovered = {}
        for a in ("actual", "original", "configured", "examined"):
            for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                recovered[a+"-"+b] = 0
                recovered[a+"-"+b+"-mutable"] = 0
                recovered[a+"-"+b+"-immutable"] = 0
        return recovered

    def started_cycle(self, cycle):
        self.state["cycle-to-date"] = self.create_empty_cycle_dict()

    def stat(self, fn):
        return os.stat(fn)

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        bucketdir = os.path.join(prefixdir, storage_index_b32)
        s = self.stat(bucketdir)
        would_keep_shares = []
        wks = None

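        # process_share() returns a four-element "would-keep" record:
        # [keep-under-original-leases, keep-under-configured-leases,
        # keep-after-actual-expiration, sharetype]. Corrupt shares are
        # conservatively recorded as kept: (1, 1, 1, "unknown").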
        for fn in os.listdir(bucketdir):
            try:
                shnum = int(fn)
            except ValueError:
                continue # non-numeric means not a sharefile
            sharefile = os.path.join(bucketdir, fn)
            try:
                wks = self.process_share(sharefile)
            except (UnknownMutableContainerVersionError,
                    UnknownImmutableContainerVersionError,
                    struct.error):
                twlog.msg("lease-checker error processing %s" % sharefile)
                twlog.err()
                which = [storage_index_b32, shnum]
                self.state["cycle-to-date"]["corrupt-shares"].append(which)
                wks = (1, 1, 1, "unknown")
            would_keep_shares.append(wks)

        sharetype = None
        if wks:
            # use the last share's sharetype as the buckettype
            sharetype = wks[3]
        rec = self.state["cycle-to-date"]["space-recovered"]
        self.increment(rec, "examined-buckets", 1)
        if sharetype:
            self.increment(rec, "examined-buckets-"+sharetype, 1)
        del wks

        try:
            bucket_diskbytes = s.st_blocks * 512
        except AttributeError:
            bucket_diskbytes = 0 # no stat().st_blocks on windows
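        # A bucket's disk space counts toward a category only when none of
        # its shares would be kept under that category's policy.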
        if sum([wks[0] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("original", bucket_diskbytes, sharetype)
        if sum([wks[1] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("configured", bucket_diskbytes, sharetype)
        if sum([wks[2] for wks in would_keep_shares]) == 0:
            self.increment_bucketspace("actual", bucket_diskbytes, sharetype)

    def process_share(self, sharefilename):
        # first, find out what kind of a share it is
        sf = get_share_file(sharefilename)
        sharetype = sf.sharetype
        now = time.time()
        s = self.stat(sharefilename)

        num_leases = 0
        num_valid_leases_original = 0
        num_valid_leases_configured = 0
        expired_leases_configured = []

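        # Judge each lease twice: once against its original expiration time,
        # and once against the configured policy (age limit or cutoff date).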
        for li in sf.get_leases():
            num_leases += 1
            original_expiration_time = li.get_expiration_time()
            grant_renew_time = li.get_grant_renew_time_time()
            age = li.get_age()
            self.add_lease_age_to_histogram(age)

            # expired-or-not according to original expiration time
            if original_expiration_time > now:
                num_valid_leases_original += 1

            # expired-or-not according to our configured age limit
            expired = False
            if self.mode == "age":
                age_limit = original_expiration_time
                if self.override_lease_duration is not None:
                    age_limit = self.override_lease_duration
                if age > age_limit:
                    expired = True
            else:
                assert self.mode == "cutoff-date"
                if grant_renew_time < self.cutoff_date:
                    expired = True
            if sharetype not in self.sharetypes_to_expire:
                expired = False

            if expired:
                expired_leases_configured.append(li)
            else:
                num_valid_leases_configured += 1

        so_far = self.state["cycle-to-date"]
        self.increment(so_far["leases-per-share-histogram"], str(num_leases), 1)
        self.increment_space("examined", s, sharetype)

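        # Start by assuming the share would be kept under every policy; the
        # flags are cleared below as each set of valid leases turns out to
        # be empty.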
        would_keep_share = [1, 1, 1, sharetype]

        if self.expiration_enabled:
            for li in expired_leases_configured:
                sf.cancel_lease(li.cancel_secret)

        if num_valid_leases_original == 0:
            would_keep_share[0] = 0
            self.increment_space("original", s, sharetype)

        if num_valid_leases_configured == 0:
            would_keep_share[1] = 0
            self.increment_space("configured", s, sharetype)
            if self.expiration_enabled:
                would_keep_share[2] = 0
                self.increment_space("actual", s, sharetype)

        return would_keep_share

    def increment_space(self, a, s, sharetype):
        sharebytes = s.st_size
        try:
            # note that stat(2) says that st_blocks is measured in 512-byte
            # units, and that st_blksize is the "optimal file sys I/O ops
            # blocksize", which is independent of the block size that
            # st_blocks uses.
            diskbytes = s.st_blocks * 512
        except AttributeError:
            # the docs say that st_blocks is only on linux. I also see it on
            # MacOS. But it isn't available on windows.
            diskbytes = sharebytes
        so_far_sr = self.state["cycle-to-date"]["space-recovered"]
        self.increment(so_far_sr, a+"-shares", 1)
        self.increment(so_far_sr, a+"-sharebytes", sharebytes)
        self.increment(so_far_sr, a+"-diskbytes", diskbytes)
        if sharetype:
            self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
            self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
            self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)

    def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
        rec = self.state["cycle-to-date"]["space-recovered"]
        self.increment(rec, a+"-diskbytes", bucket_diskbytes)
        self.increment(rec, a+"-buckets", 1)
        if sharetype:
            self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
            self.increment(rec, a+"-buckets-"+sharetype, 1)

    def increment(self, d, k, delta=1):
        if k not in d:
            d[k] = 0
        d[k] += delta

    def add_lease_age_to_histogram(self, age):
        bucket_interval = 24*60*60
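        # one histogram bucket per day of age; e.g. an age of 90000 seconds
        # (25 hours) lands in the (86400, 172800) bucket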
        bucket_number = int(age/bucket_interval)
        bucket_start = bucket_number * bucket_interval
        bucket_end = bucket_start + bucket_interval
        k = (bucket_start, bucket_end)
        self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)

    def convert_lease_age_histogram(self, lah):
        # convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
        # since the former is not JSON-safe (JSON dictionaries must have
        # string keys).
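        # e.g. {(0, 86400): 5, (86400, 172800): 2}
        #  ->  [(0, 86400, 5), (86400, 172800, 2)]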
        json_safe_lah = []
        for k in sorted(lah):
            (minage,maxage) = k
            json_safe_lah.append( (minage, maxage, lah[k]) )
        return json_safe_lah

    def finished_cycle(self, cycle):
        # add to our history state, prune old history
        h = {}

        start = self.state["current-cycle-start-time"]
        now = time.time()
        h["cycle-start-finish-times"] = [start, now]
        h["expiration-enabled"] = self.expiration_enabled
        h["configured-expiration-mode"] = [
            self.mode,
            self.override_lease_duration,
            self.cutoff_date,
            self.sharetypes_to_expire,
        ]

        s = self.state["cycle-to-date"]

        # state["lease-age-histogram"] is a dictionary (mapping
        # (minage,maxage) tuple to a sharecount), but we report
        # self.get_state()["lease-age-histogram"] as a list of
        # (min,max,sharecount) tuples, because JSON can handle that better.
        # We record the list-of-tuples form into the history for the same
        # reason.
        lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
        h["lease-age-histogram"] = lah
        h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
        h["corrupt-shares"] = s["corrupt-shares"][:]
        # note: if ["space-recovered"] ever acquires an internal dict, this
        # copy() needs to become a deepcopy
        h["space-recovered"] = s["space-recovered"].copy()

        history = self._history_serializer.load()
        history[str(cycle)] = h
        while len(history) > 10:
            oldcycles = sorted(int(k) for k in history.keys())
            del history[str(oldcycles[0])]
        self._history_serializer.save(history)

    def get_state(self):
        """In addition to the crawler state described in
        ShareCrawler.get_state(), I return the following keys which are
        specific to the lease-checker/expirer. Note that the non-history keys
        (with 'cycle' in their names) are only present if a cycle is
        currently running. If the crawler is between cycles, it is appropriate
        to show the latest item in the 'history' key instead. Also note that
        each history item has all the data in the 'cycle-to-date' value, plus
        cycle-start-finish-times.

         cycle-to-date:
          expiration-enabled
          configured-expiration-mode
          lease-age-histogram (list of (minage,maxage,sharecount) tuples)
          leases-per-share-histogram
          corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
          space-recovered

         estimated-remaining-cycle:
          # Values may be None if not enough data has been gathered to
          # produce an estimate.
          space-recovered

         estimated-current-cycle:
          # cycle-to-date plus estimated-remaining. Values may be None if
          # not enough data has been gathered to produce an estimate.
          space-recovered

         history: maps cyclenum to a dict with the following keys:
          cycle-start-finish-times
          expiration-enabled
          configured-expiration-mode
          lease-age-histogram
          leases-per-share-histogram
          corrupt-shares
          space-recovered

        The 'space-recovered' structure is a dictionary with the following
        keys:
         # 'examined' is what was looked at
         examined-buckets, examined-buckets-mutable, examined-buckets-immutable
         examined-shares, -mutable, -immutable
         examined-sharebytes, -mutable, -immutable
         examined-diskbytes, -mutable, -immutable

         # 'actual' is what was actually deleted
         actual-buckets, -mutable, -immutable
         actual-shares, -mutable, -immutable
         actual-sharebytes, -mutable, -immutable
         actual-diskbytes, -mutable, -immutable

         # would have been deleted, if the original lease timer was used
         original-buckets, -mutable, -immutable
         original-shares, -mutable, -immutable
         original-sharebytes, -mutable, -immutable
         original-diskbytes, -mutable, -immutable

         # would have been deleted, if our configured max_age was used
         configured-buckets, -mutable, -immutable
         configured-shares, -mutable, -immutable
         configured-sharebytes, -mutable, -immutable
         configured-diskbytes, -mutable, -immutable

        """
        progress = self.get_progress()

        state = ShareCrawler.get_state(self) # does a shallow copy
        state["history"] = self._history_serializer.load()

        if not progress["cycle-in-progress"]:
            del state["cycle-to-date"]
            return state

        so_far = state["cycle-to-date"].copy()
        state["cycle-to-date"] = so_far

        lah = so_far["lease-age-histogram"]
        so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
        so_far["expiration-enabled"] = self.expiration_enabled
        so_far["configured-expiration-mode"] = [
            self.mode,
            self.override_lease_duration,
            self.cutoff_date,
            self.sharetypes_to_expire,
        ]

        so_far_sr = so_far["space-recovered"]
        remaining_sr = {}
        remaining = {"space-recovered": remaining_sr}
        cycle_sr = {}
        cycle = {"space-recovered": cycle_sr}

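        # Extrapolate linearly: if the cycle is a fraction pc complete, the
        # remainder is estimated as (1-pc)/pc times what has been seen so far.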
        if progress["cycle-complete-percentage"] > 0.0:
            pc = progress["cycle-complete-percentage"] / 100.0
            m = (1-pc)/pc
            for a in ("actual", "original", "configured", "examined"):
                for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                    for c in ("", "-mutable", "-immutable"):
                        k = a+"-"+b+c
                        remaining_sr[k] = m * so_far_sr[k]
                        cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
        else:
            for a in ("actual", "original", "configured", "examined"):
                for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                    for c in ("", "-mutable", "-immutable"):
                        k = a+"-"+b+c
                        remaining_sr[k] = None
                        cycle_sr[k] = None

        state["estimated-remaining-cycle"] = remaining
        state["estimated-current-cycle"] = cycle
        return state