source: trunk/src/allmydata/test/test_upload.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

# -*- coding: utf-8 -*-

"""
Ported to Python 3.
"""

import os, shutil
from io import BytesIO
from base64 import (
    b64encode,
)

from hypothesis import (
    given,
)
from hypothesis.strategies import (
    just,
    integers,
)

from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.internet import defer, task
from foolscap.api import fireEventually

import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir
from allmydata.client import _Client
from .common import (
    EMPTY_CLIENT_CONFIG,
    ShouldFailMixin,
)
from functools import reduce


MiB = 1024*1024

def extract_uri(results):
    return results.get_uri()

class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, bytes))
        s = b"".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence=b"some convergence string")

    def _test_filehandle(self, convergence):
        s = BytesIO(b"a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, b"a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, b"a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "wb")
        f.write(b"a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, b"a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, b"a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = b"a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, b"a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, b"a"*40)
        d.addCallback(lambda res: u.close())
        return d

class ServerError(Exception):
    pass

class SetDEPMixin(object):
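    """
    Mixin that installs encoding parameters on self.node.

    In Tahoe-LAFS erasure-coding terms, k is the number of shares needed
    to reconstruct a file, n is the total number of shares produced, and
    'happy' is the servers-of-happiness threshold: shares must be spread
    over at least that many distinct servers such that any k of them
    together hold enough shares to recover the file.
    """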
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.encoding_params = p


# This doesn't actually implement the whole interface, but we keep a
# commented-out interface annotation here for grepping purposes.
#@implementer(RIStorageServer)
class FakeStorageServer(object):
    """
    A fake Foolscap remote object, implemented by overriding callRemote() to
    call local methods.
    """
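    # Recognized modes, besides the default "good" (accept everything):
    #   "small":            advertise a 10-byte maximum immutable share size
    #   "full":             allocate no buckets (the server is out of space)
    #   "already got them": claim to already hold every requested share
    #   "timeout":          never answer allocate_buckets()
    #   "first-fail" / "second-fail": raise ServerError on the first /
    #                       second allocation query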
    def __init__(self, mode, reactor=None):
        self.mode = mode
        self.allocated = []
        self._alloc_queries = 0
        self._get_queries = 0
        self.version = {
            b"http://allmydata.org/tahoe/protocols/storage/v1" :
            {
                b"maximum-immutable-share-size": 2**32 - 1,
            },
            b"application-version": bytes(allmydata.__full_version__, "ascii"),
        }
        if mode == "small":
            self.version = {
                b"http://allmydata.org/tahoe/protocols/storage/v1" :
                {
                    b"maximum-immutable-share-size": 10,
                },
                b"application-version": bytes(allmydata.__full_version__, "ascii"),
            }


    def callRemote(self, methname, *args, **kwargs):
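        # fireEventually() returns a Deferred that fires on a later reactor
        # turn, so these fake calls stay asynchronous, like real remote
        # calls.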
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        # print("FakeStorageServer.allocate_buckets(num=%d, size=%d, mode=%s, queries=%d)" % (len(sharenums), share_size, self.mode, self._alloc_queries))
        if self.mode == "timeout":
            return defer.Deferred()
        if self.mode == "first-fail":
            if self._alloc_queries == 0:
                raise ServerError
        if self.mode == "second-fail":
            if self._alloc_queries == 1:
                raise ServerError
        self._alloc_queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )

    def get_buckets(self, storage_index, **kw):
        # this should map shnum to a BucketReader but there isn't a
        # handy FakeBucketReader and we don't actually read the shares
        # back anyway (just the keys)
        return {
            shnum: None
            for (si, shnum) in self.allocated
            if si == storage_index
        }


class FakeBucketWriter(object):
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = BytesIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d


    def callRemoteOnly(self, methname, *args, **kwargs):
        d = self.callRemote(methname, *args, **kwargs)
        del d # callRemoteOnly ignores this
        return None


    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        pass

class FakeClient(object):
    DEFAULT_ENCODING_PARAMETERS = {
        "k":25,
        "happy": 25,
        "n": 100,
        "max_segment_size": 1 * MiB,
    }

    def __init__(self, mode="good", num_servers=50, reactor=None):
        self.num_servers = num_servers
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        if isinstance(mode, str):
            mode = {i: mode for i in range(num_servers)}
        servers = [
            (b"%20d" % fakeid, FakeStorageServer(mode[fakeid], reactor=reactor))
            for fakeid in range(self.num_servers)
        ]
        self.storage_broker = StorageFarmBroker(
            permute_peers=True,
            tub_maker=None,
            node_config=EMPTY_CLIENT_CONFIG,
        )
        for (serverid, rref) in servers:
            ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % str(base32.b2a(serverid), "ascii"),
                   "permutation-seed-base32": base32.b2a(serverid) }
            self.storage_broker.test_add_rref(serverid, rref, ann)
        self.last_servers = [s[1] for s in servers]

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.encoding_params
    def get_storage_broker(self):
        return self.storage_broker
    _secret_holder = client.SecretHolder(b"lease secret", b"convergence secret")

class GotTooFarError(Exception):
    pass

class GiganticUploadable(upload.FileHandle):
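    """
    An uploadable that pretends to hold `size` bytes of zeros without ever
    materializing them; read() raises GotTooFarError once more than a
    megabyte has been consumed, so the too-large tests can bail out early.
    """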
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed(b"\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed([b"\x00" * length])
    def close(self):
        pass

DATA = b"""
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)


def upload_data(uploader, data, reactor=None):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u, reactor=reactor)


def upload_filename(uploader, filename, reactor=None):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u, reactor=reactor)


def upload_filehandle(uploader, fh, reactor=None):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u, reactor=reactor)


class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), bytes))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, bytes))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is... it depends upon how
        # large the hash trees wind up. It's somewhere close to
        # k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, BytesIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, BytesIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, BytesIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), bytes))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, bytes))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check(f):
            # for some reason this is passed as a 1-tuple
            (f,) = f
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

    def test_second_error_all(self):
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check(f):
            # for some reason this is passed as a 1-tuple
            (f,) = f
            self.failUnlessIn("shares could be placed or found on only 10 server(s)", str(f.value))
        d.addCallback(_check)
        return d

    def test_allocation_error_some(self):
        self.make_node({
            0: "good",
            1: "good",
            2: "good",
            3: "good",
            4: "good",
            5: "first-fail",
            6: "first-fail",
            7: "first-fail",
            8: "first-fail",
            9: "first-fail",
        })
        self.set_encoding_parameters(3, 7, 10)
        d = self.shouldFail(UploadUnhappinessError, "second_error_some",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check(f):
            # for some reason this is passed as a 1-tuple
            (f,) = f
            self.failUnlessIn("shares could be placed on only 5 server(s)", str(f.value))
        d.addCallback(_check)
        return d

    def test_allocation_error_recovery(self):
        self.make_node({
            0: "good",
            1: "good",
            2: "good",
            3: "good",
            4: "second-fail",
            5: "second-fail",
            6: "first-fail",
            7: "first-fail",
            8: "first-fail",
            9: "first-fail",
        })
        self.set_encoding_parameters(3, 7, 10)
        # we placed shares on 0 through 5, which wasn't enough. so
        # then we looped and only placed on 0-3 (because now 4-9 have
        # all failed) ... so the error message should say we only
        # placed on 6 servers (not 4) because those two shares *did*
        # at some point succeed.
        d = self.shouldFail(UploadUnhappinessError, "second_error_some",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check(f):
            # for some reason this is passed as a 1-tuple
            (f,) = f
            self.failUnlessIn("shares could be placed on only 6 server(s)", str(f.value))
        d.addCallback(_check)
        return d

    def test_good_servers_stay_writable(self):
        self.make_node({
            0: "good",
            1: "good",
            2: "second-fail",
            3: "second-fail",
            4: "second-fail",
            5: "first-fail",
            6: "first-fail",
            7: "first-fail",
            8: "first-fail",
            9: "first-fail",
        })
        self.set_encoding_parameters(3, 7, 10)
        # we placed shares on 0 through 4, which wasn't enough. so
        # then we looped and only placed on 0-1 (because now 2-9 have
        # all failed) ... so the error message should say we placed
        # on 5 servers (not 2) because the shares on 2-4 *did* at
        # some point succeed.
        d = self.shouldFail(UploadUnhappinessError, "good_servers_stay_writable",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check(f):
            # for some reason this is passed as a 1-tuple
            (f,) = f
            self.failUnlessIn("shares could be placed on only 5 server(s)", str(f.value))
        d.addCallback(_check)
        return d

    def test_timeout(self):
        clock = task.Clock()
        self.make_node("timeout")
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = self.shouldFail(
            UploadUnhappinessError, __name__,
            "server selection failed",
            upload_data, self.u, DATA, reactor=clock,
        )
        # XXX double-check; it's doing 3 iterations?
        # XXX should only do 1!
        clock.advance(15)
        clock.advance(15)
        return d



class FullServer(unittest.TestCase):
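    """
    Every server is in "full" mode, so allocate_buckets() never grants any
    buckets; a large (CHK) upload cannot place shares anywhere and should
    fail with UploadUnhappinessError.
    """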
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d

class ServerSelection(unittest.TestCase):

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), bytes))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, bytes))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.encoding_params = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 servers, and they all accept
        # a share, we should get exactly one share per server

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(s._alloc_queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 servers, and they all
        # accept all shares, we should get exactly two shares per server

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 servers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(s._alloc_queries, 1)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 servers, then one server
        # gets two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(s._alloc_queries, 1)
                    got_one.append(s)
                else:
                    self.failUnlessEqual(s._alloc_queries, 1)
                    got_two.append(s)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 servers, then each server
        # gets 4 shares. The design goal is to accomplish this with only two
        # queries per server.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 servers, then happy should be no more than 50 if we
        # want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(s._alloc_queries, 1)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for s in self.node.last_servers:
                allocated = s.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
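            # [0,0,0,2,1] means two servers ended up holding 3 shares each
            # and one holds 4: the desired 3+3+4 split.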
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big servers, and zero
            # shares on the small servers
            total_allocated = 0
            for p in self.node.last_servers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d

    def test_number_of_servers_contacted(self):
        # This test ensures that Tahoe only contacts 2n servers
        # during peer selection
        self.make_client(40)
        self.set_encoding_parameters(3, 7, 10)
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        def _check(res):
            servers_contacted = []
            for s in self.node.last_servers:
                if s._alloc_queries != 0:
                    servers_contacted.append(s)
            # n=10, so the selector should have contacted at most 2*n = 20
            # of the 40 servers
            self.failUnless(len(servers_contacted) <= 20)
        d.addCallback(_check)
        return d

class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
        DATA = b"I am some data"
        PARAMS = _Client.DEFAULT_ENCODING_PARAMETERS

        u = upload.Data(DATA, convergence=b"")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence=b"")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence=b"wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence=b"NOT wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence=b"wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different
        # (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence=b"")
        u.set_default_encoding_parameters(PARAMS)
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d

# copied from the python docs because itertools.combinations was added in
# python 2.6, and at the time we still supported python 2.4. (it behaves
# the same as itertools.combinations.)
def combinations(iterable, r):
    # combinations('ABCD', 2) --> AB AC AD BC BD CD
    # combinations(range(4), 3) --> 012 013 023 123
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    indices = list(range(r))
    yield tuple(pool[i] for i in indices)
    while True:
        for i in reversed(list(range(r))):
            if indices[i] != i + n - r:
                break
        else:
            return
        indices[i] += 1
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)

def is_happy_enough(servertoshnums, h, k):
    """
    I calculate whether servertoshnums achieves happiness level h. I do
    this with a naïve "brute force search" approach. (See
    src/allmydata/util/happinessutil.py for a better algorithm.)
    """
    if len(servertoshnums) < h:
        return False
    for happysetcombo in combinations(iter(servertoshnums.keys()), h):
        for subsetcombo in combinations(happysetcombo, k):
            shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
            if len(shnums) < k:
                return False
    return True
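
# A quick illustration (not exercised by the tests): with k=2 and h=3,
#   is_happy_enough({0: {0}, 1: {1}, 2: {0, 1}}, 3, 2)  # True: any 2 of
#     the 3 servers together hold 2 distinct shares
#   is_happy_enough({0: {0}, 1: {0}, 2: {1}}, 3, 2)     # False: servers
#     0 and 1 hold only share 0 between them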


class FileHandleTests(unittest.TestCase):
    """
    Tests for ``FileHandle``.
    """
    def test_get_encryption_key_convergent(self):
        """
        When ``FileHandle`` is initialized with a convergence secret,
        ``FileHandle.get_encryption_key`` returns a deterministic result that
        is a function of that secret.
        """
        secret = b"\x42" * 16
        handle = upload.FileHandle(BytesIO(b"hello world"), secret)
        handle.set_default_encoding_parameters({
            "k": 3,
            "happy": 5,
            "n": 10,
            # Remember this is the *max* segment size.  In reality, the data
            # size is much smaller so the actual segment size incorporated
            # into the encryption key is also smaller.
            "max_segment_size": 128 * 1024,
        })

        self.assertEqual(
            b64encode(self.successResultOf(handle.get_encryption_key())),
            b"oBcuR/wKdCgCV2GKKXqiNg==",
        )


class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
    ShouldFailMixin):

    def setUp(self):
        d = super(EncodingParameters, self).setUp()
        self._curdir = os.path.abspath(os.path.curdir)
        return d

    def tearDown(self):
        d = super(EncodingParameters, self).tearDown()
        self.assertEqual(
            os.path.abspath(os.path.curdir),
            self._curdir,
        )
        return d

    def find_all_shares(self, unused=None):
        """Locate shares on disk. Returns a dict that maps
        server to set of sharenums.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        servertoshnums = {} # k: server, v: set(shnum)

        for i, c in self.g.servers_by_number.items():
            for (dirp, dirns, fns) in os.walk(c.sharedir):
                for fn in fns:
                    try:
                        sharenum = int(fn)
                    except ValueError:
                        # int() raises ValueError for non-numeric names, so
                        # this is not a share file; skip it.
                        pass
                    else:
                        servertoshnums.setdefault(i, set()).add(sharenum)

        return servertoshnums

    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2ServerSelector to the Encoder, I break the first
        servers_to_break ServerTrackers in the upload_servers part of the
        return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh     = self.g.clients[0]._secret_holder
        data = upload.Data(b"data" * 10000, convergence=b"")
        data.set_default_encoding_parameters({'k': 3, 'happy': 4, 'n': 10})
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2ServerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
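        # The literal arguments below (10, 3, 4) mirror the encoding
        # parameters set on ``data`` above: n=10 total shares, k=3 needed,
        # happy=4.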
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4, encoder.get_uri_extension_size())
        def _have_shareholders(upload_trackers_and_already_servers):
            (upload_trackers, already_servers) = upload_trackers_and_already_servers
            assert servers_to_break <= len(upload_trackers)
            for index in range(servers_to_break):
                tracker = list(upload_trackers)[index]
                for share in list(tracker.buckets.keys()):
                    tracker.buckets[share].abort()
            buckets = {}
            servermap = already_servers.copy()
            for tracker in upload_trackers:
                buckets.update(tracker.buckets)
                for bucket in tracker.buckets:
                    servermap.setdefault(bucket, set()).add(tracker.get_serverid())
            encoder.set_shareholders(buckets, servermap)
            d = encoder.start()
            return d
        d.addCallback(_have_shareholders)
        return d

    def _has_happy_share_distribution(self):
        servertoshnums = self.find_all_shares()
        k = self.g.clients[0].encoding_params['k']
        h = self.g.clients[0].encoding_params['happy']
        return is_happy_enough(servertoshnums, h, k)

    def _add_server(self, server_number, readonly=False):
        assert self.g, "I tried to find a grid at self.g, but failed"
        ss = self.g.make_server(server_number, readonly)
        log.msg("just created a server, number: %s => %s" % (server_number, ss,))
        self.g.add_server(server_number, ss)
        self.g.rebuild_serverlist()

    def _add_server_with_share(self, server_number, share_number=None,
                               readonly=False):
        self._add_server(server_number, readonly)
        if share_number is not None:
            self._copy_share_to_server(share_number, server_number)


    def _copy_share_to_server(self, share_number, server_number):
        ss = self.g.servers_by_number[server_number]
        # Copy share i from the directory associated with the first
        # storage server to the directory associated with this one.
        assert self.g, "I tried to find a grid at self.g, but failed"
        assert self.shares, "I tried to find shares at self.shares, but failed"
        old_share_location = self.shares[share_number][2]
        new_share_location = os.path.join(ss.storedir, "shares")
        si = uri.from_string(self.uri).get_storage_index()
        new_share_location = os.path.join(new_share_location,
                                          storage_index_to_dir(si))
        if not os.path.exists(new_share_location):
            os.makedirs(new_share_location)
        new_share_location = os.path.join(new_share_location,
                                          str(share_number))
        if old_share_location != new_share_location:
            shutil.copy(old_share_location, new_share_location)
        shares = self.find_uri_shares(self.uri)
        # Make sure that the storage server has the share.
        self.failUnless((share_number, ss.my_nodeid, new_share_location)
                        in shares)

    def _setup_grid(self):
        """
        I set up a NoNetworkGrid with a single server and client.
        """
        self.set_up_grid(num_clients=1, num_servers=1)

    def _setup_and_upload(self, **kwargs):
        """
        I set up a NoNetworkGrid with a single server and client,
        upload a file to it, store its uri in self.uri, and store its
        sharedata in self.shares.
        """
        self._setup_grid()
        client = self.g.clients[0]
        client.encoding_params['happy'] = 1
        if "n" in kwargs and "k" in kwargs:
            client.encoding_params['k'] = kwargs['k']
            client.encoding_params['n'] = kwargs['n']
        data = upload.Data(b"data" * 10000, convergence=b"")
        self.data = data
        d = client.upload(data)
        def _store_uri(ur):
            self.uri = ur.get_uri()
        d.addCallback(_store_uri)
        d.addCallback(lambda ign:
            self.find_uri_shares(self.uri))
        def _store_shares(shares):
            self.shares = shares
        d.addCallback(_store_shares)
        return d

    def test_configure_parameters(self):
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
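        # _set_up_nodes_extra_config (defined later in this class)
        # customizes the client's config; the checks below expect the
        # resulting encoding to be 7-of-12.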
        self.set_up_grid(client_config_hooks=hooks)
        c0 = self.g.clients[0]

        DATA = b"data" * 100
        u = upload.Data(DATA, convergence=b"")
        d = c0.upload(u)
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.get_uri()))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        def _check(cr):
            self.failUnlessEqual(cr.get_encoding_needed(), 7)
            self.failUnlessEqual(cr.get_encoding_expected(), 12)
        d.addCallback(_check)
        return d


    def _setUp(self, ns):
        # Used by test_happy_semantics and test_preexisting_share_behavior
        # to set up the grid.
        self.node = FakeClient(mode="good", num_servers=ns)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node


    def test_happy_semantics(self):
        self._setUp(2)
        DATA = upload.Data(b"kittens" * 10000, convergence=b"")
        # These parameters are unsatisfiable with only 2 servers.
        self.set_encoding_parameters(k=3, happy=5, n=10)
        d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on "
                            "at least 5 server(s) such that any 3 of them "
                            "have enough shares to recover the file",
                            self.u.upload, DATA)
        # Let's reset the client to have 10 servers
        d.addCallback(lambda ign:
            self._setUp(10))
        # These parameters are satisfiable with 10 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        # Let's reset the client to have 7 servers
        # (this is less than n, but more than h)
        d.addCallback(lambda ign:
            self._setUp(7))
        # These parameters are satisfiable with 7 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        return d

    def test_aborted_shares(self):
        self.basedir = "upload/EncodingParameters/aborted_shares"
        self.set_up_grid(num_servers=4)
        c = self.g.clients[0]
        DATA = upload.Data(100 * b"kittens", convergence=b"")
        # These parameters are unsatisfiable with only 4 servers, but should
        # work with 5, as long as the original 4 are not stuck in the open
        # BucketWriter state (open() but not close()d).
        parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
        c.encoding_params = parms
        d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
                            "shares could be placed on only 4 "
                            "server(s) such that any 2 of them have enough "
                            "shares to recover the file, but we were asked "
                            "to place shares on at least 5 such servers",
                            c.upload, DATA)
        # now add the 5th server
        d.addCallback(lambda ign: self._add_server(4, False))
        # and this time the upload ought to succeed
        d.addCallback(lambda ign: c.upload(DATA))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d


    def test_problem_layout_comment_52(self):
        def _basedir():
            self.basedir = self.mktemp()
        _basedir()
        # This scenario is at
        # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:52
        #
        # The scenario in comment:52 proposes that we have a layout
        # like:
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        # (as it should)
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
            _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        def _prepare():
            client = self.g.clients[0]
            client.encoding_params['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare())
        # Uploading data should fail
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_1",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. "
                            "We were asked to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file",
                            client.upload, upload.Data(b"data" * 10000,
                                                       convergence=b"")))

        # Do comment:52, but like this:
        # server 2: empty
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
            _basedir())
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        def _prepare2():
            client = self.g.clients[0]
            client.encoding_params['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare2())
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_2",
                            "shares could be placed on only 3 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 4 such servers.",
                            client.upload, upload.Data(b"data" * 10000,
                                                       convergence=b"")))
        return d


    def test_problem_layout_comment_53(self):
        # This scenario is at
        # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:53
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2ServerSelector sees them in the order
        # described below.
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # server 1: share 2
        # server 2: share 0
        # server 3: share 1
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2ServerSelector will see the servers permuted as:
        # 2, 3, 1, 0
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.encoding_params['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # This scenario is basically comment:53, but changed so that the
        # Tahoe2ServerSelector sees the server with all of the shares before
        # any of the other servers.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: share 0
        # server 1: share 1
        # server 4: share 2
        # The Tahoe2ServerSelector sees the servers permuted as:
        # 2, 3, 1, 4
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2ServerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in range(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2ServerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: empty
        # server 1: empty
        # server 4: empty
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.get_pushed_shares(), 3))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1363
1364    def test_problem_layout_ticket_1124(self):
1365        self.basedir = self.mktemp()
1366        d = self._setup_and_upload(k=2, n=4)
1367
1368        # server 0: shares 0, 1, 2, 3
1369        # server 1: shares 0, 3
1370        # server 2: share 1
1371        # server 3: share 2
1372        # With this layout, an upload should be satisfied that the
        # current share distribution is already good enough.
1373        def _setup(ign):
1374            self._add_server_with_share(server_number=0, share_number=None)
1375            self._add_server_with_share(server_number=1, share_number=0)
1376            self._add_server_with_share(server_number=2, share_number=1)
1377            self._add_server_with_share(server_number=3, share_number=2)
1378            # Copy shares
1379            self._copy_share_to_server(3, 1)
1380            client = self.g.clients[0]
1381            client.encoding_params['happy'] = 4
1382            return client
1383
1384        d.addCallback(_setup)
1385        d.addCallback(lambda client:
1386            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
1387        d.addCallback(lambda ign:
1388            self.failUnless(self._has_happy_share_distribution()))
1389        return d
1390
1391    def test_happiness_with_some_readonly_servers(self):
1392        # Try the following layout
1393        # server 2: shares 0-9
1394        # server 4: share 0, read-only
1395        # server 3: share 1, read-only
1396        # server 1: share 2, read-only
1397        self.basedir = self.mktemp()
1398        d = self._setup_and_upload()
1399        d.addCallback(lambda ign:
1400            self._add_server_with_share(server_number=2, share_number=0))
1401        d.addCallback(lambda ign:
1402            self._add_server_with_share(server_number=3, share_number=1,
1403                                        readonly=True))
1404        d.addCallback(lambda ign:
1405            self._add_server_with_share(server_number=1, share_number=2,
1406                                        readonly=True))
1407        # Copy all of the other shares to server number 2
1408        def _copy_shares(ign):
1409            for i in range(1, 10):
1410                self._copy_share_to_server(i, 2)
1411        d.addCallback(_copy_shares)
1412        # Remove server 0, and add another in its place
1413        d.addCallback(lambda ign:
1414            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1415        d.addCallback(lambda ign:
1416            self._add_server_with_share(server_number=4, share_number=0,
1417                                        readonly=True))
1418        def _reset_encoding_parameters(ign, happy=4):
1419            client = self.g.clients[0]
1420            client.encoding_params['happy'] = happy
1421            return client
1422        d.addCallback(_reset_encoding_parameters)
1423        d.addCallback(lambda client:
1424            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
1425        d.addCallback(lambda ign:
1426            self.failUnless(self._has_happy_share_distribution()))
1427        return d
1428
1429
1430    def test_happiness_with_all_readonly_servers(self):
1431        # server 3: share 1, read-only
1432        # server 1: share 2, read-only
1433        # server 2: shares 0-9, read-only
1434        # server 4: share 0, read-only
1435        # The idea with this test is to make sure that the survey of
1436        # read-only servers doesn't undercount servers of happiness
1437        self.basedir = self.mktemp()
1438        d = self._setup_and_upload()
1439        d.addCallback(lambda ign:
1440            self._add_server_with_share(server_number=4, share_number=0,
1441                                        readonly=True))
1442        d.addCallback(lambda ign:
1443            self._add_server_with_share(server_number=3, share_number=1,
1444                                        readonly=True))
1445        d.addCallback(lambda ign:
1446            self._add_server_with_share(server_number=1, share_number=2,
1447                                        readonly=True))
1448        d.addCallback(lambda ign:
1449            self._add_server_with_share(server_number=2, share_number=0,
1450                                        readonly=True))
1451        def _copy_shares(ign):
1452            for i in range(1, 10):
1453                self._copy_share_to_server(i, 2)
1454        d.addCallback(_copy_shares)
1455        d.addCallback(lambda ign:
1456            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1457        def _reset_encoding_parameters(ign, happy=4):
1458            client = self.g.clients[0]
1459            client.encoding_params['happy'] = happy
1460            return client
1461        d.addCallback(_reset_encoding_parameters)
1462        d.addCallback(lambda client:
1463            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
1464        d.addCallback(lambda ign:
1465            self.failUnless(self._has_happy_share_distribution()))
1466        return d
1467
1468
1469    def test_dropped_servers_in_encoder(self):
1470        # The Encoder does its own "servers_of_happiness" check if it
1471        # happens to lose a bucket during an upload (it assumes that
1472        # the layout presented to it satisfies "servers_of_happiness"
1473        # until a failure occurs)
1474        #
1475        # This test simulates an upload where servers break after server
1476        # selection, but before they are written to.
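        # (_do_upload_with_broken_servers(n), defined earlier in this class,
        # presumably breaks n of the grid's servers after share placement is
        # decided and then lets the Encoder attempt the writes.)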
1477        def _set_basedir(ign=None):
1478            self.basedir = self.mktemp()
1479        _set_basedir()
1480        d = self._setup_and_upload()
1481        # Add 5 servers
1482        def _do_server_setup(ign):
1483            self._add_server(server_number=1)
1484            self._add_server(server_number=2)
1485            self._add_server(server_number=3)
1486            self._add_server(server_number=4)
1487            self._add_server(server_number=5)
1488        d.addCallback(_do_server_setup)
1489        # remove the original server
1490        # (necessary to ensure that the Tahoe2ServerSelector will distribute
1491        #  all the shares)
1492        def _remove_server(ign):
1493            server = self.g.servers_by_number[0]
1494            self.g.remove_server(server.my_nodeid)
1495        d.addCallback(_remove_server)
1496        # This should succeed; we still have 4 servers, and the
1497        # happiness of the upload is 4.
1498        d.addCallback(lambda ign:
1499            self._do_upload_with_broken_servers(1))
1500        # Now, do the same thing over again, but drop 2 servers instead
1501        # of 1. This should fail, because servers_of_happiness is 4 and
1502        # we can't satisfy that.
1503        d.addCallback(_set_basedir)
1504        d.addCallback(lambda ign:
1505            self._setup_and_upload())
1506        d.addCallback(_do_server_setup)
1507        d.addCallback(_remove_server)
1508        d.addCallback(lambda ign:
1509            self.shouldFail(UploadUnhappinessError,
1510                            "test_dropped_servers_in_encoder",
1511                            "shares could be placed on only 3 server(s) "
1512                            "such that any 3 of them have enough shares to "
1513                            "recover the file, but we were asked to place "
1514                            "shares on at least 4",
1515                            self._do_upload_with_broken_servers, 2))
1516        # Now do the same thing over again, but make some of the servers
1517        # readonly, break some of the ones that aren't, and make sure that
1518        # happiness accounting is preserved.
1519        d.addCallback(_set_basedir)
1520        d.addCallback(lambda ign:
1521            self._setup_and_upload())
1522        def _do_server_setup_2(ign):
1523            self._add_server(1)
1524            self._add_server(2)
1525            self._add_server(3)
1526            self._add_server_with_share(4, 7, readonly=True)
1527            self._add_server_with_share(5, 8, readonly=True)
1528        d.addCallback(_do_server_setup_2)
1529        d.addCallback(_remove_server)
1530        d.addCallback(lambda ign:
1531            self._do_upload_with_broken_servers(1))
1532        d.addCallback(_set_basedir)
1533        d.addCallback(lambda ign:
1534            self._setup_and_upload())
1535        d.addCallback(_do_server_setup_2)
1536        d.addCallback(_remove_server)
1537        d.addCallback(lambda ign:
1538            self.shouldFail(UploadUnhappinessError,
1539                            "test_dropped_servers_in_encoder",
1540                            "shares could be placed on only 3 server(s) "
1541                            "such that any 3 of them have enough shares to "
1542                            "recover the file, but we were asked to place "
1543                            "shares on at least 4",
1544                            self._do_upload_with_broken_servers, 2))
1545        return d
1546
1547    def test_existing_share_detection(self):
1548        self.basedir = self.mktemp()
1549        d = self._setup_and_upload()
1550        # Our final setup should look like this:
1551        # server 1: shares 0 - 9, read-only
1552        # server 2: empty
1553        # server 3: empty
1554        # server 4: empty
1555        # The purpose of this test is to make sure that the server selector
1556        # knows about the shares on server 1, even though it is read-only.
1557        # It used to simply filter these out, which would cause the test
1558        # to fail when servers_of_happiness = 4.
1559        d.addCallback(lambda ign:
1560            self._add_server_with_share(1, 0, True))
1561        d.addCallback(lambda ign:
1562            self._add_server(2))
1563        d.addCallback(lambda ign:
1564            self._add_server(3))
1565        d.addCallback(lambda ign:
1566            self._add_server(4))
1567        def _copy_shares(ign):
1568            for i in range(1, 10):
1569                self._copy_share_to_server(i, 1)
1570        d.addCallback(_copy_shares)
1571        d.addCallback(lambda ign:
1572            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1573        def _prepare_client(ign):
1574            client = self.g.clients[0]
1575            client.encoding_params['happy'] = 4
1576            return client
1577        d.addCallback(_prepare_client)
1578        d.addCallback(lambda client:
1579            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
1580        d.addCallback(lambda ign:
1581            self.failUnless(self._has_happy_share_distribution()))
1582        return d
1583
1584
1585    def test_query_counting(self):
1586        # If server selection fails, Tahoe2ServerSelector prints out a lot
1587        # of helpful diagnostic information, including query stats.
1588        # This test helps make sure that this information is accurate.
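        # (Judging by the expected strings below, the diagnostic takes the
        # form: "placed X shares out of Y total (Z homeless), ..., sent N
        # queries to M servers, P queries placed some shares, Q placed none
        # (of which R placed none due to the server being full and S placed
        # none due to an error)".)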
1589        self.basedir = self.mktemp()
1590        d = self._setup_and_upload()
1591        def _setup(ign):
1592            for i in range(1, 11):
1593                self._add_server(server_number=i)
1594            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1595            c = self.g.clients[0]
1596            # We set happy to an unsatisfiable value so that we can check the
1597            # counting in the exception message. The same progress message
1598            # is also used when the upload is successful, but in that case it
1599            # only gets written to a log, so we can't see what it says.
1600            c.encoding_params['happy'] = 45
1601            return c
1602        d.addCallback(_setup)
1603        d.addCallback(lambda c:
1604            self.shouldFail(UploadUnhappinessError, "test_query_counting",
1605                            "0 queries placed some shares",
1606                            c.upload, upload.Data(b"data" * 10000,
1607                                                  convergence=b"")))
1608        # Now try with some readonly servers. We want to make sure that
1609        # the readonly server share discovery phase is counted correctly.
1610        def _reset(ign):
1611            self.basedir = self.mktemp()
1612            self.g = None
1613        d.addCallback(_reset)
1614        d.addCallback(lambda ign:
1615            self._setup_and_upload())
1616        def _then(ign):
1617            for i in range(1, 11):
1618                self._add_server(server_number=i)
1619            self._add_server(server_number=11, readonly=True)
1620            self._add_server(server_number=12, readonly=True)
1621            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1622            c = self.g.clients[0]
1623            c.encoding_params['happy'] = 45
1624            return c
1625        d.addCallback(_then)
1626        d.addCallback(lambda c:
1627            self.shouldFail(UploadUnhappinessError, "test_query_counting",
1628                            "4 placed none (of which 4 placed none due to "
1629                            "the server being full",
1630                            c.upload, upload.Data(b"data" * 10000,
1631                                                  convergence=b"")))
1632        # Now try the case where the upload process finds a bunch of the
1633        # shares that it wants to place on the first server, including
1634        # the one that it wanted to allocate there. Though no shares will
1635        # be allocated in this request, it should still be called
1636        # productive, since it caused some homeless shares to be
1637        # removed.
1638        d.addCallback(_reset)
1639        d.addCallback(lambda ign:
1640            self._setup_and_upload())
1641
1642        def _next(ign):
1643            for i in range(1, 11):
1644                self._add_server(server_number=i)
1645            # Copy all of the shares to server 9, since that will be
1646            # the first one that the selector sees.
1647            for i in range(10):
1648                self._copy_share_to_server(i, 9)
1649            # Remove server 0, and its contents
1650            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1651            # Make happiness unsatisfiable
1652            c = self.g.clients[0]
1653            c.encoding_params['happy'] = 45
1654            return c
1655        d.addCallback(_next)
1656        d.addCallback(lambda c:
1657            self.shouldFail(UploadUnhappinessError, "test_query_counting",
1658                            "0 queries placed some shares",
1659                            c.upload, upload.Data(b"data" * 10000,
1660                                                  convergence=b"")))
1661        return d
1662
1663
1664    def test_upper_limit_on_readonly_queries(self):
1665        self.basedir = self.mktemp()
1666        d = self._setup_and_upload()
1667        def _then(ign):
1668            for i in range(1, 11):
1669                self._add_server(server_number=i, readonly=True)
1670            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1671            c = self.g.clients[0]
1672            c.encoding_params['k'] = 2
1673            c.encoding_params['happy'] = 4
1674            c.encoding_params['n'] = 4
1675            return c
1676        d.addCallback(_then)
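        # (The "8 queries to 8 servers" figure suggests the selector caps
        # existing-share queries to read-only servers at 2*n; with n=4 only
        # eight of the ten read-only servers are ever asked.)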
1677        d.addCallback(lambda client:
1678            self.shouldFail(UploadUnhappinessError,
1679                            "test_upper_limit_on_readonly_queries",
1680                            "sent 8 queries to 8 servers",
1681                            client.upload,
1682                            upload.Data(b'data' * 10000, convergence=b"")))
1683        return d
1684
1685
1686    def test_exception_messages_during_server_selection(self):
1687        # server 1: read-only, no shares
1688        # server 2: read-only, no shares
1689        # server 3: read-only, no shares
1690        # server 4: read-only, no shares
1691        # server 5: read-only, no shares
1692        # This will fail, but we want to make sure that the log messages
1693        # are informative about why it has failed.
1694        self.basedir = self.mktemp()
1695        d = self._setup_and_upload()
1696        d.addCallback(lambda ign:
1697            self._add_server(server_number=1, readonly=True))
1698        d.addCallback(lambda ign:
1699            self._add_server(server_number=2, readonly=True))
1700        d.addCallback(lambda ign:
1701            self._add_server(server_number=3, readonly=True))
1702        d.addCallback(lambda ign:
1703            self._add_server(server_number=4, readonly=True))
1704        d.addCallback(lambda ign:
1705            self._add_server(server_number=5, readonly=True))
1706        d.addCallback(lambda ign:
1707            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1708        def _reset_encoding_parameters(ign, happy=4):
1709            client = self.g.clients[0]
1710            client.encoding_params['happy'] = happy
1711            return client
1712        d.addCallback(_reset_encoding_parameters)
1713        d.addCallback(lambda client:
1714            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1715                            "placed 0 shares out of 10 "
1716                            "total (10 homeless), want to place shares on at "
1717                            "least 4 servers such that any 3 of them have "
1718                            "enough shares to recover the file, "
1719                            "sent 5 queries to 5 servers, 0 queries placed "
1720                            "some shares, 5 placed none "
1721                            "(of which 5 placed none due to the server being "
1722                            "full and 0 placed none due to an error)",
1723                            client.upload,
1724                            upload.Data(b"data" * 10000, convergence=b"")))
1725
1726
1727        # server 1: read-only, no shares
1728        # server 2: broken, no shares
1729        # server 3: read-only, no shares
1730        # server 4: read-only, no shares
1731        # server 5: read-only, no shares
1732        def _reset(ign):
1733            self.basedir = self.mktemp()
1734        d.addCallback(_reset)
1735        d.addCallback(lambda ign:
1736            self._setup_and_upload())
1737        d.addCallback(lambda ign:
1738            self._add_server(server_number=1, readonly=True))
1739        d.addCallback(lambda ign:
1740            self._add_server(server_number=2))
1741        def _break_server_2(ign):
1742            serverid = self.g.servers_by_number[2].my_nodeid
1743            self.g.break_server(serverid)
1744        d.addCallback(_break_server_2)
1745        d.addCallback(lambda ign:
1746            self._add_server(server_number=3, readonly=True))
1747        d.addCallback(lambda ign:
1748            self._add_server(server_number=4, readonly=True))
1749        d.addCallback(lambda ign:
1750            self._add_server(server_number=5, readonly=True))
1751        d.addCallback(lambda ign:
1752            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1753        d.addCallback(_reset_encoding_parameters)
1754        d.addCallback(lambda client:
1755            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1756                            "placed 0 shares out of 10 "
1757                            "total (10 homeless), want to place shares on at "
1758                            "least 4 servers such that any 3 of them have "
1759                            "enough shares to recover the file, "
1760                            "sent 5 queries to 5 servers, 0 queries placed "
1761                            "some shares, 5 placed none "
1762                            "(of which 4 placed none due to the server being "
1763                            "full and 1 placed none due to an error)",
1764                            client.upload,
1765                            upload.Data(b"data" * 10000, convergence=b"")))
1766        # server 0, server 1 = empty, accepting shares
1767        # This should place all of the shares, but still fail with happy=4.
1768        # We want to make sure that the exception message is worded correctly.
1769        d.addCallback(_reset)
1770        d.addCallback(lambda ign:
1771            self._setup_grid())
1772        d.addCallback(lambda ign:
1773            self._add_server(server_number=1))
1774        d.addCallback(_reset_encoding_parameters)
1775        d.addCallback(lambda client:
1776            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1777                            "shares could be placed or found on only 2 "
1778                            "server(s). We were asked to place shares on at "
1779                            "least 4 server(s) such that any 3 of them have "
1780                            "enough shares to recover the file.",
1781                            client.upload, upload.Data(b"data" * 10000,
1782                                                       convergence=b"")))
1783        # servers 0 - 4 = empty, accepting shares
1784        # This too should place all the shares, and this too should fail,
1785        # but since the effective happiness is more than the k encoding
1786        # parameter, it should trigger a different error message than the one
1787        # above.
1788        d.addCallback(_reset)
1789        d.addCallback(lambda ign:
1790            self._setup_grid())
1791        d.addCallback(lambda ign:
1792            self._add_server(server_number=1))
1793        d.addCallback(lambda ign:
1794            self._add_server(server_number=2))
1795        d.addCallback(lambda ign:
1796            self._add_server(server_number=3))
1797        d.addCallback(lambda ign:
1798            self._add_server(server_number=4))
1799        d.addCallback(_reset_encoding_parameters, happy=7)
1800        d.addCallback(lambda client:
1801            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1802                            "shares could be placed on only 5 server(s) such "
1803                            "that any 3 of them have enough shares to recover "
1804                            "the file, but we were asked to place shares on "
1805                            "at least 7 such servers.",
1806                            client.upload, upload.Data(b"data" * 10000,
1807                                                       convergence=b"")))
1808        # server 0: shares 0 - 9
1809        # server 1: share 0, read-only
1810        # server 2: share 0, read-only
1811        # server 3: share 0, read-only
1812        # This should place all of the shares, but fail with happy=7.
1813        # Since the number of servers with shares is more than the number
1814        # necessary to reconstitute the file, this will trigger a different
1815        # error message than either of those above.
1816        d.addCallback(_reset)
1817        d.addCallback(lambda ign:
1818            self._setup_and_upload())
1819        d.addCallback(lambda ign:
1820            self._add_server_with_share(server_number=1, share_number=0,
1821                                        readonly=True))
1822        d.addCallback(lambda ign:
1823            self._add_server_with_share(server_number=2, share_number=0,
1824                                        readonly=True))
1825        d.addCallback(lambda ign:
1826            self._add_server_with_share(server_number=3, share_number=0,
1827                                        readonly=True))
1828        d.addCallback(_reset_encoding_parameters, happy=7)
1829        d.addCallback(lambda client:
1830            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1831                            "shares could be placed or found on 4 server(s), "
1832                            "but they are not spread out evenly enough to "
1833                            "ensure that any 3 of these servers would have "
1834                            "enough shares to recover the file. We were asked "
1835                            "to place shares on at least 7 servers such that "
1836                            "any 3 of them have enough shares to recover the "
1837                            "file",
1838                            client.upload, upload.Data(b"data" * 10000,
1839                                                       convergence=b"")))
1840        return d
1841
1842
1843    def test_problem_layout_comment_187(self):
1844        # #778 comment 187 broke an initial attempt at a share
1845        # redistribution algorithm. This test is here to demonstrate the
1846        # breakage, and to test that subsequent algorithms don't also
1847        # break in the same way.
1848        self.basedir = self.mktemp()
1849        d = self._setup_and_upload(k=2, n=3)
1850
1851        # server 1: shares 0, 1, 2, readonly
1852        # server 2: share 0, readonly
1853        # server 3: share 0
1854        def _setup(ign):
1855            self._add_server_with_share(server_number=1, share_number=0,
1856                                        readonly=True)
1857            self._add_server_with_share(server_number=2, share_number=0,
1858                                        readonly=True)
1859            self._add_server_with_share(server_number=3, share_number=0)
1860            # Copy shares
1861            self._copy_share_to_server(1, 1)
1862            self._copy_share_to_server(2, 1)
1863            # Remove server 0
1864            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1865            client = self.g.clients[0]
1866            client.encoding_params['happy'] = 3
1867            return client
1868
1869        d.addCallback(_setup)
1870        d.addCallback(lambda client:
1871            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
1872        d.addCallback(lambda ign:
1873            self.failUnless(self._has_happy_share_distribution()))
1874        return d
1875
1876    def test_problem_layout_ticket_1118(self):
1877        # #1118 includes a report from a user who hit an assertion in
1878        # the upload code with this layout.
1879        # Note that 'servers of happiness' lets this test work now
1880        self.basedir = self.mktemp()
1881        d = self._setup_and_upload(k=2, n=4)
1882
1883        # server 0: no shares
1884        # server 1: shares 0, 3
1885        # server 3: share 1
1886        # server 2: share 2
1887        # The order that they get queries is 0, 1, 3, 2
1888        def _setup(ign):
1889            self._add_server(server_number=0)
1890            self._add_server_with_share(server_number=1, share_number=0)
1891            self._add_server_with_share(server_number=2, share_number=2)
1892            self._add_server_with_share(server_number=3, share_number=1)
1893            # Copy shares
1894            self._copy_share_to_server(3, 1)
1895            self.delete_all_shares(self.get_serverdir(0))
1896            client = self.g.clients[0]
1897            client.encoding_params['happy'] = 4
1898            return client
1899
1900        d.addCallback(_setup)
        d.addCallback(lambda client:
            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
1901        return d
1902
1903    def test_problem_layout_ticket_1128(self):
1904        # #1128 includes a report from a user who hit an assertion in
1905        # the upload code with this layout.
1906        self.basedir = self.mktemp()
1907        d = self._setup_and_upload(k=2, n=4)
1908
1909        # server 0: no shares
1910        # server 1: shares 0, 3
1911        # server 3: share 1
1912        # server 2: share 2
1913        # The order that they get queries is 0, 1, 3, 2
1914        def _setup(ign):
1915            self._add_server(server_number=0)
1916            self._add_server_with_share(server_number=1, share_number=0)
1917            self._add_server_with_share(server_number=2, share_number=2)
1918            self._add_server_with_share(server_number=3, share_number=1)
1919            # Copy shares
1920            self._copy_share_to_server(3, 1)
1921            # Remove shares from server 0
1922            self.delete_all_shares(self.get_serverdir(0))
1923            client = self.g.clients[0]
1924            client.encoding_params['happy'] = 4
1925            return client
1926
1927        d.addCallback(_setup)
1928        d.addCallback(lambda client:
1929                          client.upload(upload.Data(b"data" * 10000, convergence=b"")))
1930        d.addCallback(lambda ign:
1931            self.failUnless(self._has_happy_share_distribution()))
1932        return d
1933
1934    def test_upload_succeeds_with_some_homeless_shares(self):
1935        # If the upload is forced to stop trying to place shares before
1936        # it has placed (or otherwise accounted for) all of them, but it
1937        # has placed enough to satisfy the upload health criteria that
1938        # we're using, it should still succeed.
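        # (Here "otherwise accounted for" covers shares that already exist
        # on the grid: the four read-only servers set up below each hold one
        # share, so with happy=4 the upload can stop early and still be
        # considered healthy.)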
1939        self.basedir = self.mktemp()
1940        d = self._setup_and_upload()
1941        def _server_setup(ign):
1942            # Add four servers so that we have a layout like this:
1943            # server 1: share 0, read-only
1944            # server 2: share 1, read-only
1945            # server 3: share 2, read-only
1946            # server 4: share 3, read-only
1947            # If we set happy = 4, the upload will manage to satisfy
1948            # servers of happiness, but not place all of the shares; we
1949            # want to test that the upload is declared successful in
1950            # this case.
1951            self._add_server_with_share(server_number=1, share_number=0,
1952                                        readonly=True)
1953            self._add_server_with_share(server_number=2, share_number=1,
1954                                        readonly=True)
1955            self._add_server_with_share(server_number=3, share_number=2,
1956                                        readonly=True)
1957            self._add_server_with_share(server_number=4, share_number=3,
1958                                        readonly=True)
1959            # Remove server 0.
1960            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1961            # Set the client appropriately
1962            c = self.g.clients[0]
1963            c.encoding_params['happy'] = 4
1964            return c
1965        d.addCallback(_server_setup)
1966        d.addCallback(lambda client:
1967            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
1968        d.addCallback(lambda ign:
1969            self.failUnless(self._has_happy_share_distribution()))
1970        return d
1971
1972
1973    def test_uploader_skips_over_servers_with_only_one_share(self):
1974        # We want to make sure that the redistribution logic ignores
1975        # servers with only one share, since placing these shares
1976        # elsewhere will at best keep happiness the same as it was, and
1977        # at worst hurt it.
1978        self.basedir = self.mktemp()
1979        d = self._setup_and_upload()
1980        def _server_setup(ign):
1981            # Add some servers so that the upload will need to
1982            # redistribute, but will first pass over a couple of servers
1983            # that don't have enough shares to redistribute before
1984            # finding one that does have shares to redistribute.
1985            self._add_server_with_share(server_number=1, share_number=0)
1986            self._add_server_with_share(server_number=2, share_number=2)
1987            self._add_server_with_share(server_number=3, share_number=1)
1988            self._add_server_with_share(server_number=8, share_number=4)
1989            self._add_server_with_share(server_number=5, share_number=5)
1990            self._add_server_with_share(server_number=10, share_number=7)
1991            for i in range(4):
1992                self._copy_share_to_server(i, 2)
1993            return self.g.clients[0]
1994        d.addCallback(_server_setup)
1995        d.addCallback(lambda client:
1996            client.upload(upload.Data(b"data" * 10000, convergence=b"")))
1997        d.addCallback(lambda ign:
1998            self.failUnless(self._has_happy_share_distribution()))
1999        return d
2000
2001
2002    def test_server_selector_bucket_abort(self):
2003        # If server selection for an upload fails due to an unhappy
2004        # layout, the server selection process should abort the buckets it
2005        # allocates before failing, so that the space can be re-used.
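        # ("Buckets" here are the per-share write slots allocated on each
        # storage server; aborting them should immediately release the
        # space they reserved.)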
2006        self.basedir = self.mktemp()
2007        self.set_up_grid(num_servers=5)
2008
2009        # Try to upload a file with happy=7, which is unsatisfiable with
2010        # the current grid. This will fail, but should not take up any
2011        # space on the storage servers after it fails.
2012        client = self.g.clients[0]
2013        client.encoding_params['happy'] = 7
2014        d = defer.succeed(None)
2015        d.addCallback(lambda ignored:
2016            self.shouldFail(UploadUnhappinessError,
2017                            "test_server_selection_bucket_abort",
2018                            "",
2019                            client.upload, upload.Data(b"data" * 10000,
2020                                                       convergence=b"")))
2021        # wait for the abort messages to get there.
2022        def _turn_barrier(res):
2023            return fireEventually(res)
2024        d.addCallback(_turn_barrier)
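        # fireEventually() fires its Deferred after the eventual-send queue
        # drains, so by the time _then runs the abort messages should have
        # been delivered and allocated_size() should be back to zero.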
2025        def _then(ignored):
2026            for server in list(self.g.servers_by_number.values()):
2027                self.failUnlessEqual(server.allocated_size(), 0)
2028        d.addCallback(_then)
2029        return d
2030
2031
2032    def test_encoder_bucket_abort(self):
2033        # If enough servers die in the process of encoding and uploading
2034        # a file to make the layout unhappy, we should cancel the
2035        # newly-allocated buckets before dying.
2036        self.basedir = self.mktemp()
2037        self.set_up_grid(num_servers=4)
2038
2039        client = self.g.clients[0]
2040        client.encoding_params['happy'] = 7
2041
2042        d = defer.succeed(None)
2043        d.addCallback(lambda ignored:
2044            self.shouldFail(UploadUnhappinessError,
2045                            "test_encoder_bucket_abort",
2046                            "",
2047                            self._do_upload_with_broken_servers, 1))
2048        def _turn_barrier(res):
2049            return fireEventually(res)
2050        d.addCallback(_turn_barrier)
2051        def _then(ignored):
2052            for server in list(self.g.servers_by_number.values()):
2053                self.failUnlessEqual(server.allocated_size(), 0)
2054        d.addCallback(_then)
2055        return d
2056
2057
2058    def _set_up_nodes_extra_config(self, clientdir):
2059        cfgfn = os.path.join(clientdir, "tahoe.cfg")
2060        with open(cfgfn, "r") as f:
2061            oldcfg = f.read()
2062        with open(cfgfn, "wt") as f:
2063            f.write(oldcfg)
2064            f.write("\n")
2065            f.write("[client]\n")
2066            f.write("shares.needed = 7\n")
2067            f.write("shares.total = 12\n")
2068            f.write("\n")
2069        return None
2070
2071
2072class EncryptAnUploadableTests(unittest.TestCase):
2073    """
2074    Tests for ``EncryptAnUploadable``.
2075    """
2076    def test_same_length(self):
2077        """
2078        ``EncryptAnUploadable.read_encrypted`` returns ciphertext of the same
2079        length as the underlying plaintext.
2080        """
2081        plaintext = b"hello world"
2082        uploadable = upload.FileHandle(BytesIO(plaintext), None)
2083        uploadable.set_default_encoding_parameters({
2084            # These values shouldn't matter.
2085            "k": 3,
2086            "happy": 5,
2087            "n": 10,
2088            "max_segment_size": 128 * 1024,
2089        })
2090        encrypter = upload.EncryptAnUploadable(uploadable)
2091        ciphertext = b"".join(self.successResultOf(encrypter.read_encrypted(1024, False)))
2092        self.assertEqual(len(ciphertext), len(plaintext))
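    # (The length is preserved because the content encryption is AES in CTR
    # mode, a stream cipher that adds no padding or framing.)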
2093
2094    @given(just(b"hello world"), integers(min_value=0, max_value=len(b"hello world")))
2095    def test_known_result(self, plaintext, split_at):
2096        """
2097        ``EncryptAnUploadable.read_encrypted`` returns a known-correct ciphertext
2098        string for certain inputs.  The ciphertext is independent of the read
2099        sizes.
2100        """
2101        convergence = b"\x42" * 16
2102        uploadable = upload.FileHandle(BytesIO(plaintext), convergence)
2103        uploadable.set_default_encoding_parameters({
2104            # The convergence key is a function of k, n, and max_segment_size
2105            # (among other things).  The value for happy doesn't matter
2106            # though.
2107            "k": 3,
2108            "happy": 5,
2109            "n": 10,
2110            "max_segment_size": 128 * 1024,
2111        })
2112        encrypter = upload.EncryptAnUploadable(uploadable)
2113        def read(n):
2114            return b"".join(self.successResultOf(encrypter.read_encrypted(n, False)))
2115
2116        # Read the string in one or two pieces to make sure underlying state
2117        # is maintained properly.
2118        first = read(split_at)
2119        second = read(len(plaintext) - split_at)
2120        third = read(1)
2121        ciphertext = first + second + third
2122
2123        self.assertEqual(
2124            b"Jd2LHCRXozwrEJc=",
2125            b64encode(ciphertext),
2126        )
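        # (Read boundaries cannot affect the output because the CTR-mode
        # keystream depends only on the absolute byte offset, which the
        # encrypter tracks across successive reads.)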
2127
2128    def test_large_read(self):
2129        """
2130        ``EncryptAnUploadable.read_encrypted`` succeeds even when the requested
2131        data length is much larger than the chunk size.
2132        """
2133        convergence = b"\x42" * 16
2134        # 4kB of plaintext
2135        plaintext = b"\xde\xad\xbe\xef" * 1024
2136        uploadable = upload.FileHandle(BytesIO(plaintext), convergence)
2137        uploadable.set_default_encoding_parameters({
2138            "k": 3,
2139            "happy": 5,
2140            "n": 10,
2141            "max_segment_size": 128 * 1024,
2142        })
2143        # Make the chunk size very small so we don't have to operate on a huge
2144        # amount of data to exercise the relevant codepath.
2145        encrypter = upload.EncryptAnUploadable(uploadable, chunk_size=1)
2146        d = encrypter.read_encrypted(len(plaintext), False)
2147        ciphertext = self.successResultOf(d)
2148        self.assertEqual(
2149            list(map(len, ciphertext)),
2150            # Chunk size was specified as 1 above so we will get the whole
2151            # plaintext in one byte chunks.
2152            [1] * len(plaintext),
2153        )
2154
2155
2156# TODO:
2157#  upload with exactly 75 servers (shares_of_happiness)
2158#  have a download fail
2159#  cancel a download (need to implement more cancel stuff)
2160
2161# from test_encode:
2162# NoNetworkGrid, upload part of ciphertext, kill server, continue upload
2163# check with Kevan, they want to live in test_upload, existing tests might cover
2164#     def test_lost_one_shareholder(self): # these are upload-side tests
2165#     def test_lost_one_shareholder_early(self):
2166#     def test_lost_many_shareholders(self):
2167#     def test_lost_all_shareholders(self):