source: trunk/src/allmydata/test/test_helper.py

Last change on this file was 0bdea02, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-03-24T19:13:20Z

Fix lint.

1"""
2Ported to Python 3.
3"""
from __future__ import annotations

import os
from struct import (
    pack,
)
from functools import (
    partial,
)

import attr

from twisted.internet import defer
from twisted.trial import unittest
from twisted.application import service

from foolscap.api import Tub, fireEventually, flushEventualQueue

from eliot.twisted import (
    inline_callbacks,
)

from allmydata.introducer.client import IntroducerClient
from allmydata.crypto import aes
from allmydata.storage.server import (
    si_b2a,
    StorageServer,
    FoolscapStorageServer,
)
from allmydata.storage_client import StorageFarmBroker
from allmydata.immutable.layout import (
    make_write_bucket_proxy,
)
from allmydata.immutable import offloaded, upload
from allmydata import uri, client
from allmydata.util import hashutil, fileutil, mathutil, dictutil

from .no_network import (
    NoNetworkServer,
    LocalWrapper,
    fireNow,
)
from .common import (
    EMPTY_CLIENT_CONFIG,
    SyncTestCase,
)

from testtools.matchers import (
    Equals,
    MatchesListwise,
    IsInstance,
)
from testtools.twistedsupport import (
    succeeded,
)

MiB = 1024*1024

DATA = b"I need help\n" * 1000

class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
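    """
    A ``CHKUploadHelper`` whose ``start_encrypted`` skips the real encoding
    and upload and instead fabricates an ``UploadResults`` (with a fake UEB
    hash and verify cap) from the encrypted uploadable's size and encoding
    parameters.
    """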
    def start_encrypted(self, eu):
        d = eu.get_size()
        def _got_size(size):
            d2 = eu.get_all_encoding_parameters()
            def _got_parms(parms):
                # just pretend we did the upload
                needed_shares, happy, total_shares, segsize = parms
                ueb_data = {"needed_shares": needed_shares,
                            "total_shares": total_shares,
                            "segment_size": segsize,
                            "size": size,
                            }
                ueb_hash = b"fake"
                v = uri.CHKFileVerifierURI(self._storage_index, b"x"*32,
                                           needed_shares, total_shares, size)
                _UR = upload.UploadResults
                ur = _UR(file_size=size,
                         ciphertext_fetched=0,
                         preexisting_shares=0,
                         pushed_shares=total_shares,
                         sharemap={},
                         servermap={},
                         timings={},
                         uri_extension_data=ueb_data,
                         uri_extension_hash=ueb_hash,
                         verifycapstr=v.to_string())
                self._upload_status.set_results(ur)
                return ur
            d2.addCallback(_got_parms)
            return d2
        d.addCallback(_got_size)
        return d

@attr.s
class FakeCHKCheckerAndUEBFetcher(object):
    """
    A fake of ``CHKCheckerAndUEBFetcher`` which hard-codes some check result.
    """
    peer_getter = attr.ib()
    storage_index = attr.ib()
    logparent = attr.ib()

    _sharemap = attr.ib()
    _ueb_data = attr.ib()

    @property
    def _ueb_hash(self):
        return hashutil.uri_extension_hash(
            uri.pack_extension(self._ueb_data),
        )

    def check(self):
        return defer.succeed((
            self._sharemap,
            self._ueb_data,
            self._ueb_hash,
        ))

class FakeClient(service.MultiService):
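    """
    A minimal stand-in for the Tahoe-LAFS client node, providing just the
    encoding parameters and storage broker that the uploader and helper under
    test require.
    """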
    introducer_clients: list[IntroducerClient] = []
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 75,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def get_storage_broker(self):
        return self.storage_broker

def flush_but_dont_ignore(res):
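    """
    Flush the foolscap eventual-send queue, then pass ``res`` through
    unchanged so this can be chained onto an existing Deferred.
    """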
    d = flushEventualQueue()
    def _done(ignored):
        return res
    d.addCallback(_done)
    return d

def wait_a_few_turns(ignored=None):
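    """
    Return a Deferred that fires only after several eventual-send turns,
    giving the uploader time to connect to the helper before a test proceeds.
    """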
    d = fireEventually()
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    return d

def upload_data(uploader, data, convergence):
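    """
    Wrap ``data`` in an ``upload.Data`` uploadable with the given convergence
    secret and upload it with ``uploader``.
    """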
    u = upload.Data(data, convergence=convergence)
    return uploader.upload(u)


def make_uploader(helper_furl, parent, override_name=None):
    """
    Make an ``upload.Uploader`` service pointed at the given helper and with
    the given service parent.

    :param bytes helper_furl: The Foolscap URL of the upload helper.

    :param IServiceCollection parent: A parent to assign to the new uploader.

    :param str override_name: If not ``None``, a new name for the uploader
        service.  Multiple services cannot coexist with the same name.
    """
    u = upload.Uploader(helper_furl)
    if override_name is not None:
        u.name = override_name
    u.setServiceParent(parent)
    return u


class AssistedUpload(unittest.TestCase):
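    """
    Tests for uploading through a helper: a local ``offloaded.Helper`` is
    registered on a Tub, and an ``upload.Uploader`` reaches it through the
    resulting FURL.
    """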
    def setUp(self):
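        """
        Create a Tub and a ``FakeClient`` (with a ``StorageFarmBroker`` and a
        ``SecretHolder``) for each test to use.
        """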
        self.tub = t = Tub()
        t.setOption("expose-remote-exception-types", False)
        self.s = FakeClient()
        self.s.storage_broker = StorageFarmBroker(
            True,
            lambda h: self.tub,
            EMPTY_CLIENT_CONFIG,
        )
        self.s.secret_holder = client.SecretHolder(b"lease secret", b"converge")
        self.s.startService()

        t.setServiceParent(self.s)
        self.s.tub = t
        # we never actually use this for network traffic, so it can use a
        # bogus host/port
        t.setLocation(b"bogus:1234")

    def setUpHelper(self, basedir, chk_upload=CHKUploadHelper_fake, chk_checker=None):
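        """
        Create an ``offloaded.Helper`` rooted at ``basedir``, optionally
        replacing its upload and checker classes with fakes, and register it
        on the Tub so ``self.helper_furl`` can reach it.
        """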
        fileutil.make_dirs(basedir)
        self.helper = offloaded.Helper(
            basedir,
            self.s.storage_broker,
            self.s.secret_holder,
            None,
            None,
        )
        if chk_upload is not None:
            self.helper.chk_upload = chk_upload
        if chk_checker is not None:
            self.helper.chk_checker = chk_checker
        self.helper_furl = self.tub.registerReference(self.helper)

    def tearDown(self):
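        # Stop the fake client service and flush the foolscap eventual-send
        # queue so nothing fires after the test ends.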
        d = self.s.stopService()
        d.addCallback(fireEventually)
        d.addBoth(flush_but_dont_ignore)
        return d

    def test_one(self):
        """
        Some data that has never been uploaded before can be uploaded in CHK
        format using the ``RIHelper`` provider and ``Uploader.upload``.
        """
        self.basedir = "helper/AssistedUpload/test_one"
        self.setUpHelper(self.basedir)
        u = make_uploader(self.helper_furl, self.s)

        d = wait_a_few_turns()

        def _ready(res):
            self.assertTrue(
                u._helper,
                "Expected uploader to have a helper reference, had {} instead.".format(
                    u._helper,
                ),
            )
            return upload_data(u, DATA, convergence=b"some convergence string")
        d.addCallback(_ready)

        def _uploaded(results):
            the_uri = results.get_uri()
            self.assertIn(b"CHK", the_uri)
            self.assertNotEqual(
                results.get_pushed_shares(),
                0,
            )
        d.addCallback(_uploaded)

        def _check_empty(res):
            # Make sure the intermediate artifacts aren't left lying around.
            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
            self.assertEqual(files, [])
            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
            self.assertEqual(files, [])
        d.addCallback(_check_empty)

        return d

    @inline_callbacks
    def test_concurrent(self):
        """
        The same data can be uploaded by more than one ``Uploader`` at a time.
        """
        self.basedir = "helper/AssistedUpload/test_concurrent"
        self.setUpHelper(self.basedir)
        u1 = make_uploader(self.helper_furl, self.s, "u1")
        u2 = make_uploader(self.helper_furl, self.s, "u2")

        yield wait_a_few_turns()

        for u in [u1, u2]:
            self.assertTrue(
                u._helper,
                "Expected uploader to have a helper reference, had {} instead.".format(
                    u._helper,
                ),
            )

        uploads = list(
            upload_data(u, DATA, convergence=b"some convergence string")
            for u
            in [u1, u2]
        )

        result1, result2 = yield defer.gatherResults(uploads)

        self.assertEqual(
            result1.get_uri(),
            result2.get_uri(),
        )
        # It would be really cool to assert that result1.get_pushed_shares() +
        # result2.get_pushed_shares() == total_shares here.  However, we're
        # faking too much for that to be meaningful here.  Also it doesn't
        # hold because we don't actually push _anything_, we just lie about
        # having pushed stuff.

    def test_previous_upload_failed(self):
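        """
        Ciphertext left in ``CHK_encoding/`` by an upload that failed part-way
        through does not prevent a later upload of the same file from
        succeeding.
        """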
        self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
        self.setUpHelper(self.basedir)

        # we want to make sure that an upload which fails (leaving the
        # ciphertext in the CHK_encoding/ directory) does not prevent a later
        # attempt to upload that file from working. We simulate this by
        # populating the directory manually. The hardest part is guessing the
        # storage index.

        k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
        n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
        max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
        segsize = min(max_segsize, len(DATA))
        # this must be a multiple of 'required_shares'==k
        segsize = mathutil.next_multiple(segsize, k)

        key = hashutil.convergence_hash(k, n, segsize, DATA, b"test convergence string")
        assert len(key) == 16
        encryptor = aes.create_encryptor(key)
        SI = hashutil.storage_index_hash(key)
        SI_s = str(si_b2a(SI), "utf-8")
        encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
        f = open(encfile, "wb")
        f.write(aes.encrypt_data(encryptor, DATA))
        f.close()

        u = make_uploader(self.helper_furl, self.s)

        d = wait_a_few_turns()

        def _ready(res):
            assert u._helper
            return upload_data(u, DATA, convergence=b"test convergence string")
        d.addCallback(_ready)
        def _uploaded(results):
            the_uri = results.get_uri()
            assert b"CHK" in the_uri
        d.addCallback(_uploaded)

        def _check_empty(res):
            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
            self.failUnlessEqual(files, [])
            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
            self.failUnlessEqual(files, [])
        d.addCallback(_check_empty)

        return d

    @inline_callbacks
    def test_already_uploaded(self):
        """
        If enough shares to satisfy the needed parameter already exist, the upload
        succeeds without pushing any shares.
        """
        params = FakeClient.DEFAULT_ENCODING_PARAMETERS
        chk_checker = partial(
            FakeCHKCheckerAndUEBFetcher,
            sharemap=dictutil.DictOfSets({
                0: {b"server0"},
                1: {b"server1"},
            }),
            ueb_data={
                "size": len(DATA),
                "segment_size": min(params["max_segment_size"], len(DATA)),
                "needed_shares": params["k"],
                "total_shares": params["n"],
            },
        )
        self.basedir = "helper/AssistedUpload/test_already_uploaded"
        self.setUpHelper(
            self.basedir,
            chk_checker=chk_checker,
        )
        u = make_uploader(self.helper_furl, self.s)

        yield wait_a_few_turns()

        assert u._helper

        results = yield upload_data(u, DATA, convergence=b"some convergence string")
        the_uri = results.get_uri()
        assert b"CHK" in the_uri

        files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
        self.failUnlessEqual(files, [])
        files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
        self.failUnlessEqual(files, [])

        self.assertEqual(
            results.get_pushed_shares(),
            0,
        )


class CHKCheckerAndUEBFetcherTests(SyncTestCase):
    """
    Tests for ``CHKCheckerAndUEBFetcher``.
    """
    def test_check_no_peers(self):
        """
        If the supplied "peer getter" returns no peers then
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with ``False``.
        """
        storage_index = b"a" * 16
        peers = {storage_index: []}
        caf = offloaded.CHKCheckerAndUEBFetcher(
            peers.get,
            storage_index,
            None,
        )
        self.assertThat(
            caf.check(),
            succeeded(Equals(False)),
        )

    @inline_callbacks
    def test_check_ueb_unavailable(self):
        """
        If the UEB cannot be read from any of the peers supplied by the "peer
        getter" then ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred``
        that fires with ``False``.
        """
        storage_index = b"a" * 16
        serverid = b"b" * 20
        storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
        rref_without_ueb = LocalWrapper(storage, fireNow)
        yield write_bad_share(rref_without_ueb, storage_index)
        server_without_ueb = NoNetworkServer(serverid, rref_without_ueb)
        peers = {storage_index: [server_without_ueb]}
        caf = offloaded.CHKCheckerAndUEBFetcher(
            peers.get,
            storage_index,
            None,
        )
        self.assertThat(
            caf.check(),
            succeeded(Equals(False)),
        )

    @inline_callbacks
    def test_not_enough_shares(self):
        """
        If fewer shares are found than are required to reassemble the data then
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with ``False``.
        """
        storage_index = b"a" * 16
        serverid = b"b" * 20
        storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
        rref_with_ueb = LocalWrapper(storage, fireNow)
        ueb = {
            "needed_shares": 2,
            "total_shares": 2,
            "segment_size": 128 * 1024,
            "size": 1024,
        }
        yield write_good_share(rref_with_ueb, storage_index, ueb, [0])

        server_with_ueb = NoNetworkServer(serverid, rref_with_ueb)
        peers = {storage_index: [server_with_ueb]}
        caf = offloaded.CHKCheckerAndUEBFetcher(
            peers.get,
            storage_index,
            None,
        )
        self.assertThat(
            caf.check(),
            succeeded(Equals(False)),
        )

    @inline_callbacks
    def test_enough_shares(self):
        """
        If enough shares are found to reassemble the data then
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with share and share placement information.
        """
        storage_index = b"a" * 16
        serverids = list(
            ch * 20
            for ch
            in [b"b", b"c"]
        )
        storages = list(
            FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
            for serverid
            in serverids
        )
        rrefs_with_ueb = list(
            LocalWrapper(storage, fireNow)
            for storage
            in storages
        )
        ueb = {
            "needed_shares": len(serverids),
            "total_shares": len(serverids),
            "segment_size": 128 * 1024,
            "size": 1024,
        }
        for n, rref_with_ueb in enumerate(rrefs_with_ueb):
            yield write_good_share(rref_with_ueb, storage_index, ueb, [n])

        servers_with_ueb = list(
            NoNetworkServer(serverid, rref_with_ueb)
            for (serverid, rref_with_ueb)
            in zip(serverids, rrefs_with_ueb)
        )
        peers = {storage_index: servers_with_ueb}
        caf = offloaded.CHKCheckerAndUEBFetcher(
            peers.get,
            storage_index,
            None,
        )
        self.assertThat(
            caf.check(),
            succeeded(MatchesListwise([
                Equals({
                    n: {serverid}
                    for (n, serverid)
                    in enumerate(serverids)
                }),
                Equals(ueb),
                IsInstance(bytes),
            ])),
        )


def write_bad_share(storage_rref, storage_index):
    """
    Write a share with a corrupt URI extension block.
    """
    # Write some trash to the right bucket on this storage server.  It won't
    # have a recoverable UEB block.
    return write_share(storage_rref, storage_index, [0], b"\0" * 1024)


def write_good_share(storage_rref, storage_index, ueb, sharenums):
    """
    Write a valid share with the given URI extension block.
    """
    write_proxy = make_write_bucket_proxy(
        storage_rref,
        None,
        1024,
        ueb["segment_size"],
        1,
        1,
        ueb["size"],
    )
    # See allmydata/immutable/layout.py
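    # The share body handed to write_share is: the proxy's offsets header,
    # zero filler up to the uri_extension offset, then the length-prefixed,
    # serialized UEB.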
    offset = write_proxy._offsets["uri_extension"]
    filler = b"\0" * (offset - len(write_proxy._offset_data))
    ueb_data = uri.pack_extension(ueb)
    data = (
        write_proxy._offset_data +
        filler +
        pack(write_proxy.fieldstruct, len(ueb_data)) +
        ueb_data
    )
    return write_share(storage_rref, storage_index, sharenums, data)


@inline_callbacks
def write_share(storage_rref, storage_index, sharenums, sharedata):
    """
    Write the given share data to the given storage index using the given
    IStorageServer remote reference.

    :param foolscap.ipb.IRemoteReference storage_rref: A remote reference to
        an IStorageServer.

    :param bytes storage_index: The storage index to which to write the share
        data.

    :param [int] sharenums: The share numbers to which to write this sharedata.

    :param bytes sharedata: The ciphertext to write as the share.
    """
    ignored, writers = yield storage_rref.callRemote(
        "allocate_buckets",
        storage_index,
        b"x" * 16,
        b"x" * 16,
        sharenums,
        len(sharedata),
        LocalWrapper(None),
    )
    [writer] = writers.values()
    yield writer.callRemote("write", 0, sharedata)
    yield writer.callRemote("close")