source: trunk/src/allmydata/test/test_storage_http.py

Last change on this file was fced1ab0, checked in by Itamar Turner-Trauring <itamar@…>, at 2024-01-24T18:50:55Z

Switch to using pycddl for CBOR decoding.

  • Property mode set to 100644
File size: 62.4 KB
Line 
1"""
2Tests for HTTP storage client + server.
3
4The tests here are synchronous and don't involve running a real reactor.  This
5works, but has some caveats when it comes to testing HTTP endpoints:
6
7* Some HTTP endpoints are synchronous, some are not.
8* For synchronous endpoints, the result is immediately available on the
9  ``Deferred`` coming out of ``StubTreq``.
10* For asynchronous endpoints, you need to use ``StubTreq.flush()`` and
11  iterate the fake in-memory clock/reactor to advance time .
12
13So for HTTP endpoints, you should use ``HttpTestFixture.result_of_with_flush()``
14which handles both, and patches and moves forward the global Twisted
15``Cooperator`` since that is used to drive pull producers. This is,
16sadly, an internal implementation detail of Twisted being leaked to tests...
17
18For definitely synchronous calls, you can just use ``result_of()``.
19"""
20
21import time
22from base64 import b64encode
23from contextlib import contextmanager
24from os import urandom
25from typing import Union, Callable, Tuple, Iterable
26from queue import Queue
27from pycddl import ValidationError as CDDLValidationError
28from hypothesis import assume, given, strategies as st, settings as hypothesis_settings
29from fixtures import Fixture, TempDir, MonkeyPatch
30from treq.testing import StubTreq
31from klein import Klein
32from hyperlink import DecodedURL
33from collections_extended import RangeMap
34from twisted.internet.task import Clock, Cooperator
35from twisted.internet.interfaces import IReactorTime, IReactorFromThreads
36from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
37from twisted.web import http
38from twisted.web.http_headers import Headers
39from werkzeug import routing
40from werkzeug.exceptions import NotFound as WNotFound
41from testtools.matchers import Equals
42from zope.interface import implementer
43
44from ..util.cbor import dumps
45from ..util.deferredutil import async_to_deferred
46from ..util.cputhreadpool import disable_thread_pool_for_test
47from .common import SyncTestCase
48from ..storage.http_common import (
49    get_content_type,
50    CBOR_MIME_TYPE,
51    response_is_not_html,
52)
53from ..storage.common import si_b2a
54from ..storage.lease import LeaseInfo
55from ..storage.server import StorageServer
56from ..storage.http_server import (
57    HTTPServer,
58    _extract_secrets,
59    Secrets,
60    ClientSecretsException,
61    _authorized_route,
62    StorageIndexConverter,
63    _add_error_handling,
64    read_encoded,
65    _SCHEMAS as SERVER_SCHEMAS,
66    BaseApp,
67)
68from ..storage.http_client import (
69    StorageClient,
70    StorageClientFactory,
71    ClientException,
72    StorageClientImmutables,
73    ImmutableCreateResult,
74    UploadProgress,
75    StorageClientGeneral,
76    _encode_si,
77    StorageClientMutables,
78    TestWriteVectors,
79    WriteVector,
80    ReadVector,
81    ReadTestWriteResult,
82    TestVector,
83    limited_content,
84)
85
86
87class HTTPUtilities(SyncTestCase):
88    """Tests for HTTP common utilities."""
89
90    def test_get_content_type(self):
91        """``get_content_type()`` extracts the content-type from the header."""
92
93        def assert_header_values_result(values, expected_content_type):
94            headers = Headers()
95            if values:
96                headers.setRawHeaders("Content-Type", values)
97            content_type = get_content_type(headers)
98            self.assertEqual(content_type, expected_content_type)
99
100        assert_header_values_result(["text/html"], "text/html")
101        assert_header_values_result([], None)
102        assert_header_values_result(["text/plain", "application/json"], "text/plain")
103        assert_header_values_result(["text/html;encoding=utf-8"], "text/html")
104
105
106def _post_process(params):
107    secret_types, secrets = params
108    secrets = {t: s for (t, s) in zip(secret_types, secrets)}
109    headers = [
110        "{} {}".format(
111            secret_type.value, str(b64encode(secrets[secret_type]), "ascii").strip()
112        )
113        for secret_type in secret_types
114    ]
115    return secrets, headers
116
117
118# Creates a tuple of ({Secret enum value: secret_bytes}, [http headers with secrets]).
119SECRETS_STRATEGY = (
120    st.sets(st.sampled_from(Secrets))
121    .flatmap(
122        lambda secret_types: st.tuples(
123            st.just(secret_types),
124            st.lists(
125                st.binary(min_size=32, max_size=32),
126                min_size=len(secret_types),
127                max_size=len(secret_types),
128            ),
129        )
130    )
131    .map(_post_process)
132)
133
134
135class ExtractSecretsTests(SyncTestCase):
136    """
137    Tests for ``_extract_secrets``.
138    """
139
140    @given(secrets_to_send=SECRETS_STRATEGY)
141    def test_extract_secrets(self, secrets_to_send):
142        """
143        ``_extract_secrets()`` returns a dictionary with the extracted secrets
144        if the input secrets match the required secrets.
145        """
146        secrets, headers = secrets_to_send
147
148        # No secrets needed, none given:
149        self.assertEqual(_extract_secrets(headers, secrets.keys()), secrets)
150
151    @given(
152        secrets_to_send=SECRETS_STRATEGY,
153        secrets_to_require=st.sets(st.sampled_from(Secrets)),
154    )
155    def test_wrong_number_of_secrets(self, secrets_to_send, secrets_to_require):
156        """
157        If the wrong number of secrets are passed to ``_extract_secrets``, a
158        ``ClientSecretsException`` is raised.
159        """
160        secrets_to_send, headers = secrets_to_send
161        assume(secrets_to_send.keys() != secrets_to_require)
162
163        with self.assertRaises(ClientSecretsException):
164            _extract_secrets(headers, secrets_to_require)
165
166    def test_bad_secret_missing_value(self):
167        """
168        Missing value in ``_extract_secrets`` result in
169        ``ClientSecretsException``.
170        """
171        with self.assertRaises(ClientSecretsException):
172            _extract_secrets(["lease-renew-secret"], {Secrets.LEASE_RENEW})
173
174    def test_bad_secret_unknown_prefix(self):
175        """
176        Missing value in ``_extract_secrets`` result in
177        ``ClientSecretsException``.
178        """
179        with self.assertRaises(ClientSecretsException):
180            _extract_secrets(["FOO eA=="], set())
181
182    def test_bad_secret_not_base64(self):
183        """
184        A non-base64 value in ``_extract_secrets`` result in
185        ``ClientSecretsException``.
186        """
187        with self.assertRaises(ClientSecretsException):
188            _extract_secrets(["lease-renew-secret x"], {Secrets.LEASE_RENEW})
189
190    def test_bad_secret_wrong_length_lease_renew(self):
191        """
192        Lease renewal secrets must be 32-bytes long.
193        """
194        with self.assertRaises(ClientSecretsException):
195            _extract_secrets(["lease-renew-secret eA=="], {Secrets.LEASE_RENEW})
196
197    def test_bad_secret_wrong_length_lease_cancel(self):
198        """
199        Lease cancel secrets must be 32-bytes long.
200        """
201        with self.assertRaises(ClientSecretsException):
202            _extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW})
203
204
205class RouteConverterTests(SyncTestCase):
206    """Tests for custom werkzeug path segment converters."""
207
208    adapter = routing.Map(
209        [
210            routing.Rule(
211                "/<storage_index:storage_index>/", endpoint="si", methods=["GET"]
212            )
213        ],
214        converters={"storage_index": StorageIndexConverter},
215    ).bind("example.com", "/")
216
217    @given(storage_index=st.binary(min_size=16, max_size=16))
218    def test_good_storage_index_is_parsed(self, storage_index):
219        """
220        A valid storage index is accepted and parsed back out by
221        StorageIndexConverter.
222        """
223        self.assertEqual(
224            self.adapter.match(
225                "/{}/".format(str(si_b2a(storage_index), "ascii")), method="GET"
226            ),
227            ("si", {"storage_index": storage_index}),
228        )
229
230    def test_long_storage_index_is_not_parsed(self):
231        """An overly long storage_index string is not parsed."""
232        with self.assertRaises(WNotFound):
233            self.adapter.match("/{}/".format("a" * 27), method="GET")
234
235    def test_short_storage_index_is_not_parsed(self):
236        """An overly short storage_index string is not parsed."""
237        with self.assertRaises(WNotFound):
238            self.adapter.match("/{}/".format("a" * 25), method="GET")
239
240    def test_bad_characters_storage_index_is_not_parsed(self):
241        """A storage_index string with bad characters is not parsed."""
242        with self.assertRaises(WNotFound):
243            self.adapter.match("/{}_/".format("a" * 25), method="GET")
244
245    def test_invalid_storage_index_is_not_parsed(self):
246        """An invalid storage_index string is not parsed."""
247        with self.assertRaises(WNotFound):
248            self.adapter.match("/nomd2a65ylxjbqzsw7gcfh4ivr/", method="GET")
249
250
251# TODO should be actual swissnum
252SWISSNUM_FOR_TEST = b"abcd"
253
254
255def gen_bytes(length: int) -> bytes:
256    """Generate bytes to the given length."""
257    result = (b"0123456789abcdef" * ((length // 16) + 1))[:length]
258    assert len(result) == length
259    return result
260
261
262class TestApp(BaseApp):
263    """HTTP API for testing purposes."""
264
265    clock: IReactorTime
266    _app = Klein()
267    _add_error_handling(_app)
268    _swissnum = SWISSNUM_FOR_TEST  # Match what the test client is using
269
270    @_authorized_route(_app, set(), "/noop", methods=["GET"])
271    def noop(self, request, authorization):
272        return "noop"
273
274    @_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"])
275    def validate_upload_secret(self, request, authorization):
276        if authorization == {Secrets.UPLOAD: b"MAGIC"}:
277            return "GOOD SECRET"
278        else:
279            return "BAD: {}".format(authorization)
280
281    @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
282    def bad_version(self, request, authorization):
283        """Return version result that violates the expected schema."""
284        request.setHeader("content-type", CBOR_MIME_TYPE)
285        return dumps({"garbage": 123})
286
287    @_authorized_route(_app, set(), "/bytes/<int:length>", methods=["GET"])
288    def generate_bytes(self, request, authorization, length):
289        """Return bytes to the given length using ``gen_bytes()``."""
290        return gen_bytes(length)
291
292    @_authorized_route(_app, set(), "/slowly_never_finish_result", methods=["GET"])
293    def slowly_never_finish_result(self, request, authorization):
294        """
295        Send data immediately, after 59 seconds, after another 59 seconds, and then
296        never again, without finishing the response.
297        """
298        request.write(b"a")
299        self.clock.callLater(59, request.write, b"b")
300        self.clock.callLater(59 + 59, request.write, b"c")
301        return Deferred()
302
303    @_authorized_route(_app, set(), "/die_unfinished", methods=["GET"])
304    def die(self, request, authorization):
305        """
306        Dies half-way.
307        """
308        request.transport.loseConnection()
309        return Deferred()
310
311    @_authorized_route(_app, set(), "/read_body", methods=["POST"])
312    @async_to_deferred
313    async def read_body(self, request, authorization):
314        """
315        Accept an advise_corrupt_share message, return the reason.
316
317        I.e. exercise codepaths used for reading CBOR from the body.
318        """
319        data = await read_encoded(
320            self.clock, request, SERVER_SCHEMAS["advise_corrupt_share"]
321        )
322        return data["reason"]
323
324
325def result_of(d):
326    """
327    Synchronously extract the result of a Deferred.
328    """
329    result = []
330    error = []
331    d.addCallbacks(result.append, error.append)
332    if result:
333        return result[0]
334    if error:
335        error[0].raiseException()
336    raise RuntimeError(
337        "We expected given Deferred to have result already, but it wasn't. "
338        + "This is probably a test design issue."
339    )
340
341
342class CustomHTTPServerTests(SyncTestCase):
343    """
344    Tests that use a custom HTTP server.
345    """
346
347    def setUp(self):
348        super(CustomHTTPServerTests, self).setUp()
349        disable_thread_pool_for_test(self)
350        StorageClientFactory.start_test_mode(
351            lambda pool: self.addCleanup(pool.closeCachedConnections)
352        )
353        self.addCleanup(StorageClientFactory.stop_test_mode)
354        # Could be a fixture, but will only be used in this test class so not
355        # going to bother:
356        self._http_server = TestApp()
357        treq = StubTreq(self._http_server._app.resource())
358        self.client = StorageClient(
359            DecodedURL.from_text("http://127.0.0.1"),
360            SWISSNUM_FOR_TEST,
361            treq=treq,
362            pool=None,
363            # We're using a Treq private API to get the reactor, alas, but only
364            # in a test, so not going to worry about it too much. This would be
365            # fixed if https://github.com/twisted/treq/issues/226 were ever
366            # fixed.
367            clock=treq._agent._memoryReactor,
368            analyze_response=response_is_not_html,
369        )
370        self._http_server.clock = self.client._clock
371
372    def test_bad_swissnum_from_client(self) -> None:
373        """
374        If the swissnum is invalid, a BAD REQUEST response code is returned.
375        """
376        headers = Headers()
377        # The value is not UTF-8.
378        headers.addRawHeader("Authorization", b"\x00\xFF\x00\xFF")
379        response = result_of(
380            self.client._treq.request(
381                "GET",
382                DecodedURL.from_text("http://127.0.0.1/noop"),
383                headers=headers,
384            )
385        )
386        self.assertEqual(response.code, 400)
387
388    def test_bad_secret(self) -> None:
389        """
390        If the secret is invalid (not base64), a BAD REQUEST
391        response code is returned.
392        """
393        bad_secret = b"upload-secret []<>"
394        headers = Headers()
395        headers.addRawHeader(
396            "X-Tahoe-Authorization",
397            bad_secret,
398        )
399        response = result_of(
400            self.client.request(
401                "GET",
402                DecodedURL.from_text("http://127.0.0.1/upload_secret"),
403                headers=headers,
404            )
405        )
406        self.assertEqual(response.code, 400)
407
408    def test_authorization_enforcement(self):
409        """
410        The requirement for secrets is enforced by the ``_authorized_route``
411        decorator; if they are not given, a 400 response code is returned.
412
413        Note that this refers to ``X-Tahoe-Authorization``, not the
414        ``Authorization`` header used for the swissnum.
415        """
416        # Without secret, get a 400 error.
417        response = result_of(
418            self.client.request(
419                "GET",
420                DecodedURL.from_text("http://127.0.0.1/upload_secret"),
421            )
422        )
423        self.assertEqual(response.code, 400)
424
425        # With secret, we're good.
426        response = result_of(
427            self.client.request(
428                "GET",
429                DecodedURL.from_text("http://127.0.0.1/upload_secret"),
430                upload_secret=b"MAGIC",
431            )
432        )
433        self.assertEqual(response.code, 200)
434        self.assertEqual(result_of(response.content()), b"GOOD SECRET")
435
436    def test_client_side_schema_validation(self):
437        """
438        The client validates returned CBOR message against a schema.
439        """
440        client = StorageClientGeneral(self.client)
441        with self.assertRaises(CDDLValidationError):
442            result_of(client.get_version())
443
444    @given(length=st.integers(min_value=1, max_value=1_000_000))
445    # On Python 3.12 we're getting weird deadline issues in CI, so disabling
446    # for now.
447    @hypothesis_settings(deadline=None)
448    def test_limited_content_fits(self, length):
449        """
450        ``http_client.limited_content()`` returns the body if it is less than
451        the max length.
452        """
453        for at_least_length in (length, length + 1, length + 1000, length + 100_000):
454            response = result_of(
455                self.client.request(
456                    "GET",
457                    DecodedURL.from_text(f"http://127.0.0.1/bytes/{length}"),
458                )
459            )
460
461            self.assertEqual(
462                result_of(
463                    limited_content(response, self._http_server.clock, at_least_length)
464                ).read(),
465                gen_bytes(length),
466            )
467
468    @given(length=st.integers(min_value=10, max_value=1_000_000))
469    def test_limited_content_does_not_fit(self, length):
470        """
471        If the body is longer than than max length,
472        ``http_client.limited_content()`` fails with a ``ValueError``.
473        """
474        for too_short in (length - 1, 5):
475            response = result_of(
476                self.client.request(
477                    "GET",
478                    DecodedURL.from_text(f"http://127.0.0.1/bytes/{length}"),
479                )
480            )
481
482            with self.assertRaises(ValueError):
483                result_of(limited_content(response, self._http_server.clock, too_short))
484
485    def test_limited_content_silence_causes_timeout(self):
486        """
487        ``http_client.limited_content() times out if it receives no data for 60
488        seconds.
489        """
490        response = result_of(
491            self.client.request(
492                "GET",
493                DecodedURL.from_text("http://127.0.0.1/slowly_never_finish_result"),
494            )
495        )
496
497        body_deferred = limited_content(response, self._http_server.clock, 4)
498        result = []
499        error = []
500        body_deferred.addCallbacks(result.append, error.append)
501
502        for i in range(59 + 59 + 60):
503            self.assertEqual((result, error), ([], []))
504            self._http_server.clock.advance(1)
505            # Push data between in-memory client and in-memory server:
506            self.client._treq._agent.flush()
507
508        # After 59 (second write) + 59 (third write) + 60 seconds (quiescent
509        # timeout) the limited_content() response times out.
510        self.assertTrue(error)
511        with self.assertRaises(CancelledError):
512            error[0].raiseException()
513
514    def test_limited_content_cancels_timeout_on_failed_response(self):
515        """
516        If the response fails somehow, the timeout is still cancelled.
517        """
518        response = result_of(
519            self.client.request(
520                "GET",
521                DecodedURL.from_text("http://127.0.0.1/die"),
522            )
523        )
524
525        d = limited_content(response, self._http_server.clock, 4)
526        with self.assertRaises(ValueError):
527            result_of(d)
528        self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0)
529
530    def test_request_with_no_content_type_same_as_cbor(self):
531        """
532        If no ``Content-Type`` header is set when sending a body, it is assumed
533        to be CBOR.
534        """
535        response = result_of(
536            self.client.request(
537                "POST",
538                DecodedURL.from_text("http://127.0.0.1/read_body"),
539                data=dumps({"reason": "test"}),
540            )
541        )
542        self.assertEqual(
543            result_of(limited_content(response, self._http_server.clock, 100)).read(),
544            b"test",
545        )
546
547    def test_request_with_wrong_content(self):
548        """
549        If a non-CBOR ``Content-Type`` header is set when sending a body, the
550        server complains appropriatly.
551        """
552        headers = Headers()
553        headers.setRawHeaders("content-type", ["some/value"])
554        response = result_of(
555            self.client.request(
556                "POST",
557                DecodedURL.from_text("http://127.0.0.1/read_body"),
558                data=dumps({"reason": "test"}),
559                headers=headers,
560            )
561        )
562        self.assertEqual(response.code, http.UNSUPPORTED_MEDIA_TYPE)
563
564
565@implementer(IReactorFromThreads)
566class Reactor(Clock):
567    """
568    Fake reactor that supports time APIs and callFromThread.
569
570    Advancing the clock also runs any callbacks scheduled via callFromThread.
571    """
572
573    def __init__(self):
574        Clock.__init__(self)
575        self._queue = Queue()
576
577    def callFromThread(self, callable, *args, **kwargs):
578        self._queue.put((callable, args, kwargs))
579
580    def advance(self, *args, **kwargs):
581        Clock.advance(self, *args, **kwargs)
582        while not self._queue.empty():
583            f, args, kwargs = self._queue.get()
584            f(*args, **kwargs)
585
586
587class HttpTestFixture(Fixture):
588    """
589    Setup HTTP tests' infrastructure, the storage server and corresponding
590    client.
591    """
592
593    def _setUp(self):
594        StorageClientFactory.start_test_mode(
595            lambda pool: self.addCleanup(pool.closeCachedConnections)
596        )
597        self.addCleanup(StorageClientFactory.stop_test_mode)
598        self.clock = Reactor()
599        self.tempdir = self.useFixture(TempDir())
600        # The global Cooperator used by Twisted (a) used by pull producers in
601        # twisted.web, (b) is driven by a real reactor. We want to push time
602        # forward ourselves since we rely on pull producers in the HTTP storage
603        # server.
604        self.mock = self.useFixture(
605            MonkeyPatch(
606                "twisted.internet.task._theCooperator",
607                Cooperator(scheduler=lambda c: self.clock.callLater(0.000001, c)),
608            )
609        )
610        self.storage_server = StorageServer(
611            self.tempdir.path, b"\x00" * 20, clock=self.clock
612        )
613        self.http_server = HTTPServer(
614            self.clock, self.storage_server, SWISSNUM_FOR_TEST
615        )
616        self.treq = StubTreq(self.http_server.get_resource())
617        self.client = StorageClient(
618            DecodedURL.from_text("http://127.0.0.1"),
619            SWISSNUM_FOR_TEST,
620            treq=self.treq,
621            pool=None,
622            clock=self.clock,
623            analyze_response=response_is_not_html,
624        )
625
626    def result_of_with_flush(self, d):
627        """
628        Like ``result_of``, but supports fake reactor and ``treq`` testing
629        infrastructure necessary to support asynchronous HTTP server endpoints.
630        """
631        d = ensureDeferred(d)
632        result = []
633        error = []
634        d.addCallbacks(result.append, error.append)
635
636        # Check for synchronous HTTP endpoint handler:
637        if result:
638            return result[0]
639        if error:
640            error[0].raiseException()
641
642        # OK, no result yet, probably async HTTP endpoint handler, so advance
643        # time, flush treq, and try again:
644        for i in range(10_000):
645            self.clock.advance(0.001)
646            self.treq.flush()
647            if result:
648                break
649            # By putting the sleep at the end, tests that are completely
650            # synchronous and don't use threads will have already broken out of
651            # the loop, and so will finish without any sleeps. This allows them
652            # to run as quickly as possible.
653            #
654            # However, some tests do talk to APIs that use a thread pool on the
655            # backend, so we need to allow actual time to pass for those.
656            time.sleep(0.001)
657
658        if result:
659            return result[0]
660        if error:
661            error[0].raiseException()
662
663        raise RuntimeError(
664            "We expected given Deferred to have result already, but it wasn't. "
665            + "This is probably a test design issue."
666        )
667
668
669class StorageClientWithHeadersOverride(object):
670    """Wrap ``StorageClient`` and override sent headers."""
671
672    def __init__(self, storage_client, add_headers):
673        self.storage_client = storage_client
674        self.add_headers = add_headers
675
676    def __getattr__(self, attr):
677        return getattr(self.storage_client, attr)
678
679    def request(self, *args, headers=None, **kwargs):
680        if headers is None:
681            headers = Headers()
682        for key, value in self.add_headers.items():
683            headers.setRawHeaders(key, [value])
684        return self.storage_client.request(*args, headers=headers, **kwargs)
685
686
687@contextmanager
688def assert_fails_with_http_code(test_case: SyncTestCase, code: int):
689    """
690    Context manager that asserts the code fails with the given HTTP response
691    code.
692    """
693    with test_case.assertRaises(ClientException) as e:
694        try:
695            yield
696        finally:
697            pass
698    test_case.assertEqual(e.exception.code, code)
699
700
701class GenericHTTPAPITests(SyncTestCase):
702    """
703    Tests of HTTP client talking to the HTTP server, for generic HTTP API
704    endpoints and concerns.
705    """
706
707    def setUp(self):
708        super(GenericHTTPAPITests, self).setUp()
709        disable_thread_pool_for_test(self)
710        self.http = self.useFixture(HttpTestFixture())
711
712    def test_missing_authentication(self) -> None:
713        """
714        If nothing is given in the ``Authorization`` header at all an
715        ``Unauthorized`` response is returned.
716        """
717        client = StubTreq(self.http.http_server.get_resource())
718        response = self.http.result_of_with_flush(
719            client.request(
720                "GET",
721                "http://127.0.0.1/storage/v1/version",
722            ),
723        )
724        self.assertThat(response.code, Equals(http.UNAUTHORIZED))
725
726    def test_bad_authentication(self):
727        """
728        If the wrong swissnum is used, an ``Unauthorized`` response code is
729        returned.
730        """
731        client = StorageClientGeneral(
732            StorageClient(
733                DecodedURL.from_text("http://127.0.0.1"),
734                b"something wrong",
735                treq=StubTreq(self.http.http_server.get_resource()),
736                pool=None,
737                clock=self.http.clock,
738                analyze_response=response_is_not_html,
739            )
740        )
741        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
742            self.http.result_of_with_flush(client.get_version())
743
744    def test_unsupported_mime_type(self):
745        """
746        The client can request mime types other than CBOR, and if they are
747        unsupported a NOT ACCEPTABLE (406) error will be returned.
748        """
749        client = StorageClientGeneral(
750            StorageClientWithHeadersOverride(self.http.client, {"accept": "image/gif"})
751        )
752        with assert_fails_with_http_code(self, http.NOT_ACCEPTABLE):
753            self.http.result_of_with_flush(client.get_version())
754
755    def test_version(self):
756        """
757        The client can return the version.
758
759        We ignore available disk space and max immutable share size, since that
760        might change across calls.
761        """
762        client = StorageClientGeneral(self.http.client)
763        version = self.http.result_of_with_flush(client.get_version())
764        version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
765            b"available-space"
766        )
767        version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
768            b"maximum-immutable-share-size"
769        )
770        expected_version = self.http.storage_server.get_version()
771        expected_version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
772            b"available-space"
773        )
774        expected_version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
775            b"maximum-immutable-share-size"
776        )
777        self.assertEqual(version, expected_version)
778
779    def test_server_side_schema_validation(self):
780        """
781        Ensure that schema validation is happening: invalid CBOR should result
782        in bad request response code (error 400).
783
784        We don't bother checking every single request, the API on the
785        server-side is designed to require a schema, so it validates
786        everywhere.  But we check at least one to ensure we get correct
787        response code on bad input, so we know validation happened.
788        """
789        upload_secret = urandom(32)
790        lease_secret = urandom(32)
791        storage_index = urandom(16)
792        url = self.http.client.relative_url(
793            "/storage/v1/immutable/" + _encode_si(storage_index)
794        )
795        message = {"bad-message": "missing expected keys"}
796
797        response = self.http.result_of_with_flush(
798            self.http.client.request(
799                "POST",
800                url,
801                lease_renew_secret=lease_secret,
802                lease_cancel_secret=lease_secret,
803                upload_secret=upload_secret,
804                message_to_serialize=message,
805            )
806        )
807        self.assertEqual(response.code, http.BAD_REQUEST)
808
809
810class ImmutableHTTPAPITests(SyncTestCase):
811    """
812    Tests for immutable upload/download APIs.
813    """
814
815    def setUp(self):
816        super(ImmutableHTTPAPITests, self).setUp()
817        disable_thread_pool_for_test(self)
818        self.http = self.useFixture(HttpTestFixture())
819        self.imm_client = StorageClientImmutables(self.http.client)
820        self.general_client = StorageClientGeneral(self.http.client)
821
822    def create_upload(self, share_numbers, length):
823        """
824        Create a write bucket on server, return:
825
826            (upload_secret, lease_secret, storage_index, result)
827        """
828        upload_secret = urandom(32)
829        lease_secret = urandom(32)
830        storage_index = urandom(16)
831        created = self.http.result_of_with_flush(
832            self.imm_client.create(
833                storage_index,
834                share_numbers,
835                length,
836                upload_secret,
837                lease_secret,
838                lease_secret,
839            )
840        )
841        return (upload_secret, lease_secret, storage_index, created)
842
843    def test_upload_can_be_downloaded(self):
844        """
845        A single share can be uploaded in (possibly overlapping) chunks, and
846        then a random chunk can be downloaded, and it will match the original
847        file.
848
849        We don't exercise the full variation of overlapping chunks because
850        that's already done in test_storage.py.
851        """
852        length = 100
853        expected_data = bytes(range(100))
854
855        # Create a upload:
856        (upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
857        self.assertEqual(
858            created, ImmutableCreateResult(already_have=set(), allocated={1})
859        )
860
861        remaining = RangeMap()
862        remaining.set(True, 0, 100)
863
864        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
865        def write(offset, length):
866            remaining.empty(offset, offset + length)
867            return self.imm_client.write_share_chunk(
868                storage_index,
869                1,
870                upload_secret,
871                offset,
872                expected_data[offset : offset + length],
873            )
874
875        upload_progress = self.http.result_of_with_flush(write(10, 10))
876        self.assertEqual(
877            upload_progress, UploadProgress(finished=False, required=remaining)
878        )
879        upload_progress = self.http.result_of_with_flush(write(30, 10))
880        self.assertEqual(
881            upload_progress, UploadProgress(finished=False, required=remaining)
882        )
883        upload_progress = self.http.result_of_with_flush(write(50, 10))
884        self.assertEqual(
885            upload_progress, UploadProgress(finished=False, required=remaining)
886        )
887
888        # Then, an overlapping write with matching data (15-35):
889        upload_progress = self.http.result_of_with_flush(write(15, 20))
890        self.assertEqual(
891            upload_progress, UploadProgress(finished=False, required=remaining)
892        )
893
894        # Now fill in the holes:
895        upload_progress = self.http.result_of_with_flush(write(0, 10))
896        self.assertEqual(
897            upload_progress, UploadProgress(finished=False, required=remaining)
898        )
899        upload_progress = self.http.result_of_with_flush(write(40, 10))
900        self.assertEqual(
901            upload_progress, UploadProgress(finished=False, required=remaining)
902        )
903        upload_progress = self.http.result_of_with_flush(write(60, 40))
904        self.assertEqual(
905            upload_progress, UploadProgress(finished=True, required=RangeMap())
906        )
907
908        # We can now read:
909        for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
910            downloaded = self.http.result_of_with_flush(
911                self.imm_client.read_share_chunk(storage_index, 1, offset, length)
912            )
913            self.assertEqual(downloaded, expected_data[offset : offset + length])
914
915    def test_write_with_wrong_upload_key(self):
916        """
917        A write with an upload key that is different than the original upload
918        key will fail.
919        """
920        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
921        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
922            self.http.result_of_with_flush(
923                self.imm_client.write_share_chunk(
924                    storage_index,
925                    1,
926                    upload_secret + b"X",
927                    0,
928                    b"123",
929                )
930            )
931
932    def test_allocate_buckets_second_time_different_shares(self):
933        """
934        If allocate buckets endpoint is called second time with different
935        upload key on potentially different shares, that creates the buckets on
936        those shares that are different.
937        """
938        # Create a upload:
939        (upload_secret, lease_secret, storage_index, created) = self.create_upload(
940            {1, 2, 3}, 100
941        )
942
943        # Write half of share 1
944        self.http.result_of_with_flush(
945            self.imm_client.write_share_chunk(
946                storage_index,
947                1,
948                upload_secret,
949                0,
950                b"a" * 50,
951            )
952        )
953
954        # Add same shares with a different upload key share 1 overlaps with
955        # existing shares, this call shouldn't overwrite the existing
956        # work-in-progress.
957        upload_secret2 = b"x" * 2
958        created2 = self.http.result_of_with_flush(
959            self.imm_client.create(
960                storage_index,
961                {1, 4, 6},
962                100,
963                upload_secret2,
964                lease_secret,
965                lease_secret,
966            )
967        )
968        self.assertEqual(created2.allocated, {4, 6})
969
970        # Write second half of share 1
971        self.assertTrue(
972            self.http.result_of_with_flush(
973                self.imm_client.write_share_chunk(
974                    storage_index,
975                    1,
976                    upload_secret,
977                    50,
978                    b"b" * 50,
979                )
980            ).finished
981        )
982
983        # The upload of share 1 succeeded, demonstrating that second create()
984        # call didn't overwrite work-in-progress.
985        downloaded = self.http.result_of_with_flush(
986            self.imm_client.read_share_chunk(storage_index, 1, 0, 100)
987        )
988        self.assertEqual(downloaded, b"a" * 50 + b"b" * 50)
989
990        # We can successfully upload the shares created with the second upload secret.
991        self.assertTrue(
992            self.http.result_of_with_flush(
993                self.imm_client.write_share_chunk(
994                    storage_index,
995                    4,
996                    upload_secret2,
997                    0,
998                    b"x" * 100,
999                )
1000            ).finished
1001        )
1002
1003    def test_list_shares(self):
1004        """
1005        Once a share is finished uploading, it's possible to list it.
1006        """
1007        (upload_secret, _, storage_index, created) = self.create_upload({1, 2, 3}, 10)
1008
1009        # Initially there are no shares:
1010        self.assertEqual(
1011            self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
1012            set(),
1013        )
1014
1015        # Upload shares 1 and 3:
1016        for share_number in [1, 3]:
1017            progress = self.http.result_of_with_flush(
1018                self.imm_client.write_share_chunk(
1019                    storage_index,
1020                    share_number,
1021                    upload_secret,
1022                    0,
1023                    b"0123456789",
1024                )
1025            )
1026            self.assertTrue(progress.finished)
1027
1028        # Now shares 1 and 3 exist:
1029        self.assertEqual(
1030            self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
1031            {1, 3},
1032        )
1033
1034    def test_upload_bad_content_range(self):
1035        """
1036        Malformed or invalid Content-Range headers to the immutable upload
1037        endpoint result in a 416 error.
1038        """
1039        (upload_secret, _, storage_index, created) = self.create_upload({1}, 10)
1040
1041        def check_invalid(bad_content_range_value):
1042            client = StorageClientImmutables(
1043                StorageClientWithHeadersOverride(
1044                    self.http.client, {"content-range": bad_content_range_value}
1045                )
1046            )
1047            with assert_fails_with_http_code(
1048                self, http.REQUESTED_RANGE_NOT_SATISFIABLE
1049            ):
1050                self.http.result_of_with_flush(
1051                    client.write_share_chunk(
1052                        storage_index,
1053                        1,
1054                        upload_secret,
1055                        0,
1056                        b"0123456789",
1057                    )
1058                )
1059
1060        check_invalid("not a valid content-range header at all")
1061        check_invalid("bytes -1-9/10")
1062        check_invalid("bytes 0--9/10")
1063        check_invalid("teapots 0-9/10")
1064
1065    def test_list_shares_unknown_storage_index(self):
1066        """
1067        Listing unknown storage index's shares results in empty list of shares.
1068        """
1069        storage_index = bytes(range(16))
1070        self.assertEqual(
1071            self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
1072            set(),
1073        )
1074
1075    def test_upload_non_existent_storage_index(self):
1076        """
1077        Uploading to a non-existent storage index or share number results in
1078        404.
1079        """
1080        (upload_secret, _, storage_index, _) = self.create_upload({1}, 10)
1081
1082        def unknown_check(storage_index, share_number):
1083            with assert_fails_with_http_code(self, http.NOT_FOUND):
1084                self.http.result_of_with_flush(
1085                    self.imm_client.write_share_chunk(
1086                        storage_index,
1087                        share_number,
1088                        upload_secret,
1089                        0,
1090                        b"0123456789",
1091                    )
1092                )
1093
1094        # Wrong share number:
1095        unknown_check(storage_index, 7)
1096        # Wrong storage index:
1097        unknown_check(b"X" * 16, 7)
1098
1099    def test_multiple_shares_uploaded_to_different_place(self):
1100        """
1101        If a storage index has multiple shares, uploads to different shares are
1102        stored separately and can be downloaded separately.
1103        """
1104        (upload_secret, _, storage_index, _) = self.create_upload({1, 2}, 10)
1105        self.http.result_of_with_flush(
1106            self.imm_client.write_share_chunk(
1107                storage_index,
1108                1,
1109                upload_secret,
1110                0,
1111                b"1" * 10,
1112            )
1113        )
1114        self.http.result_of_with_flush(
1115            self.imm_client.write_share_chunk(
1116                storage_index,
1117                2,
1118                upload_secret,
1119                0,
1120                b"2" * 10,
1121            )
1122        )
1123        self.assertEqual(
1124            self.http.result_of_with_flush(
1125                self.imm_client.read_share_chunk(storage_index, 1, 0, 10)
1126            ),
1127            b"1" * 10,
1128        )
1129        self.assertEqual(
1130            self.http.result_of_with_flush(
1131                self.imm_client.read_share_chunk(storage_index, 2, 0, 10)
1132            ),
1133            b"2" * 10,
1134        )
1135
1136    def test_mismatching_upload_fails(self):
1137        """
1138        If an uploaded chunk conflicts with an already uploaded chunk, a
1139        CONFLICT error is returned.
1140        """
1141        (upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
1142
1143        # Write:
1144        self.http.result_of_with_flush(
1145            self.imm_client.write_share_chunk(
1146                storage_index,
1147                1,
1148                upload_secret,
1149                0,
1150                b"0" * 10,
1151            )
1152        )
1153
1154        # Conflicting write:
1155        with assert_fails_with_http_code(self, http.CONFLICT):
1156            self.http.result_of_with_flush(
1157                self.imm_client.write_share_chunk(
1158                    storage_index,
1159                    1,
1160                    upload_secret,
1161                    0,
1162                    b"0123456789",
1163                )
1164            )
1165
1166    def test_timed_out_upload_allows_reupload(self):
1167        """
1168        If an in-progress upload times out, it is cancelled altogether,
1169        allowing a new upload to occur.
1170        """
1171        self._test_abort_or_timed_out_upload_to_existing_storage_index(
1172            lambda **kwargs: self.http.clock.advance(30 * 60 + 1)
1173        )
1174
1175    def test_abort_upload_allows_reupload(self):
1176        """
1177        If an in-progress upload is aborted, it is cancelled altogether,
1178        allowing a new upload to occur.
1179        """
1180
1181        def abort(storage_index, share_number, upload_secret):
1182            return self.http.result_of_with_flush(
1183                self.imm_client.abort_upload(storage_index, share_number, upload_secret)
1184            )
1185
1186        self._test_abort_or_timed_out_upload_to_existing_storage_index(abort)
1187
1188    def _test_abort_or_timed_out_upload_to_existing_storage_index(self, cancel_upload):
1189        """Start uploading to an existing storage index that then times out or aborts.
1190
1191        Re-uploading should work.
1192        """
1193        # Start an upload:
1194        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
1195        self.http.result_of_with_flush(
1196            self.imm_client.write_share_chunk(
1197                storage_index,
1198                1,
1199                upload_secret,
1200                0,
1201                b"123",
1202            )
1203        )
1204
1205        # Now, the upload is cancelled somehow:
1206        cancel_upload(
1207            storage_index=storage_index, upload_secret=upload_secret, share_number=1
1208        )
1209
1210        # Now we can create a new share with the same storage index without
1211        # complaint:
1212        upload_secret = urandom(32)
1213        lease_secret = urandom(32)
1214        created = self.http.result_of_with_flush(
1215            self.imm_client.create(
1216                storage_index,
1217                {1},
1218                100,
1219                upload_secret,
1220                lease_secret,
1221                lease_secret,
1222            )
1223        )
1224        self.assertEqual(created.allocated, {1})
1225
1226        # And write to it, too:
1227        self.http.result_of_with_flush(
1228            self.imm_client.write_share_chunk(
1229                storage_index,
1230                1,
1231                upload_secret,
1232                0,
1233                b"ABC",
1234            )
1235        )
1236
1237    def test_unknown_aborts(self):
1238        """
1239        Aborting uploads with an unknown storage index or share number will
1240        result 404 HTTP response code.
1241        """
1242        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
1243
1244        for si, num in [(storage_index, 3), (b"x" * 16, 1)]:
1245            with assert_fails_with_http_code(self, http.NOT_FOUND):
1246                self.http.result_of_with_flush(
1247                    self.imm_client.abort_upload(si, num, upload_secret)
1248                )
1249
1250    def test_unauthorized_abort(self):
1251        """
1252        An abort with the wrong key will return an unauthorized error, and will
1253        not abort the upload.
1254        """
1255        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
1256
1257        # Failed to abort becaues wrong upload secret:
1258        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
1259            self.http.result_of_with_flush(
1260                self.imm_client.abort_upload(storage_index, 1, upload_secret + b"X")
1261            )
1262
1263        # We can still write to it:
1264        self.http.result_of_with_flush(
1265            self.imm_client.write_share_chunk(
1266                storage_index,
1267                1,
1268                upload_secret,
1269                0,
1270                b"ABC",
1271            )
1272        )
1273
1274    def test_too_late_abort(self):
1275        """
1276        An abort of an already-fully-uploaded immutable will result in 405
1277        error and will not affect the immutable.
1278        """
1279        uploaded_data = b"123"
1280        (upload_secret, _, storage_index, _) = self.create_upload({0}, 3)
1281        self.http.result_of_with_flush(
1282            self.imm_client.write_share_chunk(
1283                storage_index,
1284                0,
1285                upload_secret,
1286                0,
1287                uploaded_data,
1288            )
1289        )
1290
1291        # Can't abort, we finished upload:
1292        with assert_fails_with_http_code(self, http.NOT_ALLOWED):
1293            self.http.result_of_with_flush(
1294                self.imm_client.abort_upload(storage_index, 0, upload_secret)
1295            )
1296
1297        # Abort didn't prevent reading:
1298        self.assertEqual(
1299            uploaded_data,
1300            self.http.result_of_with_flush(
1301                self.imm_client.read_share_chunk(
1302                    storage_index,
1303                    0,
1304                    0,
1305                    3,
1306                )
1307            ),
1308        )
1309
1310    def test_lease_on_unknown_storage_index(self):
1311        """
1312        An attempt to renew an unknown storage index will result in a HTTP 404.
1313        """
1314        storage_index = urandom(16)
1315        secret = b"A" * 32
1316        with assert_fails_with_http_code(self, http.NOT_FOUND):
1317            self.http.result_of_with_flush(
1318                self.general_client.add_or_renew_lease(storage_index, secret, secret)
1319            )
1320
1321
1322class MutableHTTPAPIsTests(SyncTestCase):
1323    """Tests for mutable APIs."""
1324
1325    def setUp(self):
1326        super(MutableHTTPAPIsTests, self).setUp()
1327        disable_thread_pool_for_test(self)
1328        self.http = self.useFixture(HttpTestFixture())
1329        self.mut_client = StorageClientMutables(self.http.client)
1330
1331    def create_upload(self, data=b"abcdef"):
1332        """
1333        Utility that creates shares 0 and 1 with bodies
1334        ``{data}-{share_number}``.
1335        """
1336        write_secret = urandom(32)
1337        lease_secret = urandom(32)
1338        storage_index = urandom(16)
1339        self.http.result_of_with_flush(
1340            self.mut_client.read_test_write_chunks(
1341                storage_index,
1342                write_secret,
1343                lease_secret,
1344                lease_secret,
1345                {
1346                    0: TestWriteVectors(
1347                        write_vectors=[WriteVector(offset=0, data=data + b"-0")]
1348                    ),
1349                    1: TestWriteVectors(
1350                        write_vectors=[
1351                            WriteVector(offset=0, data=data),
1352                            WriteVector(offset=len(data), data=b"-1"),
1353                        ]
1354                    ),
1355                },
1356                [],
1357            )
1358        )
1359        return storage_index, write_secret, lease_secret
1360
1361    def test_write_can_be_read_small_data(self):
1362        """
1363        Small written data can be read using ``read_share_chunk``.
1364        """
1365        self.write_can_be_read(b"abcdef")
1366
1367    def test_write_can_be_read_large_data(self):
1368        """
1369        Large written data (50MB) can be read using ``read_share_chunk``.
1370        """
1371        self.write_can_be_read(b"abcdefghij" * 5 * 1024 * 1024)
1372
1373    def write_can_be_read(self, data):
1374        """
1375        Written data can be read using ``read_share_chunk``.
1376        """
1377        lease_secret = urandom(32)
1378        storage_index = urandom(16)
1379        self.http.result_of_with_flush(
1380            self.mut_client.read_test_write_chunks(
1381                storage_index,
1382                urandom(32),
1383                lease_secret,
1384                lease_secret,
1385                {
1386                    0: TestWriteVectors(
1387                        write_vectors=[WriteVector(offset=0, data=data)]
1388                    ),
1389                },
1390                [],
1391            )
1392        )
1393        read_data = self.http.result_of_with_flush(
1394            self.mut_client.read_share_chunk(storage_index, 0, 0, len(data))
1395        )
1396        self.assertEqual(read_data, data)
1397
1398    def test_read_before_write(self):
1399        """In combo read/test/write operation, reads happen before writes."""
1400        storage_index, write_secret, lease_secret = self.create_upload()
1401        result = self.http.result_of_with_flush(
1402            self.mut_client.read_test_write_chunks(
1403                storage_index,
1404                write_secret,
1405                lease_secret,
1406                lease_secret,
1407                {
1408                    0: TestWriteVectors(
1409                        write_vectors=[WriteVector(offset=1, data=b"XYZ")]
1410                    ),
1411                },
1412                [ReadVector(0, 8)],
1413            )
1414        )
1415        # Reads are from before the write:
1416        self.assertEqual(
1417            result,
1418            ReadTestWriteResult(
1419                success=True, reads={0: [b"abcdef-0"], 1: [b"abcdef-1"]}
1420            ),
1421        )
1422        # But the write did happen:
1423        data0 = self.http.result_of_with_flush(
1424            self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
1425        )
1426        data1 = self.http.result_of_with_flush(
1427            self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
1428        )
1429        self.assertEqual((data0, data1), (b"aXYZef-0", b"abcdef-1"))
1430
1431    def test_conditional_write(self):
1432        """Uploads only happen if the test passes."""
1433        storage_index, write_secret, lease_secret = self.create_upload()
1434        result_failed = self.http.result_of_with_flush(
1435            self.mut_client.read_test_write_chunks(
1436                storage_index,
1437                write_secret,
1438                lease_secret,
1439                lease_secret,
1440                {
1441                    0: TestWriteVectors(
1442                        test_vectors=[TestVector(1, 4, b"FAIL")],
1443                        write_vectors=[WriteVector(offset=1, data=b"XYZ")],
1444                    ),
1445                },
1446                [],
1447            )
1448        )
1449        self.assertFalse(result_failed.success)
1450
1451        # This time the test matches:
1452        result = self.http.result_of_with_flush(
1453            self.mut_client.read_test_write_chunks(
1454                storage_index,
1455                write_secret,
1456                lease_secret,
1457                lease_secret,
1458                {
1459                    0: TestWriteVectors(
1460                        test_vectors=[TestVector(1, 4, b"bcde")],
1461                        write_vectors=[WriteVector(offset=1, data=b"XYZ")],
1462                    ),
1463                },
1464                [ReadVector(0, 8)],
1465            )
1466        )
1467        self.assertTrue(result.success)
1468        self.assertEqual(
1469            self.http.result_of_with_flush(
1470                self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
1471            ),
1472            b"aXYZef-0",
1473        )
1474
1475    def test_list_shares(self):
1476        """``list_shares()`` returns the shares for a given storage index."""
1477        storage_index, _, _ = self.create_upload()
1478        self.assertEqual(
1479            self.http.result_of_with_flush(self.mut_client.list_shares(storage_index)),
1480            {0, 1},
1481        )
1482
1483    def test_non_existent_list_shares(self):
1484        """A non-existent storage index errors when shares are listed."""
1485        with self.assertRaises(ClientException) as exc:
1486            self.http.result_of_with_flush(self.mut_client.list_shares(urandom(32)))
1487        self.assertEqual(exc.exception.code, http.NOT_FOUND)
1488
1489    def test_wrong_write_enabler(self):
1490        """Writes with the wrong write enabler fail, and are not processed."""
1491        storage_index, write_secret, lease_secret = self.create_upload()
1492        with self.assertRaises(ClientException) as exc:
1493            self.http.result_of_with_flush(
1494                self.mut_client.read_test_write_chunks(
1495                    storage_index,
1496                    urandom(32),
1497                    lease_secret,
1498                    lease_secret,
1499                    {
1500                        0: TestWriteVectors(
1501                            write_vectors=[WriteVector(offset=1, data=b"XYZ")]
1502                        ),
1503                    },
1504                    [ReadVector(0, 8)],
1505                )
1506            )
1507        self.assertEqual(exc.exception.code, http.UNAUTHORIZED)
1508
1509        # The write did not happen:
1510        self.assertEqual(
1511            self.http.result_of_with_flush(
1512                self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
1513            ),
1514            b"abcdef-0",
1515        )
1516
1517
1518class SharedImmutableMutableTestsMixin:
1519    """
1520    Shared tests for mutables and immutables where the API is the same.
1521    """
1522
1523    KIND: str  # either "mutable" or "immutable"
1524    general_client: StorageClientGeneral
1525    client: Union[StorageClientImmutables, StorageClientMutables]
1526    clientFactory: Callable[
1527        [StorageClient], Union[StorageClientImmutables, StorageClientMutables]
1528    ]
1529
1530    def upload(self, share_number: int, data_length=26) -> Tuple[bytes, bytes, bytes]:
1531        """
1532        Create a share, return (storage_index, uploaded_data, lease secret).
1533        """
1534        raise NotImplementedError
1535
1536    def get_leases(self, storage_index: bytes) -> Iterable[LeaseInfo]:
1537        """Get leases for the storage index."""
1538        raise NotImplementedError()
1539
1540    def test_advise_corrupt_share(self):
1541        """
1542        Advising share was corrupted succeeds from HTTP client's perspective,
1543        and calls appropriate method on server.
1544        """
1545        corrupted = []
1546        self.http.storage_server.advise_corrupt_share = lambda *args: corrupted.append(
1547            args
1548        )
1549
1550        storage_index, _, _ = self.upload(13)
1551        reason = "OHNO \u1235"
1552        self.http.result_of_with_flush(
1553            self.client.advise_corrupt_share(storage_index, 13, reason)
1554        )
1555
1556        self.assertEqual(
1557            corrupted,
1558            [(self.KIND.encode("ascii"), storage_index, 13, reason.encode("utf-8"))],
1559        )
1560
1561    def test_advise_corrupt_share_unknown(self):
1562        """
1563        Advising an unknown share was corrupted results in 404.
1564        """
1565        storage_index, _, _ = self.upload(13)
1566        reason = "OHNO \u1235"
1567        self.http.result_of_with_flush(
1568            self.client.advise_corrupt_share(storage_index, 13, reason)
1569        )
1570
1571        for si, share_number in [(storage_index, 11), (urandom(16), 13)]:
1572            with assert_fails_with_http_code(self, http.NOT_FOUND):
1573                self.http.result_of_with_flush(
1574                    self.client.advise_corrupt_share(si, share_number, reason)
1575                )
1576
1577    def test_lease_renew_and_add(self):
1578        """
1579        It's possible the renew the lease on an uploaded mutable/immutable, by
1580        using the same renewal secret, or add a new lease by choosing a
1581        different renewal secret.
1582        """
1583        # Create a storage index:
1584        storage_index, _, lease_secret = self.upload(0)
1585
1586        [lease] = self.get_leases(storage_index)
1587        initial_expiration_time = lease.get_expiration_time()
1588
1589        # Time passes:
1590        self.http.clock.advance(167)
1591
1592        # We renew the lease:
1593        self.http.result_of_with_flush(
1594            self.general_client.add_or_renew_lease(
1595                storage_index, lease_secret, lease_secret
1596            )
1597        )
1598
1599        # More time passes:
1600        self.http.clock.advance(10)
1601
1602        # We create a new lease:
1603        lease_secret2 = urandom(32)
1604        self.http.result_of_with_flush(
1605            self.general_client.add_or_renew_lease(
1606                storage_index, lease_secret2, lease_secret2
1607            )
1608        )
1609
1610        [lease1, lease2] = self.get_leases(storage_index)
1611        self.assertEqual(lease1.get_expiration_time(), initial_expiration_time + 167)
1612        self.assertEqual(lease2.get_expiration_time(), initial_expiration_time + 177)
1613
1614    def test_read_of_wrong_storage_index_fails(self):
1615        """
1616        Reading from unknown storage index results in 404.
1617        """
1618        with assert_fails_with_http_code(self, http.NOT_FOUND):
1619            self.http.result_of_with_flush(
1620                self.client.read_share_chunk(
1621                    b"1" * 16,
1622                    1,
1623                    0,
1624                    10,
1625                )
1626            )
1627
1628    def test_read_of_wrong_share_number_fails(self):
1629        """
1630        Reading from unknown storage index results in 404.
1631        """
1632        storage_index, _, _ = self.upload(1)
1633        with assert_fails_with_http_code(self, http.NOT_FOUND):
1634            self.http.result_of_with_flush(
1635                self.client.read_share_chunk(
1636                    storage_index,
1637                    7,  # different share number
1638                    0,
1639                    10,
1640                )
1641            )
1642
1643    def test_read_with_negative_offset_fails(self):
1644        """
1645        Malformed or unsupported Range headers result in 416 (requested range
1646        not satisfiable) error.
1647        """
1648        storage_index, _, _ = self.upload(1)
1649
1650        def check_bad_range(bad_range_value):
1651            client = self.clientFactory(
1652                StorageClientWithHeadersOverride(
1653                    self.http.client, {"range": bad_range_value}
1654                )
1655            )
1656
1657            with assert_fails_with_http_code(
1658                self, http.REQUESTED_RANGE_NOT_SATISFIABLE
1659            ):
1660                self.http.result_of_with_flush(
1661                    client.read_share_chunk(
1662                        storage_index,
1663                        1,
1664                        0,
1665                        10,
1666                    )
1667                )
1668
1669        # Bad unit
1670        check_bad_range("molluscs=0-9")
1671        # Negative offsets
1672        check_bad_range("bytes=-2-9")
1673        check_bad_range("bytes=0--10")
1674        # Negative offset no endpoint
1675        check_bad_range("bytes=-300-")
1676        check_bad_range("bytes=")
1677        # Multiple ranges are currently unsupported, even if they're
1678        # semantically valid under HTTP:
1679        check_bad_range("bytes=0-5, 6-7")
1680        # Ranges without an end are currently unsupported, even if they're
1681        # semantically valid under HTTP.
1682        check_bad_range("bytes=0-")
1683
1684    def _read_with_no_range_test(self, data_length):
1685        """
1686        A read with no range returns the whole mutable/immutable.
1687
1688        Actual test is defined in subclasses, to fix complaints from Hypothesis
1689        about the method having different executors.
1690        """
1691        storage_index, uploaded_data, _ = self.upload(1, data_length)
1692        response = self.http.result_of_with_flush(
1693            self.http.client.request(
1694                "GET",
1695                self.http.client.relative_url(
1696                    "/storage/v1/{}/{}/1".format(self.KIND, _encode_si(storage_index))
1697                ),
1698            )
1699        )
1700        self.assertEqual(response.code, http.OK)
1701        self.assertEqual(
1702            self.http.result_of_with_flush(response.content()), uploaded_data
1703        )
1704
1705    def test_validate_content_range_response_to_read(self):
1706        """
1707        The server responds to ranged reads with an appropriate Content-Range
1708        header.
1709        """
1710        storage_index, _, _ = self.upload(1, 26)
1711
1712        def check_range(requested_range, expected_response):
1713            headers = Headers()
1714            headers.setRawHeaders("range", [requested_range])
1715            response = self.http.result_of_with_flush(
1716                self.http.client.request(
1717                    "GET",
1718                    self.http.client.relative_url(
1719                        "/storage/v1/{}/{}/1".format(
1720                            self.KIND, _encode_si(storage_index)
1721                        )
1722                    ),
1723                    headers=headers,
1724                )
1725            )
1726            self.assertEqual(
1727                response.headers.getRawHeaders("content-range"), [expected_response]
1728            )
1729
1730        check_range("bytes=0-10", "bytes 0-10/*")
1731        check_range("bytes=3-17", "bytes 3-17/*")
1732        # TODO re-enable in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907
1733        # Can't go beyond the end of the mutable/immutable!
1734        # check_range("bytes=10-100", "bytes 10-25/*")
1735
1736
1737class ImmutableSharedTests(SharedImmutableMutableTestsMixin, SyncTestCase):
1738    """Shared tests, running on immutables."""
1739
1740    KIND = "immutable"
1741    clientFactory = StorageClientImmutables
1742
1743    def setUp(self):
1744        super(ImmutableSharedTests, self).setUp()
1745        disable_thread_pool_for_test(self)
1746        self.http = self.useFixture(HttpTestFixture())
1747        self.client = self.clientFactory(self.http.client)
1748        self.general_client = StorageClientGeneral(self.http.client)
1749
1750    def upload(self, share_number, data_length=26):
1751        """
1752        Create a share, return (storage_index, uploaded_data, lease_secret).
1753        """
1754        uploaded_data = (b"abcdefghijklmnopqrstuvwxyz" * ((data_length // 26) + 1))[
1755            :data_length
1756        ]
1757        upload_secret = urandom(32)
1758        lease_secret = urandom(32)
1759        storage_index = urandom(16)
1760        self.http.result_of_with_flush(
1761            self.client.create(
1762                storage_index,
1763                {share_number},
1764                data_length,
1765                upload_secret,
1766                lease_secret,
1767                lease_secret,
1768            )
1769        )
1770        self.http.result_of_with_flush(
1771            self.client.write_share_chunk(
1772                storage_index,
1773                share_number,
1774                upload_secret,
1775                0,
1776                uploaded_data,
1777            )
1778        )
1779        return storage_index, uploaded_data, lease_secret
1780
1781    def get_leases(self, storage_index):
1782        return self.http.storage_server.get_leases(storage_index)
1783
1784    @given(data_length=st.integers(min_value=1, max_value=300000))
1785    def test_read_with_no_range(self, data_length):
1786        """
1787        A read with no range returns the whole immutable.
1788        """
1789        return self._read_with_no_range_test(data_length)
1790
1791
1792class MutableSharedTests(SharedImmutableMutableTestsMixin, SyncTestCase):
1793    """Shared tests, running on mutables."""
1794
1795    KIND = "mutable"
1796    clientFactory = StorageClientMutables
1797
1798    def setUp(self):
1799        super(MutableSharedTests, self).setUp()
1800        disable_thread_pool_for_test(self)
1801        self.http = self.useFixture(HttpTestFixture())
1802        self.client = self.clientFactory(self.http.client)
1803        self.general_client = StorageClientGeneral(self.http.client)
1804
1805    def upload(self, share_number, data_length=26):
1806        """
1807        Create a share, return (storage_index, uploaded_data, lease_secret).
1808        """
1809        data = (b"abcdefghijklmnopqrstuvwxyz" * ((data_length // 26) + 1))[:data_length]
1810        write_secret = urandom(32)
1811        lease_secret = urandom(32)
1812        storage_index = urandom(16)
1813        self.http.result_of_with_flush(
1814            self.client.read_test_write_chunks(
1815                storage_index,
1816                write_secret,
1817                lease_secret,
1818                lease_secret,
1819                {
1820                    share_number: TestWriteVectors(
1821                        write_vectors=[WriteVector(offset=0, data=data)]
1822                    ),
1823                },
1824                [],
1825            )
1826        )
1827        return storage_index, data, lease_secret
1828
1829    def get_leases(self, storage_index):
1830        return self.http.storage_server.get_slot_leases(storage_index)
1831
1832    @given(data_length=st.integers(min_value=1, max_value=300000))
1833    def test_read_with_no_range(self, data_length):
1834        """
1835        A read with no range returns the whole mutable.
1836        """
1837        return self._read_with_no_range_test(data_length)
Note: See TracBrowser for help on using the repository browser.