source: trunk/src/allmydata/mutable/checker.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 12.4 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from six import ensure_str
6
7from allmydata.uri import from_string
8from allmydata.util import base32, log, dictutil
9from allmydata.util.happinessutil import servers_of_happiness
10from allmydata.check_results import CheckAndRepairResults, CheckResults
11
12from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
13from allmydata.mutable.servermap import ServerMap, ServermapUpdater
14from allmydata.mutable.retrieve import Retrieve # for verifying
15
class MutableChecker(object):
    """Check (and optionally verify) the health of a single mutable file.

    ``check()`` updates a servermap in ``SERVERMAP_MODE`` and decides whether
    the file needs repair. When asked, it also verifies every share of the
    best recoverable version. It returns a Deferred that fires with a
    ``CheckResults`` instance describing what was found.
    """
    SERVERMAP_MODE = MODE_CHECK

    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (server,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids
        # Filled in by _got_mapupdate_results(); None until a check has run
        # or when no recoverable version exists.
        self.best_version = None

    def check(self, verify=False, add_lease=False):
        """Check this mutable file.

        :param verify: if true, also download and verify every share of the
            best recoverable version. Any bad shares found are recorded in
            ``self.bad_shares``.
        :param add_lease: passed through to ``ServermapUpdater``.
        :return: a Deferred that fires with a ``CheckResults`` instance.
        """
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares, and getting a good idea of
        # recoverability, etc, without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, self.SERVERMAP_MODE,
                             add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        # The verify step may return None or a bad-share list; the results
        # are always built from the (possibly updated) servermap itself.
        d.addCallback(lambda res: servermap)
        d.addCallback(self._make_checker_results)
        return d

    def _got_mapupdate_results(self, servermap):
        """Record the best recoverable version and set ``need_repair``.

        The file is healthy if there is exactly one recoverable version, it
        has at least N distinct shares, and there are no unrecoverable
        versions. In that case all existing shares are for the same version.

        :param servermap: the freshly updated ServerMap.
        :return: ``servermap``, unchanged, for the next callback.
        """
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There is not exactly one recoverable version (none at all, or
        #   several competing ones).
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
        """Read and verify every share of the best recoverable version.

        This runs a ``Retrieve`` in verify mode over ``servermap``. According
        to the original design notes, verify mode fetches all N shares rather
        than just k, and checks them instead of decrypting and decoding them.
        The bad shares it reports are handed to ``_process_bad_shares``.

        :param servermap: the ServerMap from the preceding update.
        :return: None if there is no recoverable version. Otherwise a
            Deferred that fires when verification has finished.
        """
        if not self.best_version:
            return

        r = Retrieve(self._node, self._storage_broker, servermap,
                     self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d

    def _process_bad_shares(self, bad_shares):
        """Store the verifier's bad shares; any bad share means repair."""
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares

    def _count_shares(self, smap, version):
        """Return the share-count fields of CheckResults for ``version``.

        :return: a dict with keys ``count-shares-good``,
            ``count-shares-needed``, ``count-shares-expected``,
            ``count-good-share-hosts`` and ``count-wrong-shares``. The last
            one counts shares that belong to any *other* version.
        """
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_servers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum(len(shares)
                                             for verinfo, shares in vmap.items()
                                             if verinfo != version)

        return counters

    def _describe_bad_shares(self):
        """Turn ``self.bad_shares`` into report text and result lists.

        Each bad share is also logged at ``log.WEIRD`` level as a side
        effect.

        :return: a 4-tuple ``(report_lines, summary_lines, locators,
            problems)``. ``locators`` holds one
            ``(server, storage_index, shnum)`` tuple per bad share.
            ``problems`` holds one ``(serverid, storage_index, shnum,
            failure)`` tuple per bad share.
        """
        report_lines = []
        summary_lines = []
        locators = []
        problems = []
        if not self.bad_shares:
            return report_lines, summary_lines, locators, problems

        report_lines.append("Corrupt Shares:")
        summary_lines.append("Corrupt Shares:")
        # Order by plain (serverid, shnum) keys so the report and the
        # corrupt-share list come out in the same order every time.
        # Ordering by object identity would make that order arbitrary.
        ordered = sorted(self.bad_shares,
                         key=lambda bad: (bad[0].get_serverid(), bad[1]))
        for (server, shnum, f) in ordered:
            serverid = server.get_serverid()
            locators.append((server, self._storage_index, shnum))
            s = "%s-sh%d" % (ensure_str(server.get_name()), shnum)
            # CorruptShareError carries a human-readable reason; any other
            # failure is described by its string form.
            if f.check(CorruptShareError):
                ft = f.value.reason
            else:
                ft = str(f)
            report_lines.append(" %s: %s" % (s, ft))
            summary_lines.append(s)
            problems.append((serverid, self._storage_index, shnum, f))
            msg = ("CorruptShareError during mutable verify, "
                   "serverid=%(serverid)s, si=%(si)s, shnum=%(shnum)d, "
                   "where=%(where)s")
            log.msg(format=msg, serverid=server.get_name(),
                    si=base32.b2a(self._storage_index),
                    shnum=shnum,
                    where=ft,
                    level=log.WEIRD, umid="EkK8QA")
        return report_lines, summary_lines, locators, problems

    def _make_checker_results(self, smap):
        """Build a CheckResults describing ``smap`` and ``self.bad_shares``.

        :param smap: the ServerMap to report on. A copy of it is stored in
            the result.
        :return: a ``CheckResults`` instance.
        :raises: whatever ``self._monitor.raise_if_cancelled()`` raises when
            the operation has been cancelled.
        """
        self._monitor.raise_if_cancelled()
        healthy = True
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if not recoverable:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        elif len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
        elif unrecoverable:
            healthy = False
            # Find a k and N from somewhere. This is not exactly the best
            # version, but that doesn't matter too much.
            first = next(iter(unrecoverable))
            counters = self._count_shares(smap, first)
        else:
            # couldn't find anything at all
            counters = {
                "count-shares-good": 0,
                "count-shares-needed": 3, # arbitrary defaults
                "count-shares-expected": 10,
                "count-good-share-hosts": 0,
                "count-wrong-shares": 0,
                }

        (bad_report, bad_summary, corrupt_share_locators,
         problems) = self._describe_bad_shares()
        report.extend(bad_report)
        summary.extend(bad_summary)

        # Map "<version summary>-sh<N>" share ids to the servers holding them.
        sharemap = dictutil.DictOfSets()
        for verinfo in vmap:
            for (shnum, server, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                sharemap.add(shareid, server)
        if healthy:
            summary = "Healthy"
        else:
            summary = "Unhealthy: " + " ".join(summary)

        count_happiness = servers_of_happiness(sharemap)

        cr = CheckResults(from_string(self._node.get_uri()),
                          self._storage_index,
                          healthy=healthy, recoverable=bool(recoverable),
                          count_happiness=count_happiness,
                          count_shares_needed=counters["count-shares-needed"],
                          count_shares_expected=counters["count-shares-expected"],
                          count_shares_good=counters["count-shares-good"],
                          count_good_share_hosts=counters["count-good-share-hosts"],
                          count_recoverable_versions=len(recoverable),
                          count_unrecoverable_versions=len(unrecoverable),
                          servers_responding=list(smap.get_reachable_servers()),
                          sharemap=sharemap,
                          count_wrong_shares=counters["count-wrong-shares"],
                          list_corrupt_shares=corrupt_share_locators,
                          count_corrupt_shares=len(corrupt_share_locators),
                          list_incompatible_shares=[],
                          count_incompatible_shares=0,
                          summary=summary,
                          report=report,
                          share_problems=problems,
                          servermap=smap.copy())
        return cr
251
252
class MutableCheckAndRepairer(MutableChecker):
    """A MutableChecker that follows the check with a repair when needed.

    ``check()`` fires with a ``CheckAndRepairResults``. Its
    ``pre_repair_results`` hold the CheckResults from the check itself.
    Its ``post_repair_results`` hold the results after repair, or the
    pre-repair results again when no repair was performed.
    """
    # Write mode is required so the servermap update obtains the privkey.
    SERVERMAP_MODE = MODE_WRITE

    def __init__(self, node, storage_broker, history, monitor):
        super().__init__(node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
        """Check the file, repair it if the check says so, report both.

        :param verify: forwarded to ``MutableChecker.check``.
        :param add_lease: forwarded to ``MutableChecker.check``.
        :return: a Deferred firing with ``self.cr_results``.
        """
        checked = super().check(verify, add_lease)
        checked.addCallback(self._stash_pre_repair_results)
        checked.addCallback(self._maybe_repair)
        checked.addCallback(lambda _ignored: self.cr_results)
        return checked

    def _stash_pre_repair_results(self, pre_repair_results):
        """Save the check's CheckResults and pass them along unchanged."""
        self.cr_results.pre_repair_results = pre_repair_results
        return pre_repair_results

    def _maybe_repair(self, pre_repair_results):
        """Repair the file when ``need_repair`` is set and it is writable.

        :return: None when no repair is attempted. Otherwise the repair
            Deferred. If the repair fails, the failure is recorded in
            ``cr_results.repair_failure`` and still propagates to the caller.
        """
        results = self.cr_results
        self._monitor.raise_if_cancelled()

        # Nothing to fix: the "after" picture equals the "before" picture.
        if not self.need_repair:
            results.post_repair_results = pre_repair_results
            return None

        # ticket #625: we cannot yet repair read-only mutable files
        if self._node.is_readonly():
            results.post_repair_results = pre_repair_results
            results.repair_attempted = False
            return None

        results.repair_attempted = True
        repairing = self._node.repair(pre_repair_results,
                                      monitor=self._monitor)

        def _on_repaired(repair_results):
            results.repair_successful = repair_results.get_successful()
            results.post_repair_results = self._make_checker_results(
                repair_results.servermap)
            results.repair_results = repair_results # TODO?
            return None

        def _on_repair_failed(failure):
            # The failure is recorded and then re-raised to the caller.
            # post_repair_results is deliberately left unset here.
            results.repair_successful = False
            results.repair_failure = failure # TODO?
            return failure

        repairing.addCallbacks(_on_repaired, _on_repair_failed)
        return repairing
Note: See TracBrowser for help on using the repository browser.