1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from six import ensure_str |
---|
6 | |
---|
7 | from allmydata.uri import from_string |
---|
8 | from allmydata.util import base32, log, dictutil |
---|
9 | from allmydata.util.happinessutil import servers_of_happiness |
---|
10 | from allmydata.check_results import CheckAndRepairResults, CheckResults |
---|
11 | |
---|
12 | from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError |
---|
13 | from allmydata.mutable.servermap import ServerMap, ServermapUpdater |
---|
14 | from allmydata.mutable.retrieve import Retrieve # for verifying |
---|
15 | |
---|
class MutableChecker(object):
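    """
    Check the health of a mutable file: update its servermap in MODE_CHECK
    to locate shares and judge recoverability, and (optionally) verify every
    byte of every share. Results are reported as a CheckResults instance.
    """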
    SERVERMAP_MODE = MODE_CHECK

    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (server,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
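        """
        Update the servermap to locate shares and assess recoverability.
        If 'verify' is True, also download and verify every share. Returns
        a Deferred that fires with a CheckResults instance.
        """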
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares, and getting a good idea of
        # recoverability, etc, without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, self.SERVERMAP_MODE,
                             add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(lambda res: servermap)
        d.addCallback(self._make_checker_results)
        return d

    def _got_mapupdate_results(self, servermap):
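        """
        Record the best recoverable version (if any) and set need_repair
        if the servermap shows the file to be unhealthy.
        """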
        # The file is healthy if there is exactly one recoverable version,
        # it has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There isn't exactly one recoverable version.
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
        # read every byte of each share
        #
        # This logic is going to be very nearly the same as the
        # downloader. I bet we could pass the downloader a flag that
        # makes it do this, and piggyback onto that instead of
        # duplicating a bunch of code.
        #
        # Like:
        #  r = Retrieve(blah, blah, blah, verify=True)
        #  d = r.download()
        #  (wait, wait, wait, d.callback)
        #
        # Then, when it has finished, we can check the servermap (which
        # we provided to Retrieve) to figure out which shares are bad,
        # since the Retrieve process will have updated the servermap as
        # it went along.
        #
        # By passing the verify=True flag to the constructor, we are
        # telling the downloader a few things.
        #
        # 1. It needs to download all N shares, not just K shares.
        # 2. It doesn't need to decrypt or decode the shares, only
        #    verify them.
        if not self.best_version:
            return

        r = Retrieve(self._node, self._storage_broker, servermap,
                     self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d


    def _process_bad_shares(self, bad_shares):
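        """
        Record the (server, shnum, failure) tuples reported by the
        verifying Retrieve; any bad share means the file needs repair.
        """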
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares


    def _count_shares(self, smap, version):
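        """
        Return a dict of share counters for 'version': shares good, needed
        (k), and expected (N), the number of hosts with good shares, and the
        number of shares belonging to other versions.
        """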
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_servers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                              for verinfo,shares in vmap.items()
                                              if verinfo != version])

        return counters

    def _make_checker_results(self, smap):
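        """
        Distill the servermap (plus any bad shares found during
        verification) into a CheckResults instance: health verdict, share
        counters, corrupt-share locators, and a human-readable report.
        """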
        self._monitor.raise_if_cancelled()
        healthy = True
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        if unrecoverable:
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            counters = self._count_shares(smap, first)
        else:
            # couldn't find anything at all
            counters = {
                "count-shares-good": 0,
                "count-shares-needed": 3, # arbitrary defaults
                "count-shares-expected": 10,
                "count-good-share-hosts": 0,
                "count-wrong-shares": 0,
            }

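        # Translate the (server, shnum, failure) tuples collected during
        # verification into corrupt-share locators, report/summary lines,
        # and share_problems entries.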
        corrupt_share_locators = []
        problems = []
        if self.bad_shares:
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
        for (server, shnum, f) in sorted(self.bad_shares, key=id):
            serverid = server.get_serverid()
            locator = (server, self._storage_index, shnum)
            corrupt_share_locators.append(locator)
            s = "%s-sh%d" % (ensure_str(server.get_name()), shnum)
            if f.check(CorruptShareError):
                ft = f.value.reason
            else:
                ft = str(f)
            report.append(" %s: %s" % (s, ft))
            summary.append(s)
            p = (serverid, self._storage_index, shnum, f)
            problems.append(p)
            msg = ("CorruptShareError during mutable verify, "
                   "serverid=%(serverid)s, si=%(si)s, shnum=%(shnum)d, "
                   "where=%(where)s")
            log.msg(format=msg, serverid=server.get_name(),
                    si=base32.b2a(self._storage_index),
                    shnum=shnum,
                    where=ft,
                    level=log.WEIRD, umid="EkK8QA")

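        # Build a map from share-id to the set of servers holding that
        # share; servers_of_happiness() uses it to measure how well the
        # shares are spread across distinct servers.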
        sharemap = dictutil.DictOfSets()
        for verinfo in vmap:
            for (shnum, server, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                sharemap.add(shareid, server)
        if healthy:
            summary = "Healthy"
        else:
            summary = "Unhealthy: " + " ".join(summary)

        count_happiness = servers_of_happiness(sharemap)

        cr = CheckResults(from_string(self._node.get_uri()),
                          self._storage_index,
                          healthy=healthy, recoverable=bool(recoverable),
                          count_happiness=count_happiness,
                          count_shares_needed=counters["count-shares-needed"],
                          count_shares_expected=counters["count-shares-expected"],
                          count_shares_good=counters["count-shares-good"],
                          count_good_share_hosts=counters["count-good-share-hosts"],
                          count_recoverable_versions=len(recoverable),
                          count_unrecoverable_versions=len(unrecoverable),
                          servers_responding=list(smap.get_reachable_servers()),
                          sharemap=sharemap,
                          count_wrong_shares=counters["count-wrong-shares"],
                          list_corrupt_shares=corrupt_share_locators,
                          count_corrupt_shares=len(corrupt_share_locators),
                          list_incompatible_shares=[],
                          count_incompatible_shares=0,
                          summary=summary,
                          report=report,
                          share_problems=problems,
                          servermap=smap.copy())
        return cr


class MutableCheckAndRepairer(MutableChecker):
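    """
    Check a mutable file and, if the check finds it unhealthy, attempt to
    repair it. The servermap is updated in MODE_WRITE so the privkey is
    available for repair. Results are reported as a CheckAndRepairResults
    instance.
    """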
    SERVERMAP_MODE = MODE_WRITE # needed to get the privkey

    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
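        """
        Run the base check, then repair the file if the check found that
        necessary. Returns a Deferred that fires with a
        CheckAndRepairResults instance.
        """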
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._stash_pre_repair_results)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d

    def _stash_pre_repair_results(self, pre_repair_results):
        self.cr_results.pre_repair_results = pre_repair_results
        return pre_repair_results

    def _maybe_repair(self, pre_repair_results):
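        """
        Skip repair when the check found nothing wrong, or when the node
        is read-only (ticket #625); otherwise attempt the repair and
        record its outcome in self.cr_results.
        """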
        crr = self.cr_results
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            crr.post_repair_results = pre_repair_results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            crr.post_repair_results = pre_repair_results
            crr.repair_attempted = False
            return
        crr.repair_attempted = True
        d = self._node.repair(pre_repair_results, monitor=self._monitor)
        def _repair_finished(rr):
            crr.repair_successful = rr.get_successful()
            crr.post_repair_results = self._make_checker_results(rr.servermap)
            crr.repair_results = rr # TODO?
            return
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            crr.repair_successful = False
            crr.repair_failure = f # TODO?
            #crr.post_repair_results = ??
            return f
        d.addCallbacks(_repair_finished, _repair_error)
        return d