source: trunk/src/allmydata/test/test_hung_server.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 14.7 KB
Line 
1# -*- coding: utf-8 -*-
2
3"""
4Ported to Python 3.
5"""
6
7import os, shutil
8from twisted.trial import unittest
9from twisted.internet import defer
10from allmydata import uri
11from allmydata.util.consumer import download_to_data
12from allmydata.immutable import upload
13from allmydata.mutable.common import UnrecoverableFileError
14from allmydata.mutable.publish import MutableData
15from allmydata.storage.common import storage_index_to_dir
16from allmydata.test.no_network import GridTestMixin
17from allmydata.test.common import ShouldFailMixin
18from allmydata.util.pollmixin import PollMixin
19from allmydata.interfaces import NotEnoughSharesError
20
21immutable_plaintext = b"data" * 10000
22mutable_plaintext = b"muta" * 10000
23
24class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
25                             unittest.TestCase):
26    def _break(self, servers):
27        for (id, ss) in servers:
28            self.g.break_server(id)
29
30    def _hang(self, servers, **kwargs):
31        for (id, ss) in servers:
32            self.g.hang_server(id, **kwargs)
33
34    def _unhang(self, servers, **kwargs):
35        for (id, ss) in servers:
36            self.g.unhang_server(id, **kwargs)
37
38    def _hang_shares(self, shnums, **kwargs):
39        # hang all servers who are holding the given shares
40        hung_serverids = set()
41        for (i_shnum, i_serverid, i_sharefile) in self.shares:
42            if i_shnum in shnums:
43                if i_serverid not in hung_serverids:
44                    self.g.hang_server(i_serverid, **kwargs)
45                    hung_serverids.add(i_serverid)
46
47    def _delete_all_shares_from(self, servers):
48        serverids = [id for (id, ss) in servers]
49        for (i_shnum, i_serverid, i_sharefile) in self.shares:
50            if i_serverid in serverids:
51                os.unlink(i_sharefile)
52
53    def _corrupt_all_shares_in(self, servers, corruptor_func):
54        serverids = [id for (id, ss) in servers]
55        for (i_shnum, i_serverid, i_sharefile) in self.shares:
56            if i_serverid in serverids:
57                self._corrupt_share((i_shnum, i_sharefile), corruptor_func)
58
59    def _copy_all_shares_from(self, from_servers, to_server):
60        serverids = [id for (id, ss) in from_servers]
61        for (i_shnum, i_serverid, i_sharefile) in self.shares:
62            if i_serverid in serverids:
63                self._copy_share((i_shnum, i_sharefile), to_server)
64
65    def _copy_share(self, share, to_server):
66        (sharenum, sharefile) = share
67        (id, ss) = to_server
68        shares_dir = os.path.join(ss.original._server.storedir, "shares")
69        si = uri.from_string(self.uri).get_storage_index()
70        si_dir = os.path.join(shares_dir, storage_index_to_dir(si))
71        if not os.path.exists(si_dir):
72            os.makedirs(si_dir)
73        new_sharefile = os.path.join(si_dir, str(sharenum))
74        shutil.copy(sharefile, new_sharefile)
75        self.shares = self.find_uri_shares(self.uri)
76        # Make sure that the storage server has the share.
77        self.failUnless((sharenum, ss.original._server.my_nodeid, new_sharefile)
78                        in self.shares)
79
80    def _corrupt_share(self, share, corruptor_func):
81        (sharenum, sharefile) = share
82        data = open(sharefile, "rb").read()
83        newdata = corruptor_func(data)
84        os.unlink(sharefile)
85        wf = open(sharefile, "wb")
86        wf.write(newdata)
87        wf.close()
88
89    def _set_up(self, mutable, testdir, num_clients=1, num_servers=10):
90        self.mutable = mutable
91        if mutable:
92            self.basedir = "hung_server/mutable_" + testdir
93        else:
94            self.basedir = "hung_server/immutable_" + testdir
95
96        self.set_up_grid(num_clients=num_clients, num_servers=num_servers)
97
98        self.c0 = self.g.clients[0]
99        nm = self.c0.nodemaker
100        self.servers = sorted([(s.get_serverid(), s.get_rref())
101                               for s in nm.storage_broker.get_connected_servers()])
102        self.servers = self.servers[5:] + self.servers[:5]
103
104        if mutable:
105            uploadable = MutableData(mutable_plaintext)
106            d = nm.create_mutable_file(uploadable)
107            def _uploaded_mutable(node):
108                self.uri = node.get_uri()
109                self.shares = self.find_uri_shares(self.uri)
110            d.addCallback(_uploaded_mutable)
111        else:
112            data = upload.Data(immutable_plaintext, convergence=b"")
113            d = self.c0.upload(data)
114            def _uploaded_immutable(upload_res):
115                self.uri = upload_res.get_uri()
116                self.shares = self.find_uri_shares(self.uri)
117            d.addCallback(_uploaded_immutable)
118        return d
119
120    def _start_download(self):
121        n = self.c0.create_node_from_uri(self.uri)
122        if self.mutable:
123            d = n.download_best_version()
124        else:
125            d = download_to_data(n)
126        return d
127
128    def _wait_for_data(self, n):
129        if self.mutable:
130            d = n.download_best_version()
131        else:
132            d = download_to_data(n)
133        return d
134
135    def _check(self, resultingdata):
136        if self.mutable:
137            self.failUnlessEqual(resultingdata, mutable_plaintext)
138        else:
139            self.failUnlessEqual(resultingdata, immutable_plaintext)
140
141    def _download_and_check(self):
142        d = self._start_download()
143        d.addCallback(self._check)
144        return d
145
146    def _should_fail_download(self):
147        if self.mutable:
148            return self.shouldFail(UnrecoverableFileError, self.basedir,
149                                   "no recoverable versions",
150                                   self._download_and_check)
151        else:
152            return self.shouldFail(NotEnoughSharesError, self.basedir,
153                                   "ran out of shares",
154                                   self._download_and_check)
155
156
157    def test_10_good_sanity_check(self):
158        d = defer.succeed(None)
159        for mutable in [False, True]:
160            d.addCallback(lambda ign, mutable=mutable: self._set_up(mutable, "test_10_good_sanity_check"))
161            d.addCallback(lambda ign: self._download_and_check())
162        return d
163
164    def test_10_good_copied_share(self):
165        d = defer.succeed(None)
166        for mutable in [False, True]:
167            d.addCallback(lambda ign, mutable=mutable: self._set_up(mutable, "test_10_good_copied_share"))
168            d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
169            d.addCallback(lambda ign: self._download_and_check())
170            return d
171
172    def test_3_good_7_noshares(self):
173        d = defer.succeed(None)
174        for mutable in [False, True]:
175            d.addCallback(lambda ign, mutable=mutable: self._set_up(mutable, "test_3_good_7_noshares"))
176            d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:]))
177            d.addCallback(lambda ign: self._download_and_check())
178        return d
179
180    def test_2_good_8_broken_fail(self):
181        d = defer.succeed(None)
182        for mutable in [False, True]:
183            d.addCallback(lambda ign, mutable=mutable: self._set_up(mutable, "test_2_good_8_broken_fail"))
184            d.addCallback(lambda ign: self._break(self.servers[2:]))
185            d.addCallback(lambda ign: self._should_fail_download())
186        return d
187
188    def test_2_good_8_noshares_fail(self):
189        d = defer.succeed(None)
190        for mutable in [False, True]:
191            d.addCallback(lambda ign, mutable=mutable: self._set_up(mutable, "test_2_good_8_noshares_fail"))
192            d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[2:]))
193            d.addCallback(lambda ign: self._should_fail_download())
194        return d
195
196    def test_2_good_8_broken_copied_share(self):
197        d = defer.succeed(None)
198        for mutable in [False, True]:
199            d.addCallback(lambda ign, mutable=mutable: self._set_up(mutable, "test_2_good_8_broken_copied_share"))
200            d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
201            d.addCallback(lambda ign: self._break(self.servers[2:]))
202            d.addCallback(lambda ign: self._download_and_check())
203        return d
204
205    def test_2_good_8_broken_duplicate_share_fail(self):
206        d = defer.succeed(None)
207        for mutable in [False, True]:
208            d.addCallback(lambda ign, mutable=mutable: self._set_up(mutable, "test_2_good_8_broken_duplicate_share_fail"))
209            d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[1:2], self.servers[0]))
210            d.addCallback(lambda ign: self._break(self.servers[2:]))
211            d.addCallback(lambda ign: self._should_fail_download())
212        return d
213
214    def test_3_good_7_hung_immutable(self):
215        d = defer.succeed(None)
216        d.addCallback(lambda ign: self._set_up(False, "test_3_good_7_hung"))
217        d.addCallback(lambda ign: self._hang(self.servers[3:]))
218        d.addCallback(lambda ign: self._download_and_check())
219        return d
220
221    def test_5_overdue_immutable(self):
222        # restrict the ShareFinder to only allow 5 outstanding requests, and
223        # arrange for the first 5 servers to hang. Then trigger the OVERDUE
224        # timers (simulating 10 seconds passed), at which point the
225        # ShareFinder should send additional queries and finish the download
226        # quickly. If we didn't have OVERDUE timers, this test would fail by
227        # timing out.
228        done = []
229        d = self._set_up(False, "test_5_overdue_immutable")
230        def _reduce_max_outstanding_requests_and_download(ign):
231            # we need to hang the first 5 servers, so we have to
232            # figure out where the shares were placed.
233            si = uri.from_string(self.uri).get_storage_index()
234            placed = self.c0.storage_broker.get_servers_for_psi(si)
235            self._hang([(s.get_serverid(), s) for s in placed[:5]])
236
237            n = self.c0.create_node_from_uri(self.uri)
238            n._cnode._maybe_create_download_node()
239            self._sf = n._cnode._node._sharefinder
240            self._sf.max_outstanding_requests = 5
241            self._sf.OVERDUE_TIMEOUT = 1000.0
242            d2 = download_to_data(n)
243            # start download, but don't wait for it to complete yet
244            def _done(res):
245                done.append(res) # we will poll for this later
246            d2.addBoth(_done)
247        d.addCallback(_reduce_max_outstanding_requests_and_download)
248        from foolscap.eventual import fireEventually, flushEventualQueue
249        # wait here a while
250        d.addCallback(lambda res: fireEventually(res))
251        d.addCallback(lambda res: flushEventualQueue())
252        d.addCallback(lambda ign: self.failIf(done))
253        def _check_waiting(ign):
254            # all the share requests should now be stuck waiting
255            self.failUnlessEqual(len(self._sf.pending_requests), 5)
256            # but none should be marked as OVERDUE until the timers expire
257            self.failUnlessEqual(len(self._sf.overdue_requests), 0)
258        d.addCallback(_check_waiting)
259        def _mark_overdue(ign):
260            # declare four requests overdue, allowing new requests to take
261            # their place, and leaving one stuck. The finder will keep
262            # sending requests until there are 5 non-overdue ones
263            # outstanding, at which point we'll have 4 OVERDUE, 1
264            # stuck-but-not-overdue, and 4 live requests. All 4 live requests
265            # will retire before the download is complete and the ShareFinder
266            # is shut off. That will leave 4 OVERDUE and 1
267            # stuck-but-not-overdue, for a total of 5 requests in in
268            # _sf.pending_requests
269            for t in list(self._sf.overdue_timers.values())[:4]:
270                t.reset(-1.0)
271            # the timers ought to fire before the eventual-send does
272            return fireEventually()
273        d.addCallback(_mark_overdue)
274        def _we_are_done():
275            return bool(done)
276        d.addCallback(lambda ign: self.poll(_we_are_done))
277        def _check_done(ign):
278            self.failUnlessEqual(done, [immutable_plaintext])
279            self.failUnlessEqual(len(self._sf.pending_requests), 5)
280            self.failUnlessEqual(len(self._sf.overdue_requests), 4)
281        d.addCallback(_check_done)
282        return d
283
284    def test_2_good_8_hung_then_1_recovers_immutable(self):
285        d = defer.succeed(None)
286        d.addCallback(lambda ign: self._set_up(False, "test_2_good_8_hung_then_1_recovers"))
287        d.addCallback(lambda ign: self._hang(self.servers[2:3]))
288        d.addCallback(lambda ign: self._hang(self.servers[3:]))
289        d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
290        d.addCallback(lambda ign: self._download_and_check())
291        return d
292
293    def test_2_good_8_hung_then_1_recovers_with_2_shares_immutable(self):
294        d = defer.succeed(None)
295        d.addCallback(lambda ign: self._set_up(False, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
296        d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
297        d.addCallback(lambda ign: self._hang(self.servers[2:3]))
298        d.addCallback(lambda ign: self._hang(self.servers[3:]))
299        d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
300        d.addCallback(lambda ign: self._download_and_check())
301        return d
302
303    # The tests below do not currently pass for mutable files. The
304    # mutable-file downloader does not yet handle hung servers, and the tests
305    # hang forever (hence the use of SkipTest rather than .todo)
306
307    def test_3_good_7_hung_mutable(self):
308        raise unittest.SkipTest("still broken")
309        d = defer.succeed(None)
310        d.addCallback(lambda ign: self._set_up(True, "test_3_good_7_hung"))
311        d.addCallback(lambda ign: self._hang(self.servers[3:]))
312        d.addCallback(lambda ign: self._download_and_check())
313        return d
314
315    def test_2_good_8_hung_then_1_recovers_mutable(self):
316        raise unittest.SkipTest("still broken")
317        d = defer.succeed(None)
318        d.addCallback(lambda ign: self._set_up(True, "test_2_good_8_hung_then_1_recovers"))
319        d.addCallback(lambda ign: self._hang(self.servers[2:3]))
320        d.addCallback(lambda ign: self._hang(self.servers[3:]))
321        d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
322        d.addCallback(lambda ign: self._download_and_check())
323        return d
324
325    def test_2_good_8_hung_then_1_recovers_with_2_shares_mutable(self):
326        raise unittest.SkipTest("still broken")
327        d = defer.succeed(None)
328        d.addCallback(lambda ign: self._set_up(True, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
329        d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
330        d.addCallback(lambda ign: self._hang(self.servers[2:3]))
331        d.addCallback(lambda ign: self._hang(self.servers[3:]))
332        d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
333        d.addCallback(lambda ign: self._download_and_check())
334        return d
Note: See TracBrowser for help on using the repository browser.