source: trunk/src/allmydata/test/web/test_webish.py

Last change on this file was eef52fa, checked in by Jean-Paul Calderone <exarkun@…>, at 2023-07-11T20:32:33Z

remove unused imports

  • Property mode set to 100644
File size: 9.8 KB
Line 
1"""
2Tests for ``allmydata.webish``.
3"""
4
5import tempfile
6from uuid import (
7    uuid4,
8)
9from io import (
10    BytesIO,
11)
12
13from hypothesis import (
14    given,
15)
16from hypothesis.strategies import (
17    integers,
18)
19
20from testtools.matchers import (
21    AfterPreprocessing,
22    Contains,
23    Equals,
24    MatchesAll,
25    Not,
26    IsInstance,
27    HasLength,
28)
29
30from twisted.python.filepath import (
31    FilePath,
32)
33from twisted.web.test.requesthelper import (
34    DummyChannel,
35)
36from twisted.web.resource import (
37    Resource,
38)
39
40from ..common import (
41    SyncTestCase,
42)
43
44from ...webish import (
45    TahoeLAFSRequest,
46    TahoeLAFSSite,
47    anonymous_tempfile_factory,
48)
49
50
class TahoeLAFSRequestTests(SyncTestCase):
    """
    Tests for ``TahoeLAFSRequest``.
    """
    def _fields_test(self, method, request_headers, request_body, match_fields):
        """
        Deliver one complete request to a new ``TahoeLAFSRequest`` and check
        the resulting ``fields`` attribute.

        :param bytes method: The HTTP method of the request.

        :param dict request_headers: A mapping from header name to the single
            value to set for that header.

        :param bytes request_body: The entire request body.

        :param match_fields: A testtools matcher which ``request.fields``
            must satisfy after the request has been received.
        """
        channel = DummyChannel()
        request = TahoeLAFSRequest(
            channel,
        )
        for (k, v) in request_headers.items():
            request.requestHeaders.setRawHeaders(k, [v])
        request.gotLength(len(request_body))
        request.handleContentChunk(request_body)
        request.requestReceived(method, b"/", b"HTTP/1.1")

        # We don't really care what happened to the request.  What we do care
        # about is what the `fields` attribute is set to.
        self.assertThat(
            request.fields,
            match_fields,
        )

    def _multipart_post_test(self, fields, expected):
        """
        ``POST`` the given fields as ``multipart/form-data`` and check the
        values parsed into ``TahoeLAFSRequest.fields``.

        :param fields: The fields to serialize, in the form accepted by
            ``multipart_formdata``.

        :param dict expected: The expected mapping from field name to the
            value reported by ``fields.getvalue`` for that name.
        """
        form_data, boundary = multipart_formdata(fields)
        content_type = (
            b"multipart/form-data; boundary=" + boundary.encode("ascii")
        )
        self._fields_test(
            b"POST",
            {b"content-type": content_type},
            form_data.encode("ascii"),
            AfterPreprocessing(
                # Flatten the parsed fields into a plain dict so they can be
                # compared with ``Equals``.
                lambda fs: {
                    k: fs.getvalue(k)
                    for k
                    in fs.keys()
                },
                Equals(expected),
            ),
        )

    def test_no_form_fields(self):
        """
        When a ``GET`` request is received, ``TahoeLAFSRequest.fields`` is None.
        """
        self._fields_test(b"GET", {}, b"", Equals(None))

    def test_form_fields_if_filename_set(self):
        """
        When a ``POST`` request is received, form fields are parsed into
        ``TahoeLAFSRequest.fields`` and the body is bytes (presuming ``filename``
        is set).
        """
        self._multipart_post_test(
            [
                [param(u"name", u"foo"),
                 body(u"bar"),
                ],
                [param(u"name", u"baz"),
                 param(u"filename", u"quux"),
                 body(u"some file contents"),
                ],
            ],
            {
                "foo": "bar",
                "baz": b"some file contents",
            },
        )

    def test_form_fields_if_name_is_file(self):
        """
        When a ``POST`` request is received, form fields are parsed into
        ``TahoeLAFSRequest.fields`` and the body is bytes when ``name``
        is set to ``"file"``.
        """
        self._multipart_post_test(
            [
                [param(u"name", u"foo"),
                 body(u"bar"),
                ],
                [param(u"name", u"file"),
                 body(u"some file contents"),
                ],
            ],
            {
                "foo": "bar",
                "file": b"some file contents",
            },
        )

    def test_form_fields_require_correct_mime_type(self):
        """
        The body of a ``POST`` is not parsed into fields if its mime type is
        not ``multipart/form-data``.

        Reproducer for https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3854
        """
        data = b'{"lalala": "lolo"}'
        # Header names and values are bytes here, as in the other tests of
        # this class.
        self._fields_test(
            b"POST",
            {b"content-type": b"application/json"},
            data,
            Equals(None),
        )
153
154
class TahoeLAFSSiteTests(SyncTestCase):
    """
    Tests for ``TahoeLAFSSite``.
    """
    def _test_censoring(self, path, censored):
        """
        Verify that the event logged for a request for ``path`` does not include
        ``path`` but instead includes ``censored``.

        :param bytes path: A request path.

        :param bytes censored: A replacement value for the request path in the
            access log.

        :return: ``None`` if the logging looks good.
        """
        logPath = self.mktemp()
        tempdir = self.mktemp()
        FilePath(tempdir).makedirs()

        site = TahoeLAFSSite(
            anonymous_tempfile_factory(tempdir),
            Resource(),
            logPath=logPath,
        )
        site.startFactory()
        # Pair the start with a stop so that whatever the site acquired at
        # startup is released when the test ends.
        self.addCleanup(site.stopFactory)

        channel = DummyChannel()
        channel.factory = site
        request = TahoeLAFSRequest(channel)

        request.gotLength(None)
        request.requestReceived(b"GET", path, b"HTTP/1.1")

        self.assertThat(
            FilePath(logPath).getContent(),
            MatchesAll(
                Contains(censored),
                Not(Contains(path)),
            ),
        )

    def test_private_key_censoring(self):
        """
        The log event for a request including a **private-key** query
        argument has the private key value censored.
        """
        self._test_censoring(
            b"/uri?uri=URI:CHK:aaa:bbb&private-key=AAAAaaaabbbb==",
            b"/uri?uri=[CENSORED]&private-key=[CENSORED]",
        )

    def test_uri_censoring(self):
        """
        The log event for a request for **/uri/<CAP>** has the capability value
        censored.
        """
        self._test_censoring(
            b"/uri/URI:CHK:aaa:bbb",
            b"/uri/[CENSORED]",
        )

    def test_file_censoring(self):
        """
        The log event for a request for **/file/<CAP>** has the capability value
        censored.
        """
        self._test_censoring(
            b"/file/URI:CHK:aaa:bbb",
            b"/file/[CENSORED]",
        )

    def test_named_censoring(self):
        """
        The log event for a request for **/named/<CAP>** has the capability value
        censored.
        """
        self._test_censoring(
            b"/named/URI:CHK:aaa:bbb",
            b"/named/[CENSORED]",
        )

    def test_uri_queryarg_censoring(self):
        """
        The log event for a request for **/uri?uri=<CAP>** has the capability
        value censored.
        """
        self._test_censoring(
            b"/uri?uri=URI:CHK:aaa:bbb",
            b"/uri?uri=[CENSORED]",
        )

    def _create_request(self, tempdir):
        """
        Create and return a new ``TahoeLAFSRequest`` hooked up to a
        ``TahoeLAFSSite``.

        :param FilePath tempdir: The temporary directory to configure the site
            to write large temporary request bodies to.  The temporary files
            will be named for ease of testing.

        :return TahoeLAFSRequest: The new request instance.
        """
        site = TahoeLAFSSite(
            lambda: tempfile.NamedTemporaryFile(dir=tempdir.path),
            Resource(),
            logPath=self.mktemp(),
        )
        site.startFactory()
        # Pair the start with a stop so that whatever the site acquired at
        # startup is released when the test ends.
        self.addCleanup(site.stopFactory)

        channel = DummyChannel()
        channel.site = site
        request = TahoeLAFSRequest(channel)
        return request

    @given(integers(min_value=0, max_value=1024 * 1024 - 1))
    def test_small_content(self, request_body_size):
        """
        A request body smaller than 1 MiB is kept in memory.
        """
        tempdir = FilePath(self.mktemp())
        tempdir.makedirs()
        request = self._create_request(tempdir)
        request.gotLength(request_body_size)
        self.assertThat(
            request.content,
            IsInstance(BytesIO),
        )

    def _large_request_test(self, request_body_size):
        """
        Assert that when a request with a body of the given size is
        received its content is written to a temporary file created by the
        tempfile factory the site was configured with.

        :param request_body_size: The body size to announce to the request,
            or ``None`` if the size is unknown.
        """
        tempdir = FilePath(self.mktemp())
        tempdir.makedirs()
        request = self._create_request(tempdir)
        request.gotLength(request_body_size)
        # The request is never finished by this test so nothing else will
        # close the content file.  Close it at cleanup time rather than now
        # because the file has to still exist for the assertion below.
        self.addCleanup(request.content.close)
        # We can see the temporary file in the temporary directory we
        # specified because _create_request makes a request that uses named
        # temporary files instead of the usual anonymous temporary files.
        self.assertThat(
            tempdir.children(),
            HasLength(1),
        )

    def test_unknown_request_size(self):
        """
        A request body with an unknown size is written to a file in the temporary
        directory passed to ``TahoeLAFSSite``.
        """
        self._large_request_test(None)

    @given(integers(min_value=1024 * 1024))
    def test_large_request(self, request_body_size):
        """
        A request body of 1 MiB or more is written to a file in the temporary
        directory passed to ``TahoeLAFSSite``.
        """
        self._large_request_test(request_body_size)
316
317
def param(name, value):
    """
    Format one ``; name=value`` parameter for a form-data part header.

    :param name: The parameter name.

    :param value: The parameter value.  It is inserted as-is, without any
        quoting.

    :return: The formatted parameter, including its leading ``"; "``.
    """
    return f"; {name}={value}"
320
321
def body(value):
    """
    Format the content of a form-data part.

    :param value: The content of the part.

    :return: ``value`` preceded by the blank line which ends the part's
        headers.
    """
    return f"\r\n\r\n{value}"
324
325
326def _field(field):
327    yield u"Content-Disposition: form-data"
328    for param in field:
329        yield param
330
331
def _multipart_formdata(fields):
    """
    Generate the serialized text of each form-data part, one string per
    element of ``fields``.

    :param fields: An iterable of fields, each in the form accepted by
        ``_field``.

    :return: A generator of strings, each one a complete part terminated by
        a CRLF.
    """
    line_end = u"\r\n"
    for pieces in fields:
        serialized = u"".join(_field(pieces))
        yield serialized + line_end
335
336
def multipart_formdata(fields):
    """
    Serialize some simple fields into a multipart/form-data string.

    :param fields: A list of lists of unicode strings to assemble into the
        result.  See ``param`` and ``body``.

    :return: A two-tuple of unicode strings.  The first element is the given
        fields combined into a multipart/form-data string and the second is
        the boundary which separates the parts in that string (for use in
        the ``Content-Type`` header).
    """
    boundary = str(uuid4())
    delimiter = u"--" + boundary
    # Every part is introduced by its own delimiter line.
    parts = u"".join(
        delimiter + u"\r\n" + part
        for part
        in _multipart_formdata(fields)
    )
    # A multipart body ends with a closing delimiter: the delimiter followed
    # by two more hyphens.
    return (
        parts + delimiter + u"--\r\n",
        boundary,
    )
Note: See TracBrowser for help on using the repository browser.